diff --git a/config/alfresco/replication-services-context.xml b/config/alfresco/replication-services-context.xml
index 58fad68cd2..013be38cc7 100644
--- a/config/alfresco/replication-services-context.xml
+++ b/config/alfresco/replication-services-context.xml
@@ -71,6 +71,7 @@
+
diff --git a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java
index 8677c64aef..db3f4a1d8d 100644
--- a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java
+++ b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java
@@ -18,15 +18,28 @@
*/
package org.alfresco.repo.replication;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+
+import org.alfresco.model.ContentModel;
import org.alfresco.repo.action.executer.ActionExecuterAbstractBase;
import org.alfresco.repo.lock.JobLockService;
+import org.alfresco.repo.lock.LockAcquisitionException;
+import org.alfresco.repo.transfer.ChildAssociatedNodeFinder;
import org.alfresco.service.cmr.action.Action;
+import org.alfresco.service.cmr.action.ActionDefinition;
import org.alfresco.service.cmr.action.ParameterDefinition;
+import org.alfresco.service.cmr.replication.ReplicationDefinition;
import org.alfresco.service.cmr.replication.ReplicationService;
+import org.alfresco.service.cmr.replication.ReplicationServiceException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
+import org.alfresco.service.cmr.transfer.NodeCrawler;
+import org.alfresco.service.cmr.transfer.NodeCrawlerFactory;
+import org.alfresco.service.cmr.transfer.TransferCallback;
+import org.alfresco.service.cmr.transfer.TransferEvent;
import org.alfresco.service.cmr.transfer.TransferService;
/**
@@ -36,8 +49,11 @@ import org.alfresco.service.cmr.transfer.TransferService;
public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
private NodeService nodeService;
private JobLockService jobLockService;
- private ReplicationService replicationService;
private TransferService transferService;
+ private ReplicationService replicationService;
+ private NodeCrawlerFactory nodeCrawlerFactory;
+
+ private long replicationActionLockDuration = 10*60*1000;
/**
* Injects the NodeService bean.
@@ -52,7 +68,7 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
/**
* Injects the JobLockService bean.
*
- * @param nodeService the JobLockService.
+ * @param jobLockService the JobLockService.
*/
public void setJobLockService(JobLockService jobLockService)
{
@@ -79,6 +95,16 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
this.transferService = transferService;
}
+ /**
+ * Injects the NodeCrawlerFactory bean.
+ *
+ * @param nodeCrawlerFactory the NodeCrawlerFactory.
+ */
+ public void setNodeCrawlerFactory(NodeCrawlerFactory nodeCrawlerFactory)
+ {
+ this.nodeCrawlerFactory = nodeCrawlerFactory;
+ }
+
@Override
protected void addParameterDefinitions(List paramList) {
// TODO
@@ -86,15 +112,131 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
@Override
protected void executeImpl(Action action, NodeRef actionedUponNodeRef) {
+ final ReplicationDefinition replicationDef = (ReplicationDefinition)action;
+ if(replicationDef.getTargetName() == null ||
+ replicationDef.getTargetName().equals(""))
+ {
+ throw new ReplicationServiceException("The target is required but wasn't given");
+ }
+ if(replicationDef.getPayload().size() == 0)
+ {
+ throw new ReplicationServiceException("No payloads were specified");
+ }
+
// Lock the service - only one instance of the replication
// should occur at a time
+ ReplicationDefinitionLockExtender lock =
+ new ReplicationDefinitionLockExtender(replicationDef);
// Turn our payload list of root nodes into something that
// the transfer service can work with
+ Set toTransfer = new HashSet(89);
+ try {
+ NodeCrawler crawler = nodeCrawlerFactory.getNodeCrawler();
+ crawler.setNodeFinders(new ChildAssociatedNodeFinder(ContentModel.ASSOC_CONTAINS));
+
+ for(NodeRef payload : replicationDef.getPayload()) {
+ Set crawledNodes = crawler.crawl(payload);
+ toTransfer.addAll(crawledNodes);
+ }
+ } catch(Exception e) {
+ // TODO - Record the error
+ System.err.println(e);
+ lock.close();
+ throw new ReplicationServiceException("Error processing payload list", e);
+ }
// Ask the transfer service to do the replication
// work for us
+ try {
+ // TODO
+ System.err.println("TODO - Execute '" + replicationDef.getReplicationName() + "'");
+
+ } catch(Exception e) {
+ // TODO - Record the error
+ System.err.println(e);
+ lock.close();
+ throw new ReplicationServiceException("Error executing transfer", e);
+ }
- // TODO
+ // All done
+ lock.close();
+ }
+
+ /**
+ * A {@link TransferCallback} which periodically renews the
+ * lock held against a {@link ReplicationDefinition}
+ */
+ protected class ReplicationDefinitionLockExtender implements TransferCallback
+ {
+ private ReplicationDefinition replicationDef;
+ private String lockToken;
+
+ protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef)
+ {
+ this.replicationDef = replicationDef;
+ acquireLock();
+ }
+ /**
+ * No matter what the event is, refresh
+ * our lock on the {@link ReplicationDefinition}
+ */
+ public void processEvent(TransferEvent event)
+ {
+ refreshLock();
+ }
+ /**
+ * Give up our lock on the
+ * {@link ReplicationDefinition}
+ */
+ public void close()
+ {
+ releaseLock();
+ }
+
+ /**
+ * Get a lock on the job.
+ * Tries every 5 seconds for 30 seconds, then
+ * every 30 seconds until 3 times the lock
+ * duration.
+ */
+ private void acquireLock()
+ {
+ long retryTime = 30*1000;
+ int retries = (int)(replicationActionLockDuration * 3 / retryTime);
+
+ try {
+ // Quick try
+ lockToken = jobLockService.getLock(
+ replicationDef.getReplicationName(),
+ replicationActionLockDuration,
+ 5 * 1000, // Every 5 seconds
+ 6 // 6 times = wait up to 30 seconds
+ );
+ } catch(LockAcquisitionException e) {
+ // Long try - every 30 seconds
+ lockToken = jobLockService.getLock(
+ replicationDef.getReplicationName(),
+ replicationActionLockDuration,
+ retryTime,
+ retries
+ );
+ }
+ }
+ private void refreshLock()
+ {
+ jobLockService.refreshLock(
+ lockToken,
+ replicationDef.getReplicationName(),
+ replicationActionLockDuration
+ );
+ }
+ private void releaseLock()
+ {
+ jobLockService.releaseLock(
+ lockToken,
+ replicationDef.getReplicationName()
+ );
+ }
}
}
diff --git a/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java b/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java
index 2d4942be49..a68925365b 100644
--- a/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java
+++ b/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java
@@ -91,12 +91,24 @@ public class ReplicationDefinitionImpl extends ActionImpl implements Replication
*/
@SuppressWarnings("unchecked")
public List getPayload() {
- List payload = (List)
- getParameterValue(REPLICATION_DEFINITION_PAYLOAD);
- if(payload == null) {
+ Object payloadO =
+ getParameterValue(REPLICATION_DEFINITION_PAYLOAD);
+
+ List payload;
+ if(payloadO == null) {
payload = new ArrayList();
setParameterValue(REPLICATION_DEFINITION_PAYLOAD, (Serializable)payload);
+ } else {
+ // If there's only one entry, comes back as just
+ // that, unwrapped from the list
+ if(payloadO instanceof List) {
+ payload = (List)payloadO;
+ } else {
+ payload = new ArrayList();
+ payload.add((NodeRef)payloadO);
+ }
}
+
return payload;
}
diff --git a/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java b/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java
index d76a9ed1eb..22b0edef53 100644
--- a/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java
+++ b/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java
@@ -19,16 +19,24 @@
package org.alfresco.repo.replication;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.alfresco.model.ContentModel;
+import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.model.Repository;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.service.cmr.replication.ReplicationDefinition;
import org.alfresco.service.cmr.replication.ReplicationService;
+import org.alfresco.service.cmr.replication.ReplicationServiceException;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.util.BaseAlfrescoSpringTest;
+import org.alfresco.util.GUID;
/**
* @author Nick Burch
@@ -36,10 +44,22 @@ import org.alfresco.util.BaseAlfrescoSpringTest;
public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest
{
private ReplicationService replicationService;
+ private JobLockService jobLockService;
private NodeService nodeService;
+ private Repository repositoryHelper;
private NodeRef replicationRoot;
+ private NodeRef folder1;
+ private NodeRef folder2;
+ private NodeRef folder2a;
+ private NodeRef folder2b;
+ private NodeRef content1_1;
+ private NodeRef content1_2;
+ private NodeRef thumbnail1_3;
+ private NodeRef content2a_1;
+ private NodeRef thumbnail2a_2;
+
private final QName ACTION_NAME = QName.createQName(NamespaceService.ALFRESCO_URI, "testName");
private final QName ACTION_NAME2 = QName.createQName(NamespaceService.ALFRESCO_URI, "testName2");
@@ -48,12 +68,14 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest
{
super.onSetUpInTransaction();
replicationService = (ReplicationService) this.applicationContext.getBean("replicationService");
+ jobLockService = (JobLockService) this.applicationContext.getBean("jobLockService");
nodeService = (NodeService) this.applicationContext.getBean("nodeService");
+ repositoryHelper = (Repository) this.applicationContext.getBean("repositoryHelper");
// Set the current security context as admin
AuthenticationUtil.setFullyAuthenticatedUser(AuthenticationUtil.getAdminUserName());
- // Zap any existing entries
+ // Zap any existing replication entries
replicationRoot = ReplicationDefinitionPersisterImpl.REPLICATION_ACTION_ROOT_NODE_REF;
for(ChildAssociationRef child : nodeService.getChildAssocs(replicationRoot)) {
QName type = nodeService.getType( child.getChildRef() );
@@ -61,9 +83,34 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest
nodeService.deleteNode(child.getChildRef());
}
}
+
+ // Create the test folder structure
+ folder1 = makeNode(repositoryHelper.getCompanyHome(), ContentModel.TYPE_FOLDER);
+ folder2 = makeNode(repositoryHelper.getCompanyHome(), ContentModel.TYPE_FOLDER);
+ folder2a = makeNode(folder2, ContentModel.TYPE_FOLDER);
+ folder2b = makeNode(folder2, ContentModel.TYPE_FOLDER);
+
+ content1_1 = makeNode(folder1, ContentModel.TYPE_CONTENT);
+ content1_2 = makeNode(folder1, ContentModel.TYPE_CONTENT);
+ thumbnail1_3 = makeNode(folder1, ContentModel.TYPE_THUMBNAIL);
+ content2a_1 = makeNode(folder2a, ContentModel.TYPE_CONTENT);
+ thumbnail2a_2 = makeNode(folder2a, ContentModel.TYPE_THUMBNAIL);
}
- public void testCreation() throws Exception
+ @Override
+ protected void onTearDownInTransaction() throws Exception {
+ super.onTearDownInTransaction();
+ if(folder1 != null) {
+ nodeService.deleteNode(folder1);
+ }
+ if(folder2 != null) {
+ nodeService.deleteNode(folder2);
+ }
+ }
+
+
+
+ public void testCreation() throws Exception
{
ReplicationDefinition replicationAction =
replicationService.createReplicationDefinition(ACTION_NAME, "Test Definition");
@@ -145,16 +192,70 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest
*/
public void testBasicExecution() throws Exception
{
- // First with a transient definition
+ // First one with no target, which isn't allowed
ReplicationDefinition rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test");
- rd.setTargetName("TestTarget");
- rd.getPayload().add(
- new NodeRef("workspace://SpacesStore/Testing")
- );
+ try {
+ actionService.executeAction(rd, replicationRoot);
+ fail("Shouldn't be permitted with no Target defined");
+ } catch(ReplicationServiceException e) {}
+
+ // Now no payload, also not allowed
+ rd.setTargetName("TestTarget");
+ try {
+ actionService.executeAction(rd, replicationRoot);
+ fail("Shouldn't be permitted with no payload defined");
+ } catch(ReplicationServiceException e) {}
+
+
+ // Next a proper one with a transient definition
+ rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test");
+ rd.setTargetName("TestTarget");
+ rd.getPayload().add( folder1 );
+ // Will execute without error
actionService.executeAction(rd, replicationRoot);
+
// Now with one that's in the repo
+ ReplicationDefinition rd2 = replicationService.createReplicationDefinition(ACTION_NAME2, "Test");
+ rd2.setTargetName("TestTarget");
+ rd2.getPayload().add(
+ folder2
+ );
+ replicationService.saveReplicationDefinition(rd2);
+ rd2 = replicationService.loadReplicationDefinition(ACTION_NAME2);
+ // Again no errors
+ actionService.executeAction(rd2, replicationRoot);
+ }
+
+ /**
+ * Check that the locking works.
+ * Take a 5 second lock on the job, then execute.
+ * Ensure that we really wait a little over 5 seconds.
+ */
+ public void testReplicationExectionLocking() throws Exception
+ {
+ ReplicationDefinition rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test");
+ rd.setTargetName("TestTarget");
+ rd.getPayload().add(folder1);
+ rd.getPayload().add(folder2a);
+
+ // Get the lock, and run
+ long start = System.currentTimeMillis();
+ String token = jobLockService.getLock(
+ rd.getReplicationName(),
+ 5 * 1000,
+ 1,
+ 1
+ );
+ actionService.executeAction(rd, replicationRoot);
+ long end = System.currentTimeMillis();
+
+ assertTrue(
+ "Should wait for the lock, but didn't (waited " +
+ ((end-start)/1000.0) + " seconds, not 5)",
+ end-start > 5000
+ );
}
/**
@@ -166,4 +267,15 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest
{
// TODO
}
-}
\ No newline at end of file
+
+
+ private NodeRef makeNode(NodeRef parent, QName nodeType)
+ {
+ String uuid = GUID.generate();
+ Map props = new HashMap();
+ props.put(ContentModel.PROP_NAME, uuid);
+ ChildAssociationRef assoc = nodeService.createNode(parent, ContentModel.ASSOC_CONTAINS, QName.createQName(
+ NamespaceService.APP_MODEL_1_0_URI, uuid), nodeType, props);
+ return assoc.getChildRef();
+ }
+}