Fix for ALF-786: WCM Cluster / Lucene: Searching in staging returns duplicate entries (when concurrently submitting)

- fixed duplication arising from indexing 0-1 and 0-2 against the index for near simultaneous snapshots 
- tracking has not been modified nor has index rebuild - the latest index info from the AVM index is definitive and complete - we do not have to do additional work as we do for DM

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@19898 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Andrew Hind
2010-04-19 14:11:47 +00:00
parent 16c4b49359
commit 0d3f558e11
19 changed files with 1366 additions and 239 deletions

View File

@@ -18,8 +18,11 @@
*/
package org.alfresco.repo.search.impl.lucene;
import java.util.List;
import org.alfresco.repo.search.BackgroundIndexerAware;
import org.alfresco.repo.search.IndexMode;
import org.alfresco.service.cmr.avm.VersionDescriptor;
/**
* AVM specific indexer support
@@ -56,17 +59,6 @@ public interface AVMLuceneIndexer extends LuceneIndexer, BackgroundIndexerAware
*/
public void createIndex(String store, IndexMode mode);
/**
 * Get the id of the last snapshot added to the index.
 *
 * NOTE(review): the signature below takes only the store; the mode description
 * that follows appears to be left over from an earlier signature - confirm.
 *
 * @param store
 *            - the store whose index is examined
 * @param mode
 *            - IndexMode.SYNCHRONOUS - the last searchable snapshot
 *            - IndexMode.ASYNCHRONOUS - the last pending snapshot to be indexed. It may or may not be searchable.
 * @return - the snapshot id
 */
public int getLastIndexedSnapshot(String store);
/**
* Is the snapshot applied to the index?
*
@@ -99,4 +91,12 @@ public interface AVMLuceneIndexer extends LuceneIndexer, BackgroundIndexerAware
* @return
*/
public long getIndexedDocCount();
/**
 * Get the last snapshot in the index - this does not mean that all snapshots before it have been indexed.
 * Callers that need to know about one particular snapshot should therefore not
 * infer it from this value.
 *
 * @param store
 *            - the store whose index is examined
 * @return the id of the last snapshot found in the index for the store
 */
public int getLastIndexedSnapshot(String store);
}

View File

@@ -59,6 +59,7 @@ import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.service.cmr.avm.AVMException;
import org.alfresco.service.cmr.avm.AVMNodeDescriptor;
import org.alfresco.service.cmr.avm.AVMService;
import org.alfresco.service.cmr.avm.VersionDescriptor;
import org.alfresco.service.cmr.avmsync.AVMDifference;
import org.alfresco.service.cmr.avmsync.AVMSyncException;
import org.alfresco.service.cmr.avmsync.AVMSyncService;
@@ -129,7 +130,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
private int startVersion = -1;
private int endVersion = -1;
private long indexedDocCount = 0;
/**
@@ -281,7 +282,12 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
}
else
{
index(difference.getDestinationPath());
if (s_logger.isDebugEnabled())
{
s_logger.debug("new: ("+srcVersion+", "+dstVersion+") "+difference.getDestinationPath());
}
// AR-786
reindex(difference.getDestinationPath(), dstDesc.isDirectory());
if (dstDesc.isDirectory())
{
indexDirectory(dstDesc);
@@ -289,13 +295,6 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
reindexAllAncestors(difference.getDestinationPath());
}
}
// New Delete
else if (!srcDesc.isDeleted() && ((dstDesc == null) || dstDesc.isDeleted()))
{
delete(difference.getSourcePath());
delete(difference.getDestinationPath());
reindexAllAncestors(difference.getDestinationPath());
}
// Existing delete
else if (srcDesc.isDeleted())
{
@@ -311,7 +310,12 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
{
// We are back from the dead ...the node used to be deleted
// Treat as new
index(difference.getDestinationPath());
// AR-786
if (s_logger.isDebugEnabled())
{
s_logger.debug("back: ("+srcVersion+", "+dstVersion+") "+difference.getDestinationPath());
}
reindex(difference.getDestinationPath(), dstDesc.isDirectory());
if (dstDesc.isDirectory())
{
indexDirectory(dstDesc);
@@ -319,18 +323,49 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
reindexAllAncestors(difference.getDestinationPath());
}
}
// Anything else then we reindex
// Anything else then we have a delete or reindex
else
{
if (!difference.getSourcePath().equals(difference.getDestinationPath()))
// Unknown end state ...
if(dstDesc == null)
{
// we do not know what has happened - whatever it is does not apply to this snapshot
if (s_logger.isDebugEnabled())
{
s_logger.debug("unknown: ("+srcVersion+", "+dstVersion+") "+difference.getDestinationPath());
}
}
// Delete
else if (dstDesc.isDeleted())
{
if (s_logger.isDebugEnabled())
{
s_logger.debug("delete: ("+srcVersion+", "+dstVersion+") "+difference.getDestinationPath());
}
delete(difference.getSourcePath());
reindexAllAncestors(difference.getSourcePath());
delete(difference.getDestinationPath());
reindexAllAncestors(difference.getDestinationPath());
}
// move
else if (!difference.getSourcePath().equals(difference.getDestinationPath()))
{
if (s_logger.isDebugEnabled())
{
s_logger.debug("move: ("+srcVersion+", "+dstVersion+") "+difference.getDestinationPath());
}
reindex(difference.getSourcePath(), srcDesc.isDirectory());
reindex(difference.getDestinationPath(), dstDesc.isDirectory());
reindexAllAncestors(difference.getSourcePath());
reindexAllAncestors(difference.getDestinationPath());
}
// update
else
{
if (s_logger.isDebugEnabled())
{
s_logger.debug("update: ("+srcVersion+", "+dstVersion+") "+difference.getDestinationPath());
}
// If it is a directory, it is at the same path,
// so no cascade update is required for the bridge table data.
reindex(difference.getDestinationPath(), false);
@@ -382,10 +417,12 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
private void indexDirectory(AVMNodeDescriptor dir)
{
Map<String, AVMNodeDescriptor> children = avmService.getDirectoryListing(dir);
Map<String, AVMNodeDescriptor> children = avmService.getDirectoryListing(dir, false);
for (AVMNodeDescriptor child : children.values())
{
index(child.getPath());
// AR-786
reindex(child.getPath(), child.isDirectory());
reindexAllAncestors(child.getPath());
if (child.isDirectory())
{
indexDirectory(child);
@@ -542,7 +579,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
}
else
// not a root node
// not a root node
{
xdoc.add(new Field("QNAME", qNameBuffer.toString(), Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.NO));
@@ -755,7 +792,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
if (isContent)
{
// Content is always tokenised
ContentData contentData = DefaultTypeConverter.INSTANCE.convert(ContentData.class, serializableValue);
if (!index || contentData.getMimetype() == null)
{
@@ -878,7 +915,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
}
}
else
// URL not present (null reader) or no content at the URL (file missing)
// URL not present (null reader) or no content at the URL (file missing)
{
// log it
if (s_logger.isDebugEnabled())
@@ -898,20 +935,20 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
if (index)
{
switch (tokenise) {
case TRUE:
case BOTH:
default:
fieldIndex = Field.Index.TOKENIZED;
break;
case FALSE:
fieldIndex = Field.Index.UN_TOKENIZED;
break;
switch (tokenise) {
case TRUE:
case BOTH:
default:
fieldIndex = Field.Index.TOKENIZED;
break;
case FALSE:
fieldIndex = Field.Index.UN_TOKENIZED;
break;
}
} else {
fieldIndex = Field.Index.NO;
}
}
} else {
fieldIndex = Field.Index.NO;
}
if ((fieldIndex != Field.Index.NO) || (fieldStore != Field.Store.NO))
{
@@ -956,7 +993,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
{
doc.add(new Field(attributeName, t.termText(), Field.Store.NO, Field.Index.NO_NORMS, Field.TermVector.NO));
}
doc.add(new Field(attributeName, t.termText(), Field.Store.NO, Field.Index.NO_NORMS, Field.TermVector.NO));
}
@@ -1027,7 +1064,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
{
locale = I18NUtil.getLocale();
}
StringBuilder builder;
MLAnalysisMode analysisMode;
VerbatimAnalyser vba;
@@ -1063,7 +1100,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
{
doc.add(new Field(attributeName, t.termText(), Field.Store.NO, Field.Index.NO_NORMS, Field.TermVector.NO));
}
doc.add(new Field(attributeName, t.termText(), Field.Store.NO, Field.Index.NO_NORMS, Field.TermVector.NO));
}
}
@@ -1132,7 +1169,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
break;
case BOTH:
doc.add(new Field(attributeName, strValue, fieldStore, fieldIndex, Field.TermVector.NO));
df = CachingDateFormat.getDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS", true);
try
{
@@ -1161,15 +1198,15 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
@Override
protected void doPrepare() throws IOException
{
AuthenticationUtil.runAs(new RunAsWork<String>()
{
AuthenticationUtil.runAs(new RunAsWork<String>()
{
public String doWork() throws Exception
{
saveDelta();
flushPending();
return null;
saveDelta();
flushPending();
return null;
}
}, AuthenticationUtil.getSystemUserName());
}, AuthenticationUtil.getSystemUserName());
}
@Override
@@ -1542,6 +1579,301 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
this.fullTextSearchIndexer = fullTextSearchIndexer;
}
/**
 * Has the given snapshot been applied to the index, whether or not it is
 * searchable yet?
 *
 * Snapshot 0 is treated as indexed as soon as the index exists. Any other
 * snapshot is indexed when an entry for exactly that snapshot id is present,
 * either as a synchronous entry or as an asynchronous (pending) entry.
 *
 * @param store
 *            - the store to check
 * @param id
 *            - the snapshot id
 * @return true if the snapshot is present in the index
 */
public boolean isSnapshotIndexed(String store, int id)
{
    if (id == 0)
    {
        return hasIndexBeenCreated(store);
    }
    else
    {
        // Both kinds of entry must be consulted. The second operand used to repeat the
        // synchronous check, so asynchronous (pending) entries were never looked at.
        return isSynchronousSnapshotPresent(store, id) || isAsynchronousSnapshotPresent(store, id);
    }
}
/**
 * Look for a synchronous entry for the snapshot, first in the main index and,
 * only if it is not found there, in the delta.
 *
 * @param store
 *            - the store to check
 * @param id
 *            - the snapshot id
 * @return true if either channel holds the entry
 */
private boolean isSynchronousSnapshotPresent(String store, int id)
{
    if (isSynchronousSnapshotPresent(store, IndexChannel.MAIN, id))
    {
        return true;
    }
    return isSynchronousSnapshotPresent(store, IndexChannel.DELTA, id);
}
/**
 * Look for an asynchronous entry for the snapshot, first in the main index and,
 * only if it is not found there, in the delta.
 *
 * @param store
 *            - the store to check
 * @param id
 *            - the snapshot id
 * @return true if either channel holds the entry
 */
private boolean isAsynchronousSnapshotPresent(String store, int id)
{
    if (isAsynchronousSnapshotPresent(store, IndexChannel.MAIN, id))
    {
        return true;
    }
    return isAsynchronousSnapshotPresent(store, IndexChannel.DELTA, id);
}
/**
 * Is the given snapshot searchable?
 *
 * Snapshot 0 is searchable once the index has been created; any other snapshot
 * is searchable when a synchronous entry for it is present.
 *
 * @param store
 *            - the store to check
 * @param id
 *            - the snapshot id
 * @return true if the snapshot can be searched
 */
public boolean isSnapshotSearchable(String store, int id)
{
    return (id == 0) ? hasIndexBeenCreated(store) : isSynchronousSnapshotPresent(store, id);
}
/**
 * Scan one index channel for a synchronous snapshot entry with the given id.
 *
 * The "ID" terms are enumerated starting at the prefix built from SNAP_SHOT_ID
 * and the store, and the scan stops at the first term that no longer starts with
 * that prefix. A term only counts if at least one document carries it; its
 * snapshot id is read from the fourth colon-separated field of the term text.
 *
 * @param store
 *            - the store to check
 * @param channel
 *            - IndexChannel.DELTA reads the delta (after flushing pending work), anything else reads the main index
 * @param snapshot
 *            - the snapshot id to look for
 * @return true if a matching entry exists in the channel
 * @throws AlfrescoRuntimeException
 *             if the index can not be read
 */
private boolean isSynchronousSnapshotPresent(String store, IndexChannel channel, int snapshot)
{
    final String idPrefix = SNAP_SHOT_ID + ":" + store + ":";
    final boolean useDelta = (channel == IndexChannel.DELTA);
    IndexReader indexReader = null;
    try
    {
        if (useDelta)
        {
            flushPending();
            indexReader = getDeltaReader();
        }
        else
        {
            indexReader = getReader();
        }

        TermEnum termEnum = null;
        try
        {
            termEnum = indexReader.terms(new Term("ID", idPrefix));
            if (termEnum.term() == null)
            {
                // Nothing at or after the prefix
                return false;
            }
            for (boolean more = true; more; more = termEnum.next())
            {
                Term candidate = termEnum.term();
                String text = candidate.text();
                if (!text.startsWith(idPrefix))
                {
                    // Walked past the entries for this store
                    break;
                }
                TermDocs termDocs = null;
                try
                {
                    termDocs = indexReader.termDocs(candidate);
                    if (termDocs.next())
                    {
                        int found = Integer.parseInt(text.split(":")[3]);
                        if (found == snapshot)
                        {
                            return true;
                        }
                    }
                }
                finally
                {
                    if (termDocs != null)
                    {
                        termDocs.close();
                    }
                }
            }
            return false;
        }
        finally
        {
            if (termEnum != null)
            {
                termEnum.close();
            }
        }
    }
    catch (IOException e)
    {
        throw new AlfrescoRuntimeException("IO error", e);
    }
    finally
    {
        try
        {
            if (indexReader != null)
            {
                if (useDelta)
                {
                    closeDeltaReader();
                }
                else
                {
                    indexReader.close();
                }
            }
        }
        catch (IOException e)
        {
            s_logger.warn("Failed to close main reader", e);
        }
    }
}
/**
 * Scan one index channel for an asynchronous snapshot entry with the given id.
 *
 * The "ID" terms are enumerated starting at the "\u0000BG:STORE:" prefix for the
 * store, and the scan stops at the first term that no longer starts with that
 * prefix. A term only counts if at least one document carries it; its snapshot
 * id is read from the fifth colon-separated field of the term text.
 *
 * @param store
 *            - the store to check
 * @param channel
 *            - IndexChannel.DELTA reads the delta (after flushing pending work), anything else reads the main index
 * @param snapshot
 *            - the snapshot id to look for
 * @return true if a matching entry exists in the channel
 * @throws AlfrescoRuntimeException
 *             if the index can not be read
 */
private boolean isAsynchronousSnapshotPresent(String store, IndexChannel channel, int snapshot)
{
    final String idPrefix = "\u0000BG:STORE:" + store + ":";
    final boolean useDelta = (channel == IndexChannel.DELTA);
    IndexReader indexReader = null;
    try
    {
        if (useDelta)
        {
            flushPending();
            indexReader = getDeltaReader();
        }
        else
        {
            indexReader = getReader();
        }

        TermEnum termEnum = null;
        try
        {
            termEnum = indexReader.terms(new Term("ID", idPrefix));
            if (termEnum.term() == null)
            {
                // Nothing at or after the prefix
                return false;
            }
            for (boolean more = true; more; more = termEnum.next())
            {
                Term candidate = termEnum.term();
                String text = candidate.text();
                if (!text.startsWith(idPrefix))
                {
                    // Walked past the entries for this store
                    break;
                }
                TermDocs termDocs = null;
                try
                {
                    termDocs = indexReader.termDocs(candidate);
                    if (termDocs.next())
                    {
                        int found = Integer.parseInt(text.split(":")[4]);
                        if (found == snapshot)
                        {
                            return true;
                        }
                    }
                }
                finally
                {
                    if (termDocs != null)
                    {
                        termDocs.close();
                    }
                }
            }
            return false;
        }
        finally
        {
            if (termEnum != null)
            {
                termEnum.close();
            }
        }
    }
    catch (IOException e)
    {
        throw new AlfrescoRuntimeException("IO error", e);
    }
    finally
    {
        try
        {
            if (indexReader != null)
            {
                if (useDelta)
                {
                    closeDeltaReader();
                }
                else
                {
                    indexReader.close();
                }
            }
        }
        catch (IOException e)
        {
            s_logger.warn("Failed to close main reader", e);
        }
    }
}
/**
 * Has an index been created? The main index is checked first and the delta is
 * only consulted when the main index has no root entry.
 *
 * @param store
 *            - the store, passed through to the per-channel check
 * @return true if either channel reports a root entry
 */
public boolean hasIndexBeenCreated(String store)
{
    if (hasIndexBeenCreatedimpl(store, IndexChannel.MAIN))
    {
        return true;
    }
    return hasIndexBeenCreatedimpl(store, IndexChannel.DELTA);
}
/**
 * Check one index channel for at least one document with ISROOT set to "T".
 *
 * Note that the store argument is not used by the lookup itself.
 *
 * @param store
 *            - the store (not consulted here)
 * @param channel
 *            - IndexChannel.DELTA reads the delta (after flushing pending work), anything else reads the main index
 * @return true if a root document exists in the channel
 * @throws AlfrescoRuntimeException
 *             if the index can not be read
 */
public boolean hasIndexBeenCreatedimpl(String store, IndexChannel channel)
{
    final boolean useDelta = (channel == IndexChannel.DELTA);
    IndexReader indexReader = null;
    try
    {
        if (!useDelta)
        {
            indexReader = getReader();
        }
        else
        {
            flushPending();
            indexReader = getDeltaReader();
        }

        TermDocs rootDocs = null;
        try
        {
            rootDocs = indexReader.termDocs(new Term("ISROOT", "T"));
            boolean rootFound = rootDocs.next();
            return rootFound;
        }
        finally
        {
            if (rootDocs != null)
            {
                rootDocs.close();
            }
        }
    }
    catch (IOException e)
    {
        throw new AlfrescoRuntimeException("IO error", e);
    }
    finally
    {
        try
        {
            if (indexReader != null)
            {
                if (!useDelta)
                {
                    indexReader.close();
                }
                else
                {
                    closeDeltaReader();
                }
            }
        }
        catch (IOException e)
        {
            s_logger.warn("Failed to close main reader", e);
        }
    }
}
/**
 * Read the current value of the indexed document counter.
 *
 * @return the value of indexedDocCount at the time of the call
 */
public synchronized long getIndexedDocCount()
{
    final long currentCount = this.indexedDocCount;
    return currentCount;
}
/**
 * Add one to the indexed document counter.
 */
private synchronized void incrementDocCount()
{
    this.indexedDocCount += 1;
}
public int getLastIndexedSnapshot(String store)
{
int last = getLastAsynchronousSnapshot(store);
@@ -1556,31 +1888,7 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
}
return hasIndexBeenCreated(store) ? 0 : -1;
}
/**
 * Has the given snapshot been applied to the index?
 *
 * Snapshot 0 is indexed once the index exists. Any other snapshot is reported as
 * indexed when its id is not greater than the last asynchronous snapshot or,
 * failing that, not greater than the last synchronous snapshot.
 *
 * @param store
 *            - the store to check
 * @param id
 *            - the snapshot id
 * @return true if the snapshot is considered indexed
 */
public boolean isSnapshotIndexed(String store, int id)
{
    if (id == 0)
    {
        return hasIndexBeenCreated(store);
    }
    if (id <= getLastAsynchronousSnapshot(store))
    {
        return true;
    }
    return id <= getLastSynchronousSnapshot(store);
}
/**
 * Is the given snapshot searchable?
 *
 * Snapshot 0 is searchable once the index exists. Any other snapshot is reported
 * as searchable when its id is not greater than the last synchronous snapshot.
 *
 * @param store
 *            - the store to check
 * @param id
 *            - the snapshot id
 * @return true if the snapshot is considered searchable
 */
public boolean isSnapshotSearchable(String store, int id)
{
    if (id != 0)
    {
        return id <= getLastSynchronousSnapshot(store);
    }
    return hasIndexBeenCreated(store);
}
private int getLastSynchronousSnapshot(String store)
{
int answer = getLastSynchronousSnapshot(store, IndexChannel.DELTA);
@@ -1798,76 +2106,4 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl<String> impl
}
}
}
/**
 * Has an index been created? The main index is checked first and the delta is
 * only consulted when the main index has no root entry.
 *
 * @param store
 *            - the store, passed through to the per-channel check
 * @return true if either channel reports a root entry
 */
public boolean hasIndexBeenCreated(String store)
{
    boolean inMain = hasIndexBeenCreatedimpl(store, IndexChannel.MAIN);
    if (inMain)
    {
        return true;
    }
    return hasIndexBeenCreatedimpl(store, IndexChannel.DELTA);
}
/**
 * Check one index channel for at least one document with ISROOT set to "T".
 *
 * Note that the store argument is not used by the lookup itself.
 *
 * @param store
 *            - the store (not consulted here)
 * @param channel
 *            - IndexChannel.DELTA reads the delta (after flushing pending work), anything else reads the main index
 * @return true if a root document exists in the channel
 * @throws AlfrescoRuntimeException
 *             if the index can not be read
 */
public boolean hasIndexBeenCreatedimpl(String store, IndexChannel channel)
{
    final boolean deltaChannel = (channel == IndexChannel.DELTA);
    IndexReader channelReader = null;
    try
    {
        if (deltaChannel)
        {
            flushPending();
            channelReader = getDeltaReader();
        }
        else
        {
            channelReader = getReader();
        }

        TermDocs roots = null;
        try
        {
            roots = channelReader.termDocs(new Term("ISROOT", "T"));
            return roots.next();
        }
        finally
        {
            if (roots != null)
            {
                roots.close();
            }
        }
    }
    catch (IOException e)
    {
        throw new AlfrescoRuntimeException("IO error", e);
    }
    finally
    {
        try
        {
            if (channelReader != null)
            {
                if (deltaChannel)
                {
                    closeDeltaReader();
                }
                else
                {
                    channelReader.close();
                }
            }
        }
        catch (IOException e)
        {
            s_logger.warn("Failed to close main reader", e);
        }
    }
}
/**
 * Read the current value of the indexed document counter.
 *
 * @return the value of indexedDocCount at the time of the call
 */
public synchronized long getIndexedDocCount()
{
    final long snapshotOfCount = this.indexedDocCount;
    return snapshotOfCount;
}
/**
 * Add one to the indexed document counter.
 */
private synchronized void incrementDocCount()
{
    this.indexedDocCount = this.indexedDocCount + 1;
}
}