Merged V3.3-BUG-FIX to HEAD

23257: Merged V3.3 to V3.3-BUG-FIX
      23224: (RECORD ONLY) MERGE V3.3 BUG FIX to V3.3 
         23199 : imap message test
      23247: Merged HEAD to V3.3
         23246: Fix for ALF-5032: findAuthorities() getting slow when many groups are created.
            - moved to PARENT driven queries where possible in preference to PATH
      23255: Merged PATCHES/V3.2.0 to V3.3
         23252: ALF-5141, ALF-5302, ALF-5281: Improved transaction limiting mechanism
            - No ceiling. Just monitor transaction start times and reject new transactions when the oldest transaction is older than the threshold.


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@23258 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Dave Ward
2010-10-25 14:30:27 +00:00
parent 2709b5fe47
commit bcf40763a3
2 changed files with 97 additions and 139 deletions

View File

@@ -22,6 +22,8 @@ import java.lang.reflect.Method;
import java.sql.BatchUpdateException; import java.sql.BatchUpdateException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Random; import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.transaction.RollbackException; import javax.transaction.RollbackException;
import javax.transaction.Status; import javax.transaction.Status;
@@ -128,14 +130,11 @@ public class RetryingTransactionHelper
*/ */
private long maxExecutionMs; private long maxExecutionMs;
/** Map of transaction start times to counts. Only maintained when maxExecutionMs is set. */
private SortedMap <Long, Integer> txnsInProgress = new TreeMap<Long, Integer>();
/** The number of concurrently exeucting transactions. Only maintained when maxExecutionMs is set. */ /** The number of concurrently exeucting transactions. Only maintained when maxExecutionMs is set. */
private int txnCount; private int txnCount;
/**
* A 'ceiling' for the number of concurrent transactions that can execute. Dynamically maintained so that exeuction
* time is within maxExecutionMs. Transactions above this limit will be rejected with a {@link TooBusyException}.
*/
private Integer txnCeiling;
/** /**
* Whether the the transactions may only be reads * Whether the the transactions may only be reads
@@ -294,20 +293,52 @@ public class RetryingTransactionHelper
throw new AccessDeniedException(MSG_READ_ONLY); throw new AccessDeniedException(MSG_READ_ONLY);
} }
// First validate the requiresNew setting
boolean startingNew = requiresNew;
if (!startingNew)
{
TxnReadState readState = AlfrescoTransactionSupport.getTransactionReadState();
switch (readState)
{
case TXN_READ_ONLY:
if (!readOnly)
{
// The current transaction is read-only, but a writable transaction is requested
throw new AlfrescoRuntimeException("Read-Write transaction started within read-only transaction");
}
// We are in a read-only transaction and this is what we require so continue with it.
break;
case TXN_READ_WRITE:
// We are in a read-write transaction. It cannot be downgraded so just continue with it.
break;
case TXN_NONE:
// There is no current transaction so we need a new one.
startingNew = true;
break;
default:
throw new RuntimeException("Unknown transaction state: " + readState);
}
}
// If we are time limiting, set ourselves a time limit and maintain the count of concurrent transactions // If we are time limiting, set ourselves a time limit and maintain the count of concurrent transactions
long startTime = 0, endTime = 0, txnStartTime = 0; long startTime = 0, endTime = 0, txnStartTime = 0;
int txnCountWhenStarted = 0; if (startingNew && maxExecutionMs > 0)
if (maxExecutionMs > 0)
{ {
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
synchronized (this) synchronized (this)
{ {
// If this transaction would take us above our ceiling, reject it if (txnCount > 0)
if (txnCeiling != null && txnCount >= txnCeiling)
{ {
throw new TooBusyException("Too busy: " + txnCount + " transactions"); // If this transaction would take us above our ceiling, reject it
long oldestDuration = startTime - txnsInProgress.firstKey();
if (oldestDuration > maxExecutionMs)
{
throw new TooBusyException("Too busy: " + txnCount + " transactions. Oldest " + oldestDuration + " milliseconds");
}
} }
txnCountWhenStarted = ++txnCount; Integer count = txnsInProgress.get(startTime);
txnsInProgress.put(startTime, count == null ? 1 : count + 1);
++txnCount;
} }
endTime = startTime + maxExecutionMs; endTime = startTime + maxExecutionMs;
} }
@@ -319,45 +350,19 @@ public class RetryingTransactionHelper
RuntimeException lastException = null; RuntimeException lastException = null;
for (int count = 0; count == 0 || count < maxRetries; count++) for (int count = 0; count == 0 || count < maxRetries; count++)
{ {
// Monitor duration of each retry so that we can project an end time
if (maxExecutionMs > 0)
{
txnStartTime = System.currentTimeMillis();
}
UserTransaction txn = null; UserTransaction txn = null;
try try
{ {
if (requiresNew) if (startingNew)
{ {
txn = txnService.getNonPropagatingUserTransaction(readOnly); // Monitor duration of each retry so that we can project an end time
} if (maxExecutionMs > 0)
else {
{ txnStartTime = System.currentTimeMillis();
TxnReadState readState = AlfrescoTransactionSupport.getTransactionReadState();
switch (readState)
{
case TXN_READ_ONLY:
if (!readOnly)
{
// The current transaction is read-only, but a writable transaction is requested
throw new AlfrescoRuntimeException("Read-Write transaction started within read-only transaction");
}
// We are in a read-only transaction and this is what we require so continue with it.
break;
case TXN_READ_WRITE:
// We are in a read-write transaction. It cannot be downgraded so just continue with it.
break;
case TXN_NONE:
// There is no current transaction so we need a new one.
txn = txnService.getUserTransaction(readOnly);
break;
default:
throw new RuntimeException("Unknown transaction state: " + readState);
} }
}
if (txn != null) txn = requiresNew ? txnService.getNonPropagatingUserTransaction(readOnly) : txnService
{ .getUserTransaction(readOnly);
txn.begin(); txn.begin();
// Wrap it to protect it // Wrap it to protect it
UserTransactionProtectionAdvise advise = new UserTransactionProtectionAdvise(); UserTransactionProtectionAdvise advise = new UserTransactionProtectionAdvise();
@@ -475,8 +480,7 @@ public class RetryingTransactionHelper
long projectedEndTime = txnEndTime + (txnEndTime - txnStartTime); long projectedEndTime = txnEndTime + (txnEndTime - txnStartTime);
if (projectedEndTime > endTime) if (projectedEndTime > endTime)
{ {
// Force the ceiling to be lowered and reject // Reject the retry
endTime = 0;
throw new TooBusyException("Too busy to retry", e); throw new TooBusyException("Too busy to retry", e);
} }
// Limit the wait duration to fit into the time we have left // Limit the wait duration to fit into the time we have left
@@ -522,23 +526,22 @@ public class RetryingTransactionHelper
} }
finally finally
{ {
if (maxExecutionMs > 0) if (startingNew && maxExecutionMs > 0)
{ {
synchronized (this) synchronized (this)
{ {
txnCount--; txnCount--;
if(System.currentTimeMillis() > endTime) Integer count = txnsInProgress.get(startTime);
if (count != null)
{ {
// Lower the ceiling if (count == 1)
if (txnCeiling == null || txnCeiling > txnCountWhenStarted - 1)
{ {
txnCeiling = Math.max(1, txnCountWhenStarted - 1); txnsInProgress.remove(startTime);
}
else
{
txnsInProgress.put(startTime, count-1);
} }
}
else if (txnCeiling != null && txnCeiling < txnCountWhenStarted + 1)
{
// Raise the ceiling
txnCeiling = txnCountWhenStarted + 1;
} }
} }
} }

View File

@@ -556,54 +556,19 @@ public class RetryingTransactionHelperTest extends TestCase
txnHelper.setMaxExecutionMs(3000); txnHelper.setMaxExecutionMs(3000);
final List<Throwable> caughtExceptions = Collections.synchronizedList(new LinkedList<Throwable>()); final List<Throwable> caughtExceptions = Collections.synchronizedList(new LinkedList<Throwable>());
// Force ceiling of 2 // Try submitting a request after a timeout
runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 5000)); runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 5000), new Pair(4, 1000));
if (caughtExceptions.size() > 0) assertEquals("Expected 1 exception", 1, caughtExceptions.size());
{
throw new RuntimeException("Unexpected exception", caughtExceptions.get(0));
}
// Try breaching ceiling
runThreads(txnHelper, caughtExceptions, new Pair(3, 1000));
assertTrue("Expected exception", caughtExceptions.size() > 0);
assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException); assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException);
// Stay within ceiling, forcing expansion // Stay within timeout limits
caughtExceptions.clear(); caughtExceptions.clear();
runThreads(txnHelper, caughtExceptions, new Pair(1, 1000), new Pair(1, 2000)); runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 2000), new Pair(4, 1000));
if (caughtExceptions.size() > 0) if (caughtExceptions.size() > 0)
{ {
throw new RuntimeException("Unexpected exception", caughtExceptions.get(0)); throw new RuntimeException("Unexpected exception", caughtExceptions.get(0));
} }
// Test expansion
caughtExceptions.clear();
runThreads(txnHelper, caughtExceptions, new Pair(3, 1000));
if (caughtExceptions.size() > 0)
{
throw new RuntimeException("Unexpected exception", caughtExceptions.get(0));
}
// Ensure expansion no too fast
caughtExceptions.clear();
runThreads(txnHelper, caughtExceptions, new Pair(5, 1000));
assertTrue("Expected exception", caughtExceptions.size() > 0);
assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException);
// Test contraction
caughtExceptions.clear();
runThreads(txnHelper, caughtExceptions, new Pair(2, 1000), new Pair(1, 5000));
if (caughtExceptions.size() > 0)
{
throw new RuntimeException("Unexpected exception", caughtExceptions.get(0));
}
// Try breaching new ceiling
runThreads(txnHelper, caughtExceptions, new Pair(3, 1000));
assertTrue("Expected exception", caughtExceptions.size() > 0);
assertTrue("Excpected TooBusyException", caughtExceptions.get(0) instanceof TooBusyException);
// Check retry limitation // Check retry limitation
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try try
@@ -629,59 +594,52 @@ public class RetryingTransactionHelperTest extends TestCase
private void runThreads(final RetryingTransactionHelper txnHelper, final List<Throwable> caughtExceptions, private void runThreads(final RetryingTransactionHelper txnHelper, final List<Throwable> caughtExceptions,
Pair<Integer, Integer>... countDurationPairs) Pair<Integer, Integer>... countDurationPairs)
{ {
int threadCount = 0; final CountDownLatch endLatch = new CountDownLatch(countDurationPairs.length);
for (Pair<Integer, Integer> pair : countDurationPairs)
{
threadCount += pair.getFirst();
}
final CountDownLatch endLatch = new CountDownLatch(threadCount);
class Callback implements RetryingTransactionCallback<Void> class Callback implements RetryingTransactionCallback<Void>
{ {
private final CountDownLatch startLatch;
private final int duration; private final int duration;
public Callback(CountDownLatch startLatch, int duration) public Callback(int duration)
{ {
this.startLatch = startLatch;
this.duration = duration; this.duration = duration;
} }
public Void execute() throws Throwable public Void execute() throws Throwable
{ {
long endTime = System.currentTimeMillis() + duration; Thread.sleep(duration);
// Signal that we've started
startLatch.countDown();
long duration = endTime - System.currentTimeMillis();
if (duration > 0)
{
Thread.sleep(duration);
}
return null; return null;
} }
} }
; ;
class Work implements Runnable class Work implements Runnable
{ {
private final CountDownLatch startLatch;
private final Callback callback; private final Callback callback;
private final int execCount;
public Work(Callback callback) public Work(CountDownLatch startLatch, Callback callback, int execCount)
{ {
this.startLatch = startLatch;
this.callback = callback; this.callback = callback;
this.execCount = execCount;
} }
public void run() public void run()
{ {
try // Signal that we've started
startLatch.countDown();
for (int i = 0; i < execCount; i++)
{ {
txnHelper.doInTransaction(callback); try
} {
catch (Throwable e) txnHelper.doInTransaction(callback);
{ }
caughtExceptions.add(e); catch (Throwable e)
{
caughtExceptions.add(e);
}
} }
endLatch.countDown(); endLatch.countDown();
} }
@@ -693,21 +651,18 @@ public class RetryingTransactionHelperTest extends TestCase
for (Pair<Integer, Integer> pair : countDurationPairs) for (Pair<Integer, Integer> pair : countDurationPairs)
{ {
CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch startLatch = new CountDownLatch(1);
Runnable work = new Work(new Callback(startLatch, pair.getSecond())); Runnable work = new Work(startLatch, new Callback(pair.getSecond()), pair.getFirst());
for (int i = 0; i < pair.getFirst(); i++) Thread thread = new Thread(work);
thread.setName(getName() + "-" + j++);
thread.setDaemon(true);
thread.start();
try
{
// Wait for the thread to get up and running. We need them starting in sequence
startLatch.await(60, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{ {
Thread thread = new Thread(work);
thread.setName(getName() + "-" + j++);
thread.setDaemon(true);
thread.start();
try
{
// Wait for the thread to get up and running. We need them starting in sequence
startLatch.await(60, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
}
} }
} }
// Wait for the threads to have finished // Wait for the threads to have finished