From 9d3ec267b12669a00550168c07a4ff4748702e74 Mon Sep 17 00:00:00 2001 From: Nick Burch Date: Wed, 7 Jul 2010 16:23:34 +0000 Subject: [PATCH] Replication Service execution work Now handles locking the replication job, deciding what to replicate for a given payload, and refreshing the lock as the transfer progresses, plus tests. Still needs to call the transfer service though. git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@20989 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../alfresco/replication-services-context.xml | 1 + .../ReplicationActionExecutor.java | 148 +++++++++++++++++- .../ReplicationDefinitionImpl.java | 18 ++- .../ReplicationServiceIntegrationTest.java | 128 ++++++++++++++- 4 files changed, 281 insertions(+), 14 deletions(-) diff --git a/config/alfresco/replication-services-context.xml b/config/alfresco/replication-services-context.xml index 58fad68cd2..013be38cc7 100644 --- a/config/alfresco/replication-services-context.xml +++ b/config/alfresco/replication-services-context.xml @@ -71,6 +71,7 @@ + diff --git a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java index 8677c64aef..db3f4a1d8d 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java +++ b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java @@ -18,15 +18,28 @@ */ package org.alfresco.repo.replication; +import java.util.HashSet; import java.util.List; +import java.util.Set; + +import org.alfresco.model.ContentModel; import org.alfresco.repo.action.executer.ActionExecuterAbstractBase; import org.alfresco.repo.lock.JobLockService; +import org.alfresco.repo.lock.LockAcquisitionException; +import org.alfresco.repo.transfer.ChildAssociatedNodeFinder; import org.alfresco.service.cmr.action.Action; +import org.alfresco.service.cmr.action.ActionDefinition; import org.alfresco.service.cmr.action.ParameterDefinition; +import org.alfresco.service.cmr.replication.ReplicationDefinition; import org.alfresco.service.cmr.replication.ReplicationService; +import org.alfresco.service.cmr.replication.ReplicationServiceException; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.cmr.transfer.NodeCrawler; +import org.alfresco.service.cmr.transfer.NodeCrawlerFactory; +import org.alfresco.service.cmr.transfer.TransferCallback; +import org.alfresco.service.cmr.transfer.TransferEvent; import org.alfresco.service.cmr.transfer.TransferService; /** @@ -36,8 +49,11 @@ import org.alfresco.service.cmr.transfer.TransferService; public class ReplicationActionExecutor extends ActionExecuterAbstractBase { private NodeService nodeService; private JobLockService jobLockService; - private ReplicationService replicationService; private TransferService transferService; + private ReplicationService replicationService; + private NodeCrawlerFactory nodeCrawlerFactory; + + private long replicationActionLockDuration = 10*60*1000; /** * Injects the NodeService bean. @@ -52,7 +68,7 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { /** * Injects the JobLockService bean. * - * @param nodeService the JobLockService. + * @param jobLockService the JobLockService. */ public void setJobLockService(JobLockService jobLockService) { @@ -79,6 +95,16 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { this.transferService = transferService; } + /** + * Injects the NodeCrawlerFactory bean. + * + * @param nodeCrawlerFactory the NodeCrawlerFactory. + */ + public void setNodeCrawlerFactory(NodeCrawlerFactory nodeCrawlerFactory) + { + this.nodeCrawlerFactory = nodeCrawlerFactory; + } + @Override protected void addParameterDefinitions(List paramList) { // TODO @@ -86,15 +112,131 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { @Override protected void executeImpl(Action action, NodeRef actionedUponNodeRef) { + final ReplicationDefinition replicationDef = (ReplicationDefinition)action; + if(replicationDef.getTargetName() == null || + replicationDef.getTargetName().equals("")) + { + throw new ReplicationServiceException("The target is required but wasn't given"); + } + if(replicationDef.getPayload().size() == 0) + { + throw new ReplicationServiceException("No payloads were specified"); + } + // Lock the service - only one instance of the replication // should occur at a time + ReplicationDefinitionLockExtender lock = + new ReplicationDefinitionLockExtender(replicationDef); // Turn our payload list of root nodes into something that // the transfer service can work with + Set toTransfer = new HashSet(89); + try { + NodeCrawler crawler = nodeCrawlerFactory.getNodeCrawler(); + crawler.setNodeFinders(new ChildAssociatedNodeFinder(ContentModel.ASSOC_CONTAINS)); + + for(NodeRef payload : replicationDef.getPayload()) { + Set crawledNodes = crawler.crawl(payload); + toTransfer.addAll(crawledNodes); + } + } catch(Exception e) { + // TODO - Record the error + System.err.println(e); + lock.close(); + throw new ReplicationServiceException("Error processing payload list", e); + } // Ask the transfer service to do the replication // work for us + try { + // TODO + System.err.println("TODO - Execute '" + replicationDef.getReplicationName() + "'"); + + } catch(Exception e) { + // TODO - Record the error + System.err.println(e); + lock.close(); + throw new ReplicationServiceException("Error executing transfer", e); + } - // TODO + // All done + lock.close(); + } + + /** + * A {@link TransferCallback} which periodically renews the + * lock held against a {@link ReplicationDefinition} + */ + protected class ReplicationDefinitionLockExtender implements TransferCallback + { + private ReplicationDefinition replicationDef; + private String lockToken; + + protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef) + { + this.replicationDef = replicationDef; + acquireLock(); + } + /** + * No matter what the event is, refresh + * our lock on the {@link ReplicationDefinition} + */ + public void processEvent(TransferEvent event) + { + refreshLock(); + } + /** + * Give up our lock on the + * {@link ReplicationDefinition} + */ + public void close() + { + releaseLock(); + } + + /** + * Get a lock on the job. + * Tries every 5 seconds for 30 seconds, then + * every 30 seconds until 3 times the lock + * duration. + */ + private void acquireLock() + { + long retryTime = 30*1000; + int retries = (int)(replicationActionLockDuration * 3 / retryTime); + + try { + // Quick try + lockToken = jobLockService.getLock( + replicationDef.getReplicationName(), + replicationActionLockDuration, + 5 * 1000, // Every 5 seconds + 6 // 6 times = wait up to 30 seconds + ); + } catch(LockAcquisitionException e) { + // Long try - every 30 seconds + lockToken = jobLockService.getLock( + replicationDef.getReplicationName(), + replicationActionLockDuration, + retryTime, + retries + ); + } + } + private void refreshLock() + { + jobLockService.refreshLock( + lockToken, + replicationDef.getReplicationName(), + replicationActionLockDuration + ); + } + private void releaseLock() + { + jobLockService.releaseLock( + lockToken, + replicationDef.getReplicationName() + ); + } } } diff --git a/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java b/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java index 2d4942be49..a68925365b 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java +++ b/source/java/org/alfresco/repo/replication/ReplicationDefinitionImpl.java @@ -91,12 +91,24 @@ public class ReplicationDefinitionImpl extends ActionImpl implements Replication */ @SuppressWarnings("unchecked") public List getPayload() { - List payload = (List) - getParameterValue(REPLICATION_DEFINITION_PAYLOAD); - if(payload == null) { + Object payloadO = + getParameterValue(REPLICATION_DEFINITION_PAYLOAD); + + List payload; + if(payloadO == null) { payload = new ArrayList(); setParameterValue(REPLICATION_DEFINITION_PAYLOAD, (Serializable)payload); + } else { + // If there's only one entry, comes back as just + // that, unwrapped from the list + if(payloadO instanceof List) { + payload = (List)payloadO; + } else { + payload = new ArrayList(); + payload.add((NodeRef)payloadO); + } } + return payload; } diff --git a/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java b/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java index d76a9ed1eb..22b0edef53 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java +++ b/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java @@ -19,16 +19,24 @@ package org.alfresco.repo.replication; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.alfresco.model.ContentModel; +import org.alfresco.repo.lock.JobLockService; import org.alfresco.repo.model.Repository; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.service.cmr.replication.ReplicationDefinition; import org.alfresco.service.cmr.replication.ReplicationService; +import org.alfresco.service.cmr.replication.ReplicationServiceException; import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeService; import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.QName; import org.alfresco.util.BaseAlfrescoSpringTest; +import org.alfresco.util.GUID; /** * @author Nick Burch @@ -36,10 +44,22 @@ import org.alfresco.util.BaseAlfrescoSpringTest; public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest { private ReplicationService replicationService; + private JobLockService jobLockService; private NodeService nodeService; + private Repository repositoryHelper; private NodeRef replicationRoot; + private NodeRef folder1; + private NodeRef folder2; + private NodeRef folder2a; + private NodeRef folder2b; + private NodeRef content1_1; + private NodeRef content1_2; + private NodeRef thumbnail1_3; + private NodeRef content2a_1; + private NodeRef thumbnail2a_2; + private final QName ACTION_NAME = QName.createQName(NamespaceService.ALFRESCO_URI, "testName"); private final QName ACTION_NAME2 = QName.createQName(NamespaceService.ALFRESCO_URI, "testName2"); @@ -48,12 +68,14 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest { super.onSetUpInTransaction(); replicationService = (ReplicationService) this.applicationContext.getBean("replicationService"); + jobLockService = (JobLockService) this.applicationContext.getBean("jobLockService"); nodeService = (NodeService) this.applicationContext.getBean("nodeService"); + repositoryHelper = (Repository) this.applicationContext.getBean("repositoryHelper"); // Set the current security context as admin AuthenticationUtil.setFullyAuthenticatedUser(AuthenticationUtil.getAdminUserName()); - // Zap any existing entries + // Zap any existing replication entries replicationRoot = ReplicationDefinitionPersisterImpl.REPLICATION_ACTION_ROOT_NODE_REF; for(ChildAssociationRef child : nodeService.getChildAssocs(replicationRoot)) { QName type = nodeService.getType( child.getChildRef() ); @@ -61,9 +83,34 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest nodeService.deleteNode(child.getChildRef()); } } + + // Create the test folder structure + folder1 = makeNode(repositoryHelper.getCompanyHome(), ContentModel.TYPE_FOLDER); + folder2 = makeNode(repositoryHelper.getCompanyHome(), ContentModel.TYPE_FOLDER); + folder2a = makeNode(folder2, ContentModel.TYPE_FOLDER); + folder2b = makeNode(folder2, ContentModel.TYPE_FOLDER); + + content1_1 = makeNode(folder1, ContentModel.TYPE_CONTENT); + content1_2 = makeNode(folder1, ContentModel.TYPE_CONTENT); + thumbnail1_3 = makeNode(folder1, ContentModel.TYPE_THUMBNAIL); + content2a_1 = makeNode(folder2a, ContentModel.TYPE_CONTENT); + thumbnail2a_2 = makeNode(folder2a, ContentModel.TYPE_THUMBNAIL); } - public void testCreation() throws Exception + @Override + protected void onTearDownInTransaction() throws Exception { + super.onTearDownInTransaction(); + if(folder1 != null) { + nodeService.deleteNode(folder1); + } + if(folder2 != null) { + nodeService.deleteNode(folder2); + } + } + + + + public void testCreation() throws Exception { ReplicationDefinition replicationAction = replicationService.createReplicationDefinition(ACTION_NAME, "Test Definition"); @@ -145,16 +192,70 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest */ public void testBasicExecution() throws Exception { - // First with a transient definition + // First one with no target, which isn't allowed ReplicationDefinition rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test"); - rd.setTargetName("TestTarget"); - rd.getPayload().add( - new NodeRef("workspace://SpacesStore/Testing") - ); + try { + actionService.executeAction(rd, replicationRoot); + fail("Shouldn't be permitted with no Target defined"); + } catch(ReplicationServiceException e) {} + + // Now no payload, also not allowed + rd.setTargetName("TestTarget"); + try { + actionService.executeAction(rd, replicationRoot); + fail("Shouldn't be permitted with no payload defined"); + } catch(ReplicationServiceException e) {} + + + // Next a proper one with a transient definition + rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test"); + rd.setTargetName("TestTarget"); + rd.getPayload().add( folder1 ); + // Will execute without error actionService.executeAction(rd, replicationRoot); + // Now with one that's in the repo + ReplicationDefinition rd2 = replicationService.createReplicationDefinition(ACTION_NAME2, "Test"); + rd2.setTargetName("TestTarget"); + rd2.getPayload().add( + folder2 + ); + replicationService.saveReplicationDefinition(rd2); + rd2 = replicationService.loadReplicationDefinition(ACTION_NAME2); + // Again no errors + actionService.executeAction(rd2, replicationRoot); + } + + /** + * Check that the locking works. + * Take a 5 second lock on the job, then execute. + * Ensure that we really wait a little over 5 seconds. + */ + public void testReplicationExectionLocking() throws Exception + { + ReplicationDefinition rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test"); + rd.setTargetName("TestTarget"); + rd.getPayload().add(folder1); + rd.getPayload().add(folder2a); + + // Get the lock, and run + long start = System.currentTimeMillis(); + String token = jobLockService.getLock( + rd.getReplicationName(), + 5 * 1000, + 1, + 1 + ); + actionService.executeAction(rd, replicationRoot); + long end = System.currentTimeMillis(); + + assertTrue( + "Should wait for the lock, but didn't (waited " + + ((end-start)/1000.0) + " seconds, not 5)", + end-start > 5000 + ); } /** @@ -166,4 +267,15 @@ public class ReplicationServiceIntegrationTest extends BaseAlfrescoSpringTest { // TODO } -} \ No newline at end of file + + + private NodeRef makeNode(NodeRef parent, QName nodeType) + { + String uuid = GUID.generate(); + Map props = new HashMap(); + props.put(ContentModel.PROP_NAME, uuid); + ChildAssociationRef assoc = nodeService.createNode(parent, ContentModel.ASSOC_CONTAINS, QName.createQName( + NamespaceService.APP_MODEL_1_0_URI, uuid), nodeType, props); + return assoc.getChildRef(); + } +}