Merged HEAD-BUG-FIX (5.1/Cloud) to HEAD (5.1/Cloud)

96412: Merged DEV to HEAD-BUG-FIX (5.1.0)
      94103: ACE-3729: unhandled Exception: org.springframework.dao.CannotAcquireLockException on Cloud Staging
         - JobLockRefreshCallback will be used to refresh a lock.


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@96497 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Alan Davis
2015-02-07 10:32:05 +00:00
parent 3c3520c8d9
commit a069bf7000

View File

@@ -22,11 +22,13 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.domain.contentdata.ContentDataDAO;
import org.alfresco.repo.domain.contentdata.ContentDataDAO.ContentUrlHandler;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.dictionary.DictionaryService;
@@ -34,7 +36,6 @@ import org.alfresco.service.cmr.repository.ContentService;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.Pair;
import org.alfresco.util.PropertyCheck;
import org.alfresco.util.VmShutdownListener;
import org.alfresco.util.VmShutdownListener.VmShutdownException;
@@ -104,7 +105,6 @@ public class ContentStoreCleaner
private static final QName LOCK_QNAME = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "ContentStoreCleaner");
private static final long LOCK_TTL = 30000L;
private static ThreadLocal<Pair<Long, String>> lockThreadLocal = new ThreadLocal<Pair<Long, String>>();
private static Log logger = LogFactory.getLog(ContentStoreCleaner.class);
@@ -230,55 +230,42 @@ public class ContentStoreCleaner
}
/**
* Lazily update the job lock
* Acquire the job lock
*/
private void refreshLock()
private String acquireLock(JobLockRefreshCallback lockCallback)
{
Pair<Long, String> lockPair = lockThreadLocal.get();
if (lockPair == null)
// Get lock
String lockToken = jobLockService.getLock(LOCK_QNAME, LOCK_TTL);
// Register the refresh callback which will keep the lock alive
jobLockService.refreshLock(lockToken, LOCK_QNAME, LOCK_TTL, lockCallback);
if (logger.isDebugEnabled())
{
String lockToken = jobLockService.getLock(LOCK_QNAME, LOCK_TTL);
Long lastLock = new Long(System.currentTimeMillis());
// We have not locked before
lockPair = new Pair<Long, String>(lastLock, lockToken);
lockThreadLocal.set(lockPair);
}
else
{
long now = System.currentTimeMillis();
long lastLock = lockPair.getFirst().longValue();
String lockToken = lockPair.getSecond();
// Only refresh the lock if we are past a threshold
if (now - lastLock > (long)(LOCK_TTL/2L))
{
jobLockService.refreshLock(lockToken, LOCK_QNAME, LOCK_TTL);
lastLock = System.currentTimeMillis();
lockPair = new Pair<Long, String>(lastLock, lockToken);
lockThreadLocal.set(lockPair);
}
logger.debug("lock acquired: " + LOCK_QNAME + ": " + lockToken);
}
return lockToken;
}
/**
* Release the lock after the job completes
*/
private void releaseLock()
private void releaseLock(LockCallback lockCallback, String lockToken)
{
Pair<Long, String> lockPair = lockThreadLocal.get();
if (lockPair != null)
if (lockCallback != null)
{
// We can't release without a token
try
lockCallback.running.set(false);
}
if (lockToken != null)
{
jobLockService.releaseLock(lockToken, LOCK_QNAME);
if (logger.isDebugEnabled())
{
jobLockService.releaseLock(lockPair.getSecond(), LOCK_QNAME);
}
finally
{
// Reset
lockThreadLocal.set(null);
logger.debug("Lock released: " + LOCK_QNAME + ": " + lockToken);
}
}
// else: We can't release without a token
}
public void execute()
@@ -292,10 +279,12 @@ public class ContentStoreCleaner
return;
}
LockCallback lockCallback = new LockCallback();
String lockToken = null;
try
{
logger.debug("Content store cleanup started.");
refreshLock();
lockToken = acquireLock(lockCallback);
executeInternal();
// Done
if (logger.isDebugEnabled())
@@ -321,7 +310,7 @@ public class ContentStoreCleaner
}
finally
{
releaseLock();
releaseLock(lockCallback, lockToken);
}
}
@@ -338,7 +327,6 @@ public class ContentStoreCleaner
};
while (true)
{
refreshLock();
Long lastProcessedOrphanId = transactionService.getRetryingTransactionHelper().doInTransaction(getAndDeleteWork);
if (vmShutdownListener.isVmShuttingDown())
{
@@ -358,6 +346,27 @@ public class ContentStoreCleaner
// Done
}
private class LockCallback implements JobLockRefreshCallback
{
final AtomicBoolean running = new AtomicBoolean(true);
@Override
public boolean isActive()
{
return running.get();
}
@Override
public void lockReleased()
{
running.set(false);
if (logger.isDebugEnabled())
{
logger.debug("Lock release notification: " + LOCK_QNAME);
}
}
}
/**
*
* @param minIdInclusive the min content URL ID (inclusive)