Fixed ALF-4551: DeletedNodeCleanupWorker will throw UnsupportedOperationException

 - Included a unit test that triggers the NodeCleanupRegistry (sanity check only)
 - Replaced the per-node purge with a single DAO call, 'purgeNodes'
 - Added job locking to the cleanup tasks (see the sketch below)
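
The locking uses the standard JobLockService acquire/work/release pattern. A minimal
sketch of that pattern, using the lock QName and TTL that the diff below introduces
(variable names are illustrative only):

    QName lock = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "NodeCleanup");
    long ttl = 60 * 1000L;                          // lock time-to-live: 1 minute
    String token = null;
    try
    {
        token = jobLockService.getLock(lock, ttl);  // throws LockAcquisitionException if held elsewhere
        // ... do the cleanup work; long-running batches call
        // jobLockService.refreshLock(token, lock, ttl) so the lock cannot expire mid-run ...
    }
    catch (LockAcquisitionException e)
    {
        // Another thread or cluster node is already cleaning up; skip this run
    }
    finally
    {
        if (token != null)
        {
            jobLockService.releaseLock(token, lock);
        }
    }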


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22255 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
Derek Hulley
2010-09-03 22:55:23 +00:00
parent 10d8672fa9
commit 5f9da921d7
9 changed files with 201 additions and 170 deletions

View File

@@ -419,6 +419,20 @@
<isEqual property="deleted" compareValue="true">and node_deleted = #deleted#</isEqual>
</delete>
<delete id="delete_NodesByTxnCommitTime" parameterClass="TransactionQuery">
<![CDATA[
delete from alf_node
where
node_deleted = #deletedNodes# and
transaction_id <=
(
select max(txn.id) from alf_transaction txn
where
txn.commit_time_ms < #maxCommitTime#
)
]]>
</delete>
<delete id="delete_NodeProperties" parameterClass="NodeProperty">
delete from alf_node_properties
where
@@ -853,6 +867,7 @@
<sql id="select_Transaction_FullWhere">
<dynamic prepend="where">
<isNotNull property="id" prepend="and">txn.id = #id#</isNotNull>
<isNotNull property="deletedNodes" prepend="and"> node.node_deleted = #deletedNodes#</isNotNull>
<isNotNull property="storeId" prepend="and"> store.id = #storeId#</isNotNull>
<isNotNull property="excludeServerId" prepend="and"><![CDATA[ server_id <> #excludeServerId#]]></isNotNull>
<isNotNull property="minCommitTime" prepend="and"><![CDATA[ txn.commit_time_ms >= #minCommitTime#]]></isNotNull>
@@ -922,7 +937,7 @@
txn.id
from
alf_transaction txn
right join alf_node node on (node.transaction_id = txn.id)
left join alf_node node on (node.transaction_id = txn.id)
where
node.id is null
<isNotNull property="minId"><![CDATA[and txn.id >= #minId#]]></isNotNull>
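
The join direction above is the crux of this fix: with a right join every alf_node row is preserved, so node.id can never be null and the unused-transaction check matches nothing; with a left join from alf_transaction, transactions that no longer own any alf_node rows survive the join with node.id null and are correctly selected for deletion.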

View File

@@ -192,6 +192,9 @@
<property name="transactionService">
<ref bean="transactionService" />
</property>
<property name="jobLockService">
<ref bean="jobLockService" />
</property>
<property name="dbNodeService">
<ref bean="dbNodeService" />
</property>
@@ -199,7 +202,7 @@
<ref bean="nodeDAO" />
</property>
</bean>
<bean id="nodeCleanup.deleteNodeCleanup"
<bean id="nodeCleanup.deletedNodeCleanup"
class="org.alfresco.repo.node.db.DeletedNodeCleanupWorker"
parent="nodeCleanupBase">
<property name="minPurgeAgeDays">

View File

@@ -1526,13 +1526,10 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
}
}
public void purgeNode(Long nodeId)
@Override
public int purgeNodes(long maxTxnCommitTimeMs)
{
int count = deleteNodeById(nodeId, true);
if (count != 1)
{
throw new ConcurrencyFailureException("Failed to purge node: " + nodeId);
}
return deleteNodesByCommitTime(true, maxTxnCommitTimeMs);
}
/*
@@ -3046,15 +3043,6 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
return (txn == null ? null : txn.getId());
}
public void getNodesDeletedInOldTxns(
Long minNodeId,
long maxCommitTime,
int count,
NodeRefQueryCallback resultsCallback)
{
selectNodesDeletedInOldTxns(minNodeId, maxCommitTime, count, resultsCallback);
}
public int getTransactionCount()
{
return selectTransactionCount();
@@ -3170,6 +3158,7 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
Long optionalOldSharedAlcIdInAdditionToNull,
Long newSharedAlcId);
protected abstract int deleteNodeById(Long nodeId, boolean deletedOnly);
protected abstract int deleteNodesByCommitTime(boolean deletedOnly, long maxTxnCommitTimeMs);
protected abstract NodeEntity selectNodeById(Long id, Boolean deleted);
protected abstract NodeEntity selectNodeByNodeRef(NodeRef nodeRef, Boolean deleted);
protected abstract List<NodeEntity> selectNodesByUuids(Long storeId, SortedSet<String> uuids);
@@ -3267,11 +3256,6 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
String childNodeName);
protected abstract Transaction selectLastTxnBeforeCommitTime(Long maxCommitTime);
protected abstract void selectNodesDeletedInOldTxns(
Long minNodeId,
Long maxCommitTime,
Integer count,
NodeRefQueryCallback resultsCallback);
protected abstract int selectTransactionCount();
protected abstract Transaction selectTxnById(Long txnId);
protected abstract List<NodeEntity> selectTxnChanges(Long txnId, Long storeId);

View File

@@ -208,14 +208,14 @@ public interface NodeDAO extends NodeBulkLoader
* associated with a live transaction.
*/
public void deleteNode(Long nodeId);
/**
* Remove all traces of the node. This assumes that the node has been marked
* for deletion using {@link #deleteNode(Long)}.
* Purge deleted nodes where their participating transactions are older than a given time.
*
* @deprecated This will be replaced with a purgeNodes(long maxTxnCommitTimeMs)
* @param maxTxnCommitTimeMs ignore transactions created <i>after</i> this time
* @return Returns the number of deleted nodes purged
*/
public void purgeNode(Long nodeId);
public int purgeNodes(long maxTxnCommitTimeMs);
/*
* Properties
@@ -538,22 +538,6 @@ public interface NodeDAO extends NodeBulkLoader
* Transactions
*/
/**
* Gets a batch of deleted nodes in old transactions.
*
* @param minNodeId the minimum node ID
* @param maxCommitTime the maximum commit time (to set a minimum transaction age)
* @param count the maximum number of results (for batching)
* @param resultsCallback the callback to pass results back
*
* @deprecated {@link #purgeNode(Long)}
*/
public void getNodesDeletedInOldTxns(
Long minNodeId,
long maxCommitTime,
int count,
NodeRefQueryCallback resultsCallback);
/**
* Retrieves the maximum transaction ID for which the commit time is less than the given time.
*
@@ -622,8 +606,14 @@ public interface NodeDAO extends NodeBulkLoader
public void purgeTxn(Long txnId);
/**
* @return Returns the minimum commit time or <tt>null</tt> if there are no transactions
*/
public Long getMinTxnCommitTime();
/**
* @return Returns the maximum commit time or <tt>null</tt> if there are no transactions
*/
public Long getMaxTxnCommitTime();
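
Taken together, the new contract is a single set-based purge call bounded by transaction
commit time. A hypothetical caller (the seven-day cutoff and variable names are
illustrative, not part of this commit):

    // Purge all deleted nodes whose transactions committed more than seven days ago
    long maxTxnCommitTimeMs = System.currentTimeMillis() - 7 * 24L * 3600L * 1000L;
    int purged = nodeDAO.purgeNodes(maxTxnCommitTimeMs);  // number of rows removed
    Long minCommit = nodeDAO.getMinTxnCommitTime();       // null if there are no transactions
    Long maxCommit = nodeDAO.getMaxTxnCommitTime();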
/**

View File

@@ -43,6 +43,7 @@ public class TransactionQueryEntity
private List<Long> excludeTxnIds;
private Long excludeServerId;
private Boolean ascending;
private Boolean deletedNodes;
private Long storeId;
/**
@@ -66,6 +67,7 @@ public class TransactionQueryEntity
.append(", excludeTxnIds=").append(excludeTxnIds)
.append(", excludeServerId=").append(excludeServerId)
.append(", ascending=").append(ascending)
.append(", deletedNodes=").append(deletedNodes)
.append(", storeId=").append(storeId)
.append("]");
return sb.toString();
@@ -161,6 +163,16 @@ public class TransactionQueryEntity
this.ascending = ascending;
}
public Boolean getDeletedNodes()
{
return deletedNodes;
}
public void setDeletedNodes(Boolean deletedNodes)
{
this.deletedNodes = deletedNodes;
}
public Long getStoreId()
{
return storeId;

View File

@@ -88,6 +88,7 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl
private static final String UPDATE_NODE = "alfresco.node.update_Node";
private static final String UPDATE_NODE_PATCH_ACL = "alfresco.node.update_NodePatchAcl";
private static final String DELETE_NODE_BY_ID = "alfresco.node.delete_NodeById";
private static final String DELETE_NODES_BY_TXN_COMMIT_TIME = "alfresco.node.delete_NodesByTxnCommitTime";
private static final String SELECT_NODE_BY_ID = "alfresco.node.select_NodeById";
private static final String SELECT_NODE_BY_NODEREF = "alfresco.node.select_NodeByNodeRef";
private static final String SELECT_NODES_BY_UUIDS = "alfresco.node.select_NodesByUuids";
@@ -321,6 +322,15 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl
return template.delete(DELETE_NODE_BY_ID, node);
}
@Override
protected int deleteNodesByCommitTime(boolean deletedOnly, long maxTxnCommitTimeMs)
{
TransactionQueryEntity query = new TransactionQueryEntity();
query.setDeletedNodes(Boolean.TRUE);
query.setMaxCommitTime(maxTxnCommitTimeMs);
return template.delete(DELETE_NODES_BY_TXN_COMMIT_TIME, query);
}
@Override
protected NodeEntity selectNodeById(Long id, Boolean deleted)
{
@@ -1224,16 +1234,6 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl
}
}
@Override
protected void selectNodesDeletedInOldTxns(
Long minNodeId,
Long maxCommitTime,
Integer count,
NodeRefQueryCallback resultsCallback)
{
throw new UnsupportedOperationException();
}
@Override
protected int selectTransactionCount()
{

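Note how the pieces line up: setDeletedNodes(Boolean.TRUE) and setMaxCommitTime(...) populate the #deletedNodes# and #maxCommitTime# placeholders of the delete_NodesByTxnCommitTime statement in the SqlMap above, and template.delete returns the affected row count that purgeNodes passes back to the cleanup worker.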
View File

@@ -20,16 +20,20 @@ package org.alfresco.repo.node.cleanup;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.alfresco.error.StackTraceUtil;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.node.db.DbNodeServiceImpl;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.PropertyCheck;
import org.alfresco.util.VmShutdownListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,18 +47,28 @@ import org.apache.commons.logging.LogFactory;
*/
public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker
{
/** Lock key: system:NodeCleanup */
private static final QName LOCK = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "NodeCleanup");
/** Default Lock time to live: 1 minute */
private static final long LOCK_TTL = 60*1000L;
protected final Log logger;
private final ReentrantLock cleanupLock;
private NodeCleanupRegistry registry;
protected TransactionService transactionService;
protected JobLockService jobLockService;
protected DbNodeServiceImpl dbNodeService;
protected NodeDAO nodeDAO;
private ThreadLocal<String> lockToken = new ThreadLocal<String>();
private VmShutdownListener shutdownListener = new VmShutdownListener("NodeCleanup");
/**
* Default constructor
*/
public AbstractNodeCleanupWorker()
{
logger = LogFactory.getLog(this.getClass());
cleanupLock = new ReentrantLock();
}
public void setRegistry(NodeCleanupRegistry registry)
@@ -67,6 +81,11 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker
this.transactionService = transactionService;
}
public void setJobLockService(JobLockService jobLockService)
{
this.jobLockService = jobLockService;
}
public void setDbNodeService(DbNodeServiceImpl dbNodeService)
{
this.dbNodeService = dbNodeService;
@@ -81,6 +100,7 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker
{
PropertyCheck.mandatory(this, "registry", registry);
PropertyCheck.mandatory(this, "transactionService", transactionService);
PropertyCheck.mandatory(this, "jobLockService", jobLockService);
PropertyCheck.mandatory(this, "dbNodeService", dbNodeService);
PropertyCheck.mandatory(this, "nodeDAO", nodeDAO);
@@ -94,46 +114,52 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker
*/
public List<String> doClean()
{
        /** Prevent multiple executions of the implementation method */
        boolean locked = cleanupLock.tryLock();
        if (locked)
        {
            try
            {
                return doCleanWithTxn();
            }
            catch (Throwable e)
            {
                if (logger.isDebugEnabled())
                {
                    StringBuilder sb = new StringBuilder(1024);
                    StackTraceUtil.buildStackTrace(
                        "Node cleanup failed: " +
                        " Worker: " + this.getClass().getName() + "\n" +
                        " Error: " + e.getMessage(),
                        e.getStackTrace(),
                        sb,
                        Integer.MAX_VALUE);
                    logger.debug(sb.toString());
                }
                StringBuilder sb = new StringBuilder(1024);
                StackTraceUtil.buildStackTrace(
                    "Node cleanup failed: " +
                    " Worker: " + this.getClass().getName() + "\n" +
                    " Error: " + e.getMessage(),
                    e.getStackTrace(),
                    sb,
                    20);
                return Collections.singletonList(sb.toString());
            }
            finally
            {
                cleanupLock.unlock();
            }
        }
        else
        {
            return Collections.emptyList();
        }
        try
        {
            // Get a lock
            lockToken.set(null);
            String token = jobLockService.getLock(LOCK, LOCK_TTL);
            lockToken.set(token);
            // Do the work
            return doCleanWithTxn();
        }
        catch (LockAcquisitionException e)
        {
            // Some other process was busy
            return Collections.singletonList("Node cleanup in process: " + e.getMessage());
        }
        catch (Throwable e)
        {
            if (logger.isDebugEnabled())
            {
                StringBuilder sb = new StringBuilder(1024);
                StackTraceUtil.buildStackTrace(
                    "Node cleanup failed: " +
                    " Worker: " + this.getClass().getName() + "\n" +
                    " Error: " + e.getMessage(),
                    e.getStackTrace(),
                    sb,
                    Integer.MAX_VALUE);
                logger.debug(sb.toString());
            }
            StringBuilder sb = new StringBuilder(1024);
            StackTraceUtil.buildStackTrace(
                "Node cleanup failed: " +
                " Worker: " + this.getClass().getName() + "\n" +
                " Error: " + e.getMessage(),
                e.getStackTrace(),
                sb,
                20);
            return Collections.singletonList(sb.toString());
        }
        finally
        {
            String token = this.lockToken.get();
            if (token != null)
            {
                jobLockService.releaseLock(token, LOCK);
            }
        }
}
@@ -156,6 +182,24 @@ public abstract class AbstractNodeCleanupWorker implements NodeCleanupWorker
return AuthenticationUtil.runAs(doCleanRunAs, AuthenticationUtil.getSystemUserName());
}
/**
* Helper method to refresh the current job's lock token
*/
protected void refreshLock() throws LockAcquisitionException
{
String token = this.lockToken.get();
if (token != null && !shutdownListener.isVmShuttingDown())
{
// We had a lock token AND the VM is still going
jobLockService.refreshLock(token, LOCK, LOCK_TTL);
}
else
{
// There is no lock token on this thread, so we trigger a deliberate failure
jobLockService.refreshLock("lock token not available", LOCK, LOCK_TTL);
}
}
/**
* Do the actual cleanup. Any errors are handled by this base class.
*

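Workers that loop over batches are expected to call refreshLock() on each iteration so
the one-minute TTL cannot lapse while work is still in progress, as DeletedNodeCleanupWorker
does below. A minimal usage sketch (processNextBatch is a hypothetical batch step):

    while (true)
    {
        // Re-assert the job lock before each batch; this throws LockAcquisitionException
        // if the token is missing or the VM is shutting down
        refreshLock();
        int processed = processNextBatch();  // hypothetical unit of cleanup work
        if (processed == 0)
        {
            break;  // nothing left to clean up
        }
    }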
View File

@@ -30,6 +30,7 @@ import javax.transaction.UserTransaction;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.node.BaseNodeServiceTest;
import org.alfresco.repo.node.cleanup.NodeCleanupRegistry;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.dictionary.DictionaryService;
@@ -70,6 +71,17 @@ public class DbNodeServiceImplTest extends BaseNodeServiceTest
nodeDAO = (NodeDAO) applicationContext.getBean("nodeDAO");
dictionaryService = (DictionaryService) applicationContext.getBean("dictionaryService");
}
/**
* Manually trigger the cleanup registry
*/
public void testNodeCleanupRegistry() throws Exception
{
setComplete();
endTransaction();
NodeCleanupRegistry cleanupRegistry = (NodeCleanupRegistry) applicationContext.getBean("nodeCleanupRegistry");
cleanupRegistry.doClean();
}
/**
* Deletes a child node and then iterates over the children of the parent node,

View File

@@ -22,12 +22,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.alfresco.repo.domain.node.NodeDAO.NodeRefQueryCallback;
import org.alfresco.repo.node.cleanup.AbstractNodeCleanupWorker;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.util.Pair;
import org.apache.commons.lang.mutable.MutableLong;
/**
@@ -53,15 +50,20 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
*/
protected List<String> doCleanInternal() throws Throwable
{
List<String> purgedNodes = purgeOldDeletedNodes(minPurgeAgeMs);
List<String> purgedTxns = purgeOldEmptyTransactions(minPurgeAgeMs);
List<String> allResults = new ArrayList<String>(100);
allResults.addAll(purgedNodes);
allResults.addAll(purgedTxns);
// Done
return allResults;
if (minPurgeAgeMs < 0)
{
return Collections.singletonList("Minimum purge age is negative; purge disabled");
}
List<String> purgedNodes = purgeOldDeletedNodes(minPurgeAgeMs);
List<String> purgedTxns = purgeOldEmptyTransactions(minPurgeAgeMs);
List<String> allResults = new ArrayList<String>(100);
allResults.addAll(purgedNodes);
allResults.addAll(purgedTxns);
// Done
return allResults;
}
/**
@@ -75,7 +77,6 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
this.minPurgeAgeMs = ((long) minPurgeAgeDays) * 24L * 3600L * 1000L;
}
private static final int NODE_PURGE_BATCH_SIZE = 1000;
/**
* Cleans up deleted nodes that are older than the given minimum age.
*
@@ -84,88 +85,55 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
*/
private List<String> purgeOldDeletedNodes(long minAge)
{
        if (minAge < 0)
        {
            return Collections.emptyList();
        }
        final List<String> results = new ArrayList<String>(100);
        final MutableLong minNodeId = new MutableLong(0L);
        final long maxCommitTime = System.currentTimeMillis() - minAge;
        RetryingTransactionCallback<Integer> purgeNodesCallback = new RetryingTransactionCallback<Integer>()
        {
            public Integer execute() throws Throwable
            {
                final List<Pair<Long, NodeRef>> nodePairs = new ArrayList<Pair<Long, NodeRef>>(NODE_PURGE_BATCH_SIZE);
                NodeRefQueryCallback callback = new NodeRefQueryCallback()
                {
                    public boolean handle(Pair<Long, NodeRef> nodePair)
                    {
                        nodePairs.add(nodePair);
                        return true;
                    }
                };
                nodeDAO.getNodesDeletedInOldTxns(minNodeId.longValue(), maxCommitTime, NODE_PURGE_BATCH_SIZE, callback);
                for (Pair<Long, NodeRef> nodePair : nodePairs)
                {
                    Long nodeId = nodePair.getFirst();
                    nodeDAO.purgeNode(nodeId);
                    // Update the min node ID for the next query
                    if (nodeId.longValue() > minNodeId.longValue())
                    {
                        minNodeId.setValue(nodeId.longValue());
                    }
                }
                return nodePairs.size();
            }
        };
        while (true)
        {
            RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper();
            txnHelper.setMaxRetries(5); // Limit number of retries
            txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through
            // Get nodes to delete
            Integer purgeCount = new Integer(0);
            // Purge nodes
            try
            {
                purgeCount = txnHelper.doInTransaction(purgeNodesCallback, false, true);
                if (purgeCount.intValue() > 0)
                {
                    String msg =
                        "Purged old nodes: \n" +
                        " Min node ID: " + minNodeId.longValue() + "\n" +
                        " Batch size: " + NODE_PURGE_BATCH_SIZE + "\n" +
                        " Max commit time: " + maxCommitTime + "\n" +
                        " Purge count: " + purgeCount;
                    results.add(msg);
                }
            }
            catch (Throwable e)
            {
                String msg =
                    "Failed to purge nodes." +
                    " Set log level to WARN for this class to get exception log: \n" +
                    " Min node ID: " + minNodeId.longValue() + "\n" +
                    " Batch size: " + NODE_PURGE_BATCH_SIZE + "\n" +
                    " Max commit time: " + maxCommitTime + "\n" +
                    " Error: " + e.getMessage();
                // It failed; do a full log in WARN mode
                if (logger.isWarnEnabled())
                {
                    logger.warn(msg, e);
                }
                else
                {
                    logger.error(msg);
                }
                results.add(msg);
                break;
            }
            if (purgeCount.intValue() == 0)
            {
                break;
            }
        }
        final List<String> results = new ArrayList<String>(100);
        final long maxCommitTimeMs = System.currentTimeMillis() - minAge;
        RetryingTransactionCallback<Integer> purgeNodesCallback = new RetryingTransactionCallback<Integer>()
        {
            public Integer execute() throws Throwable
            {
                return nodeDAO.purgeNodes(maxCommitTimeMs);
            }
        };
        // TODO: Add error catching and decrement the maxCommitTimeMs to reduce DB resource usage
        RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper();
        txnHelper.setMaxRetries(5); // Limit number of retries
        txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through
        // Get nodes to delete
        Integer purgeCount = new Integer(0);
        // Purge nodes
        try
        {
            purgeCount = txnHelper.doInTransaction(purgeNodesCallback, false, true);
            if (purgeCount.intValue() > 0)
            {
                String msg =
                    "Purged old nodes: \n" +
                    " Max commit time: " + maxCommitTimeMs + "\n" +
                    " Purge count: " + purgeCount;
                results.add(msg);
            }
        }
        catch (Throwable e)
        {
            String msg =
                "Failed to purge nodes." +
                " If the purgable set is too large for the available DB resources \n" +
                " then the nodes can be purged manually as well. \n" +
                " Set log level to WARN for this class to get exception log: \n" +
                " Max commit time: " + maxCommitTimeMs + "\n" +
                " Error: " + e.getMessage();
            // It failed; do a full log in WARN mode
            if (logger.isWarnEnabled())
            {
                logger.warn(msg, e);
            }
            else
            {
                logger.error(msg);
            }
            results.add(msg);
        }
// Done
return results;
@@ -210,6 +178,9 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
};
while (true)
{
// Ensure we keep the lock
refreshLock();
RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper();
txnHelper.setMaxRetries(5); // Limit number of retries
txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through