diff --git a/config/alfresco/ibatis/org.hibernate.dialect.Dialect/node-common-SqlMap.xml b/config/alfresco/ibatis/org.hibernate.dialect.Dialect/node-common-SqlMap.xml index a45d84e348..468fd46875 100644 --- a/config/alfresco/ibatis/org.hibernate.dialect.Dialect/node-common-SqlMap.xml +++ b/config/alfresco/ibatis/org.hibernate.dialect.Dialect/node-common-SqlMap.xml @@ -419,6 +419,20 @@ and node_deleted = #deleted# + + + + delete from alf_node_properties where @@ -853,6 +867,7 @@ txn.id = #id# + node.node_deleted = #deletedNodes# store.id = #storeId# #excludeServerId#]]> = #minCommitTime#]]> @@ -922,7 +937,7 @@ txn.id from alf_transaction txn - right join alf_node node on (node.transaction_id = txn.id) + left join alf_node node on (node.transaction_id = txn.id) where node.id is null = #minId#]]> diff --git a/config/alfresco/node-services-context.xml b/config/alfresco/node-services-context.xml index 75f999cc6e..2d1c9db6ec 100644 --- a/config/alfresco/node-services-context.xml +++ b/config/alfresco/node-services-context.xml @@ -192,6 +192,9 @@ + + + @@ -199,7 +202,7 @@ - diff --git a/source/java/org/alfresco/repo/domain/node/AbstractNodeDAOImpl.java b/source/java/org/alfresco/repo/domain/node/AbstractNodeDAOImpl.java index fb7fb6bef1..79e8276236 100644 --- a/source/java/org/alfresco/repo/domain/node/AbstractNodeDAOImpl.java +++ b/source/java/org/alfresco/repo/domain/node/AbstractNodeDAOImpl.java @@ -1526,13 +1526,10 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO } } - public void purgeNode(Long nodeId) + @Override + public int purgeNodes(long maxTxnCommitTimeMs) { - int count = deleteNodeById(nodeId, true); - if (count != 1) - { - throw new ConcurrencyFailureException("Failed to purge node: " + nodeId); - } + return deleteNodesByCommitTime(true, maxTxnCommitTimeMs); } /* @@ -3046,15 +3043,6 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO return (txn == null ? null : txn.getId()); } - public void getNodesDeletedInOldTxns( - Long minNodeId, - long maxCommitTime, - int count, - NodeRefQueryCallback resultsCallback) - { - selectNodesDeletedInOldTxns(minNodeId, maxCommitTime, count, resultsCallback); - } - public int getTransactionCount() { return selectTransactionCount(); @@ -3170,6 +3158,7 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO Long optionalOldSharedAlcIdInAdditionToNull, Long newSharedAlcId); protected abstract int deleteNodeById(Long nodeId, boolean deletedOnly); + protected abstract int deleteNodesByCommitTime(boolean deletedOnly, long maxTxnCommitTimeMs); protected abstract NodeEntity selectNodeById(Long id, Boolean deleted); protected abstract NodeEntity selectNodeByNodeRef(NodeRef nodeRef, Boolean deleted); protected abstract List selectNodesByUuids(Long storeId, SortedSet uuids); @@ -3267,11 +3256,6 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO String childNodeName); protected abstract Transaction selectLastTxnBeforeCommitTime(Long maxCommitTime); - protected abstract void selectNodesDeletedInOldTxns( - Long minNodeId, - Long maxCommitTime, - Integer count, - NodeRefQueryCallback resultsCallback); protected abstract int selectTransactionCount(); protected abstract Transaction selectTxnById(Long txnId); protected abstract List selectTxnChanges(Long txnId, Long storeId); diff --git a/source/java/org/alfresco/repo/domain/node/NodeDAO.java b/source/java/org/alfresco/repo/domain/node/NodeDAO.java index 9f99e1499b..130d4d3f22 100644 --- a/source/java/org/alfresco/repo/domain/node/NodeDAO.java +++ b/source/java/org/alfresco/repo/domain/node/NodeDAO.java @@ -208,14 +208,14 @@ public interface NodeDAO extends NodeBulkLoader * associated with a live transaction. */ public void deleteNode(Long nodeId); - + /** - * Remove all traces of the node. This assumes that the node has been marked - * for deletion using {@link #deleteNode(Long)}. + * Purge deleted nodes where their participating transactions are older than a given time. * - * @deprecated This will be replaced with a purgeNodes(long maxTxnCommitTimeMs) + * @param maxTxnCommitTimeMs ignore transactions created after this time + * @return Returns the number of deleted nodes purged */ - public void purgeNode(Long nodeId); + public int purgeNodes(long maxTxnCommitTimeMs); /* * Properties @@ -538,22 +538,6 @@ public interface NodeDAO extends NodeBulkLoader * Transactions */ - /** - * Gets a batch of deleted nodes in old transactions. - * - * @param minNodeId the minimum node ID - * @param maxCommitTime the maximum commit time (to set a minimum transaction age) - * @param count the maximum number of results (for batching) - * @param resultsCallback the callback to pass results back - * - * @deprecated {@link #purgeNode(Long)} - */ - public void getNodesDeletedInOldTxns( - Long minNodeId, - long maxCommitTime, - int count, - NodeRefQueryCallback resultsCallback); - /** * Retrieves the maximum transaction ID for which the commit time is less than the given time. * @@ -622,8 +606,14 @@ public interface NodeDAO extends NodeBulkLoader public void purgeTxn(Long txnId); + /** + * @return Returns the minimum commit time or null if there are no transactions + */ public Long getMinTxnCommitTime(); + /** + * @return Returns the maximum commit time or null if there are no transactions + */ public Long getMaxTxnCommitTime(); /** diff --git a/source/java/org/alfresco/repo/domain/node/TransactionQueryEntity.java b/source/java/org/alfresco/repo/domain/node/TransactionQueryEntity.java index a2d93abe13..884e52905d 100644 --- a/source/java/org/alfresco/repo/domain/node/TransactionQueryEntity.java +++ b/source/java/org/alfresco/repo/domain/node/TransactionQueryEntity.java @@ -43,6 +43,7 @@ public class TransactionQueryEntity private List excludeTxnIds; private Long excludeServerId; private Boolean ascending; + private Boolean deletedNodes; private Long storeId; /** @@ -66,6 +67,7 @@ public class TransactionQueryEntity .append(", excludeTxnIds=").append(excludeTxnIds) .append(", excludeServerId=").append(excludeServerId) .append(", ascending=").append(ascending) + .append(", deletedNodes=").append(deletedNodes) .append(", storeId=").append(storeId) .append("]"); return sb.toString(); @@ -161,6 +163,16 @@ public class TransactionQueryEntity this.ascending = ascending; } + public Boolean getDeletedNodes() + { + return deletedNodes; + } + + public void setDeletedNodes(Boolean deletedNodes) + { + this.deletedNodes = deletedNodes; + } + public Long getStoreId() { return storeId; diff --git a/source/java/org/alfresco/repo/domain/node/ibatis/NodeDAOImpl.java b/source/java/org/alfresco/repo/domain/node/ibatis/NodeDAOImpl.java index 1eacb4a2eb..156849985f 100644 --- a/source/java/org/alfresco/repo/domain/node/ibatis/NodeDAOImpl.java +++ b/source/java/org/alfresco/repo/domain/node/ibatis/NodeDAOImpl.java @@ -88,6 +88,7 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl private static final String UPDATE_NODE = "alfresco.node.update_Node"; private static final String UPDATE_NODE_PATCH_ACL = "alfresco.node.update_NodePatchAcl"; private static final String DELETE_NODE_BY_ID = "alfresco.node.delete_NodeById"; + private static final String DELETE_NODES_BY_TXN_COMMIT_TIME = "alfresco.node.delete_NodesByTxnCommitTime"; private static final String SELECT_NODE_BY_ID = "alfresco.node.select_NodeById"; private static final String SELECT_NODE_BY_NODEREF = "alfresco.node.select_NodeByNodeRef"; private static final String SELECT_NODES_BY_UUIDS = "alfresco.node.select_NodesByUuids"; @@ -321,6 +322,15 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl return template.delete(DELETE_NODE_BY_ID, node); } + @Override + protected int deleteNodesByCommitTime(boolean deletedOnly, long maxTxnCommitTimeMs) + { + TransactionQueryEntity query = new TransactionQueryEntity(); + query.setDeletedNodes(Boolean.TRUE); + query.setMaxCommitTime(maxTxnCommitTimeMs); + return template.delete(DELETE_NODES_BY_TXN_COMMIT_TIME, query); + } + @Override protected NodeEntity selectNodeById(Long id, Boolean deleted) { @@ -1224,16 +1234,6 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl } } - @Override - protected void selectNodesDeletedInOldTxns( - Long minNodeId, - Long maxCommitTime, - Integer count, - NodeRefQueryCallback resultsCallback) - { - throw new UnsupportedOperationException(); - } - @Override protected int selectTransactionCount() { diff --git a/source/java/org/alfresco/repo/node/cleanup/AbstractNodeCleanupWorker.java b/source/java/org/alfresco/repo/node/cleanup/AbstractNodeCleanupWorker.java index 552f126d62..30d9777d1f 100644 --- a/source/java/org/alfresco/repo/node/cleanup/AbstractNodeCleanupWorker.java +++ b/source/java/org/alfresco/repo/node/cleanup/AbstractNodeCleanupWorker.java @@ -20,16 +20,20 @@ package org.alfresco.repo.node.cleanup; import java.util.Collections; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import org.alfresco.error.StackTraceUtil; import org.alfresco.repo.domain.node.NodeDAO; +import org.alfresco.repo.lock.JobLockService; +import org.alfresco.repo.lock.LockAcquisitionException; import org.alfresco.repo.node.db.DbNodeServiceImpl; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; +import org.alfresco.service.namespace.NamespaceService; +import org.alfresco.service.namespace.QName; import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.PropertyCheck; +import org.alfresco.util.VmShutdownListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,18 +47,28 @@ import org.apache.commons.logging.LogFactory; */ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker { + /** Lock key: system:NodeCleanup */ + private static final QName LOCK = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "NodeCleanup"); + /** Default Lock time to live: 1 minute */ + private static final long LOCK_TTL = 60*1000L; + protected final Log logger; - private final ReentrantLock cleanupLock; private NodeCleanupRegistry registry; protected TransactionService transactionService; + protected JobLockService jobLockService; protected DbNodeServiceImpl dbNodeService; protected NodeDAO nodeDAO; + private ThreadLocal lockToken = new ThreadLocal(); + private VmShutdownListener shutdownListener = new VmShutdownListener("NodeCleanup"); + + /** + * Default constructor + */ public AbstractNodeCleanupWorker() { logger = LogFactory.getLog(this.getClass()); - cleanupLock = new ReentrantLock(); } public void setRegistry(NodeCleanupRegistry registry) @@ -67,6 +81,11 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker this.transactionService = transactionService; } + public void setJobLockService(JobLockService jobLockService) + { + this.jobLockService = jobLockService; + } + public void setDbNodeService(DbNodeServiceImpl dbNodeService) { this.dbNodeService = dbNodeService; @@ -81,6 +100,7 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker { PropertyCheck.mandatory(this, "registry", registry); PropertyCheck.mandatory(this, "transactionService", transactionService); + PropertyCheck.mandatory(this, "jobLockService", jobLockService); PropertyCheck.mandatory(this, "dbNodeService", dbNodeService); PropertyCheck.mandatory(this, "nodeDAO", nodeDAO); @@ -94,46 +114,52 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker */ public List doClean() { - /** Prevent multiple executions of the implementation method */ - boolean locked = cleanupLock.tryLock(); - if (locked) + try { - try + // Get a lock + lockToken.set(null); + String token = jobLockService.getLock(LOCK, LOCK_TTL); + lockToken.set(token); + + // Do the work + return doCleanWithTxn(); + } + catch (LockAcquisitionException e) + { + // Some other process was busy + return Collections.singletonList("Node cleanup in process: " + e.getMessage()); + } + catch (Throwable e) + { + if (logger.isDebugEnabled()) { - return doCleanWithTxn(); - } - catch (Throwable e) - { - if (logger.isDebugEnabled()) - { - StringBuilder sb = new StringBuilder(1024); - StackTraceUtil.buildStackTrace( - "Node cleanup failed: " + - " Worker: " + this.getClass().getName() + "\n" + - " Error: " + e.getMessage(), - e.getStackTrace(), - sb, - Integer.MAX_VALUE); - logger.debug(sb.toString()); - } StringBuilder sb = new StringBuilder(1024); StackTraceUtil.buildStackTrace( - "Node cleanup failed: " + - " Worker: " + this.getClass().getName() + "\n" + - " Error: " + e.getMessage(), - e.getStackTrace(), - sb, - 20); - return Collections.singletonList(sb.toString()); - } - finally - { - cleanupLock.unlock(); + "Node cleanup failed: " + + " Worker: " + this.getClass().getName() + "\n" + + " Error: " + e.getMessage(), + e.getStackTrace(), + sb, + Integer.MAX_VALUE); + logger.debug(sb.toString()); } + StringBuilder sb = new StringBuilder(1024); + StackTraceUtil.buildStackTrace( + "Node cleanup failed: " + + " Worker: " + this.getClass().getName() + "\n" + + " Error: " + e.getMessage(), + e.getStackTrace(), + sb, + 20); + return Collections.singletonList(sb.toString()); } - else + finally { - return Collections.emptyList(); + String token = this.lockToken.get(); + if (token != null) + { + jobLockService.releaseLock(token, LOCK); + } } } @@ -156,6 +182,24 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker return AuthenticationUtil.runAs(doCleanRunAs, AuthenticationUtil.getSystemUserName()); } + /** + * Helper method to refresh the current job's lock token + */ + protected void refreshLock() throws LockAcquisitionException + { + String token = this.lockToken.get(); + if (token != null && !shutdownListener.isVmShuttingDown()) + { + // We had a lock token AND the VM is still going + jobLockService.refreshLock(token, LOCK, LOCK_TTL); + } + else + { + // There is no lock token on this thread, so we trigger a deliberate failure + jobLockService.refreshLock("lock token not available", LOCK, LOCK_TTL); + } + } + /** * Do the actual cleanup. Any errors are handled by this base class. * diff --git a/source/java/org/alfresco/repo/node/db/DbNodeServiceImplTest.java b/source/java/org/alfresco/repo/node/db/DbNodeServiceImplTest.java index 266d68f8dc..6610fd5c10 100644 --- a/source/java/org/alfresco/repo/node/db/DbNodeServiceImplTest.java +++ b/source/java/org/alfresco/repo/node/db/DbNodeServiceImplTest.java @@ -30,6 +30,7 @@ import javax.transaction.UserTransaction; import org.alfresco.model.ContentModel; import org.alfresco.repo.domain.node.NodeDAO; import org.alfresco.repo.node.BaseNodeServiceTest; +import org.alfresco.repo.node.cleanup.NodeCleanupRegistry; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.dictionary.DictionaryService; @@ -70,6 +71,17 @@ public class DbNodeServiceImplTest extends BaseNodeServiceTest nodeDAO = (NodeDAO) applicationContext.getBean("nodeDAO"); dictionaryService = (DictionaryService) applicationContext.getBean("dictionaryService"); } + + /** + * Manually trigger the cleanup registry + */ + public void testNodeCleanupRegistry() throws Exception + { + setComplete(); + endTransaction(); + NodeCleanupRegistry cleanupRegistry = (NodeCleanupRegistry) applicationContext.getBean("nodeCleanupRegistry"); + cleanupRegistry.doClean(); + } /** * Deletes a child node and then iterates over the children of the parent node, diff --git a/source/java/org/alfresco/repo/node/db/DeletedNodeCleanupWorker.java b/source/java/org/alfresco/repo/node/db/DeletedNodeCleanupWorker.java index 5cba74ad7c..f06094ad53 100644 --- a/source/java/org/alfresco/repo/node/db/DeletedNodeCleanupWorker.java +++ b/source/java/org/alfresco/repo/node/db/DeletedNodeCleanupWorker.java @@ -22,12 +22,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.alfresco.repo.domain.node.NodeDAO.NodeRefQueryCallback; import org.alfresco.repo.node.cleanup.AbstractNodeCleanupWorker; import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; -import org.alfresco.service.cmr.repository.NodeRef; -import org.alfresco.util.Pair; import org.apache.commons.lang.mutable.MutableLong; /** @@ -53,15 +50,20 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker */ protected List doCleanInternal() throws Throwable { - List purgedNodes = purgeOldDeletedNodes(minPurgeAgeMs); - List purgedTxns = purgeOldEmptyTransactions(minPurgeAgeMs); - - List allResults = new ArrayList(100); - allResults.addAll(purgedNodes); - allResults.addAll(purgedTxns); - - // Done - return allResults; + if (minPurgeAgeMs < 0) + { + return Collections.singletonList("Minimum purge age is negative; purge disabled"); + } + + List purgedNodes = purgeOldDeletedNodes(minPurgeAgeMs); + List purgedTxns = purgeOldEmptyTransactions(minPurgeAgeMs); + + List allResults = new ArrayList(100); + allResults.addAll(purgedNodes); + allResults.addAll(purgedTxns); + + // Done + return allResults; } /** @@ -75,7 +77,6 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker this.minPurgeAgeMs = ((long) minPurgeAgeDays) * 24L * 3600L * 1000L; } - private static final int NODE_PURGE_BATCH_SIZE = 1000; /** * Cleans up deleted nodes that are older than the given minimum age. * @@ -84,88 +85,55 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker */ private List purgeOldDeletedNodes(long minAge) { - if (minAge < 0) - { - return Collections.emptyList(); - } final List results = new ArrayList(100); - final MutableLong minNodeId = new MutableLong(0L); - final long maxCommitTime = System.currentTimeMillis() - minAge; + final long maxCommitTimeMs = System.currentTimeMillis() - minAge; + RetryingTransactionCallback purgeNodesCallback = new RetryingTransactionCallback() { public Integer execute() throws Throwable { - final List> nodePairs = new ArrayList>(NODE_PURGE_BATCH_SIZE); - NodeRefQueryCallback callback = new NodeRefQueryCallback() - { - public boolean handle(Pair nodePair) - { - nodePairs.add(nodePair); - return true; - } - }; - nodeDAO.getNodesDeletedInOldTxns(minNodeId.longValue(), maxCommitTime, NODE_PURGE_BATCH_SIZE, callback); - for (Pair nodePair : nodePairs) - { - Long nodeId = nodePair.getFirst(); - nodeDAO.purgeNode(nodeId); - // Update the min node ID for the next query - if (nodeId.longValue() > minNodeId.longValue()) - { - minNodeId.setValue(nodeId.longValue()); - } - } - return nodePairs.size(); + return nodeDAO.purgeNodes(maxCommitTimeMs); } }; - while (true) + // TODO: Add error catching and decrement the maxCommitTimeMs to reduce DB resource usage + RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper(); + txnHelper.setMaxRetries(5); // Limit number of retries + txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through + // Get nodes to delete + Integer purgeCount = new Integer(0); + // Purge nodes + try { - RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper(); - txnHelper.setMaxRetries(5); // Limit number of retries - txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through - // Get nodes to delete - Integer purgeCount = new Integer(0); - // Purge nodes - try + purgeCount = txnHelper.doInTransaction(purgeNodesCallback, false, true); + if (purgeCount.intValue() > 0) { - purgeCount = txnHelper.doInTransaction(purgeNodesCallback, false, true); - if (purgeCount.intValue() > 0) - { - String msg = - "Purged old nodes: \n" + - " Min node ID: " + minNodeId.longValue() + "\n" + - " Batch size: " + NODE_PURGE_BATCH_SIZE + "\n" + - " Max commit time: " + maxCommitTime + "\n" + - " Purge count: " + purgeCount; - results.add(msg); - } - } - catch (Throwable e) - { - String msg = - "Failed to purge nodes." + - " Set log level to WARN for this class to get exception log: \n" + - " Min node ID: " + minNodeId.longValue() + "\n" + - " Batch size: " + NODE_PURGE_BATCH_SIZE + "\n" + - " Max commit time: " + maxCommitTime + "\n" + - " Error: " + e.getMessage(); - // It failed; do a full log in WARN mode - if (logger.isWarnEnabled()) - { - logger.warn(msg, e); - } - else - { - logger.error(msg); - } + String msg = + "Purged old nodes: \n" + + " Max commit time: " + maxCommitTimeMs + "\n" + + " Purge count: " + purgeCount; results.add(msg); - break; } - if (purgeCount.intValue() == 0) + } + catch (Throwable e) + { + String msg = + "Failed to purge nodes." + + " If the purgable set is too large for the available DB resources \n" + + " then the nodes can be purged manually as well. \n" + + " Set log level to WARN for this class to get exception log: \n" + + " Max commit time: " + maxCommitTimeMs + "\n" + + " Error: " + e.getMessage(); + // It failed; do a full log in WARN mode + if (logger.isWarnEnabled()) { - break; + logger.warn(msg, e); } + else + { + logger.error(msg); + } + results.add(msg); } // Done return results; @@ -210,6 +178,9 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker }; while (true) { + // Ensure we keep the lock + refreshLock(); + RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper(); txnHelper.setMaxRetries(5); // Limit number of retries txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through