diff --git a/config/alfresco/messages/transfer-service.properties b/config/alfresco/messages/transfer-service.properties
index 0f05d3408b..2e750f539b 100644
--- a/config/alfresco/messages/transfer-service.properties
+++ b/config/alfresco/messages/transfer-service.properties
@@ -32,6 +32,10 @@ transfer_service.receiver.transfer_not_found=Failed to find any record of reques
transfer_service.receiver.transfer_cancelled=Transfer has been cancelled: {0}
transfer_service.no_encoding=Unable to deserialize value, no transformation for encoding {0}
transfer_service.unable_to_deserialise=Unable to deserialize value
+transfer_service.receiver.lock_timed_out=Transfer lock timed out transferId: {0}
+transfer_service.receiver.lock_not_found=Transfer lock not found
+transfer_service.receiver.error_start=Unable to start a transfer
+transfer_service.receiver.error_generating_requisite="Unable to generate transfer requisite
transfer_service.missing_endpoint_path=An endpoint path has not been specified for transfer target: {0}
transfer_service.missing_endpoint_protocol=An endpoint protocol has not been specified for transfer target: {0}
diff --git a/config/alfresco/repository.properties b/config/alfresco/repository.properties
index 756ff58662..7560aa2432 100644
--- a/config/alfresco/repository.properties
+++ b/config/alfresco/repository.properties
@@ -500,7 +500,6 @@ subsystems.test.simpleProp3=Global Default3
deployment.service.numberOfSendingThreads=5
deployment.service.corePoolSize=2
deployment.service.maximumPoolSize=3
-
# How long to wait in mS before refreshing a target lock - detects shutdown servers
deployment.service.targetLockRefreshTime=60000
# How long to wait in mS from the last communication before deciding that deployment has failed, possibly
@@ -510,3 +509,18 @@ deployment.service.targetLockTimeout=3600000
# Transfer Service
transferservice.receiver.enabled=true
transferservice.receiver.stagingDir=${java.io.tmpdir}/alfresco-transfer-staging
+#
+# How long to wait in mS before refreshing a transfer lock - detects shutdown servers
+# Default 1 minute.
+transferservice.receiver.lockRefreshTime=60000
+#
+# How many times to attempt retry the transfer lock
+transferservice.receiver.lockRetryCount=3
+# How long to wait, in mS, before retrying the transfer lock
+transferservice.receiver.lockRetryWait=100
+#
+# How long to wait, in mS, since the last contact with from the client before
+# timing out a transfer. Needs to be long enough to cope with network delays and "thinking
+# time" for both source and destination. Default 5 minutes.
+transferservice.receiver.lockTimeOut=300000
+
diff --git a/config/alfresco/transfer-service-context.xml b/config/alfresco/transfer-service-context.xml
index 496e687a4a..e4faa51677 100644
--- a/config/alfresco/transfer-service-context.xml
+++ b/config/alfresco/transfer-service-context.xml
@@ -79,6 +79,7 @@
+ /${spaces.company_home.childname}/${spaces.dictionary.childname}/${spaces.transfers.childname}
@@ -92,6 +93,18 @@
${transferservice.receiver.stagingDir}
+
+ ${transferservice.receiver.lockRefreshTime}
+
+
+ ${transferservice.receiver.lockRetryCount}
+
+
+ ${transferservice.receiver.lockRetryWait}
+
+
+ ${transferservice.receiver.lockTimeOut}
+
diff --git a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java
index c5302e182f..7c478c45f0 100644
--- a/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java
+++ b/source/java/org/alfresco/repo/replication/ReplicationActionExecutor.java
@@ -75,9 +75,10 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
private ReplicationParams replicationParams;
/**
- * By default, we lock for 30 minutes
+ * By default, we lock for a minute, so if this server is shutdown another can take over a
+ * minute later.
*/
- private long replicationActionLockDuration = 30*60*1000;
+ private long replicationActionLockDuration = 60*1000;
/**
* Injects the NodeService bean.
@@ -260,9 +261,11 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
// Turn our payload list of root nodes into something that
// the transfer service can work with
Set toTransfer;
- try {
+ try
+ {
toTransfer = expandPayload(replicationDef);
- } catch(Exception e) {
+ }
+ catch(Exception e) {
lock.close();
throw new ReplicationServiceException("Error processing payload list - " + e.getMessage(), e);
}
@@ -346,17 +349,21 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
* A {@link TransferCallback} which periodically renews the
* lock held against a {@link ReplicationDefinition}
*/
- protected class ReplicationDefinitionLockExtender implements TransferCallback
+ protected class ReplicationDefinitionLockExtender
+ implements TransferCallback, JobLockService.JobLockRefreshCallback
+
{
private ReplicationDefinition replicationDef;
private String transferId;
private String lockToken;
+ private boolean active;
protected ReplicationDefinitionLockExtender(ReplicationDefinition replicationDef)
{
this.replicationDef = replicationDef;
acquireLock();
}
+
/**
* No matter what the event is, refresh
* our lock on the {@link ReplicationDefinition}, and
@@ -364,35 +371,19 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
*/
public void processEvent(TransferEvent event)
{
- // Extend our lock
- refreshLock();
-
- // If it's the enter event, do skip
- if(event instanceof TransferEventEnterState)
- {
- return;
- }
+ // If it's the enter event, do skip
+ if(event instanceof TransferEventEnterState)
+ {
+ return;
+ }
- // If this is a begin event, make a note of the ID
- if(event instanceof TransferEventBegin)
- {
- transferId = ((TransferEventBegin)event).getTransferId();
- }
-
- // Has someone tried to cancel us?
- if(actionTrackingService.isCancellationRequested(replicationDef))
- {
- // Tell the transfer service to cancel, if we can
- if(transferId != null)
- {
- transferService.cancelAsync(transferId);
- logger.debug("Replication cancel was requested for " + replicationDef.getReplicationQName());
- }
- else
- {
- logger.warn("Unable to cancel replication as requested, as transfer has yet to reach a cancellable state");
- }
- }
+ // If this is a begin event, make a note of the ID
+ if(event instanceof TransferEventBegin)
+ {
+ transferId = ((TransferEventBegin)event).getTransferId();
+ }
+
+ checkCancel();
}
/**
@@ -401,21 +392,20 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
*/
public void close()
{
- releaseLock();
+ releaseLock();
}
/**
* Get a lock on the job.
* Tries every 5 seconds for 30 seconds, then
- * every 30 seconds until 3 times the lock
- * duration.
+ * every 30 seconds for half an hour.
+ *
+ * @throws LockAcquisitionException
*/
private void acquireLock()
{
- long retryTime = 30*1000;
- int retries = (int)(replicationActionLockDuration * 3 / retryTime);
-
- try {
+ try
+ {
// Quick try
lockToken = jobLockService.getLock(
replicationDef.getReplicationQName(),
@@ -423,13 +413,38 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
5 * 1000, // Every 5 seconds
6 // 6 times = wait up to 30 seconds
);
- } catch(LockAcquisitionException e) {
+
+ active = true;
+
+ /**
+ * Got the lock - now register the refresh callback which will keep the
+ * lock alive
+ */
+ jobLockService.refreshLock(
+ lockToken,
+ replicationDef.getReplicationQName(),
+ replicationActionLockDuration,
+ this
+ );
+
+ if(logger.isDebugEnabled())
+ {
+ logger.debug("lock aquired:" + replicationDef.getReplicationQName() );
+ }
+ }
+ catch(LockAcquisitionException e)
+ {
+ long retryTime = 30*1000;
+ int retries = (int)(60);
+
logger.debug(
"Unable to get the replication job lock on " +
replicationDef.getReplicationQName() +
", retrying every " + (int)(retryTime/1000) + " seconds"
);
+ active = true;
+
// Long try - every 30 seconds
lockToken = jobLockService.getLock(
replicationDef.getReplicationQName(),
@@ -437,22 +452,80 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
retryTime,
retries
);
+
+ /**
+ * Got the lock - now register the refresh callback which will keep the
+ * lock alive
+ */
+ jobLockService.refreshLock(
+ lockToken,
+ replicationDef.getReplicationQName(),
+ replicationActionLockDuration,
+ this
+ );
+
+ if(logger.isDebugEnabled())
+ {
+ logger.debug("lock aquired (from long timeout):" + replicationDef.getReplicationQName() );
+ }
}
}
- private void refreshLock()
- {
- jobLockService.refreshLock(
- lockToken,
- replicationDef.getReplicationQName(),
- replicationActionLockDuration
- );
- }
+
private void releaseLock()
{
- jobLockService.releaseLock(
- lockToken,
- replicationDef.getReplicationQName()
- );
+ if(active)
+ {
+ if(logger.isDebugEnabled())
+ {
+ logger.debug("about to release lock:" + replicationDef.getReplicationQName());
+ }
+ jobLockService.releaseLock(
+ lockToken,
+ replicationDef.getReplicationQName());
+ active=false;
+ }
+ }
+
+ private void checkCancel()
+ {
+ // Has someone tried to cancel us?
+ if(actionTrackingService.isCancellationRequested(replicationDef))
+ {
+ // Tell the transfer service to cancel, if we can
+ if(transferId != null)
+ {
+ transferService.cancelAsync(transferId);
+ logger.debug("Replication cancel was requested for " + replicationDef.getReplicationQName());
+ }
+ else
+ {
+ logger.warn("Unable to cancel replication as requested, as transfer has yet to reach a cancellable state");
+ }
+ }
+ }
+
+ /**
+ * Job Lock Refresh
+ * @return
+ */
+ @Override
+ public boolean isActive()
+ {
+ if(logger.isDebugEnabled())
+ {
+ logger.debug("lock callback isActive:" + active + ", " + replicationDef.getReplicationQName());
+ }
+ return active;
+ }
+
+ /**
+ * Job Lock Service has released us.
+ */
+ @Override
+ public void lockReleased()
+ {
+ logger.debug("lock released:" + replicationDef.getReplicationQName());
+ // nothing to do
}
}
}
diff --git a/source/java/org/alfresco/repo/transfer/RepoTransferReceiverImpl.java b/source/java/org/alfresco/repo/transfer/RepoTransferReceiverImpl.java
index dc8e3e4262..c90ec6cd19 100644
--- a/source/java/org/alfresco/repo/transfer/RepoTransferReceiverImpl.java
+++ b/source/java/org/alfresco/repo/transfer/RepoTransferReceiverImpl.java
@@ -45,6 +45,8 @@ import org.alfresco.repo.copy.CopyBehaviourCallback;
import org.alfresco.repo.copy.CopyDetails;
import org.alfresco.repo.copy.CopyServicePolicies;
import org.alfresco.repo.copy.DefaultCopyBehaviourCallback;
+import org.alfresco.repo.lock.JobLockService;
+import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.node.NodeServicePolicies;
import org.alfresco.repo.policy.BehaviourFilter;
import org.alfresco.repo.policy.ClassPolicyDelegate;
@@ -64,7 +66,7 @@ import org.alfresco.repo.transfer.requisite.XMLTransferRequsiteWriter;
import org.alfresco.service.cmr.action.Action;
import org.alfresco.service.cmr.action.ActionService;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
-import org.alfresco.service.cmr.repository.DuplicateChildNodeNameException;
+import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
@@ -90,8 +92,17 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.util.FileCopyUtils;
/**
+ * The Repo Transfer Receiver is the "Back-End" for transfer subsystem.
+ *
+ * Provides the implementation of the transfer commands on the destination repository.
+ *
+ * Provides callback handlers for Aliens and Transferred Aspects.
+ *
+ * Calls transfer policies.
+ *
+ * Co-ordinates locking and logging as the transfer progresses.
+ *
* @author brian
- *
*/
public class RepoTransferReceiverImpl implements TransferReceiver,
NodeServicePolicies.OnCreateChildAssociationPolicy,
@@ -99,7 +110,6 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
NodeServicePolicies.OnRestoreNodePolicy,
NodeServicePolicies.OnMoveNodePolicy,
ContentServicePolicies.OnContentUpdatePolicy
-
{
/**
* This embedded class is used to push requests for asynchronous commits onto a different thread
@@ -144,20 +154,20 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
private final static Log log = LogFactory.getLog(RepoTransferReceiverImpl.class);
private static final String MSG_FAILED_TO_CREATE_STAGING_FOLDER = "transfer_service.receiver.failed_to_create_staging_folder";
- private static final String MSG_TRANSFER_LOCK_FOLDER_NOT_FOUND = "transfer_service.receiver.lock_folder_not_found";
+ private static final String MSG_ERROR_WHILE_STARTING = "transfer_service.receiver.error_start";
private static final String MSG_TRANSFER_TEMP_FOLDER_NOT_FOUND = "transfer_service.receiver.temp_folder_not_found";
private static final String MSG_TRANSFER_LOCK_UNAVAILABLE = "transfer_service.receiver.lock_unavailable";
private static final String MSG_INBOUND_TRANSFER_FOLDER_NOT_FOUND = "transfer_service.receiver.record_folder_not_found";
- private static final String MSG_NOT_LOCK_OWNER = "transfer_service.receiver.not_lock_owner";
+
private static final String MSG_ERROR_WHILE_ENDING_TRANSFER = "transfer_service.receiver.error_ending_transfer";
private static final String MSG_ERROR_WHILE_STAGING_SNAPSHOT = "transfer_service.receiver.error_staging_snapshot";
private static final String MSG_ERROR_WHILE_STAGING_CONTENT = "transfer_service.receiver.error_staging_content";
private static final String MSG_NO_SNAPSHOT_RECEIVED = "transfer_service.receiver.no_snapshot_received";
private static final String MSG_ERROR_WHILE_COMMITTING_TRANSFER = "transfer_service.receiver.error_committing_transfer";
- private static final String MSG_ERROR_WHILE_GENERATING_REQUISITE = "transfer_service.receiver.error_generating_requsite";
+ private static final String MSG_ERROR_WHILE_GENERATING_REQUISITE = "transfer_service.receiver.error_generating_requisite";
+ private static final String MSG_LOCK_TIMED_OUT = "transfer_service.receiver.lock_timed_out";
+ private static final String MSG_LOCK_NOT_FOUND = "transfer_service.receiver.lock_not_found";
- private static final String LOCK_FILE_NAME = ".lock";
- private static final QName LOCK_QNAME = QName.createQName(NamespaceService.APP_MODEL_1_0_URI, LOCK_FILE_NAME);
private static final String SNAPSHOT_FILE_NAME = "snapshot.xml";
private NodeService nodeService;
@@ -176,17 +186,50 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
private PolicyComponent policyComponent;
private DescriptorService descriptorService;
private AlienProcessor alienProcessor;
+ private JobLockService jobLockService;
- //private String localRepositoryId = descriptorService.getCurrentRepositoryDescriptor().getId();
-
- private Map transferLockFolderMap = new ConcurrentHashMap();
+ /**
+ * Where the temporary files are stored. Tenant Domain Name, NodeRef
+ */
private Map transferTempFolderMap = new ConcurrentHashMap();
+
+ /**
+ * Where the destination side transfer report is generated. Tenant Domain Name, NodeRef
+ */
private Map inboundTransferRecordsFolderMap = new ConcurrentHashMap();
private ClassPolicyDelegate beforeStartInboundTransferDelegate;
private ClassPolicyDelegate onStartInboundTransferDelegate;
private ClassPolicyDelegate onEndInboundTransferDelegate;
+ /**
+ * Locks for the transfers in progress
+ *
+ * TransferId, Lock
+ */
+ private Map locks = new ConcurrentHashMap();
+
+ /**
+ * How many mS before refreshing the lock?
+ */
+ private long lockRefreshTime = 60000;
+
+ /**
+ * How many times to retry to obtain the lock
+ */
+ private int lockRetryCount = 2;
+
+ /**
+ * How long to wait between retries
+ */
+ private long lockRetryWait = 100;
+
+ /**
+ * How long in mS to keep the lock before giving up and ending the transfer,
+ * possibly the client has terminated?
+ */
+ private long lockTimeOut = 3600000;
+
public void init()
{
PropertyCheck.mandatory(this, "nodeService", nodeService);
@@ -202,6 +245,7 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
PropertyCheck.mandatory(this, "policyComponent", policyComponent);
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
PropertyCheck.mandatory(this, "alienProcessor", alienProcessor);
+ PropertyCheck.mandatory(this, "jobLockService", getJobLockService());
beforeStartInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.BeforeStartInboundTransferPolicy.class);
onStartInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.OnStartInboundTransferPolicy.class);
@@ -301,31 +345,6 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
}
- private NodeRef getLockFolder()
- {
- String tenantDomain = tenantService.getUserDomain(AuthenticationUtil.getRunAsUser());
- NodeRef transferLockFolder = transferLockFolderMap.get(tenantDomain);
-
- // Have we already resolved the node that is the parent of the lock node?
- // If not then do so.
- if (transferLockFolder == null)
- {
- ResultSet rs = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, SearchService.LANGUAGE_XPATH,
- transferLockFolderPath);
- if (rs.length() > 0)
- {
- transferLockFolder = rs.getNodeRef(0);
- transferLockFolderMap.put(tenantDomain, transferLockFolder);
- }
- else
- {
- throw new TransferException(MSG_TRANSFER_LOCK_FOLDER_NOT_FOUND, new Object[] { transferLockFolderPath });
- }
- }
- return transferLockFolder;
-
- }
-
public NodeRef getTempFolder(String transferId)
{
String tenantDomain = tenantService.getUserDomain(AuthenticationUtil.getRunAsUser());
@@ -381,16 +400,36 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
*/
public String start()
{
- final NodeRef lockFolder = getLockFolder();
- String transferId = null;
-
- RetryingTransactionHelper txHelper = transactionService.getRetryingTransactionHelper();
+ log.debug("Start transfer");
+
+ /**
+ * First get the transfer lock for this domain
+ */
+ String tenantDomain = tenantService.getUserDomain(AuthenticationUtil.getRunAsUser());
+ String lockStr = tenantDomain.isEmpty() ? "transfer.server.default" : "transfer.server.tenant." + tenantDomain;
+ QName lockQName = QName.createQName(TransferModel.TRANSFER_MODEL_1_0_URI, lockStr);
+ Lock lock = new Lock(lockQName);
+
try
{
- transferId = txHelper.doInTransaction(
+ lock.makeLock();
+
+ /**
+ * Transfer Lock held if we get this far
+ */
+ String transferId = null;
+
+ try
+ {
+ /**
+ * Now create a transfer record and use its NodeRef as the transfer id
+ */
+ RetryingTransactionHelper txHelper = transactionService.getRetryingTransactionHelper();
+
+ transferId = txHelper.doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback()
- {
- public String execute() throws Throwable
+ {
+ public String execute() throws Throwable
{
TransferServicePolicies.BeforeStartInboundTransferPolicy beforeStartPolicy =
beforeStartInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
@@ -399,24 +438,7 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
final NodeRef relatedTransferRecord = createTransferRecord();
String transferId = relatedTransferRecord.toString();
getTempFolder(transferId);
-
- Map props = new HashMap();
- props.put(ContentModel.PROP_NAME, LOCK_FILE_NAME);
- props.put(TransferModel.PROP_TRANSFER_ID, transferId);
-
- if (log.isInfoEnabled())
- {
- log.info("Creating transfer lock associated with this transfer record: "
- + relatedTransferRecord);
- }
-
- ChildAssociationRef assoc = nodeService.createNode(lockFolder, ContentModel.ASSOC_CONTAINS,
- LOCK_QNAME, TransferModel.TYPE_TRANSFER_LOCK, props);
-
- if (log.isInfoEnabled())
- {
- log.info("Transfer lock created as node " + assoc.getChildRef());
- }
+ getStagingFolder(transferId);
TransferServicePolicies.OnStartInboundTransferPolicy onStartPolicy =
onStartInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
@@ -425,15 +447,31 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
return transferId;
}
}, false, true);
+ }
+ catch (Exception e)
+ {
+ log.debug("Exception while staring transfer", e);
+ log.debug("releasing lock - we never created the transfer id");
+ lock.releaseLock();
+ throw new TransferException(MSG_ERROR_WHILE_STARTING, e);
+ }
+
+ /**
+ * Here if we have begun a transfer and have a valid transfer id
+ */
+ lock.transferId = transferId;
+ locks.put(transferId, lock);
+ log.info("transfer started:" + transferId);
+ lock.enableLockTimeout();
+ return transferId;
+
}
- catch (DuplicateChildNodeNameException ex)
+ catch (LockAcquisitionException lae)
{
- log.debug("lock is already taken");
+ log.debug("transfer lock is already taken", lae);
// lock is already taken.
throw new TransferException(MSG_TRANSFER_LOCK_UNAVAILABLE);
}
- getStagingFolder(transferId);
- return transferId;
}
/**
@@ -481,6 +519,61 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
return assoc.getChildRef();
}
+
+ /**
+ * Timeout a transfer. Called after the lock has been released via a timeout.
+ *
+ * This is the last chance to clean up.
+ *
+ * @param transferId
+ */
+ private void timeout(final String transferId)
+ {
+ log.info("Inbound Transfer has timed out transferId:" + transferId);
+ /*
+ * There is no transaction or authentication context in this method since it is called via a
+ * timer thread.
+ */
+ final RetryingTransactionCallback timeoutCB = new RetryingTransactionCallback() {
+
+ @Override
+ public Void execute() throws Throwable
+ {
+ TransferProgress progress = getProgressMonitor().getProgress(transferId);
+
+ if (progress.getStatus().equals(TransferProgress.Status.PRE_COMMIT))
+ {
+ log.warn("Inbound Transfer Lock Timeout - transferId:" + transferId);
+ /**
+ * Did not get out of PRE_COMMIT. The client has probably "gone away" after calling
+ * "start", but before calling commit, cancel or error.
+ */
+ locks.remove(transferId);
+ removeTempFolders(transferId);
+ Object[] msgParams = { transferId };
+ getProgressMonitor().logException(transferId, "transfer timeout", new TransferException(MSG_LOCK_TIMED_OUT, msgParams));
+ getProgressMonitor().updateStatus(transferId, TransferProgress.Status.ERROR);
+ }
+ else
+ {
+ // We got beyond PRE_COMMIT, therefore leave the clean up to either
+ // commit, cancel or error command, since there may still be "in-flight"
+ // transfer in another thread. Although why, in that case, are we here?
+ log.warn("Inbound Transfer Lock Timeout - already past PRE-COMMIT - do no cleanup transferId:" + transferId);
+ }
+ return null;
+ }
+ };
+
+ AuthenticationUtil.runAs(new AuthenticationUtil.RunAsWork()
+ {
+ public String doWork() throws Exception
+ {
+ transactionService.getRetryingTransactionHelper().doInTransaction(timeoutCB, false, true);
+ return null;
+ }
+ }, AuthenticationUtil.getSystemUserName());
+ }
/*
* (non-Javadoc)
@@ -500,57 +593,16 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
try
{
- // We remove the lock node in a separate transaction, since it was created in a separate transaction
- transactionService.getRetryingTransactionHelper().doInTransaction(
- new RetryingTransactionHelper.RetryingTransactionCallback()
- {
- public NodeRef execute() throws Throwable
- {
- // Find the lock node
- NodeRef lockId = getLockNode();
- if (lockId != null)
- {
- if (!testLockedTransfer(lockId, transferId))
- {
- throw new TransferException(MSG_NOT_LOCK_OWNER, new Object[] { transferId });
- }
- // Delete the lock node.
- log.debug("Deleting lock node :" + lockId);
- nodeService.deleteNode(lockId);
- log.debug("Lock deleted :" + lockId);
- }
- return null;
- }
- }, false, true);
-
- NodeRef tempStoreNode = null;
- try
+ Lock lock = locks.get(transferId);
+ if(lock != null)
{
- log.debug("Deleting temporary store node...");
- tempStoreNode = getTempFolder(transferId);
- nodeService.deleteNode(tempStoreNode);
- log.debug("Deleted temporary store node.");
- }
- catch (Exception ex)
- {
- log.warn("Failed to delete temp store node for transfer id " + transferId +
- "\nTemp store noderef = " + tempStoreNode);
+ log.debug("releasing lock:" + lock.lockToken);
+ lock.releaseLock();
+ locks.remove(lock);
}
- File stagingFolder = null;
- try
- {
- log.debug("delete staging folder " + transferId);
- // Delete the staging folder.
- stagingFolder = getStagingFolder(transferId);
- deleteFile(stagingFolder);
- log.debug("Staging folder deleted");
- }
- catch(Exception ex)
- {
- log.warn("Failed to delete staging folder for transfer id " + transferId +
- "\nStaging folder = " + stagingFolder.toString());
- }
+ removeTempFolders(transferId);
+
//Fire the OnEndInboundTransfer policy
Set createdNodes = Collections.emptySet();
@@ -577,8 +629,42 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
}
}
+ private void removeTempFolders(final String transferId)
+ {
+ NodeRef tempStoreNode = null;
+ try
+ {
+ log.debug("Deleting temporary store node...");
+ tempStoreNode = getTempFolder(transferId);
+ nodeService.deleteNode(tempStoreNode);
+ log.debug("Deleted temporary store node.");
+ }
+ catch (Exception ex)
+ {
+ log.warn("Failed to delete temp store node for transfer id " + transferId +
+ "\nTemp store noderef = " + tempStoreNode);
+ }
+
+ File stagingFolder = null;
+ try
+ {
+ log.debug("delete staging folder " + transferId);
+ // Delete the staging folder.
+ stagingFolder = getStagingFolder(transferId);
+ deleteFile(stagingFolder);
+ log.debug("Staging folder deleted");
+ }
+ catch(Exception ex)
+ {
+ log.warn("Failed to delete staging folder for transfer id " + transferId +
+ "\nStaging folder = " + stagingFolder.toString());
+ }
+ }
+
+
public void cancel(String transferId) throws TransferException
{
+ // no need to check the lock
TransferProgress progress = getProgressMonitor().getProgress(transferId);
getProgressMonitor().updateStatus(transferId, TransferProgress.Status.CANCELLED);
if (progress.getStatus().equals(TransferProgress.Status.PRE_COMMIT))
@@ -589,6 +675,17 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
public void prepare(String transferId) throws TransferException
{
+ // Check that this transfer still owns the lock
+ Lock lock = checkLock(transferId);
+ try
+ {
+
+ }
+ finally
+ {
+ lock.enableLockTimeout();
+ }
+
}
/**
@@ -609,58 +706,41 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
}
file.delete();
}
-
- private NodeRef getLockNode()
- {
- final NodeRef lockFolder = getLockFolder();
- List assocs = nodeService.getChildAssocs(lockFolder, ContentModel.ASSOC_CONTAINS,
- LOCK_QNAME);
- NodeRef lockId = assocs.size() == 0 ? null : assocs.get(0).getChildRef();
- return lockId;
- }
-
- private boolean testLockedTransfer(NodeRef lockId, String transferId)
- {
- if (lockId == null)
- {
- throw new IllegalArgumentException("lockId = null");
- }
- if (transferId == null)
- {
- throw new IllegalArgumentException("transferId = null");
- }
- String currentTransferId = (String) nodeService.getProperty(lockId, TransferModel.PROP_TRANSFER_ID);
- // Check that the lock is held for the specified transfer (error if not)
- return (transferId.equals(currentTransferId));
- }
-
+
/*
* (non-Javadoc)
*
* @see org.alfresco.service.cmr.transfer.TransferReceiver#nudgeLock(java.lang.String)
*/
- public void nudgeLock(final String transferId) throws TransferException
+ public Lock checkLock(final String transferId) throws TransferException
{
if (transferId == null)
- throw new IllegalArgumentException("transferId = null");
-
- transactionService.getRetryingTransactionHelper().doInTransaction(
- new RetryingTransactionHelper.RetryingTransactionCallback()
- {
- public NodeRef execute() throws Throwable
- {
- // Find the lock node
- NodeRef lockId = getLockNode();
- // Check that the specified transfer is the one that owns the lock
- if (!testLockedTransfer(lockId, transferId))
- {
- throw new TransferException(MSG_NOT_LOCK_OWNER);
- }
- // Just write the lock file name again (no change, but forces the modified time to be updated)
- nodeService.setProperty(lockId, ContentModel.PROP_NAME, LOCK_FILE_NAME);
- return null;
- }
- }, false, true);
+ {
+ throw new IllegalArgumentException("nudgeLock: transferId = null");
+ }
+
+ Lock lock = locks.get(transferId);
+ if(lock != null)
+ {
+ if(lock.isActive())
+ {
+ lock.suspendLockTimeout();
+ return lock;
+ }
+ else
+ {
+ // lock is no longer active
+ log.debug("lock not active");
+ throw new TransferException(MSG_LOCK_TIMED_OUT, new Object[]{transferId});
+
+ }
+ }
+ else
+ {
+ log.debug("lock not found");
+ throw new TransferException(MSG_LOCK_NOT_FOUND, new Object[]{transferId});
+ // lock not found
+ }
}
/*
@@ -670,28 +750,35 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
*/
public void saveSnapshot(String transferId, InputStream openStream) throws TransferException
{
- // Check that this transfer owns the lock and give it a nudge to stop it expiring
- nudgeLock(transferId);
-
- if (log.isDebugEnabled())
- {
- log.debug("Saving snapshot for transferId =" + transferId);
- }
- File snapshotFile = new File(getStagingFolder(transferId), SNAPSHOT_FILE_NAME);
+ // Check that this transfer still owns the lock
+ Lock lock = checkLock(transferId);
try
{
- if (snapshotFile.createNewFile())
- {
- FileCopyUtils.copy(openStream, new FileOutputStream(snapshotFile));
- }
if (log.isDebugEnabled())
{
- log.debug("Saved snapshot for transferId =" + transferId);
+ log.debug("Saving snapshot for transferId =" + transferId);
+ }
+
+ File snapshotFile = new File(getStagingFolder(transferId), SNAPSHOT_FILE_NAME);
+ try
+ {
+ if (snapshotFile.createNewFile())
+ {
+ FileCopyUtils.copy(openStream, new BufferedOutputStream(new FileOutputStream(snapshotFile)));
+ }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Saved snapshot for transferId =" + transferId);
+ }
+ }
+ catch (Exception ex)
+ {
+ throw new TransferException(MSG_ERROR_WHILE_STAGING_SNAPSHOT, new Object[]{transferId}, ex);
}
}
- catch (Exception ex)
+ finally
{
- throw new TransferException(MSG_ERROR_WHILE_STAGING_SNAPSHOT, new Object[]{transferId}, ex);
+ lock.enableLockTimeout();
}
}
@@ -704,10 +791,10 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
public void saveContent(String transferId, String contentFileId, InputStream contentStream)
throws TransferException
{
- nudgeLock(transferId);
- File stagedFile = new File(getStagingFolder(transferId), contentFileId);
+ Lock lock = checkLock(transferId);
try
- {
+ {
+ File stagedFile = new File(getStagingFolder(transferId), contentFileId);
if (stagedFile.createNewFile())
{
FileCopyUtils.copy(contentStream, new BufferedOutputStream(new FileOutputStream(stagedFile)));
@@ -717,20 +804,47 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
{
throw new TransferException(MSG_ERROR_WHILE_STAGING_CONTENT, new Object[]{transferId, contentFileId}, ex);
}
+ finally
+ {
+ lock.enableLockTimeout();
+ }
}
public void commitAsync(String transferId)
{
- nudgeLock(transferId);
- progressMonitor.updateStatus(transferId, Status.COMMIT_REQUESTED);
- Action commitAction = actionService.createAction(TransferCommitActionExecuter.NAME);
- commitAction.setParameterValue(TransferCommitActionExecuter.PARAM_TRANSFER_ID, transferId);
- commitAction.setExecuteAsynchronously(true);
- actionService.executeAction(commitAction, new NodeRef(transferId));
- if (log.isDebugEnabled())
+ /**
+ * A side-effect of checking the lock here is that the lock timeout is suspended.
+ *
+ */
+ Lock lock = checkLock(transferId);
+ try
{
- log.debug("Registered transfer commit for asynchronous execution: " + transferId);
+ progressMonitor.updateStatus(transferId, Status.COMMIT_REQUESTED);
+ Action commitAction = actionService.createAction(TransferCommitActionExecuter.NAME);
+ commitAction.setParameterValue(TransferCommitActionExecuter.PARAM_TRANSFER_ID, transferId);
+ commitAction.setExecuteAsynchronously(true);
+ actionService.executeAction(commitAction, new NodeRef(transferId));
+ if (log.isDebugEnabled())
+ {
+ log.debug("Registered transfer commit for asynchronous execution: " + transferId);
+ }
}
+ catch (Exception error)
+ {
+ /**
+ * Error somewhere in the action service?
+ */
+ //TODO consider whether the methods in this class should be retried/retryable..
+
+ // need to re-enable the lock timeout otherwise we will hold the lock forever...
+ lock.enableLockTimeout();
+
+ throw new TransferException(MSG_ERROR_WHILE_COMMITTING_TRANSFER, new Object[]{transferId}, error);
+ }
+
+ /**
+ * Lock intentionally not re-enabled here
+ */
}
public void commit(final String transferId) throws TransferException
@@ -740,6 +854,11 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
log.debug("Committing transferId=" + transferId);
}
+ /**
+ * A side-effect of checking the lock here is that it ensures that the lock timeout is suspended.
+ */
+ checkLock(transferId);
+
/**
* Turn off rules while transfer is being committed.
*/
@@ -748,7 +867,7 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
try
{
- nudgeLock(transferId);
+ /* lock is going to be released */ checkLock(transferId);
progressMonitor.updateStatus(transferId, Status.COMMITTING);
RetryingTransactionHelper.RetryingTransactionCallback