diff --git a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java index 4179ca3c6e..f7d6fa040e 100644 --- a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java +++ b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java @@ -22,6 +22,8 @@ import java.lang.reflect.Method; import java.sql.BatchUpdateException; import java.sql.SQLException; import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; import javax.transaction.RollbackException; import javax.transaction.Status; @@ -128,14 +130,11 @@ public class RetryingTransactionHelper */ private long maxExecutionMs; + /** Map of transaction start times to counts. Only maintained when maxExecutionMs is set. */ + private SortedMap txnsInProgress = new TreeMap(); + /** The number of concurrently exeucting transactions. Only maintained when maxExecutionMs is set. */ private int txnCount; - - /** - * A 'ceiling' for the number of concurrent transactions that can execute. Dynamically maintained so that exeuction - * time is within maxExecutionMs. Transactions above this limit will be rejected with a {@link TooBusyException}. - */ - private Integer txnCeiling; /** * Whether the the transactions may only be reads @@ -294,20 +293,52 @@ public class RetryingTransactionHelper throw new AccessDeniedException(MSG_READ_ONLY); } + // First validate the requiresNew setting + boolean startingNew = requiresNew; + if (!startingNew) + { + TxnReadState readState = AlfrescoTransactionSupport.getTransactionReadState(); + switch (readState) + { + case TXN_READ_ONLY: + if (!readOnly) + { + // The current transaction is read-only, but a writable transaction is requested + throw new AlfrescoRuntimeException("Read-Write transaction started within read-only transaction"); + } + // We are in a read-only transaction and this is what we require so continue with it. + break; + case TXN_READ_WRITE: + // We are in a read-write transaction. It cannot be downgraded so just continue with it. + break; + case TXN_NONE: + // There is no current transaction so we need a new one. + startingNew = true; + break; + default: + throw new RuntimeException("Unknown transaction state: " + readState); + } + } + // If we are time limiting, set ourselves a time limit and maintain the count of concurrent transactions long startTime = 0, endTime = 0, txnStartTime = 0; - int txnCountWhenStarted = 0; - if (maxExecutionMs > 0) + if (startingNew && maxExecutionMs > 0) { startTime = System.currentTimeMillis(); synchronized (this) { - // If this transaction would take us above our ceiling, reject it - if (txnCeiling != null && txnCount >= txnCeiling) + if (txnCount > 0) { - throw new TooBusyException("Too busy: " + txnCount + " transactions"); + // If this transaction would take us above our ceiling, reject it + long oldestDuration = startTime - txnsInProgress.firstKey(); + if (oldestDuration > maxExecutionMs) + { + throw new TooBusyException("Too busy: " + txnCount + " transactions. Oldest " + oldestDuration + " milliseconds"); + } } - txnCountWhenStarted = ++txnCount; + Integer count = txnsInProgress.get(startTime); + txnsInProgress.put(startTime, count == null ? 1 : count + 1); + ++txnCount; } endTime = startTime + maxExecutionMs; } @@ -319,45 +350,19 @@ public class RetryingTransactionHelper RuntimeException lastException = null; for (int count = 0; count == 0 || count < maxRetries; count++) { - // Monitor duration of each retry so that we can project an end time - if (maxExecutionMs > 0) - { - txnStartTime = System.currentTimeMillis(); - } - UserTransaction txn = null; try { - if (requiresNew) + if (startingNew) { - txn = txnService.getNonPropagatingUserTransaction(readOnly); - } - else - { - TxnReadState readState = AlfrescoTransactionSupport.getTransactionReadState(); - switch (readState) - { - case TXN_READ_ONLY: - if (!readOnly) - { - // The current transaction is read-only, but a writable transaction is requested - throw new AlfrescoRuntimeException("Read-Write transaction started within read-only transaction"); - } - // We are in a read-only transaction and this is what we require so continue with it. - break; - case TXN_READ_WRITE: - // We are in a read-write transaction. It cannot be downgraded so just continue with it. - break; - case TXN_NONE: - // There is no current transaction so we need a new one. - txn = txnService.getUserTransaction(readOnly); - break; - default: - throw new RuntimeException("Unknown transaction state: " + readState); + // Monitor duration of each retry so that we can project an end time + if (maxExecutionMs > 0) + { + txnStartTime = System.currentTimeMillis(); } - } - if (txn != null) - { + + txn = requiresNew ? txnService.getNonPropagatingUserTransaction(readOnly) : txnService + .getUserTransaction(readOnly); txn.begin(); // Wrap it to protect it UserTransactionProtectionAdvise advise = new UserTransactionProtectionAdvise(); @@ -475,8 +480,7 @@ public class RetryingTransactionHelper long projectedEndTime = txnEndTime + (txnEndTime - txnStartTime); if (projectedEndTime > endTime) { - // Force the ceiling to be lowered and reject - endTime = 0; + // Reject the retry throw new TooBusyException("Too busy to retry", e); } // Limit the wait duration to fit into the time we have left @@ -522,23 +526,22 @@ public class RetryingTransactionHelper } finally { - if (maxExecutionMs > 0) + if (startingNew && maxExecutionMs > 0) { synchronized (this) { txnCount--; - if(System.currentTimeMillis() > endTime) + Integer count = txnsInProgress.get(startTime); + if (count != null) { - // Lower the ceiling - if (txnCeiling == null || txnCeiling > txnCountWhenStarted - 1) + if (count == 1) { - txnCeiling = Math.max(1, txnCountWhenStarted - 1); + txnsInProgress.remove(startTime); + } + else + { + txnsInProgress.put(startTime, count-1); } - } - else if (txnCeiling != null && txnCeiling < txnCountWhenStarted + 1) - { - // Raise the ceiling - txnCeiling = txnCountWhenStarted + 1; } } } diff --git a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelperTest.java b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelperTest.java index 2343342650..6700aa6fcf 100644 --- a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelperTest.java +++ b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelperTest.java @@ -556,54 +556,19 @@ public class RetryingTransactionHelperTest extends TestCase txnHelper.setMaxExecutionMs(3000); final List caughtExceptions = Collections.synchronizedList(new LinkedList()); - // Force ceiling of 2 - runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 5000)); - if (caughtExceptions.size() > 0) - { - throw new RuntimeException("Unexpected exception", caughtExceptions.get(0)); - } - - - // Try breaching ceiling - runThreads(txnHelper, caughtExceptions, new Pair(3, 1000)); - assertTrue("Expected exception", caughtExceptions.size() > 0); + // Try submitting a request after a timeout + runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 5000), new Pair(4, 1000)); + assertEquals("Expected 1 exception", 1, caughtExceptions.size()); assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException); - // Stay within ceiling, forcing expansion + // Stay within timeout limits caughtExceptions.clear(); - runThreads(txnHelper, caughtExceptions, new Pair(1, 1000), new Pair(1, 2000)); + runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 2000), new Pair(4, 1000)); if (caughtExceptions.size() > 0) { throw new RuntimeException("Unexpected exception", caughtExceptions.get(0)); } - // Test expansion - caughtExceptions.clear(); - runThreads(txnHelper, caughtExceptions, new Pair(3, 1000)); - if (caughtExceptions.size() > 0) - { - throw new RuntimeException("Unexpected exception", caughtExceptions.get(0)); - } - - // Ensure expansion no too fast - caughtExceptions.clear(); - runThreads(txnHelper, caughtExceptions, new Pair(5, 1000)); - assertTrue("Expected exception", caughtExceptions.size() > 0); - assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException); - - // Test contraction - caughtExceptions.clear(); - runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 5000)); - if (caughtExceptions.size() > 0) - { - throw new RuntimeException("Unexpected exception", caughtExceptions.get(0)); - } - - // Try breaching new ceiling - runThreads(txnHelper, caughtExceptions, new Pair(3, 1000)); - assertTrue("Expected exception", caughtExceptions.size() > 0); - assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException); - // Check retry limitation long startTime = System.currentTimeMillis(); try @@ -629,59 +594,52 @@ public class RetryingTransactionHelperTest extends TestCase private void runThreads(final RetryingTransactionHelper txnHelper, final List caughtExceptions, Pair... countDurationPairs) { - int threadCount = 0; - for (Pair pair : countDurationPairs) - { - threadCount += pair.getFirst(); - } - - final CountDownLatch endLatch = new CountDownLatch(threadCount); + final CountDownLatch endLatch = new CountDownLatch(countDurationPairs.length); class Callback implements RetryingTransactionCallback { - private final CountDownLatch startLatch; private final int duration; - public Callback(CountDownLatch startLatch, int duration) + public Callback(int duration) { - this.startLatch = startLatch; this.duration = duration; } public Void execute() throws Throwable { - long endTime = System.currentTimeMillis() + duration; - - // Signal that we've started - startLatch.countDown(); - - long duration = endTime - System.currentTimeMillis(); - if (duration > 0) - { - Thread.sleep(duration); - } + Thread.sleep(duration); return null; } } ; class Work implements Runnable { + private final CountDownLatch startLatch; private final Callback callback; + private final int execCount; - public Work(Callback callback) + public Work(CountDownLatch startLatch, Callback callback, int execCount) { + this.startLatch = startLatch; this.callback = callback; + this.execCount = execCount; } public void run() { - try + // Signal that we've started + startLatch.countDown(); + + for (int i = 0; i < execCount; i++) { - txnHelper.doInTransaction(callback); - } - catch (Throwable e) - { - caughtExceptions.add(e); + try + { + txnHelper.doInTransaction(callback); + } + catch (Throwable e) + { + caughtExceptions.add(e); + } } endLatch.countDown(); } @@ -693,21 +651,18 @@ public class RetryingTransactionHelperTest extends TestCase for (Pair pair : countDurationPairs) { CountDownLatch startLatch = new CountDownLatch(1); - Runnable work = new Work(new Callback(startLatch, pair.getSecond())); - for (int i = 0; i < pair.getFirst(); i++) + Runnable work = new Work(startLatch, new Callback(pair.getSecond()), pair.getFirst()); + Thread thread = new Thread(work); + thread.setName(getName() + "-" + j++); + thread.setDaemon(true); + thread.start(); + try + { + // Wait for the thread to get up and running. We need them starting in sequence + startLatch.await(60, TimeUnit.SECONDS); + } + catch (InterruptedException e) { - Thread thread = new Thread(work); - thread.setName(getName() + "-" + j++); - thread.setDaemon(true); - thread.start(); - try - { - // Wait for the thread to get up and running. We need them starting in sequence - startLatch.await(60, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } } } // Wait for the threads to have finished