ALF-4898 - rework to deployment locks to implement Job Lock Service timeout callback.

- there is now two callbacks.   A short timeout (1 minute) for keeping the lock alive and a longer lock for timing 
out the whole deployment.

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22961 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Mark Rogers
2010-10-07 13:56:35 +00:00
parent 839d522808
commit ee52855f62
4 changed files with 579 additions and 384 deletions

View File

@@ -75,6 +75,17 @@
<value>${deployment.service.numberOfSendingThreads}</value> <value>${deployment.service.numberOfSendingThreads}</value>
</property> </property>
<!-- Short timeout, to detect a shutdown server -->
<property name="targetLockRefreshTime">
<value>${deployment.service.targetLockRefreshTime}</value>
</property>
<!-- Long timeout, how long to wait for the next step in the deployment process, may need to be fairly
long to cope with large transfers over slow networks -->
<property name="targetLockTimeToLive">
<value>${deployment.service.targetLockTimeout}</value>
</property>
<!-- Which adapters are provided to communicate with remote File System Receivers --> <!-- Which adapters are provided to communicate with remote File System Receivers -->
<property name="deploymentReceiverTransportAdapters"> <property name="deploymentReceiverTransportAdapters">
<map> <map>

View File

@@ -499,6 +499,12 @@ deployment.service.numberOfSendingThreads=5
deployment.service.corePoolSize=2 deployment.service.corePoolSize=2
deployment.service.maximumPoolSize=3 deployment.service.maximumPoolSize=3
# How long to wait in mS before refreshing a target lock - detects shutdown servers
deployment.service.targetLockRefreshTime=60000
# How long to wait in mS from the last communication before deciding that deployment has failed, possibly
# the destination is no longer available?
deployment.service.targetLockTimeout=3600000
# Transfer Service # Transfer Service
transferservice.receiver.enabled=true transferservice.receiver.enabled=true
transferservice.receiver.stagingDir=${java.io.tmpdir}/alfresco-transfer-staging transferservice.receiver.stagingDir=${java.io.tmpdir}/alfresco-transfer-staging

View File

@@ -53,6 +53,7 @@ import org.alfresco.repo.avm.AVMNodeService;
import org.alfresco.repo.avm.util.SimplePath; import org.alfresco.repo.avm.util.SimplePath;
import org.alfresco.repo.domain.PropertyValue; import org.alfresco.repo.domain.PropertyValue;
import org.alfresco.repo.lock.JobLockService; import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback;
import org.alfresco.repo.remote.AVMRemoteImpl; import org.alfresco.repo.remote.AVMRemoteImpl;
import org.alfresco.repo.remote.AVMSyncServiceRemote; import org.alfresco.repo.remote.AVMSyncServiceRemote;
import org.alfresco.repo.remote.ClientTicketHolder; import org.alfresco.repo.remote.ClientTicketHolder;
@@ -96,11 +97,6 @@ public class DeploymentServiceImpl implements DeploymentService
{ {
private static Log fgLogger = LogFactory.getLog(DeploymentServiceImpl.class); private static Log fgLogger = LogFactory.getLog(DeploymentServiceImpl.class);
/**
* Holds locks for all deployment destinations (alfresco->alfresco)
*/
private Map<DeploymentDestination, DeploymentDestination> fDestinations;
/** /**
* The local AVMService Instance. * The local AVMService Instance.
*/ */
@@ -130,18 +126,32 @@ public class DeploymentServiceImpl implements DeploymentService
/** /**
* Hold the deployment lock for 3600 seconds (1 hour) * Hold the deployment lock for 3600 seconds (1 hour)
* <p>
* This is how long we will wait for a business process to complete.
* And needs to be fairly long to allow transmission of of big files
* over high latency networks.
*/ */
private long targetLockTimeToLive = 3600000; private long targetLockTimeToLive = 3600000;
/** /**
* Retry for target lock every 10 seconds * Refresh the lock every minute or so
* <p>
* This is how long we keep the lock for before nudging it. So if
* this node in the cluster is shut down during deployment then
* another node can take over.
*/ */
private long targetLockRetryWait = 10000; private long targetLockRefreshTime = 10000;
/** /**
* Retry 10000 times before giving up * Retry for target lock every 1 second
*/ */
private int targetLockRetryCount = 10000; private long targetLockRetryWait = 1000;
/**
* Retry 10000 times before giving up, basically we
* never want to give up.
*/
private int targetLockRetryCount = 10001;
/** /**
* The size of the output buffers * The size of the output buffers
@@ -164,7 +174,6 @@ public class DeploymentServiceImpl implements DeploymentService
public DeploymentServiceImpl() public DeploymentServiceImpl()
{ {
fTicketHolder = new ClientTicketHolderThread(); fTicketHolder = new ClientTicketHolderThread();
fDestinations = new HashMap<DeploymentDestination, DeploymentDestination>();
} }
/** /**
@@ -209,10 +218,12 @@ public class DeploymentServiceImpl implements DeploymentService
* Lock the cluster for the remote target * Lock the cluster for the remote target
*/ */
String lockStr = hostName + "." + "asr." + storeName; String lockStr = hostName + "." + "asr." + storeName;
QName lockQName = QName.createQName("{org.alfresco.deployment.lock}" + lockStr); QName lockQName = QName.createQName("{http://www.alfresco.org/deploymentService/1.0}" + lockStr);
Lock lock = new Lock(lockQName); Lock lock = new Lock(lockQName);
lock.makeLock(); lock.makeLock();
try
{
/** /**
* Got the lock - now do a deployment * Got the lock - now do a deployment
*/ */
@@ -383,6 +394,13 @@ public class DeploymentServiceImpl implements DeploymentService
fTicketHolder.setTicket(null); fTicketHolder.setTicket(null);
} }
} }
finally
{
fgLogger.debug("about to release lock");
lock.releaseLock();
}
}
/** /**
* Deploy all the children of corresponding directories. (ASR version) * Deploy all the children of corresponding directories. (ASR version)
@@ -1052,19 +1070,19 @@ public class DeploymentServiceImpl implements DeploymentService
List<DeploymentCallback> callbacks) List<DeploymentCallback> callbacks)
{ {
fgLogger.debug("deployDifferenceFS start");
/** /**
* Lock cluster for the remote target * Lock cluster for the remote target
*/ */
String lockStr = hostName + "." + port + "." + target; String lockStr = "deploy." + hostName + "." + port + "." + target;
QName lockQName = QName.createQName("{org.alfresco.deployment.lock}" + lockStr); QName lockQName = QName.createQName("{http://www.alfresco.org/deploymentService/1.0}" + lockStr);
final Lock lock = new Lock(lockQName); final Lock lock = new Lock(lockQName);
lock.makeLock(); lock.makeLock();
try
{
/** /**
* Cluster Lock held here * Cluster Lock held here
*/ */
if (fgLogger.isDebugEnabled()) if (fgLogger.isDebugEnabled())
{ {
Object[] objs = {version, srcPath, adapterName, hostName, port, target}; Object[] objs = {version, srcPath, adapterName, hostName, port, target};
@@ -1144,9 +1162,12 @@ public class DeploymentServiceImpl implements DeploymentService
try try
{ {
fgLogger.debug("calling begin");
DeploymentToken token = service.begin(target, storeName, version, userName, password.toCharArray()); DeploymentToken token = service.begin(target, storeName, version, userName, password.toCharArray());
ticket = token.getTicket(); ticket = token.getTicket();
lock.checkLock();
// run this in its own txn // run this in its own txn
final DeploymentReceiverService fservice = service; final DeploymentReceiverService fservice = service;
final String fTicket = ticket; final String fTicket = ticket;
@@ -1187,8 +1208,13 @@ public class DeploymentServiceImpl implements DeploymentService
try try
{ {
fgLogger.debug("no errors - prepare and commit"); fgLogger.debug("no errors - prepare and commit");
lock.checkLock();
service.prepare(ticket); service.prepare(ticket);
lock.checkLock();
service.commit(ticket); service.commit(ticket);
// no point checking the lock here - we have committed.
} }
catch (Exception e) catch (Exception e)
{ {
@@ -1249,6 +1275,12 @@ public class DeploymentServiceImpl implements DeploymentService
throw new AVMException(f.format(objs), e); throw new AVMException(f.format(objs), e);
} }
} }
finally
{
fgLogger.debug("At end of method - about to release lock");
lock.releaseLock();
}
} // End of deploy difference FS
private class ComparatorFileDescriptorCaseSensitive implements Comparator<FileDescriptor> private class ComparatorFileDescriptorCaseSensitive implements Comparator<FileDescriptor>
@@ -1309,7 +1341,7 @@ public class DeploymentServiceImpl implements DeploymentService
Iterator<FileDescriptor> dstIter = dstListing.iterator(); Iterator<FileDescriptor> dstIter = dstListing.iterator();
Iterator<AVMNodeDescriptor> srcIter = srcListing.iterator(); Iterator<AVMNodeDescriptor> srcIter = srcListing.iterator();
lock.refreshLock(); lock.checkLock();
// Here with two sorted directory listings // Here with two sorted directory listings
AVMNodeDescriptor src = null; AVMNodeDescriptor src = null;
@@ -1353,7 +1385,7 @@ public class DeploymentServiceImpl implements DeploymentService
fgLogger.debug("comparing src:" + src + " dst:"+ dst); fgLogger.debug("comparing src:" + src + " dst:"+ dst);
} }
lock.refreshLock(); lock.checkLock();
// This means no entry on src so delete what is on dst. // This means no entry on src so delete what is on dst.
if (src == null) if (src == null)
@@ -1721,13 +1753,37 @@ public class DeploymentServiceImpl implements DeploymentService
} }
/** /**
* Inner Class to Decorate the jobLockService to add control over the refreshLock behaviour to * Inner Class to Decorate the jobLockService to
* reduce the number of calls to the underlying lock service. * add control over the refreshLock behaviour.
*
* Deployment service calls (On deployment main thread)
* makeLock and releaseLock around the deployment.
* periodically calls checkLock as it does its work.
* checkLock can throw an exception if the business process has timed out.
*
* isActive and lockReleased called by Job Lock Thread
*/ */
private class Lock private class Lock implements JobLockRefreshCallback
{ {
/**
* The name of the lock - unique for each target
*/
QName lockQName; QName lockQName;
long lockTime;
/**
* The unique token for this lock instance.
*/
String lockToken;
/**
* Is the lock active ?
*/
boolean active = false;
/**
* When did we last check whether the lock is active
*/
Date lastActive = new Date();
public Lock(QName lockQName) public Lock(QName lockQName)
{ {
@@ -1735,34 +1791,130 @@ public class DeploymentServiceImpl implements DeploymentService
} }
/** /**
* Make the lock - called on main deployment thread
*
* @throws LockAquisitionException * @throws LockAquisitionException
*/ */
public void makeLock() public void makeLock()
{ {
jobLockService.getTransactionalLock(lockQName, getTargetLockTimeToLive(), getTargetLockRetryWait(), getTargetLockRetryCount()); if(fgLogger.isDebugEnabled())
lockTime = new Date().getTime(); {
fgLogger.debug("target lock refresh time :" + getTargetLockRefreshTime() + "targetLockRetryWait:" + targetLockRetryWait + "targetLockRetryCount:" + targetLockRetryCount);
}
lockToken = jobLockService.getLock(lockQName, targetLockRefreshTime, targetLockRetryWait, targetLockRetryCount);
synchronized(this)
{
active = true;
}
if (fgLogger.isDebugEnabled()) if (fgLogger.isDebugEnabled())
{ {
fgLogger.debug("lock taken" + lockQName); fgLogger.debug("lock taken:" + lockQName);
}
// We may have taken so long to begin that we have already timed out !
checkLock();
fgLogger.debug("register lock callback, target lock refresh time :" + getTargetLockRefreshTime());
jobLockService.refreshLock(lockToken, lockQName, getTargetLockRefreshTime(), this);
fgLogger.debug("callback registered");
}
/**
* Refresh the lock - called as the business process progresses.
*
* Called on main deployment thread.
* @throws AVMException (Lock timeout)
*/
public void checkLock()
{
// Do I need to sync this?
if(active)
{
Date now = new Date();
if(now.getTime() > lastActive.getTime() + targetLockTimeToLive)
{
// lock time to live has expired.
MessageFormat f = new MessageFormat("Deployment Lock timeout, lock time to live exceeded, timeout:{0}mS time since last activity:{1}mS");
Object[] objs = {new Long(targetLockTimeToLive), new Long(now.getTime() - lastActive.getTime()) };
throw new AVMException(f.format(objs));
}
// Update lastActive to 1S boundary
if(now.getTime() > lastActive.getTime() + 1000)
{
lastActive = new Date();
fgLogger.debug("lastActive:" + lastActive);
}
}
else
{
// lock not active. Has been switched off by Job Lock Service.
MessageFormat f = new MessageFormat("Lock timeout, lock not active");
Object[] objs = { };
throw new AVMException(f.format(objs));
} }
} }
public void refreshLock()
{
/** /**
* Optimisation to stop the lock being refreshed thousands of times, refresh lock only after half lock time has expired * Release the lock
*
* Called on main deployment thread
*/ */
Date now = new Date(); public void releaseLock()
if(now.getTime() - lockTime > (targetLockTimeToLive / 2))
{ {
if (fgLogger.isDebugEnabled()) if(fgLogger.isDebugEnabled())
{ {
fgLogger.debug("lock refreshed" + lockQName); fgLogger.debug("deployment service about to releaseLock : " + lockQName);
} }
jobLockService.getTransactionalLock(lockQName, getTargetLockTimeToLive(), getTargetLockRetryWait(), getTargetLockRetryCount()); if(active)
lockTime = new Date().getTime(); {
jobLockService.releaseLock(lockToken, lockQName);
}
fgLogger.debug("setting active = false" + lockQName);
// may need to sync this
synchronized(this)
{
active = false;
}
}
/**
* Job Lock Callback
*
* Callback from the job lock service. Is the deployment active?
*/
@Override
public boolean isActive()
{
// may need to sync active flag
if(fgLogger.isDebugEnabled())
{
fgLogger.debug("deployment service callback active: " + active);
}
synchronized(this)
{
return active;
} }
} }
/**
* Job Lock Callback.
*/
@Override
public void lockReleased()
{
fgLogger.debug("deployment service: lock released callback");
synchronized(this)
{
active = false;
}
}
} }
@@ -1978,4 +2130,21 @@ public class DeploymentServiceImpl implements DeploymentService
} }
return false; return false;
} }
public void setTargetLockRefreshTime(long targetLockRefreshTime)
{
this.targetLockRefreshTime = targetLockRefreshTime;
}
/**
* How long to keep a lock before refreshing it?
* <p>
* Short time-out, typically a minute.
* @return the time in mS for how long to keep the lock.
*/
public long getTargetLockRefreshTime()
{
return targetLockRefreshTime;
}
} }

View File

@@ -40,6 +40,8 @@ import org.alfresco.service.cmr.avm.deploy.DeploymentService;
import org.alfresco.service.cmr.repository.ContentWriter; import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.util.Deleter; import org.alfresco.util.Deleter;
import org.alfresco.util.NameMatcher; import org.alfresco.util.NameMatcher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.FileSystemXmlApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext;
/** /**
@@ -47,7 +49,7 @@ import org.springframework.context.support.FileSystemXmlApplicationContext;
* @author britt * @author britt
* @author mrogers * @author mrogers
*/ */
public class FSDeploymentTest extends AVMServiceTestBase public class DeploymentServiceImplFSTest extends AVMServiceTestBase
{ {
private File log = null; private File log = null;
private File metadata = null; private File metadata = null;
@@ -60,6 +62,8 @@ public class FSDeploymentTest extends AVMServiceTestBase
DeploymentService service = null; DeploymentService service = null;
private static Log logger = LogFactory.getLog(DeploymentServiceImplFSTest.class);
@Override @Override
protected void setUp() throws Exception protected void setUp() throws Exception
@@ -410,6 +414,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
*/ */
public void testWrongPassword() public void testWrongPassword()
{ {
logger.debug("Start testWrongPassword");
try { try {
service.deployDifferenceFS(-1, "main:/", "default", "localhost", 44100, TEST_USER, "Wrong!", TEST_TARGET, null, false, false, false, null); service.deployDifferenceFS(-1, "main:/", "default", "localhost", 44100, TEST_USER, "Wrong!", TEST_TARGET, null, false, false, false, null);
@@ -428,6 +433,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
*/ */
public void testWrongTarget() public void testWrongTarget()
{ {
logger.debug("Start testWrongTarget");
try { try {
service.deployDifferenceFS(-1, "main:/", "default", "localhost", 44100, TEST_USER, TEST_PASSWORD, "crapTarget", null, false, false, false, null); service.deployDifferenceFS(-1, "main:/", "default", "localhost", 44100, TEST_USER, TEST_PASSWORD, "crapTarget", null, false, false, false, null);
fail("Wrong target should have thrown an exception"); fail("Wrong target should have thrown an exception");
@@ -443,6 +449,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
*/ */
public void testNoExclusionFilter() throws Exception public void testNoExclusionFilter() throws Exception
{ {
logger.debug("Start testNoExclusionFilter");
DeploymentReport report = new DeploymentReport(); DeploymentReport report = new DeploymentReport();
List<DeploymentCallback> callbacks = new ArrayList<DeploymentCallback>(); List<DeploymentCallback> callbacks = new ArrayList<DeploymentCallback>();
callbacks.add(new DeploymentReportCallback(report)); callbacks.add(new DeploymentReportCallback(report));
@@ -471,6 +478,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
*/ */
public void testRevertToPreviousVersion() throws Exception public void testRevertToPreviousVersion() throws Exception
{ {
logger.debug("Start testRevertToPreviousVersion");
DeploymentReport report = new DeploymentReport(); DeploymentReport report = new DeploymentReport();
List<DeploymentCallback> callbacks = new ArrayList<DeploymentCallback>(); List<DeploymentCallback> callbacks = new ArrayList<DeploymentCallback>();
callbacks.add(new DeploymentReportCallback(report)); callbacks.add(new DeploymentReportCallback(report));
@@ -527,6 +535,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
*/ */
public void testBulkLoad() throws Exception public void testBulkLoad() throws Exception
{ {
logger.debug("Start testBulkLoad");
DeploymentReport report = new DeploymentReport(); DeploymentReport report = new DeploymentReport();
List<DeploymentCallback> callbacks = new ArrayList<DeploymentCallback>(); List<DeploymentCallback> callbacks = new ArrayList<DeploymentCallback>();
callbacks.add(new DeploymentReportCallback(report)); callbacks.add(new DeploymentReportCallback(report));