ALF-4936 - implementing transfer lock timeout.

Also reworked Replication Action Executor timeout code.

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@23155 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Mark Rogers
2010-10-15 14:59:14 +00:00
parent c47322b28e
commit 4240e7d2dc
8 changed files with 1288 additions and 467 deletions

View File

@@ -75,9 +75,10 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
private ReplicationParams replicationParams;
/**
* By default, we lock for 30 minutes
* By default, we lock for a minute, so if this server is shutdown another can take over a
* minute later.
*/
private long replicationActionLockDuration = 30*60*1000;
private long replicationActionLockDuration = 60*1000;
/**
* Injects the NodeService bean.
@@ -260,9 +261,11 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
// Turn our payload list of root nodes into something that
// the transfer service can work with
Set<NodeRef> toTransfer;
try {
try
{
toTransfer = expandPayload(replicationDef);
} catch(Exception e) {
}
catch(Exception e) {
lock.close();
throw new ReplicationServiceException("Error processing payload list - " + e.getMessage(), e);
}
@@ -346,17 +349,21 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
* A {@link TransferCallback} which periodically renews the
* lock held against a {@link ReplicationDefinition}
*/
protected class ReplicationDefinitionLockExtender implements TransferCallback
protected class ReplicationDefinitionLockExtender
implements TransferCallback, JobLockService.JobLockRefreshCallback
{
private ReplicationDefinition replicationDef;
private String transferId;
private String lockToken;
private boolean active;
protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef)
{
this.replicationDef = replicationDef;
acquireLock();
}
/**
* No matter what the event is, refresh
* our lock on the {@link ReplicationDefinition}, and
@@ -364,35 +371,19 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
*/
public void processEvent(TransferEvent event)
{
// Extend our lock
refreshLock();
// If it's the enter event, do skip
if(event instanceof TransferEventEnterState)
{
return;
}
// If it's the enter event, do skip
if(event instanceof TransferEventEnterState)
{
return;
}
// If this is a begin event, make a note of the ID
if(event instanceof TransferEventBegin)
{
transferId = ((TransferEventBegin)event).getTransferId();
}
// Has someone tried to cancel us?
if(actionTrackingService.isCancellationRequested(replicationDef))
{
// Tell the transfer service to cancel, if we can
if(transferId != null)
{
transferService.cancelAsync(transferId);
logger.debug("Replication cancel was requested for " + replicationDef.getReplicationQName());
}
else
{
logger.warn("Unable to cancel replication as requested, as transfer has yet to reach a cancellable state");
}
}
// If this is a begin event, make a note of the ID
if(event instanceof TransferEventBegin)
{
transferId = ((TransferEventBegin)event).getTransferId();
}
checkCancel();
}
/**
@@ -401,21 +392,20 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
*/
public void close()
{
releaseLock();
releaseLock();
}
/**
* Get a lock on the job.
* Tries every 5 seconds for 30 seconds, then
* every 30 seconds until 3 times the lock
* duration.
* every 30 seconds for half an hour.
*
* @throws LockAcquisitionException
*/
private void acquireLock()
{
long retryTime = 30*1000;
int retries = (int)(replicationActionLockDuration * 3 / retryTime);
try {
try
{
// Quick try
lockToken = jobLockService.getLock(
replicationDef.getReplicationQName(),
@@ -423,13 +413,38 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
5 * 1000, // Every 5 seconds
6 // 6 times = wait up to 30 seconds
);
} catch(LockAcquisitionException e) {
active = true;
/**
* Got the lock - now register the refresh callback which will keep the
* lock alive
*/
jobLockService.refreshLock(
lockToken,
replicationDef.getReplicationQName(),
replicationActionLockDuration,
this
);
if(logger.isDebugEnabled())
{
logger.debug("lock aquired:" + replicationDef.getReplicationQName() );
}
}
catch(LockAcquisitionException e)
{
long retryTime = 30*1000;
int retries = (int)(60);
logger.debug(
"Unable to get the replication job lock on " +
replicationDef.getReplicationQName() +
", retrying every " + (int)(retryTime/1000) + " seconds"
);
active = true;
// Long try - every 30 seconds
lockToken = jobLockService.getLock(
replicationDef.getReplicationQName(),
@@ -437,22 +452,80 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
retryTime,
retries
);
/**
* Got the lock - now register the refresh callback which will keep the
* lock alive
*/
jobLockService.refreshLock(
lockToken,
replicationDef.getReplicationQName(),
replicationActionLockDuration,
this
);
if(logger.isDebugEnabled())
{
logger.debug("lock aquired (from long timeout):" + replicationDef.getReplicationQName() );
}
}
}
private void refreshLock()
{
jobLockService.refreshLock(
lockToken,
replicationDef.getReplicationQName(),
replicationActionLockDuration
);
}
private void releaseLock()
{
jobLockService.releaseLock(
lockToken,
replicationDef.getReplicationQName()
);
if(active)
{
if(logger.isDebugEnabled())
{
logger.debug("about to release lock:" + replicationDef.getReplicationQName());
}
jobLockService.releaseLock(
lockToken,
replicationDef.getReplicationQName());
active=false;
}
}
private void checkCancel()
{
// Has someone tried to cancel us?
if(actionTrackingService.isCancellationRequested(replicationDef))
{
// Tell the transfer service to cancel, if we can
if(transferId != null)
{
transferService.cancelAsync(transferId);
logger.debug("Replication cancel was requested for " + replicationDef.getReplicationQName());
}
else
{
logger.warn("Unable to cancel replication as requested, as transfer has yet to reach a cancellable state");
}
}
}
/**
* Job Lock Refresh
* @return
*/
@Override
public boolean isActive()
{
if(logger.isDebugEnabled())
{
logger.debug("lock callback isActive:" + active + ", " + replicationDef.getReplicationQName());
}
return active;
}
/**
* Job Lock Service has released us.
*/
@Override
public void lockReleased()
{
logger.debug("lock released:" + replicationDef.getReplicationQName());
// nothing to do
}
}
}