Fixes to replication job status handling:

- success, error and cancelled states now correctly reported
- source and target reports now correctly provided for each of above

Changes:
- deprecated TransferService interface, replaced by TransferService2
  - introduces new sync transfer methods
  - new TransferServiceImpl2 class, old TransferServiceImpl delegates to new class
- sync transfer now returns TransferEndEvent
- sync transfer now raises TransferFailureException
- success, error and cancelled events are now end events (raised after report events)
- transfer client handling refactored to support cancel and errors appropriately
  - converted to event loop with polling of server status for all states
  - cancel request may now end with success or error (depending on when cancel requested)
  - extract transfer errors from server
  - only raise exception for errors (cancelled now returns)
  - source and destination reports written for all states
- Added TransferEndEvent interface for end events - reports attached to end event
- replication service fixed to record source and dest reports in error case
- action service fixed to record cancelled state

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22390 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
David Caruana
2010-09-10 14:00:05 +00:00
parent 82955f3ae2
commit 35b2b7a122
25 changed files with 2176 additions and 1492 deletions

View File

@@ -27,25 +27,28 @@ import org.alfresco.repo.action.ActionCancelledException;
import org.alfresco.repo.action.executer.ActionExecuterAbstractBase;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transfer.ChildAssociatedNodeFinder;
import org.alfresco.repo.transfer.ContentClassFilter;
import org.alfresco.service.cmr.action.Action;
import org.alfresco.service.cmr.action.ActionTrackingService;
import org.alfresco.service.cmr.action.ParameterDefinition;
import org.alfresco.service.cmr.replication.ReplicationDefinition;
import org.alfresco.service.cmr.replication.ReplicationService;
import org.alfresco.service.cmr.replication.ReplicationServiceException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.transfer.NodeCrawler;
import org.alfresco.service.cmr.transfer.NodeCrawlerFactory;
import org.alfresco.service.cmr.transfer.TransferCallback;
import org.alfresco.service.cmr.transfer.TransferCancelledException;
import org.alfresco.service.cmr.transfer.TransferDefinition;
import org.alfresco.service.cmr.transfer.TransferEndEvent;
import org.alfresco.service.cmr.transfer.TransferEvent;
import org.alfresco.service.cmr.transfer.TransferEventBegin;
import org.alfresco.service.cmr.transfer.TransferEventCancelled;
import org.alfresco.service.cmr.transfer.TransferEventEnterState;
import org.alfresco.service.cmr.transfer.TransferEventReport;
import org.alfresco.service.cmr.transfer.TransferService;
import org.alfresco.service.cmr.transfer.TransferEventError;
import org.alfresco.service.cmr.transfer.TransferFailureException;
import org.alfresco.service.cmr.transfer.TransferService2;
import org.alfresco.service.transaction.TransactionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,10 +63,11 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
private static Log logger = LogFactory.getLog(ReplicationActionExecutor.class);
private JobLockService jobLockService;
private TransferService transferService;
private ReplicationService replicationService;
private TransferService2 transferService;
private NodeCrawlerFactory nodeCrawlerFactory;
private ActionTrackingService actionTrackingService;
private TransactionService transactionService;
private ReplicationDefinitionPersisterImpl replicationDefinitionPersister;
/**
* By default, we lock for 30 minutes
@@ -80,22 +84,12 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
this.jobLockService = jobLockService;
}
/**
* Injects the ReplicationService bean.
*
* @param nodeService the ReplicationService.
*/
public void setReplicationService(ReplicationService replicationService)
{
this.replicationService = replicationService;
}
/**
* Injects the TransferService bean.
*
* @param transferService the TransferService.
*/
public void setTransferService(TransferService transferService)
public void setTransferService(TransferService2 transferService)
{
this.transferService = transferService;
}
@@ -120,6 +114,25 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
this.actionTrackingService = actionTrackingService;
}
/**
* Injects the TransactionService bean.
*
* @param transactionService the TransactionService.
*/
public void setTransactionService(TransactionService transactionService)
{
this.transactionService = transactionService;
}
/**
* Injects the ReplicationDefinitionPersister bean.
* @param replicationDefinitionPersister
*/
public void setReplicationDefinitionPersister(ReplicationDefinitionPersisterImpl replicationDefinitionPersister)
{
this.replicationDefinitionPersister = replicationDefinitionPersister;
}
@Override
protected void addParameterDefinitions(List<ParameterDefinition> paramList) {
// Not used - our definitions hold everything on them
@@ -197,8 +210,8 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
}
// Clear the previous transfer report references
replicationDef.setLocalTransferReport(null);
replicationDef.setRemoteTransferReport(null);
// replicationDef.setLocalTransferReport(null);
// replicationDef.setRemoteTransferReport(null);
// Lock the service - only one instance of the replication
// should occur at a time
@@ -215,43 +228,78 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
throw new ReplicationServiceException("Error processing payload list - " + e.getMessage(), e);
}
// Holder for reports generated by the transfer
ReplicationReportCollector reports = new ReplicationReportCollector();
// Ask the transfer service to do the replication
// work for us
try {
// Build the definition
TransferDefinition transferDefinition =
buildTransferDefinition(replicationDef, toTransfer);
// Off we go
transferService.transfer(
TransferEndEvent endEvent = null;
try
{
// Build the definition
TransferDefinition transferDefinition =
buildTransferDefinition(replicationDef, toTransfer);
// Off we go
endEvent = transferService.transfer(
replicationDef.getTargetName(),
transferDefinition,
lock, reports
);
lock);
// Record the details of the transfer reports
replicationDef.setLocalTransferReport(reports.getLocalReport());
replicationDef.setRemoteTransferReport(reports.getRemoteReport());
} catch(Exception e) {
if(! (e instanceof TransferCancelledException))
{
lock.close();
throw new ReplicationServiceException("Error executing transfer - " + e.getMessage(), e);
}
if (endEvent instanceof TransferEventCancelled)
{
if (logger.isDebugEnabled())
logger.debug("Cancelling replication job");
// If we were cancelled, throw the magic exception so
// that this is correctly recorded
throw new ActionCancelledException(replicationDef);
}
// Record details of the transfer reports (in success case)
replicationDef.setLocalTransferReport(endEvent.getSourceReport());
replicationDef.setRemoteTransferReport(endEvent.getDestinationReport());
}
// All done, release our lock
lock.close();
// If we were cancelled, throw the magic exception so
// that this is correctly recorded
if(actionTrackingService.isCancellationRequested(replicationDef))
catch(Exception e)
{
throw new ActionCancelledException(replicationDef);
if (e instanceof ActionCancelledException)
{
writeDefinitionReports(replicationDef, endEvent.getSourceReport(), endEvent.getDestinationReport());
throw (ActionCancelledException)e;
}
if (e instanceof TransferFailureException)
{
TransferEventError failureEndEvent = ((TransferFailureException)e).getErrorEvent();
writeDefinitionReports(replicationDef, failureEndEvent.getSourceReport(), failureEndEvent.getDestinationReport());
throw new ReplicationServiceException("Error executing transfer - " + e.getCause().getMessage(), e);
}
writeDefinitionReports(replicationDef, null, null);
throw new ReplicationServiceException("Error executing transfer - " + e.getMessage(), e);
}
finally
{
lock.close();
}
}
private void writeDefinitionReports(final ReplicationDefinition replicationDef, NodeRef sourceReport, NodeRef destinationReport)
{
replicationDef.setLocalTransferReport(sourceReport);
replicationDef.setRemoteTransferReport(destinationReport);
if (replicationDef.getNodeRef() != null)
{
// Record details of the transfer reports
transactionService.getRetryingTransactionHelper().doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback<Object>()
{
public Object execute() throws Throwable
{
if (logger.isDebugEnabled())
logger.debug("Exception - writing replication def reports");
replicationDefinitionPersister.saveReplicationDefinition(replicationDef);
return null;
}
}, false, true);
}
}
/**
@@ -367,53 +415,4 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
);
}
}
/**
* A {@link TransferCallback} which collects the various reports generated by
* the transfer.
*/
protected class ReplicationReportCollector implements TransferCallback
{
private NodeRef localReport;
private NodeRef remoteReport;
protected ReplicationReportCollector()
{
}
/**
* Collect source and destination repository target reports
*/
public void processEvent(TransferEvent event)
{
if(event instanceof TransferEventReport)
{
TransferEventReport reportEvent = (TransferEventReport)event;
if (reportEvent.getReportType().equals(TransferEventReport.ReportType.SOURCE))
{
localReport = reportEvent.getNodeRef();
}
else if (reportEvent.getReportType().equals(TransferEventReport.ReportType.DESTINATION))
{
remoteReport = reportEvent.getNodeRef();
}
}
}
/**
* @return local transfer report
*/
public NodeRef getLocalReport()
{
return localReport;
}
/**
* @return target transfer report
*/
public NodeRef getRemoteReport()
{
return remoteReport;
}
}
}