Nick Burch 3ffb67e824 Replication Service transfer report tracking
Make the transfer service report available through the replication service, and add unit tests to show that it turns up as expected


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@21441 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
2010-07-27 15:25:54 +00:00

345 lines
11 KiB
Java

/*
* Copyright (C) 2005-2010 Alfresco Software Limited.
*
* This file is part of Alfresco
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.repo.replication;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.action.ActionCancelledException;
import org.alfresco.repo.action.executer.ActionExecuterAbstractBase;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.transfer.ChildAssociatedNodeFinder;
import org.alfresco.repo.transfer.ContentClassFilter;
import org.alfresco.service.cmr.action.Action;
import org.alfresco.service.cmr.action.ActionTrackingService;
import org.alfresco.service.cmr.action.ParameterDefinition;
import org.alfresco.service.cmr.replication.ReplicationDefinition;
import org.alfresco.service.cmr.replication.ReplicationService;
import org.alfresco.service.cmr.replication.ReplicationServiceException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.transfer.NodeCrawler;
import org.alfresco.service.cmr.transfer.NodeCrawlerFactory;
import org.alfresco.service.cmr.transfer.TransferCallback;
import org.alfresco.service.cmr.transfer.TransferCancelledException;
import org.alfresco.service.cmr.transfer.TransferDefinition;
import org.alfresco.service.cmr.transfer.TransferEvent;
import org.alfresco.service.cmr.transfer.TransferEventBegin;
import org.alfresco.service.cmr.transfer.TransferEventEnterState;
import org.alfresco.service.cmr.transfer.TransferService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Action executor which runs a {@link ReplicationDefinition}: it expands the
 * definition's payload into a set of nodes, hands them to the
 * {@link TransferService}, and records the resulting transfer report on the
 * definition. A job lock keyed on the definition's replication QName is held
 * (and periodically refreshed) while this happens.
 *
 * @author Nick Burch
 * @since 3.4
 */
public class ReplicationActionExecutor extends ActionExecuterAbstractBase
{
    /**
     * The logger
     */
    private static final Log logger = LogFactory.getLog(ReplicationActionExecutor.class);

    private JobLockService jobLockService;
    private TransferService transferService;
    private ReplicationService replicationService;
    private NodeCrawlerFactory nodeCrawlerFactory;
    private ActionTrackingService actionTrackingService;

    /**
     * By default, we lock for 30 minutes
     */
    private long replicationActionLockDuration = 30*60*1000;

    /**
     * Injects the JobLockService bean.
     *
     * @param jobLockService the JobLockService.
     */
    public void setJobLockService(JobLockService jobLockService)
    {
        this.jobLockService = jobLockService;
    }

    /**
     * Injects the ReplicationService bean.
     *
     * @param replicationService the ReplicationService.
     */
    public void setReplicationService(ReplicationService replicationService)
    {
        this.replicationService = replicationService;
    }

    /**
     * Injects the TransferService bean.
     *
     * @param transferService the TransferService.
     */
    public void setTransferService(TransferService transferService)
    {
        this.transferService = transferService;
    }

    /**
     * Injects the NodeCrawlerFactory bean.
     *
     * @param nodeCrawlerFactory the NodeCrawlerFactory.
     */
    public void setNodeCrawlerFactory(NodeCrawlerFactory nodeCrawlerFactory)
    {
        this.nodeCrawlerFactory = nodeCrawlerFactory;
    }

    /**
     * Injects the ActionTrackingService bean.
     *
     * @param actionTrackingService the ActionTrackingService.
     */
    public void setActionTrackingService(ActionTrackingService actionTrackingService)
    {
        this.actionTrackingService = actionTrackingService;
    }

    /**
     * Adds no parameter definitions; everything this executor needs is read
     * from the {@link ReplicationDefinition} itself.
     *
     * @param paramList the list to add definitions to (left untouched)
     */
    @Override
    protected void addParameterDefinitions(List<ParameterDefinition> paramList)
    {
        // Not used - our definitions hold everything on them
    }

    /**
     * Takes a {@link ReplicationDefinition}, which contains one or
     *  more payloads {@link NodeRef}s, and expands them into a
     *  full list of nodes to be transfered.
     *
     * @param replicationDef the definition whose payload roots are crawled
     * @return the union of the nodes the crawler returned for every payload root
     */
    protected Set<NodeRef> expandPayload(ReplicationDefinition replicationDef)
    {
        // Turn our payload list of root nodes into something that
        //  the transfer service can work with
        Set<NodeRef> toTransfer = new HashSet<NodeRef>(89);

        // The crawler is set up with a finder for the "contains" child
        //  association and a filter on the folder and content types
        NodeCrawler crawler = nodeCrawlerFactory.getNodeCrawler();
        crawler.setNodeFinders(new ChildAssociatedNodeFinder(ContentModel.ASSOC_CONTAINS));
        crawler.setNodeFilters(new ContentClassFilter(
                ContentModel.TYPE_FOLDER,
                ContentModel.TYPE_CONTENT
        ));

        for(NodeRef payload : replicationDef.getPayload())
        {
            Set<NodeRef> crawledNodes = crawler.crawl(payload);
            toTransfer.addAll(crawledNodes);
        }

        return toTransfer;
    }

    /**
     * Takes a {@link ReplicationDefinition} and a list of
     *  {@link NodeRef}s, and returns the
     *  {@link TransferDefinition} which will allow the
     *  replication to be run.
     *
     * @param replicationDef the replication being run (not consulted here,
     *                       but available to overriding implementations)
     * @param toTransfer the expanded set of nodes to be transferred
     * @return a new definition holding the nodes, with its sync flag set
     */
    protected TransferDefinition buildTransferDefinition(
            ReplicationDefinition replicationDef, Set<NodeRef> toTransfer
    )
    {
        TransferDefinition transferDefinition =
            new TransferDefinition();
        transferDefinition.setNodes(toTransfer);
        transferDefinition.setSync(true);
        return transferDefinition;
    }

    /**
     * Runs the replication described by the supplied action.
     * <p>
     * The definition is validated, its previous transfer report reference is
     * cleared, and a lock on the definition is taken. The payload is then
     * expanded and handed to the transfer service; on success the returned
     * report reference is stored on the definition. The lock is always
     * released before this method returns or throws.
     *
     * @param action the action to run, which must be a {@link ReplicationDefinition}
     * @param actionedUponNodeRef not used by this executor
     * @throws ReplicationServiceException if the definition has no target, has
     *         no payload, is disabled, or if payload expansion or the transfer fails
     * @throws ActionCancelledException if cancellation of the definition has
     *         been requested by the time the transfer has finished
     */
    @Override
    protected void executeImpl(Action action, NodeRef actionedUponNodeRef)
    {
        final ReplicationDefinition replicationDef = (ReplicationDefinition)action;
        if(replicationDef.getTargetName() == null ||
           replicationDef.getTargetName().equals(""))
        {
            throw new ReplicationServiceException("The target is required but wasn't given");
        }
        if(replicationDef.getPayload().size() == 0)
        {
            throw new ReplicationServiceException("No payloads were specified");
        }
        if(!replicationDef.isEnabled())
        {
            throw new ReplicationServiceException("Unable to execute a disabled replication definition");
        }

        // Clear the previous transfer report reference
        replicationDef.setLocalTransferReport(null);

        // Lock the service - only one instance of the replication
        //  should occur at a time
        ReplicationDefinitionLockExtender lock =
            new ReplicationDefinitionLockExtender(replicationDef);

        // Everything done while holding the lock lives inside this try, so
        //  that the finally releases it on every exit path - including
        //  Errors and other Throwables that the catch clauses don't see
        try
        {
            // Turn our payload list of root nodes into something that
            //  the transfer service can work with
            Set<NodeRef> toTransfer;
            try
            {
                toTransfer = expandPayload(replicationDef);
            }
            catch(Exception e)
            {
                throw new ReplicationServiceException("Error processing payload list", e);
            }

            // Ask the transfer service to do the replication
            //  work for us
            try
            {
                // Build the definition
                TransferDefinition transferDefinition =
                    buildTransferDefinition(replicationDef, toTransfer);

                // Off we go
                NodeRef transferReport = transferService.transfer(
                        replicationDef.getTargetName(),
                        transferDefinition,
                        lock
                );

                // Record the details of the transfer report
                replicationDef.setLocalTransferReport(transferReport);
            }
            catch(Exception e)
            {
                // A cancelled transfer isn't reported as a failure here, the
                //  cancellation check below deals with it instead
                if(! (e instanceof TransferCancelledException))
                {
                    throw new ReplicationServiceException("Error executing transfer", e);
                }
            }
        }
        finally
        {
            // All done (one way or another), release our lock
            lock.close();
        }

        // If we were cancelled, throw the magic exception so
        //  that this is correctly recorded
        if(actionTrackingService.isCancellationRequested(replicationDef))
        {
            throw new ActionCancelledException(replicationDef);
        }
    }

    /**
     * A {@link TransferCallback} which periodically renews the
     *  lock held against a {@link ReplicationDefinition}
     */
    protected class ReplicationDefinitionLockExtender implements TransferCallback
    {
        private ReplicationDefinition replicationDef;
        private String transferId;
        private String lockToken;

        /**
         * Creates the callback and immediately acquires the lock
         *  for the given definition.
         *
         * @param replicationDef the definition to lock
         */
        protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef)
        {
            this.replicationDef = replicationDef;
            acquireLock();
        }

        /**
         * No matter what the event is, refresh
         *  our lock on the {@link ReplicationDefinition}, and
         *  handle a cancel if it was requested.
         *
         * @param event the transfer event being reported
         */
        public void processEvent(TransferEvent event)
        {
            // Extend our lock
            refreshLock();

            // If it's the enter event, do skip
            if(event instanceof TransferEventEnterState)
            {
                return;
            }

            // If this is a begin event, make a note of the ID
            if(event instanceof TransferEventBegin)
            {
                transferId = ((TransferEventBegin)event).getTransferId();
            }

            // Has someone tried to cancel us?
            if(actionTrackingService.isCancellationRequested(replicationDef))
            {
                // Tell the transfer service to cancel, if we can
                if(transferId != null)
                {
                    transferService.cancelAsync(transferId);
                }
                else
                {
                    logger.warn("Unable to cancel replication as requested, as transfer has yet to reach a cancellable state");
                }
            }
        }

        /**
         * Give up our lock on the
         *  {@link ReplicationDefinition}
         */
        public void close()
        {
            releaseLock();
        }

        /**
         * Get a lock on the job.
         * Tries every 5 seconds for 30 seconds, then
         *  every 30 seconds until 3 times the lock
         *  duration.
         */
        private void acquireLock()
        {
            long retryTime = 30*1000;
            int retries = (int)(replicationActionLockDuration * 3 / retryTime);

            try
            {
                // Quick try
                lockToken = jobLockService.getLock(
                        replicationDef.getReplicationQName(),
                        replicationActionLockDuration,
                        5 * 1000, // Every 5 seconds
                        6 // 6 times = wait up to 30 seconds
                );
            }
            catch(LockAcquisitionException e)
            {
                // Long try - every 30 seconds
                lockToken = jobLockService.getLock(
                        replicationDef.getReplicationQName(),
                        replicationActionLockDuration,
                        retryTime,
                        retries
                );
            }
        }

        /**
         * Refreshes the lock we hold, asking for the
         *  full lock duration again.
         */
        private void refreshLock()
        {
            jobLockService.refreshLock(
                    lockToken,
                    replicationDef.getReplicationQName(),
                    replicationActionLockDuration
            );
        }

        /**
         * Releases the lock we hold on the definition.
         */
        private void releaseLock()
        {
            jobLockService.releaseLock(
                    lockToken,
                    replicationDef.getReplicationQName()
            );
        }
    }
}