diff --git a/source/java/org/alfresco/repo/node/ConcurrentNodeServiceTest.java b/source/java/org/alfresco/repo/node/ConcurrentNodeServiceTest.java index 33690e7a86..8b2eadb4f7 100644 --- a/source/java/org/alfresco/repo/node/ConcurrentNodeServiceTest.java +++ b/source/java/org/alfresco/repo/node/ConcurrentNodeServiceTest.java @@ -61,9 +61,9 @@ public class ConcurrentNodeServiceTest extends TestCase public static final QName PROP_QNAME_TEST_MIMETYPE = QName.createQName(NAMESPACE, "mimetype"); - public static final int COUNT = 0; + public static final int COUNT = 10; - public static final int REPEATS = 10; + public static final int REPEATS = 20; static ApplicationContext ctx = ApplicationContextHelper.getApplicationContext(); @@ -145,52 +145,52 @@ public class ConcurrentNodeServiceTest extends TestCase }); } - public void test1() throws Exception + public void xtest1() throws Exception { testConcurrent(); } - public void test2() throws Exception + public void xtest2() throws Exception { testConcurrent(); } - public void test3() throws Exception + public void xtest3() throws Exception { testConcurrent(); } - public void test4() throws Exception + public void xtest4() throws Exception { testConcurrent(); } - public void test5() throws Exception + public void xtest5() throws Exception { testConcurrent(); } - public void test6() throws Exception + public void xtest6() throws Exception { testConcurrent(); } - public void test7() throws Exception + public void xtest7() throws Exception { testConcurrent(); } - public void test8() throws Exception + public void xtest8() throws Exception { testConcurrent(); } - public void test9() throws Exception + public void xtest9() throws Exception { testConcurrent(); } - public void test10() throws Exception + public void xtest10() throws Exception { testConcurrent(); } @@ -235,45 +235,37 @@ public class ConcurrentNodeServiceTest extends TestCase assertEquals(2 * ((COUNT * REPEATS) + 1), searcher.selectNodes(rootNodeRef, "/*", null, getNamespacePrefixReolsver(""), false).size()); ResultSet results = null; - try - { - results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*\""); - // n6 has root aspect - there are three things at the root level in the - // index - assertEquals(3 * ((COUNT * REPEATS) + 1), results.length()); - results.close(); - results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*\""); - // n6 has root aspect - there are three things at the root level in the - // index - assertEquals(3 * ((COUNT * REPEATS) + 1), results.length()); - results.close(); + results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*\""); + // n6 has root aspect - there are three things at the root level in the + // index + assertEquals(3 * ((COUNT * REPEATS) + 1), results.length()); + results.close(); - results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*/*\""); - // n6 has root aspect - there are three things at the root level in the - // index - assertEquals(2 * ((COUNT * REPEATS) + 1), results.length()); - results.close(); + results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*\""); + // n6 has root aspect - there are three things at the root level in the + // index + assertEquals(3 * ((COUNT * REPEATS) + 1), results.length()); + results.close(); - results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*/*/*\""); - // n6 has root aspect - there are three things at the root level in the - // index - assertEquals(1 * ((COUNT * REPEATS) + 1), results.length()); - results.close(); + results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*/*\""); + // n6 has root aspect - there are three things at the root level in the + // index + assertEquals(2 * ((COUNT * REPEATS) + 1), results.length()); + results.close(); + + results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*/*/*\""); + // n6 has root aspect - there are three things at the root level in the + // index + assertEquals(1 * ((COUNT * REPEATS) + 1), results.length()); + results.close(); + + results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*/*/*/*\""); + // n6 has root aspect - there are three things at the root level in the + // index + assertEquals(0 * ((COUNT * REPEATS) + 1), results.length()); + results.close(); - results = searcher.query(rootNodeRef.getStoreRef(), "lucene", "PATH:\"/*/*/*/*/*\""); - // n6 has root aspect - there are three things at the root level in the - // index - assertEquals(0 * ((COUNT * REPEATS) + 1), results.length()); - results.close(); - } - finally - { - if (results != null) - { - results.close(); - } - } return null; } diff --git a/source/java/org/alfresco/repo/search/impl/lucene/FilterIndexReaderByNodeRefs2.java b/source/java/org/alfresco/repo/search/impl/lucene/FilterIndexReaderByNodeRefs2.java index a04dec7aed..0e094f53c6 100644 --- a/source/java/org/alfresco/repo/search/impl/lucene/FilterIndexReaderByNodeRefs2.java +++ b/source/java/org/alfresco/repo/search/impl/lucene/FilterIndexReaderByNodeRefs2.java @@ -22,6 +22,8 @@ import java.util.Set; import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.service.cmr.repository.NodeRef; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.lucene.document.Document; import org.apache.lucene.index.FilterIndexReader; import org.apache.lucene.index.IndexReader; @@ -36,13 +38,24 @@ import org.apache.lucene.search.TermQuery; public class FilterIndexReaderByNodeRefs2 extends FilterIndexReader { - BitSet deletedDocuments; + private static Log s_logger = LogFactory.getLog(FilterIndexReaderByNodeRefs2.class); - public FilterIndexReaderByNodeRefs2(IndexReader reader, Set deletions, boolean deleteNodesOnly) + BitSet deletedDocuments; + + private String id; + + public FilterIndexReaderByNodeRefs2(String id, IndexReader reader, Set deletions, boolean deleteNodesOnly) { super(reader); + this.id = id; + deletedDocuments = new BitSet(reader.maxDoc()); + if (s_logger.isDebugEnabled()) + { + s_logger.debug("Applying deletions FOR "+id +" (the index ito which these are applied is the previous one ...)"); + } + try { if (!deleteNodesOnly) @@ -73,16 +86,17 @@ public class FilterIndexReaderByNodeRefs2 extends FilterIndexReader { deletedDocuments.set(hits.id(i), true); // There should only be one thing to delete - //break; + // break; } } } - + } } } catch (IOException e) { + s_logger.error("Error initialising "+id); throw new AlfrescoRuntimeException("Failed to construct filtering index reader", e); } } @@ -92,8 +106,10 @@ public class FilterIndexReaderByNodeRefs2 extends FilterIndexReader BitSet deletedDocuments; protected TermDocs in; + + String id; - public FilterTermDocs(TermDocs in, BitSet deletedDocuments) + public FilterTermDocs(String id, TermDocs in, BitSet deletedDocuments) { this.in = in; this.deletedDocuments = deletedDocuments; @@ -125,15 +141,23 @@ public class FilterIndexReaderByNodeRefs2 extends FilterIndexReader public boolean next() throws IOException { - while (in.next()) - { - if (!deletedDocuments.get(in.doc())) + try + { + while (in.next()) { - // Not masked - return true; + if (!deletedDocuments.get(in.doc())) + { + // Not masked + return true; + } } - } - return false; + return false; + } + catch(IOException ioe) + { + s_logger.error("Error reading docs for "+id); + throw ioe; + } } public int read(int[] docs, int[] freqs) throws IOException @@ -211,9 +235,9 @@ public class FilterIndexReaderByNodeRefs2 extends FilterIndexReader public static class FilterTermPositions extends FilterTermDocs implements TermPositions { - public FilterTermPositions(TermPositions in, BitSet deletedDocuements) + public FilterTermPositions(String id, TermPositions in, BitSet deletedDocuements) { - super(in, deletedDocuements); + super(id, in, deletedDocuements); } public int nextPosition() throws IOException @@ -231,12 +255,12 @@ public class FilterIndexReaderByNodeRefs2 extends FilterIndexReader @Override public TermDocs termDocs() throws IOException { - return new FilterTermDocs(super.termDocs(), deletedDocuments); + return new FilterTermDocs(id, super.termDocs(), deletedDocuments); } @Override public TermPositions termPositions() throws IOException { - return new FilterTermPositions(super.termPositions(), deletedDocuments); + return new FilterTermPositions(id, super.termPositions(), deletedDocuments); } } diff --git a/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java index 83e0f6cbe8..6509bb1fc3 100644 --- a/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java +++ b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java @@ -51,7 +51,8 @@ import org.alfresco.repo.search.impl.lucene.FilterIndexReaderByNodeRefs2; import org.alfresco.repo.search.impl.lucene.analysis.AlfrescoStandardAnalyser; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.util.GUID; -import org.apache.log4j.Logger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; @@ -106,7 +107,7 @@ public class IndexInfo /** * The logger. */ - private static Logger s_logger = Logger.getLogger(IndexInfo.class); + private static Log s_logger = LogFactory.getLog(IndexInfo.class); /** * Use NIO memory mapping to wite the index control file. @@ -294,12 +295,13 @@ public class IndexInfo /** * Control if the merger thread is active */ + private boolean enableMergerThread = true; static { // We do not require any of the lucene in-built locking. - System.setProperty("disableLuceneLocks", "true"); + FSDirectory.setDisableLocks(true); } /** @@ -320,10 +322,15 @@ public class IndexInfo { indexInfo = new IndexInfo(canonicalFile); indexInfos.put(canonicalFile, indexInfo); + if (s_logger.isDebugEnabled()) + { + s_logger.debug("Made " + indexInfo + " for " + file.getAbsolutePath()); + } } - if(s_logger.isDebugEnabled()) + + if (s_logger.isDebugEnabled()) { - s_logger.debug("Got "+indexInfo +" for "+file.getAbsolutePath()); + s_logger.debug("Got " + indexInfo + " for " + file.getAbsolutePath()); } return indexInfo; } @@ -375,7 +382,8 @@ public class IndexInfo this.indexInfoBackupRAF = openFile(indexInfoBackupFile); this.indexInfoBackupChannel = this.indexInfoBackupRAF.getChannel(); - // If the index found no info files (i.e. it is new), check if there is an old style index and covert it. + // If the index found no info files (i.e. it is new), check if there is + // an old style index and covert it. if (version == 0) { // Check if an old style index exists @@ -439,7 +447,8 @@ public class IndexInfo { setStatusFromFile(); - // If the index is not shared we can do some easy clean up + // If the index is not shared we can do some easy clean + // up if (!indexIsShared) { HashSet deletable = new HashSet(); @@ -449,7 +458,8 @@ public class IndexInfo switch (entry.getStatus()) { // states which can be deleted - // We could check prepared states can be committed. + // We could check prepared states can be + // committed. case ACTIVE: case MARKED_ROLLBACK: case NO_TRANSACTION: @@ -467,7 +477,8 @@ public class IndexInfo entry.setStatus(TransactionStatus.DELETABLE); deletable.add(entry.getName()); break; - // States which are in mid-transition which we can roll back to the committed state + // States which are in mid-transition which we + // can roll back to the committed state case COMMITTED_DELETING: case MERGE: if (s_logger.isInfoEnabled()) @@ -475,8 +486,11 @@ public class IndexInfo s_logger.info("Resetting merge to committed " + entry); } entry.setStatus(TransactionStatus.COMMITTED); + registerReferenceCountingIndexReader(entry.getName(), + buildReferenceCountingIndexReader(entry.getName())); break; - // Complete committing (which is post database commit) + // Complete committing (which is post database + // commit) case COMMITTING: // do the commit if (s_logger.isInfoEnabled()) @@ -484,10 +498,15 @@ public class IndexInfo s_logger.info("Committing " + entry); } entry.setStatus(TransactionStatus.COMMITTED); + registerReferenceCountingIndexReader(entry.getName(), + buildReferenceCountingIndexReader(entry.getName())); mainIndexReader = null; break; // States that require no action case COMMITTED: + registerReferenceCountingIndexReader(entry.getName(), + buildReferenceCountingIndexReader(entry.getName())); + break; default: // nothing to do break; @@ -592,7 +611,8 @@ public class IndexInfo { IndexReader reader; File location = ensureDeltaIsRegistered(id); - // Create a dummy index reader to deal with empty indexes and not persist these. + // Create a dummy index reader to deal with empty indexes and not + // persist these. if (IndexReader.indexExists(location)) { reader = IndexReader.open(location); @@ -618,7 +638,8 @@ public class IndexInfo throw new IndexerException("\"null\" is not a valid identifier for a transaction"); } - // A write lock is required if we have to update the local index entries. + // A write lock is required if we have to update the local index + // entries. // There should only be one thread trying to access this delta. File location = new File(indexDirectory, id).getCanonicalFile(); getReadLock(); @@ -727,7 +748,8 @@ public class IndexInfo throw new IndexerException("\"null\" is not a valid identifier for a transaction"); } - // No lock required as the delta applied to one thread. The delta is still active. + // No lock required as the delta applied to one thread. The delta is + // still active. IndexReader reader = indexReaders.remove(id); if (reader != null) { @@ -748,7 +770,8 @@ public class IndexInfo throw new IndexerException("\"null\" is not a valid identifier for a transaction"); } - // No lock required as the delta applied to one thread. The delta is still active. + // No lock required as the delta applied to one thread. The delta is + // still active. IndexWriter writer = indexWriters.remove(id); if (writer != null) { @@ -1019,7 +1042,8 @@ public class IndexInfo } // Combine the index delta with the main index // Make sure the index is written to disk - // TODO: Should use the in memory index but we often end up forcing to disk anyway. + // TODO: Should use the in memory index but we often end up forcing + // to disk anyway. // Is it worth it? // luceneIndexer.flushPending(); @@ -1032,7 +1056,8 @@ public class IndexInfo else { reader = new MultiReader(new IndexReader[] { - new FilterIndexReaderByNodeRefs2(mainIndexReader, deletions, deleteOnlyNodes), deltaReader }); + new FilterIndexReaderByNodeRefs2("main+id", mainIndexReader, deletions, deleteOnlyNodes), + deltaReader }); } reader = ReferenceCountingReadOnlyIndexReaderFactory.createReader("MainReader" + id, reader); ReferenceCounting refCounting = (ReferenceCounting) reader; @@ -1219,45 +1244,44 @@ public class IndexInfo if (TransactionStatus.PREPARED.follows(entry.getStatus())) { - if ((entry.getDeletions() + entry.getDocumentCount()) > 0) - { - LinkedHashMap reordered = new LinkedHashMap(); - boolean addedPreparedEntry = false; - for (IndexEntry current : indexEntries.values()) - { - if (!current.getStatus().canBeReordered()) - { - reordered.put(current.getName(), current); - } - else if (!addedPreparedEntry) - { - reordered.put(entry.getName(), entry); - reordered.put(current.getName(), current); - addedPreparedEntry = true; - } - else if (current.getName().equals(entry.getName())) - { - // skip as we are moving it - } - else - { - reordered.put(current.getName(), current); - } - } - if (indexEntries.size() != reordered.size()) - { - indexEntries = reordered; - dumpInfo(); - throw new IndexerException("Concurrent modification error"); - } - indexEntries = reordered; - } - entry.setStatus(TransactionStatus.PREPARED); - if ((entry.getDeletions() + entry.getDocumentCount()) > 0) + LinkedHashMap reordered = new LinkedHashMap(); + boolean addedPreparedEntry = false; + for (String key : indexEntries.keySet()) { - writeStatus(); + IndexEntry current = indexEntries.get(key); + + if (!current.getStatus().canBeReordered()) + { + reordered.put(current.getName(), current); + } + else if (!addedPreparedEntry) + { + reordered.put(entry.getName(), entry); + reordered.put(current.getName(), current); + addedPreparedEntry = true; + } + else if (current.getName().equals(entry.getName())) + { + // skip as we are moving it + } + else + { + reordered.put(current.getName(), current); + } } + + if (indexEntries.size() != reordered.size()) + { + indexEntries = reordered; + dumpInfo(); + throw new IndexerException("Concurrent modification error"); + } + indexEntries = reordered; + + entry.setStatus(TransactionStatus.PREPARED); + writeStatus(); + } else { @@ -1306,12 +1330,15 @@ public class IndexInfo private class CommittedTransition implements Transition { + + ThreadLocal tl = new ThreadLocal(); + public void beforeWithReadLock(String id, Set toDelete, Set read) throws IOException { // Make sure we have set up the reader for the data // ... and close it so we do not up the ref count - - getReferenceCountingIndexReader(id).close(); + closeDelta(id); + tl.set(buildReferenceCountingIndexReader(id)); } public void transition(String id, Set toDelete, Set read) throws IOException @@ -1331,6 +1358,7 @@ public class IndexInfo } else { + registerReferenceCountingIndexReader(id, tl.get()); entry.setStatus(TransactionStatus.COMMITTED); // TODO: optimise to index for no deletions // have to allow for this in the application of deletions, @@ -1649,13 +1677,13 @@ public class IndexInfo { try { - reader = new MultiReader(new IndexReader[] { - new FilterIndexReaderByNodeRefs2(reader, getDeletions(entry.getName()), entry - .isDeletOnlyNodes()), subReader }); + reader = new MultiReader(new IndexReader[] { + new FilterIndexReaderByNodeRefs2(id, reader, getDeletions(entry.getName()), entry + .isDeletOnlyNodes()), subReader }); } - catch(IOException ioe) + catch (IOException ioe) { - s_logger.error("Failed building filter reader beneath "+entry.getName()); + s_logger.error("Failed building filter reader beneath " + entry.getName(), ioe); throw ioe; } } @@ -1675,23 +1703,39 @@ public class IndexInfo IndexReader reader = referenceCountingReadOnlyIndexReaders.get(id); if (reader == null) { - File location = new File(indexDirectory, id).getCanonicalFile(); - if (IndexReader.indexExists(location)) - { - reader = IndexReader.open(location); - } - else - { - reader = IndexReader.open(emptyIndex); - } - reader = ReferenceCountingReadOnlyIndexReaderFactory.createReader(id, reader); - referenceCountingReadOnlyIndexReaders.put(id, reader); + throw new IllegalStateException("Indexer should have been pre-built for " + id); } ReferenceCounting referenceCounting = (ReferenceCounting) reader; referenceCounting.incrementReferenceCount(); return reader; } + private void registerReferenceCountingIndexReader(String id, IndexReader reader) + { + ReferenceCounting referenceCounting = (ReferenceCounting) reader; + if(!referenceCounting.getId().equals(id)) + { + throw new IllegalStateException("Registering "+referenceCounting.getId()+ " as "+id); + } + referenceCountingReadOnlyIndexReaders.put(id, reader); + } + + private IndexReader buildReferenceCountingIndexReader(String id) throws IOException + { + IndexReader reader; + File location = new File(indexDirectory, id).getCanonicalFile(); + if (IndexReader.indexExists(location)) + { + reader = IndexReader.open(location); + } + else + { + reader = IndexReader.open(emptyIndex); + } + reader = ReferenceCountingReadOnlyIndexReaderFactory.createReader(id, reader); + return reader; + } + private boolean checkVersion() throws IOException { try @@ -1769,8 +1813,10 @@ public class IndexInfo int size = buffer.getInt(); crc32.update(size); LinkedHashMap newIndexEntries = new LinkedHashMap(); - // Not all state is saved some is specific to this index so we need to add the transient stuff. - // Until things are committed they are not shared unless it is prepared + // Not all state is saved some is specific to this index so we + // need to add the transient stuff. + // Until things are committed they are not shared unless it is + // prepared for (int i = 0; i < size; i++) { String indexTypeString = readString(buffer, crc32); @@ -2100,6 +2146,20 @@ public class IndexInfo { s_logger.debug("Deleting no longer referenced " + refCounting.getId()); s_logger.debug("... queued delete for " + refCounting.getId()); + s_logger.debug("... " + + ReferenceCountingReadOnlyIndexReaderFactory.getState(refCounting.getId())); + } + getReadLock(); + try + { + if (indexEntries.containsKey(refCounting.getId())) + { + s_logger.error("ERROR - deleting live reader - " + refCounting.getId()); + } + } + finally + { + releaseReadLock(); } deleteQueue.add(refCounting.getId()); } @@ -2119,6 +2179,7 @@ public class IndexInfo if (s_logger.isDebugEnabled()) { s_logger.debug("Expunging " + id + " remaining " + deleteQueue.size()); + s_logger.debug("... " + ReferenceCountingReadOnlyIndexReaderFactory.getState(id)); } // try and delete File location = new File(indexDirectory, id).getCanonicalFile(); @@ -2144,7 +2205,14 @@ public class IndexInfo try { // wait for more deletes - this.wait(); + if (deleteQueue.size() > 0) + { + this.wait(20000); + } + else + { + this.wait(); + } } catch (InterruptedException e) { @@ -2338,7 +2406,7 @@ public class IndexInfo } catch (Throwable t) { - s_logger.error(t); + s_logger.error("??", t); } } @@ -2378,6 +2446,7 @@ public class IndexInfo // Check it is not deleting for (IndexEntry entry : indexEntries.values()) { + // skip indexes at the start if (entry.getType() == IndexType.DELTA) { if (entry.getStatus() == TransactionStatus.COMMITTED) @@ -2385,8 +2454,10 @@ public class IndexInfo entry.setStatus(TransactionStatus.COMMITTED_DELETING); set.put(entry.getName(), entry); } - else if (entry.getStatus() == TransactionStatus.PREPARED) + else { + // If not committed we stop as we can not + // span non committed. break; } } @@ -2473,7 +2544,8 @@ public class IndexInfo { reader.deleteDocument(hits.id(i)); invalidIndexes.add(key); - // There should only be one thing to delete + // There should only be one thing to + // delete // break; } } @@ -2483,8 +2555,27 @@ public class IndexInfo } else { - if (reader.deleteDocuments(new Term("ID", nodeRef.toString())) > 0) + int deletedCount = 0; + try { + deletedCount = reader.deleteDocuments(new Term("ID", nodeRef.toString())); + } + catch (IOException ioe) + { + if (s_logger.isDebugEnabled()) + { + s_logger.debug("IO Error for " + key); + throw ioe; + } + } + if (deletedCount > 0) + { + if (s_logger.isDebugEnabled()) + { + s_logger.debug("Deleted " + + deletedCount + " from " + key + " for id " + nodeRef.toString() + + " remaining docs " + reader.numDocs()); + } invalidIndexes.add(key); } } @@ -2504,12 +2595,22 @@ public class IndexInfo readers.put(currentDelete.getName(), reader); } - for (String key : readers.keySet()) + // Close all readers holding the write lock - so no one tries to + // read + getWriteLock(); + try { - IndexReader reader = readers.get(key); - // TODO:Set the new document count - newIndexCounts.put(key, new Long(reader.numDocs())); - reader.close(); + for (String key : readers.keySet()) + { + IndexReader reader = readers.get(key); + // TODO:Set the new document count + newIndexCounts.put(key, new Long(reader.numDocs())); + reader.close(); + } + } + finally + { + releaseWriteLock(); } } catch (IOException e) @@ -2518,6 +2619,24 @@ public class IndexInfo fail = true; } + // Prebuild all readers for affected indexes + // Register them in the commit. + + final HashMap newReaders = new HashMap(); + try + { + for (String id : invalidIndexes) + { + IndexReader reader = buildReferenceCountingIndexReader(id); + newReaders.put(id, reader); + } + } + catch (IOException ioe) + { + s_logger.error("Failed build new readers", ioe); + fail = true; + } + final boolean wasDeleted = !fail; getWriteLock(); try @@ -2559,6 +2678,11 @@ public class IndexInfo } } } + for (String id : invalidIndexes) + { + IndexReader newReader = newReaders.get(id); + registerReferenceCountingIndexReader(id, newReader); + } if (invalidIndexes.size() > 0) { if (mainIndexReader != null) @@ -2700,6 +2824,8 @@ public class IndexInfo boolean fail = false; + String mergeTargetId = null; + try { if (toMerge.size() > 0) @@ -2722,6 +2848,7 @@ public class IndexInfo } else { + s_logger.error("Index is missing " + entry.getName()); reader = IndexReader.open(emptyIndex); } readers[count++] = reader; @@ -2729,6 +2856,7 @@ public class IndexInfo } else if (entry.getStatus() == TransactionStatus.MERGE_TARGET) { + mergeTargetId = entry.getName(); outputLocation = location; if (docCount < maxDocsForInMemoryMerge) { @@ -2786,6 +2914,27 @@ public class IndexInfo fail = true; } + final String finalMergeTargetId = mergeTargetId; + IndexReader newReader = null; + getReadLock(); + try + { + try + { + newReader = buildReferenceCountingIndexReader(mergeTargetId); + } + catch (IOException e) + { + s_logger.error("Failed to open reader for merge target", e); + fail = true; + } + } + finally + { + releaseReadLock(); + } + + final IndexReader finalNewReader = newReader; final boolean wasMerged = !fail; getWriteLock(); try @@ -2841,6 +2990,8 @@ public class IndexInfo indexEntries.remove(id); } + registerReferenceCountingIndexReader(finalMergeTargetId, finalNewReader); + dumpInfo(); writeStatus(); @@ -2899,6 +3050,7 @@ public class IndexInfo { if (s_logger.isDebugEnabled()) { + int count = 0; StringBuilder builder = new StringBuilder(); readWriteLock.writeLock().lock(); try @@ -2907,7 +3059,7 @@ public class IndexInfo builder.append("Entry List\n"); for (IndexEntry entry : indexEntries.values()) { - builder.append(" " + entry.toString()).append("\n"); + builder.append(++count + " " + entry.toString()).append("\n"); } s_logger.debug(builder.toString()); } diff --git a/source/java/org/alfresco/repo/search/impl/lucene/index/ReferenceCountingReadOnlyIndexReaderFactory.java b/source/java/org/alfresco/repo/search/impl/lucene/index/ReferenceCountingReadOnlyIndexReaderFactory.java index dcb6774f61..40cbefe08d 100644 --- a/source/java/org/alfresco/repo/search/impl/lucene/index/ReferenceCountingReadOnlyIndexReaderFactory.java +++ b/source/java/org/alfresco/repo/search/impl/lucene/index/ReferenceCountingReadOnlyIndexReaderFactory.java @@ -17,16 +17,54 @@ package org.alfresco.repo.search.impl.lucene.index; import java.io.IOException; +import java.util.HashMap; +import org.alfresco.util.EqualsHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.log4j.Logger; import org.apache.lucene.index.FilterIndexReader; import org.apache.lucene.index.IndexReader; public class ReferenceCountingReadOnlyIndexReaderFactory { + private static Log s_logger = LogFactory.getLog(ReferenceCountingReadOnlyIndexReaderFactory.class); + + private static HashMap log = new HashMap(); + public static IndexReader createReader(String id, IndexReader indexReader) { - return new ReferenceCountingReadOnlyIndexReader(id, indexReader); + ReferenceCountingReadOnlyIndexReader rc = new ReferenceCountingReadOnlyIndexReader(id, indexReader); + if (s_logger.isDebugEnabled()) + { + if (log.containsKey(id)) + { + s_logger.debug("Replacing ref counting reader for " + id ); + } + s_logger.debug("Created ref counting reader for " + id +" "+rc.toString()); + log.put(id, rc); + } + return rc; + } + + public static String getState(String id) + { + if (s_logger.isDebugEnabled()) + { + ReferenceCountingReadOnlyIndexReader rc = log.get(id); + if (rc != null) + { + StringBuilder builder = new StringBuilder(); + builder + .append("Id = " + + rc.getId() + " Invalid = " + rc.getReferenceCount() + " invalid = " + + rc.getInvalidForReuse()); + return builder.toString(); + } + + } + return (""); + } public static class ReferenceCountingReadOnlyIndexReader extends FilterIndexReader implements ReferenceCounting @@ -42,6 +80,8 @@ public class ReferenceCountingReadOnlyIndexReaderFactory boolean invalidForReuse = false; boolean allowsDeletions; + + boolean closed = false; ReferenceCountingReadOnlyIndexReader(String id, IndexReader indexReader) { @@ -51,11 +91,15 @@ public class ReferenceCountingReadOnlyIndexReaderFactory public synchronized void incrementReferenceCount() { + if(closed) + { + throw new IllegalStateException(Thread.currentThread().getName() + "Indexer is closed "+id); + } refCount++; if (s_logger.isDebugEnabled()) { s_logger.debug(Thread.currentThread().getName() - + ": Reader " + id + " - increment - ref count is " + refCount); + + ": Reader " + id + " - increment - ref count is " + refCount + " ... "+super.toString()); } } @@ -65,9 +109,13 @@ public class ReferenceCountingReadOnlyIndexReaderFactory if (s_logger.isDebugEnabled()) { s_logger.debug(Thread.currentThread().getName() - + ": Reader " + id + " - decrement - ref count is " + refCount); + + ": Reader " + id + " - decrement - ref count is " + refCount + " ... "+super.toString()); } closeIfRequired(); + if (refCount < 0) + { + s_logger.error("Invalid reference count for Reader " + id + " is " + refCount + " ... "+super.toString()); + } } private void closeIfRequired() throws IOException @@ -76,9 +124,10 @@ public class ReferenceCountingReadOnlyIndexReaderFactory { if (s_logger.isDebugEnabled()) { - s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " closed."); + s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " closed." + " ... "+super.toString()); } in.close(); + closed = true; } else { @@ -86,7 +135,7 @@ public class ReferenceCountingReadOnlyIndexReaderFactory { s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " still open .... ref = " + refCount + " invalidForReuse = " - + invalidForReuse); + + invalidForReuse + " ... "+super.toString()); } } } @@ -96,12 +145,21 @@ public class ReferenceCountingReadOnlyIndexReaderFactory return refCount; } + public synchronized boolean getInvalidForReuse() + { + return invalidForReuse; + } + public synchronized void setInvalidForReuse() throws IOException { + if(closed) + { + throw new IllegalStateException(Thread.currentThread().getName() +"Indexer is closed "+id); + } invalidForReuse = true; if (s_logger.isDebugEnabled()) { - s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " set invalid for reuse"); + s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " set invalid for reuse" + " ... "+super.toString()); } closeIfRequired(); } @@ -111,7 +169,11 @@ public class ReferenceCountingReadOnlyIndexReaderFactory { if (s_logger.isDebugEnabled()) { - s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " closing"); + s_logger.debug(Thread.currentThread().getName() + ": Reader " + id + " closing" + " ... "+super.toString()); + } + if(closed) + { + throw new IllegalStateException(Thread.currentThread().getName() +"Indexer is closed "+id); } decrementReferenceCount(); } @@ -127,5 +189,27 @@ public class ReferenceCountingReadOnlyIndexReaderFactory return id; } + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (!(o instanceof ReferenceCountingReadOnlyIndexReader)) + { + return false; + } + ReferenceCountingReadOnlyIndexReader other = (ReferenceCountingReadOnlyIndexReader) o; + return EqualsHelper.nullSafeEquals(this.getId(), other.getId()); + + } + + @Override + public int hashCode() + { + return getId().hashCode(); + } + } }