Relaxed TransactionalCache

- Treating the shared cache like a database w.r.t. concurrency leads to too many conflicts,
   when it is quite reasonable to just throw the cached values away when in doubt.
Fixed cache size tracing


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@5949 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley 2007-06-14 08:25:13 +00:00
parent fb069d1680
commit b65a6fe1a8
3 changed files with 81 additions and 166 deletions

View File

@ -33,14 +33,12 @@ import javax.transaction.UserTransaction;
import junit.framework.TestCase; import junit.framework.TestCase;
import net.sf.ehcache.CacheManager; import net.sf.ehcache.CacheManager;
import org.alfresco.error.ExceptionStackUtil;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.ServiceRegistry; import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.transaction.TransactionService; import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.ApplicationContextHelper; import org.alfresco.util.ApplicationContextHelper;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.dao.ConcurrencyFailureException;
/** /**
* @see org.alfresco.repo.cache.EhCacheAdapter * @see org.alfresco.repo.cache.EhCacheAdapter
@ -54,9 +52,9 @@ public class CacheTest extends TestCase
); );
private ServiceRegistry serviceRegistry; private ServiceRegistry serviceRegistry;
private SimpleCache<String, Serializable> standaloneCache; private SimpleCache<String, Object> standaloneCache;
private SimpleCache<String, Serializable> backingCache; private SimpleCache<String, Object> backingCache;
private SimpleCache<String, Serializable> transactionalCache; private SimpleCache<String, Object> transactionalCache;
private SimpleCache<String, Object> objectCache; private SimpleCache<String, Object> objectCache;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -64,9 +62,9 @@ public class CacheTest extends TestCase
public void setUp() throws Exception public void setUp() throws Exception
{ {
serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); serviceRegistry = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY);
standaloneCache = (SimpleCache<String, Serializable>) ctx.getBean("ehCache1"); standaloneCache = (SimpleCache<String, Object>) ctx.getBean("ehCache1");
backingCache = (SimpleCache<String, Serializable>) ctx.getBean("backingCache"); backingCache = (SimpleCache<String, Object>) ctx.getBean("backingCache");
transactionalCache = (SimpleCache<String, Serializable>) ctx.getBean("transactionalCache"); transactionalCache = (SimpleCache<String, Object>) ctx.getBean("transactionalCache");
objectCache = (SimpleCache<String, Object>) ctx.getBean("objectCache"); objectCache = (SimpleCache<String, Object>) ctx.getBean("objectCache");
} }
@ -196,7 +194,7 @@ public class CacheTest extends TestCase
* @param objectCount * @param objectCount
* @return Returns the time it took in <b>nanoseconds</b>. * @return Returns the time it took in <b>nanoseconds</b>.
*/ */
public long runPerformanceTestOnCache(SimpleCache<String, Serializable> cache, int objectCount) public long runPerformanceTestOnCache(SimpleCache<String, Object> cache, int objectCount)
{ {
// preload // preload
for (int i = 0; i < objectCount; i++) for (int i = 0; i < objectCount; i++)
@ -279,9 +277,8 @@ public class CacheTest extends TestCase
} }
} }
private static final Class[] CONCURRENCY_EXCEPTIONS = {ConcurrencyFailureException.class}; /** Execute the callback and ensure that the backing cache is left with the expected value */
/** Execute the callback and ensure that the concurrent condition is detected */ private void executeAndCheck(RetryingTransactionCallback callback, Serializable key, Object expectedValue) throws Throwable
private void executeAndCheck(RetryingTransactionCallback callback) throws Exception
{ {
TransactionService transactionService = serviceRegistry.getTransactionService(); TransactionService transactionService = serviceRegistry.getTransactionService();
UserTransaction txn = transactionService.getUserTransaction(); UserTransaction txn = transactionService.getUserTransaction();
@ -290,13 +287,6 @@ public class CacheTest extends TestCase
txn.begin(); txn.begin();
callback.execute(); callback.execute();
txn.commit(); txn.commit();
fail("Failed to detect concurrent modification");
}
catch (Throwable e)
{
assertNotNull(
"Expected a concurrency failure",
ExceptionStackUtil.getCause(e, CONCURRENCY_EXCEPTIONS));
} }
finally finally
{ {
@ -304,6 +294,7 @@ public class CacheTest extends TestCase
} }
} }
private static final String COMMON_KEY = "A";
/** /**
* <ul> * <ul>
* <li>Add to the transaction cache</li> * <li>Add to the transaction cache</li>
@ -311,18 +302,39 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentAddAgainstAdd() throws Exception public void testConcurrentAddAgainstAdd()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
transactionalCache.put("A", "AAA"); transactionalCache.put(COMMON_KEY, "AAA");
backingCache.put("A", "aaa"); backingCache.put(COMMON_KEY, "aaa");
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
}
/**
* <ul>
* <li>Add to the transaction cache</li>
* <li>Add to the backing cache</li>
* <li>Commit</li>
* </ul>
*/
public void testConcurrentAddAgainstAddSame()throws Throwable
{
final Object commonValue = "AAA";
RetryingTransactionCallback callback = new RetryingTransactionCallback()
{
public Object execute() throws Throwable
{
transactionalCache.put(COMMON_KEY, commonValue);
backingCache.put(COMMON_KEY, commonValue);
return null;
}
};
executeAndCheck(callback, COMMON_KEY, commonValue);
} }
/** /**
* <ul> * <ul>
@ -331,18 +343,18 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentAddAgainstClear() throws Exception public void testConcurrentAddAgainstClear()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
transactionalCache.put("A", "AAA"); transactionalCache.put(COMMON_KEY, "AAA");
backingCache.clear(); backingCache.clear();
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -352,19 +364,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentUpdateAgainstUpdate() throws Exception public void testConcurrentUpdateAgainstUpdate()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.put("A", "AAA"); transactionalCache.put(COMMON_KEY, "AAA");
backingCache.put("A", "aaa2"); backingCache.put(COMMON_KEY, "aaa2");
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -374,19 +386,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentUpdateAgainstUpdateNull() throws Exception public void testConcurrentUpdateAgainstUpdateNull()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.put("A", "AAA"); transactionalCache.put(COMMON_KEY, "AAA");
backingCache.put("A", null); backingCache.put(COMMON_KEY, null);
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -396,19 +408,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentUpdateNullAgainstUpdate() throws Exception public void testConcurrentUpdateNullAgainstUpdate()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.put("A", null); transactionalCache.put(COMMON_KEY, null);
backingCache.put("A", "aaa2"); backingCache.put(COMMON_KEY, "aaa2");
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -418,19 +430,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentUpdateAgainstRemove() throws Exception public void testConcurrentUpdateAgainstRemove()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.put("A", "AAA"); transactionalCache.put(COMMON_KEY, "AAA");
backingCache.remove("A"); backingCache.remove(COMMON_KEY);
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -440,19 +452,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentUpdateAgainstClear() throws Exception public void testConcurrentUpdateAgainstClear()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.put("A", "AAA"); transactionalCache.put(COMMON_KEY, "AAA");
backingCache.clear(); backingCache.clear();
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -462,19 +474,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentRemoveAgainstUpdate() throws Exception public void testConcurrentRemoveAgainstUpdate()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.remove("A"); transactionalCache.remove(COMMON_KEY);
backingCache.put("A", "aaa2"); backingCache.put(COMMON_KEY, "aaa2");
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -484,19 +496,19 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentRemoveAgainstRemove() throws Exception public void testConcurrentRemoveAgainstRemove()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.remove("A"); transactionalCache.remove(COMMON_KEY);
backingCache.remove("A"); backingCache.remove(COMMON_KEY);
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
/** /**
* <ul> * <ul>
@ -506,18 +518,18 @@ public class CacheTest extends TestCase
* <li>Commit</li> * <li>Commit</li>
* </ul> * </ul>
*/ */
public void testConcurrentRemoveAgainstClear() throws Exception public void testConcurrentRemoveAgainstClear()throws Throwable
{ {
RetryingTransactionCallback callback = new RetryingTransactionCallback() RetryingTransactionCallback callback = new RetryingTransactionCallback()
{ {
public Object execute() throws Throwable public Object execute() throws Throwable
{ {
backingCache.put("A", "aaa1"); backingCache.put(COMMON_KEY, "aaa1");
transactionalCache.remove("A"); transactionalCache.remove(COMMON_KEY);
backingCache.clear(); backingCache.clear();
return null; return null;
} }
}; };
executeAndCheck(callback); executeAndCheck(callback, COMMON_KEY, null);
} }
} }

View File

@ -82,7 +82,7 @@ public class EhCacheTracerJob implements Job
{ {
if (cacheManager == null) if (cacheManager == null)
{ {
cacheManager = CacheManager.getInstance(); cacheManager = InternalEhCacheManagerFactoryBean.getInstance();
} }
long maxHeapSize = Runtime.getRuntime().maxMemory(); long maxHeapSize = Runtime.getRuntime().maxMemory();

View File

@ -41,7 +41,6 @@ import org.alfresco.util.EqualsHelper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
@ -535,40 +534,19 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{ {
} }
/**
* NO-OP
*/
public void beforeCompletion() public void beforeCompletion()
{ {
} }
/** /**
* Check that the cache used is not out of date with the shared cache. * NO-OP
* Note that the clear flag is ignored as this would have removed all
* entries from the local caches anyway.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void beforeCommit(boolean readOnly) public void beforeCommit(boolean readOnly)
{ {
if (isDebugEnabled)
{
logger.debug("Processing pre-commit");
}
TransactionData txnData = getTransactionData();
// Process all the updates or new entries
for (Object obj : txnData.updatedItemsCache.getKeys())
{
Serializable key = (Serializable) obj;
Element element = txnData.updatedItemsCache.get(key);
CacheBucket<V> bucket = (CacheBucket<V>) element.getValue();
bucket.doPreCommit(sharedCache, key);
}
// Process all the removals
for (Object obj : txnData.removedItemsCache.getKeys())
{
Serializable key = (Serializable) obj;
Element element = txnData.removedItemsCache.get(key);
CacheBucket<V> bucket = (CacheBucket<V>) element.getValue();
bucket.doPreCommit(sharedCache, key);
}
} }
/** /**
@ -667,13 +645,6 @@ public class TransactionalCache<K extends Serializable, V extends Object>
* @return Returns the bucket's value * @return Returns the bucket's value
*/ */
BV getValue(); BV getValue();
/**
* Check that any new cache values have not been superceded by new values in the shared cache.
*
* @param sharedCache the cache to check
* @param key the key that the bucket was stored against
*/
void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key);
/** /**
* Flush the current bucket to the shared cache as far as possible. * Flush the current bucket to the shared cache as far as possible.
* *
@ -702,19 +673,6 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{ {
return value; return value;
} }
public void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
if (sharedCache.contains(key))
{
// The shared cache has a value where there wasn't one before. More than likely,
// this transaction would have used or modified that shared value.
throw new ConcurrencyFailureException(
"New cache bucket found new shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key) public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{ {
if (sharedCache.contains(key)) if (sharedCache.contains(key))
@ -750,33 +708,6 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{ {
return originalValue; return originalValue;
} }
public void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
if (sharedCache.contains(key))
{
BV sharedValue = sharedCache.get(key);
if (sharedValue == getOriginalValue())
{
// The cache entry is safe for writing
}
else
{
// The shared value has moved on since
throw new ConcurrencyFailureException(
"Update cache bucket found newer shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
else
{
// The key was removed from the shared cache. This instance cannot update the entry.
throw new ConcurrencyFailureException(
"Update cache bucket couldn't fine entry to update: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key) public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{ {
BV sharedValue = sharedCache.get(key); BV sharedValue = sharedCache.get(key);
@ -812,34 +743,6 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{ {
super(originalValue, null); super(originalValue, null);
} }
public void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
BV sharedValue = sharedCache.get(key);
if (sharedValue != null)
{
if (sharedValue == getOriginalValue())
{
// The shared cache entry is the same as the one we were flagged to remove
}
else
{
// The shared value has moved on since
throw new ConcurrencyFailureException(
"Remove cache bucket found newer shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
else
{
// The shared cache no longer has the value. It is possible that the removal of the
// item is something that would have affected the behaviour of the current transaction.
throw new ConcurrencyFailureException(
"Remove cache bucket found new shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key) public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{ {
if (sharedCache.contains(key)) if (sharedCache.contains(key))