Merged 5.1.N (5.1.1) to HEAD (5.1)

113397 tvalkevych: Merged 5.0.N (5.0.3) to 5.1.N (5.1.1)
      113290 adavis: Merged V4.2-BUG-FIX (4.2.6) to 5.0.N (5.0.3)
         113278 adavis: Merged V4.1-BUG-FIX (4.1.11) to V4.2-BUG-FIX (4.2.6)
            112985 amorarasu: Merged DEV to V4.1-BUG-FIX (4.1.11)
               111742: MNT-10305: DeletedNodeCleanupWorker should be improved
                  - Delete in multiple statements, filtering by transactions with commit time in a time interval.
               112722: Improved the algorithm that sets appropriate time windows for delete, based on the evolution of the purge process.


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@123576 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Alan Davis
2016-03-11 17:25:39 +00:00
parent 2fab2cb742
commit f959c5d77b
6 changed files with 150 additions and 74 deletions

View File

@@ -13,6 +13,7 @@
(
select max(txn.id) from alf_transaction txn
where
txn.commit_time_ms >= #{minCommitTime} and
txn.commit_time_ms < #{maxCommitTime}
)
]]>
@@ -30,7 +31,8 @@
where
node.type_qname_id = #{typeQNameId} and
alf_node_properties.node_id = node.id and
txn.commit_time_ms < #{maxCommitTime}
(txn.commit_time_ms >= #{minCommitTime} and
txn.commit_time_ms < #{maxCommitTime})
)
]]>
</delete>

View File

@@ -11,7 +11,8 @@
join alf_transaction txn on (txn.id = n.transaction_id)
where
n.type_qname_id = #{typeQNameId} and
txn.commit_time_ms < #{maxCommitTime}
(txn.commit_time_ms >= #{minCommitTime} and
txn.commit_time_ms < #{maxCommitTime})
]]>
</delete>
@@ -22,7 +23,8 @@
join alf_transaction txn on (txn.id = n.transaction_id)
where
n.type_qname_id = #{typeQNameId} and
txn.commit_time_ms < #{maxCommitTime}
(txn.commit_time_ms >= #{minCommitTime} and
txn.commit_time_ms < #{maxCommitTime})
]]>
</delete>

View File

@@ -2039,9 +2039,9 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
}
@Override
public int purgeNodes(long maxTxnCommitTimeMs)
public int purgeNodes(long fromTxnCommitTimeMs, long toTxnCommitTimeMs)
{
return deleteNodesByCommitTime(maxTxnCommitTimeMs);
return deleteNodesByCommitTime(fromTxnCommitTimeMs, toTxnCommitTimeMs);
}
/*
@@ -4908,7 +4908,7 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
Long optionalOldSharedAlcIdInAdditionToNull,
Long newSharedAlcId);
protected abstract int deleteNodeById(Long nodeId);
protected abstract int deleteNodesByCommitTime(long maxTxnCommitTimeMs);
protected abstract int deleteNodesByCommitTime(long fromTxnCommitTimeMs, long toTxnCommitTimeMs);
protected abstract NodeEntity selectNodeById(Long id);
protected abstract NodeEntity selectNodeByNodeRef(NodeRef nodeRef);
protected abstract List<Node> selectNodesByUuids(Long storeId, SortedSet<String> uuids);

View File

@@ -323,12 +323,13 @@ public interface NodeDAO extends NodeBulkLoader
public void deleteNode(Long nodeId);
/**
* Purge deleted nodes where their participating transactions are older than a given time.
* Purge deleted nodes where their participating transactions are in-between the given time interval.
*
* @param maxTxnCommitTimeMs ignore transactions created <i>after</i> this time
* @return Returns the number of deleted nodes purged
* @param fromTxnCommitTimeMs from commit time
* @param toTxnCommitTimeMs to commit time
* @return Returns the number of deleted nodes purged
*/
public int purgeNodes(long maxTxnCommitTimeMs);
public int purgeNodes(long fromTxnCommitTimeMs, long toTxnCommitTimeMs);
/*
* Properties

View File

@@ -388,7 +388,7 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl
}
@Override
protected int deleteNodesByCommitTime(long maxTxnCommitTimeMs)
protected int deleteNodesByCommitTime(long fromTxnCommitTimeMs, long toTxnCommitTimeMs)
{
// Get the deleted nodes
Pair<Long, QName> deletedTypePair = qnameDAO.getQName(ContentModel.TYPE_DELETED);
@@ -399,7 +399,8 @@ public class NodeDAOImpl extends AbstractNodeDAOImpl
}
TransactionQueryEntity query = new TransactionQueryEntity();
query.setTypeQNameId(deletedTypePair.getFirst());
query.setMaxCommitTime(maxTxnCommitTimeMs);
query.setMinCommitTime(fromTxnCommitTimeMs);
query.setMaxCommitTime(toTxnCommitTimeMs);
// TODO: Fix ALF-16030 Use ON DELETE CASCADE for node aspects and properties
// First clean up properties
template.delete(DELETE_NODE_PROPS_BY_TXN_COMMIT_TIME, query);

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2005-2010 Alfresco Software Limited.
* Copyright (C) 2005-2015 Alfresco Software Limited.
*
* This file is part of Alfresco
*
@@ -88,8 +88,8 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
*/
public void setPurgeSize(int purgeSize)
{
this.purgeSize = purgeSize;
}
this.purgeSize = purgeSize;
}
/**
* Cleans up deleted nodes that are older than the given minimum age.
@@ -101,54 +101,97 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
{
final List<String> results = new ArrayList<String>(100);
final long maxCommitTimeMs = System.currentTimeMillis() - minAge;
final long maxCommitTime = System.currentTimeMillis() - minAge;
long fromCommitTime = nodeDAO.getMinUnusedTxnCommitTime().longValue();
RetryingTransactionCallback<Integer> purgeNodesCallback = new RetryingTransactionCallback<Integer>()
{
public Integer execute() throws Throwable
{
return nodeDAO.purgeNodes(maxCommitTimeMs);
}
};
// TODO: Add error catching and decrement the maxCommitTimeMs to reduce DB resource usage
RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper();
txnHelper.setMaxRetries(5); // Limit number of retries
txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through
// Get nodes to delete
Integer purgeCount = new Integer(0);
// Purge nodes
try
long loopPurgeSize = purgeSize;
Long purgeCount = new Long(0);
while (true)
{
purgeCount = txnHelper.doInTransaction(purgeNodesCallback, false, true);
if (purgeCount.intValue() > 0)
// Ensure we keep the lock
refreshLock();
long toCommitTime = fromCommitTime + loopPurgeSize;
if(toCommitTime > maxCommitTime)
{
String msg =
"Purged old nodes: \n" +
" Max commit time: " + maxCommitTimeMs + "\n" +
" Purge count: " + purgeCount;
toCommitTime = maxCommitTime;
}
try
{
DeleteNodesByTransactionsCallback purgeNodesCallback = new DeleteNodesByTransactionsCallback(nodeDAO, fromCommitTime, toCommitTime);
purgeCount = txnHelper.doInTransaction(purgeNodesCallback, false, true);
if (purgeCount.longValue() > 0)
{
String msg =
"Purged old nodes: \n" +
" From commit time (ms): " + fromCommitTime + "\n" +
" To commit time (ms): " + toCommitTime + "\n" +
" Purge count: " + purgeCount;
results.add(msg);
}
fromCommitTime += loopPurgeSize;
// If the delete succeeded, double the loopPurgeSize
loopPurgeSize *= 2L;
if (loopPurgeSize > purgeSize)
{
loopPurgeSize = purgeSize;
}
}
catch (Throwable e)
{
String msg =
"Failed to purge nodes. \n" +
" If the purgable set is too large for the available DB resources \n" +
" then the nodes can be purged manually as well. \n" +
" Set log level to WARN for this class to get exception log: \n" +
" From commit time (ms): " + fromCommitTime + "\n" +
" To commit time (ms): " + toCommitTime + "\n" +
" Error: " + e.getMessage();
// It failed; do a full log in WARN mode
if (logger.isWarnEnabled())
{
logger.warn(msg, e);
}
else
{
logger.error(msg);
}
results.add(msg);
// If delete failed, halve the loopPurgeSize and try again
loopPurgeSize /= 2L;
// If the purge size drops below 10% of the original size, the entire process must stop
if (loopPurgeSize < 0.1 * purgeSize)
{
msg ="Failed to purge nodes. \n" +
" The purge time interval dropped below 10% of the original size (" + purgeSize + "), so the purging process was stopped.";
if (logger.isWarnEnabled())
{
logger.warn(msg, e);
}
else
{
logger.error(msg);
}
results.add(msg);
break;
}
}
if(fromCommitTime >= maxCommitTime)
{
break;
}
}
catch (Throwable e)
{
String msg =
"Failed to purge nodes." +
" If the purgable set is too large for the available DB resources \n" +
" then the nodes can be purged manually as well. \n" +
" Set log level to WARN for this class to get exception log: \n" +
" Max commit time: " + maxCommitTimeMs + "\n" +
" Error: " + e.getMessage();
// It failed; do a full log in WARN mode
if (logger.isWarnEnabled())
{
logger.warn(msg, e);
}
else
{
logger.error(msg);
}
results.add(msg);
}
// Done
return results;
}
@@ -169,6 +212,10 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
final long maxCommitTime = System.currentTimeMillis() - minAge;
long fromCommitTime = nodeDAO.getMinUnusedTxnCommitTime().longValue();
RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper();
txnHelper.setMaxRetries(5); // Limit number of retries
txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through
// delete unused transactions in batches of size 'purgeTxnBlockSize'
while (true)
@@ -176,10 +223,6 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
// Ensure we keep the lock
refreshLock();
RetryingTransactionHelper txnHelper = transactionService.getRetryingTransactionHelper();
txnHelper.setMaxRetries(5); // Limit number of retries
txnHelper.setRetryWaitIncrementMs(1000); // 1 second to allow other cleanups time to get through
long toCommitTime = fromCommitTime + purgeSize;
if(toCommitTime >= maxCommitTime)
{
@@ -195,7 +238,7 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
{
String msg =
"Purged old txns: \n" +
" From commit time (ms): " + fromCommitTime + "\n" +
" From commit time (ms): " + fromCommitTime + "\n" +
" To commit time (ms): " + toCommitTime + "\n" +
" Purge count: " + purgeCount;
results.add(msg);
@@ -222,7 +265,7 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
break;
}
fromCommitTime += purgeSize;
fromCommitTime += purgeSize;
if(fromCommitTime >= maxCommitTime)
{
break;
@@ -232,26 +275,53 @@ public class DeletedNodeCleanupWorker extends AbstractNodeCleanupWorker
return results;
}
private static abstract class DeleteByTransactionsCallback implements RetryingTransactionCallback<Long>
{
protected NodeDAO nodeDAO;
protected long fromCommitTime;
protected long toCommitTime;
DeleteByTransactionsCallback(NodeDAO nodeDAO, long fromCommitTime, long toCommitTime)
{
this.nodeDAO = nodeDAO;
this.fromCommitTime = fromCommitTime;
this.toCommitTime = toCommitTime;
}
public abstract Long execute() throws Throwable;
}
/*
* Delete a block of unused transactions
*/
private static class DeleteTransactionsCallback implements RetryingTransactionCallback<Long>
private static class DeleteTransactionsCallback extends DeleteByTransactionsCallback
{
private NodeDAO nodeDAO;
private long fromCommitTime;
private long toCommitTime;
DeleteTransactionsCallback(NodeDAO nodeDAO, long fromCommitTime, long toCommitTime)
{
this.nodeDAO = nodeDAO;
this.fromCommitTime = fromCommitTime;
this.toCommitTime = toCommitTime;
}
DeleteTransactionsCallback(NodeDAO nodeDAO, long fromCommitTime, long toCommitTime)
{
super(nodeDAO, fromCommitTime, toCommitTime);
}
public Long execute() throws Throwable
{
long count = nodeDAO.deleteTxnsUnused(fromCommitTime, toCommitTime);
return count;
}
long count = nodeDAO.deleteTxnsUnused(fromCommitTime, toCommitTime);
return count;
}
}
/*
* Purge a block of deleted nodes and their properties
*/
private static class DeleteNodesByTransactionsCallback extends DeleteByTransactionsCallback
{
DeleteNodesByTransactionsCallback(NodeDAO nodeDAO, long fromCommitTime, long toCommitTime)
{
super(nodeDAO, fromCommitTime, toCommitTime);
}
public Long execute() throws Throwable
{
long count = nodeDAO.purgeNodes(fromCommitTime, toCommitTime);
return count;
}
}
}