diff --git a/config/alfresco/deployment-service-context.xml b/config/alfresco/deployment-service-context.xml index 22a2edad0c..561ce9db8e 100644 --- a/config/alfresco/deployment-service-context.xml +++ b/config/alfresco/deployment-service-context.xml @@ -75,6 +75,17 @@ ${deployment.service.numberOfSendingThreads} + + + ${deployment.service.targetLockRefreshTime} + + + + + ${deployment.service.targetLockTimeout} + + diff --git a/config/alfresco/repository.properties b/config/alfresco/repository.properties index 2b7fdb73cb..9b0ebd0809 100644 --- a/config/alfresco/repository.properties +++ b/config/alfresco/repository.properties @@ -499,6 +499,12 @@ deployment.service.numberOfSendingThreads=5 deployment.service.corePoolSize=2 deployment.service.maximumPoolSize=3 +# How long to wait in mS before refreshing a target lock - detects shutdown servers +deployment.service.targetLockRefreshTime=60000 +# How long to wait in mS from the last communication before deciding that deployment has failed, possibly +# the destination is no longer available? +deployment.service.targetLockTimeout=3600000 + # Transfer Service transferservice.receiver.enabled=true transferservice.receiver.stagingDir=${java.io.tmpdir}/alfresco-transfer-staging diff --git a/source/java/org/alfresco/repo/deploy/DeploymentServiceImpl.java b/source/java/org/alfresco/repo/deploy/DeploymentServiceImpl.java index 33f2035092..3b1a77f0d3 100644 --- a/source/java/org/alfresco/repo/deploy/DeploymentServiceImpl.java +++ b/source/java/org/alfresco/repo/deploy/DeploymentServiceImpl.java @@ -53,6 +53,7 @@ import org.alfresco.repo.avm.AVMNodeService; import org.alfresco.repo.avm.util.SimplePath; import org.alfresco.repo.domain.PropertyValue; import org.alfresco.repo.lock.JobLockService; +import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback; import org.alfresco.repo.remote.AVMRemoteImpl; import org.alfresco.repo.remote.AVMSyncServiceRemote; import org.alfresco.repo.remote.ClientTicketHolder; @@ -96,11 +97,6 @@ public class DeploymentServiceImpl implements DeploymentService { private static Log fgLogger = LogFactory.getLog(DeploymentServiceImpl.class); - /** - * Holds locks for all deployment destinations (alfresco->alfresco) - */ - private Map fDestinations; - /** * The local AVMService Instance. */ @@ -130,18 +126,32 @@ public class DeploymentServiceImpl implements DeploymentService /** * Hold the deployment lock for 3600 seconds (1 hour) + *

+ * This is how long we will wait for a business process to complete. + * And needs to be fairly long to allow transmission of of big files + * over high latency networks. */ private long targetLockTimeToLive = 3600000; /** - * Retry for target lock every 10 seconds + * Refresh the lock every minute or so + *

+ * This is how long we keep the lock for before nudging it. So if + * this node in the cluster is shut down during deployment then + * another node can take over. */ - private long targetLockRetryWait = 10000; + private long targetLockRefreshTime = 10000; /** - * Retry 10000 times before giving up + * Retry for target lock every 1 second */ - private int targetLockRetryCount = 10000; + private long targetLockRetryWait = 1000; + + /** + * Retry 10000 times before giving up, basically we + * never want to give up. + */ + private int targetLockRetryCount = 10001; /** * The size of the output buffers @@ -164,7 +174,6 @@ public class DeploymentServiceImpl implements DeploymentService public DeploymentServiceImpl() { fTicketHolder = new ClientTicketHolderThread(); - fDestinations = new HashMap(); } /** @@ -203,185 +212,194 @@ public class DeploymentServiceImpl implements DeploymentService final boolean dontDo, final List callbacks) { - final String storeName = srcPath.substring(0, srcPath.indexOf(":")); - - /** - * Lock the cluster for the remote target - */ - String lockStr = hostName + "." + "asr." + storeName; - QName lockQName = QName.createQName("{org.alfresco.deployment.lock}" + lockStr); - Lock lock = new Lock(lockQName); - lock.makeLock(); - - /** - * Got the lock - now do a deployment - */ - if (fgLogger.isDebugEnabled()) - { - fgLogger.debug("Deploying to Remote Alfresco at " + hostName); - } - - - try - { - RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper(); + final String storeName = srcPath.substring(0, srcPath.indexOf(":")); - fgLogger.debug("Connecting to remote AVM at " + hostName + ":" +port); - final AVMRemote remote = getRemote(hostName, port, userName, password); - if (version < 0) - { - /** - * If version is -1, Create a local snapshot to deploy - */ - fgLogger.debug("creating snapshot of local version"); - + /** + * Lock the cluster for the remote target + */ + String lockStr = hostName + "." + "asr." + storeName; + QName lockQName = QName.createQName("{http://www.alfresco.org/deploymentService/1.0}" + lockStr); - RetryingTransactionCallback localSnapshot = new RetryingTransactionCallback() - { - public Integer execute() throws Throwable - { - int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName); - return new Integer(newVersion); - } - }; - version = trn.doInTransaction(localSnapshot, false, true).intValue(); - fgLogger.debug("snapshot local created " + storeName + ", " + version); - } + Lock lock = new Lock(lockQName); + lock.makeLock(); + try + { + /** + * Got the lock - now do a deployment + */ + if (fgLogger.isDebugEnabled()) + { + fgLogger.debug("Deploying to Remote Alfresco at " + hostName); + } - { - DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.START, - new Pair(version, srcPath), - dstPath); - processEvent(event, callbacks); - } - /* - * Create a snapshot on the destination server. - */ - boolean createdRoot = false; - String [] storePath = dstPath.split(":"); - int snapshot = -1; + try + { + RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper(); - // Get the root of the deployment on the destination server. - AVMNodeDescriptor dstRoot = remote.lookup(-1, dstPath); + fgLogger.debug("Connecting to remote AVM at " + hostName + ":" +port); + final AVMRemote remote = getRemote(hostName, port, userName, password); + if (version < 0) + { + /** + * If version is -1, Create a local snapshot to deploy + */ + fgLogger.debug("creating snapshot of local version"); - if (!dontDo) - { - // Get the root of the deployment on the destination server. - if (dstRoot == null) - { - if (createDst) - { - fgLogger.debug("Create destination parent folder:" + dstPath); - createDestination(remote, dstPath); - dstRoot = remote.lookup(-1, dstPath); - createdRoot = true; - } - else - { - throw new AVMNotFoundException("Node Not Found: " + dstRoot); - } - } - fgLogger.debug("create snapshot on remote"); - snapshot = remote.createSnapshot(storePath[0], "PreDeploy", "Pre Deployment Snapshot").get(storePath[0]); - fgLogger.debug("snapshot created on remote"); - } + RetryingTransactionCallback localSnapshot = new RetryingTransactionCallback() + { + public Integer execute() throws Throwable + { + int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName); + return new Integer(newVersion); + } + }; + version = trn.doInTransaction(localSnapshot, false, true).intValue(); + fgLogger.debug("snapshot local created " + storeName + ", " + version); + } - final int srcVersion = version; - final String srcFinalPath = srcPath; - RetryingTransactionCallback readRoot = new RetryingTransactionCallback() - { - public AVMNodeDescriptor execute() throws Throwable - { - return fAVMService.lookup(srcVersion, srcFinalPath); - } - }; + { + DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.START, + new Pair(version, srcPath), + dstPath); + processEvent(event, callbacks); + } - final AVMNodeDescriptor srcRoot = trn.doInTransaction(readRoot, true, true); + /* + * Create a snapshot on the destination server. + */ + boolean createdRoot = false; + String [] storePath = dstPath.split(":"); + int snapshot = -1; - // Get the root of the deployment from this server. - // AVMNodeDescriptor srcRoot = fAVMService.lookup(version, srcPath); + // Get the root of the deployment on the destination server. + AVMNodeDescriptor dstRoot = remote.lookup(-1, dstPath); - if (srcRoot == null) - { - throw new AVMNotFoundException("Directory Not Found: " + srcPath); - } - if (!srcRoot.isDirectory()) - { - throw new AVMWrongTypeException("Not a directory: " + srcPath); - } + if (!dontDo) + { + // Get the root of the deployment on the destination server. - /** - * The destination directory exists - check is actually a directory - */ - if (!dstRoot.isDirectory()) - { - throw new AVMWrongTypeException("Not a Directory: " + dstPath); - } + if (dstRoot == null) + { + if (createDst) + { + fgLogger.debug("Create destination parent folder:" + dstPath); + createDestination(remote, dstPath); + dstRoot = remote.lookup(-1, dstPath); + createdRoot = true; + } + else + { + throw new AVMNotFoundException("Node Not Found: " + dstRoot); + } + } + fgLogger.debug("create snapshot on remote"); + snapshot = remote.createSnapshot(storePath[0], "PreDeploy", "Pre Deployment Snapshot").get(storePath[0]); + fgLogger.debug("snapshot created on remote"); + } - try - { - /** - * Recursivly copy - */ - fgLogger.debug("both src and dest exist, recursivly deploy"); - final AVMNodeDescriptor dstParentNode = dstRoot; - RetryingTransactionCallback copyContentsRecursivly = new RetryingTransactionCallback() - { - public Integer execute() throws Throwable - { - deployDirectoryPush(srcVersion, srcRoot, dstParentNode, remote, matcher, dontDelete, dontDo, callbacks); - return new Integer(0); - } - }; + final int srcVersion = version; + final String srcFinalPath = srcPath; + RetryingTransactionCallback readRoot = new RetryingTransactionCallback() + { + public AVMNodeDescriptor execute() throws Throwable + { + return fAVMService.lookup(srcVersion, srcFinalPath); + } + }; - trn.setMaxRetries(1); - trn.doInTransaction(copyContentsRecursivly, false, true); + final AVMNodeDescriptor srcRoot = trn.doInTransaction(readRoot, true, true); - fgLogger.debug("finished copying, snapshot remote"); - remote.createSnapshot(storePath[0], "Deployment", "Post Deployment Snapshot."); + // Get the root of the deployment from this server. + // AVMNodeDescriptor srcRoot = fAVMService.lookup(version, srcPath); - DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.END, - new Pair(version, srcPath), - dstPath); - processEvent(event, callbacks); - return; - } - catch (AVMException e) - { - fgLogger.debug("error during remote copy and snapshot"); - try - { - if (snapshot != -1) - { - fgLogger.debug("Attempting to roll back "); - AVMSyncService syncService = getSyncService(hostName, port); - List diffs = syncService.compare(snapshot, dstPath, -1, dstPath, null); - syncService.update(diffs, null, false, false, true, true, "Aborted Deployment", "Aborted Deployment"); - } - } - catch (Exception ee) - { - throw new AVMException("Failed to rollback to version " + snapshot + " on " + hostName, ee); - } - throw new AVMException("Deployment to " + hostName + " failed.", e); - } - } - catch (Exception e) - { - DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.FAILED, - new Pair(version, srcPath), - dstPath, e.getMessage()); - processEvent(event, callbacks); + if (srcRoot == null) + { + throw new AVMNotFoundException("Directory Not Found: " + srcPath); + } + if (!srcRoot.isDirectory()) + { + throw new AVMWrongTypeException("Not a directory: " + srcPath); + } + + /** + * The destination directory exists - check is actually a directory + */ + if (!dstRoot.isDirectory()) + { + throw new AVMWrongTypeException("Not a Directory: " + dstPath); + } + + try + { + /** + * Recursivly copy + */ + fgLogger.debug("both src and dest exist, recursivly deploy"); + final AVMNodeDescriptor dstParentNode = dstRoot; + RetryingTransactionCallback copyContentsRecursivly = new RetryingTransactionCallback() + { + public Integer execute() throws Throwable + { + deployDirectoryPush(srcVersion, srcRoot, dstParentNode, remote, matcher, dontDelete, dontDo, callbacks); + return new Integer(0); + } + }; + + trn.setMaxRetries(1); + trn.doInTransaction(copyContentsRecursivly, false, true); + + fgLogger.debug("finished copying, snapshot remote"); + remote.createSnapshot(storePath[0], "Deployment", "Post Deployment Snapshot."); + + DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.END, + new Pair(version, srcPath), + dstPath); + processEvent(event, callbacks); + return; + } + catch (AVMException e) + { + fgLogger.debug("error during remote copy and snapshot"); + try + { + if (snapshot != -1) + { + fgLogger.debug("Attempting to roll back "); + AVMSyncService syncService = getSyncService(hostName, port); + List diffs = syncService.compare(snapshot, dstPath, -1, dstPath, null); + syncService.update(diffs, null, false, false, true, true, "Aborted Deployment", "Aborted Deployment"); + } + } + catch (Exception ee) + { + throw new AVMException("Failed to rollback to version " + snapshot + " on " + hostName, ee); + } + throw new AVMException("Deployment to " + hostName + " failed.", e); + } + } + catch (Exception e) + { + DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.FAILED, + new Pair(version, srcPath), + dstPath, e.getMessage()); + processEvent(event, callbacks); + + throw new AVMException("Deployment to " + hostName + " failed." + e.toString(), e); + } + finally + { + fgLogger.debug("ASR Finally block, Releasing ASR deployment ticket"); + fTicketHolder.setTicket(null); + } + } + finally + { + fgLogger.debug("about to release lock"); + lock.releaseLock(); + } - throw new AVMException("Deployment to " + hostName + " failed." + e.toString(), e); - } - finally - { - fgLogger.debug("ASR Finally block, Releasing ASR deployment ticket"); - fTicketHolder.setTicket(null); - } } /** @@ -1052,203 +1070,217 @@ public class DeploymentServiceImpl implements DeploymentService List callbacks) { + fgLogger.debug("deployDifferenceFS start"); /** * Lock cluster for the remote target */ - String lockStr = hostName + "." + port + "." + target; - QName lockQName = QName.createQName("{org.alfresco.deployment.lock}" + lockStr); + String lockStr = "deploy." + hostName + "." + port + "." + target; + QName lockQName = QName.createQName("{http://www.alfresco.org/deploymentService/1.0}" + lockStr); final Lock lock = new Lock(lockQName); lock.makeLock(); - + try + { + /** + * Cluster Lock held here + */ + if (fgLogger.isDebugEnabled()) + { + Object[] objs = {version, srcPath, adapterName, hostName, port, target}; + MessageFormat f = new MessageFormat("Deployment Lock Held: version {0}, srcPath {1}, adapterName {2}, hostName {3}, port {4}, target {5}"); + fgLogger.debug(f.format(objs)); + } + + DeploymentReceiverService service = null; + Listtransformers = null; + String ticket = null; + + String currentEffectiveUser = AuthenticationUtil.getRunAsUser(); - /** - * Cluster Lock held here - */ - - if (fgLogger.isDebugEnabled()) - { - Object[] objs = {version, srcPath, adapterName, hostName, port, target}; - MessageFormat f = new MessageFormat("Deployment Lock Held: version {0}, srcPath {1}, adapterName {2}, hostName {3}, port {4}, target {5}"); - fgLogger.debug(f.format(objs)); - } - - DeploymentReceiverService service = null; - Listtransformers = null; - String ticket = null; - - String currentEffectiveUser = AuthenticationUtil.getRunAsUser(); + try + { + // Kick off the event queue that will process deployment call-backs + final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue(); + EventQueueWorker eventQueueWorker = new EventQueueWorker(currentEffectiveUser, eventQueue, callbacks); + eventQueueWorker.setName(eventQueueWorker.getClass().getName()); + eventQueueWorker.setPriority(Thread.currentThread().getPriority()); + eventQueueWorker.start(); + + try + { + final String storeName = srcPath.substring(0, srcPath.indexOf(':')); + try { + + if (version < 0) + { + RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper(); + + RetryingTransactionCallback localSnapshot = new RetryingTransactionCallback() + { + public Integer execute() throws Throwable + { + int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName); + return new Integer(newVersion); + } + }; + version = trn.doInTransaction(localSnapshot, false, true).intValue(); + fgLogger.debug("snapshot local created " + storeName + ", " + version); + } - try - { - // Kick off the event queue that will process deployment call-backs - final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue(); - EventQueueWorker eventQueueWorker = new EventQueueWorker(currentEffectiveUser, eventQueue, callbacks); - eventQueueWorker.setName(eventQueueWorker.getClass().getName()); - eventQueueWorker.setPriority(Thread.currentThread().getPriority()); - eventQueueWorker.start(); - - try - { - final String storeName = srcPath.substring(0, srcPath.indexOf(':')); - try { - - if (version < 0) - { - RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper(); - - RetryingTransactionCallback localSnapshot = new RetryingTransactionCallback() - { - public Integer execute() throws Throwable - { - int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName); - return new Integer(newVersion); - } - }; - version = trn.doInTransaction(localSnapshot, false, true).intValue(); - fgLogger.debug("snapshot local created " + storeName + ", " + version); - } + transformers = getTransformers(adapterName); + service = getDeploymentReceiverService(adapterName, hostName, port, version, srcPath); + } + catch (Exception e) + { + // unable to get service + eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.FAILED, + new Pair(version, srcPath), + target, e.getMessage())); + throw e; + } + + eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.START, + new Pair(version, srcPath), + target)); + + // Go parallel to reduce the problems of high network latency - transformers = getTransformers(adapterName); - service = getDeploymentReceiverService(adapterName, hostName, port, version, srcPath); - } - catch (Exception e) - { - // unable to get service - eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.FAILED, - new Pair(version, srcPath), - target, e.getMessage())); - throw e; - } - - eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.START, - new Pair(version, srcPath), - target)); - - // Go parallel to reduce the problems of high network latency + final LinkedBlockingQueue sendQueue = new LinkedBlockingQueue(); + final List errors = Collections.synchronizedList(new ArrayList()); - final LinkedBlockingQueue sendQueue = new LinkedBlockingQueue(); - final List errors = Collections.synchronizedList(new ArrayList()); + SendQueueWorker[] workers = new SendQueueWorker[numberOfSendingThreads]; + for(int i = 0; i < numberOfSendingThreads; i++) + { + workers[i] = new SendQueueWorker(currentEffectiveUser, service, fAVMService, trxService, errors, eventQueue, sendQueue, transformers); + workers[i].setName(workers[i].getClass().getName()); + workers[i].setPriority(Thread.currentThread().getPriority()); + } + + for(SendQueueWorker sender : workers) + { + sender.start(); + } + + try + { + fgLogger.debug("calling begin"); + DeploymentToken token = service.begin(target, storeName, version, userName, password.toCharArray()); + ticket = token.getTicket(); + + lock.checkLock(); + + // run this in its own txn + final DeploymentReceiverService fservice = service; + final String fTicket = ticket; + final int fVersion = version; + RetryingTransactionCallback pushFSR = new RetryingTransactionCallback() + { + public Integer execute() throws Throwable + { + deployDirectoryPushFSR(fservice, fTicket, fVersion, srcPath, "/", matcher, eventQueue, sendQueue, errors, lock); + return 0; + } + }; + + RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper(); + trn.doInTransaction(pushFSR, false, true); + + } + catch (Exception e) + { + errors.add(e); + } + finally + { + // clean up senders thread pool + fgLogger.debug("closing deployment workers"); + for(SendQueueWorker sender : workers) + { + sender.stopMeWhenIdle(); + } + for(SendQueueWorker sender : workers) + { + sender.join(); + } + fgLogger.debug("deployment workers closed"); + + if (errors.size() <= 0 && ticket != null) + { + try + { + fgLogger.debug("no errors - prepare and commit"); + lock.checkLock(); + + service.prepare(ticket); + lock.checkLock(); + + service.commit(ticket); + // no point checking the lock here - we have committed. + } + catch (Exception e) + { + errors.add(e); + } + } + + if(errors.size() > 0) + { + fgLogger.debug("errors on deployment workers"); + Exception firstError = errors.get(0); + + eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.FAILED, + new Pair(version, srcPath), + target, firstError.getMessage())); - SendQueueWorker[] workers = new SendQueueWorker[numberOfSendingThreads]; - for(int i = 0; i < numberOfSendingThreads; i++) - { - workers[i] = new SendQueueWorker(currentEffectiveUser, service, fAVMService, trxService, errors, eventQueue, sendQueue, transformers); - workers[i].setName(workers[i].getClass().getName()); - workers[i].setPriority(Thread.currentThread().getPriority()); - } - - for(SendQueueWorker sender : workers) - { - sender.start(); - } - - try - { - DeploymentToken token = service.begin(target, storeName, version, userName, password.toCharArray()); - ticket = token.getTicket(); - - // run this in its own txn - final DeploymentReceiverService fservice = service; - final String fTicket = ticket; - final int fVersion = version; - RetryingTransactionCallback pushFSR = new RetryingTransactionCallback() - { - public Integer execute() throws Throwable - { - deployDirectoryPushFSR(fservice, fTicket, fVersion, srcPath, "/", matcher, eventQueue, sendQueue, errors, lock); - return 0; - } - }; - - RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper(); - trn.doInTransaction(pushFSR, false, true); - - } - catch (Exception e) - { - errors.add(e); - } - finally - { - // clean up senders thread pool - fgLogger.debug("closing deployment workers"); - for(SendQueueWorker sender : workers) - { - sender.stopMeWhenIdle(); - } - for(SendQueueWorker sender : workers) - { - sender.join(); - } - fgLogger.debug("deployment workers closed"); - - if (errors.size() <= 0 && ticket != null) - { - try - { - fgLogger.debug("no errors - prepare and commit"); - service.prepare(ticket); - service.commit(ticket); - } - catch (Exception e) - { - errors.add(e); - } - } - - if(errors.size() > 0) - { - fgLogger.debug("errors on deployment workers"); - Exception firstError = errors.get(0); - - eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.FAILED, - new Pair(version, srcPath), - target, firstError.getMessage())); - - if (ticket != null) - { - try - { - service.abort(ticket); - } - catch (Exception ae) - { - // nothing we can do here - fgLogger.error("Unable to abort deployment. Error in exception handler", ae); - } - } - // yes there were errors, throw the first exception that was saved - MessageFormat f = new MessageFormat("Error during deployment srcPath: {0}, version:{1}, adapterName:{2}, hostName:{3}, port:{4}, error:{5}"); - Object[] objs = { srcPath, version, adapterName, hostName, port, firstError }; - - throw new AVMException(f.format(objs), firstError); - } - } // end of finally block - - // Success if we get here - eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.END, - new Pair(version, srcPath), - target)); - - fgLogger.debug("deployment completed successfully"); - } - finally - { - // Now stutdown the event queue - fgLogger.debug("closing event queue"); - eventQueueWorker.stopMeWhenIdle(); - eventQueueWorker.join(); - fgLogger.debug("event queue closed"); - } - } - catch (Exception e) - { - // yes there were errors - MessageFormat f = new MessageFormat("Deployment exception, unable to deploy : srcPath:{0}, target:{1}, version:{2}, adapterName:{3}, hostName:{4}, port:{5}, error:{6}"); - Object[] objs = { srcPath, target, version, adapterName, hostName, port, e }; - throw new AVMException(f.format(objs), e); - } - } + if (ticket != null) + { + try + { + service.abort(ticket); + } + catch (Exception ae) + { + // nothing we can do here + fgLogger.error("Unable to abort deployment. Error in exception handler", ae); + } + } + // yes there were errors, throw the first exception that was saved + MessageFormat f = new MessageFormat("Error during deployment srcPath: {0}, version:{1}, adapterName:{2}, hostName:{3}, port:{4}, error:{5}"); + Object[] objs = { srcPath, version, adapterName, hostName, port, firstError }; + + throw new AVMException(f.format(objs), firstError); + } + } // end of finally block + + // Success if we get here + eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.END, + new Pair(version, srcPath), + target)); + + fgLogger.debug("deployment completed successfully"); + } + finally + { + // Now stutdown the event queue + fgLogger.debug("closing event queue"); + eventQueueWorker.stopMeWhenIdle(); + eventQueueWorker.join(); + fgLogger.debug("event queue closed"); + } + } + catch (Exception e) + { + // yes there were errors + MessageFormat f = new MessageFormat("Deployment exception, unable to deploy : srcPath:{0}, target:{1}, version:{2}, adapterName:{3}, hostName:{4}, port:{5}, error:{6}"); + Object[] objs = { srcPath, target, version, adapterName, hostName, port, e }; + throw new AVMException(f.format(objs), e); + } + } + finally + { + fgLogger.debug("At end of method - about to release lock"); + lock.releaseLock(); + } + } // End of deploy difference FS private class ComparatorFileDescriptorCaseSensitive implements Comparator @@ -1309,7 +1341,7 @@ public class DeploymentServiceImpl implements DeploymentService Iterator dstIter = dstListing.iterator(); Iterator srcIter = srcListing.iterator(); - lock.refreshLock(); + lock.checkLock(); // Here with two sorted directory listings AVMNodeDescriptor src = null; @@ -1353,7 +1385,7 @@ public class DeploymentServiceImpl implements DeploymentService fgLogger.debug("comparing src:" + src + " dst:"+ dst); } - lock.refreshLock(); + lock.checkLock(); // This means no entry on src so delete what is on dst. if (src == null) @@ -1721,13 +1753,37 @@ public class DeploymentServiceImpl implements DeploymentService } /** - * Inner Class to Decorate the jobLockService to add control over the refreshLock behaviour to - * reduce the number of calls to the underlying lock service. + * Inner Class to Decorate the jobLockService to + * add control over the refreshLock behaviour. + * + * Deployment service calls (On deployment main thread) + * makeLock and releaseLock around the deployment. + * periodically calls checkLock as it does its work. + * checkLock can throw an exception if the business process has timed out. + * + * isActive and lockReleased called by Job Lock Thread */ - private class Lock + private class Lock implements JobLockRefreshCallback { + /** + * The name of the lock - unique for each target + */ QName lockQName; - long lockTime; + + /** + * The unique token for this lock instance. + */ + String lockToken; + + /** + * Is the lock active ? + */ + boolean active = false; + + /** + * When did we last check whether the lock is active + */ + Date lastActive = new Date(); public Lock(QName lockQName) { @@ -1735,34 +1791,130 @@ public class DeploymentServiceImpl implements DeploymentService } /** + * Make the lock - called on main deployment thread + * * @throws LockAquisitionException */ public void makeLock() { - jobLockService.getTransactionalLock(lockQName, getTargetLockTimeToLive(), getTargetLockRetryWait(), getTargetLockRetryCount()); - lockTime = new Date().getTime(); + if(fgLogger.isDebugEnabled()) + { + fgLogger.debug("target lock refresh time :" + getTargetLockRefreshTime() + "targetLockRetryWait:" + targetLockRetryWait + "targetLockRetryCount:" + targetLockRetryCount); + } + lockToken = jobLockService.getLock(lockQName, targetLockRefreshTime, targetLockRetryWait, targetLockRetryCount); + + synchronized(this) + { + active = true; + } if (fgLogger.isDebugEnabled()) { - fgLogger.debug("lock taken" + lockQName); + fgLogger.debug("lock taken:" + lockQName); } + + // We may have taken so long to begin that we have already timed out ! + checkLock(); + + fgLogger.debug("register lock callback, target lock refresh time :" + getTargetLockRefreshTime()); + jobLockService.refreshLock(lockToken, lockQName, getTargetLockRefreshTime(), this); + fgLogger.debug("callback registered"); } - public void refreshLock() + /** + * Refresh the lock - called as the business process progresses. + * + * Called on main deployment thread. + * @throws AVMException (Lock timeout) + */ + public void checkLock() { - /** - * Optimisation to stop the lock being refreshed thousands of times, refresh lock only after half lock time has expired - */ - Date now = new Date(); - if(now.getTime() - lockTime > (targetLockTimeToLive / 2)) - { - if (fgLogger.isDebugEnabled()) - { - fgLogger.debug("lock refreshed" + lockQName); - } - jobLockService.getTransactionalLock(lockQName, getTargetLockTimeToLive(), getTargetLockRetryWait(), getTargetLockRetryCount()); - lockTime = new Date().getTime(); - } + // Do I need to sync this? + + if(active) + { + Date now = new Date(); + + if(now.getTime() > lastActive.getTime() + targetLockTimeToLive) + { + // lock time to live has expired. + MessageFormat f = new MessageFormat("Deployment Lock timeout, lock time to live exceeded, timeout:{0}mS time since last activity:{1}mS"); + Object[] objs = {new Long(targetLockTimeToLive), new Long(now.getTime() - lastActive.getTime()) }; + throw new AVMException(f.format(objs)); + } + + // Update lastActive to 1S boundary + if(now.getTime() > lastActive.getTime() + 1000) + { + lastActive = new Date(); + fgLogger.debug("lastActive:" + lastActive); + } + } + else + { + // lock not active. Has been switched off by Job Lock Service. + MessageFormat f = new MessageFormat("Lock timeout, lock not active"); + Object[] objs = { }; + throw new AVMException(f.format(objs)); + } } + + /** + * Release the lock + * + * Called on main deployment thread + */ + public void releaseLock() + { + if(fgLogger.isDebugEnabled()) + { + fgLogger.debug("deployment service about to releaseLock : " + lockQName); + } + if(active) + { + jobLockService.releaseLock(lockToken, lockQName); + } + fgLogger.debug("setting active = false" + lockQName); + + // may need to sync this + synchronized(this) + { + active = false; + } + + } + + /** + * Job Lock Callback + * + * Callback from the job lock service. Is the deployment active? + */ + @Override + public boolean isActive() + { + // may need to sync active flag + if(fgLogger.isDebugEnabled()) + { + fgLogger.debug("deployment service callback active: " + active); + } + synchronized(this) + { + return active; + } + } + + /** + * Job Lock Callback. + */ + @Override + public void lockReleased() + { + fgLogger.debug("deployment service: lock released callback"); + synchronized(this) + { + active = false; + } + } + } @@ -1978,4 +2130,21 @@ public class DeploymentServiceImpl implements DeploymentService } return false; } + + + public void setTargetLockRefreshTime(long targetLockRefreshTime) + { + this.targetLockRefreshTime = targetLockRefreshTime; + } + + /** + * How long to keep a lock before refreshing it? + *

+ * Short time-out, typically a minute. + * @return the time in mS for how long to keep the lock. + */ + public long getTargetLockRefreshTime() + { + return targetLockRefreshTime; + } } diff --git a/source/java/org/alfresco/repo/deploy/FSDeploymentTest.java b/source/java/org/alfresco/repo/deploy/DeploymentServiceImplFSTest.java similarity index 95% rename from source/java/org/alfresco/repo/deploy/FSDeploymentTest.java rename to source/java/org/alfresco/repo/deploy/DeploymentServiceImplFSTest.java index 4a8e4f63a3..3ae87ba865 100644 --- a/source/java/org/alfresco/repo/deploy/FSDeploymentTest.java +++ b/source/java/org/alfresco/repo/deploy/DeploymentServiceImplFSTest.java @@ -40,6 +40,8 @@ import org.alfresco.service.cmr.avm.deploy.DeploymentService; import org.alfresco.service.cmr.repository.ContentWriter; import org.alfresco.util.Deleter; import org.alfresco.util.NameMatcher; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.context.support.FileSystemXmlApplicationContext; /** @@ -47,7 +49,7 @@ import org.springframework.context.support.FileSystemXmlApplicationContext; * @author britt * @author mrogers */ -public class FSDeploymentTest extends AVMServiceTestBase +public class DeploymentServiceImplFSTest extends AVMServiceTestBase { private File log = null; private File metadata = null; @@ -60,6 +62,8 @@ public class FSDeploymentTest extends AVMServiceTestBase DeploymentService service = null; + private static Log logger = LogFactory.getLog(DeploymentServiceImplFSTest.class); + @Override protected void setUp() throws Exception @@ -410,6 +414,7 @@ public class FSDeploymentTest extends AVMServiceTestBase */ public void testWrongPassword() { + logger.debug("Start testWrongPassword"); try { service.deployDifferenceFS(-1, "main:/", "default", "localhost", 44100, TEST_USER, "Wrong!", TEST_TARGET, null, false, false, false, null); @@ -428,6 +433,7 @@ public class FSDeploymentTest extends AVMServiceTestBase */ public void testWrongTarget() { + logger.debug("Start testWrongTarget"); try { service.deployDifferenceFS(-1, "main:/", "default", "localhost", 44100, TEST_USER, TEST_PASSWORD, "crapTarget", null, false, false, false, null); fail("Wrong target should have thrown an exception"); @@ -443,6 +449,7 @@ public class FSDeploymentTest extends AVMServiceTestBase */ public void testNoExclusionFilter() throws Exception { + logger.debug("Start testNoExclusionFilter"); DeploymentReport report = new DeploymentReport(); List callbacks = new ArrayList(); callbacks.add(new DeploymentReportCallback(report)); @@ -471,6 +478,7 @@ public class FSDeploymentTest extends AVMServiceTestBase */ public void testRevertToPreviousVersion() throws Exception { + logger.debug("Start testRevertToPreviousVersion"); DeploymentReport report = new DeploymentReport(); List callbacks = new ArrayList(); callbacks.add(new DeploymentReportCallback(report)); @@ -526,7 +534,8 @@ public class FSDeploymentTest extends AVMServiceTestBase * Do a deployment - should only see start and end events and the two above. */ public void testBulkLoad() throws Exception - { + { + logger.debug("Start testBulkLoad"); DeploymentReport report = new DeploymentReport(); List callbacks = new ArrayList(); callbacks.add(new DeploymentReportCallback(report));