diff --git a/config/alfresco/authentication-services-context.xml b/config/alfresco/authentication-services-context.xml index ea5c2bcc18..32fcba35c5 100644 --- a/config/alfresco/authentication-services-context.xml +++ b/config/alfresco/authentication-services-context.xml @@ -213,6 +213,9 @@ + + + diff --git a/config/alfresco/bootstrap-context.xml b/config/alfresco/bootstrap-context.xml index 0600983779..00c64aaded 100644 --- a/config/alfresco/bootstrap-context.xml +++ b/config/alfresco/bootstrap-context.xml @@ -381,6 +381,21 @@ + + + + + + + + + + + + + + + @@ -404,21 +419,6 @@ - - - - - - - - - - - - - - - diff --git a/config/alfresco/core-services-context.xml b/config/alfresco/core-services-context.xml index a3040311ae..a85fc04226 100644 --- a/config/alfresco/core-services-context.xml +++ b/config/alfresco/core-services-context.xml @@ -267,6 +267,18 @@ ${server.transaction.allow-writes} + + ${server.transaction.max-retries} + + + ${server.transaction.min-retry-wait-ms} + + + ${server.transaction.max-retry-wait-ms} + + + ${server.transaction.wait-increment-ms} + @@ -276,6 +288,15 @@ ${server.transaction.max-retries} + + ${server.transaction.min-retry-wait-ms} + + + ${server.transaction.max-retry-wait-ms} + + + ${server.transaction.wait-increment-ms} + @@ -459,13 +480,16 @@ - + 10 - - 5 - + + 20 + + + 7 + @@ -999,10 +1023,13 @@ - + 2 + + 10 + diff --git a/config/alfresco/domain/transaction.properties b/config/alfresco/domain/transaction.properties index f5c65ba4a4..a4fe290b7b 100644 --- a/config/alfresco/domain/transaction.properties +++ b/config/alfresco/domain/transaction.properties @@ -9,3 +9,6 @@ server.transaction.mode.default=PROPAGATION_REQUIRED server.transaction.allow-writes=true server.transaction.max-retries=20 +server.transaction.min-retry-wait-ms=100 +server.transaction.max-retry-wait-ms=2000 +server.transaction.wait-increment-ms=100 \ No newline at end of file diff --git a/config/alfresco/extension/custom-repository.properties.sample 
b/config/alfresco/extension/custom-repository.properties.sample index 7b636ecdf4..245f573ca6 100644 --- a/config/alfresco/extension/custom-repository.properties.sample +++ b/config/alfresco/extension/custom-repository.properties.sample @@ -16,9 +16,10 @@ #db.pool.max=100 # -# Sample index tracking frequency +# Activate index tracking and recovery # #index.tracking.cronExpression=0/5 * * * * ? +#index.recovery.mode=AUTO # # Property to control whether schema updates are performed automatically. diff --git a/config/alfresco/index-recovery-context.xml b/config/alfresco/index-recovery-context.xml index 568c28242a..365f0c570c 100644 --- a/config/alfresco/index-recovery-context.xml +++ b/config/alfresco/index-recovery-context.xml @@ -3,7 +3,25 @@ + + + + ${index.recovery.maximumPoolSize} + + + ${index.recovery.maximumPoolSize} + + + 100 + + + + + + @@ -38,6 +56,9 @@ ${index.recovery.stopOnError} + + ${index.tracking.maxTransactionsPerLuceneCommit} + @@ -68,6 +89,9 @@ id="admIndexTrackerComponent" class="org.alfresco.repo.node.index.IndexTransactionTracker" parent="indexRecoveryComponentBase"> + + + ${index.tracking.maxTxnDurationMinutes} @@ -77,6 +101,12 @@ ${index.tracking.maxRecordSetSize} + + ${index.tracking.maxTransactionsPerLuceneCommit} + + + ${index.tracking.disableInTransactionIndexing} + diff --git a/config/alfresco/repository.properties b/config/alfresco/repository.properties index ae70309917..c16ef6ff61 100644 --- a/config/alfresco/repository.properties +++ b/config/alfresco/repository.properties @@ -26,8 +26,9 @@ dir.indexes.lock=${dir.indexes}/locks # AUTO: Validates and auto-recovers if validation fails # FULL: Full index rebuild, processing all transactions in order. The server is temporarily suspended. 
index.recovery.mode=VALIDATE -# Force FULL recovery to stop when encountering errors -index.recovery.stopOnError=true +# FULL recovery continues when encountering errors +index.recovery.stopOnError=false +index.recovery.maximumPoolSize=5 # Set the frequency with which the index tracking is triggered. # For more information on index tracking in a cluster: # http://wiki.alfresco.com/wiki/High_Availability_Configuration_V1.4_to_V2.1#Version_1.4.5.2C_2.1.1_and_later @@ -40,9 +41,11 @@ index.tracking.cronExpression=* * * * * ? 2099 index.tracking.adm.cronExpression=${index.tracking.cronExpression} index.tracking.avm.cronExpression=${index.tracking.cronExpression} # Other properties. -index.tracking.maxTxnDurationMinutes=60 +index.tracking.maxTxnDurationMinutes=10 index.tracking.reindexLagMs=1000 index.tracking.maxRecordSetSize=1000 +index.tracking.maxTransactionsPerLuceneCommit=100 +index.tracking.disableInTransactionIndexing=false # Change the failure behaviour of the configuration checker system.bootstrap.config_check.strict=true diff --git a/source/java/org/alfresco/repo/cache/CacheTest.java b/source/java/org/alfresco/repo/cache/CacheTest.java index acc3de142c..51c24381d0 100644 --- a/source/java/org/alfresco/repo/cache/CacheTest.java +++ b/source/java/org/alfresco/repo/cache/CacheTest.java @@ -25,6 +25,7 @@ package org.alfresco.repo.cache; import java.io.Serializable; +import java.sql.SQLException; import java.util.Collection; import javax.transaction.Status; @@ -36,12 +37,14 @@ import net.sf.ehcache.CacheManager; import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.TransactionListenerAdapter; +import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.ServiceRegistry; import org.alfresco.service.transaction.TransactionService; import 
org.alfresco.util.ApplicationContextHelper; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.dao.DataAccessException; /** * @see org.alfresco.repo.cache.EhCacheAdapter @@ -136,6 +139,32 @@ public class CacheTest extends TestCase assertNull("Non-transactional remove didn't go to backing cache", backingCache.get(key)); } + public void testRollbackCleanup() throws Exception + { + TransactionService transactionService = serviceRegistry.getTransactionService(); + RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper(); + + RetryingTransactionCallback callback = new RetryingTransactionCallback() + { + private int throwCount = 0; + public Object execute() throws Throwable + { + String key = "B"; + String value = "BBB"; + // no transaction - do a put + transactionalCache.put(key, value); + // Blow up + if (throwCount < 5) + { + throwCount++; + throw new SQLException("Dummy"); + } + return null; + } + }; + txnHelper.doInTransaction(callback); + } + public void testTransactionalCacheWithSingleTxn() throws Throwable { String newGlobalOne = "new_global_one"; diff --git a/source/java/org/alfresco/repo/dictionary/DictionaryModelType.java b/source/java/org/alfresco/repo/dictionary/DictionaryModelType.java index ac438be7c1..e70a4fc92c 100644 --- a/source/java/org/alfresco/repo/dictionary/DictionaryModelType.java +++ b/source/java/org/alfresco/repo/dictionary/DictionaryModelType.java @@ -43,7 +43,7 @@ import org.alfresco.repo.tenant.Tenant; import org.alfresco.repo.tenant.TenantDeployerService; import org.alfresco.repo.tenant.TenantService; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; -import org.alfresco.repo.transaction.TransactionListener; +import org.alfresco.repo.transaction.TransactionListenerAdapter; import org.alfresco.repo.workflow.BPMEngineRegistry; import org.alfresco.service.cmr.dictionary.AspectDefinition; 
import org.alfresco.service.cmr.dictionary.ClassDefinition; @@ -388,7 +388,7 @@ public class DictionaryModelType implements ContentServicePolicies.OnContentUpda /** * Dictionary model type transaction listener class. */ - public class DictionaryModelTypeTransactionListener implements TransactionListener + public class DictionaryModelTypeTransactionListener extends TransactionListenerAdapter { /** * Id used in equals and hash @@ -404,17 +404,11 @@ public class DictionaryModelType implements ContentServicePolicies.OnContentUpda this.contentService = contentService; } - /** - * @see org.alfresco.repo.transaction.TransactionListener#flush() - */ - public void flush() - { - } - /** * @see org.alfresco.repo.transaction.TransactionListener#beforeCommit(boolean) */ @SuppressWarnings("unchecked") + @Override public void beforeCommit(boolean readOnly) { Set pendingModels = (Set)AlfrescoTransactionSupport.getResource(KEY_PENDING_MODELS); @@ -515,27 +509,6 @@ public class DictionaryModelType implements ContentServicePolicies.OnContentUpda } } - /** - * @see org.alfresco.repo.transaction.TransactionListener#beforeCompletion() - */ - public void beforeCompletion() - { - } - - /** - * @see org.alfresco.repo.transaction.TransactionListener#afterCommit() - */ - public void afterCommit() - { - } - - /** - * @see org.alfresco.repo.transaction.TransactionListener#afterRollback() - */ - public void afterRollback() - { - } - /** * @see java.lang.Object#equals(java.lang.Object) */ diff --git a/source/java/org/alfresco/repo/domain/hibernate/Node.hbm.xml b/source/java/org/alfresco/repo/domain/hibernate/Node.hbm.xml index 4311d58add..c9bee9c0a9 100644 --- a/source/java/org/alfresco/repo/domain/hibernate/Node.hbm.xml +++ b/source/java/org/alfresco/repo/domain/hibernate/Node.hbm.xml @@ -72,7 +72,7 @@ batch-size="128" sort="unsorted" inverse="false" - optimistic-lock="true" + optimistic-lock="false" cascade="delete" > @@ -98,7 +98,7 @@ batch-size="128" inverse="false" sort="unsorted" - 
optimistic-lock="true" + optimistic-lock="false" cascade="delete" > diff --git a/source/java/org/alfresco/repo/domain/hibernate/Transaction.hbm.xml b/source/java/org/alfresco/repo/domain/hibernate/Transaction.hbm.xml index 2786c018d5..095362b8ff 100644 --- a/source/java/org/alfresco/repo/domain/hibernate/Transaction.hbm.xml +++ b/source/java/org/alfresco/repo/domain/hibernate/Transaction.hbm.xml @@ -72,8 +72,6 @@ org.alfresco.repo.domain.hibernate.TransactionImpl as txn where txn.id = :txnId - order by - txn.commitTimeMs @@ -85,9 +83,11 @@ where txn.commitTimeMs >= :fromTimeInclusive and txn.commitTimeMs < :toTimeExclusive and - txn.id not in (:excludeTxnIds) + txn.id not in (:excludeTxnIds) and + txn.server.id not in (:excludeServerIds) order by - txn.commitTimeMs + txn.commitTimeMs asc, + txn.id asc ]]> @@ -100,9 +100,25 @@ where txn.commitTimeMs >= :fromTimeInclusive and txn.commitTimeMs < :toTimeExclusive and - txn.id not in (:excludeTxnIds) + txn.id not in (:excludeTxnIds) and + txn.server.id not in (:excludeServerIds) order by - txn.commitTimeMs desc + txn.commitTimeMs desc, + txn.id desc + ]]> + + + + diff --git a/source/java/org/alfresco/repo/node/db/NodeDaoService.java b/source/java/org/alfresco/repo/node/db/NodeDaoService.java index 30f75ec3c7..6a42fe6905 100644 --- a/source/java/org/alfresco/repo/node/db/NodeDaoService.java +++ b/source/java/org/alfresco/repo/node/db/NodeDaoService.java @@ -318,23 +318,34 @@ public interface NodeDaoService * for any given millisecond, a list of optional exclusions may be provided. * * @param excludeTxnIds a list of txn IDs to ignore. null is allowed. + * @param remoteOnly true if locally-written transactions must be ignored */ public List getTxnsByCommitTimeAscending( long fromTimeInclusive, long toTimeExclusive, int count, - List excludeTxnIds); + List excludeTxnIds, + boolean remoteOnly); /** * Get all transactions in a given time range. 
Since time-based retrieval doesn't guarantee uniqueness * for any given millisecond, a list of optional exclusions may be provided. * * @param excludeTxnIds a list of txn IDs to ignore. null is allowed. + * @param remoteOnly true if locally-written transactions must be ignored */ public List getTxnsByCommitTimeDescending( long fromTimeInclusive, long toTimeExclusive, int count, - List excludeTxnIds); + List excludeTxnIds, + boolean remoteOnly); + /** + * Get the lowest commit time for a set of transactions + * + * @param includeTxnIds a list of transaction IDs to search for + * @return Returns the transactions by commit time for the given IDs + */ + public List getTxnsByMinCommitTime(List includeTxnIds); public int getTxnUpdateCount(final long txnId); public int getTxnDeleteCount(final long txnId); public int getTransactionCount(); diff --git a/source/java/org/alfresco/repo/node/db/hibernate/HibernateNodeDaoServiceImpl.java b/source/java/org/alfresco/repo/node/db/hibernate/HibernateNodeDaoServiceImpl.java index 9fed05f090..b862e80793 100644 --- a/source/java/org/alfresco/repo/node/db/hibernate/HibernateNodeDaoServiceImpl.java +++ b/source/java/org/alfresco/repo/node/db/hibernate/HibernateNodeDaoServiceImpl.java @@ -215,6 +215,40 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements this.parentAssocsCache = parentAssocsCache; } + /** + * @return Returns the ID of this instance's server instance or null + */ + private Long getServerIdOrNull() + { + Long serverId = serverIdSingleton.get(); + if (serverId != null) + { + return serverId; + } + // Query for it + // The server already exists, so get it + HibernateCallback callback = new HibernateCallback() + { + public Object doInHibernate(Session session) + { + Query query = session + .getNamedQuery(HibernateNodeDaoServiceImpl.QUERY_GET_SERVER_BY_IPADDRESS) + .setString("ipAddress", ipAddress); + return query.uniqueResult(); + } + }; + Server server = (Server) 
getHibernateTemplate().execute(callback); + if (server != null) + { + // It exists, so just return the ID + return server.getId(); + } + else + { + return null; + } + } + /** * Gets/creates the server instance to use for the life of this instance */ @@ -1522,6 +1556,7 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements private static final String QUERY_GET_TXN_BY_ID = "txn.GetTxnById"; private static final String QUERY_GET_TXNS_BY_COMMIT_TIME_ASC = "txn.GetTxnsByCommitTimeAsc"; private static final String QUERY_GET_TXNS_BY_COMMIT_TIME_DESC = "txn.GetTxnsByCommitTimeDesc"; + private static final String QUERY_GET_SELECTED_TXNS_BY_COMMIT_TIME_ASC = "txn.GetSelectedTxnsByCommitAsc"; private static final String QUERY_GET_TXN_UPDATE_COUNT_FOR_STORE = "txn.GetTxnUpdateCountForStore"; private static final String QUERY_GET_TXN_DELETE_COUNT_FOR_STORE = "txn.GetTxnDeleteCountForStore"; private static final String QUERY_COUNT_TRANSACTIONS = "txn.CountTransactions"; @@ -1545,6 +1580,28 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements return txn; } + @SuppressWarnings("unchecked") + public List getTxnsByMinCommitTime(final List includeTxnIds) + { + if (includeTxnIds.size() == 0) + { + return null; + } + HibernateCallback callback = new HibernateCallback() + { + public Object doInHibernate(Session session) + { + Query query = session.getNamedQuery(QUERY_GET_SELECTED_TXNS_BY_COMMIT_TIME_ASC); + query.setParameterList("includeTxnIds", includeTxnIds) + .setReadOnly(true); + return query.list(); + } + }; + List txns = (List) getHibernateTemplate().execute(callback); + // done + return txns; + } + @SuppressWarnings("unchecked") public int getTxnUpdateCount(final long txnId) { @@ -1600,12 +1657,14 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements } private static final Long TXN_ID_DUD = Long.valueOf(-1L); + private static final Long SERVER_ID_DUD = Long.valueOf(-1L); 
@SuppressWarnings("unchecked") public List getTxnsByCommitTimeAscending( final long fromTimeInclusive, final long toTimeExclusive, final int count, - List excludeTxnIds) + List excludeTxnIds, + boolean remoteOnly) { // Make sure that we have at least one entry in the exclude list final List excludeTxnIdsInner = new ArrayList(excludeTxnIds == null ? 1 : excludeTxnIds.size()); @@ -1617,6 +1676,25 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements { excludeTxnIdsInner.addAll(excludeTxnIds); } + final List excludeServerIds = new ArrayList(1); + if (remoteOnly) + { + // Get the current server ID. This can be null if no transactions have been written by + // a server with this IP address. + Long serverId = getServerIdOrNull(); + if (serverId == null) + { + excludeServerIds.add(SERVER_ID_DUD); + } + else + { + excludeServerIds.add(serverId); + } + } + else + { + excludeServerIds.add(SERVER_ID_DUD); + } HibernateCallback callback = new HibernateCallback() { public Object doInHibernate(Session session) @@ -1625,6 +1703,7 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements query.setLong("fromTimeInclusive", fromTimeInclusive) .setLong("toTimeExclusive", toTimeExclusive) .setParameterList("excludeTxnIds", excludeTxnIdsInner) + .setParameterList("excludeServerIds", excludeServerIds) .setMaxResults(count) .setReadOnly(true); return query.list(); @@ -1640,7 +1719,8 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements final long fromTimeInclusive, final long toTimeExclusive, final int count, - List excludeTxnIds) + List excludeTxnIds, + boolean remoteOnly) { // Make sure that we have at least one entry in the exclude list final List excludeTxnIdsInner = new ArrayList(excludeTxnIds == null ? 
1 : excludeTxnIds.size()); @@ -1652,6 +1732,25 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements { excludeTxnIdsInner.addAll(excludeTxnIds); } + final List excludeServerIds = new ArrayList(1); + if (remoteOnly) + { + // Get the current server ID. This can be null if no transactions have been written by + // a server with this IP address. + Long serverId = getServerIdOrNull(); + if (serverId == null) + { + excludeServerIds.add(SERVER_ID_DUD); + } + else + { + excludeServerIds.add(serverId); + } + } + else + { + excludeServerIds.add(SERVER_ID_DUD); + } HibernateCallback callback = new HibernateCallback() { public Object doInHibernate(Session session) @@ -1660,6 +1759,7 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements query.setLong("fromTimeInclusive", fromTimeInclusive) .setLong("toTimeExclusive", toTimeExclusive) .setParameterList("excludeTxnIds", excludeTxnIdsInner) + .setParameterList("excludeServerIds", excludeServerIds) .setMaxResults(count) .setReadOnly(true); return query.list(); diff --git a/source/java/org/alfresco/repo/node/index/AVMRemoteSnapshotTrackerTest.java b/source/java/org/alfresco/repo/node/index/AVMRemoteSnapshotTrackerTest.java index c369c7d124..91a9d01bea 100644 --- a/source/java/org/alfresco/repo/node/index/AVMRemoteSnapshotTrackerTest.java +++ b/source/java/org/alfresco/repo/node/index/AVMRemoteSnapshotTrackerTest.java @@ -24,6 +24,8 @@ */ package org.alfresco.repo.node.index; +import java.util.concurrent.ThreadPoolExecutor; + import javax.transaction.Status; import javax.transaction.UserTransaction; @@ -68,6 +70,8 @@ public class AVMRemoteSnapshotTrackerTest extends BaseSpringTest private Indexer indexer; private NodeDaoService nodeDaoService; + + private ThreadPoolExecutor threadPoolExecutor; public AVMRemoteSnapshotTrackerTest() { @@ -89,6 +93,7 @@ public class AVMRemoteSnapshotTrackerTest extends BaseSpringTest ftsIndexer = (FullTextSearchIndexer) 
applicationContext.getBean("LuceneFullTextSearchIndexer"); indexer = (Indexer) applicationContext.getBean("indexerComponent"); nodeDaoService = (NodeDaoService) applicationContext.getBean("nodeDaoService"); + threadPoolExecutor = (ThreadPoolExecutor) applicationContext.getBean("indexTrackerThreadPoolExecutor"); testTX = transactionService.getUserTransaction(); @@ -190,6 +195,7 @@ public class AVMRemoteSnapshotTrackerTest extends BaseSpringTest tracker.setNodeDaoService(nodeDaoService); tracker.setNodeService(nodeService); tracker.setSearcher(searchService); + tracker.setThreadPoolExecutor(threadPoolExecutor); tracker.reindex(); diff --git a/source/java/org/alfresco/repo/node/index/AbstractReindexComponent.java b/source/java/org/alfresco/repo/node/index/AbstractReindexComponent.java index ff787c6183..022d540fa0 100644 --- a/source/java/org/alfresco/repo/node/index/AbstractReindexComponent.java +++ b/source/java/org/alfresco/repo/node/index/AbstractReindexComponent.java @@ -24,12 +24,17 @@ */ package org.alfresco.repo.node.index; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import net.sf.acegisecurity.Authentication; +import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.model.ContentModel; import org.alfresco.repo.domain.Transaction; import org.alfresco.repo.node.db.NodeDaoService; @@ -38,7 +43,10 @@ import org.alfresco.repo.search.impl.lucene.LuceneQueryParser; import org.alfresco.repo.search.impl.lucene.fts.FullTextSearchIndexer; import org.alfresco.repo.security.authentication.AuthenticationComponent; import org.alfresco.repo.security.authentication.AuthenticationUtil; +import org.alfresco.repo.transaction.AlfrescoTransactionSupport; +import 
org.alfresco.repo.transaction.TransactionListenerAdapter; import org.alfresco.repo.transaction.TransactionServiceImpl; +import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.NodeRef; @@ -48,6 +56,7 @@ import org.alfresco.service.cmr.repository.NodeRef.Status; import org.alfresco.service.cmr.search.ResultSet; import org.alfresco.service.cmr.search.SearchParameters; import org.alfresco.service.cmr.search.SearchService; +import org.alfresco.util.ParameterCheck; import org.alfresco.util.PropertyCheck; import org.alfresco.util.VmShutdownListener; import org.apache.commons.logging.Log; @@ -65,6 +74,7 @@ import org.apache.commons.logging.LogFactory; public abstract class AbstractReindexComponent implements IndexRecovery { private static Log logger = LogFactory.getLog(AbstractReindexComponent.class); + private static Log loggerOnThread = LogFactory.getLog(AbstractReindexComponent.class.getName() + ".threads"); /** kept to notify the thread that it should quit */ private static VmShutdownListener vmShutdownListener = new VmShutdownListener("IndexRecovery"); @@ -82,6 +92,8 @@ public abstract class AbstractReindexComponent implements IndexRecovery protected NodeService nodeService; /** the component giving direct access to transaction instances */ protected NodeDaoService nodeDaoService; + /** the component that holds the reindex worker threads */ + private ThreadPoolExecutor threadPoolExecutor; private volatile boolean shutdown; private final WriteLock indexerWriteLock; @@ -180,6 +192,30 @@ public abstract class AbstractReindexComponent implements IndexRecovery this.nodeDaoService = nodeDaoService; } + /** + * Set the thread pool to use when doing asynchronous reindexing. Use null + * to have the calling thread do the indexing. 
+ * + * @param threadPoolExecutor a pre-configured thread pool for the reindex work + * + * @since 2.1.4 + */ + public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) + { + this.threadPoolExecutor = threadPoolExecutor; + } + + /** + * Determines if calls to {@link #reindexImpl()} should be wrapped in a transaction or not. + * The default is true. + * + * @return Returns true if an existing transaction is required for reindexing. + */ + protected boolean requireTransaction() + { + return true; + } + /** * Perform the actual work. This method will be called as the system user * and within an existing transaction. This thread will only ever be accessed @@ -217,7 +253,18 @@ public abstract class AbstractReindexComponent implements IndexRecovery return null; } }; - transactionService.getRetryingTransactionHelper().doInTransaction(reindexWork, true); + if (requireTransaction()) + { + transactionService.getRetryingTransactionHelper().doInTransaction(reindexWork, true); + } + else + { + reindexWork.execute(); + } + } + catch (Throwable e) + { + throw new AlfrescoRuntimeException("Reindex failure for " + this.getClass().getName(), e); } finally { @@ -247,56 +294,131 @@ public abstract class AbstractReindexComponent implements IndexRecovery YES, NO, INDETERMINATE; } + private static final String KEY_STORE_REFS = "StoreRefCacheMethodInterceptor.StoreRefs"; + @SuppressWarnings("unchecked") /** - * Determines if a given transaction is definitely in the index or not. - * - * @param txnId a specific transaction - * @return Returns true if the transaction is definitely in the index + * Helper method that caches ADM store references to prevent repeated and unnecessary calls to the + * NodeService for this list. 
*/ + private List getAdmStoreRefs() + { + List storeRefs = (List) AlfrescoTransactionSupport.getResource(KEY_STORE_REFS); + if (storeRefs != null) + { + return storeRefs; + } + else + { + storeRefs = nodeService.getStores(); + Iterator storeRefsIterator = storeRefs.iterator(); + while (storeRefsIterator.hasNext()) + { + // Remove AVM stores + StoreRef storeRef = storeRefsIterator.next(); + if (storeRef.getProtocol().equals(StoreRef.PROTOCOL_AVM)) + { + storeRefsIterator.remove(); + } + } + // Change the ordering to favour the most common stores + if (storeRefs.contains(StoreRef.STORE_REF_ARCHIVE_SPACESSTORE)) + { + storeRefs.remove(StoreRef.STORE_REF_ARCHIVE_SPACESSTORE); + storeRefs.add(0, StoreRef.STORE_REF_ARCHIVE_SPACESSTORE); + } + if (storeRefs.contains(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE)) + { + storeRefs.remove(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + storeRefs.add(0, StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + } + // Bind it in + AlfrescoTransactionSupport.bindResource(KEY_STORE_REFS, storeRefs); + } + return storeRefs; + } + protected InIndex isTxnIdPresentInIndex(long txnId) { - if (logger.isDebugEnabled()) - { - logger.debug("Checking for transaction in index: " + txnId); - } - Transaction txn = nodeDaoService.getTxnById(txnId); if (txn == null) { return InIndex.YES; } + return isTxnPresentInIndex(txn); + } + + /** + * Determines if a given transaction is definitely in the index or not. 
+ * + * @param txn a specific transaction + * @return Returns true if the transaction is definitely in the index + */ + protected InIndex isTxnPresentInIndex(final Transaction txn) + { + if (txn == null) + { + return InIndex.YES; + } - // count the changes in the transaction - int updateCount = nodeDaoService.getTxnUpdateCount(txnId); - int deleteCount = nodeDaoService.getTxnDeleteCount(txnId); + final Long txnId = txn.getId(); if (logger.isDebugEnabled()) { - logger.debug("Transaction " + txnId + " has " + updateCount + " updates and " + deleteCount + " deletes."); + logger.debug("Checking for transaction in index: " + txnId); } - - InIndex result = InIndex.NO; - if (updateCount == 0 && deleteCount == 0) + // Check if the txn ID is present in any store's index + boolean foundInIndex = false; + List storeRefs = getAdmStoreRefs(); + for (StoreRef storeRef : storeRefs) { - // If there are no update or deletes, then it is impossible to know if the transaction was removed - // from the index or was never there in the first place. 
- result = InIndex.INDETERMINATE; - } - else - { - // get the stores - List storeRefs = nodeService.getStores(); - for (StoreRef storeRef : storeRefs) + boolean inStore = isTxnIdPresentInIndex(storeRef, txn); + if (inStore) { - boolean inStore = isTxnIdPresentInIndex(storeRef, txn, updateCount, deleteCount); - if (inStore) + // found in a particular store + foundInIndex = true; + break; + } + } + InIndex result = InIndex.NO; + if (!foundInIndex) + { + // If none of the stores have the transaction, then that might be because it consists of 0 modifications + int updateCount = nodeDaoService.getTxnUpdateCount(txnId); + if (updateCount > 0) + { + // There were updates, but there is no sign in the indexes + result = InIndex.NO; + } + else + { + // We're now in the case where there were no updates + int deleteCount = nodeDaoService.getTxnDeleteCount(txnId); + if (deleteCount == 0) { - // found in a particular store + // There are no updates or deletes and no entry in the indexes. + // There are outdated nodes in the index. result = InIndex.YES; - break; + } + else + { + // There were deleted nodes only. Check that all the deleted nodes were + // removed from the index otherwise it is out of date. 
+ for (StoreRef storeRef : storeRefs) + { + if (!haveNodesBeenRemovedFromIndex(storeRef, txn)) + { + result = InIndex.NO; + break; + } + } } } } + else + { + result = InIndex.YES; + } + // done if (logger.isDebugEnabled()) { @@ -306,18 +428,64 @@ public abstract class AbstractReindexComponent implements IndexRecovery } /** - * @param updateCount the number of node updates in the transaction - * @param deleteCount the number of node deletions in the transaction - * @return Returns true if the given transaction is indexed, - * or if there are no updates or deletes + * @return Returns true if the given transaction is present in the index */ - private boolean isTxnIdPresentInIndex(StoreRef storeRef, Transaction txn, int updateCount, int deleteCount) + private boolean isTxnIdPresentInIndex(StoreRef storeRef, Transaction txn) { long txnId = txn.getId(); String changeTxnId = txn.getChangeTxnId(); // do the most update check, which is most common - if (updateCount > 0) + ResultSet results = null; + try { + SearchParameters sp = new SearchParameters(); + sp.addStore(storeRef); + // search for it in the index, sorting with youngest first, fetching only 1 + sp.setLanguage(SearchService.LANGUAGE_LUCENE); + sp.setQuery("TX:" + LuceneQueryParser.escape(changeTxnId)); + sp.setLimit(1); + + results = searcher.query(sp); + + if (results.length() > 0) + { + if (logger.isDebugEnabled()) + { + logger.debug("Index has results for txn " + txnId + " for store " + storeRef); + } + return true; // there were updates/creates and results for the txn were found + } + else + { + if (logger.isDebugEnabled()) + { + logger.debug("Transaction " + txnId + " not in index for store " + storeRef + ". 
Possibly out of date."); + } + return false; + } + } + finally + { + if (results != null) { results.close(); } + } + } + + private boolean haveNodesBeenRemovedFromIndex(final StoreRef storeRef, final Transaction txn) + { + final Long txnId = txn.getId(); + // there have been deletes, so we have to ensure that none of the nodes deleted are present in the index + // get all node refs for the transaction + List nodeRefs = nodeDaoService.getTxnChangesForStore(storeRef, txnId); + boolean foundNodeRef = false; + for (NodeRef nodeRef : nodeRefs) + { + if (logger.isDebugEnabled()) + { + logger.debug("Searching for node in index: \n" + + " node: " + nodeRef + "\n" + + " txn: " + txnId); + } + // we know that these are all deletions ResultSet results = null; try { @@ -325,26 +493,15 @@ public abstract class AbstractReindexComponent implements IndexRecovery sp.addStore(storeRef); // search for it in the index, sorting with youngest first, fetching only 1 sp.setLanguage(SearchService.LANGUAGE_LUCENE); - sp.setQuery("TX:" + LuceneQueryParser.escape(changeTxnId)); + sp.setQuery("ID:" + LuceneQueryParser.escape(nodeRef.toString())); sp.setLimit(1); - + results = searcher.query(sp); - + if (results.length() > 0) { - if (logger.isDebugEnabled()) - { - logger.debug("Index has results for txn " + txnId + " for store " + storeRef); - } - return true; // there were updates/creates and results for the txn were found - } - else - { - if (logger.isDebugEnabled()) - { - logger.debug("Transaction " + txnId + " not in index for store " + storeRef + ". 
Possibly out of date."); - } - return false; + foundNodeRef = true; + break; } } finally @@ -352,64 +509,22 @@ public abstract class AbstractReindexComponent implements IndexRecovery if (results != null) { results.close(); } } } - else if (deleteCount > 0) + if (foundNodeRef) { - // there have been deletes, so we have to ensure that none of the nodes deleted are present in the index - // get all node refs for the transaction - List nodeRefs = nodeDaoService.getTxnChangesForStore(storeRef, txnId); - for (NodeRef nodeRef : nodeRefs) + if (logger.isDebugEnabled()) { - if (logger.isDebugEnabled()) - { - logger.debug("Searching for node in index: \n" + - " node: " + nodeRef + "\n" + - " txn: " + txnId); - } - // we know that these are all deletions - ResultSet results = null; - try - { - SearchParameters sp = new SearchParameters(); - sp.addStore(storeRef); - // search for it in the index, sorting with youngest first, fetching only 1 - sp.setLanguage(SearchService.LANGUAGE_LUCENE); - sp.setQuery("ID:" + LuceneQueryParser.escape(nodeRef.toString())); - sp.setLimit(1); - - results = searcher.query(sp); - - if (results.length() == 0) - { - // no results, as expected - if (logger.isDebugEnabled()) - { - logger.debug(" --> Node not found (OK)"); - } - continue; - } - else - { - if (logger.isDebugEnabled()) - { - logger.debug(" --> Node found (Index out of date)"); - } - return false; - } - } - finally - { - if (results != null) { results.close(); } - } + logger.debug(" --> Node found (Index out of date)"); } } - // else -> The fallthrough case where there are no updates or deletes - - // all tests passed - if (logger.isDebugEnabled()) + else { - logger.debug("Index is in synch with transaction: " + txnId); + // No nodes found + if (logger.isDebugEnabled()) + { + logger.debug(" --> Node not found (OK)"); + } } - return true; + return !foundNodeRef; } /** @@ -419,8 +534,7 @@ public abstract class AbstractReindexComponent implements IndexRecovery { for (Transaction txn : txns) 
{ - long txnId = txn.getId().longValue(); - if (isTxnIdPresentInIndex(txnId) == InIndex.NO) + if (isTxnPresentInIndex(txn) == InIndex.NO) { // Missing txn return false; @@ -430,54 +544,501 @@ public abstract class AbstractReindexComponent implements IndexRecovery } /** - * Perform a full reindexing of the given transaction in the context of a completely - * new transaction. + * Marker exception to neatly handle VM-driven termination of a reindex * - * @param txnId the transaction identifier + * @author Derek Hulley + * @since 2.1.4 */ - protected void reindexTransaction(final long txnId) + public static class ReindexTerminatedException extends RuntimeException { + private static final long serialVersionUID = -7928720932368892814L; + } + + /** + * Callback to notify caller whenever a node has been indexed + * + * @see + * @author Derek Hulley + * @since 2.1.4 + */ + protected interface ReindexNodeCallback + { + void reindexedNode(NodeRef nodeRef); + } + + protected void reindexTransaction(Long txnId) + { + reindexTransaction(txnId, null); + } + + /** + * Perform a full reindexing of the given transaction on the current thread. + * The calling thread must be in the context of a read-only transaction. 
+ * + * @param txnId the transaction identifier + * @param callback the callback to notify of each node indexed + * + * @throws ReindexTerminatedException if the VM is shutdown during the reindex + */ + protected void reindexTransaction(final long txnId, ReindexNodeCallback callback) + { + ParameterCheck.mandatory("txnId", txnId); if (logger.isDebugEnabled()) { logger.debug("Reindexing transaction: " + txnId); } - - RetryingTransactionCallback reindexWork = new RetryingTransactionCallback() + if (AlfrescoTransactionSupport.getTransactionReadState() != TxnReadState.TXN_READ_ONLY) { - public Object execute() throws Exception + throw new AlfrescoRuntimeException("Reindex work must be done in the context of a read-only transaction"); + } + + // get the node references pertinent to the transaction + List nodeRefs = nodeDaoService.getTxnChanges(txnId); + // reindex each node + int nodeCount = 0; + for (NodeRef nodeRef : nodeRefs) + { + Status nodeStatus = nodeService.getNodeStatus(nodeRef); + if (nodeStatus == null) { - // get the node references pertinent to the transaction - List nodeRefs = nodeDaoService.getTxnChanges(txnId); - // reindex each node - for (NodeRef nodeRef : nodeRefs) - { - Status nodeStatus = nodeService.getNodeStatus(nodeRef); - if (nodeStatus == null) - { - // it's not there any more - continue; - } - if (nodeStatus.isDeleted()) // node deleted - { - // only the child node ref is relevant - ChildAssociationRef assocRef = new ChildAssociationRef( - ContentModel.ASSOC_CHILDREN, - null, - null, - nodeRef); - indexer.deleteNode(assocRef); - } - else // node created - { - // reindex - indexer.updateNode(nodeRef); - } - } - // done - return null; + // it's not there any more + continue; } - }; - transactionService.getRetryingTransactionHelper().doInTransaction(reindexWork, true); + if (nodeStatus.isDeleted()) // node deleted + { + // only the child node ref is relevant + ChildAssociationRef assocRef = new ChildAssociationRef( + ContentModel.ASSOC_CHILDREN, 
+ null, + null, + nodeRef); + indexer.deleteNode(assocRef); + } + else // node created + { + // reindex + indexer.updateNode(nodeRef); + } + // Make the callback + if (callback != null) + { + callback.reindexedNode(nodeRef); + } + // Check for VM shutdown every 100 nodes + if (++nodeCount % 100 == 0 && isShuttingDown()) + { + // We can't fail gracefully and run the risk of committing a half-baked transaction + logger.info("Reindexing of transaction " + txnId + " terminated by VM shutdown."); + throw new ReindexTerminatedException(); + } + } // done } + + private static final AtomicInteger ID_GENERATOR = new AtomicInteger(); + /** + * Runnable that does reindex work for a given transaction but waits on a queue before + * triggering the commit phase. + *

+ * This class uses Object's default equality and hashcode generation. + * + * @author Derek Hulley + * @since 2.1.4 + */ + private class ReindexWorkerRunnable extends TransactionListenerAdapter implements Runnable, ReindexNodeCallback + { + private final int id; + private final int uidHashCode; + private final List txnIds; + private long lastIndexedTimestamp; + private boolean atHeadOfQueue; + private boolean killed; + + private ReindexWorkerRunnable(List txnIds) + { + this.id = ID_GENERATOR.addAndGet(1); + if (ID_GENERATOR.get() > 1000) + { + ID_GENERATOR.set(0); + } + this.uidHashCode = id * 13 + 11; + this.txnIds = txnIds; + this.atHeadOfQueue = false; + this.killed = false; + recordTimestamp(); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(128); + sb.append("ReindexWorkerRunnable") + .append("[id=").append(id) + .append("[txnIds=").append(txnIds) + .append("]"); + return sb.toString(); + } + + @Override + public boolean equals(Object obj) + { + if (!(obj instanceof ReindexWorkerRunnable)) + { + return false; + } + ReindexWorkerRunnable that = (ReindexWorkerRunnable) obj; + return this.id == that.id; + } + + @Override + public int hashCode() + { + return uidHashCode; + } + + public synchronized void kill() + { + this.killed = true; + } + + private synchronized boolean isKilled() + { + return killed; + } + + /** + * @return the time that the last node was indexed (nanoseconds) + */ + public synchronized long getLastIndexedTimestamp() + { + return lastIndexedTimestamp; + } + + private synchronized void recordTimestamp() + { + this.lastIndexedTimestamp = System.nanoTime(); + } + + private synchronized boolean isAtHeadOfQueue() + { + return atHeadOfQueue; + } + + private synchronized void waitForHeadOfQueue() + { + try { wait(100L); } catch (InterruptedException e) {} + } + + public synchronized void setAtHeadOfQueue() + { + this.notifyAll(); + this.atHeadOfQueue = true; + } + + public void run() + { + 
RetryingTransactionCallback reindexCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + // The first thing is to ensure that beforeCommit will be called + AlfrescoTransactionSupport.bindListener(ReindexWorkerRunnable.this); + // Now reindex + for (Long txnId : txnIds) + { + if (loggerOnThread.isDebugEnabled()) + { + String msg = String.format( + " -> Reindexer %5d reindexing %10d", + id, txnId.longValue()); + loggerOnThread.debug(msg); + } + reindexTransaction(txnId, ReindexWorkerRunnable.this); + } + // Done + return null; + } + }; + // Timestamp for when we start + recordTimestamp(); + try + { + if (loggerOnThread.isDebugEnabled()) + { + int txnIdsSize = txnIds.size(); + String msg = String.format( + "Reindexer %5d starting [%10d, %10d] on %s.", + id, + (txnIdsSize == 0 ? -1 : txnIds.get(0)), + (txnIdsSize == 0 ? -1 : txnIds.get(txnIdsSize-1)), + Thread.currentThread().getName()); + loggerOnThread.debug(msg); + } + // Do the work + transactionService.getRetryingTransactionHelper().doInTransaction(reindexCallback, true, true); + } + catch (ReindexTerminatedException e) + { + // This is OK + String msg = String.format( + "Reindexer %5d terminated: %s.", + id, + e.getMessage()); + loggerOnThread.warn(msg); + } + catch (Throwable e) + { + String msg = String.format( + "Reindexer %5d failed with error: %s.", + id, + e.getMessage()); + loggerOnThread.error(msg); + } + finally + { + // Triple check that we get the queue state right + removeFromQueueAndProdHead(); + } + } + public synchronized void reindexedNode(NodeRef nodeRef) + { + // Check for forced kill + if (isKilled()) + { + throw new ReindexTerminatedException(); + } + recordTimestamp(); + } + + /** + * Removes this instance from the queue and notifies the HEAD + */ + private void removeFromQueueAndProdHead() + { + try + { + reindexThreadLock.writeLock().lock(); + // Remove self from head of queue + reindexThreadQueue.remove(this); + } + finally + { + 
reindexThreadLock.writeLock().unlock(); + } + // Now notify the new head object + ReindexWorkerRunnable newPeek = peekHeadReindexWorker(); + if (newPeek != null) + { + newPeek.setAtHeadOfQueue(); + } + if (loggerOnThread.isDebugEnabled()) + { + String msg = String.format( + "Reindexer %5d removed from queue. Current HEAD is %s.", + id, newPeek); + loggerOnThread.debug(msg); + } + } + + @Override + public void afterCommit() + { + handleQueue(); + } + @Override + public void afterRollback() + { + handleQueue(); + } + /** + * Lucene will do its final commit once this has been allowed to proceed. + */ + private void handleQueue() + { + while (true) + { + // Quick check to see if we're at the head of the queue + ReindexWorkerRunnable peek = peekHeadReindexWorker(); + // Release the current queue head to finish (this might be this instance) + if (peek != null) + { + peek.setAtHeadOfQueue(); + } + // Check kill switch + if (peek == null || isKilled() || isAtHeadOfQueue()) + { + // Going to close + break; + } + else + { + // This thread is not at the head of the queue and has not been flagged + // for death, so just wait until someone notifies us to carry on + waitForHeadOfQueue(); + // Loop again + } + } + // Lucene can now get on with the commit. We didn't have ordering at this level + // and the IndexInfo locks are handled by Lucene. So we let the thread go and + // the other worker threads can get on with it. + // Record the fact that the thread is on the final straight. From here on, no + // more work notifications will be possible so the timestamp needs to spoof it. + recordTimestamp(); + } + } + + /** + * FIFO queue to control the ordering of transaction commits. Synchronization around this object is + * controlled by the read-write lock. 
+ */ + private LinkedBlockingQueue reindexThreadQueue = new LinkedBlockingQueue(); + private ReentrantReadWriteLock reindexThreadLock = new ReentrantReadWriteLock(true); + + /** + * Read-safe method to peek at the head of the queue + */ + private ReindexWorkerRunnable peekHeadReindexWorker() + { + try + { + reindexThreadLock.readLock().lock(); + return reindexThreadQueue.peek(); + } + finally + { + reindexThreadLock.readLock().unlock(); + } + } + + /** + * Performs indexing off the current thread, which may return quickly if there are threads immediately + * available in the thread pool. + *

+ * Commits are guaranteed to occur in the order in which this reindex jobs are added to the queue. + * + * @see #reindexTransaction(long) + * @see #waitForAsynchronousReindexing() + * @since 2.1.4 + */ + protected void reindexTransactionAsynchronously(final List txnIds) + { + // Bypass if there is no thread pool + if (threadPoolExecutor == null || threadPoolExecutor.getMaximumPoolSize() < 2) + { + if (loggerOnThread.isDebugEnabled()) + { + String msg = String.format( + "Reindexing inline: %s.", + txnIds.toString()); + loggerOnThread.debug(msg); + } + RetryingTransactionCallback reindexCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + for (Long txnId : txnIds) + { + if (loggerOnThread.isDebugEnabled()) + { + String msg = String.format( + "Reindex %10d.", + txnId.longValue()); + loggerOnThread.debug(msg); + } + reindexTransaction(txnId, null); + } + return null; + } + }; + transactionService.getRetryingTransactionHelper().doInTransaction(reindexCallback, true, true); + return; + } + + ReindexWorkerRunnable runnable = new ReindexWorkerRunnable(txnIds); + try + { + reindexThreadLock.writeLock().lock(); + // Add the runnable to the queue to ensure ordering + reindexThreadQueue.add(runnable); + } + finally + { + reindexThreadLock.writeLock().unlock(); + } + // Ship it to a thread. + // We don't do this in the lock - but the situation should be avoided by having the blocking + // queue size less than the maximum pool size + threadPoolExecutor.execute(runnable); + } + + /** + * Wait for all asynchronous indexing to finish before returning. This is useful if the calling thread + * wants to ensure that all reindex work has finished before continuing. 
+ */ + protected synchronized void waitForAsynchronousReindexing() + { + ReindexWorkerRunnable lastRunnable = null; + long lastTimestamp = Long.MAX_VALUE; + + ReindexWorkerRunnable currentRunnable = peekHeadReindexWorker(); + while (currentRunnable != null && !isShuttingDown()) + { + // Notify the runnable that it is at the head of the queue + currentRunnable.setAtHeadOfQueue(); + // Give the thread chance to commit + synchronized(this) + { + try { wait(100); } catch (InterruptedException e) {} + } + + long currentTimestamp = currentRunnable.getLastIndexedTimestamp(); + // The head of the queue holds proceedings, so it can't be allowed to continue forever + // Allow 60s of inactivity. We don't anticipate more than a few milliseconds between + // timestamp advances for the reindex threads so this checking is just for emergencies + // to prevent the queue from getting locked up. + if (lastRunnable == currentRunnable) + { + if (currentTimestamp - lastTimestamp > 60E9) + { + + try + { + reindexThreadLock.writeLock().lock(); + // Double check + ReindexWorkerRunnable checkCurrentRunnable = reindexThreadQueue.peek(); + if (lastRunnable != checkCurrentRunnable) + { + // It's moved on - just in time + } + else + { + loggerOnThread.info("Terminating reindex thread for inactivity: " + currentRunnable); + reindexThreadQueue.remove(currentRunnable); + currentRunnable.kill(); + } + // Reset + lastRunnable = null; + lastTimestamp = Long.MAX_VALUE; + // Peek at the queue and check again + currentRunnable = reindexThreadQueue.peek(); + } + finally + { + reindexThreadLock.writeLock().unlock(); + } + continue; + } + // Swap timestamps + lastRunnable = currentRunnable; + lastTimestamp = currentTimestamp; + } + else + { + // Swap timestamps + lastRunnable = currentRunnable; + lastTimestamp = currentTimestamp; + } + currentRunnable = peekHeadReindexWorker(); + } + } } \ No newline at end of file diff --git a/source/java/org/alfresco/repo/node/index/FullIndexRecoveryComponent.java 
b/source/java/org/alfresco/repo/node/index/FullIndexRecoveryComponent.java index 690c89e2d4..fdcffff6e6 100644 --- a/source/java/org/alfresco/repo/node/index/FullIndexRecoveryComponent.java +++ b/source/java/org/alfresco/repo/node/index/FullIndexRecoveryComponent.java @@ -27,6 +27,7 @@ package org.alfresco.repo.node.index; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.Iterator; import java.util.List; import org.alfresco.i18n.I18NUtil; @@ -94,6 +95,7 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent private boolean lockServer; private IndexTransactionTracker indexTracker; private boolean stopOnError; + private int maxTransactionsPerLuceneCommit; /** *
    @@ -105,6 +107,7 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent public FullIndexRecoveryComponent() { recoveryMode = RecoveryMode.VALIDATE; + maxTransactionsPerLuceneCommit = 100; } /** @@ -118,6 +121,15 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent this.recoveryMode = RecoveryMode.valueOf(recoveryMode); } + /** + * Set the number of transactions to process per Lucene write. + * Larger values generate less contention on the Lucene IndexInfo files. + */ + public void setMaxTransactionsPerLuceneCommit(int maxTransactionsPerLuceneCommit) + { + this.maxTransactionsPerLuceneCommit = maxTransactionsPerLuceneCommit; + } + /** * Set this on to put the server into READ-ONLY mode for the duration of the index recovery. * The default is true, i.e. the server will be locked against further updates. @@ -178,10 +190,10 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent // Check that the first and last meaningful transactions are indexed List startTxns = nodeDaoService.getTxnsByCommitTimeAscending( - Long.MIN_VALUE, Long.MAX_VALUE, 10, null); + Long.MIN_VALUE, Long.MAX_VALUE, 10, null, false); boolean startAllPresent = areTxnsInIndex(startTxns); List endTxns = nodeDaoService.getTxnsByCommitTimeDescending( - Long.MIN_VALUE, Long.MAX_VALUE, 10, null); + Long.MIN_VALUE, Long.MAX_VALUE, 10, null, false); boolean endAllPresent = areTxnsInIndex(endTxns); // check the level of cover required @@ -275,12 +287,16 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent fromTimeInclusive, toTimeExclusive, MAX_TRANSACTIONS_PER_ITERATION, - lastTxnIds); + lastTxnIds, + false); lastTxnIds = new ArrayList(nextTxns.size()); // reindex each transaction - for (Transaction txn : nextTxns) + List txnIdBuffer = new ArrayList(maxTransactionsPerLuceneCommit); + Iterator txnIterator = nextTxns.iterator(); + while (txnIterator.hasNext()) { + Transaction txn = txnIterator.next(); Long txnId = 
txn.getId(); // Keep it to ensure we exclude it from the next iteration lastTxnIds.add(txnId); @@ -298,14 +314,22 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent } else { - try + // Add the transaction ID to the buffer + txnIdBuffer.add(txnId); + // Reindex if the buffer is full or if there are no more transactions + if (!txnIterator.hasNext() || txnIdBuffer.size() >= maxTransactionsPerLuceneCommit) { - reindexTransaction(txnId); - } - catch (Throwable e) - { - String msgError = I18NUtil.getMessage(MSG_RECOVERY_ERROR, txnId, e.getMessage()); - logger.info(msgError, e); + try + { + reindexTransactionAsynchronously(txnIdBuffer); + } + catch (Throwable e) + { + String msgError = I18NUtil.getMessage(MSG_RECOVERY_ERROR, txnId, e.getMessage()); + logger.info(msgError, e); + } + // Clear the buffer + txnIdBuffer = new ArrayList(maxTransactionsPerLuceneCommit); } } // Although we use the same time as this transaction for the next iteration, we also @@ -324,6 +348,9 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent } } + // Wait for the asynchronous process to catch up + waitForAsynchronousReindexing(); + // have we finished? if (nextTxns.size() == 0) { @@ -337,8 +364,8 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent } /** - * Perform a full reindexing of the given transaction in the context of a completely - * new transaction. + * Perform full reindexing of the given transaction. A read-only transaction is created + * if one doesn't already exist. 
* * @param txnId the transaction identifier */ @@ -384,7 +411,7 @@ public class FullIndexRecoveryComponent extends AbstractReindexComponent return null; } }; - transactionService.getRetryingTransactionHelper().doInTransaction(reindexWork, true, true); + transactionService.getRetryingTransactionHelper().doInTransaction(reindexWork, true, false); // done } } \ No newline at end of file diff --git a/source/java/org/alfresco/repo/node/index/IndexTransactionTracker.java b/source/java/org/alfresco/repo/node/index/IndexTransactionTracker.java index f8879ab43c..2e874601c9 100644 --- a/source/java/org/alfresco/repo/node/index/IndexTransactionTracker.java +++ b/source/java/org/alfresco/repo/node/index/IndexTransactionTracker.java @@ -19,14 +19,15 @@ package org.alfresco.repo.node.index; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.repo.domain.Transaction; +import org.alfresco.repo.transaction.RetryingTransactionHelper; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,16 +41,20 @@ public class IndexTransactionTracker extends AbstractReindexComponent private static Log logger = LogFactory.getLog(IndexTransactionTracker.class); private IndexTransactionTrackerListener listener; + private NodeIndexer nodeIndexer; private long maxTxnDurationMs; private long reindexLagMs; private int maxRecordSetSize; + private int maxTransactionsPerLuceneCommit; + private boolean disableInTransactionIndexing; private boolean started; private List previousTxnIds; - private long lastMaxTxnId; + private Long lastMaxTxnId; private long fromTimeInclusive; private Map voids; + private boolean forceReindex; /** * Set 
the defaults. @@ -57,6 +62,8 @@ public class IndexTransactionTracker extends AbstractReindexComponent *
  • Maximum transaction duration: 1 hour
  • *
  • Reindex lag: 1 second
  • *
  • Maximum recordset size: 1000
  • + *
  • Maximum transactions per Lucene commit: 100
  • + *
  • Disable in-transaction indexing: false
  • *
*/ public IndexTransactionTracker() @@ -64,10 +71,13 @@ public class IndexTransactionTracker extends AbstractReindexComponent maxTxnDurationMs = 3600L * 1000L; reindexLagMs = 1000L; maxRecordSetSize = 1000; + maxTransactionsPerLuceneCommit = 100; + disableInTransactionIndexing = false; previousTxnIds = Collections.emptyList(); lastMaxTxnId = Long.MAX_VALUE; fromTimeInclusive = -1L; voids = new TreeMap(); + forceReindex = false; } public synchronized void setListener(IndexTransactionTrackerListener listener) @@ -75,6 +85,11 @@ public class IndexTransactionTracker extends AbstractReindexComponent this.listener = listener; } + public void setNodeIndexer(NodeIndexer nodeIndexer) + { + this.nodeIndexer = nodeIndexer; + } + /** * Set the expected maximum duration of transaction supported. This value is used to adjust the * look-back used to detect transactions that committed. Values must be greater than zero. @@ -91,7 +106,7 @@ public class IndexTransactionTracker extends AbstractReindexComponent } this.maxTxnDurationMs = maxTxnDurationMinutes * 60L * 1000L; } - + /** * Transaction tracking should lag by the average commit time for a transaction. This will minimize * the number of holes in the transaction sequence. Values must be greater than zero. @@ -118,80 +133,173 @@ public class IndexTransactionTracker extends AbstractReindexComponent this.maxRecordSetSize = maxRecordSetSize; } + /** + * Set the number of transactions to process per Lucene write. + * Larger values generate less contention on the Lucene IndexInfo files. + */ + public void setMaxTransactionsPerLuceneCommit(int maxTransactionsPerLuceneCommit) + { + this.maxTransactionsPerLuceneCommit = maxTransactionsPerLuceneCommit; + } + + /** + * Enable or disabled in-transaction indexing. Under certain circumstances, the system + * can run with only index tracking enabled - in-transaction indexing is not always + * required. The {@link NodeIndexer} is disabled when this component initialises. 
+ */ + public void setDisableInTransactionIndexing(boolean disableInTransactionIndexing) + { + this.disableInTransactionIndexing = disableInTransactionIndexing; + } + + /** + * @return Returns false always. Transactions are handled internally. + */ + @Override + protected boolean requireTransaction() + { + return false; + } + + /** Worker callback for transactional use */ + RetryingTransactionCallback getStartingCommitTimeWork = new RetryingTransactionCallback() + { + public Long execute() throws Exception + { + return getStartingTxnCommitTime(); + } + }; + /** Worker callback for transactional use */ + RetryingTransactionCallback reindexWork = new RetryingTransactionCallback() + { + public Boolean execute() throws Exception + { + return reindexInTransaction(); + } + }; + @Override protected void reindexImpl() { + RetryingTransactionHelper retryingTransactionHelper = transactionService.getRetryingTransactionHelper(); if (!started) { + // Disable in-transaction indexing + if (disableInTransactionIndexing && nodeIndexer != null) + { + logger.warn("In-transaction indexing is being disabled."); + nodeIndexer.setEnabled(false); + } // Make sure that we start clean voids.clear(); previousTxnIds = new ArrayList(maxRecordSetSize); - lastMaxTxnId = Long.MAX_VALUE; // So that it is ignored at first - fromTimeInclusive = getStartingTxnCommitTime(); + lastMaxTxnId = null; // So that it is ignored at first + fromTimeInclusive = retryingTransactionHelper.doInTransaction(getStartingCommitTimeWork, true, true); started = true; } while (true) { - long toTimeExclusive = System.currentTimeMillis() - reindexLagMs; - - // Check that the voids haven't been filled - fromTimeInclusive = checkVoids(fromTimeInclusive); - - // get next transactions to index - List txns = getNextTransactions(fromTimeInclusive, toTimeExclusive, previousTxnIds); - - if (logger.isDebugEnabled()) - { - String msg = String.format( - "Reindexing %d transactions from %s (%s)", - txns.size(), - (new 
Date(fromTimeInclusive)).toString(), - txns.isEmpty() ? "---" : txns.get(0).getId().toString()); - logger.debug(msg); - } - - // Reindex the transactions. Voids between the last set of transactions and this - // set will be detected as well. Additionally, the last max transaction will be - // updated by this method. - reindexTransactions(txns); - - // Call the listener - synchronized (this) - { - if (listener != null) - { - listener.indexedTransactions(fromTimeInclusive, toTimeExclusive); - } - } - - // Move the time on. - // Note the subtraction here. Yes, it's odd. But the results of the getNextTransactions - // may be limited by recordset size and it is possible to have multiple transactions share - // the same commit time. If these txns get split up and we exclude the time period, then - // they won't be requeried. The list of previously used transaction IDs is passed back to - // be exluded from the next query. - fromTimeInclusive = toTimeExclusive - 1L; - previousTxnIds.clear(); - for (Transaction txn : txns) - { - previousTxnIds.add(txn.getId()); - } - - // Break out if there were no transactions processed - if (previousTxnIds.isEmpty()) - { - break; - } - - // break out if the VM is shutting down - if (isShuttingDown()) + Boolean repeat = retryingTransactionHelper.doInTransaction(reindexWork, true, true); + // Only break out if there isn't any more work to do (for now) + if (repeat == null || repeat.booleanValue() == false) { break; } } + // Wait for the asynchronous reindexing to complete + waitForAsynchronousReindexing(); + } + + /** + * @return Returns true if the reindex process can exit otherwise false if + * a new transaction should be created and the process kicked off again + */ + private boolean reindexInTransaction() + { + long toTimeExclusive = System.currentTimeMillis() - reindexLagMs; + + // Check that the voids haven't been filled + long minLiveVoidTime = checkVoids(); + if (minLiveVoidTime <= fromTimeInclusive) + { + // A void was discovered. 
+ // We need to adjust the search time for transactions, i.e. hop back in time but + // this also entails a full build from that point on. So all previous transactions + // need to be reindexed. + fromTimeInclusive = minLiveVoidTime; + previousTxnIds.clear(); + } + + // get next transactions to index + List txns = getNextTransactions(fromTimeInclusive, toTimeExclusive, previousTxnIds); + + // If there are no transactions, then all the work is done + if (txns.size() == 0) + { + // We have caught up. + // There is no need to force reindexing until the next unindex transactions appear. + forceReindex = false; + return false; + } + + if (logger.isDebugEnabled()) + { + String msg = String.format( + "Reindexing %d transactions from %s (%s)", + txns.size(), + (new Date(fromTimeInclusive)).toString(), + txns.isEmpty() ? "---" : txns.get(0).getId().toString()); + logger.debug(msg); + } + + // Reindex the transactions. Voids between the last set of transactions and this + // set will be detected as well. Additionally, the last max transaction will be + // updated by this method. + long maxProcessedTxnCommitTime = reindexTransactions(txns); + + // Call the listener + synchronized (this) + { + if (listener != null) + { + listener.indexedTransactions(fromTimeInclusive, maxProcessedTxnCommitTime); + } + } + + // Move the time on. + // The next fromTimeInclusive may well pull back transactions that have just been + // processed. But we keep track of those and exclude them from the results. + if (fromTimeInclusive == maxProcessedTxnCommitTime) + { + // The time didn't advance. If no new transaction appear, we could spin on + // two or more transactions with the same commit time. So we DON'T clear + // the list of previous transactions and we allow them to live on. 
+ } + else + { + // The processing time has moved on + fromTimeInclusive = maxProcessedTxnCommitTime; + previousTxnIds.clear(); + } + for (Transaction txn : txns) + { + previousTxnIds.add(txn.getId()); + } + + if (isShuttingDown()) + { + // break out if the VM is shutting down + return false; + } + else + { + // There is more work to do and we should be called back right away + return true; + } } + private static final long ONE_HOUR_MS = 3600*1000; /** * Find a transaction time to start indexing from (inclusive). The last recorded transaction by ID * is taken and the max transaction duration substracted from its commit time. A transaction is @@ -200,11 +308,34 @@ public class IndexTransactionTracker extends AbstractReindexComponent * or a transaction is found in the index. */ protected long getStartingTxnCommitTime() + { + long now = System.currentTimeMillis(); + // Get the last indexed transaction for all transactions + long lastIndexedAllCommitTimeMs = getLastIndexedCommitTime(now, false); + // Now check back from this time to make sure there are no remote transactions that weren't indexed + long lastIndexedRemoteCommitTimeMs = getLastIndexedCommitTime(now, true); + // The one to start at is the least of the two times + long startTime = Math.min(lastIndexedAllCommitTimeMs, lastIndexedRemoteCommitTimeMs); + // Done + return startTime; + } + /** + * Gets the commit time for the last indexed transaction. If there are no transactions, then the + * current time is returned. + * + * @param maxCommitTimeMs the largest commit time to consider + * @param remoteOnly true to only look at remotely-committed transactions + * @return Returns the last indexed transaction commit time for all or + * remote-only transactions. 
+ */ + private long getLastIndexedCommitTime(long maxCommitTimeMs, boolean remoteOnly) { // Look back in time by the maximum transaction duration - long toTimeExclusive = System.currentTimeMillis() - maxTxnDurationMs; + long maxToTimeExclusive = maxCommitTimeMs - maxTxnDurationMs; + long toTimeExclusive = maxToTimeExclusive; long fromTimeInclusive = 0L; double stepFactor = 1.0D; + boolean firstWasInIndex = true; found: while (true) { @@ -213,7 +344,8 @@ found: 0L, toTimeExclusive, 1, - null); + null, + remoteOnly); // There are no transactions in that time range if (nextTransactions.size() == 0) { @@ -221,93 +353,119 @@ found: } // We found a transaction Transaction txn = nextTransactions.get(0); - Long txnId = txn.getId(); long txnCommitTime = txn.getCommitTimeMs(); // Check that it is in the index - InIndex txnInIndex = isTxnIdPresentInIndex(txnId); + InIndex txnInIndex = isTxnPresentInIndex(txn); switch (txnInIndex) { case YES: fromTimeInclusive = txnCommitTime; break found; default: - // Look further back in time. Step back by the maximum transaction duration and - // increase this step back by a factor of 10% each iteration. - toTimeExclusive = txnCommitTime - (long)(maxTxnDurationMs * stepFactor); + firstWasInIndex = false; + // Look further back in time. Step back by 60 seconds each time, increasing + // the step by 10% each iteration. + // Don't step back by more than a day + long decrement = Math.min(ONE_HOUR_MS, (long) (60000.0D * stepFactor)); + toTimeExclusive = txnCommitTime - decrement; stepFactor *= 1.1D; continue; } } - // We have a starting value - return fromTimeInclusive; + // If the last transaction (given the max txn duration) was in the index, then we used the + // maximum commit time i.e. the indexes were up to date up until the most recent time. 
+ if (firstWasInIndex) + { + return maxToTimeExclusive; + } + else + { + return fromTimeInclusive; + } } + private static final int VOID_BATCH_SIZE = 100; /** * Voids - otherwise known as 'holes' - in the transaction sequence are timestamped when they are * discovered. This method discards voids that were timestamped before the given date. It checks * all remaining voids, passing back the transaction time for the newly-filled void. Otherwise * the value passed in is passed back. * - * @param fromTimeInclusive the oldest void to consider - * @return Returns an adjused start position based on any voids being filled + * @return Returns an adjused start position based on any voids being filled + * or Long.MAX_VALUE if no new voids were found */ - private long checkVoids(long fromTimeInclusive) + private long checkVoids() { long maxHistoricalTime = (fromTimeInclusive - maxTxnDurationMs); - long fromTimeAdjusted = fromTimeInclusive; + long fromTimeAdjusted = Long.MAX_VALUE; List toExpireTxnIds = new ArrayList(1); - // The voids are stored in a sorted map, sorted by the txn ID - for (Long voidTxnId : voids.keySet()) + Iterator voidTxnIdIterator = voids.keySet().iterator(); + List voidTxnIdBatch = new ArrayList(VOID_BATCH_SIZE); + + while (voidTxnIdIterator.hasNext()) { + Long voidTxnId = voidTxnIdIterator.next(); + // Add it to the batch + voidTxnIdBatch.add(voidTxnId); + // If the batch is full or if there are no more voids, fire the query + if (voidTxnIdBatch.size() == VOID_BATCH_SIZE || !voidTxnIdIterator.hasNext()) + { + List filledTxns = nodeDaoService.getTxnsByMinCommitTime(voidTxnIdBatch); + for (Transaction txn : filledTxns) + { + if (txn.getCommitTimeMs() == null) // Just coping with Hibernate mysteries + { + continue; + } + else if (isTxnPresentInIndex(txn) != InIndex.NO) + { + // It is in the index so expire it from the voids. + // This can happen if void was committed locally. 
+ toExpireTxnIds.add(txn.getId()); + } + else + { + // It's not in the index so we have a timespamp from which to kick off + // It is a bone fide first transaction. A void has been filled. + long txnCommitTimeMs = txn.getCommitTimeMs().longValue(); + // If the value is lower than our current one we keep it + if (txnCommitTimeMs < fromTimeAdjusted) + { + fromTimeAdjusted = txnCommitTimeMs; + } + // The query selected them in timestamp order so there is no need to process + // the remaining transactions in this batch - we have our minimum. + break; + } + } + // Wipe the batch clean + voidTxnIdBatch.clear(); + } + // Check if the void must be expired or not TxnRecord voidTxnRecord = voids.get(voidTxnId); - // Is the transaction around, yet? - Transaction voidTxn = nodeDaoService.getTxnById(voidTxnId); - if (voidTxn == null) + if (voidTxnRecord.txnCommitTime < maxHistoricalTime) { - // It's still just a void. Shall we expire it? - if (voidTxnRecord.txnCommitTime < maxHistoricalTime) - { - // It's too late for this void - toExpireTxnIds.add(voidTxnId); - } - continue; - } - else if (voidTxn.getCommitTimeMs() == null) - { - // http://issues.alfresco.com/browse/AR-2041 - // An object was found, but sometimes it is still not fully formed. - // Perhaps it's the direct request by ID that gives back an uncommitted transaction. - // So this transaction is very likely to become live soon but we just leave it until it does. - // When the issue has been seen, there have not been any committed transactions with null commit times. - if (logger.isDebugEnabled()) - { - logger.debug("Void is visible but not live: " + voidTxn); - } - } - else - { - if (logger.isDebugEnabled()) - { - logger.debug("Void has become live: " + voidTxn); - } - // We found one that has become a real transaction. - // We don't throw the other voids away. 
- fromTimeAdjusted = voidTxn.getCommitTimeMs(); - // Break out as sequential rebuilding is required - break; + // It's too late for this void whether or not it has become live + toExpireTxnIds.add(voidTxnId); } } - // Throw away all the expired ones - for (Long toExpireTxnId : toExpireTxnIds) + // Throw away all the expired or removable voids + int voidCountBefore = voids.size(); + for (Long toRemoveTxnId : toExpireTxnIds) { - voids.remove(toExpireTxnId); - if (logger.isDebugEnabled()) - { - logger.debug("Void has expired: " + toExpireTxnId); - } + voids.remove(toRemoveTxnId); + } + int voidCountAfter = voids.size(); + if (logger.isDebugEnabled() && voidCountBefore != voidCountAfter) + { + logger.debug("Void count " + voidCountBefore + " -> " + voidCountAfter); } // Done + if (logger.isDebugEnabled() && fromTimeAdjusted < Long.MAX_VALUE) + { + logger.debug("Returning to void time " + fromTimeAdjusted); + } return fromTimeAdjusted; } @@ -317,7 +475,8 @@ found: fromTimeInclusive, toTimeExclusive, maxRecordSetSize, - previousTxnIds); + previousTxnIds, + false); // done return txns; } @@ -328,60 +487,52 @@ found: * of transaction IDs will be examined for any voids. These will be recorded. 
* * @param txns transactions ordered by time ascending - * @return returns the + * @return returns the commit time of the last transaction in the list + * @throws IllegalArgumentException if there are no transactions */ - private void reindexTransactions(List txns) + private long reindexTransactions(List txns) { if (txns.isEmpty()) { - return; + throw new IllegalArgumentException("There are no transactions to process"); } - Set processedTxnIds = new HashSet(13); - - boolean forceReindex = false; - long minNewTxnId = Long.MAX_VALUE; - long maxNewTxnId = Long.MIN_VALUE; - long maxNewTxnCommitTime = System.currentTimeMillis(); - for (Transaction txn : txns) + // Determines the window for void retention + long now = System.currentTimeMillis(); + long oldestVoidRetentionTime = (now - maxTxnDurationMs); + + // Keep an ordered map of IDs that we process along with their commit times + Map processedTxnRecords = new TreeMap(); + + List txnIdBuffer = new ArrayList(maxTransactionsPerLuceneCommit); + Iterator txnIterator = txns.iterator(); + while (txnIterator.hasNext()) { + Transaction txn = txnIterator.next(); Long txnId = txn.getId(); - long txnIdLong = txnId.longValue(); - if (txnIdLong < minNewTxnId) + Long txnCommitTimeMs = txn.getCommitTimeMs(); + if (txnCommitTimeMs == null) { - minNewTxnId = txnIdLong; + // What? 
But let's be cautious and treat this as a void + continue; } - if (txnIdLong > maxNewTxnId) - { - maxNewTxnId = txnIdLong; - maxNewTxnCommitTime = txn.getCommitTimeMs(); - } - // Keep track of it for void checking - processedTxnIds.add(txnId); + // Keep a record of it + TxnRecord processedTxnRecord = new TxnRecord(); + processedTxnRecord.txnCommitTime = txnCommitTimeMs; + processedTxnRecords.put(txnId, processedTxnRecord); // Remove this entry from the void list - it is not void voids.remove(txnId); // Reindex the transaction if we are forcing it or if it isn't in the index already - if (forceReindex || isTxnIdPresentInIndex(txnId) == InIndex.NO) + if (forceReindex || isTxnPresentInIndex(txn) == InIndex.NO) { - // Any indexing means that all the next transactions have to be indexed + // From this point on, until the tracker has caught up, all transactions need to be indexed forceReindex = true; - try + // Add the transaction to the buffer of transactions that need processing + txnIdBuffer.add(txnId); + if (logger.isDebugEnabled()) { - if (logger.isDebugEnabled()) - { - logger.debug("Reindexing transaction: " + txn); - } - // We try the reindex, but for the sake of continuity, have to let it run on - reindexTransaction(txnId); - } - catch (Throwable e) - { - logger.warn("\n" + - "Reindex of transaction failed: \n" + - " Transaction ID: " + txnId + "\n" + - " Error: " + e.getMessage(), - e); + logger.debug("Reindexing transaction: " + txn); } } else @@ -391,50 +542,82 @@ found: logger.debug("Reindex skipping transaction: " + txn); } } + + if (isShuttingDown()) + { + break; + } + // Flush the reindex buffer, if it is full or if we are on the last transaction and there are no more + if (txnIdBuffer.size() >= maxTransactionsPerLuceneCommit || (!txnIterator.hasNext() && txnIdBuffer.size() > 0)) + { + try + { + // We try the reindex, but for the sake of continuity, have to let it run on + reindexTransactionAsynchronously(txnIdBuffer); + } + catch (Throwable e) + { + 
logger.warn("\n" + + "Reindex of transactions failed: \n" + + " Transaction IDs: " + txnIdBuffer + "\n" + + " Error: " + e.getMessage(), + e); + } + // Clear the buffer + txnIdBuffer = new ArrayList(maxTransactionsPerLuceneCommit); + } } - // We have to search for voids now. Don't start at the min transaction, - // but start at the least of the lastMaxTxnId and minNewTxnId - long voidCheckStartTxnId = (lastMaxTxnId < minNewTxnId ? lastMaxTxnId : minNewTxnId) + 1; - long voidCheckEndTxnId = maxNewTxnId; - // Check for voids in new transactions - for (long i = voidCheckStartTxnId; i <= voidCheckEndTxnId; i++) + // Use the last ID from the previous iteration as our starting point + Long lastId = lastMaxTxnId; + long lastCommitTime = -1L; + // Walk the processed txn IDs + for (Map.Entry entry : processedTxnRecords.entrySet()) { - Long txnId = Long.valueOf(i); - if (processedTxnIds.contains(txnId)) + Long processedTxnId = entry.getKey(); + TxnRecord processedTxnRecord = entry.getValue(); + boolean voidsAreYoungEnough = processedTxnRecord.txnCommitTime >= oldestVoidRetentionTime; + if (lastId != null && voidsAreYoungEnough) { - // It is there - continue; - } - - // First make sure that it is a real void. Sometimes, transactions are in the table but don't - // fall within the commit time window that we queried. If they're in the DB AND in the index, - // then they're not really voids and don't need further checks. If they're missing from either, - // then they're voids and must be processed. - Transaction voidTxn = nodeDaoService.getTxnById(txnId); - if (voidTxn != null && isTxnIdPresentInIndex(txnId) != InIndex.NO) - { - // It is a real transaction (not a void) and is already in the index, so just ignore it. - continue; - } - - // Calculate an age for the void. We can't use the current time as that will mean we keep all - // discovered voids, even if they are very old. 
Rather, we use the commit time of the last transaction - // in the set as it represents the query time for this iteration. - TxnRecord voidRecord = new TxnRecord(); - voidRecord.txnCommitTime = maxNewTxnCommitTime; - voids.put(txnId, voidRecord); - if (logger.isDebugEnabled()) - { - logger.debug("Void detected: " + txnId); + int voidCount = 0; + // Iterate BETWEEN the last ID and the current one to find voids + // Only enter the loop if the current upper limit transaction is young enough to + // consider for voids. + for (long i = lastId.longValue() + 1; i < processedTxnId; i++) + { + // The voids are optimistically given the same transaction time as transaction with the + // largest ID. We only bother w + TxnRecord voidRecord = new TxnRecord(); + voidRecord.txnCommitTime = processedTxnRecord.txnCommitTime; + voids.put(new Long(i), voidRecord); + voidCount++; + } + if (logger.isDebugEnabled()&& voidCount > 0) + { + logger.debug("Voids detected: " + voidCount + " in range [" + lastId + ", " + processedTxnId + "]"); + } } + lastId = processedTxnId; + lastCommitTime = processedTxnRecord.txnCommitTime; } // Having searched for the nodes, we've recorded all the voids. So move the lastMaxTxnId up. - lastMaxTxnId = voidCheckEndTxnId; + lastMaxTxnId = lastId; + + // Done + return lastCommitTime; } private class TxnRecord { private long txnCommitTime; + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(128); + sb.append("TxnRecord") + .append("[time=").append(txnCommitTime <= 0 ? 
"---" : new Date(txnCommitTime)) + .append("]"); + return sb.toString(); + } } /** diff --git a/source/java/org/alfresco/repo/node/index/IndexTransactionTrackerTest.java b/source/java/org/alfresco/repo/node/index/IndexTransactionTrackerTest.java index 1c34f22be6..5126566412 100644 --- a/source/java/org/alfresco/repo/node/index/IndexTransactionTrackerTest.java +++ b/source/java/org/alfresco/repo/node/index/IndexTransactionTrackerTest.java @@ -16,6 +16,8 @@ */ package org.alfresco.repo.node.index; +import java.util.concurrent.ThreadPoolExecutor; + import junit.framework.TestCase; import org.alfresco.model.ContentModel; @@ -52,6 +54,7 @@ public class IndexTransactionTrackerTest extends TestCase private AuthenticationComponent authenticationComponent; private SearchService searchService; private NodeService nodeService; + private ThreadPoolExecutor threadPoolExecutor; private FileFolderService fileFolderService; private ContentStore contentStore; private FullTextSearchIndexer ftsIndexer; @@ -65,6 +68,7 @@ public class IndexTransactionTrackerTest extends TestCase ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); searchService = serviceRegistry.getSearchService(); nodeService = serviceRegistry.getNodeService(); + threadPoolExecutor = (ThreadPoolExecutor) ctx.getBean("indexTrackerThreadPoolExecutor"); fileFolderService = serviceRegistry.getFileFolderService(); authenticationComponent = (AuthenticationComponent) ctx.getBean("authenticationComponent"); contentStore = (ContentStore) ctx.getBean("fileContentStore"); @@ -79,6 +83,7 @@ public class IndexTransactionTrackerTest extends TestCase indexTracker.setIndexer(indexer); indexTracker.setNodeDaoService(nodeDaoService); indexTracker.setNodeService(nodeService); + indexTracker.setThreadPoolExecutor(threadPoolExecutor); indexTracker.setSearcher(searchService); indexTracker.setTransactionService((TransactionServiceImpl)transactionService); @@ -118,7 +123,19 @@ public class 
IndexTransactionTrackerTest extends TestCase public synchronized void testStartup() throws Exception { - indexTracker.reindex(); - indexTracker.reindex(); + Thread reindexThread = new Thread() + { + public void run() + { + indexTracker.reindex(); + indexTracker.reindex(); + } + }; + reindexThread.setDaemon(true); + reindexThread.start(); + // wait a bit and then terminate + wait(20000); + indexTracker.setShutdown(true); + wait(20000); } } diff --git a/source/java/org/alfresco/repo/node/index/MissingContentReindexComponentTest.java b/source/java/org/alfresco/repo/node/index/MissingContentReindexComponentTest.java index 11ecb1641e..27e63e6f89 100644 --- a/source/java/org/alfresco/repo/node/index/MissingContentReindexComponentTest.java +++ b/source/java/org/alfresco/repo/node/index/MissingContentReindexComponentTest.java @@ -24,6 +24,8 @@ */ package org.alfresco.repo.node.index; +import java.util.concurrent.ThreadPoolExecutor; + import junit.framework.TestCase; import org.alfresco.model.ContentModel; @@ -64,6 +66,7 @@ public class MissingContentReindexComponentTest extends TestCase private AuthenticationComponent authenticationComponent; private SearchService searchService; private NodeService nodeService; + private ThreadPoolExecutor threadPoolExecutor; private FileFolderService fileFolderService; private ContentStore contentStore; private FullTextSearchIndexer ftsIndexer; @@ -76,6 +79,7 @@ public class MissingContentReindexComponentTest extends TestCase ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); searchService = serviceRegistry.getSearchService(); nodeService = serviceRegistry.getNodeService(); + threadPoolExecutor = (ThreadPoolExecutor) ctx.getBean("indexTrackerThreadPoolExecutor"); fileFolderService = serviceRegistry.getFileFolderService(); authenticationComponent = (AuthenticationComponent) ctx.getBean("authenticationComponent"); contentStore = (ContentStore) ctx.getBean("fileContentStore"); @@ -90,6 +94,7 @@ 
public class MissingContentReindexComponentTest extends TestCase reindexer.setIndexer(indexer); reindexer.setNodeDaoService(nodeDaoService); reindexer.setNodeService(nodeService); + reindexer.setThreadPoolExecutor(threadPoolExecutor); reindexer.setSearcher(searchService); reindexer.setTransactionService((TransactionServiceImpl)transactionService); diff --git a/source/java/org/alfresco/repo/node/index/NodeIndexer.java b/source/java/org/alfresco/repo/node/index/NodeIndexer.java index 9f26c3a6ad..1169f0d6f1 100644 --- a/source/java/org/alfresco/repo/node/index/NodeIndexer.java +++ b/source/java/org/alfresco/repo/node/index/NodeIndexer.java @@ -52,6 +52,13 @@ public class NodeIndexer /** the component to index the node hierarchy */ private Indexer indexer; private TenantService tenantService; + /** enabled or disabled */ + private boolean enabled; + + public NodeIndexer() + { + enabled = true; + } /** * @param policyComponent used for registrations @@ -73,6 +80,11 @@ public class NodeIndexer { this.tenantService = tenantService; } + + /* package */ void setEnabled(boolean enabled) + { + this.enabled = enabled; + } /** * Registers the policy behaviour methods @@ -103,22 +115,31 @@ public class NodeIndexer public void onCreateNode(ChildAssociationRef childAssocRef) { - indexer.createNode(tenantService.getName(childAssocRef)); + if (enabled) + { + indexer.createNode(tenantService.getName(childAssocRef)); + } } public void onUpdateNode(NodeRef nodeRef) { - indexer.updateNode(tenantService.getName(nodeRef)); + if (enabled) + { + indexer.updateNode(tenantService.getName(nodeRef)); + } } public void onDeleteNode(ChildAssociationRef childAssocRef, boolean isArchivedNode) { - indexer.deleteNode(tenantService.getName(childAssocRef)); + if (enabled) + { + indexer.deleteNode(tenantService.getName(childAssocRef)); + } } public void onCreateChildAssociation(ChildAssociationRef childAssocRef, boolean isNew) { - if (!isNew) + if (!isNew && enabled) { 
indexer.createChildRelationship(tenantService.getName(childAssocRef)); } @@ -126,6 +147,9 @@ public class NodeIndexer public void onDeleteChildAssociation(ChildAssociationRef childAssocRef) { - indexer.deleteChildRelationship(tenantService.getName(childAssocRef)); + if (enabled) + { + indexer.deleteChildRelationship(tenantService.getName(childAssocRef)); + } } } diff --git a/source/java/org/alfresco/repo/rule/RuleTestSuite.java b/source/java/org/alfresco/repo/rule/RuleTestSuite.java index 941b78359a..8357b40c28 100644 --- a/source/java/org/alfresco/repo/rule/RuleTestSuite.java +++ b/source/java/org/alfresco/repo/rule/RuleTestSuite.java @@ -24,11 +24,11 @@ */ package org.alfresco.repo.rule; -import org.alfresco.repo.rule.ruletrigger.RuleTriggerTest; - import junit.framework.Test; import junit.framework.TestSuite; +import org.alfresco.repo.rule.ruletrigger.RuleTriggerTest; + /** * Version test suite diff --git a/source/java/org/alfresco/repo/rule/RuleTransactionListener.java b/source/java/org/alfresco/repo/rule/RuleTransactionListener.java index f728ce464e..0c631f3835 100644 --- a/source/java/org/alfresco/repo/rule/RuleTransactionListener.java +++ b/source/java/org/alfresco/repo/rule/RuleTransactionListener.java @@ -24,7 +24,7 @@ */ package org.alfresco.repo.rule; -import org.alfresco.repo.transaction.TransactionListener; +import org.alfresco.repo.transaction.TransactionListenerAdapter; import org.alfresco.util.GUID; /** @@ -32,7 +32,7 @@ import org.alfresco.util.GUID; * * @author Roy Wetherall */ -public class RuleTransactionListener implements TransactionListener +public class RuleTransactionListener extends TransactionListenerAdapter { /** * Id used in equals and hash @@ -54,42 +54,15 @@ public class RuleTransactionListener implements TransactionListener this.ruleService = ruleService; } - /** - * @see org.alfresco.repo.transaction.TransactionListener#flush() - */ - public void flush() - { - } - /** * @see 
org.alfresco.repo.transaction.TransactionListener#beforeCommit(boolean) */ + @Override public void beforeCommit(boolean readOnly) { this.ruleService.executePendingRules(); } - /** - * @see org.alfresco.repo.transaction.TransactionListener#beforeCompletion() - */ - public void beforeCompletion() - { - } - - /** - * @see org.alfresco.repo.transaction.TransactionListener#afterCommit() - */ - public void afterCommit() - { - } - - /** - * @see org.alfresco.repo.transaction.TransactionListener#afterRollback() - */ - public void afterRollback() - { - } - /** * @see java.lang.Object#hashCode() */ diff --git a/source/java/org/alfresco/repo/search/impl/lucene/AVMLuceneIndexerImpl.java b/source/java/org/alfresco/repo/search/impl/lucene/AVMLuceneIndexerImpl.java index 1c858b72b0..f22db494ab 100644 --- a/source/java/org/alfresco/repo/search/impl/lucene/AVMLuceneIndexerImpl.java +++ b/source/java/org/alfresco/repo/search/impl/lucene/AVMLuceneIndexerImpl.java @@ -254,11 +254,9 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl impl } catch (AVMSyncException e) { - s_logger.warn("\n" + - "Unable to generate change list for synchronous indexing: \n" + - " Store: " + store + "\n" + - " Start version: " + srcVersion + "\n" + - " End version: " + endVersion); + s_logger.warn("\n" + + "Unable to generate change list for synchronous indexing: \n" + " Store: " + store + "\n" + " Start version: " + srcVersion + "\n" + + " End version: " + endVersion); return; } for (AVMDifference difference : changeList) @@ -1420,11 +1418,9 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl impl TermEnum terms = null; try { - terms = reader.terms(); - - if (terms.skipTo(new Term("ID", prefix))) + terms = reader.terms(new Term("ID", prefix)); + if (terms.term() != null) { - do { Term term = terms.term(); @@ -1459,7 +1455,6 @@ public class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl impl } while (terms.next()); } - } finally { @@ -1531,9 +1526,8 @@ public 
class AVMLuceneIndexerImpl extends AbstractLuceneIndexerImpl impl TermEnum terms = null; try { - terms = reader.terms(); - - if (terms.skipTo(new Term("ID", prefix))) + terms = reader.terms(new Term("ID", prefix)); + if (terms.term() != null) { do { diff --git a/source/java/org/alfresco/repo/security/person/PersonServiceImpl.java b/source/java/org/alfresco/repo/security/person/PersonServiceImpl.java index 458dceccf0..138c6b7249 100644 --- a/source/java/org/alfresco/repo/security/person/PersonServiceImpl.java +++ b/source/java/org/alfresco/repo/security/person/PersonServiceImpl.java @@ -41,6 +41,10 @@ import org.alfresco.repo.policy.JavaBehaviour; import org.alfresco.repo.policy.PolicyComponent; import org.alfresco.repo.security.permissions.PermissionServiceSPI; import org.alfresco.repo.tenant.TenantService; +import org.alfresco.repo.transaction.AlfrescoTransactionSupport; +import org.alfresco.repo.transaction.TransactionListenerAdapter; +import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.dictionary.DictionaryService; import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.NodeRef; @@ -57,12 +61,15 @@ import org.alfresco.service.cmr.security.PersonService; import org.alfresco.service.namespace.NamespacePrefixResolver; import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.QName; +import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.GUID; +import org.alfresco.util.PropertyCheck; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class PersonServiceImpl implements PersonService, - NodeServicePolicies.OnCreateNodePolicy, NodeServicePolicies.BeforeDeleteNodePolicy +public class PersonServiceImpl + extends TransactionListenerAdapter + implements PersonService, 
NodeServicePolicies.OnCreateNodePolicy, NodeServicePolicies.BeforeDeleteNodePolicy { private static Log s_logger = LogFactory.getLog(PersonServiceImpl.class); @@ -80,6 +87,8 @@ public class PersonServiceImpl implements PersonService, private StoreRef storeRef; + private TransactionService transactionService; + private NodeService nodeService; private TenantService tenantService; @@ -126,12 +135,33 @@ public class PersonServiceImpl implements PersonService, props.add(ContentModel.PROP_ORGID); mutableProperties = Collections.unmodifiableSet(props); } + + @Override + public boolean equals(Object obj) + { + return this == obj; + } + @Override + public int hashCode() + { + return 1; + } /** * Spring bean init method */ public void init() { + PropertyCheck.mandatory(this, "storeUrl", storeRef); + PropertyCheck.mandatory(this, "transactionService", transactionService); + PropertyCheck.mandatory(this, "nodeService", nodeService); + PropertyCheck.mandatory(this, "searchService", searchService); + PropertyCheck.mandatory(this, "permissionServiceSPI", permissionServiceSPI); + PropertyCheck.mandatory(this, "authorityService", authorityService); + PropertyCheck.mandatory(this, "namespacePrefixResolver", namespacePrefixResolver); + PropertyCheck.mandatory(this, "policyComponent", policyComponent); + PropertyCheck.mandatory(this, "personCache", personCache); + this.policyComponent.bindClassBehaviour( QName.createQName(NamespaceService.ALFRESCO_URI, "onCreateNode"), ContentModel.TYPE_PERSON, @@ -202,8 +232,10 @@ public class PersonServiceImpl implements PersonService, NodeRef personNode = getPersonOrNull(userName); if (personNode == null) { - if (createMissingPeople()) + TxnReadState txnReadState = AlfrescoTransactionSupport.getTransactionReadState(); + if (createMissingPeople() && txnReadState == TxnReadState.TXN_READ_WRITE) { + // We create missing people AND are in a read-write txn return createMissingPerson(userName); } else @@ -289,11 +321,7 @@ public class PersonServiceImpl 
implements PersonService, rs.close(); } } - if (singleton) - { - returnRef = returnRef; - } - else + if (!singleton) { returnRef = handleDuplicates(searchUserName); } @@ -303,29 +331,12 @@ public class PersonServiceImpl implements PersonService, } return returnRef; } - private NodeRef handleDuplicates(String searchUserName) { if (processDuplicates) { NodeRef best = findBest(searchUserName); - if (duplicateMode.equalsIgnoreCase(SPLIT)) - { - split(searchUserName, best); - s_logger.info("Split duplicate person objects for uid " + searchUserName); - } - else if (duplicateMode.equalsIgnoreCase(DELETE)) - { - delete(searchUserName, best); - s_logger.info("Deleted duplicate person objects for uid " + searchUserName); - } - else - { - if (s_logger.isDebugEnabled()) - { - s_logger.debug("Duplicate person objects exist for uid " + searchUserName); - } - } + addDuplicateUserNameToHandle(searchUserName, best); return best; } else @@ -343,6 +354,74 @@ public class PersonServiceImpl implements PersonService, } } + private static final String KEY_POST_TXN_DUPLICATES = "PersonServiceImpl.KEY_POST_TXN_DUPLICATES"; + /** + * Get the txn-bound usernames that need cleaning up + */ + private Map getPostTxnDuplicates() + { + @SuppressWarnings("unchecked") + Map postTxnDuplicates = (Map) AlfrescoTransactionSupport.getResource(KEY_POST_TXN_DUPLICATES); + if (postTxnDuplicates == null) + { + postTxnDuplicates = new HashMap(7); + AlfrescoTransactionSupport.bindResource(KEY_POST_TXN_DUPLICATES, postTxnDuplicates); + } + return postTxnDuplicates; + } + /** + * Flag a username for cleanup after the transaction. 
+ */ + private void addDuplicateUserNameToHandle(String searchUserName, NodeRef best) + { + // Firstly, bind this service to the transaction + AlfrescoTransactionSupport.bindListener(this); + // Now get the post txn duplicate list + Map postTxnDuplicates = getPostTxnDuplicates(); + postTxnDuplicates.put(searchUserName, best); + } + /** + * Process clean up any duplicates that were flagged during the transaction. + */ + @Override + public void afterCommit() + { + // Get the duplicates in a form that can be read by the transaction work anonymous instance + final Map postTxnDuplicates = getPostTxnDuplicates(); + + RetryingTransactionCallback processDuplicateWork = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + for (Map.Entry entry : postTxnDuplicates.entrySet()) + { + String username = entry.getKey(); + NodeRef best = entry.getValue(); + if (duplicateMode.equalsIgnoreCase(SPLIT)) + { + split(username, best); + s_logger.info("Split duplicate person objects for uid " + username); + } + else if (duplicateMode.equalsIgnoreCase(DELETE)) + { + delete(username, best); + s_logger.info("Deleted duplicate person objects for uid " + username); + } + else + { + if (s_logger.isDebugEnabled()) + { + s_logger.debug("Duplicate person objects exist for uid " + username); + } + } + } + // Done + return null; + } + }; + transactionService.getRetryingTransactionHelper().doInTransaction(processDuplicateWork, false, true); + } + private void delete(String searchUserName, NodeRef best) { SearchParameters sp = new SearchParameters(); @@ -799,6 +878,11 @@ public class PersonServiceImpl implements PersonService, this.permissionServiceSPI = permissionServiceSPI; } + public void setTransactionService(TransactionService transactionService) + { + this.transactionService = transactionService; + } + public void setNodeService(NodeService nodeService) { this.nodeService = nodeService; diff --git a/source/java/org/alfresco/repo/security/person/PersonTest.java 
b/source/java/org/alfresco/repo/security/person/PersonTest.java index 3c5fe5fe0f..4762173f0c 100644 --- a/source/java/org/alfresco/repo/security/person/PersonTest.java +++ b/source/java/org/alfresco/repo/security/person/PersonTest.java @@ -27,20 +27,27 @@ package org.alfresco.repo.security.person; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.alfresco.model.ContentModel; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeService; import org.alfresco.service.cmr.repository.StoreRef; import org.alfresco.service.cmr.repository.datatype.DefaultTypeConverter; +import org.alfresco.service.cmr.security.NoSuchPersonException; import org.alfresco.service.cmr.security.PersonService; import org.alfresco.service.namespace.QName; +import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.BaseSpringTest; import org.alfresco.util.EqualsHelper; +import org.alfresco.util.GUID; public class PersonTest extends BaseSpringTest { - + private TransactionService transactionService; + private PersonService personService; private NodeService nodeService; @@ -55,6 +62,7 @@ public class PersonTest extends BaseSpringTest protected void onSetUpInTransaction() throws Exception { + transactionService = (TransactionService) applicationContext.getBean("transactionService"); personService = (PersonService) applicationContext.getBean("personService"); nodeService = (NodeService) applicationContext.getBean("nodeService"); @@ -66,6 +74,7 @@ public class PersonTest extends BaseSpringTest nodeService.deleteNode(nodeRef); } + personService.setCreateMissingPeople(true); } protected void onTearDownInTransaction() throws Exception @@ -409,5 +418,143 @@ public class PersonTest extends BaseSpringTest } 
personService.getPerson("Derek"); } + + public void testReadOnlyTransactionHandling() throws Exception + { + // Kill the annoying Spring-managed txn + super.setComplete(); + super.endTransaction(); + + boolean createMissingPeople = personService.createMissingPeople(); + assertTrue("Default should be to create missing people", createMissingPeople); + + final String username = "Derek"; + // Make sure that the person is missing + RetryingTransactionCallback deletePersonWork = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + personService.deletePerson(username); + return null; + } + }; + transactionService.getRetryingTransactionHelper().doInTransaction(deletePersonWork, false, true); + // Make a read-only transaction and check that we get NoSuchPersonException + RetryingTransactionCallback getMissingPersonWork = new RetryingTransactionCallback() + { + public NodeRef execute() throws Throwable + { + return personService.getPerson(username); + } + }; + try + { + transactionService.getRetryingTransactionHelper().doInTransaction(getMissingPersonWork, true, true); + fail("Expected auto-creation of person to fail gracefully"); + } + catch (NoSuchPersonException e) + { + // Expected + } + // It should work in a write transaction, though + transactionService.getRetryingTransactionHelper().doInTransaction(getMissingPersonWork, false, true); + } + public void testSplitPersonCleanup() throws Exception + { + // Kill the annoying Spring-managed txn + super.setComplete(); + super.endTransaction(); + + boolean createMissingPeople = personService.createMissingPeople(); + assertTrue("Default should be to create missing people", createMissingPeople); + + PersonServiceImpl personServiceImpl = (PersonServiceImpl) personService; + personServiceImpl.setDuplicateMode("LEAVE"); + + // The user to duplicate + final String duplicateUsername = GUID.generate(); + // Make sure that the person is missing + RetryingTransactionCallback deletePersonWork = new 
RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + personService.deletePerson(duplicateUsername); + return null; + } + }; + transactionService.getRetryingTransactionHelper().doInTransaction(deletePersonWork, false, true); + // Fire off 10 threads to create the same person + int threadCount = 10; + final CountDownLatch startLatch = new CountDownLatch(threadCount); + final CountDownLatch endLatch = new CountDownLatch(threadCount); + final Map cleanableNodeRefs = new HashMap(17); + Runnable createPersonRunnable = new Runnable() + { + public void run() + { + final RetryingTransactionCallback createPersonWork = new RetryingTransactionCallback() + { + public NodeRef execute() throws Throwable + { + // Wait for the trigger to start + try { startLatch.await(); } catch (InterruptedException e) {} + + // Trigger + NodeRef personNodeRef = personService.getPerson(duplicateUsername); + return personNodeRef; + } + }; + startLatch.countDown(); + try + { + NodeRef nodeRef = transactionService.getRetryingTransactionHelper().doInTransaction(createPersonWork, false, true); + // Store the noderef for later checking + String threadName = Thread.currentThread().getName(); + cleanableNodeRefs.put(threadName, nodeRef); + } + catch (Throwable e) + { + // Errrm + e.printStackTrace(); + } + endLatch.countDown(); + } + }; + // Fire the threads + for (int i = 0; i < threadCount; i++) + { + Thread thread = new Thread(createPersonRunnable); + thread.setName(getName() + "-" + i); + thread.setDaemon(true); + thread.start(); + } + // Wait for the threads to have finished + try { endLatch.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) {} + + // Now, get the user with full split person handling + personServiceImpl.setDuplicateMode("DELETE"); + + RetryingTransactionCallback getPersonWork = new RetryingTransactionCallback() + { + public NodeRef execute() throws Throwable + { + return personService.getPerson(duplicateUsername); + } + }; + NodeRef 
remainingNodeRef = transactionService.getRetryingTransactionHelper().doInTransaction(getPersonWork, false, true); + // Should all be cleaned up now, but no way to check + for (NodeRef nodeRef : cleanableNodeRefs.values()) + { + if (nodeRef.equals(remainingNodeRef)) + { + // This one should still be around + continue; + } + if (nodeService.exists(nodeRef)) + { + fail("Expected unused person noderef to have been cleaned up: " + nodeRef); + } + } + } } diff --git a/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java b/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java index eaa357f2c0..bfba82d915 100644 --- a/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java +++ b/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java @@ -134,6 +134,42 @@ public abstract class AlfrescoTransactionSupport } } + /** + * + * @author Derek Hulley + * @since 2.1.4 + */ + public static enum TxnReadState + { + /** No transaction is active */ + TXN_NONE, + /** The current transaction is read-only */ + TXN_READ_ONLY, + /** The current transaction supports writes */ + TXN_READ_WRITE + } + + /** + * @return Returns the read-write state of the current transaction + * @since 2.1.4 + */ + public static TxnReadState getTransactionReadState() + { + if (!TransactionSynchronizationManager.isSynchronizationActive()) + { + return TxnReadState.TXN_NONE; + } + // Find the read-write state of the txn + if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) + { + return TxnReadState.TXN_READ_ONLY; + } + else + { + return TxnReadState.TXN_READ_WRITE; + } + } + /** * Are there any pending changes which must be synchronized with the store? 
* @@ -710,26 +746,6 @@ public abstract class AlfrescoTransactionSupport logger.debug("After completion (" + statusStr + "): " + this); } - // commit/rollback Lucene - for (LuceneIndexerAndSearcher lucene : lucenes) - { - try - { - if (status == TransactionSynchronization.STATUS_COMMITTED) - { - lucene.commit(); - } - else - { - lucene.rollback(); - } - } - catch (RuntimeException e) - { - logger.error("After completion (" + statusStr + ") Lucene exception", e); - } - } - List iterableListeners = getListenersIterable(); // notify listeners if (status == TransactionSynchronization.STATUS_COMMITTED) @@ -765,6 +781,46 @@ public abstract class AlfrescoTransactionSupport } } + // commit/rollback Lucene + for (LuceneIndexerAndSearcher lucene : lucenes) + { + try + { + if (status == TransactionSynchronization.STATUS_COMMITTED) + { + lucene.commit(); + } + else + { + lucene.rollback(); + } + } + catch (RuntimeException e) + { + logger.error("After completion (" + statusStr + ") Lucene exception", e); + } + } + + // Clean up the transactional caches + for (TransactionalCache cache : transactionalCaches) + { + try + { + if (status == TransactionSynchronization.STATUS_COMMITTED) + { + cache.afterCommit(); + } + else + { + cache.afterRollback(); + } + } + catch (RuntimeException e) + { + logger.error("After completion (" + statusStr + ") TransactionalCache exception", e); + } + } + // clear the thread's registrations and synchronizations AlfrescoTransactionSupport.clearSynchronization(); } diff --git a/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupportTest.java b/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupportTest.java index adf84cb4b1..68ca4c6074 100644 --- a/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupportTest.java +++ b/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupportTest.java @@ -31,6 +31,7 @@ import javax.transaction.UserTransaction; import junit.framework.TestCase; +import 
org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.ServiceRegistry; import org.alfresco.service.transaction.TransactionService; @@ -236,4 +237,25 @@ public class AlfrescoTransactionSupportTest extends TestCase // make sure that the binding all worked assertTrue("Expected callbacks not all processed: " + testList, testList.size() == 0); } + + public void testReadWriteStateRetrieval() throws Exception + { + RetryingTransactionCallback getReadStateWork = new RetryingTransactionCallback() + { + public TxnReadState execute() throws Exception + { + return AlfrescoTransactionSupport.getTransactionReadState(); + } + }; + + // Check TXN_NONE + TxnReadState checkTxnReadState = AlfrescoTransactionSupport.getTransactionReadState(); + assertEquals("Expected 'no transaction'", TxnReadState.TXN_NONE, checkTxnReadState); + // Check TXN_READ_ONLY + checkTxnReadState = transactionService.getRetryingTransactionHelper().doInTransaction(getReadStateWork, true); + assertEquals("Expected 'read-only transaction'", TxnReadState.TXN_READ_ONLY, checkTxnReadState); + // check TXN_READ_WRITE + checkTxnReadState = transactionService.getRetryingTransactionHelper().doInTransaction(getReadStateWork, false); + assertEquals("Expected 'read-write transaction'", TxnReadState.TXN_READ_WRITE, checkTxnReadState); + } } diff --git a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java index e5f9fce950..a61e667157 100644 --- a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java +++ b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java @@ -54,8 +54,21 @@ import org.springframework.jdbc.UncategorizedSQLException; * A helper that runs a unit of work inside a UserTransaction, * transparently retrying the unit of work if the cause of * 
failure is an optimistic locking or deadlock condition. + *

+ * Defaults: + *

    + *
  • maxRetries: 20
  • + *
  • minRetryWaitMs: 100
  • + *
  • maxRetryWaitMs: 2000
  • + *
  • retryWaitIncrementMs: 100
  • + *
+ *

+ * To get details of 'why' transactions are retried, use one of the following log levels:
+ * Summary: log4j.logger.org.alfresco.repo.transaction.RetryingTransactionHelper=INFO
+ * Details: log4j.logger.org.alfresco.repo.transaction.RetryingTransactionHelper=DEBUG
+ * * - * @author britt + * @author Derek Hulley */ public class RetryingTransactionHelper { @@ -90,16 +103,20 @@ public class RetryingTransactionHelper */ private TransactionService txnService; - /** - * The maximum number of retries. -1 for infinity. - */ + /** The maximum number of retries. -1 for infinity. */ private int maxRetries; - + /** The minimum time to wait between retries. */ + private int minRetryWaitMs; + /** The maximum time to wait between retries. */ + private int maxRetryWaitMs; + /** How much to increase the wait time with each retry. */ + private int retryWaitIncrementMs; + /** * Whether the the transactions may only be reads */ private boolean readOnly; - + /** * Random number generator for retry delays. */ @@ -126,6 +143,10 @@ public class RetryingTransactionHelper public RetryingTransactionHelper() { this.random = new Random(System.currentTimeMillis()); + this.maxRetries = 20; + this.minRetryWaitMs = 100; + this.maxRetryWaitMs = 2000; + this.retryWaitIncrementMs = 100; } // Setters. @@ -145,6 +166,21 @@ public class RetryingTransactionHelper this.maxRetries = maxRetries; } + public void setMinRetryWaitMs(int minRetryWaitMs) + { + this.minRetryWaitMs = minRetryWaitMs; + } + + public void setMaxRetryWaitMs(int maxRetryWaitMs) + { + this.maxRetryWaitMs = maxRetryWaitMs; + } + + public void setRetryWaitIncrementMs(int retryWaitIncrementMs) + { + this.retryWaitIncrementMs = retryWaitIncrementMs; + } + /** * Set whether this helper only supports read transactions. */ @@ -332,9 +368,22 @@ public class RetryingTransactionHelper { // Sleep a random amount of time before retrying. // The sleep interval increases with the number of retries. + int sleepIntervalRandom = count > 0 ? 
random.nextInt(count * retryWaitIncrementMs) : minRetryWaitMs; + int sleepInterval = Math.min(maxRetryWaitMs, sleepIntervalRandom); + sleepInterval = Math.max(sleepInterval, minRetryWaitMs); + if (logger.isInfoEnabled() && !logger.isDebugEnabled()) + { + String msg = String.format( + "Retrying %s: count %2d; wait: %1.1fs; msg: \"%s\"; exception: (%s)", + Thread.currentThread().getName(), + count, (double)sleepInterval/1000D, + retryCause.getMessage(), + retryCause.getClass().getName()); + logger.info(msg); + } try { - Thread.sleep(random.nextInt(500 * count + 500)); + Thread.sleep(sleepInterval); } catch (InterruptedException ie) { diff --git a/source/java/org/alfresco/repo/transaction/TransactionServiceImpl.java b/source/java/org/alfresco/repo/transaction/TransactionServiceImpl.java index 33925af7e0..a41bbe7841 100644 --- a/source/java/org/alfresco/repo/transaction/TransactionServiceImpl.java +++ b/source/java/org/alfresco/repo/transaction/TransactionServiceImpl.java @@ -33,14 +33,19 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; /** - * Default implementation of Transaction Service + * Default implementation of Transaction Service. + *

+ * Default retry behaviour: see {@link RetryingTransactionHelper#RetryingTransactionHelper()} * * @author David Caruana */ public class TransactionServiceImpl implements TransactionService { private PlatformTransactionManager transactionManager; - private int maxRetries = 20; + private int maxRetries = -1; + private int minRetryWaitMs = -1; + private int maxRetryWaitMs = -1; + private int retryWaitIncrementMs = -1; // SysAdmin cache - used to cluster certain JMX operations private SimpleCache sysAdminCache; @@ -79,16 +84,37 @@ public class TransactionServiceImpl implements TransactionService } /** - * Set the maximum number of retries that will be done by the - * {@link RetryingTransactionHelper transaction helper}. - * - * @param maxRetries the maximum transaction retries + * @see RetryingTransactionHelper#setMaxRetries(int) */ public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } + /** + * @see RetryingTransactionHelper#setMinRetryWaitMs(int) + */ + public void setMinRetryWaitMs(int minRetryWaitMs) + { + this.minRetryWaitMs = minRetryWaitMs; + } + + /** + * @see RetryingTransactionHelper#setMaxRetryWaitMs(int) + */ + public void setMaxRetryWaitMs(int maxRetryWaitMs) + { + this.maxRetryWaitMs = maxRetryWaitMs; + } + + /** + * @see RetryingTransactionHelper#setRetryWaitIncrementMs(int) + */ + public void setRetryWaitIncrementMs(int retryWaitIncrementMs) + { + this.retryWaitIncrementMs = retryWaitIncrementMs; + } + /** * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRED */ @@ -146,14 +172,30 @@ public class TransactionServiceImpl implements TransactionService } /** - * Creates a new helper instance. It can be reused. + * Creates a new helper instance. It can be reused or customized by the client code: + * each instance is new and initialized afresh. 
*/ public RetryingTransactionHelper getRetryingTransactionHelper() { RetryingTransactionHelper helper = new RetryingTransactionHelper(); - helper.setMaxRetries(maxRetries); helper.setTransactionService(this); helper.setReadOnly(isReadOnly()); + if (maxRetries >= 0) + { + helper.setMaxRetries(maxRetries); + } + if (minRetryWaitMs > 0) + { + helper.setMinRetryWaitMs(minRetryWaitMs); + } + if (maxRetryWaitMs > 0) + { + helper.setMaxRetryWaitMs(maxRetryWaitMs); + } + if (retryWaitIncrementMs > 0) + { + helper.setRetryWaitIncrementMs(retryWaitIncrementMs); + } return helper; } } diff --git a/source/java/org/alfresco/service/cmr/repository/StoreRef.java b/source/java/org/alfresco/service/cmr/repository/StoreRef.java index 11bec38b52..9b9a95dd5c 100644 --- a/source/java/org/alfresco/service/cmr/repository/StoreRef.java +++ b/source/java/org/alfresco/service/cmr/repository/StoreRef.java @@ -38,8 +38,12 @@ public final class StoreRef implements EntityRef, Serializable private static final long serialVersionUID = 3905808565129394486L; public static final String PROTOCOL_WORKSPACE = "workspace"; + public static final String PROTOCOL_ARCHIVE = "archive"; public static final String PROTOCOL_AVM = "avm"; + public static final StoreRef STORE_REF_WORKSPACE_SPACESSTORE = new StoreRef(PROTOCOL_WORKSPACE, "SpacesStore"); + public static final StoreRef STORE_REF_ARCHIVE_SPACESSTORE = new StoreRef(PROTOCOL_ARCHIVE, "SpacesStore"); + public static final String URI_FILLER = "://"; private final String protocol; diff --git a/source/java/org/alfresco/util/ThreadPoolExecutorFactoryBean.java b/source/java/org/alfresco/util/ThreadPoolExecutorFactoryBean.java index 675fdd436f..88d982ce2a 100644 --- a/source/java/org/alfresco/util/ThreadPoolExecutorFactoryBean.java +++ b/source/java/org/alfresco/util/ThreadPoolExecutorFactoryBean.java @@ -51,11 +51,11 @@ import org.springframework.beans.factory.InitializingBean; *

  • {@link #setKeepAliveTime(int) keepAliveTime}: * 90 seconds
  • *
  • {@link #setThreadPriority(int) threadPriority}: - * 1 (LOWEST)
  • + * 5 (NORM) *
  • {@link #setThreadDaemon(boolean) threadDaemon}: * true
  • - *
  • {@link #setWorkQueue(BlockingQueue) workQueue}: - * An unbounded LinkedBlockingQueue
  • + *
  • {@link #setWorkQueueSize(int) workQueueSize}: + * -1 or less (No upper bound)
  • *
  • {@link #setRejectedExecutionHandler(RejectedExecutionHandler) rejectedExecutionHandler: * ThreadPoolExecutor.CallerRunsPolicy
  • * @@ -67,9 +67,9 @@ public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingB private static final int DEFAULT_CORE_POOL_SIZE = 20; private static final int DEFAULT_MAXIMUM_POOL_SIZE = -1; // -1 is a sign that it must match the core pool size private static final int DEFAULT_KEEP_ALIVE_TIME = 90; // seconds - private static final int DEFAULT_THREAD_PRIORITY = Thread.MIN_PRIORITY; + private static final int DEFAULT_THREAD_PRIORITY = Thread.NORM_PRIORITY; private static final boolean DEFAULT_THREAD_DAEMON = Boolean.TRUE; - private static final BlockingQueue DEFAULT_WORK_QUEUE = new LinkedBlockingQueue(); + private static final int DEFAULT_WORK_QUEUE_SIZE = -1; private static final RejectedExecutionHandler DEFAULT_REJECTED_EXECUTION_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy(); private int corePoolSize; @@ -77,7 +77,7 @@ public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingB private int keepAliveTime; private int threadPriority; private boolean threadDaemon; - private BlockingQueue workQueue; + private int workQueueSize; private RejectedExecutionHandler rejectedExecutionHandler; /** the instance that will be given out by the factory */ private ThreadPoolExecutor instance; @@ -92,7 +92,7 @@ public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingB keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; threadPriority = DEFAULT_THREAD_PRIORITY; threadDaemon = DEFAULT_THREAD_DAEMON; - workQueue = DEFAULT_WORK_QUEUE; + workQueueSize = DEFAULT_WORK_QUEUE_SIZE; rejectedExecutionHandler = DEFAULT_REJECTED_EXECUTION_HANDLER; } @@ -148,13 +148,15 @@ public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingB } /** - * The optional queue instance to use + * The maximum number of queued work instances to keep before blocking + * against further adds. 
* - * @param workQueue optional queue implementation + * @param workQueueSize the queue size before blocking, or -1 (default) + * to indicate no upper bound */ - public void setWorkQueue(BlockingQueue workQueue) + public void setWorkQueueSize(int workQueueSize) { - this.workQueue = workQueue; + this.workQueueSize = workQueueSize; } /** @@ -181,6 +183,12 @@ threadFactory.setThreadDaemon(threadDaemon); threadFactory.setThreadPriority(threadPriority); + if (workQueueSize < 0) + { + workQueueSize = Integer.MAX_VALUE; + } + BlockingQueue workQueue = new LinkedBlockingQueue(workQueueSize); + // construct the instance instance = new ThreadPoolExecutor( corePoolSize, @@ -197,7 +205,7 @@ */ public boolean isSingleton() { - return true; + return false; } /**