From 71f13124748313b69f70af9908240032ef6eae7f Mon Sep 17 00:00:00 2001 From: Derek Hulley Date: Fri, 1 Oct 2010 14:50:43 +0000 Subject: [PATCH] Added callback-based JobLockService refreshLock - JobLockService.refreshLock(..., JobLockRefreshCallback) - Added to support ALF-4898: Deployment: Reduce lock TTL git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22812 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../alfresco/repo/lock/JobLockService.java | 128 +++- .../repo/lock/JobLockServiceImpl.java | 160 +++++ .../repo/lock/JobLockServiceTest.java | 652 +++++++++++------- 3 files changed, 667 insertions(+), 273 deletions(-) diff --git a/source/java/org/alfresco/repo/lock/JobLockService.java b/source/java/org/alfresco/repo/lock/JobLockService.java index 5ee46742c1..bc158a11a7 100644 --- a/source/java/org/alfresco/repo/lock/JobLockService.java +++ b/source/java/org/alfresco/repo/lock/JobLockService.java @@ -44,8 +44,9 @@ public interface JobLockService /** * Take a transactionally-managed lock. This method can be called repeatedly to both * initially acquire the lock as well as to maintain the lock. This method should - * either be called again before the lock expires or the transaction should end before - * the lock expires. + * be called repeatedly during the transaction to ensure that the lock remains refreshed. + * DO NOT use a long-lived lock to avoid calling this method at intervals; long-lived + * locks get left behind during server crashes, amongst other things. *

* The following rules apply to taking and releasing locks:
* - Expired locks can be taken by any process
@@ -58,20 +59,42 @@ public interface JobLockService * release the owned locks will invalidate the transaction and cause rollback. * * @param lockQName the name of the lock to acquire - * @param timeToLive the time (in milliseconds) for the lock to remain valid + * @param timeToLive the time (in milliseconds) for the lock to remain valid. + * This value must not be larger than either the anticipated + * operation time or a server startup time. Typically, it should be + * a few seconds. * @throws LockAcquisitionException if the lock could not be acquired * @throws IllegalStateException if a transaction is not active */ void getTransactionalLock(QName lockQName, long timeToLive); /** - * {@inheritDoc JobLockService#getTransactionalLock(QName, long)} + * Take a transactionally-managed lock. This method can be called repeatedly to both + * initially acquire the lock as well as to maintain the lock. This method should + * be called repeatedly during the transaction to ensure that the lock remains refreshed. + * DO NOT use a long-lived lock to avoid calling this method at intervals; long-lived + * locks get left behind during server crashes, amongst other things. + *

+ * The following rules apply to taking and releasing locks:
+ * - Expired locks can be taken by any process
+ * - Lock expiration does not prevent a lock from being refreshed or released
+ * - Only locks that were manipulated using another token will cause failure + *

+ * The locks are automatically released when the transaction is terminated. + *

+ * Any failure to acquire the lock (after retries), refresh the lock or subsequently + * release the owned locks will invalidate the transaction and cause rollback. *

* If the lock cannot be immediately acquired, the process will wait and retry. Note * that second and subsequent attempts to get the lock during a transaction cannot * make use of retrying; the lock is actually being refreshed and will therefore never * become valid if it doesn't refresh directly. * + * @param lockQName the name of the lock to acquire + * @param timeToLive the time (in milliseconds) for the lock to remain valid. + * This value must not be larger than either the anticipated + * operation time or a server startup time. Typically, it should be + * a few seconds. * @param retryWait the time (in milliseconds) to wait before trying again * @param retryCount the maximum number of times to attempt the lock acquisition * @throws LockAcquisitionException if the lock could not be acquired @@ -87,22 +110,34 @@ public interface JobLockService * available by expiry. No deadlock management is provided, either. * * @param lockQName the name of the lock to acquire - * @param timeToLive the time (in milliseconds) for the lock to remain valid + * @param timeToLive the time (in milliseconds) for the lock to remain valid. + * This value must not be larger than either the anticipated + * operation time or a server startup time. Typically, it should be + * a few seconds. * @return Returns the newly-created lock token * @throws LockAcquisitionException if the lock could not be acquired */ String getLock(QName lockQName, long timeToLive); /** - * {@inheritDoc JobLockService#getLock(QName, long)} + * Take a manually-managed lock. The lock current thread or transaction will not be tagged - + * the returned lock token must be used for further management of the lock. + *

+ * No lock management is provided: the lock must be released manually or will only become + * available by expiry. No deadlock management is provided, either. *

* If the lock cannot be immediately acquired, the process will wait and retry. * + * @param lockQName the name of the lock to acquire + * @param timeToLive the time (in milliseconds) for the lock to remain valid. + * This value must not be larger than either the anticipated + * operation time or a server startup time. Typically, it should be + * a few seconds. * @param retryWait the time (in milliseconds) to wait before trying again * @param retryCount the maximum number of times to attempt the lock acquisition * @throws LockAcquisitionException if the lock could not be acquired */ - String getLock(QName lockQName, long timeToLive, long retryWait, int retryCount); + String getLock(QName lockQName, long timeToLive, long retryWait, int retryCount); /** * Refresh the lock using a valid lock token. @@ -114,6 +149,25 @@ public interface JobLockService */ void refreshLock(String lockToken, QName lockQName, long timeToLive); + /** + * Provide a callback to refresh a lock using a valid lock token, pushing responsibility + * for regular lock refreshing onto the service implementation code. This method should only + * be called once for a given lock token to prevent unnecessary refreshing. + *

+ * Since the lock is not actually refreshed by this method, there will be no LockAcquisitionException. + *

+ * The TTL (time to live) will be divided by two and the result used to trigger a timer thread + * to initiate the callback. + * + * @param lockToken the lock token returned when the lock was acquired + * @param lockQName the name of the previously-acquired lock + * @param timeToLive the time (in milliseconds) for the lock to remain valid + * @param callback the object that will be called at intervals of timeToLive/2 (about) + * + * @since 3.4.0a + */ + void refreshLock(String lockToken, QName lockQName, long timeToLive, JobLockRefreshCallback callback); + /** * Release the lock using a valid lock token. * @@ -121,4 +175,64 @@ public interface JobLockService * @param lockQName the name of the previously-acquired lock */ void releaseLock(String lockToken, QName lockQName); + + /** + * Interface for implementations that need a timed callback in order to refresh the lock. + *

+ * This callback is designed for processes that need to lock and wait for external processes + * to complete; keeping a local thread to refresh the lock is possible but it is more + * efficient for the thread pool and timer mechanisms to be shared. + *

+ * The callback implementations must be thread-safe and should be independent + * of other callbacks i.e. the simplest and safest is to use an anonymous inner class for + * the implementation. + *

+ * IMPORTANT: Do not block the calls to this interface - other callbacks might be held + * up producing inconsistent behaviour. Failure to observe this will lead + * to warnings and lock termination i.e. the service implementation will + * force early termination of the lock and will discard the callback. + * + * @author Derek Hulley + * @since 3.4.0b + */ + public interface JobLockRefreshCallback + { + /** + * Timed callback from the service to determine if the lock is still required. + *

+ * IMPORTANT: Do not block calls to this method for any reason and do perform any + * non-trivial determination of state i.e. have the answer to this + * method immediately available at all times. Failure to observe this + * will lead to warnings and lock termination. + *

+ * The original lock token is not provided in the callback; this is to prevent + * implementations from attempting to link the lock token back to the specific callback + * instances. + * + * @return Return true if the task associated with the callback + * is still active i.e. it still needs the lock associated with the + * callback or false if the lock is no longer required. + * + * @since 3.4.0b + */ + boolean isActive(); + + /** + * Callback received when the lock refresh has failed. Implementations should immediately and + * gracefully terminate their associated processes as the associated lock is no longer valid, + * which is a direct indication that a competing process has taken and is using the required + * lock or that the process has already completed and released the lock. + *

+ * As a convenenience, this method is called when a VM shutdown is detected as well; the + * associated lock is not refreshed and this method is called to instruct the locking process + * to terminate. + *

+ * This method is also called if the initiating process is self-terminated i.e. if the originating + * process releases the lock itself. This method is not called if the process is not + * {@link #isActive() active}. + * + * @since 3.4.0b + */ + void lockReleased(); + } } diff --git a/source/java/org/alfresco/repo/lock/JobLockServiceImpl.java b/source/java/org/alfresco/repo/lock/JobLockServiceImpl.java index a6af561e68..777f89b92d 100644 --- a/source/java/org/alfresco/repo/lock/JobLockServiceImpl.java +++ b/source/java/org/alfresco/repo/lock/JobLockServiceImpl.java @@ -19,6 +19,9 @@ package org.alfresco.repo.lock; import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.alfresco.repo.domain.locks.LockDAO; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; @@ -29,9 +32,12 @@ import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.namespace.QName; import org.alfresco.util.GUID; +import org.alfresco.util.VmShutdownListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.sun.star.uno.RuntimeException; + /** * {@inheritDoc JobLockService} * @@ -49,6 +55,9 @@ public class JobLockServiceImpl implements JobLockService private int defaultRetryCount; private long defaultRetryWait; + private ScheduledExecutorService scheduler; + private VmShutdownListener shutdownListener; + /** * Stateless listener that does post-transaction cleanup. */ @@ -59,6 +68,8 @@ public class JobLockServiceImpl implements JobLockService defaultRetryWait = 20; defaultRetryCount = 10; txnListener = new LockTransactionListener(); + scheduler = Executors.newScheduledThreadPool(1); + shutdownListener = new VmShutdownListener("JobLockService"); } /** @@ -228,6 +239,155 @@ public class JobLockServiceImpl implements JobLockService } } + /** + * {@inheritDoc} + */ + @Override + public void refreshLock( + final String lockToken, final QName lockQName, final long timeToLive, + final JobLockRefreshCallback callback) + { + // Do nothing if the scheduler has shut down + if (scheduler.isShutdown() || scheduler.isTerminated()) + { + if (logger.isDebugEnabled()) + { + logger.debug( + "Lock refresh failed: \n" + + " Lock: " + lockQName + "\n" + + " TTL: " + timeToLive + "\n" + + " Txn: " + lockToken + "\n" + + " Error: " + "Lock refresh scheduler has shut down. The VM may be terminating."); + } + // Don't schedule + throw new LockAcquisitionException(lockQName, lockToken); + } + + final long delay = timeToLive / 2; + if (delay < 1) + { + throw new IllegalArgumentException("Very small timeToLive: " + timeToLive); + } + // Our runnable does the callbacks + Runnable runnable = new Runnable() + { + @Override + public void run() + { + // Most lock debug is done elsewhere; just note that this is a timed process. + if (logger.isDebugEnabled()) + { + logger.debug( + "Initiating timed Lock refresh: \n" + + " Lock: " + lockQName + "\n" + + " TTL: " + timeToLive + "\n" + + " Txn: " + lockToken); + } + + // First check the VM + if (shutdownListener.isVmShuttingDown()) + { + callback.lockReleased(); + return; + } + boolean isActive = false; + try + { + isActive = callIsActive(callback, delay); + } + catch (Throwable e) + { + logger.error( + "Lock isActive check failed: \n" + + " Lock: " + lockQName + "\n" + + " TTL: " + timeToLive + "\n" + + " Txn: " + lockToken, + e); + // The callback must be informed + callLockReleased(callback); + return; + } + + if (!isActive) + { + // Debug + if (logger.isDebugEnabled()) + { + logger.debug( + "Lock callback is inactive. Releasing lock: \n" + + " Lock: " + lockQName + "\n" + + " TTL: " + timeToLive + "\n" + + " Txn: " + lockToken); + } + // The callback is no longer active, so we don't need to refresh. + // Release the lock in case the initiator did not do it. + try + { + releaseLock(lockToken, lockQName); + } + catch (LockAcquisitionException e) + { + // The lock is already gone: job done + } + // The callback must be informed + callLockReleased(callback); + } + else + { + try + { + refreshLock(lockToken, lockQName, timeToLive); + // Success. The callback does not need to know. + } + catch (LockAcquisitionException e) + { + // The callback must be informed + callLockReleased(callback); + } + } + } + }; + // Schedule this + scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + + /** + * Calls the callback {@link JobLockRefreshCallback#isActive() isActive} with time-check. + */ + private boolean callIsActive(JobLockRefreshCallback callback, long delay) throws Throwable + { + long t1 = System.nanoTime(); + + boolean isActive = callback.isActive(); + + long t2 = System.nanoTime(); + double timeWastedMs = (double)(t2 - t1)/(double)10E6; + if (timeWastedMs > delay || timeWastedMs > 1000L) + { + // The isActive did not come back quickly enough. There is no point taking longer than + // the delay, but in any case 1s to provide a boolean is too much. This is probably an + // indication that the isActive implementation is performing complex state determination, + // which is specifically referenced in the API doc. + throw new RuntimeException( + "isActive check took " + timeWastedMs + " to return, which is too long."); + } + return isActive; + } + /** + * Calls the callback {@link JobLockRefreshCallback#lockReleased()}. + */ + private void callLockReleased(JobLockRefreshCallback callback) + { + try + { + callback.lockReleased(); + } + catch (Throwable ee) + { + logger.error("Callback to lockReleased failed.", ee); + } + } + /** * {@inheritDoc} */ diff --git a/source/java/org/alfresco/repo/lock/JobLockServiceTest.java b/source/java/org/alfresco/repo/lock/JobLockServiceTest.java index c70196fa9b..8c15ce09b9 100644 --- a/source/java/org/alfresco/repo/lock/JobLockServiceTest.java +++ b/source/java/org/alfresco/repo/lock/JobLockServiceTest.java @@ -1,4 +1,4 @@ -/* +/* * Copyright (C) 2005-2010 Alfresco Software Limited. * * This file is part of Alfresco @@ -14,268 +14,388 @@ * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ -package org.alfresco.repo.lock; - -import junit.framework.TestCase; - -import org.alfresco.repo.domain.locks.LockDAO; -import org.alfresco.repo.transaction.RetryingTransactionHelper; -import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; -import org.alfresco.service.ServiceRegistry; -import org.alfresco.service.namespace.QName; -import org.alfresco.service.transaction.TransactionService; -import org.alfresco.util.ApplicationContextHelper; -import org.springframework.context.ApplicationContext; - -/** - * Tests the high-level capabilities provided by the service implementation. The DAO tests - * stress the underlying database work, so we only need to deal with deadlock resolution, etc. - * - * @see JobLockService the service being tested - * @see LockDAO the DAO being indirectly tested - * - * @author Derek Hulley - * @since 3.2 - */ -public class JobLockServiceTest extends TestCase -{ - public static final String NAMESPACE = "http://www.alfresco.org/test/JobLockServiceTest"; - - private ApplicationContext ctx = ApplicationContextHelper.getApplicationContext(); - - private TransactionService transactionService; - private RetryingTransactionHelper txnHelper; - private JobLockService jobLockService; - // Lock names for the tests - private QName lockA; - private QName lockAA; - private QName lockAAA; - private QName lockAAB; - private QName lockAAC; - private QName lockAB; - private QName lockABA; - private QName lockABB; - private QName lockABC; - - @Override - public void setUp() throws Exception - { - ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); - transactionService = serviceRegistry.getTransactionService(); - txnHelper = transactionService.getRetryingTransactionHelper(); - - jobLockService = (JobLockService) ctx.getBean("jobLockService"); - - // Get the test name - String testName = getName(); - // Build lock names for the test - lockA = QName.createQName(NAMESPACE, "a-" + testName); - lockAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName); - lockAAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".a-" + testName); - lockAAB = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".b-" + testName); - lockAAC = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".c-" + testName); - lockAB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName); - lockABA = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".a-" + testName); - lockABB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".b-" + testName); - lockABC = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".c-" + testName); - } - - public void testSetUp() - { - assertNotNull(jobLockService); - } - - public void testEnforceTxn() - { - try - { - jobLockService.getTransactionalLock(lockAAA, 50L); - fail("Service did not enforce the presence of a transaction"); - } - catch (IllegalStateException e) - { - // Expected - } - } - - /** - * Checks that the lock can be aquired by a read-only transaction i.e. that locking is - * independent of the outer transaction. - */ - public void testLockInReadOnly() throws Exception - { - RetryingTransactionCallback lockCallback = new RetryingTransactionCallback() - { - public Object execute() throws Throwable - { - jobLockService.getTransactionalLock(lockAAA, 500); - return null; - } - }; - txnHelper.doInTransaction(lockCallback, true, true); - } - - /** - * Checks that locks are released on commit - */ - public void testLockReleaseOnCommit() throws Exception - { - RetryingTransactionCallback lockCallback = new RetryingTransactionCallback() - { - public Object execute() throws Throwable - { - jobLockService.getTransactionalLock(lockAAA, 5000); - return null; - } - }; - txnHelper.doInTransaction(lockCallback, true, true); - // The lock should be free now, even though the TTL was high - RetryingTransactionCallback lockCheckCallback = new RetryingTransactionCallback() - { - public Object execute() throws Throwable - { - jobLockService.getTransactionalLock(lockAAA, 50); - return null; - } - }; - txnHelper.doInTransaction(lockCheckCallback, true, true); - } - - /** - * Checks that locks are released on rollback - */ - public void testLockReleaseOnRollback() throws Exception - { - RetryingTransactionCallback lockCallback = new RetryingTransactionCallback() - { - public Object execute() throws Throwable - { - jobLockService.getTransactionalLock(lockAAA, 5000); - throw new UnsupportedOperationException("ALERT!"); - } - }; - try - { - txnHelper.doInTransaction(lockCallback, true, true); - fail("Expected transaction failure"); - } - catch (UnsupportedOperationException e) - { - // Expected - } - // The lock should be free now, even though the TTL was high - RetryingTransactionCallback lockCheckCallback = new RetryingTransactionCallback() - { - public Object execute() throws Throwable - { - jobLockService.getTransactionalLock(lockAAA, 50); - return null; - } - }; - txnHelper.doInTransaction(lockCheckCallback, true, true); - } - - /** - * Sets up two threads in a deadlock scenario. Each of the threads has a long wait timeout - * for the required locks. If there were a deadlock, the shorter of the the wait times would - * be how long it would take before one of them is thrown out. Firstly, we check that one - * of the threads is thrown out. Then we check that the thread is thrown out quickly. - */ - public synchronized void testDeadlockPrevention() throws Throwable - { - DeadlockingThread t1 = new DeadlockingThread(lockAAA, lockAAB); - DeadlockingThread t2 = new DeadlockingThread(lockAAB, lockAAA); - // Start them - t1.start(); - t2.start(); - // They can take their first locks (there should be no contention) - t1.incrementNextLock(); - t2.incrementNextLock(); - // Wait for them to do this - try { this.wait(2000L); } catch (InterruptedException e) {} - // Advance again - t1.incrementNextLock(); - t2.incrementNextLock(); - // Wait for them to do this - try { this.wait(2000L); } catch (InterruptedException e) {} - // Advance again, to end threads - t1.incrementNextLock(); - t2.incrementNextLock(); - // Wait for them to end (commit/rollback) - try { this.wait(2000L); } catch (InterruptedException e) {} - - if (t1.otherFailure != null) - { - throw t1.otherFailure; - } - if (t2.otherFailure != null) - { - throw t2.otherFailure; - } - assertNull("T1 should have succeeded as the ordered locker: " + t1.lockFailure, t1.lockFailure); - assertNotNull("T2 should have failed as the unordered locker.", t2.lockFailure); - } - - private class DeadlockingThread extends Thread - { - private final QName[] lockQNames; - private volatile int nextLock = -1; - private LockAcquisitionException lockFailure; - private Throwable otherFailure; - - private DeadlockingThread(QName ... lockQNames) - { - super("DeadlockingThread"); - this.lockQNames = lockQNames; - setDaemon(true); - } - - private void incrementNextLock() - { - nextLock++; - } - - @Override - public void run() - { - RetryingTransactionCallback runCallback = new RetryingTransactionCallback() - { - public synchronized Object execute() throws Throwable - { - int currentLock = -1; - // Take the locks in turn, quitting when told to take a lock that's not there - while (currentLock < lockQNames.length - 1) - { - // Check if we have been instructed to take a lock - if (nextLock > currentLock) - { - // Advance and grab the lock - currentLock++; - jobLockService.getTransactionalLock(lockQNames[currentLock], 5000L); - } - else - { - // No advance, so wait a bit more - try { this.wait(20L); } catch (InterruptedException e) {} - } - } - return null; - } - }; - try - { - txnHelper.doInTransaction(runCallback, true); - } - catch (LockAcquisitionException e) - { - lockFailure = e; - } - catch (Throwable e) - { - otherFailure = e; - } - } - } -} + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.lock; + +import junit.framework.TestCase; + +import org.alfresco.repo.domain.locks.LockDAO; +import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback; +import org.alfresco.repo.transaction.RetryingTransactionHelper; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; +import org.alfresco.service.ServiceRegistry; +import org.alfresco.service.namespace.QName; +import org.alfresco.service.transaction.TransactionService; +import org.alfresco.util.ApplicationContextHelper; +import org.springframework.context.ApplicationContext; + +/** + * Tests the high-level capabilities provided by the service implementation. The DAO tests + * stress the underlying database work, so we only need to deal with deadlock resolution, etc. + * + * @see JobLockService the service being tested + * @see LockDAO the DAO being indirectly tested + * + * @author Derek Hulley + * @since 3.2 + */ +public class JobLockServiceTest extends TestCase +{ + public static final String NAMESPACE = "http://www.alfresco.org/test/JobLockServiceTest"; + + private ApplicationContext ctx = ApplicationContextHelper.getApplicationContext(); + + private TransactionService transactionService; + private RetryingTransactionHelper txnHelper; + private JobLockService jobLockService; + // Lock names for the tests + private QName lockA; + private QName lockAA; + private QName lockAAA; + private QName lockAAB; + private QName lockAAC; + private QName lockAB; + private QName lockABA; + private QName lockABB; + private QName lockABC; + + @Override + public void setUp() throws Exception + { + ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); + transactionService = serviceRegistry.getTransactionService(); + txnHelper = transactionService.getRetryingTransactionHelper(); + + jobLockService = (JobLockService) ctx.getBean("jobLockService"); + + // Get the test name + String testName = getName(); + // Build lock names for the test + lockA = QName.createQName(NAMESPACE, "a-" + testName); + lockAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName); + lockAAA = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".a-" + testName); + lockAAB = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".b-" + testName); + lockAAC = QName.createQName(NAMESPACE, "a-" + testName + ".a-" + testName + ".c-" + testName); + lockAB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName); + lockABA = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".a-" + testName); + lockABB = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".b-" + testName); + lockABC = QName.createQName(NAMESPACE, "a-" + testName + ".b-" + testName + ".c-" + testName); + } + + public void testSetUp() + { + assertNotNull(jobLockService); + } + + public void testEnforceTxn() + { + try + { + jobLockService.getTransactionalLock(lockAAA, 50L); + fail("Service did not enforce the presence of a transaction"); + } + catch (IllegalStateException e) + { + // Expected + } + } + + /** + * Checks that the lock can be aquired by a read-only transaction i.e. that locking is + * independent of the outer transaction. + */ + public void testLockInReadOnly() throws Exception + { + RetryingTransactionCallback lockCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + jobLockService.getTransactionalLock(lockAAA, 500); + return null; + } + }; + txnHelper.doInTransaction(lockCallback, true, true); + } + + /** + * Checks that locks are released on commit + */ + public void testLockReleaseOnCommit() throws Exception + { + RetryingTransactionCallback lockCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + jobLockService.getTransactionalLock(lockAAA, 5000); + return null; + } + }; + txnHelper.doInTransaction(lockCallback, true, true); + // The lock should be free now, even though the TTL was high + RetryingTransactionCallback lockCheckCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + jobLockService.getTransactionalLock(lockAAA, 50); + return null; + } + }; + txnHelper.doInTransaction(lockCheckCallback, true, true); + } + + /** + * Checks that locks are released on rollback + */ + public void testLockReleaseOnRollback() throws Exception + { + RetryingTransactionCallback lockCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + jobLockService.getTransactionalLock(lockAAA, 5000); + throw new UnsupportedOperationException("ALERT!"); + } + }; + try + { + txnHelper.doInTransaction(lockCallback, true, true); + fail("Expected transaction failure"); + } + catch (UnsupportedOperationException e) + { + // Expected + } + // The lock should be free now, even though the TTL was high + RetryingTransactionCallback lockCheckCallback = new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + jobLockService.getTransactionalLock(lockAAA, 50); + return null; + } + }; + txnHelper.doInTransaction(lockCheckCallback, true, true); + } + + /** + * Sets up two threads in a deadlock scenario. Each of the threads has a long wait timeout + * for the required locks. If there were a deadlock, the shorter of the the wait times would + * be how long it would take before one of them is thrown out. Firstly, we check that one + * of the threads is thrown out. Then we check that the thread is thrown out quickly. + */ + public synchronized void testDeadlockPrevention() throws Throwable + { + DeadlockingThread t1 = new DeadlockingThread(lockAAA, lockAAB); + DeadlockingThread t2 = new DeadlockingThread(lockAAB, lockAAA); + // Start them + t1.start(); + t2.start(); + // They can take their first locks (there should be no contention) + t1.incrementNextLock(); + t2.incrementNextLock(); + // Wait for them to do this + try { this.wait(2000L); } catch (InterruptedException e) {} + // Advance again + t1.incrementNextLock(); + t2.incrementNextLock(); + // Wait for them to do this + try { this.wait(2000L); } catch (InterruptedException e) {} + // Advance again, to end threads + t1.incrementNextLock(); + t2.incrementNextLock(); + // Wait for them to end (commit/rollback) + try { this.wait(2000L); } catch (InterruptedException e) {} + + if (t1.otherFailure != null) + { + throw t1.otherFailure; + } + if (t2.otherFailure != null) + { + throw t2.otherFailure; + } + assertNull("T1 should have succeeded as the ordered locker: " + t1.lockFailure, t1.lockFailure); + assertNotNull("T2 should have failed as the unordered locker.", t2.lockFailure); + } + + private class DeadlockingThread extends Thread + { + private final QName[] lockQNames; + private volatile int nextLock = -1; + private LockAcquisitionException lockFailure; + private Throwable otherFailure; + + private DeadlockingThread(QName ... lockQNames) + { + super("DeadlockingThread"); + this.lockQNames = lockQNames; + setDaemon(true); + } + + private void incrementNextLock() + { + nextLock++; + } + + @Override + public void run() + { + RetryingTransactionCallback runCallback = new RetryingTransactionCallback() + { + public synchronized Object execute() throws Throwable + { + int currentLock = -1; + // Take the locks in turn, quitting when told to take a lock that's not there + while (currentLock < lockQNames.length - 1) + { + // Check if we have been instructed to take a lock + if (nextLock > currentLock) + { + // Advance and grab the lock + currentLock++; + jobLockService.getTransactionalLock(lockQNames[currentLock], 5000L); + } + else + { + // No advance, so wait a bit more + try { this.wait(20L); } catch (InterruptedException e) {} + } + } + return null; + } + }; + try + { + txnHelper.doInTransaction(runCallback, true); + } + catch (LockAcquisitionException e) + { + lockFailure = e; + } + catch (Throwable e) + { + otherFailure = e; + } + } + } + + public synchronized void testLockCallbackReleaseInactive() throws Exception + { + final QName lockQName = QName.createQName(NAMESPACE, getName()); + final long lockTTL = 1000L; + final String lockToken = jobLockService.getLock(lockQName, lockTTL); + + final boolean[] released = new boolean[1]; + // Immediately-inactive job + JobLockRefreshCallback callback = new JobLockRefreshCallback() + { + @Override + public boolean isActive() + { + return false; + } + + @Override + public void lockReleased() + { + released[0] = true; + } + }; + + jobLockService.refreshLock(lockToken, lockQName, lockTTL, callback); + // The first refresh will occur in 500ms + wait(1000L); + // Should have been released by now + assertTrue("Expected lockReleased to have been called", released[0]); + try + { + jobLockService.getLock(lockQName, lockTTL); + } + catch (LockAcquisitionException e) + { + fail("Lock should have been released by callback infrastructure"); + } + } + + public synchronized void testLockCallbackReleaseSelf() throws Exception + { + final QName lockQName = QName.createQName(NAMESPACE, getName()); + final long lockTTL = 1000L; + final String lockToken = jobLockService.getLock(lockQName, lockTTL); + + final boolean[] released = new boolean[1]; + // Immediately-inactive job, releasing the lock + JobLockRefreshCallback callback = new JobLockRefreshCallback() + { + @Override + public boolean isActive() + { + jobLockService.releaseLock(lockToken, lockQName); + return false; + } + + @Override + public void lockReleased() + { + released[0] = true; + } + }; + + jobLockService.refreshLock(lockToken, lockQName, lockTTL, callback); + // The first refresh will occur in 500ms + wait(1000L); + // Should have been released by now + assertTrue("Expected lockReleased to have been called", released[0]); + try + { + jobLockService.getLock(lockQName, lockTTL); + } + catch (LockAcquisitionException e) + { + fail("Lock should have been released by callback infrastructure"); + } + } + + public synchronized void testLockCallbackReleaseTimed() throws Exception + { + final QName lockQName = QName.createQName(NAMESPACE, getName()); + final long lockTTL = 1000L; + final String lockToken = jobLockService.getLock(lockQName, lockTTL); + + final boolean[] checked = new boolean[1]; + final boolean[] released = new boolean[1]; + // Do not release and remain active + JobLockRefreshCallback callback = new JobLockRefreshCallback() + { + @Override + public boolean isActive() + { + checked[0] = true; + return true; + } + + @Override + public void lockReleased() + { + released[0] = true; + } + }; + + jobLockService.refreshLock(lockToken, lockQName, lockTTL, callback); + // The first refresh will occur in 500ms + wait(1000L); + + assertTrue("Active check has not occured", checked[0]); + assertFalse("lockReleased should NOT have been called", released[0]); + try + { + jobLockService.getLock(lockQName, lockTTL); + fail("Lock should still be held"); + } + catch (LockAcquisitionException e) + { + // Expected + } + } +}