More fussing with lookup cache. It seems a hair faster.

Some cleanup of the retrying transaction stuff.
Minor mods to one stress test to account for changes to retrying
transactions.  


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@4616 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Britt Park
2006-12-15 06:42:50 +00:00
parent 8c777dbb94
commit d9a20c4e55
6 changed files with 214 additions and 86 deletions

View File

@@ -48,6 +48,6 @@ public class AVMLookupCacheListener extends TransactionListenerAdapter
@Override @Override
public void afterCommit() public void afterCommit()
{ {
fLookupCache.commitLookups(); fLookupCache.onCommit();
} }
} }

View File

@@ -35,8 +35,8 @@ public class AVMStressTestP extends AVMServiceTestBase
{ {
try try
{ {
int nCopies = 4; int nCopies = 8;
int nThreads = 2; int nThreads = 4;
BulkLoader loader = new BulkLoader(); BulkLoader loader = new BulkLoader();
loader.setAvmService(fService); loader.setAvmService(fService);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
@@ -52,9 +52,9 @@ public class AVMStressTestP extends AVMServiceTestBase
for (int i = 0; i < nThreads; i++) for (int i = 0; i < nThreads; i++)
{ {
AVMTester tester AVMTester tester
= new AVMTester(800, // create file. = new AVMTester(400, // create file.
20, // create dir, 20, // create dir,
0, // rename 5, // rename
5, // create layered dir 5, // create layered dir
5, // create layered file 5, // create layered file
10, // remove node 10, // remove node

View File

@@ -27,13 +27,10 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.service.cmr.avm.AVMException; import org.alfresco.service.cmr.avm.AVMException;
import org.alfresco.service.cmr.avm.AVMNodeDescriptor; import org.alfresco.service.cmr.avm.AVMNodeDescriptor;
import org.alfresco.service.cmr.avm.AVMService; import org.alfresco.service.cmr.avm.AVMService;
import org.alfresco.service.cmr.repository.InvalidNodeRefException; import org.alfresco.service.cmr.repository.ContentIOException;
import org.hibernate.HibernateException;
import org.springframework.dao.ConcurrencyFailureException;
/** /**
* This is a Runnable which randomly performs operations on an AVM Repository. * This is a Runnable which randomly performs operations on an AVM Repository.
@@ -572,11 +569,11 @@ class AVMTester implements Runnable
private void handleException(Exception e) private void handleException(Exception e)
{ {
e.printStackTrace(System.err); e.printStackTrace(System.err);
if (e instanceof AVMException || if (e instanceof AVMException)
e instanceof AlfrescoRuntimeException || {
e instanceof ConcurrencyFailureException || return;
e instanceof HibernateException || }
e instanceof InvalidNodeRefException) if (e instanceof ContentIOException)
{ {
return; return;
} }

View File

@@ -5,8 +5,10 @@ package org.alfresco.repo.avm;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
@@ -21,12 +23,18 @@ public class LookupCache
{ {
private static Logger fgLogger = Logger.getLogger(LookupCache.class); private static Logger fgLogger = Logger.getLogger(LookupCache.class);
/** /**
* Per transaction lookup results to be added to the cache on successful * Per transaction lookup results to be added to the cache on successful
* commit. * commit.
*/ */
private ThreadLocal<Map<LookupKey, Lookup>> fToBeAdded; private ThreadLocal<Map<LookupKey, Lookup>> fToBeAdded;
/**
* Per transaction set of invalidated lookup keys.
*/
private ThreadLocal<Set<LookupKey>> fToBePurged;
/** /**
* The Map of of keys to lookups. * The Map of of keys to lookups.
*/ */
@@ -71,6 +79,7 @@ public class LookupCache
fTimeStamps = new TreeMap<Long, LookupKey>(); fTimeStamps = new TreeMap<Long, LookupKey>();
fInverseTimeStamps = new HashMap<LookupKey, Long>(); fInverseTimeStamps = new HashMap<LookupKey, Long>();
fToBeAdded = new ThreadLocal<Map<LookupKey, Lookup>>(); fToBeAdded = new ThreadLocal<Map<LookupKey, Lookup>>();
fToBePurged = new ThreadLocal<Set<LookupKey>>();
fTimeStamp = 0L; fTimeStamp = 0L;
fMaxSize = 100; fMaxSize = 100;
} }
@@ -114,7 +123,9 @@ public class LookupCache
public Lookup lookup(AVMStore store, int version, SimplePath path, public Lookup lookup(AVMStore store, int version, SimplePath path,
boolean write, boolean includeDeleted) boolean write, boolean includeDeleted)
{ {
// Create a key object.
LookupKey key = new LookupKey(version, path, store.getName(), write, includeDeleted); LookupKey key = new LookupKey(version, path, store.getName(), write, includeDeleted);
// Is it in the cache?
Lookup found = findInCache(key); Lookup found = findInCache(key);
if (found != null) if (found != null)
{ {
@@ -186,43 +197,69 @@ public class LookupCache
*/ */
private synchronized Lookup findInCache(LookupKey key) private synchronized Lookup findInCache(LookupKey key)
{ {
Map<LookupKey, Lookup> map = fToBeAdded.get(); // Get the current transaction's purged set.
Lookup found = (map != null) ? map.get(key) : null; Set<LookupKey> purged = fToBePurged.get();
// Get the current transaction's added map.
Map<LookupKey, Lookup> added = fToBeAdded.get();
// See if it's cached in the transaction.
Lookup found = (added != null) ? added.get(key) : null;
// It's not.
if (found == null) if (found == null)
{ {
// If it's been purged in the transaction it is
// a miss.
if (purged != null && purged.contains(key))
{
return null;
}
found = fCache.get(key); found = fCache.get(key);
} }
// Despite the odds, we found a hit.
if (found != null) if (found != null)
{ {
// Get a freshened Lookup.
Lookup result = new Lookup(found, fAVMNodeDAO, fAVMStoreDAO); Lookup result = new Lookup(found, fAVMNodeDAO, fAVMStoreDAO);
// Check that nothing horrible is wrong. This should
// be assertible, but I'll leave the check in for now.
if (!result.isValid()) if (!result.isValid())
{ {
fgLogger.error("Invalid entry in cache: " + key); fgLogger.error("Invalid entry in cache: " + key);
onRollback();
return null; return null;
} }
// Prepare the cache for a timestamp update on commit.
updateCache(key, found); updateCache(key, found);
return result; return result;
} }
// Alternatively for a read lookup a write can match. // Alternatively for a read lookup a write can match.
if (!key.isWrite()) if (!key.isWrite())
{ {
// Make a copy of the key and set it to 'write'
LookupKey newKey = new LookupKey(key); LookupKey newKey = new LookupKey(key);
newKey.setWrite(true); newKey.setWrite(true);
found = (map != null) ? map.get(newKey) : null; // Is it in the transaction's cache?
found = (added != null) ? added.get(newKey) : null;
// If not.
if (found == null) if (found == null)
{ {
// If it's been purged it's a miss.
if (purged != null && purged.contains(newKey))
{
return null;
}
found = fCache.get(newKey); found = fCache.get(newKey);
} }
if (found != null) if (found != null)
{ {
// We found it. Make a freshened copy of the Lookup.
Lookup result = new Lookup(found, fAVMNodeDAO, fAVMStoreDAO); Lookup result = new Lookup(found, fAVMNodeDAO, fAVMStoreDAO);
// Check for badness. This should be assertible but I'll
// leave the check in for now.
if (!result.isValid()) if (!result.isValid())
{ {
fgLogger.error("Invalid entry in cache: " + newKey); fgLogger.error("Invalid entry in cache: " + newKey);
onRollback();
return null; return null;
} }
// Prepare the cache to update time stamp.
updateCache(newKey, found); updateCache(newKey, found);
return result; return result;
} }
@@ -237,29 +274,47 @@ public class LookupCache
*/ */
private void updateCache(LookupKey key, Lookup lookup) private void updateCache(LookupKey key, Lookup lookup)
{ {
// First, put it in the transaction scoped cache.
Map<LookupKey, Lookup> map = fToBeAdded.get(); Map<LookupKey, Lookup> map = fToBeAdded.get();
if (map == null) if (map == null)
{ {
map = new HashMap<LookupKey, Lookup>(); map = new HashMap<LookupKey, Lookup>();
} }
map.put(key, lookup); map.put(key, lookup);
// Remove any corresponding entry from the purge list.
Set<LookupKey> purged = fToBePurged.get();
if (purged == null)
{
return;
}
purged.remove(key);
} }
/** /**
* Called when a transaction has successfully committed, * Called when a transaction has successfully committed,
* to make lookups from the transaction available to other transactions. * to make lookups from the transaction available to other transactions.
*/ */
public synchronized void commitLookups() public synchronized void onCommit()
{ {
Map<LookupKey, Lookup> map = fToBeAdded.get(); // First get rid of all entries purged by the transaction.
if (map == null) Set<LookupKey> purged = fToBePurged.get();
if (purged != null)
{
purgeEntries(purged);
}
// Null out the thread local.
fToBePurged.set(null);
// Get and go through the transaction's added list.
Map<LookupKey, Lookup> added = fToBeAdded.get();
if (added == null)
{ {
return; return;
} }
for (Map.Entry<LookupKey, Lookup> entry : map.entrySet()) for (Map.Entry<LookupKey, Lookup> entry : added.entrySet())
{ {
LookupKey key = entry.getKey(); LookupKey key = entry.getKey();
Lookup lookup = entry.getValue(); Lookup lookup = entry.getValue();
// If the cache already has the key, remove it.
if (fCache.containsKey(key)) if (fCache.containsKey(key))
{ {
fCache.remove(key); fCache.remove(key);
@@ -267,10 +322,13 @@ public class LookupCache
fInverseTimeStamps.remove(key); fInverseTimeStamps.remove(key);
fTimeStamps.remove(oldTime); fTimeStamps.remove(oldTime);
} }
// Add the entry.
long timeStamp = fTimeStamp++; long timeStamp = fTimeStamp++;
fTimeStamps.put(timeStamp, key); fTimeStamps.put(timeStamp, key);
fInverseTimeStamps.put(key, timeStamp); fInverseTimeStamps.put(key, timeStamp);
fCache.put(key, lookup); fCache.put(key, lookup);
// Check if we're over the limit and purge the
// LRU entry if we are.
if (fCache.size() > fMaxSize) if (fCache.size() > fMaxSize)
{ {
// Get rid of the oldest entry. // Get rid of the oldest entry.
@@ -280,14 +338,15 @@ public class LookupCache
fCache.remove(old); fCache.remove(old);
} }
} }
// Null out the ThreadLocal.
fToBeAdded.set(null); fToBeAdded.set(null);
} }
/** /**
* Remove a List of entries. * Remove a Set of entries.
* @param keys The List of entries. * @param keys The Set of entries.
*/ */
private void purgeEntries(List<LookupKey> keys) private void purgeEntries(Set<LookupKey> keys)
{ {
for (LookupKey key : keys) for (LookupKey key : keys)
{ {
@@ -305,17 +364,39 @@ public class LookupCache
*/ */
public synchronized void onWrite(String storeName) public synchronized void onWrite(String storeName)
{ {
List<LookupKey> toDelete = new ArrayList<LookupKey>(); // Get or make up the purged Set for this transaction.
Set<LookupKey> purged = fToBePurged.get();
if (purged == null)
{
purged = new HashSet<LookupKey>();
fToBePurged.set(purged);
}
// Invalidate if it's a read lookup in the store, or
// any read lookup is it's layered.
for (Map.Entry<LookupKey, Lookup> entry : fCache.entrySet()) for (Map.Entry<LookupKey, Lookup> entry : fCache.entrySet())
{ {
if ((entry.getKey().getStoreName().equals(storeName) && if ((entry.getKey().getStoreName().equals(storeName) &&
!entry.getKey().isWrite()) || !entry.getKey().isWrite()) ||
(!entry.getKey().isWrite() && entry.getValue().isLayered())) (!entry.getKey().isWrite() && entry.getValue().isLayered()))
{ {
toDelete.add(entry.getKey()); purged.add(entry.getKey());
}
}
// Remove entries from the added set using the same criteria.
Map<LookupKey, Lookup> added = fToBeAdded.get();
if (added == null)
{
return;
}
for (Map.Entry<LookupKey, Lookup> entry : added.entrySet())
{
if ((entry.getKey().getStoreName().equals(storeName) &&
!entry.getKey().isWrite()) ||
(!entry.getKey().isWrite() && entry.getValue().isLayered()))
{
added.remove(entry.getKey());
} }
} }
purgeEntries(toDelete);
} }
/** /**
@@ -324,16 +405,36 @@ public class LookupCache
*/ */
public synchronized void onDelete(String storeName) public synchronized void onDelete(String storeName)
{ {
List<LookupKey> toDelete = new ArrayList<LookupKey>(); // Get or make up a fresh purged Set.
Set<LookupKey> purged = fToBePurged.get();
if (purged == null)
{
purged = new HashSet<LookupKey>();
fToBePurged.set(purged);
}
// Invalidate any entries that are in the store or are layered lookups.
for (Map.Entry<LookupKey, Lookup> entry : fCache.entrySet()) for (Map.Entry<LookupKey, Lookup> entry : fCache.entrySet())
{ {
if (entry.getKey().getStoreName().equals(storeName) || if (entry.getKey().getStoreName().equals(storeName) ||
entry.getValue().isLayered()) entry.getValue().isLayered())
{ {
toDelete.add(entry.getKey()); purged.add(entry.getKey());
}
}
// Get rid of any similarly matching elements in the added list.
Map<LookupKey, Lookup> added = fToBeAdded.get();
if (added == null)
{
return;
}
for (Map.Entry<LookupKey, Lookup> entry : added.entrySet())
{
if (entry.getKey().getStoreName().equals(storeName) ||
entry.getValue().isLayered())
{
added.remove(entry.getKey());
} }
} }
purgeEntries(toDelete);
} }
/** /**
@@ -342,28 +443,48 @@ public class LookupCache
*/ */
public synchronized void onSnapshot(String storeName) public synchronized void onSnapshot(String storeName)
{ {
List<LookupKey> toDelete = new ArrayList<LookupKey>(); // Get or make up a purged set.
Set<LookupKey> purged = fToBePurged.get();
if (purged == null)
{
purged = new HashSet<LookupKey>();
fToBePurged.set(purged);
}
// Invalidate any entries that in the store and writes or
// any layered lookups.
for (Map.Entry<LookupKey, Lookup> entry : fCache.entrySet()) for (Map.Entry<LookupKey, Lookup> entry : fCache.entrySet())
{ {
if ((entry.getKey().getStoreName().equals(storeName) && if ((entry.getKey().getStoreName().equals(storeName) &&
entry.getKey().isWrite()) || entry.getKey().isWrite()) ||
entry.getValue().isLayered()) entry.getValue().isLayered())
{ {
toDelete.add(entry.getKey()); purged.add(entry.getKey());
}
}
// Remove from the added list by the same criteria.
Map<LookupKey, Lookup> added = fToBeAdded.get();
if (added == null)
{
return;
}
for (Map.Entry<LookupKey, Lookup> entry : added.entrySet())
{
if ((entry.getKey().getStoreName().equals(storeName) &&
entry.getKey().isWrite()) ||
entry.getValue().isLayered())
{
added.remove(entry.getKey());
} }
} }
purgeEntries(toDelete);
} }
/** /**
* Called when a rollback has occurred. This invalidates the entire * Called when a rollback has occurred.
* cache. Heavy handed but quick.
*/ */
public synchronized void onRollback() public synchronized void onRollback()
{ {
fCache.clear(); // Just toss the transaction level changes.
fTimeStamps.clear();
fInverseTimeStamps.clear();
fToBeAdded.set(null); fToBeAdded.set(null);
fToBePurged.set(null);
} }
} }

View File

@@ -5,7 +5,6 @@ package org.alfresco.repo.transaction;
import java.util.Random; import java.util.Random;
import org.alfresco.error.AlfrescoRuntimeException;
import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation; import org.aopalliance.intercept.MethodInvocation;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -104,22 +103,27 @@ public class RetryingTransactionAdvice implements MethodInterceptor
} }
return result; return result;
} }
catch (Throwable e) catch (RuntimeException e)
{ {
if (txn != null && isNewTxn && !txn.isCompleted()) if (txn != null && isNewTxn && !txn.isCompleted())
{ {
fTxnManager.rollback(txn); fTxnManager.rollback(txn);
} }
if (e instanceof ConcurrencyFailureException ||
e instanceof DeadlockLoserDataAccessException ||
e instanceof StaleObjectStateException ||
e instanceof LockAcquisitionException)
{
if (!isNewTxn) if (!isNewTxn)
{ {
throw (RuntimeException)e; throw e;
} }
lastException = (RuntimeException)e; lastException = e;
Throwable t = e;
boolean shouldRetry = false;
while (t != null)
{
if (t instanceof ConcurrencyFailureException ||
t instanceof DeadlockLoserDataAccessException ||
t instanceof StaleObjectStateException ||
t instanceof LockAcquisitionException)
{
shouldRetry = true;
try try
{ {
Thread.sleep(fRandom.nextInt(500 * count + 500)); Thread.sleep(fRandom.nextInt(500 * count + 500));
@@ -128,13 +132,15 @@ public class RetryingTransactionAdvice implements MethodInterceptor
{ {
// Do nothing. // Do nothing.
} }
break;
}
t = t.getCause();
}
if (shouldRetry)
{
continue; continue;
} }
if (e instanceof RuntimeException) throw e;
{
throw (RuntimeException)e;
}
throw new AlfrescoRuntimeException("Failure in Transaction.", e);
} }
} }
fgLogger.error("Txn Failed after " + fMaxRetries + " retries:", lastException); fgLogger.error("Txn Failed after " + fMaxRetries + " retries:", lastException);

View File

@@ -5,7 +5,6 @@ package org.alfresco.repo.transaction;
import java.util.Random; import java.util.Random;
import javax.transaction.RollbackException;
import javax.transaction.Status; import javax.transaction.Status;
import javax.transaction.SystemException; import javax.transaction.SystemException;
import javax.transaction.UserTransaction; import javax.transaction.UserTransaction;
@@ -159,20 +158,19 @@ public class RetryingTransactionHelper
throw new AlfrescoRuntimeException("Failure during rollback.", e1); throw new AlfrescoRuntimeException("Failure during rollback.", e1);
} }
} }
// This handles the case of an unexpected rollback in lastException = (e instanceof RuntimeException) ?
// the UserTransaction. (RuntimeException)e : new AlfrescoRuntimeException("Unknown Exception in Transaction.", e);
if (e instanceof RollbackException) Throwable t = e;
boolean shouldRetry = false;
while (t != null)
{ {
RollbackException re = (RollbackException)e;
e = re.getCause();
}
// These are the 'OK' exceptions. These mean we can retry. // These are the 'OK' exceptions. These mean we can retry.
if (e instanceof ConcurrencyFailureException || if (t instanceof ConcurrencyFailureException ||
e instanceof DeadlockLoserDataAccessException || t instanceof DeadlockLoserDataAccessException ||
e instanceof StaleObjectStateException || t instanceof StaleObjectStateException ||
e instanceof LockAcquisitionException) t instanceof LockAcquisitionException)
{ {
lastException = (RuntimeException)e; shouldRetry = true;
// Sleep a random amount of time before retrying. // Sleep a random amount of time before retrying.
// The sleep interval increases with the number of retries. // The sleep interval increases with the number of retries.
try try
@@ -183,6 +181,12 @@ public class RetryingTransactionHelper
{ {
// Do nothing. // Do nothing.
} }
break;
}
t = t.getCause();
}
if (shouldRetry)
{
continue; continue;
} }
// It was a 'bad' exception. // It was a 'bad' exception.