mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Fixed transactional cache when new entry was created concurrently with a clear: ALFCOM-3457
git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@17051 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -24,7 +24,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.alfresco.repo.cache;
|
package org.alfresco.repo.cache;
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
@@ -34,6 +33,7 @@ import javax.transaction.UserTransaction;
|
|||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
import net.sf.ehcache.CacheManager;
|
import net.sf.ehcache.CacheManager;
|
||||||
|
|
||||||
|
import org.alfresco.repo.cache.TransactionalCache.NullValueMarker;
|
||||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||||
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
||||||
import org.alfresco.repo.transaction.TransactionListenerAdapter;
|
import org.alfresco.repo.transaction.TransactionListenerAdapter;
|
||||||
@@ -192,7 +192,9 @@ public class CacheTest extends TestCase
|
|||||||
// update 3 in the cache
|
// update 3 in the cache
|
||||||
transactionalCache.put(updatedTxnThree, "XXX");
|
transactionalCache.put(updatedTxnThree, "XXX");
|
||||||
assertEquals("Item not updated in txn cache", "XXX", transactionalCache.get(updatedTxnThree));
|
assertEquals("Item not updated in txn cache", "XXX", transactionalCache.get(updatedTxnThree));
|
||||||
assertFalse("Item was put into backing cache", backingCache.contains(updatedTxnThree));
|
assertFalse("Item was put into backing cache (excl. NullValueMarker)",
|
||||||
|
backingCache.contains(updatedTxnThree) &&
|
||||||
|
!(backingCache.get(updatedTxnThree) instanceof NullValueMarker));
|
||||||
|
|
||||||
// check that the keys collection is correct
|
// check that the keys collection is correct
|
||||||
Collection<String> transactionalKeys = transactionalCache.getKeys();
|
Collection<String> transactionalKeys = transactionalCache.getKeys();
|
||||||
@@ -437,7 +439,7 @@ public class CacheTest extends TestCase
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Execute the callback and ensure that the backing cache is left with the expected value */
|
/** Execute the callback and ensure that the backing cache is left with the expected value */
|
||||||
private void executeAndCheck(RetryingTransactionCallback<Object> callback, Serializable key, Object expectedValue) throws Throwable
|
private void executeAndCheck(RetryingTransactionCallback<Object> callback, String key, Object expectedValue) throws Throwable
|
||||||
{
|
{
|
||||||
TransactionService transactionService = serviceRegistry.getTransactionService();
|
TransactionService transactionService = serviceRegistry.getTransactionService();
|
||||||
UserTransaction txn = transactionService.getUserTransaction();
|
UserTransaction txn = transactionService.getUserTransaction();
|
||||||
@@ -451,6 +453,15 @@ public class CacheTest extends TestCase
|
|||||||
{
|
{
|
||||||
try { txn.rollback(); } catch (Throwable ee) {}
|
try { txn.rollback(); } catch (Throwable ee) {}
|
||||||
}
|
}
|
||||||
|
Object actualValue = backingCache.get(key);
|
||||||
|
if (expectedValue == null)
|
||||||
|
{
|
||||||
|
assertNull("Expected backing cache to have null", actualValue);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
assertEquals("Backing cache value was not correct", expectedValue, actualValue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final String COMMON_KEY = "A";
|
private static final String COMMON_KEY = "A";
|
||||||
@@ -493,7 +504,7 @@ public class CacheTest extends TestCase
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
executeAndCheck(callback, COMMON_KEY, commonValue);
|
executeAndCheck(callback, COMMON_KEY, null);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* <ul>
|
* <ul>
|
||||||
|
@@ -80,14 +80,14 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
{
|
{
|
||||||
private static final String RESOURCE_KEY_TXN_DATA = "TransactionalCache.TxnData";
|
private static final String RESOURCE_KEY_TXN_DATA = "TransactionalCache.TxnData";
|
||||||
|
|
||||||
private static Log logger = LogFactory.getLog(TransactionalCache.class);
|
private Log logger;
|
||||||
private static boolean isDebugEnabled = logger.isDebugEnabled();
|
private boolean isDebugEnabled;
|
||||||
|
|
||||||
/** a name used to uniquely identify the transactional caches */
|
/** a name used to uniquely identify the transactional caches */
|
||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
/** the shared cache that will get updated after commits */
|
/** the shared cache that will get updated after commits */
|
||||||
private SimpleCache<Serializable, V> sharedCache;
|
private SimpleCache<Serializable, Object> sharedCache;
|
||||||
|
|
||||||
/** the manager to control Ehcache caches */
|
/** the manager to control Ehcache caches */
|
||||||
private CacheManager cacheManager;
|
private CacheManager cacheManager;
|
||||||
@@ -103,6 +103,8 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
*/
|
*/
|
||||||
public TransactionalCache()
|
public TransactionalCache()
|
||||||
{
|
{
|
||||||
|
logger = LogFactory.getLog(TransactionalCache.class);
|
||||||
|
isDebugEnabled = logger.isDebugEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -143,7 +145,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
*
|
*
|
||||||
* @param sharedCache
|
* @param sharedCache
|
||||||
*/
|
*/
|
||||||
public void setSharedCache(SimpleCache<Serializable, V> sharedCache)
|
public void setSharedCache(SimpleCache<Serializable, Object> sharedCache)
|
||||||
{
|
{
|
||||||
this.sharedCache = sharedCache;
|
this.sharedCache = sharedCache;
|
||||||
}
|
}
|
||||||
@@ -192,6 +194,9 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
Assert.notNull(cacheManager, "cacheManager property not set");
|
Assert.notNull(cacheManager, "cacheManager property not set");
|
||||||
// generate the resource binding key
|
// generate the resource binding key
|
||||||
resourceKeyTxnData = RESOURCE_KEY_TXN_DATA + "." + name;
|
resourceKeyTxnData = RESOURCE_KEY_TXN_DATA + "." + name;
|
||||||
|
// Refine the log category
|
||||||
|
logger = LogFactory.getLog(TransactionalCache.class + "." + name);
|
||||||
|
isDebugEnabled = logger.isDebugEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -280,6 +285,27 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
return keys;
|
return keys;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches a value from the shared cache, checking for {@link NullValueMarker null markers}.
|
||||||
|
*
|
||||||
|
* @param key the key
|
||||||
|
* @return Returns the value or <tt>null</tt>
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private V getSharedCacheValue(K key)
|
||||||
|
{
|
||||||
|
Object valueObj = sharedCache.get(key);
|
||||||
|
if (valueObj instanceof NullValueMarker)
|
||||||
|
{
|
||||||
|
// Someone has already marked this as a null
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return (V) valueObj;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks the per-transaction caches for the object before going to the shared cache.
|
* Checks the per-transaction caches for the object before going to the shared cache.
|
||||||
* If the thread is not in a transaction, then the shared cache is accessed directly.
|
* If the thread is not in a transaction, then the shared cache is accessed directly.
|
||||||
@@ -345,7 +371,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
// no value found - must we ignore the shared cache?
|
// no value found - must we ignore the shared cache?
|
||||||
if (!ignoreSharedCache)
|
if (!ignoreSharedCache)
|
||||||
{
|
{
|
||||||
V value = sharedCache.get(key);
|
V value = getSharedCacheValue(key);
|
||||||
// go to the shared cache
|
// go to the shared cache
|
||||||
if (isDebugEnabled)
|
if (isDebugEnabled)
|
||||||
{
|
{
|
||||||
@@ -419,14 +445,17 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
CacheBucket<V> bucket = null;
|
CacheBucket<V> bucket = null;
|
||||||
if (sharedCache.contains(key))
|
if (sharedCache.contains(key))
|
||||||
{
|
{
|
||||||
V existingValue = sharedCache.get(key);
|
V existingValue = getSharedCacheValue(key);
|
||||||
// The value needs to be kept for later checks
|
// The value needs to be kept for later checks
|
||||||
bucket = new UpdateCacheBucket<V>(existingValue, value);
|
bucket = new UpdateCacheBucket<V>(existingValue, value);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
// Insert a 'null' marker into the shared cache
|
||||||
|
NullValueMarker nullMarker = new NullValueMarker();
|
||||||
|
sharedCache.put(key, nullMarker);
|
||||||
// The value didn't exist before
|
// The value didn't exist before
|
||||||
bucket = new NewCacheBucket<V>(value);
|
bucket = new NewCacheBucket<V>(nullMarker, value);
|
||||||
}
|
}
|
||||||
Element element = new Element(key, bucket);
|
Element element = new Element(key, bucket);
|
||||||
txnData.updatedItemsCache.put(element);
|
txnData.updatedItemsCache.put(element);
|
||||||
@@ -512,7 +541,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
V existingValue = sharedCache.get(key);
|
V existingValue = getSharedCacheValue(key);
|
||||||
if (existingValue == null)
|
if (existingValue == null)
|
||||||
{
|
{
|
||||||
// There is no point doing a remove for a value that doesn't exist
|
// There is no point doing a remove for a value that doesn't exist
|
||||||
@@ -691,6 +720,16 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
txnData.isClosed = true;
|
txnData.isClosed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instances of this class are used to mark the shared cache null values for cases where
|
||||||
|
* new values are going to be inserted into it.
|
||||||
|
*
|
||||||
|
* @author Derek Hulley
|
||||||
|
*/
|
||||||
|
public static class NullValueMarker
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for the transactional cache buckets. These hold the actual values along
|
* Interface for the transactional cache buckets. These hold the actual values along
|
||||||
* with some state and behaviour around writing from the in-transaction caches to the
|
* with some state and behaviour around writing from the in-transaction caches to the
|
||||||
@@ -710,12 +749,12 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
* @param sharedCache the cache to flush to
|
* @param sharedCache the cache to flush to
|
||||||
* @param key the key that the bucket was stored against
|
* @param key the key that the bucket was stored against
|
||||||
*/
|
*/
|
||||||
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key);
|
public void doPostCommit(SimpleCache<Serializable, Object> sharedCache, Serializable key);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A bucket class to hold values for the caches.<br/>
|
* A bucket class to hold values for the caches.<br/>
|
||||||
* The cache ID and timestamp of the bucket is stored to ensure cache consistency.
|
* The cache assumes the presence of a marker object to
|
||||||
*
|
*
|
||||||
* @author Derek Hulley
|
* @author Derek Hulley
|
||||||
*/
|
*/
|
||||||
@@ -724,55 +763,65 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
private static final long serialVersionUID = -8536386687213957425L;
|
private static final long serialVersionUID = -8536386687213957425L;
|
||||||
|
|
||||||
private final BV value;
|
private final BV value;
|
||||||
public NewCacheBucket(BV value)
|
private final NullValueMarker nullMarker;
|
||||||
|
public NewCacheBucket(NullValueMarker nullMarker, BV value)
|
||||||
{
|
{
|
||||||
this.value = value;
|
this.value = value;
|
||||||
|
this.nullMarker = nullMarker;
|
||||||
|
}
|
||||||
|
public BV getValue()
|
||||||
|
{
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
public void doPostCommit(SimpleCache<Serializable, Object> sharedCache, Serializable key)
|
||||||
|
{
|
||||||
|
Object sharedValue = sharedCache.get(key);
|
||||||
|
if (sharedValue != null)
|
||||||
|
{
|
||||||
|
if (sharedValue == nullMarker)
|
||||||
|
{
|
||||||
|
// The shared cache entry didn't change during the txn and is safe for writing
|
||||||
|
sharedCache.put(key, value);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// The shared value has moved on since
|
||||||
|
sharedCache.remove(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// The shared cache no longer has a value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data holder to keep track of a cached value's ID in order to detect stale
|
||||||
|
* shared cache values. This bucket assumes the presence of a pre-existing entry in
|
||||||
|
* the shared cache.
|
||||||
|
*/
|
||||||
|
private static class UpdateCacheBucket<BV> implements CacheBucket<BV>
|
||||||
|
{
|
||||||
|
private static final long serialVersionUID = 7885689778259779578L;
|
||||||
|
|
||||||
|
private final BV value;
|
||||||
|
private final BV originalValue;
|
||||||
|
public UpdateCacheBucket(BV originalValue, BV value)
|
||||||
|
{
|
||||||
|
this.originalValue = originalValue;
|
||||||
|
this.value = value;
|
||||||
}
|
}
|
||||||
public BV getValue()
|
public BV getValue()
|
||||||
{
|
{
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
|
public void doPostCommit(SimpleCache<Serializable, Object> sharedCache, Serializable key)
|
||||||
{
|
{
|
||||||
if (sharedCache.contains(key))
|
Object sharedValue = sharedCache.get(key);
|
||||||
{
|
|
||||||
// The shared cache has a value where there wasn't one before.
|
|
||||||
// Just lose both of them.
|
|
||||||
sharedCache.remove(key);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// There is nothing in the shared cache, so add this entry
|
|
||||||
sharedCache.put(key, getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Data holder to keep track of a cached value's timestamps in order to detect stale
|
|
||||||
* shared cache values. This bucket assumes the presence of a pre-existing entry in
|
|
||||||
* the shared cache.
|
|
||||||
*/
|
|
||||||
private static class UpdateCacheBucket<BV> extends NewCacheBucket<BV>
|
|
||||||
{
|
|
||||||
private static final long serialVersionUID = 7885689778259779578L;
|
|
||||||
|
|
||||||
private final BV originalValue;
|
|
||||||
public UpdateCacheBucket(BV originalValue, BV value)
|
|
||||||
{
|
|
||||||
super(value);
|
|
||||||
this.originalValue = originalValue;
|
|
||||||
}
|
|
||||||
protected BV getOriginalValue()
|
|
||||||
{
|
|
||||||
return originalValue;
|
|
||||||
}
|
|
||||||
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
|
|
||||||
{
|
|
||||||
BV sharedValue = sharedCache.get(key);
|
|
||||||
if (sharedValue != null)
|
if (sharedValue != null)
|
||||||
{
|
{
|
||||||
if (sharedValue == getOriginalValue())
|
if (sharedValue == originalValue)
|
||||||
{
|
{
|
||||||
// The cache entry is safe for writing
|
// The cache entry is safe for writing
|
||||||
sharedCache.put(key, getValue());
|
sharedCache.put(key, getValue());
|
||||||
@@ -802,7 +851,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
|
|||||||
{
|
{
|
||||||
super(originalValue, null);
|
super(originalValue, null);
|
||||||
}
|
}
|
||||||
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
|
public void doPostCommit(SimpleCache<Serializable, Object> sharedCache, Serializable key)
|
||||||
{
|
{
|
||||||
// We remove the shared entry whether it has moved on or not
|
// We remove the shared entry whether it has moved on or not
|
||||||
sharedCache.remove(key);
|
sharedCache.remove(key);
|
||||||
|
Reference in New Issue
Block a user