mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Ability to cancel replication actions (replication 94)
Cance ability, which feeds down into the transfer service, and feeds it back up to the action tracking service. Also, refactor of the replication service unit tests to do the transactions itself, which is needed to reliably test async actions git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@21387 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -23,51 +23,52 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.alfresco.model.ContentModel;
|
||||
import org.alfresco.repo.action.ActionCancelledException;
|
||||
import org.alfresco.repo.action.executer.ActionExecuterAbstractBase;
|
||||
import org.alfresco.repo.lock.JobLockService;
|
||||
import org.alfresco.repo.lock.LockAcquisitionException;
|
||||
import org.alfresco.repo.transfer.ChildAssociatedNodeFinder;
|
||||
import org.alfresco.repo.transfer.ContentClassFilter;
|
||||
import org.alfresco.service.cmr.action.Action;
|
||||
import org.alfresco.service.cmr.action.ActionTrackingService;
|
||||
import org.alfresco.service.cmr.action.ParameterDefinition;
|
||||
import org.alfresco.service.cmr.replication.ReplicationDefinition;
|
||||
import org.alfresco.service.cmr.replication.ReplicationService;
|
||||
import org.alfresco.service.cmr.replication.ReplicationServiceException;
|
||||
import org.alfresco.service.cmr.repository.NodeRef;
|
||||
import org.alfresco.service.cmr.repository.NodeService;
|
||||
import org.alfresco.service.cmr.transfer.NodeCrawler;
|
||||
import org.alfresco.service.cmr.transfer.NodeCrawlerFactory;
|
||||
import org.alfresco.service.cmr.transfer.TransferCallback;
|
||||
import org.alfresco.service.cmr.transfer.TransferCancelledException;
|
||||
import org.alfresco.service.cmr.transfer.TransferDefinition;
|
||||
import org.alfresco.service.cmr.transfer.TransferEvent;
|
||||
import org.alfresco.service.cmr.transfer.TransferEventBegin;
|
||||
import org.alfresco.service.cmr.transfer.TransferEventEnterState;
|
||||
import org.alfresco.service.cmr.transfer.TransferService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* @author Nick Burch
|
||||
* @since 3.4
|
||||
*/
|
||||
public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
private NodeService nodeService;
|
||||
/**
|
||||
* The logger
|
||||
*/
|
||||
private static Log logger = LogFactory.getLog(ReplicationActionExecutor.class);
|
||||
|
||||
private JobLockService jobLockService;
|
||||
private TransferService transferService;
|
||||
private ReplicationService replicationService;
|
||||
private NodeCrawlerFactory nodeCrawlerFactory;
|
||||
private ActionTrackingService actionTrackingService;
|
||||
|
||||
/**
|
||||
* By default, we lock for 30 minutes
|
||||
*/
|
||||
private long replicationActionLockDuration = 30*60*1000;
|
||||
|
||||
/**
|
||||
* Injects the NodeService bean.
|
||||
*
|
||||
* @param nodeService the NodeService.
|
||||
*/
|
||||
public void setNodeService(NodeService nodeService)
|
||||
{
|
||||
this.nodeService = nodeService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects the JobLockService bean.
|
||||
*
|
||||
@@ -108,9 +109,19 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
this.nodeCrawlerFactory = nodeCrawlerFactory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects the ActionTrackingService bean.
|
||||
*
|
||||
* @param actionTrackingService the ActionTrackingService.
|
||||
*/
|
||||
public void setActionTrackingService(ActionTrackingService actionTrackingService)
|
||||
{
|
||||
this.actionTrackingService = actionTrackingService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addParameterDefinitions(List<ParameterDefinition> paramList) {
|
||||
// TODO Is this needed?
|
||||
// Not used - our definitions hold everything on them
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -182,8 +193,6 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
try {
|
||||
toTransfer = expandPayload(replicationDef);
|
||||
} catch(Exception e) {
|
||||
// TODO - Record the error
|
||||
System.err.println(e);
|
||||
lock.close();
|
||||
throw new ReplicationServiceException("Error processing payload list", e);
|
||||
}
|
||||
@@ -202,14 +211,22 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
lock
|
||||
);
|
||||
} catch(Exception e) {
|
||||
// TODO - Record the error
|
||||
System.err.println(e);
|
||||
lock.close();
|
||||
throw new ReplicationServiceException("Error executing transfer", e);
|
||||
if(! (e instanceof TransferCancelledException))
|
||||
{
|
||||
lock.close();
|
||||
throw new ReplicationServiceException("Error executing transfer", e);
|
||||
}
|
||||
}
|
||||
|
||||
// All done
|
||||
// All done, release our lock
|
||||
lock.close();
|
||||
|
||||
// If we were cancelled, throw the magic exception so
|
||||
// that this is correctly recorded
|
||||
if(actionTrackingService.isCancellationRequested(replicationDef))
|
||||
{
|
||||
throw new ActionCancelledException(replicationDef);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -219,6 +236,7 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
protected class ReplicationDefinitionLockExtender implements TransferCallback
|
||||
{
|
||||
private ReplicationDefinition replicationDef;
|
||||
private String transferId;
|
||||
private String lockToken;
|
||||
|
||||
protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef)
|
||||
@@ -233,11 +251,36 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
*/
|
||||
public void processEvent(TransferEvent event)
|
||||
{
|
||||
// Extend our lock
|
||||
refreshLock();
|
||||
|
||||
// If it's the enter event, do skip
|
||||
if(event instanceof TransferEventEnterState)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO - Check to see if cancel was requested
|
||||
// TODO - If it was, use TransferService.cancelAsync(transferId)
|
||||
// If this is a begin event, make a note of the ID
|
||||
if(event instanceof TransferEventBegin)
|
||||
{
|
||||
transferId = ((TransferEventBegin)event).getTransferId();
|
||||
}
|
||||
|
||||
// Has someone tried to cancel us?
|
||||
if(actionTrackingService.isCancellationRequested(replicationDef))
|
||||
{
|
||||
// Tell the transfer service to cancel, if we can
|
||||
if(transferId != null)
|
||||
{
|
||||
transferService.cancelAsync(transferId);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.warn("Unable to cancel replication as requested, as transfer has yet to reach a cancellable state");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Give up our lock on the
|
||||
* {@link ReplicationDefinition}
|
||||
|
Reference in New Issue
Block a user