Added callback-based JobLockService refreshLock

- JobLockService.refreshLock(..., JobLockRefreshCallback)
 - Added to support ALF-4898: Deployment: Reduce lock TTL


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22812 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2010-10-01 14:50:43 +00:00
parent d82de33326
commit 71f1312474
3 changed files with 667 additions and 273 deletions

View File

@@ -44,8 +44,9 @@ public interface JobLockService
/** /**
* Take a transactionally-managed lock. This method can be called repeatedly to both * Take a transactionally-managed lock. This method can be called repeatedly to both
* initially acquire the lock as well as to maintain the lock. This method should * initially acquire the lock as well as to maintain the lock. This method should
* either be called again before the lock expires or the transaction should end before * be called repeatedly during the transaction to ensure that the lock remains refreshed.
* the lock expires. * <b>DO NOT</b> use a long-lived lock to avoid calling this method at intervals; long-lived
* locks get left behind during server crashes, amongst other things.
* <p> * <p>
* The following rules apply to taking and releasing locks:<br> * The following rules apply to taking and releasing locks:<br>
* - Expired locks can be taken by any process<br> * - Expired locks can be taken by any process<br>
@@ -58,20 +59,42 @@ public interface JobLockService
* release the owned locks will invalidate the transaction and cause rollback. * release the owned locks will invalidate the transaction and cause rollback.
* *
* @param lockQName the name of the lock to acquire * @param lockQName the name of the lock to acquire
* @param timeToLive the time (in milliseconds) for the lock to remain valid * @param timeToLive the time (in milliseconds) for the lock to remain valid.
* This value <b>must not</b> be larger than either the anticipated
* operation time or a server startup time. Typically, it should be
* a few seconds.
* @throws LockAcquisitionException if the lock could not be acquired * @throws LockAcquisitionException if the lock could not be acquired
* @throws IllegalStateException if a transaction is not active * @throws IllegalStateException if a transaction is not active
*/ */
void getTransactionalLock(QName lockQName, long timeToLive); void getTransactionalLock(QName lockQName, long timeToLive);
/** /**
* {@inheritDoc JobLockService#getTransactionalLock(QName, long)} * Take a transactionally-managed lock. This method can be called repeatedly to both
* initially acquire the lock as well as to maintain the lock. This method should
* be called repeatedly during the transaction to ensure that the lock remains refreshed.
* <b>DO NOT</b> use a long-lived lock to avoid calling this method at intervals; long-lived
* locks get left behind during server crashes, amongst other things.
* <p>
* The following rules apply to taking and releasing locks:<br>
* - Expired locks can be taken by any process<br>
* - Lock expiration does not prevent a lock from being refreshed or released<br>
* - Only locks that were manipulated using another token will cause failure
* <p>
* The locks are automatically released when the transaction is terminated.
* <p>
* Any failure to acquire the lock (after retries), refresh the lock or subsequently
* release the owned locks will invalidate the transaction and cause rollback.
* <p> * <p>
* If the lock cannot be immediately acquired, the process will wait and retry. Note * If the lock cannot be immediately acquired, the process will wait and retry. Note
* that second and subsequent attempts to get the lock during a transaction cannot * that second and subsequent attempts to get the lock during a transaction cannot
* make use of retrying; the lock is actually being refreshed and will therefore never * make use of retrying; the lock is actually being refreshed and will therefore never
* become valid if it doesn't refresh directly. * become valid if it doesn't refresh directly.
* *
* @param lockQName the name of the lock to acquire
* @param timeToLive the time (in milliseconds) for the lock to remain valid.
* This value <b>must not</b> be larger than either the anticipated
* operation time or a server startup time. Typically, it should be
* a few seconds.
* @param retryWait the time (in milliseconds) to wait before trying again * @param retryWait the time (in milliseconds) to wait before trying again
* @param retryCount the maximum number of times to attempt the lock acquisition * @param retryCount the maximum number of times to attempt the lock acquisition
* @throws LockAcquisitionException if the lock could not be acquired * @throws LockAcquisitionException if the lock could not be acquired
@@ -87,22 +110,34 @@ public interface JobLockService
* available by expiry. No deadlock management is provided, either. * available by expiry. No deadlock management is provided, either.
* *
* @param lockQName the name of the lock to acquire * @param lockQName the name of the lock to acquire
* @param timeToLive the time (in milliseconds) for the lock to remain valid * @param timeToLive the time (in milliseconds) for the lock to remain valid.
* This value <b>must not</b> be larger than either the anticipated
* operation time or a server startup time. Typically, it should be
* a few seconds.
* @return Returns the newly-created lock token * @return Returns the newly-created lock token
* @throws LockAcquisitionException if the lock could not be acquired * @throws LockAcquisitionException if the lock could not be acquired
*/ */
String getLock(QName lockQName, long timeToLive); String getLock(QName lockQName, long timeToLive);
/** /**
* {@inheritDoc JobLockService#getLock(QName, long)} * Take a manually-managed lock. The lock current thread or transaction will not be tagged -
* the returned lock token must be used for further management of the lock.
* <p>
* No lock management is provided: the lock must be released manually or will only become
* available by expiry. No deadlock management is provided, either.
* <p> * <p>
* If the lock cannot be immediately acquired, the process will wait and retry. * If the lock cannot be immediately acquired, the process will wait and retry.
* *
* @param lockQName the name of the lock to acquire
* @param timeToLive the time (in milliseconds) for the lock to remain valid.
* This value <b>must not</b> be larger than either the anticipated
* operation time or a server startup time. Typically, it should be
* a few seconds.
* @param retryWait the time (in milliseconds) to wait before trying again * @param retryWait the time (in milliseconds) to wait before trying again
* @param retryCount the maximum number of times to attempt the lock acquisition * @param retryCount the maximum number of times to attempt the lock acquisition
* @throws LockAcquisitionException if the lock could not be acquired * @throws LockAcquisitionException if the lock could not be acquired
*/ */
String getLock(QName lockQName, long timeToLive, long retryWait, int retryCount); String getLock(QName lockQName, long timeToLive, long retryWait, int retryCount);
/** /**
* Refresh the lock using a valid lock token. * Refresh the lock using a valid lock token.
@@ -114,6 +149,25 @@ public interface JobLockService
*/ */
void refreshLock(String lockToken, QName lockQName, long timeToLive); void refreshLock(String lockToken, QName lockQName, long timeToLive);
/**
* Provide a callback to refresh a lock using a valid lock token, pushing responsibility
* for regular lock refreshing onto the service implementation code. This method should only
* be called <u>once for a given lock token</u> to prevent unnecessary refreshing.
* <p/>
* Since the lock is not actually refreshed by this method, there will be no LockAcquisitionException.
* <p/>
* The TTL (time to live) will be divided by two and the result used to trigger a timer thread
* to initiate the callback.
*
* @param lockToken the lock token returned when the lock was acquired
* @param lockQName the name of the previously-acquired lock
* @param timeToLive the time (in milliseconds) for the lock to remain valid
* @param callback the object that will be called at intervals of timeToLive/2 (about)
*
* @since 3.4.0a
*/
void refreshLock(String lockToken, QName lockQName, long timeToLive, JobLockRefreshCallback callback);
/** /**
* Release the lock using a valid lock token. * Release the lock using a valid lock token.
* *
@@ -121,4 +175,64 @@ public interface JobLockService
* @param lockQName the name of the previously-acquired lock * @param lockQName the name of the previously-acquired lock
*/ */
void releaseLock(String lockToken, QName lockQName); void releaseLock(String lockToken, QName lockQName);
/**
* Interface for implementations that need a timed callback in order to refresh the lock.
* <p/>
* This callback is designed for processes that need to lock and wait for external processes
* to complete; keeping a local thread to refresh the lock is possible but it is more
* efficient for the thread pool and timer mechanisms to be shared.
* <p/>
* The callback implementations <b>must</b> be thread-safe and <b>should be</b> independent
* of other callbacks i.e. the simplest and safest is to use an anonymous inner class for
* the implementation.
* <p/>
* <b>IMPORTANT:</b> Do not block the calls to this interface - other callbacks might be held
* up producing inconsistent behaviour. Failure to observe this will lead
* to warnings and lock termination i.e. the service implementation will
* force early termination of the lock and will discard the callback.
*
* @author Derek Hulley
* @since 3.4.0b
*/
public interface JobLockRefreshCallback
{
/**
* Timed callback from the service to determine if the lock is still required.
* <p/>
* <b>IMPORTANT:</b> Do not block calls to this method for any reason and do perform any
* non-trivial determination of state i.e. have the answer to this
* method immediately available at all times. Failure to observe this
* will lead to warnings and lock termination.
* <p/>
* The original lock token <b>is not</b> provided in the callback; this is to prevent
* implementations from attempting to link the lock token back to the specific callback
* instances.
*
* @return Return <tt>true</tt> if the task associated with the callback
* is still active i.e. it still needs the lock associated with the
* callback or <tt>false</tt> if the lock is no longer required.
*
* @since 3.4.0b
*/
boolean isActive();
/**
* Callback received when the lock refresh has failed. Implementations should immediately and
* gracefully terminate their associated processes as the associated lock is no longer valid,
* which is a direct indication that a competing process has taken and is using the required
* lock or that the process has already completed and released the lock.
* <p/>
* As a convenenience, this method is called when a VM shutdown is detected as well; the
* associated lock is not refreshed and this method is called to instruct the locking process
* to terminate.
* <p/>
* This method is also called if the initiating process is self-terminated i.e. if the originating
* process releases the lock itself. This method is not called if the process is not
* {@link #isActive() active}.
*
* @since 3.4.0b
*/
void lockReleased();
}
} }

View File

@@ -19,6 +19,9 @@
package org.alfresco.repo.lock; package org.alfresco.repo.lock;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.alfresco.repo.domain.locks.LockDAO; import org.alfresco.repo.domain.locks.LockDAO;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
@@ -29,9 +32,12 @@ import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.namespace.QName; import org.alfresco.service.namespace.QName;
import org.alfresco.util.GUID; import org.alfresco.util.GUID;
import org.alfresco.util.VmShutdownListener;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.sun.star.uno.RuntimeException;
/** /**
* {@inheritDoc JobLockService} * {@inheritDoc JobLockService}
* *
@@ -49,6 +55,9 @@ public class JobLockServiceImpl implements JobLockService
private int defaultRetryCount; private int defaultRetryCount;
private long defaultRetryWait; private long defaultRetryWait;
private ScheduledExecutorService scheduler;
private VmShutdownListener shutdownListener;
/** /**
* Stateless listener that does post-transaction cleanup. * Stateless listener that does post-transaction cleanup.
*/ */
@@ -59,6 +68,8 @@ public class JobLockServiceImpl implements JobLockService
defaultRetryWait = 20; defaultRetryWait = 20;
defaultRetryCount = 10; defaultRetryCount = 10;
txnListener = new LockTransactionListener(); txnListener = new LockTransactionListener();
scheduler = Executors.newScheduledThreadPool(1);
shutdownListener = new VmShutdownListener("JobLockService");
} }
/** /**
@@ -228,6 +239,155 @@ public class JobLockServiceImpl implements JobLockService
} }
} }
/**
* {@inheritDoc}
*/
@Override
public void refreshLock(
final String lockToken, final QName lockQName, final long timeToLive,
final JobLockRefreshCallback callback)
{
// Do nothing if the scheduler has shut down
if (scheduler.isShutdown() || scheduler.isTerminated())
{
if (logger.isDebugEnabled())
{
logger.debug(
"Lock refresh failed: \n" +
" Lock: " + lockQName + "\n" +
" TTL: " + timeToLive + "\n" +
" Txn: " + lockToken + "\n" +
" Error: " + "Lock refresh scheduler has shut down. The VM may be terminating.");
}
// Don't schedule
throw new LockAcquisitionException(lockQName, lockToken);
}
final long delay = timeToLive / 2;
if (delay < 1)
{
throw new IllegalArgumentException("Very small timeToLive: " + timeToLive);
}
// Our runnable does the callbacks
Runnable runnable = new Runnable()
{
@Override
public void run()
{
// Most lock debug is done elsewhere; just note that this is a timed process.
if (logger.isDebugEnabled())
{
logger.debug(
"Initiating timed Lock refresh: \n" +
" Lock: " + lockQName + "\n" +
" TTL: " + timeToLive + "\n" +
" Txn: " + lockToken);
}
// First check the VM
if (shutdownListener.isVmShuttingDown())
{
callback.lockReleased();
return;
}
boolean isActive = false;
try
{
isActive = callIsActive(callback, delay);
}
catch (Throwable e)
{
logger.error(
"Lock isActive check failed: \n" +
" Lock: " + lockQName + "\n" +
" TTL: " + timeToLive + "\n" +
" Txn: " + lockToken,
e);
// The callback must be informed
callLockReleased(callback);
return;
}
if (!isActive)
{
// Debug
if (logger.isDebugEnabled())
{
logger.debug(
"Lock callback is inactive. Releasing lock: \n" +
" Lock: " + lockQName + "\n" +
" TTL: " + timeToLive + "\n" +
" Txn: " + lockToken);
}
// The callback is no longer active, so we don't need to refresh.
// Release the lock in case the initiator did not do it.
try
{
releaseLock(lockToken, lockQName);
}
catch (LockAcquisitionException e)
{
// The lock is already gone: job done
}
// The callback must be informed
callLockReleased(callback);
}
else
{
try
{
refreshLock(lockToken, lockQName, timeToLive);
// Success. The callback does not need to know.
}
catch (LockAcquisitionException e)
{
// The callback must be informed
callLockReleased(callback);
}
}
}
};
// Schedule this
scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
/**
* Calls the callback {@link JobLockRefreshCallback#isActive() isActive} with time-check.
*/
private boolean callIsActive(JobLockRefreshCallback callback, long delay) throws Throwable
{
long t1 = System.nanoTime();
boolean isActive = callback.isActive();
long t2 = System.nanoTime();
double timeWastedMs = (double)(t2 - t1)/(double)10E6;
if (timeWastedMs > delay || timeWastedMs > 1000L)
{
// The isActive did not come back quickly enough. There is no point taking longer than
// the delay, but in any case 1s to provide a boolean is too much. This is probably an
// indication that the isActive implementation is performing complex state determination,
// which is specifically referenced in the API doc.
throw new RuntimeException(
"isActive check took " + timeWastedMs + " to return, which is too long.");
}
return isActive;
}
/**
* Calls the callback {@link JobLockRefreshCallback#lockReleased()}.
*/
private void callLockReleased(JobLockRefreshCallback callback)
{
try
{
callback.lockReleased();
}
catch (Throwable ee)
{
logger.error("Callback to lockReleased failed.", ee);
}
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@@ -1,4 +1,4 @@
/* /*
* Copyright (C) 2005-2010 Alfresco Software Limited. * Copyright (C) 2005-2010 Alfresco Software Limited.
* *
* This file is part of Alfresco * This file is part of Alfresco
@@ -14,268 +14,388 @@
* GNU Lesser General Public License for more details. * GNU Lesser General Public License for more details.
* *
* You should have received a copy of the GNU Lesser General Public License * You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>. * along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.alfresco.repo.lock; package org.alfresco.repo.lock;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.alfresco.repo.domain.locks.LockDAO; import org.alfresco.repo.domain.locks.LockDAO;
import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.service.ServiceRegistry; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.namespace.QName; import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.transaction.TransactionService; import org.alfresco.service.namespace.QName;
import org.alfresco.util.ApplicationContextHelper; import org.alfresco.service.transaction.TransactionService;
import org.springframework.context.ApplicationContext; import org.alfresco.util.ApplicationContextHelper;
import org.springframework.context.ApplicationContext;
/**
* Tests the high-level capabilities provided by the service implementation. The DAO tests /**
* stress the underlying database work, so we only need to deal with deadlock resolution, etc. * Tests the high-level capabilities provided by the service implementation. The DAO tests
* * stress the underlying database work, so we only need to deal with deadlock resolution, etc.
* @see JobLockService the service being tested *
* @see LockDAO the DAO being indirectly tested * @see JobLockService the service being tested
* * @see LockDAO the DAO being indirectly tested
* @author Derek Hulley *
* @since 3.2 * @author Derek Hulley
*/ * @since 3.2
public class JobLockServiceTest extends TestCase */
{ public class JobLockServiceTest extends TestCase
public static final String NAMESPACE = "http://www.alfresco.org/test/JobLockServiceTest"; {
public static final String NAMESPACE = "http://www.alfresco.org/test/JobLockServiceTest";
private ApplicationContext ctx = ApplicationContextHelper.getApplicationContext();
private ApplicationContext ctx = ApplicationContextHelper.getApplicationContext();
private TransactionService transactionService;
private RetryingTransactionHelper txnHelper; private TransactionService transactionService;
private JobLockService jobLockService; private RetryingTransactionHelper txnHelper;
// Lock names for the tests private JobLockService jobLockService;
private QName lockA; // Lock names for the tests
private QName lockAA; private QName lockA;
private QName lockAAA; private QName lockAA;
private QName lockAAB; private QName lockAAA;
private QName lockAAC; private QName lockAAB;
private QName lockAB; private QName lockAAC;
private QName lockABA; private QName lockAB;
private QName lockABB; private QName lockABA;
private QName lockABC; private QName lockABB;
private QName lockABC;
@Override
public void setUp() throws Exception @Override
{ public void setUp() throws Exception
ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); {
transactionService = serviceRegistry.getTransactionService(); ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY);
txnHelper = transactionService.getRetryingTransactionHelper(); transactionService = serviceRegistry.getTransactionService();
txnHelper = transactionService.getRetryingTransactionHelper();
jobLockService = (JobLockService) ctx.getBean("jobLockService");
jobLockService = (JobLockService) ctx.getBean("jobLockService");
// Get the test name
String testName = getName(); // Get the test name
// Build lock names for the test String testName = getName();
lockA = QName.createQName(NAMESPACE, "a-" + testName); // Build lock names for the test
lockAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName); lockA = QName.createQName(NAMESPACE, "a-" + testName);
lockAAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".a-" + testName); lockAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName);
lockAAB = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".b-" + testName); lockAAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".a-" + testName);
lockAAC = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".c-" + testName); lockAAB = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".b-" + testName);
lockAB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName); lockAAC = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".c-" + testName);
lockABA = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".a-" + testName); lockAB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName);
lockABB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".b-" + testName); lockABA = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".a-" + testName);
lockABC = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".c-" + testName); lockABB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".b-" + testName);
} lockABC = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".c-" + testName);
}
public void testSetUp()
{ public void testSetUp()
assertNotNull(jobLockService); {
} assertNotNull(jobLockService);
}
public void testEnforceTxn()
{ public void testEnforceTxn()
try {
{ try
jobLockService.getTransactionalLock(lockAAA, 50L); {
fail("Service did not enforce the presence of a transaction"); jobLockService.getTransactionalLock(lockAAA, 50L);
} fail("Service did not enforce the presence of a transaction");
catch (IllegalStateException e) }
{ catch (IllegalStateException e)
// Expected {
} // Expected
} }
}
/**
* Checks that the lock can be aquired by a read-only transaction i.e. that locking is /**
* independent of the outer transaction. * Checks that the lock can be aquired by a read-only transaction i.e. that locking is
*/ * independent of the outer transaction.
public void testLockInReadOnly() throws Exception */
{ public void testLockInReadOnly() throws Exception
RetryingTransactionCallback<Object> lockCallback = new RetryingTransactionCallback<Object>() {
{ RetryingTransactionCallback<Object> lockCallback = new RetryingTransactionCallback<Object>()
public Object execute() throws Throwable {
{ public Object execute() throws Throwable
jobLockService.getTransactionalLock(lockAAA, 500); {
return null; jobLockService.getTransactionalLock(lockAAA, 500);
} return null;
}; }
txnHelper.doInTransaction(lockCallback, true, true); };
} txnHelper.doInTransaction(lockCallback, true, true);
}
/**
* Checks that locks are released on commit /**
*/ * Checks that locks are released on commit
public void testLockReleaseOnCommit() throws Exception */
{ public void testLockReleaseOnCommit() throws Exception
RetryingTransactionCallback<Object> lockCallback = new RetryingTransactionCallback<Object>() {
{ RetryingTransactionCallback<Object> lockCallback = new RetryingTransactionCallback<Object>()
public Object execute() throws Throwable {
{ public Object execute() throws Throwable
jobLockService.getTransactionalLock(lockAAA, 5000); {
return null; jobLockService.getTransactionalLock(lockAAA, 5000);
} return null;
}; }
txnHelper.doInTransaction(lockCallback, true, true); };
// The lock should be free now, even though the TTL was high txnHelper.doInTransaction(lockCallback, true, true);
RetryingTransactionCallback<Object> lockCheckCallback = new RetryingTransactionCallback<Object>() // The lock should be free now, even though the TTL was high
{ RetryingTransactionCallback<Object> lockCheckCallback = new RetryingTransactionCallback<Object>()
public Object execute() throws Throwable {
{ public Object execute() throws Throwable
jobLockService.getTransactionalLock(lockAAA, 50); {
return null; jobLockService.getTransactionalLock(lockAAA, 50);
} return null;
}; }
txnHelper.doInTransaction(lockCheckCallback, true, true); };
} txnHelper.doInTransaction(lockCheckCallback, true, true);
}
/**
* Checks that locks are released on rollback /**
*/ * Checks that locks are released on rollback
public void testLockReleaseOnRollback() throws Exception */
{ public void testLockReleaseOnRollback() throws Exception
RetryingTransactionCallback<Object> lockCallback = new RetryingTransactionCallback<Object>() {
{ RetryingTransactionCallback<Object> lockCallback = new RetryingTransactionCallback<Object>()
public Object execute() throws Throwable {
{ public Object execute() throws Throwable
jobLockService.getTransactionalLock(lockAAA, 5000); {
throw new UnsupportedOperationException("ALERT!"); jobLockService.getTransactionalLock(lockAAA, 5000);
} throw new UnsupportedOperationException("ALERT!");
}; }
try };
{ try
txnHelper.doInTransaction(lockCallback, true, true); {
fail("Expected transaction failure"); txnHelper.doInTransaction(lockCallback, true, true);
} fail("Expected transaction failure");
catch (UnsupportedOperationException e) }
{ catch (UnsupportedOperationException e)
// Expected {
} // Expected
// The lock should be free now, even though the TTL was high }
RetryingTransactionCallback<Object> lockCheckCallback = new RetryingTransactionCallback<Object>() // The lock should be free now, even though the TTL was high
{ RetryingTransactionCallback<Object> lockCheckCallback = new RetryingTransactionCallback<Object>()
public Object execute() throws Throwable {
{ public Object execute() throws Throwable
jobLockService.getTransactionalLock(lockAAA, 50); {
return null; jobLockService.getTransactionalLock(lockAAA, 50);
} return null;
}; }
txnHelper.doInTransaction(lockCheckCallback, true, true); };
} txnHelper.doInTransaction(lockCheckCallback, true, true);
}
/**
* Sets up two threads in a deadlock scenario. Each of the threads has a long wait timeout /**
* for the required locks. If there were a deadlock, the shorter of the the wait times would * Sets up two threads in a deadlock scenario. Each of the threads has a long wait timeout
* be how long it would take before one of them is thrown out. Firstly, we check that one * for the required locks. If there were a deadlock, the shorter of the the wait times would
* of the threads <i>is</i> thrown out. Then we check that the thread is thrown out quickly. * be how long it would take before one of them is thrown out. Firstly, we check that one
*/ * of the threads <i>is</i> thrown out. Then we check that the thread is thrown out quickly.
public synchronized void testDeadlockPrevention() throws Throwable */
{ public synchronized void testDeadlockPrevention() throws Throwable
DeadlockingThread t1 = new DeadlockingThread(lockAAA, lockAAB); {
DeadlockingThread t2 = new DeadlockingThread(lockAAB, lockAAA); DeadlockingThread t1 = new DeadlockingThread(lockAAA, lockAAB);
// Start them DeadlockingThread t2 = new DeadlockingThread(lockAAB, lockAAA);
t1.start(); // Start them
t2.start(); t1.start();
// They can take their first locks (there should be no contention) t2.start();
t1.incrementNextLock(); // They can take their first locks (there should be no contention)
t2.incrementNextLock(); t1.incrementNextLock();
// Wait for them to do this t2.incrementNextLock();
try { this.wait(2000L); } catch (InterruptedException e) {} // Wait for them to do this
// Advance again try { this.wait(2000L); } catch (InterruptedException e) {}
t1.incrementNextLock(); // Advance again
t2.incrementNextLock(); t1.incrementNextLock();
// Wait for them to do this t2.incrementNextLock();
try { this.wait(2000L); } catch (InterruptedException e) {} // Wait for them to do this
// Advance again, to end threads try { this.wait(2000L); } catch (InterruptedException e) {}
t1.incrementNextLock(); // Advance again, to end threads
t2.incrementNextLock(); t1.incrementNextLock();
// Wait for them to end (commit/rollback) t2.incrementNextLock();
try { this.wait(2000L); } catch (InterruptedException e) {} // Wait for them to end (commit/rollback)
try { this.wait(2000L); } catch (InterruptedException e) {}
if (t1.otherFailure != null)
{ if (t1.otherFailure != null)
throw t1.otherFailure; {
} throw t1.otherFailure;
if (t2.otherFailure != null) }
{ if (t2.otherFailure != null)
throw t2.otherFailure; {
} throw t2.otherFailure;
assertNull("T1 should have succeeded as the ordered locker: " + t1.lockFailure, t1.lockFailure); }
assertNotNull("T2 should have failed as the unordered locker.", t2.lockFailure); assertNull("T1 should have succeeded as the ordered locker: " + t1.lockFailure, t1.lockFailure);
} assertNotNull("T2 should have failed as the unordered locker.", t2.lockFailure);
}
private class DeadlockingThread extends Thread
{ private class DeadlockingThread extends Thread
private final QName[] lockQNames; {
private volatile int nextLock = -1; private final QName[] lockQNames;
private LockAcquisitionException lockFailure; private volatile int nextLock = -1;
private Throwable otherFailure; private LockAcquisitionException lockFailure;
private Throwable otherFailure;
private DeadlockingThread(QName ... lockQNames)
{ private DeadlockingThread(QName ... lockQNames)
super("DeadlockingThread"); {
this.lockQNames = lockQNames; super("DeadlockingThread");
setDaemon(true); this.lockQNames = lockQNames;
} setDaemon(true);
}
private void incrementNextLock()
{ private void incrementNextLock()
nextLock++; {
} nextLock++;
}
@Override
public void run() @Override
{ public void run()
RetryingTransactionCallback<Object> runCallback = new RetryingTransactionCallback<Object>() {
{ RetryingTransactionCallback<Object> runCallback = new RetryingTransactionCallback<Object>()
public synchronized Object execute() throws Throwable {
{ public synchronized Object execute() throws Throwable
int currentLock = -1; {
// Take the locks in turn, quitting when told to take a lock that's not there int currentLock = -1;
while (currentLock < lockQNames.length - 1) // Take the locks in turn, quitting when told to take a lock that's not there
{ while (currentLock < lockQNames.length - 1)
// Check if we have been instructed to take a lock {
if (nextLock > currentLock) // Check if we have been instructed to take a lock
{ if (nextLock > currentLock)
// Advance and grab the lock {
currentLock++; // Advance and grab the lock
jobLockService.getTransactionalLock(lockQNames[currentLock], 5000L); currentLock++;
} jobLockService.getTransactionalLock(lockQNames[currentLock], 5000L);
else }
{ else
// No advance, so wait a bit more {
try { this.wait(20L); } catch (InterruptedException e) {} // No advance, so wait a bit more
} try { this.wait(20L); } catch (InterruptedException e) {}
} }
return null; }
} return null;
}; }
try };
{ try
txnHelper.doInTransaction(runCallback, true); {
} txnHelper.doInTransaction(runCallback, true);
catch (LockAcquisitionException e) }
{ catch (LockAcquisitionException e)
lockFailure = e; {
} lockFailure = e;
catch (Throwable e) }
{ catch (Throwable e)
otherFailure = e; {
} otherFailure = e;
} }
} }
} }
public synchronized void testLockCallbackReleaseInactive() throws Exception
{
final QName lockQName = QName.createQName(NAMESPACE, getName());
final long lockTTL = 1000L;
final String lockToken = jobLockService.getLock(lockQName, lockTTL);
final boolean[] released = new boolean[1];
// Immediately-inactive job
JobLockRefreshCallback callback = new JobLockRefreshCallback()
{
@Override
public boolean isActive()
{
return false;
}
@Override
public void lockReleased()
{
released[0] = true;
}
};
jobLockService.refreshLock(lockToken, lockQName, lockTTL, callback);
// The first refresh will occur in 500ms
wait(1000L);
// Should have been released by now
assertTrue("Expected lockReleased to have been called", released[0]);
try
{
jobLockService.getLock(lockQName, lockTTL);
}
catch (LockAcquisitionException e)
{
fail("Lock should have been released by callback infrastructure");
}
}
public synchronized void testLockCallbackReleaseSelf() throws Exception
{
final QName lockQName = QName.createQName(NAMESPACE, getName());
final long lockTTL = 1000L;
final String lockToken = jobLockService.getLock(lockQName, lockTTL);
final boolean[] released = new boolean[1];
// Immediately-inactive job, releasing the lock
JobLockRefreshCallback callback = new JobLockRefreshCallback()
{
@Override
public boolean isActive()
{
jobLockService.releaseLock(lockToken, lockQName);
return false;
}
@Override
public void lockReleased()
{
released[0] = true;
}
};
jobLockService.refreshLock(lockToken, lockQName, lockTTL, callback);
// The first refresh will occur in 500ms
wait(1000L);
// Should have been released by now
assertTrue("Expected lockReleased to have been called", released[0]);
try
{
jobLockService.getLock(lockQName, lockTTL);
}
catch (LockAcquisitionException e)
{
fail("Lock should have been released by callback infrastructure");
}
}
public synchronized void testLockCallbackReleaseTimed() throws Exception
{
final QName lockQName = QName.createQName(NAMESPACE, getName());
final long lockTTL = 1000L;
final String lockToken = jobLockService.getLock(lockQName, lockTTL);
final boolean[] checked = new boolean[1];
final boolean[] released = new boolean[1];
// Do not release and remain active
JobLockRefreshCallback callback = new JobLockRefreshCallback()
{
@Override
public boolean isActive()
{
checked[0] = true;
return true;
}
@Override
public void lockReleased()
{
released[0] = true;
}
};
jobLockService.refreshLock(lockToken, lockQName, lockTTL, callback);
// The first refresh will occur in 500ms
wait(1000L);
assertTrue("Active check has not occured", checked[0]);
assertFalse("lockReleased should NOT have been called", released[0]);
try
{
jobLockService.getLock(lockQName, lockTTL);
fail("Lock should still be held");
}
catch (LockAcquisitionException e)
{
// Expected
}
}
}