mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Replication Service execution work
Now handles locking the replication job, deciding what to replicate for a given payload, and refreshing the lock as the transfer progresses, plus tests. Still needs to call the transfer service though. git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@20989 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -18,15 +18,28 @@
|
||||
*/
|
||||
package org.alfresco.repo.replication;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
import org.alfresco.model.ContentModel;
|
||||
import org.alfresco.repo.action.executer.ActionExecuterAbstractBase;
|
||||
import org.alfresco.repo.lock.JobLockService;
|
||||
import org.alfresco.repo.lock.LockAcquisitionException;
|
||||
import org.alfresco.repo.transfer.ChildAssociatedNodeFinder;
|
||||
import org.alfresco.service.cmr.action.Action;
|
||||
import org.alfresco.service.cmr.action.ActionDefinition;
|
||||
import org.alfresco.service.cmr.action.ParameterDefinition;
|
||||
import org.alfresco.service.cmr.replication.ReplicationDefinition;
|
||||
import org.alfresco.service.cmr.replication.ReplicationService;
|
||||
import org.alfresco.service.cmr.replication.ReplicationServiceException;
|
||||
import org.alfresco.service.cmr.repository.NodeRef;
|
||||
import org.alfresco.service.cmr.repository.NodeService;
|
||||
import org.alfresco.service.cmr.transfer.NodeCrawler;
|
||||
import org.alfresco.service.cmr.transfer.NodeCrawlerFactory;
|
||||
import org.alfresco.service.cmr.transfer.TransferCallback;
|
||||
import org.alfresco.service.cmr.transfer.TransferEvent;
|
||||
import org.alfresco.service.cmr.transfer.TransferService;
|
||||
|
||||
/**
|
||||
@@ -36,8 +49,11 @@ import org.alfresco.service.cmr.transfer.TransferService;
|
||||
public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
private NodeService nodeService;
|
||||
private JobLockService jobLockService;
|
||||
private ReplicationService replicationService;
|
||||
private TransferService transferService;
|
||||
private ReplicationService replicationService;
|
||||
private NodeCrawlerFactory nodeCrawlerFactory;
|
||||
|
||||
private long replicationActionLockDuration = 10*60*1000;
|
||||
|
||||
/**
|
||||
* Injects the NodeService bean.
|
||||
@@ -52,7 +68,7 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
/**
|
||||
* Injects the JobLockService bean.
|
||||
*
|
||||
* @param nodeService the JobLockService.
|
||||
* @param jobLockService the JobLockService.
|
||||
*/
|
||||
public void setJobLockService(JobLockService jobLockService)
|
||||
{
|
||||
@@ -79,6 +95,16 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
this.transferService = transferService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects the NodeCrawlerFactory bean.
|
||||
*
|
||||
* @param nodeCrawlerFactory the NodeCrawlerFactory.
|
||||
*/
|
||||
public void setNodeCrawlerFactory(NodeCrawlerFactory nodeCrawlerFactory)
|
||||
{
|
||||
this.nodeCrawlerFactory = nodeCrawlerFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addParameterDefinitions(List<ParameterDefinition> paramList) {
|
||||
// TODO
|
||||
@@ -86,15 +112,131 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
||||
|
||||
@Override
|
||||
protected void executeImpl(Action action, NodeRef actionedUponNodeRef) {
|
||||
final ReplicationDefinition replicationDef = (ReplicationDefinition)action;
|
||||
if(replicationDef.getTargetName() == null ||
|
||||
replicationDef.getTargetName().equals(""))
|
||||
{
|
||||
throw new ReplicationServiceException("The target is required but wasn't given");
|
||||
}
|
||||
if(replicationDef.getPayload().size() == 0)
|
||||
{
|
||||
throw new ReplicationServiceException("No payloads were specified");
|
||||
}
|
||||
|
||||
// Lock the service - only one instance of the replication
|
||||
// should occur at a time
|
||||
ReplicationDefinitionLockExtender lock =
|
||||
new ReplicationDefinitionLockExtender(replicationDef);
|
||||
|
||||
// Turn our payload list of root nodes into something that
|
||||
// the transfer service can work with
|
||||
Set<NodeRef> toTransfer = new HashSet<NodeRef>(89);
|
||||
try {
|
||||
NodeCrawler crawler = nodeCrawlerFactory.getNodeCrawler();
|
||||
crawler.setNodeFinders(new ChildAssociatedNodeFinder(ContentModel.ASSOC_CONTAINS));
|
||||
|
||||
for(NodeRef payload : replicationDef.getPayload()) {
|
||||
Set<NodeRef> crawledNodes = crawler.crawl(payload);
|
||||
toTransfer.addAll(crawledNodes);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
// TODO - Record the error
|
||||
System.err.println(e);
|
||||
lock.close();
|
||||
throw new ReplicationServiceException("Error processing payload list", e);
|
||||
}
|
||||
|
||||
// Ask the transfer service to do the replication
|
||||
// work for us
|
||||
try {
|
||||
// TODO
|
||||
System.err.println("TODO - Execute '" + replicationDef.getReplicationName() + "'");
|
||||
|
||||
} catch(Exception e) {
|
||||
// TODO - Record the error
|
||||
System.err.println(e);
|
||||
lock.close();
|
||||
throw new ReplicationServiceException("Error executing transfer", e);
|
||||
}
|
||||
|
||||
// TODO
|
||||
// All done
|
||||
lock.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link TransferCallback} which periodically renews the
|
||||
* lock held against a {@link ReplicationDefinition}
|
||||
*/
|
||||
protected class ReplicationDefinitionLockExtender implements TransferCallback
|
||||
{
|
||||
private ReplicationDefinition replicationDef;
|
||||
private String lockToken;
|
||||
|
||||
protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef)
|
||||
{
|
||||
this.replicationDef = replicationDef;
|
||||
acquireLock();
|
||||
}
|
||||
/**
|
||||
* No matter what the event is, refresh
|
||||
* our lock on the {@link ReplicationDefinition}
|
||||
*/
|
||||
public void processEvent(TransferEvent event)
|
||||
{
|
||||
refreshLock();
|
||||
}
|
||||
/**
|
||||
* Give up our lock on the
|
||||
* {@link ReplicationDefinition}
|
||||
*/
|
||||
public void close()
|
||||
{
|
||||
releaseLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a lock on the job.
|
||||
* Tries every 5 seconds for 30 seconds, then
|
||||
* every 30 seconds until 3 times the lock
|
||||
* duration.
|
||||
*/
|
||||
private void acquireLock()
|
||||
{
|
||||
long retryTime = 30*1000;
|
||||
int retries = (int)(replicationActionLockDuration * 3 / retryTime);
|
||||
|
||||
try {
|
||||
// Quick try
|
||||
lockToken = jobLockService.getLock(
|
||||
replicationDef.getReplicationName(),
|
||||
replicationActionLockDuration,
|
||||
5 * 1000, // Every 5 seconds
|
||||
6 // 6 times = wait up to 30 seconds
|
||||
);
|
||||
} catch(LockAcquisitionException e) {
|
||||
// Long try - every 30 seconds
|
||||
lockToken = jobLockService.getLock(
|
||||
replicationDef.getReplicationName(),
|
||||
replicationActionLockDuration,
|
||||
retryTime,
|
||||
retries
|
||||
);
|
||||
}
|
||||
}
|
||||
private void refreshLock()
|
||||
{
|
||||
jobLockService.refreshLock(
|
||||
lockToken,
|
||||
replicationDef.getReplicationName(),
|
||||
replicationActionLockDuration
|
||||
);
|
||||
}
|
||||
private void releaseLock()
|
||||
{
|
||||
jobLockService.releaseLock(
|
||||
lockToken,
|
||||
replicationDef.getReplicationName()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user