Fixed unreported error: New transactions started in the post-commit phase *might* see stale cache data

- Transactional caches were flushing last in the post-commit phase
 - New transactions in the post-commit phase would see cache data out of synch with the DB-committed data
 - Affects DOD5015 post-commit audit actions, which were not generating full datasets for the first-pass data import

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@16627 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2009-09-30 15:41:37 +00:00
parent 8a1a86dc0f
commit 77457e3670
3 changed files with 172 additions and 107 deletions

View File

@@ -169,6 +169,7 @@ public class CacheTest extends TestCase
String newGlobalTwo = "new_global_two"; String newGlobalTwo = "new_global_two";
String newGlobalThree = "new_global_three"; String newGlobalThree = "new_global_three";
String updatedTxnThree = "updated_txn_three"; String updatedTxnThree = "updated_txn_three";
String updatedTxnFour = "updated_txn_four";
// add item to global cache // add item to global cache
backingCache.put(newGlobalOne, newGlobalOne); backingCache.put(newGlobalOne, newGlobalOne);
@@ -198,17 +199,25 @@ public class CacheTest extends TestCase
assertFalse("Transactionally removed item found in keys", transactionalKeys.contains(newGlobalOne)); assertFalse("Transactionally removed item found in keys", transactionalKeys.contains(newGlobalOne));
assertTrue("Transactionally added item not found in keys", transactionalKeys.contains(updatedTxnThree)); assertTrue("Transactionally added item not found in keys", transactionalKeys.contains(updatedTxnThree));
// Register a post-commit stresser. We do this here so that it is registered after the transactional cache // Register a post-commit cache reader to make sure that nothing blows up if the cache is hit in post-commit
PostCommitCacheUser listener = new PostCommitCacheUser(transactionalCache, updatedTxnThree); PostCommitCacheReader listenerReader = new PostCommitCacheReader(transactionalCache, updatedTxnThree);
AlfrescoTransactionSupport.bindListener(listener); AlfrescoTransactionSupport.bindListener(listenerReader);
// Register a post-commit cache reader to make sure that nothing blows up if the cache is hit in post-commit
PostCommitCacheWriter listenerWriter = new PostCommitCacheWriter(transactionalCache, updatedTxnFour, "FOUR");
AlfrescoTransactionSupport.bindListener(listenerWriter);
// commit the transaction // commit the transaction
txn.commit(); txn.commit();
// Check the post-commit stresser // Check the post-commit stressers
if (listener.e != null) if (listenerReader.e != null)
{ {
throw listener.e; throw listenerReader.e;
}
if (listenerWriter.e != null)
{
throw listenerWriter.e;
} }
// check that backing cache was updated with the in-transaction changes // check that backing cache was updated with the in-transaction changes
@@ -231,18 +240,18 @@ public class CacheTest extends TestCase
} }
/** /**
* This transaction listener attempts to use the cache in the afterCommit phase. Technically the * This transaction listener attempts to read from the cache in the afterCommit phase. Technically the
* transaction has finished, but the transaction resources are still available. * transaction has finished, but the transaction resources are still available.
* *
* @author Derek Hulley * @author Derek Hulley
* @since 2.1 * @since 2.1
*/ */
private class PostCommitCacheUser extends TransactionListenerAdapter private class PostCommitCacheReader extends TransactionListenerAdapter
{ {
private final SimpleCache<String, Object> transactionalCache; private final SimpleCache<String, Object> transactionalCache;
private final String key; private final String key;
private Throwable e; private Throwable e;
private PostCommitCacheUser(SimpleCache<String, Object> transactionalCache, String key) private PostCommitCacheReader(SimpleCache<String, Object> transactionalCache, String key)
{ {
this.transactionalCache = transactionalCache; this.transactionalCache = transactionalCache;
this.key = key; this.key = key;
@@ -260,12 +269,42 @@ public class CacheTest extends TestCase
return; return;
} }
} }
@Override }
public int hashCode()
/**
* This transaction listener attempts to write to the cache in the afterCommit phase. Technically the
* transaction has finished, but the transaction resources are still available.
*
* @author Derek Hulley
* @since 2.1
*/
private class PostCommitCacheWriter extends TransactionListenerAdapter
{
private final SimpleCache<String, Object> transactionalCache;
private final String key;
private final Object value;
private Throwable e;
private PostCommitCacheWriter(SimpleCache<String, Object> transactionalCache, String key, Object value)
{ {
return -100000; this.transactionalCache = transactionalCache;
this.key = key;
this.value = value;
}
@Override
public void afterCommit()
{
try
{
transactionalCache.put(key, value);
transactionalCache.remove(key);
transactionalCache.clear();
}
catch (Throwable e)
{
this.e = e;
return;
}
} }
} }
/** /**

View File

@@ -123,7 +123,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{ {
return false; return false;
} }
if (!(obj instanceof TransactionalCache)) if (!(obj instanceof TransactionalCache<?, ?>))
{ {
return false; return false;
} }
@@ -396,40 +396,50 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// Ensure that the cache isn't being modified // Ensure that the cache isn't being modified
if (txnData.isClosed) if (txnData.isClosed)
{ {
throw new AlfrescoRuntimeException("onCommit cache modifications are not allowed."); if (isDebugEnabled)
} {
// we have a transaction - add the item into the updated cache for this transaction logger.debug(
// are we in an overflow condition? "In post-commit add: \n" +
if (txnData.updatedItemsCache.getMemoryStoreSize() >= maxCacheSize) " cache: " + this + "\n" +
{ " key: " + key + "\n" +
// overflow about to occur or has occured - we can only guarantee non-stale " value: " + value);
// data by clearing the shared cache after the transaction. Also, the }
// shared cache needs to be ignored for the rest of the transaction.
txnData.isClearOn = true;
}
CacheBucket<V> bucket = null;
if (sharedCache.contains(key))
{
V existingValue = sharedCache.get(key);
// The value needs to be kept for later checks
bucket = new UpdateCacheBucket<V>(existingValue, value);
} }
else else
{ {
// The value didn't exist before // we have an active transaction - add the item into the updated cache for this transaction
bucket = new NewCacheBucket<V>(value); // are we in an overflow condition?
} if (txnData.updatedItemsCache.getMemoryStoreSize() >= maxCacheSize)
Element element = new Element(key, bucket); {
txnData.updatedItemsCache.put(element); // overflow about to occur or has occured - we can only guarantee non-stale
// remove the item from the removed cache, if present // data by clearing the shared cache after the transaction. Also, the
txnData.removedItemsCache.remove(key); // shared cache needs to be ignored for the rest of the transaction.
// done txnData.isClearOn = true;
if (isDebugEnabled) }
{ CacheBucket<V> bucket = null;
logger.debug("In transaction - adding item direct to transactional update cache: \n" + if (sharedCache.contains(key))
" cache: " + this + "\n" + {
" key: " + key + "\n" + V existingValue = sharedCache.get(key);
" value: " + value); // The value needs to be kept for later checks
bucket = new UpdateCacheBucket<V>(existingValue, value);
}
else
{
// The value didn't exist before
bucket = new NewCacheBucket<V>(value);
}
Element element = new Element(key, bucket);
txnData.updatedItemsCache.put(element);
// remove the item from the removed cache, if present
txnData.removedItemsCache.remove(key);
// done
if (isDebugEnabled)
{
logger.debug("In transaction - adding item direct to transactional update cache: \n" +
" cache: " + this + "\n" +
" key: " + key + "\n" +
" value: " + value);
}
} }
} }
} }
@@ -461,53 +471,62 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// Ensure that the cache isn't being modified // Ensure that the cache isn't being modified
if (txnData.isClosed) if (txnData.isClosed)
{ {
throw new AlfrescoRuntimeException("onCommit cache modifications are not allowed."); if (isDebugEnabled)
} {
// is the shared cache going to be cleared? logger.debug(
if (txnData.isClearOn) "In post-commit remove: \n" +
{ " cache: " + this + "\n" +
// don't store removals if we're just going to clear it all out later " key: " + key);
}
} }
else else
{ {
// are we in an overflow condition? // is the shared cache going to be cleared?
if (txnData.removedItemsCache.getMemoryStoreSize() >= maxCacheSize) if (txnData.isClearOn)
{ {
// overflow about to occur or has occured - we can only guarantee non-stale // don't store removals if we're just going to clear it all out later
// data by clearing the shared cache after the transaction. Also, the
// shared cache needs to be ignored for the rest of the transaction.
txnData.isClearOn = true;
if (isDebugEnabled)
{
logger.debug("In transaction - removal cache reach capacity reached: \n" +
" cache: " + this + "\n" +
" txn: " + AlfrescoTransactionSupport.getTransactionId());
}
} }
else else
{ {
V existingValue = sharedCache.get(key); // are we in an overflow condition?
if (existingValue == null) if (txnData.removedItemsCache.getMemoryStoreSize() >= maxCacheSize)
{ {
// There is no point doing a remove for a value that doesn't exist // overflow about to occur or has occured - we can only guarantee non-stale
// data by clearing the shared cache after the transaction. Also, the
// shared cache needs to be ignored for the rest of the transaction.
txnData.isClearOn = true;
if (isDebugEnabled)
{
logger.debug("In transaction - removal cache reach capacity reached: \n" +
" cache: " + this + "\n" +
" txn: " + AlfrescoTransactionSupport.getTransactionId());
}
} }
else else
{ {
// Create a bucket to remove the value from the shared cache V existingValue = sharedCache.get(key);
CacheBucket<V> removeBucket = new RemoveCacheBucket<V>(existingValue); if (existingValue == null)
Element element = new Element(key, removeBucket); {
txnData.removedItemsCache.put(element); // There is no point doing a remove for a value that doesn't exist
}
else
{
// Create a bucket to remove the value from the shared cache
CacheBucket<V> removeBucket = new RemoveCacheBucket<V>(existingValue);
Element element = new Element(key, removeBucket);
txnData.removedItemsCache.put(element);
}
} }
} }
} // remove the item from the udpated cache, if present
// remove the item from the udpated cache, if present txnData.updatedItemsCache.remove(key);
txnData.updatedItemsCache.remove(key); // done
// done if (isDebugEnabled)
if (isDebugEnabled) {
{ logger.debug("In transaction - adding item direct to transactional removed cache: \n" +
logger.debug("In transaction - adding item direct to transactional removed cache: \n" + " cache: " + this + "\n" +
" cache: " + this + "\n" + " key: " + key);
" key: " + key); }
} }
} }
} }
@@ -531,14 +550,22 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// Ensure that the cache isn't being modified // Ensure that the cache isn't being modified
if (txnData.isClosed) if (txnData.isClosed)
{ {
throw new AlfrescoRuntimeException("onCommit cache modifications are not allowed."); if (isDebugEnabled)
{
logger.debug(
"In post-commit clear: \n" +
" cache: " + this);
}
}
else
{
// the shared cache must be cleared at the end of the transaction
// and also serves to ensure that the shared cache will be ignored
// for the remainder of the transaction
txnData.isClearOn = true;
txnData.updatedItemsCache.removeAll();
txnData.removedItemsCache.removeAll();
} }
// the shared cache must be cleared at the end of the transaction
// and also serves to ensure that the shared cache will be ignored
// for the remainder of the transaction
txnData.isClearOn = true;
txnData.updatedItemsCache.removeAll();
txnData.removedItemsCache.removeAll();
} }
else // no transaction else // no transaction
{ {
@@ -568,7 +595,6 @@ public class TransactionalCache<K extends Serializable, V extends Object>
/** /**
* NO-OP * NO-OP
*/ */
@SuppressWarnings("unchecked")
public void beforeCommit(boolean readOnly) public void beforeCommit(boolean readOnly)
{ {
} }

View File

@@ -778,6 +778,26 @@ public abstract class AlfrescoTransactionSupport
logger.debug("After completion (" + statusStr + "): " + this); logger.debug("After completion (" + statusStr + "): " + this);
} }
// Clean up the transactional caches
for (TransactionalCache<Serializable, Object> cache : transactionalCaches)
{
try
{
if (status == TransactionSynchronization.STATUS_COMMITTED)
{
cache.afterCommit();
}
else
{
cache.afterRollback();
}
}
catch (RuntimeException e)
{
logger.error("After completion (" + statusStr + ") TransactionalCache exception", e);
}
}
List<TransactionListener> iterableListeners = getListenersIterable(); List<TransactionListener> iterableListeners = getListenersIterable();
// notify listeners // notify listeners
if (status == TransactionSynchronization.STATUS_COMMITTED) if (status == TransactionSynchronization.STATUS_COMMITTED)
@@ -833,26 +853,6 @@ public abstract class AlfrescoTransactionSupport
} }
} }
// Clean up the transactional caches
for (TransactionalCache<Serializable, Object> cache : transactionalCaches)
{
try
{
if (status == TransactionSynchronization.STATUS_COMMITTED)
{
cache.afterCommit();
}
else
{
cache.afterRollback();
}
}
catch (RuntimeException e)
{
logger.error("After completion (" + statusStr + ") TransactionalCache exception", e);
}
}
// clear the thread's registrations and synchronizations // clear the thread's registrations and synchronizations
AlfrescoTransactionSupport.clearSynchronization(); AlfrescoTransactionSupport.clearSynchronization();
} }