From 35b2b7a122c1775e54d7edd9226c2f7329e0f0a2 Mon Sep 17 00:00:00 2001 From: David Caruana Date: Fri, 10 Sep 2010 14:00:05 +0000 Subject: [PATCH] Fixes to replication job status handling: - success, error and cancelled states now correctly reported - source and target reports now correctly provided for each of above Changes: - deprecated TransferService interface, replaced by TransferService2 - introduces new sync transfer methods - new TransferServiceImpl2 class, old TransferServiceImpl delegates to new class - sync transfer now returns TransferEndEvent - sync transfer now raises TransferFailureException - success, error and cancelled events are now end events (raised after report events) - transfer client handling refactored to support cancel and errors appropriately - converted to event loop with polling of server status for all states - cancel request may now end with success or error (depending on when cancel requested) - extract transfer errors from server - only raise exception for errors (cancelled now returns) - source and destination reports written for all states - Added TransferEndEvent interface for end events - reports attached to end event - replication service fixed to record source and dest reports in error case - action service fixed to record cancelled state git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22390 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../messages/transfer-service.properties | 3 + .../alfresco/replication-services-context.xml | 15 +- config/alfresco/transfer-service-context.xml | 63 +- .../action/ActionTrackingServiceImpl.java | 14 +- .../ReplicationActionExecutor.java | 193 ++- .../replication/ReplicationServiceImpl.java | 33 - .../ReplicationServiceImplTest.java | 3 - .../ReplicationServiceIntegrationTest.java | 35 +- .../repo/transfer/TransferEndEventImpl.java | 67 + .../repo/transfer/TransferEventProcessor.java | 69 +- .../transfer/TransferServiceCallbackTest.java | 43 +- .../repo/transfer/TransferServiceImpl.java | 1307 ++------------- .../repo/transfer/TransferServiceImpl2.java | 1402 +++++++++++++++++ .../transfer/TransferServiceImplTest.java | 25 +- .../transfer/report/TransferReporterImpl.java | 9 +- .../cmr/transfer/TransferEndEvent.java | 49 + .../service/cmr/transfer/TransferEvent.java | 2 +- .../cmr/transfer/TransferEventCancelled.java | 32 + .../cmr/transfer/TransferEventError.java | 4 +- .../cmr/transfer/TransferEventReport.java | 2 - .../cmr/transfer/TransferEventSuccess.java | 6 +- .../cmr/transfer/TransferException.java | 2 +- .../transfer/TransferFailureException.java | 48 + .../service/cmr/transfer/TransferService.java | 7 +- .../cmr/transfer/TransferService2.java | 235 +++ 25 files changed, 2176 insertions(+), 1492 deletions(-) create mode 100644 source/java/org/alfresco/repo/transfer/TransferEndEventImpl.java create mode 100644 source/java/org/alfresco/repo/transfer/TransferServiceImpl2.java create mode 100644 source/java/org/alfresco/service/cmr/transfer/TransferEndEvent.java create mode 100644 source/java/org/alfresco/service/cmr/transfer/TransferEventCancelled.java create mode 100644 source/java/org/alfresco/service/cmr/transfer/TransferFailureException.java create mode 100644 source/java/org/alfresco/service/cmr/transfer/TransferService2.java diff --git a/config/alfresco/messages/transfer-service.properties b/config/alfresco/messages/transfer-service.properties index 38cd7f96a1..d86e81da72 100644 --- a/config/alfresco/messages/transfer-service.properties +++ b/config/alfresco/messages/transfer-service.properties @@ -10,6 +10,9 @@ transfer_service.comms.unsuccessful_response=Received unsuccessful response code transfer_service.comms.http_request_failed=Failed to execute HTTP request {0} to target: {1} status: {2} transfer_service.no_nodes=No nodes to transfer transfer_service.cancelled=Transfer cancelled +transfer_service.failed_to_get_transfer_status=Failed to retrieve transfer status from target {0} +transfer_service.target_error=Transfer target failed with {0} +transfer_service.unknown_target_error=Unknown error transfer_service.receiver.failed_to_create_staging_folder=Unable to create staging directory for transfer {0} transfer_service.receiver.lock_folder_not_found=Unable to locate specified lock folder: {0} diff --git a/config/alfresco/replication-services-context.xml b/config/alfresco/replication-services-context.xml index f1913222d4..65790f1227 100644 --- a/config/alfresco/replication-services-context.xml +++ b/config/alfresco/replication-services-context.xml @@ -41,10 +41,7 @@ - - - @@ -68,14 +65,14 @@ {http://www.alfresco.org/model/content/1.0}folder - - - - - + + + + + + - diff --git a/config/alfresco/transfer-service-context.xml b/config/alfresco/transfer-service-context.xml index 5dc93022c4..56c070fcea 100644 --- a/config/alfresco/transfer-service-context.xml +++ b/config/alfresco/transfer-service-context.xml @@ -1,8 +1,14 @@ - - + + + + + + + @@ -146,24 +152,41 @@ + + + + + org.alfresco.service.cmr.transfer.TransferService + + + + + + + + + + + + + - - - - org.alfresco.service.cmr.transfer.TransferService - - - - - - - - - - - - - + + + org.alfresco.service.cmr.transfer.TransferService2 + + + + + + + + + + + + + @@ -180,6 +203,7 @@ + @@ -198,7 +222,6 @@ - diff --git a/source/java/org/alfresco/repo/action/ActionTrackingServiceImpl.java b/source/java/org/alfresco/repo/action/ActionTrackingServiceImpl.java index bdcd72088e..5af5a4cbf8 100644 --- a/source/java/org/alfresco/repo/action/ActionTrackingServiceImpl.java +++ b/source/java/org/alfresco/repo/action/ActionTrackingServiceImpl.java @@ -244,7 +244,7 @@ public class ActionTrackingServiceImpl implements ActionTrackingService * Schedule the recording of the action failure to occur in another * transaction */ - public void recordActionFailure(Action action, Throwable exception) + public void recordActionFailure(Action action, final Throwable exception) { if (logger.isDebugEnabled() == true) { @@ -313,10 +313,18 @@ public class ActionTrackingServiceImpl implements ActionTrackingService .createAction(actionNode); // Update it + if (exception instanceof ActionCancelledException) + { + action.setExecutionStatus(ActionStatus.Cancelled); + action.setExecutionFailureMessage(null); + } + else + { + action.setExecutionStatus(ActionStatus.Failed); + action.setExecutionFailureMessage(exception.getMessage()); + } action.setExecutionStartDate(startedAt); action.setExecutionEndDate(endedAt); - action.setExecutionStatus(ActionStatus.Failed); - action.setExecutionFailureMessage(message); runtimeActionService.saveActionImpl(actionNode, action); if (logger.isDebugEnabled() == true) diff --git a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java index 57ef0db34b..7bf3fb4cb7 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java +++ b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java @@ -27,25 +27,28 @@ import org.alfresco.repo.action.ActionCancelledException; import org.alfresco.repo.action.executer.ActionExecuterAbstractBase; import org.alfresco.repo.lock.JobLockService; import org.alfresco.repo.lock.LockAcquisitionException; +import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.transfer.ChildAssociatedNodeFinder; import org.alfresco.repo.transfer.ContentClassFilter; import org.alfresco.service.cmr.action.Action; import org.alfresco.service.cmr.action.ActionTrackingService; import org.alfresco.service.cmr.action.ParameterDefinition; import org.alfresco.service.cmr.replication.ReplicationDefinition; -import org.alfresco.service.cmr.replication.ReplicationService; import org.alfresco.service.cmr.replication.ReplicationServiceException; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.transfer.NodeCrawler; import org.alfresco.service.cmr.transfer.NodeCrawlerFactory; import org.alfresco.service.cmr.transfer.TransferCallback; -import org.alfresco.service.cmr.transfer.TransferCancelledException; import org.alfresco.service.cmr.transfer.TransferDefinition; +import org.alfresco.service.cmr.transfer.TransferEndEvent; import org.alfresco.service.cmr.transfer.TransferEvent; import org.alfresco.service.cmr.transfer.TransferEventBegin; +import org.alfresco.service.cmr.transfer.TransferEventCancelled; import org.alfresco.service.cmr.transfer.TransferEventEnterState; -import org.alfresco.service.cmr.transfer.TransferEventReport; -import org.alfresco.service.cmr.transfer.TransferService; +import org.alfresco.service.cmr.transfer.TransferEventError; +import org.alfresco.service.cmr.transfer.TransferFailureException; +import org.alfresco.service.cmr.transfer.TransferService2; +import org.alfresco.service.transaction.TransactionService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,10 +63,11 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { private static Log logger = LogFactory.getLog(ReplicationActionExecutor.class); private JobLockService jobLockService; - private TransferService transferService; - private ReplicationService replicationService; + private TransferService2 transferService; private NodeCrawlerFactory nodeCrawlerFactory; private ActionTrackingService actionTrackingService; + private TransactionService transactionService; + private ReplicationDefinitionPersisterImpl replicationDefinitionPersister; /** * By default, we lock for 30 minutes @@ -80,22 +84,12 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { this.jobLockService = jobLockService; } - /** - * Injects the ReplicationService bean. - * - * @param nodeService the ReplicationService. - */ - public void setReplicationService(ReplicationService replicationService) - { - this.replicationService = replicationService; - } - /** * Injects the TransferService bean. * * @param transferService the TransferService. */ - public void setTransferService(TransferService transferService) + public void setTransferService(TransferService2 transferService) { this.transferService = transferService; } @@ -120,6 +114,25 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { this.actionTrackingService = actionTrackingService; } + /** + * Injects the TransactionService bean. + * + * @param transactionService the TransactionService. + */ + public void setTransactionService(TransactionService transactionService) + { + this.transactionService = transactionService; + } + + /** + * Injects the ReplicationDefinitionPersister bean. + * @param replicationDefinitionPersister + */ + public void setReplicationDefinitionPersister(ReplicationDefinitionPersisterImpl replicationDefinitionPersister) + { + this.replicationDefinitionPersister = replicationDefinitionPersister; + } + @Override protected void addParameterDefinitions(List paramList) { // Not used - our definitions hold everything on them @@ -197,8 +210,8 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { } // Clear the previous transfer report references - replicationDef.setLocalTransferReport(null); - replicationDef.setRemoteTransferReport(null); +// replicationDef.setLocalTransferReport(null); +// replicationDef.setRemoteTransferReport(null); // Lock the service - only one instance of the replication // should occur at a time @@ -215,43 +228,78 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { throw new ReplicationServiceException("Error processing payload list - " + e.getMessage(), e); } - // Holder for reports generated by the transfer - ReplicationReportCollector reports = new ReplicationReportCollector(); - // Ask the transfer service to do the replication // work for us - try { - // Build the definition - TransferDefinition transferDefinition = - buildTransferDefinition(replicationDef, toTransfer); - - // Off we go - transferService.transfer( + TransferEndEvent endEvent = null; + try + { + // Build the definition + TransferDefinition transferDefinition = + buildTransferDefinition(replicationDef, toTransfer); + + // Off we go + endEvent = transferService.transfer( replicationDef.getTargetName(), transferDefinition, - lock, reports - ); + lock); - // Record the details of the transfer reports - replicationDef.setLocalTransferReport(reports.getLocalReport()); - replicationDef.setRemoteTransferReport(reports.getRemoteReport()); - } catch(Exception e) { - if(! (e instanceof TransferCancelledException)) - { - lock.close(); - throw new ReplicationServiceException("Error executing transfer - " + e.getMessage(), e); - } + if (endEvent instanceof TransferEventCancelled) + { + if (logger.isDebugEnabled()) + logger.debug("Cancelling replication job"); + + // If we were cancelled, throw the magic exception so + // that this is correctly recorded + throw new ActionCancelledException(replicationDef); + } + + // Record details of the transfer reports (in success case) + replicationDef.setLocalTransferReport(endEvent.getSourceReport()); + replicationDef.setRemoteTransferReport(endEvent.getDestinationReport()); } - - // All done, release our lock - lock.close(); - - // If we were cancelled, throw the magic exception so - // that this is correctly recorded - if(actionTrackingService.isCancellationRequested(replicationDef)) + catch(Exception e) { - throw new ActionCancelledException(replicationDef); + if (e instanceof ActionCancelledException) + { + writeDefinitionReports(replicationDef, endEvent.getSourceReport(), endEvent.getDestinationReport()); + throw (ActionCancelledException)e; + } + if (e instanceof TransferFailureException) + { + TransferEventError failureEndEvent = ((TransferFailureException)e).getErrorEvent(); + writeDefinitionReports(replicationDef, failureEndEvent.getSourceReport(), failureEndEvent.getDestinationReport()); + throw new ReplicationServiceException("Error executing transfer - " + e.getCause().getMessage(), e); + } + writeDefinitionReports(replicationDef, null, null); + throw new ReplicationServiceException("Error executing transfer - " + e.getMessage(), e); } + finally + { + lock.close(); + } + } + + private void writeDefinitionReports(final ReplicationDefinition replicationDef, NodeRef sourceReport, NodeRef destinationReport) + { + replicationDef.setLocalTransferReport(sourceReport); + replicationDef.setRemoteTransferReport(destinationReport); + + if (replicationDef.getNodeRef() != null) + { + // Record details of the transfer reports + transactionService.getRetryingTransactionHelper().doInTransaction( + new RetryingTransactionHelper.RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + if (logger.isDebugEnabled()) + logger.debug("Exception - writing replication def reports"); + + replicationDefinitionPersister.saveReplicationDefinition(replicationDef); + return null; + } + }, false, true); + } } /** @@ -367,53 +415,4 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase { ); } } - - /** - * A {@link TransferCallback} which collects the various reports generated by - * the transfer. - */ - protected class ReplicationReportCollector implements TransferCallback - { - private NodeRef localReport; - private NodeRef remoteReport; - - protected ReplicationReportCollector() - { - } - - /** - * Collect source and destination repository target reports - */ - public void processEvent(TransferEvent event) - { - if(event instanceof TransferEventReport) - { - TransferEventReport reportEvent = (TransferEventReport)event; - if (reportEvent.getReportType().equals(TransferEventReport.ReportType.SOURCE)) - { - localReport = reportEvent.getNodeRef(); - } - else if (reportEvent.getReportType().equals(TransferEventReport.ReportType.DESTINATION)) - { - remoteReport = reportEvent.getNodeRef(); - } - } - } - - /** - * @return local transfer report - */ - public NodeRef getLocalReport() - { - return localReport; - } - - /** - * @return target transfer report - */ - public NodeRef getRemoteReport() - { - return remoteReport; - } - } } diff --git a/source/java/org/alfresco/repo/replication/ReplicationServiceImpl.java b/source/java/org/alfresco/repo/replication/ReplicationServiceImpl.java index 2b3a62eca3..b3a2dc97d1 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationServiceImpl.java +++ b/source/java/org/alfresco/repo/replication/ReplicationServiceImpl.java @@ -23,11 +23,8 @@ import java.util.List; import org.alfresco.service.cmr.action.ActionService; import org.alfresco.service.cmr.action.scheduled.ScheduledPersistedAction; import org.alfresco.service.cmr.action.scheduled.ScheduledPersistedActionService; -import org.alfresco.service.cmr.dictionary.DictionaryService; import org.alfresco.service.cmr.replication.ReplicationDefinition; import org.alfresco.service.cmr.replication.ReplicationService; -import org.alfresco.service.cmr.repository.NodeService; -import org.alfresco.service.cmr.transfer.TransferService; import org.alfresco.util.GUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,9 +37,6 @@ public class ReplicationServiceImpl implements ReplicationService, ReplicationDe private static final Log log = LogFactory.getLog(ReplicationServiceImpl.class); private ActionService actionService; - private DictionaryService dictionaryService; - private TransferService transferService; - private NodeService nodeService; private ScheduledPersistedActionService scheduledPersistedActionService; private ReplicationDefinitionPersisterImpl replicationDefinitionPersister; @@ -56,24 +50,6 @@ public class ReplicationServiceImpl implements ReplicationService, ReplicationDe this.replicationDefinitionPersister = replicationDefinitionPersister; } - /** - * Injects the TransferService bean - * @param transferService - */ - public void setTransferService(TransferService transferService) - { - this.transferService = transferService; - } - - /** - * Injects the NodeService bean. - * @param nodeService - */ - public void setNodeService(NodeService nodeService) - { - this.nodeService = nodeService; - } - /** * Injects the ActionService bean. * @param actionService @@ -83,15 +59,6 @@ public class ReplicationServiceImpl implements ReplicationService, ReplicationDe this.actionService = actionService; } - /** - * Injects the DictionaryService bean. - * @param dictionaryService - */ - public void setDictionaryService(DictionaryService dictionaryService) - { - this.dictionaryService = dictionaryService; - } - /** * Injects the Scheduled Persisted Action Service bean * @param scheduledPersistedActionService diff --git a/source/java/org/alfresco/repo/replication/ReplicationServiceImplTest.java b/source/java/org/alfresco/repo/replication/ReplicationServiceImplTest.java index 38e16a57ed..4cec1ee242 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationServiceImplTest.java +++ b/source/java/org/alfresco/repo/replication/ReplicationServiceImplTest.java @@ -24,7 +24,6 @@ import junit.framework.TestCase; import org.alfresco.service.cmr.action.ActionService; import org.alfresco.service.cmr.replication.ReplicationDefinition; -import org.alfresco.service.cmr.repository.NodeService; /** * @author Nick Burch @@ -32,7 +31,6 @@ import org.alfresco.service.cmr.repository.NodeService; public class ReplicationServiceImplTest extends TestCase { private ActionService actionService = mock(ActionService.class); - private NodeService nodeService = mock(NodeService.class); private final ReplicationDefinitionPersisterImpl replicationDefinitionPersister = mock(ReplicationDefinitionPersisterImpl.class); private ReplicationServiceImpl replicationService; @@ -45,7 +43,6 @@ public class ReplicationServiceImplTest extends TestCase { replicationService = new ReplicationServiceImpl(); replicationService.setActionService(actionService); - replicationService.setNodeService(nodeService); replicationService.setReplicationDefinitionPersister(replicationDefinitionPersister); } diff --git a/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java b/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java index 7830a3a474..952462aab4 100644 --- a/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java +++ b/source/java/org/alfresco/repo/replication/ReplicationServiceIntegrationTest.java @@ -39,7 +39,7 @@ import org.alfresco.repo.model.Repository; import org.alfresco.repo.replication.script.ScriptReplicationDefinition; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; -import org.alfresco.repo.transfer.TransferServiceImpl; +import org.alfresco.repo.transfer.TransferServiceImpl2; import org.alfresco.repo.transfer.TransferTransmitter; import org.alfresco.repo.transfer.UnitTestInProcessTransmitterImpl; import org.alfresco.repo.transfer.UnitTestTransferManifestNodeFactory; @@ -66,7 +66,7 @@ import org.alfresco.service.cmr.repository.ScriptService; import org.alfresco.service.cmr.transfer.TransferDefinition; import org.alfresco.service.cmr.transfer.TransferException; import org.alfresco.service.cmr.transfer.TransferReceiver; -import org.alfresco.service.cmr.transfer.TransferService; +import org.alfresco.service.cmr.transfer.TransferService2; import org.alfresco.service.cmr.transfer.TransferTarget; import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.QName; @@ -90,7 +90,7 @@ public class ReplicationServiceIntegrationTest extends TestCase private ReplicationActionExecutor replicationActionExecutor; private ReplicationService replicationService; private TransactionService transactionService; - private TransferService transferService; + private TransferService2 transferService; private ContentService contentService; private JobLockService jobLockService; private ScriptService scriptService; @@ -130,7 +130,7 @@ public class ReplicationServiceIntegrationTest extends TestCase replicationActionExecutor = (ReplicationActionExecutor) ctx.getBean("replicationActionExecutor"); replicationService = (ReplicationService) ctx.getBean("replicationService"); transactionService = (TransactionService) ctx.getBean("transactionService"); - transferService = (TransferService) ctx.getBean("transferService"); + transferService = (TransferService2) ctx.getBean("transferService2"); contentService = (ContentService) ctx.getBean("contentService"); jobLockService = (JobLockService) ctx.getBean("jobLockService"); actionService = (ActionService) ctx.getBean("actionService"); @@ -495,50 +495,61 @@ public class ReplicationServiceIntegrationTest extends TestCase // First one with no target, which isn't allowed ReplicationDefinition rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test"); + UserTransaction txn = transactionService.getUserTransaction(); + txn.begin(); try { actionService.executeAction(rd, replicationRoot); fail("Shouldn't be permitted with no Target defined"); } catch(ReplicationServiceException e) {} + txn.rollback(); // Now no payload, also not allowed rd.setTargetName(TRANSFER_TARGET); + txn = transactionService.getUserTransaction(); + txn.begin(); try { actionService.executeAction(rd, replicationRoot); fail("Shouldn't be permitted with no payload defined"); } catch(ReplicationServiceException e) {} - + txn.rollback(); // Now disabled, not allowed assertEquals(true, rd.isEnabled()); rd.setEnabled(false); assertEquals(false, rd.isEnabled()); + txn = transactionService.getUserTransaction(); + txn.begin(); try { actionService.executeAction(rd, replicationRoot); fail("Shouldn't be permitted when disabled"); } catch(ReplicationServiceException e) {} - + txn.rollback(); // Invalid Transfer Target, not allowed rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test"); rd.setTargetName("I am an invalid target that isn't there"); rd.getPayload().add( folder1 ); + txn = transactionService.getUserTransaction(); + txn.begin(); try { actionService.executeAction(rd, replicationRoot); fail("Shouldn't be permitted with an invalid transfer target"); } catch(ReplicationServiceException e) {} - + txn.rollback(); // Can't send Folder2a if Folder2 isn't there, as it // won't have anywhere to put it rd = replicationService.createReplicationDefinition(ACTION_NAME, "Test"); rd.setTargetName(TRANSFER_TARGET); rd.getPayload().add( folder2a ); + txn = transactionService.getUserTransaction(); + txn.begin(); try { actionService.executeAction(rd, replicationRoot); fail("Shouldn't be able to send Folder2a when Folder2 is missing!"); } catch(ReplicationServiceException e) {} - + txn.rollback(); // Next a proper one with a transient definition, // and a sensible set of folders @@ -547,7 +558,7 @@ public class ReplicationServiceIntegrationTest extends TestCase rd.getPayload().add( folder1 ); // Will execute without error - UserTransaction txn = transactionService.getUserTransaction(); + txn = transactionService.getUserTransaction(); txn.begin(); actionService.executeAction(rd, replicationRoot); txn.commit(); @@ -608,7 +619,7 @@ public class ReplicationServiceIntegrationTest extends TestCase * Take a 10 second lock on the job, then execute. * Ensure that we really wait a little over 10 seconds. */ - public void testReplicationExectionLocking() throws Exception + public void testReplicationExecutionLocking() throws Exception { // We need the test transfer target for this test makeTransferTarget(); @@ -718,6 +729,8 @@ public class ReplicationServiceIntegrationTest extends TestCase // Ensure it was cancelled assertEquals(null, rd.getExecutionFailureMessage()); + assertNotNull(rd.getLocalTransferReport()); + assertNotNull(rd.getRemoteTransferReport()); assertEquals(ActionStatus.Cancelled, rd.getExecutionStatus()); } @@ -1227,7 +1240,7 @@ public class ReplicationServiceIntegrationTest extends TestCase private void makeTransferServiceLocal() { TransferReceiver receiver = (TransferReceiver)ctx.getBean("transferReceiver"); TransferManifestNodeFactory transferManifestNodeFactory = (TransferManifestNodeFactory)ctx.getBean("transferManifestNodeFactory"); - TransferServiceImpl transferServiceImpl = (TransferServiceImpl) ctx.getBean("transferService"); + TransferServiceImpl2 transferServiceImpl = (TransferServiceImpl2) ctx.getBean("transferService2"); ContentService contentService = (ContentService) ctx.getBean("contentService"); TransferTransmitter transmitter = diff --git a/source/java/org/alfresco/repo/transfer/TransferEndEventImpl.java b/source/java/org/alfresco/repo/transfer/TransferEndEventImpl.java new file mode 100644 index 0000000000..4df6f91959 --- /dev/null +++ b/source/java/org/alfresco/repo/transfer/TransferEndEventImpl.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2009-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.transfer; + +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.transfer.RangedTransferEvent; +import org.alfresco.service.cmr.transfer.TransferEndEvent; +import org.alfresco.service.cmr.transfer.TransferEvent; + +/** + * An abstract implementation of TransferEndEvent. + + * @see TransferEvent + * @see RangedTransferEvent + */ +public class TransferEndEventImpl extends TransferEventImpl implements TransferEndEvent +{ + private NodeRef sourceReport; + private NodeRef destinationReport; + + public void setSourceReport(NodeRef sourceReport) + { + this.sourceReport = sourceReport; + } + + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferEndEvent#getSourceReport() + */ + @Override + public NodeRef getSourceReport() + { + return sourceReport; + } + + public void setDestinationReport(NodeRef destinationReport) + { + this.destinationReport = destinationReport; + } + + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferEndEvent#getDestinationReport() + */ + @Override + public NodeRef getDestinationReport() + { + return destinationReport; + } + +} diff --git a/source/java/org/alfresco/repo/transfer/TransferEventProcessor.java b/source/java/org/alfresco/repo/transfer/TransferEventProcessor.java index 930756c8b4..c670d76ba2 100644 --- a/source/java/org/alfresco/repo/transfer/TransferEventProcessor.java +++ b/source/java/org/alfresco/repo/transfer/TransferEventProcessor.java @@ -25,16 +25,15 @@ import java.util.concurrent.LinkedBlockingQueue; import org.alfresco.service.cmr.repository.ContentData; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.transfer.TransferCallback; +import org.alfresco.service.cmr.transfer.TransferEndEvent; import org.alfresco.service.cmr.transfer.TransferEvent; import org.alfresco.service.cmr.transfer.TransferEventBegin; import org.alfresco.service.cmr.transfer.TransferEventCommittingStatus; import org.alfresco.service.cmr.transfer.TransferEventEndState; import org.alfresco.service.cmr.transfer.TransferEventEnterState; -import org.alfresco.service.cmr.transfer.TransferEventError; +import org.alfresco.service.cmr.transfer.TransferEventReport; import org.alfresco.service.cmr.transfer.TransferEventSendingContent; import org.alfresco.service.cmr.transfer.TransferEventSendingSnapshot; -import org.alfresco.service.cmr.transfer.TransferEventSuccess; -import org.alfresco.service.cmr.transfer.TransferEventReport; /** * Class to bring together all the transfer event stuff. @@ -45,15 +44,15 @@ import org.alfresco.service.cmr.transfer.TransferEventReport; * * @author Mark Rogers */ - - public class TransferEventProcessor { public Set observers = new HashSet(); LinkedBlockingQueue queue = new LinkedBlockingQueue(); - + public TransferEventProcessor() + { + } public void addObserver(TransferCallback observer) { @@ -65,14 +64,6 @@ public class TransferEventProcessor observers.remove(observer); } - /** - * - */ - public TransferEventProcessor() - { - - } - public void begin(String transferId) { setState(TransferEvent.TransferState.START); @@ -80,47 +71,23 @@ public class TransferEventProcessor event.setTransferState(TransferEvent.TransferState.START); event.setMessage("begin transferId:" + transferId); queue.add(event); - event.setTransferId(transferId); + event.setTransferId(transferId); notifyObservers(); } public void start() { - setState(TransferEvent.TransferState.START); + setState(TransferEvent.TransferState.START); notifyObservers(); } - - public void success() + + public void end(TransferEndEvent endEvent) { - setState(TransferEvent.TransferState.SUCCESS); - - /** - * Write the success event - */ - TransferEventSuccess event = new TransferEventSuccess(); - event.setTransferState(TransferEvent.TransferState.SUCCESS); - event.setLast(true); - event.setMessage("success lastEvent:true"); - queue.add(event); + setState(endEvent.getTransferState()); + queue.add(endEvent); notifyObservers(); } - - public void error(Exception exception) - { - setState(TransferEvent.TransferState.ERROR); - - /** - * Write the error event - */ - TransferEventError event = new TransferEventError(); - event.setTransferState(TransferEvent.TransferState.ERROR); - event.setLast(true); - event.setMessage("error lastEvent:true, " + exception.getMessage()); - event.setException(exception); - queue.add(event); - notifyObservers(); - } - + /** * * @param data @@ -175,6 +142,7 @@ public class TransferEventProcessor public void writeReport(NodeRef nodeRef, TransferEventReport.ReportType reportType) { TransferEventReport event = new TransferEventReport(); + event.setTransferState(currentState); event.setNodeRef(nodeRef); event.setReportType(reportType); event.setMessage("report nodeRef:" + nodeRef + ", reportType :" + reportType ); @@ -198,13 +166,8 @@ public class TransferEventProcessor event.setMessage("committing " + position + " of " + range); queue.add(event); notifyObservers(); - } - public void abort() - { - - } private TransferEvent.TransferState currentState; @@ -217,13 +180,13 @@ public class TransferEventProcessor TransferEventImpl event = new TransferEventEndState(); event.setMessage("End State: " + currentState); event.setTransferState(currentState); - queue.add(event); + queue.add(event); } - + TransferEventImpl event = new TransferEventEnterState(); event.setMessage("Enter State: " + state); event.setTransferState(state); - queue.add(event); + queue.add(event); currentState = state; } } diff --git a/source/java/org/alfresco/repo/transfer/TransferServiceCallbackTest.java b/source/java/org/alfresco/repo/transfer/TransferServiceCallbackTest.java index 5bf34eebb5..96b43b0eef 100644 --- a/source/java/org/alfresco/repo/transfer/TransferServiceCallbackTest.java +++ b/source/java/org/alfresco/repo/transfer/TransferServiceCallbackTest.java @@ -18,8 +18,12 @@ */ package org.alfresco.repo.transfer; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.OutputStream; @@ -55,7 +59,7 @@ import org.alfresco.service.cmr.transfer.TransferEventSendingSnapshot; import org.alfresco.service.cmr.transfer.TransferEventSuccess; import org.alfresco.service.cmr.transfer.TransferException; import org.alfresco.service.cmr.transfer.TransferProgress; -import org.alfresco.service.cmr.transfer.TransferService; +import org.alfresco.service.cmr.transfer.TransferService2; import org.alfresco.service.cmr.transfer.TransferTarget; import org.alfresco.service.cmr.transfer.TransferEvent.TransferState; import org.alfresco.service.cmr.transfer.TransferProgress.Status; @@ -74,10 +78,10 @@ public class TransferServiceCallbackTest extends TestCase private static final String TRANSFER_TARGET_NAME = "TransferServiceImplUnitTest"; private ApplicationContext applicationContext; - private TransferServiceImpl transferServiceImpl; + private TransferServiceImpl2 transferServiceImpl; private AuthenticationComponent authenticationComponent; private TransferTransmitter mockedTransferTransmitter; - private TransferService transferService; + private TransferService2 transferService; private TransactionService transactionService; private UserTransaction tx; private Repository repository; @@ -103,7 +107,7 @@ public class TransferServiceCallbackTest extends TestCase applicationContext = ApplicationContextHelper.getApplicationContext(); // Get the required services - transferServiceImpl = (TransferServiceImpl) this.applicationContext.getBean("transferService"); + transferServiceImpl = (TransferServiceImpl2) this.applicationContext.getBean("transferService2"); transferService = transferServiceImpl; authenticationComponent = (AuthenticationComponent) applicationContext.getBean("authenticationComponent"); transactionService = (TransactionService) applicationContext.getBean("transactionComponent"); @@ -266,6 +270,14 @@ public class TransferServiceCallbackTest extends TestCase event.setTransferState(TransferState.COMMITTING); expectedEvents.add(event); + event = new TransferEventReport(); + event.setTransferState(TransferState.COMMITTING); + expectedEvents.add(event); + + event = new TransferEventReport(); + event.setTransferState(TransferState.COMMITTING); + expectedEvents.add(event); + event = new TransferEventEndState(); event.setTransferState(TransferState.COMMITTING); expectedEvents.add(event); @@ -278,12 +290,6 @@ public class TransferServiceCallbackTest extends TestCase event.setTransferState(TransferState.SUCCESS); expectedEvents.add(event); - event = new TransferEventReport(); - expectedEvents.add(event); - - event = new TransferEventReport(); - expectedEvents.add(event); - verifyCallback(expectedEvents); } @@ -443,6 +449,14 @@ public class TransferServiceCallbackTest extends TestCase event.setTransferState(TransferState.START); expectedEvents.add(event); + event = new TransferEventReport(); + event.setTransferState(TransferState.START); + expectedEvents.add(event); + + event = new TransferEventReport(); + event.setTransferState(TransferState.START); + expectedEvents.add(event); + event = new TransferEventEndState(); event.setTransferState(TransferState.START); expectedEvents.add(event); @@ -450,15 +464,12 @@ public class TransferServiceCallbackTest extends TestCase event = new TransferEventEnterState(); event.setTransferState(TransferState.ERROR); expectedEvents.add(event); - + event = new TransferEventError(); event.setTransferState(TransferState.ERROR); ((TransferEventError)event).setException(ex); expectedEvents.add(event); - event = new TransferEventReport(); - expectedEvents.add(event); - verifyCallback(expectedEvents); } } diff --git a/source/java/org/alfresco/repo/transfer/TransferServiceImpl.java b/source/java/org/alfresco/repo/transfer/TransferServiceImpl.java index 0513ce9873..67433c42dd 100644 --- a/source/java/org/alfresco/repo/transfer/TransferServiceImpl.java +++ b/source/java/org/alfresco/repo/transfer/TransferServiceImpl.java @@ -18,1318 +18,187 @@ */ package org.alfresco.repo.transfer; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.Serializable; -import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import javax.transaction.UserTransaction; -import javax.xml.parsers.SAXParser; -import javax.xml.parsers.SAXParserFactory; - -import org.alfresco.error.AlfrescoRuntimeException; -import org.alfresco.model.ContentModel; -import org.alfresco.repo.content.MimetypeMap; -import org.alfresco.repo.security.authentication.AuthenticationUtil; -import org.alfresco.repo.tenant.TenantService; -import org.alfresco.repo.transaction.RetryingTransactionHelper; -import org.alfresco.repo.transfer.manifest.TransferManifestDeletedNode; -import org.alfresco.repo.transfer.manifest.TransferManifestHeader; -import org.alfresco.repo.transfer.manifest.TransferManifestNode; -import org.alfresco.repo.transfer.manifest.TransferManifestNodeFactory; -import org.alfresco.repo.transfer.manifest.TransferManifestNodeHelper; -import org.alfresco.repo.transfer.manifest.TransferManifestNormalNode; -import org.alfresco.repo.transfer.manifest.TransferManifestProcessor; -import org.alfresco.repo.transfer.manifest.TransferManifestWriter; -import org.alfresco.repo.transfer.manifest.XMLTransferManifestReader; -import org.alfresco.repo.transfer.manifest.XMLTransferManifestWriter; -import org.alfresco.repo.transfer.report.TransferReporter; -import org.alfresco.repo.transfer.requisite.DeltaListRequsiteProcessor; -import org.alfresco.repo.transfer.requisite.XMLTransferRequsiteReader; -import org.alfresco.service.cmr.action.Action; -import org.alfresco.service.cmr.action.ActionService; -import org.alfresco.service.cmr.repository.ChildAssociationRef; -import org.alfresco.service.cmr.repository.ContentData; -import org.alfresco.service.cmr.repository.ContentWriter; import org.alfresco.service.cmr.repository.NodeRef; -import org.alfresco.service.cmr.repository.NodeService; -import org.alfresco.service.cmr.repository.StoreRef; -import org.alfresco.service.cmr.search.ResultSet; -import org.alfresco.service.cmr.search.SearchService; import org.alfresco.service.cmr.transfer.TransferCallback; import org.alfresco.service.cmr.transfer.TransferCancelledException; import org.alfresco.service.cmr.transfer.TransferDefinition; -import org.alfresco.service.cmr.transfer.TransferEvent; +import org.alfresco.service.cmr.transfer.TransferEndEvent; +import org.alfresco.service.cmr.transfer.TransferEventCancelled; import org.alfresco.service.cmr.transfer.TransferException; -import org.alfresco.service.cmr.transfer.TransferProgress; -import org.alfresco.service.cmr.transfer.TransferEventReport; import org.alfresco.service.cmr.transfer.TransferService; import org.alfresco.service.cmr.transfer.TransferTarget; -import org.alfresco.service.descriptor.Descriptor; -import org.alfresco.service.descriptor.DescriptorService; -import org.alfresco.service.namespace.NamespaceService; -import org.alfresco.service.namespace.QName; -import org.alfresco.service.namespace.RegexQNamePattern; -import org.alfresco.service.transaction.TransactionService; -import org.alfresco.util.TempFileProvider; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.alfresco.util.PropertyCheck; +/** + * Implementation of the Transfer Service. + * + * Note: The TransferService interface is now deprecated (replaced by TransferService2). This implementation + * delegates to the implementation of TransferService2. + * + * @author davidc + * + */ public class TransferServiceImpl implements TransferService { - private static final String MSG_NO_HOME = "transfer_service.unable_to_find_transfer_home"; - private static final String MSG_NO_GROUP = "transfer_service.unable_to_find_transfer_group"; - private static final String MSG_NO_TARGET = "transfer_service.unable_to_find_transfer_target"; - private static final String MSG_ERR_TRANSFER_ASYNC = "transfer_service.unable_to_transfer_async"; - private static final String MSG_TARGET_EXISTS = "transfer_service.target_exists"; - private static final String MSG_NO_NODES = "transfer_service.no_nodes"; - private static final String MSG_MISSING_ENDPOINT_PATH = "transfer_service.missing_endpoint_path"; - private static final String MSG_MISSING_ENDPOINT_PROTOCOL = "transfer_service.missing_endpoint_protocol"; - private static final String MSG_MISSING_ENDPOINT_HOST = "transfer_service.missing_endpoint_host"; - private static final String MSG_MISSING_ENDPOINT_PORT = "transfer_service.missing_endpoint_port"; - private static final String MSG_MISSING_ENDPOINT_USERNAME = "transfer_service.missing_endpoint_username"; - private static final String MSG_MISSING_ENDPOINT_PASSWORD = "transfer_service.missing_endpoint_password"; + private TransferServiceImpl2 transferServiceImpl2; - /** - * The synchronised list of transfers in progress. - */ - private Map transferMonitoring = Collections.synchronizedMap(new TreeMap()); - - private static Log logger = LogFactory.getLog(TransferServiceImpl.class); - - public void init() + public void setTransferServiceImpl2(TransferServiceImpl2 transferServiceImpl2) { - PropertyCheck.mandatory(this, "nodeService", nodeService); - PropertyCheck.mandatory(this, "searchService", getSearchService()); - PropertyCheck.mandatory(this, "transferSpaceQuery", transferSpaceQuery); - PropertyCheck.mandatory(this, "defaultTransferGroup", defaultTransferGroup); - PropertyCheck.mandatory(this, "transmitter", transmitter); - PropertyCheck.mandatory(this, "namespaceResolver", transmitter); - PropertyCheck.mandatory(this, "actionService", actionService); - PropertyCheck.mandatory(this, "transactionService", transactionService); - PropertyCheck.mandatory(this, "descriptorService", descriptorService); + this.transferServiceImpl2 = transferServiceImpl2; } - private String transferSpaceQuery; - private String defaultTransferGroup; - private NodeService nodeService; - private SearchService searchService; - private TransferTransmitter transmitter; - private TransactionService transactionService; - private ActionService actionService; - private TransferManifestNodeFactory transferManifestNodeFactory; - private TransferReporter transferReporter; - private TenantService tenantService; - private DescriptorService descriptorService; - - /** - * How long to delay while polling for commit status. - */ - private long commitPollDelay = 2000; - - /** - * Create a new in memory transfer target + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#createTransferTarget(java.lang.String) */ public TransferTarget createTransferTarget(String name) { - NodeRef dummy = lookupTransferTarget(name); - if(dummy != null) - { - throw new TransferException(MSG_TARGET_EXISTS, new Object[]{name} ); - } - - TransferTargetImpl newTarget = new TransferTargetImpl(); - newTarget.setName(name); - return newTarget; + return transferServiceImpl2.createTransferTarget(name); } - /** - * create transfer target + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#createAndSaveTransferTarget(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, int, java.lang.String, java.lang.String, char[]) */ public TransferTarget createAndSaveTransferTarget(String name, String title, String description, String endpointProtocol, String endpointHost, int endpointPort, String endpointPath, String username, char[] password) { - TransferTargetImpl newTarget = new TransferTargetImpl(); - newTarget.setName(name); - newTarget.setTitle(title); - newTarget.setDescription(description); - newTarget.setEndpointProtocol(endpointProtocol); - newTarget.setEndpointHost(endpointHost); - newTarget.setEndpointPort(endpointPort); - newTarget.setEndpointPath(endpointPath); - newTarget.setUsername(username); - newTarget.setPassword(password); - return createTransferTarget(newTarget); - + return transferServiceImpl2.createAndSaveTransferTarget(name, title, description, endpointProtocol, endpointHost, endpointPort, endpointPath, username, password); } - /** - * create transfer target - */ - private TransferTarget createTransferTarget(TransferTarget newTarget) - { - /** - * Check whether name is already used - */ - NodeRef dummy = lookupTransferTarget(newTarget.getName()); - if (dummy != null) { throw new TransferException(MSG_TARGET_EXISTS, - new Object[] { newTarget.getName() }); } - - Map properties = new HashMap(); - - // type properties - properties.put(TransferModel.PROP_ENDPOINT_HOST, newTarget.getEndpointHost()); - properties.put(TransferModel.PROP_ENDPOINT_PORT, newTarget.getEndpointPort()); - properties.put(TransferModel.PROP_ENDPOINT_PROTOCOL, newTarget.getEndpointProtocol()); - properties.put(TransferModel.PROP_ENDPOINT_PATH, newTarget.getEndpointPath()); - properties.put(TransferModel.PROP_USERNAME, newTarget.getUsername()); - properties.put(TransferModel.PROP_PASSWORD, encrypt(newTarget.getPassword())); - - // titled aspect - properties.put(ContentModel.PROP_TITLE, newTarget.getTitle()); - properties.put(ContentModel.PROP_NAME, newTarget.getName()); - properties.put(ContentModel.PROP_DESCRIPTION, newTarget.getDescription()); - - // enableable aspect - properties.put(TransferModel.PROP_ENABLED, Boolean.TRUE); - - NodeRef home = getTransferHome(); - - /** - * Work out which group the transfer target is for, in this case the - * default group. - */ - NodeRef defaultGroup = nodeService.getChildByName(home, ContentModel.ASSOC_CONTAINS, - defaultTransferGroup); - - /** - * Go ahead and create the new node - */ - ChildAssociationRef ref = nodeService.createNode(defaultGroup, ContentModel.ASSOC_CONTAINS, - QName.createQName(TransferModel.TRANSFER_MODEL_1_0_URI, newTarget.getName()), - TransferModel.TYPE_TRANSFER_TARGET, properties); - - /** - * Now create a new TransferTarget object to return to the caller. - */ - TransferTargetImpl retVal = new TransferTargetImpl(); - mapTransferTarget(ref.getChildRef(), retVal); - - return retVal; - } - - /** - * Get all transfer targets + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#getTransferTargets() */ public Set getTransferTargets() { - NodeRef home = getTransferHome(); - - Set ret = new HashSet(); - - // get all groups - List groups = nodeService.getChildAssocs(home); - - // for each group - for(ChildAssociationRef group : groups) - { - NodeRef groupNode = group.getChildRef(); - ret.addAll(getTransferTargets(groupNode)); - } - - return ret; + return transferServiceImpl2.getTransferTargets(); } - /** - * Get all transfer targets in the specified group + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#getTransferTargets(java.lang.String) */ public Set getTransferTargets(String groupName) { - NodeRef home = getTransferHome(); - - // get group with assoc groupName - NodeRef groupNode = nodeService.getChildByName(home, ContentModel.ASSOC_CONTAINS, groupName); - - if(groupNode == null) - { - // No transfer group. - throw new TransferException(MSG_NO_GROUP, new Object[]{groupName}); - } - - return getTransferTargets(groupNode); + return transferServiceImpl2.getTransferTargets(groupName); } - /** - * Given the noderef of a group of transfer targets, return all the contained transfer targets. - * @param groupNode - * @return - */ - private Set getTransferTargets(NodeRef groupNode) - { - Set result = new HashSet(); - Listchildren = nodeService.getChildAssocs(groupNode, ContentModel.ASSOC_CONTAINS, RegexQNamePattern.MATCH_ALL); - - for(ChildAssociationRef child : children) - { - if(nodeService.getType(child.getChildRef()).equals(TransferModel.TYPE_TRANSFER_TARGET)) - { - TransferTargetImpl newTarget = new TransferTargetImpl(); - mapTransferTarget(child.getChildRef(), newTarget); - result.add(newTarget); - } - } - return result; - } - - /** - * + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#deleteTransferTarget(java.lang.String) */ public void deleteTransferTarget(String name) { - NodeRef nodeRef = lookupTransferTarget(name); - - if(nodeRef == null) - { - // target does not exist - throw new TransferException(MSG_NO_TARGET, new Object[]{name} ); - } - nodeService.deleteNode(nodeRef); + transferServiceImpl2.deleteTransferTarget(name); } - /** - * Enables/Disables the named transfer target + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#enableTransferTarget(java.lang.String, boolean) */ public void enableTransferTarget(String name, boolean enable) { - NodeRef nodeRef = lookupTransferTarget(name); - nodeService.setProperty(nodeRef, TransferModel.PROP_ENABLED, new Boolean(enable)); + transferServiceImpl2.enableTransferTarget(name, enable); } + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#targetExists(java.lang.String) + */ public boolean targetExists(String name) { - return (lookupTransferTarget(name) != null); + return transferServiceImpl2.targetExists(name); } - /** - * + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#getTransferTarget(java.lang.String) */ public TransferTarget getTransferTarget(String name) { - NodeRef nodeRef = lookupTransferTarget(name); - - if(nodeRef == null) - { - // target does not exist - throw new TransferException(MSG_NO_TARGET, new Object[]{name} ); - } - TransferTargetImpl newTarget = new TransferTargetImpl(); - mapTransferTarget(nodeRef, newTarget); - - return newTarget; + return transferServiceImpl2.getTransferTarget(name); } - /** - * create or update a transfer target. + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#saveTransferTarget(org.alfresco.service.cmr.transfer.TransferTarget) */ public TransferTarget saveTransferTarget(TransferTarget update) - { - if(update.getNodeRef() == null) - { - // This is a save for the first time - return createTransferTarget(update); - } - - NodeRef nodeRef = lookupTransferTarget(update.getName()); - if(nodeRef == null) - { - // target does not exist - throw new TransferException(MSG_NO_TARGET, new Object[]{update.getName()} ); - } - - Map properties = new HashMap(); - properties.put(TransferModel.PROP_ENDPOINT_HOST, update.getEndpointHost()); - properties.put(TransferModel.PROP_ENDPOINT_PORT, update.getEndpointPort()); - properties.put(TransferModel.PROP_ENDPOINT_PROTOCOL, update.getEndpointProtocol()); - properties.put(TransferModel.PROP_ENDPOINT_PATH, update.getEndpointPath()); - properties.put(TransferModel.PROP_USERNAME, update.getUsername()); - properties.put(TransferModel.PROP_PASSWORD, encrypt(update.getPassword())); - - // titled aspect - properties.put(ContentModel.PROP_TITLE, update.getTitle()); - properties.put(ContentModel.PROP_NAME, update.getName()); - properties.put(ContentModel.PROP_DESCRIPTION, update.getDescription()); - - properties.put(TransferModel.PROP_ENABLED, new Boolean(update.isEnabled())); - nodeService.setProperties(nodeRef, properties); - - TransferTargetImpl newTarget = new TransferTargetImpl(); - mapTransferTarget(nodeRef, newTarget); - return newTarget; - } - - /** - * Transfer sync without callbacks. - */ - public NodeRef transfer(String targetName, TransferDefinition definition) { - final TransferEventProcessor eventProcessor = new TransferEventProcessor(); - return transferImpl(targetName, definition, eventProcessor); + return transferServiceImpl2.saveTransferTarget(update); } - /** - * Transfer async. - * - * @param targetName - * @param definition - * @param callbacks - * + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#transferAsync(java.lang.String, org.alfresco.service.cmr.transfer.TransferDefinition, org.alfresco.service.cmr.transfer.TransferCallback[]) */ public void transferAsync(String targetName, TransferDefinition definition, TransferCallback... callbacks) { - transferAsync(targetName, definition, Arrays.asList(callbacks)); + transferServiceImpl2.transferAsync(targetName, definition, callbacks); } - /** - * Transfer async. - * - * @param targetName - * @param definition - * @param callbacks - * + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#transferAsync(java.lang.String, org.alfresco.service.cmr.transfer.TransferDefinition, java.util.Collection) */ public void transferAsync(String targetName, TransferDefinition definition, Collection callbacks) { - /** - * Event processor for this transfer instance - */ - final TransferEventProcessor eventProcessor = new TransferEventProcessor(); - if(callbacks != null) - { - eventProcessor.observers.addAll(callbacks); - } - - /* - * Note: - * callback should be Serializable to be passed through the action API - * However Serializable is not used so it does not matter. Perhaps the action API should be - * changed? Or we could add a Serializable proxy here. - */ - - Map params = new HashMap(); - params.put("targetName", targetName); - params.put("definition", definition); - params.put("callbacks", (Serializable)callbacks); - - Action transferAction = getActionService().createAction("transfer-async", params); - - /** - * Execute transfer async in its own transaction. - * The action service only runs actions in the post commit which is why there's - * a separate transaction here. - */ - boolean success = false; - UserTransaction trx = transactionService.getNonPropagatingUserTransaction(); - try - { - trx.begin(); - logger.debug("calling action service to execute action"); - getActionService().executeAction(transferAction, null, false, true); - trx.commit(); - logger.debug("committed successfully"); - success = true; - } - catch (Exception error) - { - logger.error("unexpected exception", error); - throw new AlfrescoRuntimeException(MSG_ERR_TRANSFER_ASYNC, error); - } - finally - { - if(!success) - { - try - { - logger.debug("rolling back after error"); - trx.rollback(); - } - catch (Exception error) - { - logger.error("unexpected exception during rollback", error); - // There's nothing much we can do here - } - } - } + transferServiceImpl2.transferAsync(targetName, definition, callbacks); } - /** - * Transfer Synchronous - * @param targetName - * @param definition - * @param callbacks + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#transfer(java.lang.String, org.alfresco.service.cmr.transfer.TransferDefinition) + */ + public NodeRef transfer(String targetName, TransferDefinition definition) + { + return transfer(targetName, definition, new TransferCallback[]{}); + } + + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#transfer(java.lang.String, org.alfresco.service.cmr.transfer.TransferDefinition, org.alfresco.service.cmr.transfer.TransferCallback[]) */ public NodeRef transfer(String targetName, TransferDefinition definition, TransferCallback... callbacks) { return transfer(targetName, definition, Arrays.asList(callbacks)); } - /** - * Transfer Synchronous - * - * @param targetName - * @param definition - * @param callbacks + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#transfer(java.lang.String, org.alfresco.service.cmr.transfer.TransferDefinition, java.util.Collection) */ - public NodeRef transfer(String targetName, TransferDefinition definition, Collection callbacks) { - /** - * Event processor for this transfer instance - */ - final TransferEventProcessor eventProcessor = new TransferEventProcessor(); - if(callbacks != null) + TransferEndEvent event = transferServiceImpl2.transfer(targetName, definition, callbacks); + if (event instanceof TransferEventCancelled) { - eventProcessor.observers.addAll(callbacks); + // NOTE: throw this exception to keep compatibility with TransferService contract + throw new TransferCancelledException(); } - - /** - * Now go ahead and do the transfer - */ - return transferImpl(targetName, definition, eventProcessor); + return event.getSourceReport(); } - - /** - * Transfer Implementation - * @param targetName name of transfer target - * @param definition thranser definition - * @param eventProcessor - */ - private NodeRef transferImpl(String targetName, final TransferDefinition definition, final TransferEventProcessor eventProcessor) - { - if(logger.isDebugEnabled()) - { - logger.debug("transfer started to :" + targetName); - } - - SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmssSSSZ"); - String transferName = format.format(new Date()); - - /** - * Wire in the transferReport - so any callbacks are stored in transferReport - */ - final List transferReport = new LinkedList(); - TransferCallback reportCallback = new TransferCallback() - { - private static final long serialVersionUID = 4072579605731036522L; - - public void processEvent(TransferEvent event) - { - transferReport.add(event); - } - }; - eventProcessor.addObserver(reportCallback); - - File snapshotFile = null; - File reqFile = null; - - TransferTarget target = null; - try - { - target = getTransferTarget(targetName); - - // which nodes to write to the snapshot - Setnodes = definition.getNodes(); - - if(nodes == null || nodes.size() == 0) - { - logger.debug("no nodes to transfer"); - throw new TransferException(MSG_NO_NODES); - } - - /** - * create snapshot - */ - String prefix = "TRX-SNAP"; - String suffix = ".xml"; - - logger.debug("create snapshot"); - - // where to put snapshot ? - File tempDir = TempFileProvider.getLongLifeTempDir("transfer"); - snapshotFile = TempFileProvider.createTempFile(prefix, suffix, tempDir); - reqFile = TempFileProvider.createTempFile("TRX-REQ", suffix, tempDir); - - FileOutputStream reqOutput = new FileOutputStream(reqFile); - FileWriter snapshotWriter = new FileWriter(snapshotFile); - - // Write the manifest file - TransferManifestWriter formatter = new XMLTransferManifestWriter(); - TransferManifestHeader header = new TransferManifestHeader(); - Descriptor descriptor = descriptorService.getCurrentRepositoryDescriptor(); - header.setRepositoryId(descriptor.getId()); - header.setCreatedDate(new Date()); - header.setNodeCount(nodes.size()); - header.setSync(definition.isSync()); - header.setReadOnly(definition.isReadOnly()); - formatter.startTransferManifest(snapshotWriter); - formatter.writeTransferManifestHeader(header); - for(NodeRef nodeRef : nodes) - { - TransferManifestNode node = transferManifestNodeFactory.createTransferManifestNode(nodeRef); - formatter.writeTransferManifestNode(node); - } - formatter.endTransferManifest(); - snapshotWriter.close(); - - logger.debug("snapshot file written to local filesystem"); - // If we are debugging then write the file to stdout. - if(logger.isDebugEnabled()) - { - try - { - outputFile(snapshotFile); - } - catch (IOException error) - { - // This is debug code - so an exception thrown while debugging - logger.debug("error while outputting snapshotFile"); - error.printStackTrace(); - } - } - - /** - * Begin - */ - logger.debug("transfer begin"); - eventProcessor.start(); - final Transfer transfer = transmitter.begin(target); - if(transfer != null) - { - String transferId = transfer.getTransferId(); - TransferStatus status = new TransferStatus(transferId); - transferMonitoring.put(transferId, status); - logger.debug("transfer begun transferId:" + transferId); - - boolean prepared = false; - try - { - eventProcessor.begin(transferId); - checkCancel(transferId); - - /** - * send Manifest, get the requsite back. - */ - eventProcessor.sendSnapshot(1,1); - transmitter.sendManifest(transfer, snapshotFile, reqOutput); - - if(logger.isDebugEnabled()) - { - logger.debug("requsite file written to local filesystem"); - try - { - outputFile(reqFile); - } - catch (IOException error) - { - // This is debug code - so an exception thrown while debugging - logger.debug("error while outputting snapshotFile"); - error.printStackTrace(); - } - } - - - logger.debug("manifest sent"); - - SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); - SAXParser parser; - parser = saxParserFactory.newSAXParser(); - - /** - * Parse the requsite file to generate the delta list - */ - DeltaListRequsiteProcessor reqProcessor = new DeltaListRequsiteProcessor(); - XMLTransferRequsiteReader reqReader = new XMLTransferRequsiteReader(reqProcessor); - parser.parse(reqFile, reqReader); - - final DeltaList deltaList = reqProcessor.getDeltaList(); - - /** - * Parse the manifest file and transfer chunks over - * - * ManifestFile -> Manifest Processor -> Chunker -> Transmitter - * - * Step 1: Create a chunker and wire it up to the transmitter - */ - final ContentChunker chunker = new ContentChunkerImpl(); - final Long fRange = Long.valueOf(nodes.size()); - chunker.setHandler( - new ContentChunkProcessor(){ - private long counter = 0; - public void processChunk(Set data) - { - checkCancel(transfer.getTransferId()); - logger.debug("send chunk to transmitter"); - for(ContentData file : data) - { - counter++; - eventProcessor.sendContent(file, fRange, counter); - } - transmitter.sendContent(transfer, data); - } - } - ); - - /** - * Step 2 : create a manifest processor and wire it up to the chunker - */ - TransferManifestProcessor processor = new TransferManifestProcessor() - { - public void processTransferManifestNode(TransferManifestNormalNode node) - { - Set data = TransferManifestNodeHelper.getContentData(node); - for(ContentData d : data) - { - checkCancel(transfer.getTransferId()); - logger.debug("add content to chunker"); - - /** - * Check with the deltaList whether we need to send the content item - */ - if(deltaList != null) - { - String partName = TransferCommons.URLToPartName(d.getContentUrl()); - if(deltaList.getRequiredParts().contains(partName)) - { - logger.debug("content is required :" + d.getContentUrl()); - chunker.addContent(d); - } - } - else - { - // No delta list - so send all content items - chunker.addContent(d); - } - } - } - - public void processTransferManifiestHeader(TransferManifestHeader header){/* NO-OP */ } - public void startTransferManifest(){ /* NO-OP */ } - public void endTransferManifest(){ /* NO-OP */ } - public void processTransferManifestNode(TransferManifestDeletedNode node) - { /* NO-OP */ - } - }; - - /** - * Step 3: wire up the manifest reader to a manifest processor - */ - - XMLTransferManifestReader reader = new XMLTransferManifestReader(processor); - - /** - * Step 4: start the magic - Give the manifest file to the manifest reader - */ - parser.parse(snapshotFile, reader); - chunker.flush(); - - /** - * Content all sent over - */ - logger.debug("content sending finished"); - checkCancel(transfer.getTransferId()); - - /** - * prepare - */ - eventProcessor.prepare(); - transmitter.prepare(transfer); - logger.debug("prepared transferId:" + transferId); - checkCancel(transfer.getTransferId()); - - /** - * committing - */ - eventProcessor.commit(); - transmitter.commit(transfer); - logger.debug("committing transferId:" + transferId); - checkCancel(transfer.getTransferId()); - - /** - * need to poll for committed status - */ - TransferProgress progress = null; - - int position = -1; - for(int retries = 0; retries < 3; retries++) - { - checkCancel(transfer.getTransferId()); - try - { - progress = transmitter.getStatus(transfer); - if(progress.getCurrentPosition() != position) - { - position = progress.getCurrentPosition(); - eventProcessor.committing(progress.getEndPosition(), position); - } - if(progress.isFinished()) - { - logger.debug("isFinished=true"); - break; - } - retries = 0; - } - catch (TransferException te) - { - logger.debug("error while committing - retrying", te); - } - - /** - * Now sleep for a while. - */ - Thread.sleep(commitPollDelay); - } - logger.debug("Finished transferId:" + transferId); - - checkCancel(transfer.getTransferId()); - - /** - * committed - */ - eventProcessor.success(); - checkCancel(transfer.getTransferId()); - prepared = true; - - logger.debug("committed transferId:" + transferId); - - /** - * Now pull back and persist the destination transfer report. - */ - logger.debug("now pull back the destination transfer report"); - NodeRef destReportNode = persistDestinationTransferReport(transferName, transfer, target, tempDir); - if(destReportNode != null) - { - eventProcessor.writeReport(destReportNode, TransferEventReport.ReportType.DESTINATION); - } - - /** - * Write the Successful transfer report if we get here - */ - logger.debug("now persist the client side transfer report"); - NodeRef reportNode = persistTransferReport(transferName, transfer, target, definition, transferReport, snapshotFile); - if(reportNode != null) - { - eventProcessor.writeReport(reportNode, TransferEventReport.ReportType.SOURCE); - } - - logger.debug("success - at end of method transferId:" + transferId); - return reportNode; - } - finally - { - - logger.debug("remove monitoring for transferId:" + transferId); - transferMonitoring.remove(transferId); - logger.debug("removed monitoring for transferId:" + transferId); - - if(!prepared) - { - - logger.debug("abort incomplete transfer"); - transmitter.abort(transfer); - - /** - * Now pull back and persist the destination error transfer report. - */ - logger.debug("now pull back the destination transfer report"); - NodeRef destReportNode = persistDestinationTransferReport(transferName + " error", transfer, target, tempDir); - if(destReportNode != null) - { - eventProcessor.writeReport(destReportNode, TransferEventReport.ReportType.DESTINATION); - } - } - } - } // end of transfer - - //TODO Do we ever get here ? - logger.debug("returning null - unable lock target"); - return null; - } - catch (TransferException t) - { - logger.debug("TransferException - unable to transfer", t); - eventProcessor.error(t); - - /** - * Write the transfer report. This is an error report so needs to be out - */ - if(target != null ) - { - NodeRef reportNode = persistTransferReport(transferName, t, target, definition, transferReport, snapshotFile); - if(reportNode != null) - { - eventProcessor.writeReport(reportNode, TransferEventReport.ReportType.SOURCE); - } - } - throw t; - } - catch (Exception t) - { - // Wrap any other exception as a transfer exception - logger.debug("Exception - unable to transfer", t); - eventProcessor.error(t); - - /** - * Write the transfer report. This is an error report so needs to be out - */ - if(target != null ) - { - NodeRef reportNode = persistTransferReport(transferName, t, target, definition, transferReport, snapshotFile); - if(reportNode != null) - { - eventProcessor.writeReport(reportNode, TransferEventReport.ReportType.SOURCE); - } - } - - /** - * Wrap the exception as a transfer exception - */ - throw new TransferException("unable to transfer:" + t.toString(), t); - } - finally - { - /** - * clean up - */ - if(snapshotFile != null) - { - snapshotFile.delete(); - } - logger.debug("snapshot file deleted"); - - if(reqFile != null) - { - reqFile.delete(); - } - logger.debug("req file deleted"); - - - } - } // end of transferImpl - - /** - * CancelAsync - */ - public void cancelAsync(String transferHandle) - { - TransferStatus status = transferMonitoring.get(transferHandle); - if(status != null) - { - logger.debug("canceling transfer :" + transferHandle); - status.cancelMe = true; - } - } - - /** - * Check whether the specified transfer should be cancelled. - * @param transferHandle - * @throws TransferException - the transfer has been cancelled. - */ - private void checkCancel(String transferHandle) throws TransferException - { - TransferStatus status = transferMonitoring.get(transferHandle); - if(status != null) - { - if(status.cancelMe) - { - throw new TransferCancelledException(); - } - } - } - - public void setNodeService(NodeService nodeService) - { - this.nodeService = nodeService; - } - - public NodeService getNodeService() - { - return nodeService; - } - - public void setSearchService(SearchService searchService) - { - this.searchService = searchService; - } - - public SearchService getSearchService() - { - return searchService; - } - - public void setTenantService(TenantService tenantService) - { - this.tenantService = tenantService; - } - - public void setTransferSpaceQuery(String transferSpaceQuery) - { - this.transferSpaceQuery = transferSpaceQuery; - } - - public String getTransferSpaceQuery() - { - return transferSpaceQuery; - } - - public void setDefaultTransferGroup(String defaultGroup) - { - this.defaultTransferGroup = defaultGroup; - } - - public String getDefaultTransferGroup() - { - return defaultTransferGroup; - } - - public TransferTransmitter getTransmitter() - { - return transmitter; - } - - public void setTransmitter(TransferTransmitter transmitter) - { - this.transmitter = transmitter; - } - - private Map transferHomeMap = new ConcurrentHashMap(); - protected NodeRef getTransferHome() - { - String tenantDomain = tenantService.getUserDomain(AuthenticationUtil.getRunAsUser()); - NodeRef transferHome = transferHomeMap.get(tenantDomain); - if(transferHome == null) - { - String query = transferSpaceQuery; - - ResultSet result = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, - SearchService.LANGUAGE_XPATH, query); - - if(result.length() == 0) - { - // No transfer home. - throw new TransferException(MSG_NO_HOME, new Object[]{query}); - } - if (result.getNodeRefs().size() != 0) - { - transferHome = result.getNodeRef(0); - transferHomeMap.put(tenantDomain, transferHome); - } - } - return transferHome; - } - - private char[] encrypt(char[] text) - { - // placeholder dummy implementation - add an 'E' to the start -// String dummy = new String("E" + text); -// String dummy = new String(text); -// return dummy.toCharArray(); - return text; - } - - private char[] decrypt(char[] text) - { - // placeholder dummy implementation - strips off leading 'E' -// String dummy = new String(text); - return text; - //return dummy.substring(1).toCharArray(); - } - - /** - * - * @param name - * @return - */ - private NodeRef lookupTransferTarget(String name) - { - String query = "+TYPE:\"trx:transferTarget\" +@cm\\:name:\"" +name + "\""; - - ResultSet result = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, - SearchService.LANGUAGE_LUCENE, query); - - if(result.length() == 1) - { - return result.getNodeRef(0); - } - return null; - } - - private void mapTransferTarget(NodeRef nodeRef, TransferTargetImpl def) - { - def.setNodeRef(nodeRef); - Map properties = nodeService.getProperties(nodeRef); - String name = (String)properties.get(ContentModel.PROP_NAME); - - String endpointPath = (String)properties.get(TransferModel.PROP_ENDPOINT_PATH); - if (endpointPath == null) - throw new TransferException(MSG_MISSING_ENDPOINT_PATH, new Object[] {name}); - def.setEndpointPath(endpointPath); - - String endpointProtocol = (String)properties.get(TransferModel.PROP_ENDPOINT_PROTOCOL); - if (endpointProtocol == null) - throw new TransferException(MSG_MISSING_ENDPOINT_PROTOCOL, new Object[] {name}); - def.setEndpointProtocol(endpointProtocol); - - String endpointHost = (String)properties.get(TransferModel.PROP_ENDPOINT_HOST); - if (endpointHost== null) - throw new TransferException(MSG_MISSING_ENDPOINT_HOST, new Object[] {name}); - def.setEndpointHost(endpointHost); - - Integer endpointPort = (Integer)properties.get(TransferModel.PROP_ENDPOINT_PORT); - if (endpointPort == null) - throw new TransferException(MSG_MISSING_ENDPOINT_PORT, new Object[] {name}); - def.setEndpointPort(endpointPort); - - String username = (String)properties.get(TransferModel.PROP_USERNAME); - if (username == null) - throw new TransferException(MSG_MISSING_ENDPOINT_USERNAME, new Object[] {name}); - def.setUsername(username); - - Serializable passwordVal = properties.get(TransferModel.PROP_PASSWORD); - if (passwordVal == null) - throw new TransferException(MSG_MISSING_ENDPOINT_PASSWORD, new Object[] {name}); - if(passwordVal.getClass().isArray()) - { - def.setPassword(decrypt((char[])passwordVal)); - } - if(passwordVal instanceof String) - { - String password = (String)passwordVal; - def.setPassword(decrypt(password.toCharArray())); - } - - def.setName(name); - def.setTitle((String)properties.get(ContentModel.PROP_TITLE)); - def.setDescription((String)properties.get(ContentModel.PROP_DESCRIPTION)); - - if(nodeService.hasAspect(nodeRef, TransferModel.ASPECT_ENABLEABLE)) - { - def.setEnabled((Boolean)properties.get(TransferModel.PROP_ENABLED)); - } - } - - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.alfresco.service.cmr.transfer.TransferService#verify(org.alfresco.service.cmr.transfer.TransferTarget) */ public void verify(TransferTarget target) throws TransferException { - transmitter.verifyTarget(target); + transferServiceImpl2.verify(target); } - - /** - * Utility to dump the contents of a file to the console - * @param file + + /* + * (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#cancelAsync(java.lang.String) */ - private static void outputFile(File file) throws IOException + public void cancelAsync(String transferId) { - BufferedReader reader = new BufferedReader(new FileReader(file)); - String s = reader.readLine(); - while(s != null) - { - System.out.println(s); - s = reader.readLine(); - } + transferServiceImpl2.cancelAsync(transferId); } - - - /** - * Success transfer report - */ - private NodeRef persistTransferReport(final String transferName, final Transfer transfer, final TransferTarget target, final TransferDefinition definition, final List events, final File snapshotFile) - { - /** - * persist the transfer report in its own transaction so it cannot be rolled back - */ - NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction( - new RetryingTransactionHelper.RetryingTransactionCallback() - { - public NodeRef execute() throws Throwable - { - logger.debug("transfer report starting"); - NodeRef reportNode = transferReporter.createTransferReport(transferName, transfer, target, definition, events, snapshotFile); - logger.debug("transfer report done"); - return reportNode; - } - }, false, true); - return reportNode; - } - - /** - * Error Transfer report - */ - private NodeRef persistTransferReport(final String transferName, final Exception t, final TransferTarget target, final TransferDefinition definition, final List events, final File snapshotFile) - { - /** - * in its own transaction so it cannot be rolled back and equally if it does roll back itself - * then it does not affect the enclosing transfer. - */ - - NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction( - new RetryingTransactionHelper.RetryingTransactionCallback() - { - public NodeRef execute() throws Throwable - { - logger.debug("transfer report starting"); - NodeRef reportNode = transferReporter.createTransferReport(transferName, t, target, definition, events, snapshotFile); - logger.debug("transfer report done"); - return reportNode; - } - }, false, true); - return reportNode; - } - - /** - * Destination Transfer report - * @return the node ref of the transfer report or null if there isn't one. - */ - private NodeRef persistDestinationTransferReport(final String transferName, - final Transfer transfer, - final TransferTarget target, - final File tempDir) - { - /** - * in its own transaction so it cannot be rolled back - */ - NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction( - new RetryingTransactionHelper.RetryingTransactionCallback() - { - public NodeRef execute() throws Throwable - { - try - { - File destReportFile = TempFileProvider.createTempFile("TRX-DREP", ".xml", tempDir); - FileOutputStream destReportOutput = new FileOutputStream(destReportFile); - transmitter.getTransferReport(transfer, destReportOutput); - logger.debug("transfer report (destination) starting"); - - NodeRef reportNode = transferReporter.writeDestinationReport(transferName, target, destReportFile); - logger.debug("transfer report (destination) done"); - - if(destReportFile != null) - { - destReportFile.delete(); - } - logger.debug("destination report temp file deleted"); - - return reportNode; - } - catch(FileNotFoundException ie) - { - // there's nothing we can do here. - but we do not want the exception to propogate up. - logger.debug("unexpected error while obtaining destination transfer report", ie); - return null; - } - catch(TransferException ie) - { - // there's nothing we can do here. - but we do not want the exception to propogate up. - logger.debug("unexpected error while obtaining destination transfer report", ie); - return null; - } - } // end execute - }); - - return reportNode; - } - - public void setTransferManifestNodeFactory(TransferManifestNodeFactory transferManifestNodeFactory) - { - this.transferManifestNodeFactory = transferManifestNodeFactory; - } - - public TransferManifestNodeFactory getTransferManifestNodeFactory() - { - return transferManifestNodeFactory; - } - - public void setActionService(ActionService actionService) - { - this.actionService = actionService; - } - - public ActionService getActionService() - { - return actionService; - } - - public void setTransactionService(TransactionService transactionService) - { - this.transactionService = transactionService; - } - - public TransactionService getTransactionService() - { - return transactionService; - } - - public void setTransferReporter(TransferReporter transferReporter) - { - this.transferReporter = transferReporter; - } - - public TransferReporter getTransferReporter() - { - return transferReporter; - } - - public void setCommitPollDelay(long commitPollDelay) - { - this.commitPollDelay = commitPollDelay; - } - - public long getCommitPollDelay() - { - return commitPollDelay; - } - - public void setDescriptorService(DescriptorService descriptorService) - { - this.descriptorService = descriptorService; - } - - public DescriptorService getDescriptorService() - { - return descriptorService; - } - - private class TransferStatus - { - TransferStatus(String transferId) - { - this.transferId = transferId; - } - String transferId; - boolean cancelMe = false; - } - - - } diff --git a/source/java/org/alfresco/repo/transfer/TransferServiceImpl2.java b/source/java/org/alfresco/repo/transfer/TransferServiceImpl2.java new file mode 100644 index 0000000000..94938ec229 --- /dev/null +++ b/source/java/org/alfresco/repo/transfer/TransferServiceImpl2.java @@ -0,0 +1,1402 @@ +/* + * Copyright (C) 2009-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.transfer; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import javax.transaction.UserTransaction; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.parsers.SAXParser; +import javax.xml.parsers.SAXParserFactory; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.model.ContentModel; +import org.alfresco.repo.security.authentication.AuthenticationUtil; +import org.alfresco.repo.tenant.TenantService; +import org.alfresco.repo.transaction.RetryingTransactionHelper; +import org.alfresco.repo.transfer.manifest.TransferManifestDeletedNode; +import org.alfresco.repo.transfer.manifest.TransferManifestHeader; +import org.alfresco.repo.transfer.manifest.TransferManifestNode; +import org.alfresco.repo.transfer.manifest.TransferManifestNodeFactory; +import org.alfresco.repo.transfer.manifest.TransferManifestNodeHelper; +import org.alfresco.repo.transfer.manifest.TransferManifestNormalNode; +import org.alfresco.repo.transfer.manifest.TransferManifestProcessor; +import org.alfresco.repo.transfer.manifest.TransferManifestWriter; +import org.alfresco.repo.transfer.manifest.XMLTransferManifestReader; +import org.alfresco.repo.transfer.manifest.XMLTransferManifestWriter; +import org.alfresco.repo.transfer.report.TransferReporter; +import org.alfresco.repo.transfer.requisite.DeltaListRequsiteProcessor; +import org.alfresco.repo.transfer.requisite.XMLTransferRequsiteReader; +import org.alfresco.service.cmr.action.Action; +import org.alfresco.service.cmr.action.ActionService; +import org.alfresco.service.cmr.repository.ChildAssociationRef; +import org.alfresco.service.cmr.repository.ContentData; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.cmr.repository.StoreRef; +import org.alfresco.service.cmr.search.ResultSet; +import org.alfresco.service.cmr.search.SearchService; +import org.alfresco.service.cmr.transfer.TransferCallback; +import org.alfresco.service.cmr.transfer.TransferCancelledException; +import org.alfresco.service.cmr.transfer.TransferDefinition; +import org.alfresco.service.cmr.transfer.TransferEndEvent; +import org.alfresco.service.cmr.transfer.TransferEvent; +import org.alfresco.service.cmr.transfer.TransferEventCancelled; +import org.alfresco.service.cmr.transfer.TransferEventError; +import org.alfresco.service.cmr.transfer.TransferEventReport; +import org.alfresco.service.cmr.transfer.TransferEventSuccess; +import org.alfresco.service.cmr.transfer.TransferException; +import org.alfresco.service.cmr.transfer.TransferFailureException; +import org.alfresco.service.cmr.transfer.TransferProgress; +import org.alfresco.service.cmr.transfer.TransferService2; +import org.alfresco.service.cmr.transfer.TransferTarget; +import org.alfresco.service.descriptor.Descriptor; +import org.alfresco.service.descriptor.DescriptorService; +import org.alfresco.service.namespace.QName; +import org.alfresco.service.namespace.RegexQNamePattern; +import org.alfresco.service.transaction.TransactionService; +import org.alfresco.util.PropertyCheck; +import org.alfresco.util.TempFileProvider; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.xml.sax.SAXException; + + +/** + * Implementation of the Transfer Service. + * + * @author Mark Rogers + * + */ +public class TransferServiceImpl2 implements TransferService2 +{ + private static Log logger = LogFactory.getLog(TransferServiceImpl2.class); + + private static final String MSG_NO_HOME = "transfer_service.unable_to_find_transfer_home"; + private static final String MSG_NO_GROUP = "transfer_service.unable_to_find_transfer_group"; + private static final String MSG_NO_TARGET = "transfer_service.unable_to_find_transfer_target"; + private static final String MSG_ERR_TRANSFER_ASYNC = "transfer_service.unable_to_transfer_async"; + private static final String MSG_TARGET_EXISTS = "transfer_service.target_exists"; + private static final String MSG_NO_NODES = "transfer_service.no_nodes"; + private static final String MSG_MISSING_ENDPOINT_PATH = "transfer_service.missing_endpoint_path"; + private static final String MSG_MISSING_ENDPOINT_PROTOCOL = "transfer_service.missing_endpoint_protocol"; + private static final String MSG_MISSING_ENDPOINT_HOST = "transfer_service.missing_endpoint_host"; + private static final String MSG_MISSING_ENDPOINT_PORT = "transfer_service.missing_endpoint_port"; + private static final String MSG_MISSING_ENDPOINT_USERNAME = "transfer_service.missing_endpoint_username"; + private static final String MSG_MISSING_ENDPOINT_PASSWORD = "transfer_service.missing_endpoint_password"; + private static final String MSG_FAILED_TO_GET_TRANSFER_STATUS = "transfer_service.failed_to_get_transfer_status"; + private static final String MSG_TARGET_ERROR = "transfer_service.target_error"; + private static final String MSG_UNKNOWN_TARGET_ERROR = "transfer_service.unknown_target_error"; + + private static final String FILE_DIRECTORY = "transfer"; + private static final String FILE_SUFFIX = ".xml"; + + private enum ClientTransferState { Begin, Prepare, Commit, Poll, Cancel, Finished, Exit; }; + + /** + * The synchronised list of transfers in progress. + */ + private Map transferMonitoring = Collections.synchronizedMap(new TreeMap()); + + public void init() + { + PropertyCheck.mandatory(this, "nodeService", nodeService); + PropertyCheck.mandatory(this, "searchService", getSearchService()); + PropertyCheck.mandatory(this, "transferSpaceQuery", transferSpaceQuery); + PropertyCheck.mandatory(this, "defaultTransferGroup", defaultTransferGroup); + PropertyCheck.mandatory(this, "transmitter", transmitter); + PropertyCheck.mandatory(this, "namespaceResolver", transmitter); + PropertyCheck.mandatory(this, "actionService", actionService); + PropertyCheck.mandatory(this, "transactionService", transactionService); + PropertyCheck.mandatory(this, "descriptorService", descriptorService); + } + + private String transferSpaceQuery; + private String defaultTransferGroup; + private NodeService nodeService; + private SearchService searchService; + private TransferTransmitter transmitter; + private TransactionService transactionService; + private ActionService actionService; + private TransferManifestNodeFactory transferManifestNodeFactory; + private TransferReporter transferReporter; + private TenantService tenantService; + private DescriptorService descriptorService; + + /** + * How long to delay while polling for commit status. + */ + private long commitPollDelay = 2000; + + /** + * Create a new in memory transfer target + */ + public TransferTarget createTransferTarget(String name) + { + NodeRef dummy = lookupTransferTarget(name); + if(dummy != null) + { + throw new TransferException(MSG_TARGET_EXISTS, new Object[]{name} ); + } + + TransferTargetImpl newTarget = new TransferTargetImpl(); + newTarget.setName(name); + return newTarget; + } + + /** + * create transfer target + */ + public TransferTarget createAndSaveTransferTarget(String name, String title, String description, String endpointProtocol, String endpointHost, int endpointPort, String endpointPath, String username, char[] password) + { + TransferTargetImpl newTarget = new TransferTargetImpl(); + newTarget.setName(name); + newTarget.setTitle(title); + newTarget.setDescription(description); + newTarget.setEndpointProtocol(endpointProtocol); + newTarget.setEndpointHost(endpointHost); + newTarget.setEndpointPort(endpointPort); + newTarget.setEndpointPath(endpointPath); + newTarget.setUsername(username); + newTarget.setPassword(password); + return createTransferTarget(newTarget); + + } + + /** + * create transfer target + */ + private TransferTarget createTransferTarget(TransferTarget newTarget) + { + /** + * Check whether name is already used + */ + NodeRef dummy = lookupTransferTarget(newTarget.getName()); + if (dummy != null) { throw new TransferException(MSG_TARGET_EXISTS, + new Object[] { newTarget.getName() }); } + + Map properties = new HashMap(); + + // type properties + properties.put(TransferModel.PROP_ENDPOINT_HOST, newTarget.getEndpointHost()); + properties.put(TransferModel.PROP_ENDPOINT_PORT, newTarget.getEndpointPort()); + properties.put(TransferModel.PROP_ENDPOINT_PROTOCOL, newTarget.getEndpointProtocol()); + properties.put(TransferModel.PROP_ENDPOINT_PATH, newTarget.getEndpointPath()); + properties.put(TransferModel.PROP_USERNAME, newTarget.getUsername()); + properties.put(TransferModel.PROP_PASSWORD, encrypt(newTarget.getPassword())); + + // titled aspect + properties.put(ContentModel.PROP_TITLE, newTarget.getTitle()); + properties.put(ContentModel.PROP_NAME, newTarget.getName()); + properties.put(ContentModel.PROP_DESCRIPTION, newTarget.getDescription()); + + // enableable aspect + properties.put(TransferModel.PROP_ENABLED, Boolean.TRUE); + + NodeRef home = getTransferHome(); + + /** + * Work out which group the transfer target is for, in this case the + * default group. + */ + NodeRef defaultGroup = nodeService.getChildByName(home, ContentModel.ASSOC_CONTAINS, + defaultTransferGroup); + + /** + * Go ahead and create the new node + */ + ChildAssociationRef ref = nodeService.createNode(defaultGroup, ContentModel.ASSOC_CONTAINS, + QName.createQName(TransferModel.TRANSFER_MODEL_1_0_URI, newTarget.getName()), + TransferModel.TYPE_TRANSFER_TARGET, properties); + + /** + * Now create a new TransferTarget object to return to the caller. + */ + TransferTargetImpl retVal = new TransferTargetImpl(); + mapTransferTarget(ref.getChildRef(), retVal); + + return retVal; + } + + /** + * Get all transfer targets + */ + public Set getTransferTargets() + { + NodeRef home = getTransferHome(); + + Set ret = new HashSet(); + + // get all groups + List groups = nodeService.getChildAssocs(home); + + // for each group + for(ChildAssociationRef group : groups) + { + NodeRef groupNode = group.getChildRef(); + ret.addAll(getTransferTargets(groupNode)); + } + + return ret; + } + + /** + * Get all transfer targets in the specified group + */ + public Set getTransferTargets(String groupName) + { + NodeRef home = getTransferHome(); + + // get group with assoc groupName + NodeRef groupNode = nodeService.getChildByName(home, ContentModel.ASSOC_CONTAINS, groupName); + + if(groupNode == null) + { + // No transfer group. + throw new TransferException(MSG_NO_GROUP, new Object[]{groupName}); + } + + return getTransferTargets(groupNode); + } + + /** + * Given the noderef of a group of transfer targets, return all the contained transfer targets. + * @param groupNode + * @return + */ + private Set getTransferTargets(NodeRef groupNode) + { + Set result = new HashSet(); + Listchildren = nodeService.getChildAssocs(groupNode, ContentModel.ASSOC_CONTAINS, RegexQNamePattern.MATCH_ALL); + + for(ChildAssociationRef child : children) + { + if(nodeService.getType(child.getChildRef()).equals(TransferModel.TYPE_TRANSFER_TARGET)) + { + TransferTargetImpl newTarget = new TransferTargetImpl(); + mapTransferTarget(child.getChildRef(), newTarget); + result.add(newTarget); + } + } + return result; + } + + /** + * + */ + public void deleteTransferTarget(String name) + { + NodeRef nodeRef = lookupTransferTarget(name); + + if(nodeRef == null) + { + // target does not exist + throw new TransferException(MSG_NO_TARGET, new Object[]{name} ); + } + nodeService.deleteNode(nodeRef); + } + + /** + * Enables/Disables the named transfer target + */ + public void enableTransferTarget(String name, boolean enable) + { + NodeRef nodeRef = lookupTransferTarget(name); + nodeService.setProperty(nodeRef, TransferModel.PROP_ENABLED, new Boolean(enable)); + } + + public boolean targetExists(String name) + { + return (lookupTransferTarget(name) != null); + } + + /** + * + */ + public TransferTarget getTransferTarget(String name) + { + NodeRef nodeRef = lookupTransferTarget(name); + + if(nodeRef == null) + { + // target does not exist + throw new TransferException(MSG_NO_TARGET, new Object[]{name} ); + } + TransferTargetImpl newTarget = new TransferTargetImpl(); + mapTransferTarget(nodeRef, newTarget); + + return newTarget; + } + + /** + * create or update a transfer target. + */ + public TransferTarget saveTransferTarget(TransferTarget update) + { + if(update.getNodeRef() == null) + { + // This is a save for the first time + return createTransferTarget(update); + } + + NodeRef nodeRef = lookupTransferTarget(update.getName()); + if(nodeRef == null) + { + // target does not exist + throw new TransferException(MSG_NO_TARGET, new Object[]{update.getName()} ); + } + + Map properties = new HashMap(); + properties.put(TransferModel.PROP_ENDPOINT_HOST, update.getEndpointHost()); + properties.put(TransferModel.PROP_ENDPOINT_PORT, update.getEndpointPort()); + properties.put(TransferModel.PROP_ENDPOINT_PROTOCOL, update.getEndpointProtocol()); + properties.put(TransferModel.PROP_ENDPOINT_PATH, update.getEndpointPath()); + properties.put(TransferModel.PROP_USERNAME, update.getUsername()); + properties.put(TransferModel.PROP_PASSWORD, encrypt(update.getPassword())); + + // titled aspect + properties.put(ContentModel.PROP_TITLE, update.getTitle()); + properties.put(ContentModel.PROP_NAME, update.getName()); + properties.put(ContentModel.PROP_DESCRIPTION, update.getDescription()); + + properties.put(TransferModel.PROP_ENABLED, new Boolean(update.isEnabled())); + nodeService.setProperties(nodeRef, properties); + + TransferTargetImpl newTarget = new TransferTargetImpl(); + mapTransferTarget(nodeRef, newTarget); + return newTarget; + } + + /** + * Transfer async. + * + * @param targetName + * @param definition + * @param callbacks + * + */ + public void transferAsync(String targetName, TransferDefinition definition, TransferCallback... callbacks) + { + transferAsync(targetName, definition, Arrays.asList(callbacks)); + } + + /** + * Transfer async. + * + * @param targetName + * @param definition + * @param callbacks + * + */ + public void transferAsync(String targetName, TransferDefinition definition, Collection callbacks) + { + /** + * Event processor for this transfer instance + */ + final TransferEventProcessor eventProcessor = new TransferEventProcessor(); + if(callbacks != null) + { + eventProcessor.observers.addAll(callbacks); + } + + /* + * Note: + * callback should be Serializable to be passed through the action API + * However Serializable is not used so it does not matter. Perhaps the action API should be + * changed? Or we could add a Serializable proxy here. + */ + + Map params = new HashMap(); + params.put("targetName", targetName); + params.put("definition", definition); + params.put("callbacks", (Serializable)callbacks); + + Action transferAction = getActionService().createAction("transfer-async", params); + + /** + * Execute transfer async in its own transaction. + * The action service only runs actions in the post commit which is why there's + * a separate transaction here. + */ + boolean success = false; + UserTransaction trx = transactionService.getNonPropagatingUserTransaction(); + try + { + trx.begin(); + logger.debug("calling action service to execute action"); + getActionService().executeAction(transferAction, null, false, true); + trx.commit(); + logger.debug("committed successfully"); + success = true; + } + catch (Exception error) + { + logger.error("unexpected exception", error); + throw new AlfrescoRuntimeException(MSG_ERR_TRANSFER_ASYNC, error); + } + finally + { + if(!success) + { + try + { + logger.debug("rolling back after error"); + trx.rollback(); + } + catch (Exception error) + { + logger.error("unexpected exception during rollback", error); + // There's nothing much we can do here + } + } + } + } + + /** + * Transfer Synchronous + * + * @param targetName + * @param definition + * @param callbacks + */ + public TransferEndEvent transfer(String targetName, TransferDefinition definition, TransferCallback... callbacks) + throws TransferFailureException + { + return transfer(targetName, definition, Arrays.asList(callbacks)); + } + + /** + * Transfer Synchronous + * + * @param targetName + * @param definition + * @param callbacks + */ + public TransferEndEvent transfer(String targetName, TransferDefinition definition, Collection callbacks) + throws TransferFailureException + { + /** + * Event processor for this transfer instance + */ + final TransferEventProcessor eventProcessor = new TransferEventProcessor(); + if(callbacks != null) + { + eventProcessor.observers.addAll(callbacks); + } + + /** + * Now go ahead and do the transfer + */ + return transferImpl(targetName, definition, eventProcessor); + } + + private TransferEndEvent transferImpl(String targetName, final TransferDefinition definition, final TransferEventProcessor eventProcessor) + throws TransferFailureException + { + if(logger.isDebugEnabled()) + { + logger.debug("transfer started to :" + targetName); + } + + // transfer end event + TransferEndEvent endEvent = null; + Exception failureException = null; + TransferTarget target = null; + Transfer transfer = null; + final List transferReportEvents = new LinkedList(); + NodeRef sourceReport = null; + NodeRef destinationReport = null; + File manifest = null; + File requisite = null; + int pollRetries = 0; + int pollPosition = -1; + boolean cancelled = false; + + // Wire in the transferReport - so any callbacks are stored in transferReport + TransferCallback reportCallback = new TransferCallback() + { + private static final long serialVersionUID = 4072579605731036522L; + + public void processEvent(TransferEvent event) + { + transferReportEvents.add(event); + } + }; + eventProcessor.addObserver(reportCallback); + + // start transfer + ClientTransferState clientState = ClientTransferState.Begin; + while(clientState != ClientTransferState.Exit) + { + try + { + switch (clientState) + { + case Begin: + { + eventProcessor.start(); + manifest = createManifest(definition); + logger.debug("transfer begin"); + target = getTransferTarget(targetName); + transfer = transmitter.begin(target); + String transferId = transfer.getTransferId(); + TransferStatus status = new TransferStatus(); + transferMonitoring.put(transferId, status); + logger.debug("transfer begun transferId:" + transferId); + eventProcessor.begin(transferId); + checkCancel(transferId); + + // next state + clientState = ClientTransferState.Prepare; + break; + } + + case Prepare: + { + // send Manifest, get the requsite back. + eventProcessor.sendSnapshot(1,1); + + requisite = createRequisiteFile(); + FileOutputStream reqOutput = new FileOutputStream(requisite); + transmitter.sendManifest(transfer, manifest, reqOutput); + logger.debug("manifest sent"); + checkCancel(transfer.getTransferId()); + + if(logger.isDebugEnabled()) + { + logger.debug("requisite file written to local filesystem"); + try + { + outputFile(requisite); + } + catch (IOException error) + { + // This is debug code - so an exception thrown while debugging + logger.debug("error while outputting snapshotFile"); + error.printStackTrace(); + } + } + + sendContent(transfer, definition, eventProcessor, manifest, requisite); + logger.debug("content sending finished"); + checkCancel(transfer.getTransferId()); + + // prepare + eventProcessor.prepare(); + transmitter.prepare(transfer); + checkCancel(transfer.getTransferId()); + + // next state + clientState = ClientTransferState.Commit; + break; + } + + case Commit: + { + eventProcessor.commit(); + transmitter.commit(transfer); + logger.debug("committing transferId:" + transfer.getTransferId()); + checkCancel(transfer.getTransferId()); + + // next state + clientState = ClientTransferState.Poll; + break; + } + + case Poll: + { + TransferProgress progress = null; + try + { + progress = transmitter.getStatus(transfer); + + // reset retries for next poll + pollRetries = 0; + } + catch(TransferException e) + { + pollRetries++; + if (pollRetries == 3) + { + throw new TransferException(MSG_FAILED_TO_GET_TRANSFER_STATUS, new Object[] {target.getName()}); + } + } + + // notify transfer progress + if (progress.getCurrentPosition() != pollPosition) + { + pollPosition = progress.getCurrentPosition(); + eventProcessor.committing(progress.getEndPosition(), pollPosition); + } + + // check status + if (progress.getStatus() == TransferProgress.Status.ERROR) + { + Throwable targetError = progress.getError(); + // NOTE: it's possible the error is not returned from pre v3.4 target repositories + if (targetError == null) + { + targetError = new TransferException(MSG_UNKNOWN_TARGET_ERROR); + } + failureException = new TransferException(MSG_TARGET_ERROR, new Object[] {targetError.getMessage()}, targetError); + clientState = ClientTransferState.Finished; + break; + } + else if (progress.getStatus() == TransferProgress.Status.CANCELLED) + { + cancelled = true; + clientState = ClientTransferState.Finished; + break; + } + else if (progress.getStatus() == TransferProgress.Status.COMPLETE) + { + clientState = ClientTransferState.Finished; + break; + } + + checkCancel(transfer.getTransferId()); + + // NOTE: stay in poll state... + // sleep before next poll + try + { + Thread.sleep(commitPollDelay); + } + catch (InterruptedException e) + { + // carry on + } + break; + } + + case Cancel: + { + logger.debug("Abort - waiting for target confirmation of cancel"); + transmitter.abort(transfer); + + // next state... poll for confirmation of cancel from target + clientState = ClientTransferState.Poll; + break; + } + + case Finished: + { + try + { + TransferEndEventImpl endEventImpl = null; + + try + { + if (failureException != null) + { + logger.debug("TransferException - unable to transfer", failureException); + TransferEventError errorEvent = new TransferEventError(); + errorEvent.setTransferState(TransferEvent.TransferState.ERROR); + TransferFailureException endException = new TransferFailureException(errorEvent); + errorEvent.setMessage(endException.getMessage()); + errorEvent.setException(endException); + endEventImpl = errorEvent; + } + else if (cancelled) + { + endEventImpl = new TransferEventCancelled(); + endEventImpl.setTransferState(TransferEvent.TransferState.CANCELLED); + endEventImpl.setMessage("cancelled"); + } + else + { + logger.debug("committed transferId:" + transfer.getTransferId()); + endEventImpl = new TransferEventSuccess(); + endEventImpl.setTransferState(TransferEvent.TransferState.SUCCESS); + endEventImpl.setMessage("success"); + } + + // manually add the terminal event to the transfer report event list + transferReportEvents.add(endEventImpl); + } + catch(Exception e) + { + // report this failure as last resort + failureException = e; + logger.warn("Exception - unable to notify end transfer state", e); + } + + String transferName = new SimpleDateFormat("yyyyMMddhhmmssSSSZ").format(new Date()); + + try + { + logger.debug("now pull back the destination transfer report"); + destinationReport = persistDestinationTransferReport(transferName, transfer, target); + if (destinationReport != null) + { + eventProcessor.writeReport(destinationReport, TransferEventReport.ReportType.DESTINATION); + } + + logger.debug("now persist the client side transfer report"); + sourceReport = persistTransferReport(transferName, transfer, target, definition, transferReportEvents, manifest, failureException); + if (sourceReport != null) + { + eventProcessor.writeReport(sourceReport, TransferEventReport.ReportType.SOURCE); + } + } + catch(Exception e) + { + logger.warn("Exception - unable to write transfer reports", e); + } + + try + { + endEventImpl.setLast(true); + endEventImpl.setSourceReport(sourceReport); + endEventImpl.setDestinationReport(destinationReport); + endEvent = endEventImpl; + eventProcessor.end(endEvent); + } + catch(Exception e) + { + // report this failure as last resort + failureException = e; + logger.warn("Exception - unable to notify end transfer state", e); + } + } + finally + { + clientState = ClientTransferState.Exit; + } + } + } + } + catch(TransferCancelledException e) + { + logger.debug("Interrupted by transfer cancel request from client"); + clientState = ClientTransferState.Cancel; + } + catch(Exception e) + { + logger.debug("Exception - unable to transfer", e); + failureException = e; + clientState = ClientTransferState.Finished; + } + } + + try + { + if (endEvent == null) + { + TransferEventError error = new TransferEventError(); + error.setTransferState(TransferEvent.TransferState.ERROR); + TransferFailureException endException = new TransferFailureException(error); + error.setMessage(endException.getMessage()); + error.setException(endException); + error.setSourceReport(sourceReport); + error.setDestinationReport(destinationReport); + error.setLast(true); + endEvent = error; + } + if (endEvent instanceof TransferEventError) + { + TransferEventError endError = (TransferEventError)endEvent; + throw (TransferFailureException)endError.getException(); + } + return endEvent; + } + finally + { + // clean up + if (transfer != null) + { + transferMonitoring.remove(transfer.getTransferId()); + } + if(manifest != null) + { + manifest.delete(); + logger.debug("manifest file deleted"); + } + + if(requisite != null) + { + requisite.delete(); + logger.debug("requisite file deleted"); + } + } + } + + private File createManifest(TransferDefinition definition) + throws IOException, SAXException + { + // which nodes to write to the snapshot + Setnodes = definition.getNodes(); + + if(nodes == null || nodes.size() == 0) + { + logger.debug("no nodes to transfer"); + throw new TransferException(MSG_NO_NODES); + } + + /** + * create snapshot + */ + logger.debug("create snapshot"); + + // where to put snapshot ? + File tempDir = TempFileProvider.getLongLifeTempDir(FILE_DIRECTORY); + File snapshotFile = TempFileProvider.createTempFile("TRX-SNAP", FILE_SUFFIX, tempDir); + FileWriter snapshotWriter = new FileWriter(snapshotFile); + + // Write the manifest file + TransferManifestWriter formatter = new XMLTransferManifestWriter(); + TransferManifestHeader header = new TransferManifestHeader(); + Descriptor descriptor = descriptorService.getCurrentRepositoryDescriptor(); + header.setRepositoryId(descriptor.getId()); + header.setCreatedDate(new Date()); + header.setNodeCount(nodes.size()); + header.setSync(definition.isSync()); + header.setReadOnly(definition.isReadOnly()); + formatter.startTransferManifest(snapshotWriter); + formatter.writeTransferManifestHeader(header); + for(NodeRef nodeRef : nodes) + { + TransferManifestNode node = transferManifestNodeFactory.createTransferManifestNode(nodeRef); + formatter.writeTransferManifestNode(node); + } + formatter.endTransferManifest(); + snapshotWriter.close(); + + logger.debug("snapshot file written to local filesystem"); + // If we are debugging then write the file to stdout. + if(logger.isDebugEnabled()) + { + try + { + outputFile(snapshotFile); + } + catch (IOException error) + { + // This is debug code - so an exception thrown while debugging + logger.debug("error while outputting snapshotFile"); + error.printStackTrace(); + } + } + + return snapshotFile; + } + + private File createRequisiteFile() + { + File tempDir = TempFileProvider.getLongLifeTempDir(FILE_DIRECTORY); + File reqFile = TempFileProvider.createTempFile("TRX-REQ", FILE_SUFFIX, tempDir); + return reqFile; + } + + private void sendContent(final Transfer transfer, final TransferDefinition definition, final TransferEventProcessor eventProcessor, + File manifest, File requisite) + throws SAXException, ParserConfigurationException, IOException + { + SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); + SAXParser parser; + parser = saxParserFactory.newSAXParser(); + + /** + * Parse the requisite file to generate the delta list + */ + DeltaListRequsiteProcessor reqProcessor = new DeltaListRequsiteProcessor(); + XMLTransferRequsiteReader reqReader = new XMLTransferRequsiteReader(reqProcessor); + parser.parse(requisite, reqReader); + + final DeltaList deltaList = reqProcessor.getDeltaList(); + + /** + * Parse the manifest file and transfer chunks over + * + * ManifestFile -> Manifest Processor -> Chunker -> Transmitter + * + * Step 1: Create a chunker and wire it up to the transmitter + */ + final ContentChunker chunker = new ContentChunkerImpl(); + final Long fRange = Long.valueOf(definition.getNodes().size()); + chunker.setHandler( + new ContentChunkProcessor(){ + private long counter = 0; + public void processChunk(Set data) + { + checkCancel(transfer.getTransferId()); + logger.debug("send chunk to transmitter"); + for(ContentData file : data) + { + counter++; + eventProcessor.sendContent(file, fRange, counter); + } + transmitter.sendContent(transfer, data); + } + } + ); + + /** + * Step 2 : create a manifest processor and wire it up to the chunker + */ + TransferManifestProcessor processor = new TransferManifestProcessor() + { + public void processTransferManifestNode(TransferManifestNormalNode node) + { + Set data = TransferManifestNodeHelper.getContentData(node); + for(ContentData d : data) + { + checkCancel(transfer.getTransferId()); + logger.debug("add content to chunker"); + + /** + * Check with the deltaList whether we need to send the content item + */ + if(deltaList != null) + { + String partName = TransferCommons.URLToPartName(d.getContentUrl()); + if(deltaList.getRequiredParts().contains(partName)) + { + logger.debug("content is required :" + d.getContentUrl()); + chunker.addContent(d); + } + } + else + { + // No delta list - so send all content items + chunker.addContent(d); + } + } + } + + public void processTransferManifiestHeader(TransferManifestHeader header){/* NO-OP */ } + public void startTransferManifest(){ /* NO-OP */ } + public void endTransferManifest(){ /* NO-OP */ } + public void processTransferManifestNode(TransferManifestDeletedNode node) + { /* NO-OP */ + } + }; + + /** + * Step 3: wire up the manifest reader to a manifest processor + */ + + XMLTransferManifestReader reader = new XMLTransferManifestReader(processor); + + /** + * Step 4: start the magic - Give the manifest file to the manifest reader + */ + parser.parse(manifest, reader); + chunker.flush(); + } + + /** + * CancelAsync + */ + public void cancelAsync(String transferHandle) + { + TransferStatus status = transferMonitoring.get(transferHandle); + if(status != null) + { + logger.debug("canceling transfer :" + transferHandle); + status.cancelMe = true; + } + } + + /** + * Check whether the specified transfer should be cancelled. + * @param transferHandle + * @throws TransferException - the transfer has been cancelled. + */ + private void checkCancel(String transferHandle) throws TransferException + { + TransferStatus status = transferMonitoring.get(transferHandle); + if(status != null) + { + if(status.cancelMe) + { + throw new TransferCancelledException(); + } + } + } + + public void setNodeService(NodeService nodeService) + { + this.nodeService = nodeService; + } + + public NodeService getNodeService() + { + return nodeService; + } + + public void setSearchService(SearchService searchService) + { + this.searchService = searchService; + } + + public SearchService getSearchService() + { + return searchService; + } + + public void setTenantService(TenantService tenantService) + { + this.tenantService = tenantService; + } + + public void setTransferSpaceQuery(String transferSpaceQuery) + { + this.transferSpaceQuery = transferSpaceQuery; + } + + public String getTransferSpaceQuery() + { + return transferSpaceQuery; + } + + public void setDefaultTransferGroup(String defaultGroup) + { + this.defaultTransferGroup = defaultGroup; + } + + public String getDefaultTransferGroup() + { + return defaultTransferGroup; + } + + public TransferTransmitter getTransmitter() + { + return transmitter; + } + + public void setTransmitter(TransferTransmitter transmitter) + { + this.transmitter = transmitter; + } + + private Map transferHomeMap = new ConcurrentHashMap(); + protected NodeRef getTransferHome() + { + String tenantDomain = tenantService.getUserDomain(AuthenticationUtil.getRunAsUser()); + NodeRef transferHome = transferHomeMap.get(tenantDomain); + if(transferHome == null) + { + String query = transferSpaceQuery; + + ResultSet result = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, + SearchService.LANGUAGE_XPATH, query); + + if(result.length() == 0) + { + // No transfer home. + throw new TransferException(MSG_NO_HOME, new Object[]{query}); + } + if (result.getNodeRefs().size() != 0) + { + transferHome = result.getNodeRef(0); + transferHomeMap.put(tenantDomain, transferHome); + } + } + return transferHome; + } + + private char[] encrypt(char[] text) + { + // placeholder dummy implementation - add an 'E' to the start +// String dummy = new String("E" + text); +// String dummy = new String(text); +// return dummy.toCharArray(); + return text; + } + + private char[] decrypt(char[] text) + { + // placeholder dummy implementation - strips off leading 'E' +// String dummy = new String(text); + return text; + //return dummy.substring(1).toCharArray(); + } + + /** + * + * @param name + * @return + */ + private NodeRef lookupTransferTarget(String name) + { + String query = "+TYPE:\"trx:transferTarget\" +@cm\\:name:\"" +name + "\""; + + ResultSet result = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, + SearchService.LANGUAGE_LUCENE, query); + + if(result.length() == 1) + { + return result.getNodeRef(0); + } + return null; + } + + private void mapTransferTarget(NodeRef nodeRef, TransferTargetImpl def) + { + def.setNodeRef(nodeRef); + Map properties = nodeService.getProperties(nodeRef); + String name = (String)properties.get(ContentModel.PROP_NAME); + + String endpointPath = (String)properties.get(TransferModel.PROP_ENDPOINT_PATH); + if (endpointPath == null) + throw new TransferException(MSG_MISSING_ENDPOINT_PATH, new Object[] {name}); + def.setEndpointPath(endpointPath); + + String endpointProtocol = (String)properties.get(TransferModel.PROP_ENDPOINT_PROTOCOL); + if (endpointProtocol == null) + throw new TransferException(MSG_MISSING_ENDPOINT_PROTOCOL, new Object[] {name}); + def.setEndpointProtocol(endpointProtocol); + + String endpointHost = (String)properties.get(TransferModel.PROP_ENDPOINT_HOST); + if (endpointHost== null) + throw new TransferException(MSG_MISSING_ENDPOINT_HOST, new Object[] {name}); + def.setEndpointHost(endpointHost); + + Integer endpointPort = (Integer)properties.get(TransferModel.PROP_ENDPOINT_PORT); + if (endpointPort == null) + throw new TransferException(MSG_MISSING_ENDPOINT_PORT, new Object[] {name}); + def.setEndpointPort(endpointPort); + + String username = (String)properties.get(TransferModel.PROP_USERNAME); + if (username == null) + throw new TransferException(MSG_MISSING_ENDPOINT_USERNAME, new Object[] {name}); + def.setUsername(username); + + Serializable passwordVal = properties.get(TransferModel.PROP_PASSWORD); + if (passwordVal == null) + throw new TransferException(MSG_MISSING_ENDPOINT_PASSWORD, new Object[] {name}); + if(passwordVal.getClass().isArray()) + { + def.setPassword(decrypt((char[])passwordVal)); + } + if(passwordVal instanceof String) + { + String password = (String)passwordVal; + def.setPassword(decrypt(password.toCharArray())); + } + + def.setName(name); + def.setTitle((String)properties.get(ContentModel.PROP_TITLE)); + def.setDescription((String)properties.get(ContentModel.PROP_DESCRIPTION)); + + if(nodeService.hasAspect(nodeRef, TransferModel.ASPECT_ENABLEABLE)) + { + def.setEnabled((Boolean)properties.get(TransferModel.PROP_ENABLED)); + } + } + + /* (non-Javadoc) + * @see org.alfresco.service.cmr.transfer.TransferService#verify(org.alfresco.service.cmr.transfer.TransferTarget) + */ + public void verify(TransferTarget target) throws TransferException + { + transmitter.verifyTarget(target); + } + + /** + * Utility to dump the contents of a file to the console + * @param file + */ + private static void outputFile(File file) throws IOException + { + BufferedReader reader = new BufferedReader(new FileReader(file)); + String s = reader.readLine(); + while(s != null) + { + System.out.println(s); + s = reader.readLine(); + } + } + + + /** + * Success transfer report + */ + private NodeRef persistTransferReport(final String transferName, final Transfer transfer, final TransferTarget target, final TransferDefinition definition, + final List events, final File snapshotFile, final Exception exception) + { + /** + * persist the transfer report in its own transaction so it cannot be rolled back + */ + NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction( + new RetryingTransactionHelper.RetryingTransactionCallback() + { + public NodeRef execute() throws Throwable + { + logger.debug("transfer report starting"); + NodeRef reportNode = null; + if (exception != null) + { + reportNode = transferReporter.createTransferReport(transferName, exception, target, definition, events, snapshotFile); + + } + else + { + reportNode = transferReporter.createTransferReport(transferName, transfer, target, definition, events, snapshotFile); + } + logger.debug("transfer report done"); + return reportNode; + } + }, false, true); + return reportNode; + } + + /** + * Destination Transfer report + * @return the node ref of the transfer report or null if there isn't one. + */ + private NodeRef persistDestinationTransferReport(final String transferName, + final Transfer transfer, + final TransferTarget target) + { + /** + * in its own transaction so it cannot be rolled back + */ + NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction( + new RetryingTransactionHelper.RetryingTransactionCallback() + { + public NodeRef execute() throws Throwable + { + try + { + File tempDir = TempFileProvider.getLongLifeTempDir(FILE_DIRECTORY); + File destReportFile = TempFileProvider.createTempFile("TRX-DREP", FILE_SUFFIX, tempDir); + FileOutputStream destReportOutput = new FileOutputStream(destReportFile); + transmitter.getTransferReport(transfer, destReportOutput); + logger.debug("transfer report (destination) starting"); + + NodeRef reportNode = transferReporter.writeDestinationReport(transferName, target, destReportFile); + logger.debug("transfer report (destination) done"); + + if(destReportFile != null) + { + destReportFile.delete(); + } + logger.debug("destination report temp file deleted"); + + return reportNode; + } + catch(FileNotFoundException ie) + { + // there's nothing we can do here. - but we do not want the exception to propogate up. + logger.debug("unexpected error while obtaining destination transfer report", ie); + return null; + } + catch(TransferException ie) + { + // there's nothing we can do here. - but we do not want the exception to propogate up. + logger.debug("unexpected error while obtaining destination transfer report", ie); + return null; + } + } // end execute + }, false, true); + + return reportNode; + } + + public void setTransferManifestNodeFactory(TransferManifestNodeFactory transferManifestNodeFactory) + { + this.transferManifestNodeFactory = transferManifestNodeFactory; + } + + public TransferManifestNodeFactory getTransferManifestNodeFactory() + { + return transferManifestNodeFactory; + } + + public void setActionService(ActionService actionService) + { + this.actionService = actionService; + } + + public ActionService getActionService() + { + return actionService; + } + + public void setTransactionService(TransactionService transactionService) + { + this.transactionService = transactionService; + } + + public TransactionService getTransactionService() + { + return transactionService; + } + + public void setTransferReporter(TransferReporter transferReporter) + { + this.transferReporter = transferReporter; + } + + public TransferReporter getTransferReporter() + { + return transferReporter; + } + + public void setCommitPollDelay(long commitPollDelay) + { + this.commitPollDelay = commitPollDelay; + } + + public long getCommitPollDelay() + { + return commitPollDelay; + } + + public void setDescriptorService(DescriptorService descriptorService) + { + this.descriptorService = descriptorService; + } + + public DescriptorService getDescriptorService() + { + return descriptorService; + } + + private class TransferStatus + { + boolean cancelMe = false; + } + + + + +} diff --git a/source/java/org/alfresco/repo/transfer/TransferServiceImplTest.java b/source/java/org/alfresco/repo/transfer/TransferServiceImplTest.java index 1eb8da4de5..e1e3d6e40b 100644 --- a/source/java/org/alfresco/repo/transfer/TransferServiceImplTest.java +++ b/source/java/org/alfresco/repo/transfer/TransferServiceImplTest.java @@ -39,6 +39,7 @@ import javax.xml.validation.SchemaFactory; import javax.xml.validation.Validator; import org.alfresco.model.ContentModel; +import org.alfresco.repo.content.MimetypeMap; import org.alfresco.repo.security.authentication.AuthenticationComponent; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.transfer.manifest.TransferManifestNodeFactory; @@ -95,7 +96,7 @@ public class TransferServiceImplTest extends BaseAlfrescoSpringTest { private TransferService transferService; private ContentService contentService; - private TransferServiceImpl transferServiceImpl; + private TransferServiceImpl2 transferServiceImpl; private SearchService searchService; private TransactionService transactionService; private TransferReceiver receiver; @@ -130,7 +131,7 @@ public class TransferServiceImplTest extends BaseAlfrescoSpringTest // Get the required services this.transferService = (TransferService)this.applicationContext.getBean("TransferService"); this.contentService = (ContentService)this.applicationContext.getBean("ContentService"); - this.transferServiceImpl = (TransferServiceImpl)this.applicationContext.getBean("transferService"); + this.transferServiceImpl = (TransferServiceImpl2)this.applicationContext.getBean("transferService2"); this.searchService = (SearchService)this.applicationContext.getBean("SearchService"); this.transactionService = (TransactionService)this.applicationContext.getBean("TransactionService"); this.nodeService = (NodeService) this.applicationContext.getBean("nodeService"); @@ -1803,7 +1804,7 @@ public class TransferServiceImplTest extends BaseAlfrescoSpringTest */ assertTrue("transfer report is too small", transferReport.size() > 3); assertTrue("transfer report does not start with START", transferReport.get(0).getTransferState().equals(TransferEvent.TransferState.START)); - assertTrue("transfer report does not end with ERROR", transferReport.get(transferReport.size()-2).getTransferState().equals(TransferEvent.TransferState.ERROR)); + assertTrue("transfer report does not end with CANCELLED", transferReport.get(transferReport.size()-1).getTransferState().equals(TransferEvent.TransferState.CANCELLED)); // last event is the transfer report event. } finally @@ -2057,16 +2058,18 @@ public class TransferServiceImplTest extends BaseAlfrescoSpringTest // Now validate the destination side transfer report against the XSD ContentReader reader = contentService.getReader(reportNode, ContentModel.PROP_CONTENT); assertNotNull("transfer reader is null", reader); - - Source transferReportSource = new StreamSource(reader.getContentInputStream()); - try + if (reader.getMimetype().equals(MimetypeMap.MIMETYPE_XML)) { - validator.validate(transferReportSource); + Source transferReportSource = new StreamSource(reader.getContentInputStream()); + try + { + validator.validate(transferReportSource); + } + catch (Exception e) + { + fail("Destination Transfer Report reportNode:" + reportNode + " message :" + e.getMessage() ); + } } - catch (Exception e) - { - fail("Destination Transfer Report reportNode:" + reportNode + " message :" + e.getMessage() ); - } } } finally diff --git a/source/java/org/alfresco/repo/transfer/report/TransferReporterImpl.java b/source/java/org/alfresco/repo/transfer/report/TransferReporterImpl.java index 1f21bcf596..aabc98cefd 100644 --- a/source/java/org/alfresco/repo/transfer/report/TransferReporterImpl.java +++ b/source/java/org/alfresco/repo/transfer/report/TransferReporterImpl.java @@ -23,8 +23,6 @@ import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Serializable; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -38,7 +36,6 @@ import org.alfresco.model.ContentModel; import org.alfresco.repo.content.MimetypeMap; import org.alfresco.repo.transfer.Transfer; import org.alfresco.repo.transfer.TransferModel; -import org.alfresco.repo.transfer.TransferServiceImpl; import org.alfresco.repo.transfer.manifest.TransferManifestDeletedNode; import org.alfresco.repo.transfer.manifest.TransferManifestHeader; import org.alfresco.repo.transfer.manifest.TransferManifestNormalNode; @@ -99,7 +96,7 @@ public class TransferReporterImpl implements TransferReporter { Map properties = new HashMap (); - String title = transferName + " , error"; + String title = transferName; String description = "Transfer error report, " + transferName + " targetName " + target.getName(); String name = transferName; @@ -167,7 +164,7 @@ public class TransferReporterImpl implements TransferReporter { Map properties = new HashMap (); - String title = transferName + ", success"; + String title = transferName; String description = "Transfer success report : " + transferName + " targetName: " + target.getName(); String name = transferName; @@ -290,7 +287,7 @@ public class TransferReporterImpl implements TransferReporter File tempFile) { - String title = transferName + ", destination, success"; + String title = transferName + " destination"; String description = "Transfer Destination Report, success, targetName : " + target.getName(); String name = transferName + " destination"; diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferEndEvent.java b/source/java/org/alfresco/service/cmr/transfer/TransferEndEvent.java new file mode 100644 index 0000000000..1b9d3349be --- /dev/null +++ b/source/java/org/alfresco/service/cmr/transfer/TransferEndEvent.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2005-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.service.cmr.transfer; + +import org.alfresco.service.cmr.repository.NodeRef; + +/** + * TransferEvents are produced by the Transfer service during an in flight + * transfer. + * + *

+ * The TransferCallback presents TransferEvents for processing. + * + * @see TransferCallback + * + * @author davidc + */ +public interface TransferEndEvent extends TransferEvent +{ + /** + * Gets the report generated by the transfer source repository + * + * @return source transfer report + */ + public NodeRef getSourceReport(); + + /** + * Gets the report generated by the transfer destination repository + * + * @return destination transfer report + */ + public NodeRef getDestinationReport(); +} diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferEvent.java b/source/java/org/alfresco/service/cmr/transfer/TransferEvent.java index e7a0cc7eda..ab19d123c8 100644 --- a/source/java/org/alfresco/service/cmr/transfer/TransferEvent.java +++ b/source/java/org/alfresco/service/cmr/transfer/TransferEvent.java @@ -35,7 +35,7 @@ public interface TransferEvent /** * The transfer events will Start with a START event and finish with either SUCCESS or ERROR */ - enum TransferState { START, SENDING_SNAPSHOT, SENDING_CONTENT, PREPARING, COMMITTING, SUCCESS, ERROR }; + enum TransferState { START, SENDING_SNAPSHOT, SENDING_CONTENT, PREPARING, COMMITTING, SUCCESS, ERROR, CANCELLED }; /** * Get the state of this transfer diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferEventCancelled.java b/source/java/org/alfresco/service/cmr/transfer/TransferEventCancelled.java new file mode 100644 index 0000000000..38a5f5815a --- /dev/null +++ b/source/java/org/alfresco/service/cmr/transfer/TransferEventCancelled.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2005-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.service.cmr.transfer; + +import org.alfresco.repo.transfer.TransferEndEventImpl; + +/** + * The cancelled event indicates a transfer was aborted + */ +public class TransferEventCancelled extends TransferEndEventImpl +{ + public String toString() + { + return "TransferEventCancelled"; + } +} diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferEventError.java b/source/java/org/alfresco/service/cmr/transfer/TransferEventError.java index 476027d121..b1cfcdb164 100644 --- a/source/java/org/alfresco/service/cmr/transfer/TransferEventError.java +++ b/source/java/org/alfresco/service/cmr/transfer/TransferEventError.java @@ -18,12 +18,12 @@ */ package org.alfresco.service.cmr.transfer; -import org.alfresco.repo.transfer.TransferEventImpl; +import org.alfresco.repo.transfer.TransferEndEventImpl; /** * Indicates the reason why a transfer failed */ -public class TransferEventError extends TransferEventImpl implements TransferEvent +public class TransferEventError extends TransferEndEventImpl { private Exception exception; diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferEventReport.java b/source/java/org/alfresco/service/cmr/transfer/TransferEventReport.java index b074fea3e8..71c8bbac2c 100644 --- a/source/java/org/alfresco/service/cmr/transfer/TransferEventReport.java +++ b/source/java/org/alfresco/service/cmr/transfer/TransferEventReport.java @@ -1,7 +1,5 @@ package org.alfresco.service.cmr.transfer; -import java.util.Date; - import org.alfresco.repo.transfer.TransferEventImpl; import org.alfresco.service.cmr.repository.NodeRef; diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferEventSuccess.java b/source/java/org/alfresco/service/cmr/transfer/TransferEventSuccess.java index e8ea1079d7..2c62dd593b 100644 --- a/source/java/org/alfresco/service/cmr/transfer/TransferEventSuccess.java +++ b/source/java/org/alfresco/service/cmr/transfer/TransferEventSuccess.java @@ -18,12 +18,12 @@ */ package org.alfresco.service.cmr.transfer; -import org.alfresco.repo.transfer.TransferEventImpl; +import org.alfresco.repo.transfer.TransferEndEventImpl; /** - * The success event indicates a successfull transfer + * The success event indicates a successful transfer */ -public class TransferEventSuccess extends TransferEventImpl implements TransferEvent +public class TransferEventSuccess extends TransferEndEventImpl { public String toString() { diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferException.java b/source/java/org/alfresco/service/cmr/transfer/TransferException.java index 000b82eb01..9b9b1297f2 100644 --- a/source/java/org/alfresco/service/cmr/transfer/TransferException.java +++ b/source/java/org/alfresco/service/cmr/transfer/TransferException.java @@ -27,7 +27,7 @@ import org.alfresco.error.AlfrescoRuntimeException; */ public class TransferException extends AlfrescoRuntimeException { - /** + /** * Serial version UID */ private static final long serialVersionUID = 3257571685241467958L; diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferFailureException.java b/source/java/org/alfresco/service/cmr/transfer/TransferFailureException.java new file mode 100644 index 0000000000..26b071d835 --- /dev/null +++ b/source/java/org/alfresco/service/cmr/transfer/TransferFailureException.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2005-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.service.cmr.transfer; + + +/** + * Transfer failure exception + * + * @author davidc + */ +public class TransferFailureException extends TransferException +{ + private static final long serialVersionUID = 9009938314128119981L; + + private TransferEventError event; + + public TransferFailureException(TransferEventError event) + { + super(event.getMessage(), event.getException()); + this.event = event; + } + + /** + * Gets the end event (representing the failure) + * + * @return end event + */ + public TransferEventError getErrorEvent() + { + return event; + } +} diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferService.java b/source/java/org/alfresco/service/cmr/transfer/TransferService.java index d5f9f43aca..46e0e49fb4 100644 --- a/source/java/org/alfresco/service/cmr/transfer/TransferService.java +++ b/source/java/org/alfresco/service/cmr/transfer/TransferService.java @@ -28,12 +28,15 @@ import org.alfresco.service.Auditable; import org.alfresco.service.NotAuditable; /** - * The transfer service is responsible for transfering nodes between one instance of Alfresco and another remote instance. - * as well as the transfer method, this interface also provides methods for managing the + * The transfer service is responsible for transferring nodes between one instance of Alfresco and another remote instance. + * as well as the transfer method, this interface also provides methods for managing transfer targets. + * + * @see TransferService2 * * @author Mark Rogers */ @PublicService +@Deprecated public interface TransferService { diff --git a/source/java/org/alfresco/service/cmr/transfer/TransferService2.java b/source/java/org/alfresco/service/cmr/transfer/TransferService2.java new file mode 100644 index 0000000000..c1f5834aa7 --- /dev/null +++ b/source/java/org/alfresco/service/cmr/transfer/TransferService2.java @@ -0,0 +1,235 @@ +/* + * Copyright (C) 2009-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.service.cmr.transfer; + +import java.util.Collection; +import java.util.Set; + +import org.alfresco.service.Auditable; +import org.alfresco.service.NotAuditable; +import org.alfresco.service.PublicService; + +/** + * The transfer service is responsible for transferring nodes between one instance of Alfresco and another remote instance. + * as well as the transfer method, this interface also provides methods for managing transfer targets. + * + * @author davidc + */ +@PublicService +public interface TransferService2 +{ + + /** + * Transfer nodes sync, with callback. This synchronous version of the transfer method waits for the transfer to complete + * before returning to the caller. Callbacks are called in the current thread context, so will be associated with the current + * transaction and user. + * + * @param targetName the name of the target to transfer to + * @param definition - the definition of the transfer. Specifies which nodes to transfer. + * The following properties must be set, nodes + * @param callback - a set of callback handlers that will be called as transfer proceeds. May be null. + * @throws TransferException + * @return transfer end event (in case of success or cancellation) + */ + @Auditable(parameters={"targetName"}) + public TransferEndEvent transfer(String targetName, TransferDefinition definition, Collection callback) throws TransferFailureException; + + /** + * Transfer nodes sync, with callback. This synchronous version of the transfer method waits for the transfer to complete + * before returning to the caller. Callbacks are called in the current thread context, so will be associated with the current + * transaction and user. + * + * @param targetName the name of the target to transfer to + * @param definition - the definition of the transfer. Specifies which nodes to transfer. + * The following properties must be set, nodes + * @param callbacks - a list of callback handlers that will be called as transfer proceeds. May be null. + * @throws TransferException + * @return transfer end event (in case of success or cancellation) + */ + @Auditable(parameters={"targetName"}) + public TransferEndEvent transfer(String targetName, TransferDefinition definition, TransferCallback... callbacks) throws TransferFailureException; + + /** + * Transfer nodes async with callback. The asynchronous version of the transfer method starts a transfer and returns as + * soon as possible. + * + * The transfer callbacks will be called by a different thread to that used to call the transferAsync method so transaction + * context will be different to the calling context. The asychronous transfer does not have access to uncommitted + * data in the calling transaction. + * + * @param targetName the name of the target to transfer to + * @param definition - the definition of the transfer. Specifies which nodes to transfer. + * The following properties must be set, nodes + * @param callback - a collection of callback handlers that will be called as transfer proceeds. May be null. + * + * @throws TransferException + */ + @Auditable(parameters={"targetName"}) + public void transferAsync(String targetName, TransferDefinition definition, Collection callback) throws TransferException; + + /** + * Transfer nodes async with callback. The asynchronous version of the transfer method starts a transfer and returns as + * soon as possible. + * + * The transfer callbacks will be called by a different thread to that used to call the transferAsync method so transaction + * context will be different to the calling context. The asychronous transfer does not have access to uncommitted + * data in the calling transaction. + * + * @param targetName the name of the target to transfer to + * @param definition - the definition of the transfer. Specifies which nodes to transfer. + * The following properties must be set, nodes + * @param callbacks - a collection of callback handlers that will be called as transfer proceeds. May be null. + * + * @throws TransferException + */ + @Auditable(parameters={"targetName"}) + public void transferAsync(String targetName, TransferDefinition definition, TransferCallback... callbacks) throws TransferException; + + /** + * Verify a target is available and that the configured credentials are valid. + * @throws TransferException + */ + @NotAuditable + public void verify(TransferTarget target) throws TransferException; + + /** + * Create and save a new transfer target. Creates and saves a new transfer target with a single, but long, method call. + * + * @param name, the name of this transfer target, which must be unique + * @param title, the display name of this transfer target + * @param description, + * @param endpointProtocol, either http or https + * @param endpointHost, + * @param endpointPort, + * @param endpointPath, + * @param username, + * @param password, + * @return the newly created transfer target. + */ + @Auditable + public TransferTarget createAndSaveTransferTarget(String name, String title, String description, String endpointProtocol, + String endpointHost, int endpointPort, String endpointPath, String username, char[] password) throws TransferException; + + /** + * Creates an in memory transfer target. Before it is used it must be populated with the following values and + * saved with the saveTransferTarget method. The name of the transfer target must be unique. + *

    + *
  • title
  • + *
  • description
  • + *
  • endpointProtocol
  • + *
  • endpointHost
  • + *
  • endpointPort
  • + *
  • endpointPath
  • + *
  • username
  • + *
  • password
  • + *
+ * @return an in memory transfer target + */ + @Auditable(parameters={"name"}) + public TransferTarget createTransferTarget(String name); + + /** + * Get all the transfer targets + */ + @NotAuditable + public SetgetTransferTargets() throws TransferException; + + /** + * Get All the transfer targets for a particular transfer target group. + * @param groupName, the name of the transfer group + */ + @NotAuditable + public SetgetTransferTargets(String groupName) throws TransferException; + + /** + * Get a transfer target by its name + * @throws TransferException - target does not exist + */ + @NotAuditable + public TransferTarget getTransferTarget(String name) throws TransferException; + + /** + * Test to see if the target with the specified name exists + * @param name + * @return true if the specified target exists, and false otherwise + */ + @NotAuditable + public boolean targetExists(String name); + + /** + * Delete a transfer target. After calling this method the transfer target will no longer exist. + * @throws TransferException - target does not exist + * @param name, the name of this transfer target, + */ + @Auditable(parameters={"name"}) + public void deleteTransferTarget(String name) throws TransferException; + + /** + * Save TransferTarget, will create a transfer target if it does not already exist or update an existing transfer target. + * + * The following properties may be updated: + * endpointHost, + * endpointPort, + * endpointProtocol, + * endpointPath, + * username, + * password, + * title, + * description + * + * The following properties may not be updated: + * name, must be specified. + * nodeRef, if specified will be ignored. + * + * @param update + */ + @Auditable + public TransferTarget saveTransferTarget(TransferTarget update) throws TransferException; + + /** + * Enables/Disables the named transfer target + * @param name the name of the transfer target + * @param enable (or false=disable) + */ + @Auditable(parameters={"name", "enable"}) + public void enableTransferTarget(String name, boolean enable) throws TransferException; + + /** + * Asynchronously cancel an in-progress transfer + * + * This method tells an in-process transfer to give up, rollback and stop as soon as possible. + * + * Depending upon the state of the in-progress transfer, the transfer may still complete, + * despite calling this method, however in most cases the transfer will not complete. + * + * Calling this method for a transfer that does not exist, possibly because it has already finished, has no + * effect and will not throw an exception. + * + * The transfer identifier can be obtained from the TransferEventBegin event that is passed to registered callbacks when + * transfer starts. + * + * @param transferId the unique identifier of the transfer to cancel. + * + * @see TransferEventBegin; + */ + @Auditable(parameters={"transferId"}) + public void cancelAsync(String transferId); + +}