From d9a20c4e55cbae7e3800e110ac66004befd772cb Mon Sep 17 00:00:00 2001 From: Britt Park Date: Fri, 15 Dec 2006 06:42:50 +0000 Subject: [PATCH] More fussing with lookup cache. It seems a hair faster. Some cleanup of the retrying transaction stuff. Minor mods to one stress test to account for changes to retrying transactions. git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@4616 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../repo/avm/AVMLookupCacheListener.java | 2 +- .../org/alfresco/repo/avm/AVMStressTestP.java | 8 +- .../java/org/alfresco/repo/avm/AVMTester.java | 15 +- .../org/alfresco/repo/avm/LookupCache.java | 177 +++++++++++++++--- .../RetryingTransactionAdvice.java | 50 ++--- .../RetryingTransactionHelper.java | 48 ++--- 6 files changed, 214 insertions(+), 86 deletions(-) diff --git a/source/java/org/alfresco/repo/avm/AVMLookupCacheListener.java b/source/java/org/alfresco/repo/avm/AVMLookupCacheListener.java index dfd7402125..8b1aa5f4ef 100644 --- a/source/java/org/alfresco/repo/avm/AVMLookupCacheListener.java +++ b/source/java/org/alfresco/repo/avm/AVMLookupCacheListener.java @@ -48,6 +48,6 @@ public class AVMLookupCacheListener extends TransactionListenerAdapter @Override public void afterCommit() { - fLookupCache.commitLookups(); + fLookupCache.onCommit(); } } diff --git a/source/java/org/alfresco/repo/avm/AVMStressTestP.java b/source/java/org/alfresco/repo/avm/AVMStressTestP.java index bfb522809b..eeb3510856 100644 --- a/source/java/org/alfresco/repo/avm/AVMStressTestP.java +++ b/source/java/org/alfresco/repo/avm/AVMStressTestP.java @@ -35,8 +35,8 @@ public class AVMStressTestP extends AVMServiceTestBase { try { - int nCopies = 4; - int nThreads = 2; + int nCopies = 8; + int nThreads = 4; BulkLoader loader = new BulkLoader(); loader.setAvmService(fService); long start = System.currentTimeMillis(); @@ -52,9 +52,9 @@ public class AVMStressTestP extends AVMServiceTestBase for (int i = 0; i < nThreads; i++) { AVMTester tester - = new AVMTester(800, // create file. + = new AVMTester(400, // create file. 20, // create dir, - 0, // rename + 5, // rename 5, // create layered dir 5, // create layered file 10, // remove node diff --git a/source/java/org/alfresco/repo/avm/AVMTester.java b/source/java/org/alfresco/repo/avm/AVMTester.java index 105a78d467..ed6dca3a33 100644 --- a/source/java/org/alfresco/repo/avm/AVMTester.java +++ b/source/java/org/alfresco/repo/avm/AVMTester.java @@ -27,13 +27,10 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.service.cmr.avm.AVMException; import org.alfresco.service.cmr.avm.AVMNodeDescriptor; import org.alfresco.service.cmr.avm.AVMService; -import org.alfresco.service.cmr.repository.InvalidNodeRefException; -import org.hibernate.HibernateException; -import org.springframework.dao.ConcurrencyFailureException; +import org.alfresco.service.cmr.repository.ContentIOException; /** * This is a Runnable which randomly performs operations on an AVM Repository. @@ -572,11 +569,11 @@ class AVMTester implements Runnable private void handleException(Exception e) { e.printStackTrace(System.err); - if (e instanceof AVMException || - e instanceof AlfrescoRuntimeException || - e instanceof ConcurrencyFailureException || - e instanceof HibernateException || - e instanceof InvalidNodeRefException) + if (e instanceof AVMException) + { + return; + } + if (e instanceof ContentIOException) { return; } diff --git a/source/java/org/alfresco/repo/avm/LookupCache.java b/source/java/org/alfresco/repo/avm/LookupCache.java index cde721d3a0..1c80ee5948 100644 --- a/source/java/org/alfresco/repo/avm/LookupCache.java +++ b/source/java/org/alfresco/repo/avm/LookupCache.java @@ -5,8 +5,10 @@ package org.alfresco.repo.avm; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -20,13 +22,19 @@ import org.apache.log4j.Logger; public class LookupCache { private static Logger fgLogger = Logger.getLogger(LookupCache.class); - + + /** * Per transaction lookup results to be added to the cache on successful * commit. */ private ThreadLocal> fToBeAdded; + /** + * Per transaction set of invalidated lookup keys. + */ + private ThreadLocal> fToBePurged; + /** * The Map of of keys to lookups. */ @@ -71,6 +79,7 @@ public class LookupCache fTimeStamps = new TreeMap(); fInverseTimeStamps = new HashMap(); fToBeAdded = new ThreadLocal>(); + fToBePurged = new ThreadLocal>(); fTimeStamp = 0L; fMaxSize = 100; } @@ -114,7 +123,9 @@ public class LookupCache public Lookup lookup(AVMStore store, int version, SimplePath path, boolean write, boolean includeDeleted) { + // Create a key object. LookupKey key = new LookupKey(version, path, store.getName(), write, includeDeleted); + // Is it in the cache? Lookup found = findInCache(key); if (found != null) { @@ -186,43 +197,69 @@ public class LookupCache */ private synchronized Lookup findInCache(LookupKey key) { - Map map = fToBeAdded.get(); - Lookup found = (map != null) ? map.get(key) : null; + // Get the current transaction's purged set. + Set purged = fToBePurged.get(); + // Get the current transaction's added map. + Map added = fToBeAdded.get(); + // See if it's cached in the transaction. + Lookup found = (added != null) ? added.get(key) : null; + // It's not. if (found == null) { + // If it's been purged in the transaction it is + // a miss. + if (purged != null && purged.contains(key)) + { + return null; + } found = fCache.get(key); } + // Despite the odds, we found a hit. if (found != null) { + // Get a freshened Lookup. Lookup result = new Lookup(found, fAVMNodeDAO, fAVMStoreDAO); + // Check that nothing horrible is wrong. This should + // be assertible, but I'll leave the check in for now. if (!result.isValid()) { fgLogger.error("Invalid entry in cache: " + key); - onRollback(); return null; } + // Prepare the cache for a timestamp update on commit. updateCache(key, found); return result; } // Alternatively for a read lookup a write can match. if (!key.isWrite()) { + // Make a copy of the key and set it to 'write' LookupKey newKey = new LookupKey(key); newKey.setWrite(true); - found = (map != null) ? map.get(newKey) : null; + // Is it in the transaction's cache? + found = (added != null) ? added.get(newKey) : null; + // If not. if (found == null) { + // If it's been purged it's a miss. + if (purged != null && purged.contains(newKey)) + { + return null; + } found = fCache.get(newKey); } if (found != null) { + // We found it. Make a freshened copy of the Lookup. Lookup result = new Lookup(found, fAVMNodeDAO, fAVMStoreDAO); + // Check for badness. This should be assertible but I'll + // leave the check in for now. if (!result.isValid()) { fgLogger.error("Invalid entry in cache: " + newKey); - onRollback(); return null; } + // Prepare the cache to update time stamp. updateCache(newKey, found); return result; } @@ -237,29 +274,47 @@ public class LookupCache */ private void updateCache(LookupKey key, Lookup lookup) { + // First, put it in the transaction scoped cache. Map map = fToBeAdded.get(); if (map == null) { map = new HashMap(); } map.put(key, lookup); + // Remove any corresponding entry from the purge list. + Set purged = fToBePurged.get(); + if (purged == null) + { + return; + } + purged.remove(key); } /** * Called when a transaction has successfully committed, * to make lookups from the transaction available to other transactions. */ - public synchronized void commitLookups() + public synchronized void onCommit() { - Map map = fToBeAdded.get(); - if (map == null) + // First get rid of all entries purged by the transaction. + Set purged = fToBePurged.get(); + if (purged != null) + { + purgeEntries(purged); + } + // Null out the thread local. + fToBePurged.set(null); + // Get and go through the transaction's added list. + Map added = fToBeAdded.get(); + if (added == null) { return; } - for (Map.Entry entry : map.entrySet()) + for (Map.Entry entry : added.entrySet()) { LookupKey key = entry.getKey(); Lookup lookup = entry.getValue(); + // If the cache already has the key, remove it. if (fCache.containsKey(key)) { fCache.remove(key); @@ -267,10 +322,13 @@ public class LookupCache fInverseTimeStamps.remove(key); fTimeStamps.remove(oldTime); } + // Add the entry. long timeStamp = fTimeStamp++; fTimeStamps.put(timeStamp, key); fInverseTimeStamps.put(key, timeStamp); fCache.put(key, lookup); + // Check if we're over the limit and purge the + // LRU entry if we are. if (fCache.size() > fMaxSize) { // Get rid of the oldest entry. @@ -280,14 +338,15 @@ public class LookupCache fCache.remove(old); } } + // Null out the ThreadLocal. fToBeAdded.set(null); } /** - * Remove a List of entries. - * @param keys The List of entries. + * Remove a Set of entries. + * @param keys The Set of entries. */ - private void purgeEntries(List keys) + private void purgeEntries(Set keys) { for (LookupKey key : keys) { @@ -305,17 +364,39 @@ public class LookupCache */ public synchronized void onWrite(String storeName) { - List toDelete = new ArrayList(); + // Get or make up the purged Set for this transaction. + Set purged = fToBePurged.get(); + if (purged == null) + { + purged = new HashSet(); + fToBePurged.set(purged); + } + // Invalidate if it's a read lookup in the store, or + // any read lookup is it's layered. for (Map.Entry entry : fCache.entrySet()) { if ((entry.getKey().getStoreName().equals(storeName) && !entry.getKey().isWrite()) || (!entry.getKey().isWrite() && entry.getValue().isLayered())) { - toDelete.add(entry.getKey()); + purged.add(entry.getKey()); + } + } + // Remove entries from the added set using the same criteria. + Map added = fToBeAdded.get(); + if (added == null) + { + return; + } + for (Map.Entry entry : added.entrySet()) + { + if ((entry.getKey().getStoreName().equals(storeName) && + !entry.getKey().isWrite()) || + (!entry.getKey().isWrite() && entry.getValue().isLayered())) + { + added.remove(entry.getKey()); } } - purgeEntries(toDelete); } /** @@ -324,16 +405,36 @@ public class LookupCache */ public synchronized void onDelete(String storeName) { - List toDelete = new ArrayList(); + // Get or make up a fresh purged Set. + Set purged = fToBePurged.get(); + if (purged == null) + { + purged = new HashSet(); + fToBePurged.set(purged); + } + // Invalidate any entries that are in the store or are layered lookups. for (Map.Entry entry : fCache.entrySet()) { if (entry.getKey().getStoreName().equals(storeName) || entry.getValue().isLayered()) { - toDelete.add(entry.getKey()); + purged.add(entry.getKey()); + } + } + // Get rid of any similarly matching elements in the added list. + Map added = fToBeAdded.get(); + if (added == null) + { + return; + } + for (Map.Entry entry : added.entrySet()) + { + if (entry.getKey().getStoreName().equals(storeName) || + entry.getValue().isLayered()) + { + added.remove(entry.getKey()); } } - purgeEntries(toDelete); } /** @@ -342,28 +443,48 @@ public class LookupCache */ public synchronized void onSnapshot(String storeName) { - List toDelete = new ArrayList(); + // Get or make up a purged set. + Set purged = fToBePurged.get(); + if (purged == null) + { + purged = new HashSet(); + fToBePurged.set(purged); + } + // Invalidate any entries that in the store and writes or + // any layered lookups. for (Map.Entry entry : fCache.entrySet()) { if ((entry.getKey().getStoreName().equals(storeName) && entry.getKey().isWrite()) || - entry.getValue().isLayered()) + entry.getValue().isLayered()) { - toDelete.add(entry.getKey()); + purged.add(entry.getKey()); + } + } + // Remove from the added list by the same criteria. + Map added = fToBeAdded.get(); + if (added == null) + { + return; + } + for (Map.Entry entry : added.entrySet()) + { + if ((entry.getKey().getStoreName().equals(storeName) && + entry.getKey().isWrite()) || + entry.getValue().isLayered()) + { + added.remove(entry.getKey()); } } - purgeEntries(toDelete); } /** - * Called when a rollback has occurred. This invalidates the entire - * cache. Heavy handed but quick. + * Called when a rollback has occurred. */ public synchronized void onRollback() { - fCache.clear(); - fTimeStamps.clear(); - fInverseTimeStamps.clear(); + // Just toss the transaction level changes. fToBeAdded.set(null); + fToBePurged.set(null); } } diff --git a/source/java/org/alfresco/repo/transaction/RetryingTransactionAdvice.java b/source/java/org/alfresco/repo/transaction/RetryingTransactionAdvice.java index 2032648874..1c75af998b 100644 --- a/source/java/org/alfresco/repo/transaction/RetryingTransactionAdvice.java +++ b/source/java/org/alfresco/repo/transaction/RetryingTransactionAdvice.java @@ -5,7 +5,6 @@ package org.alfresco.repo.transaction; import java.util.Random; -import org.alfresco.error.AlfrescoRuntimeException; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.log4j.Logger; @@ -104,37 +103,44 @@ public class RetryingTransactionAdvice implements MethodInterceptor } return result; } - catch (Throwable e) + catch (RuntimeException e) { if (txn != null && isNewTxn && !txn.isCompleted()) { fTxnManager.rollback(txn); } - if (e instanceof ConcurrencyFailureException || - e instanceof DeadlockLoserDataAccessException || - e instanceof StaleObjectStateException || - e instanceof LockAcquisitionException) + if (!isNewTxn) { - if (!isNewTxn) + throw e; + } + lastException = e; + Throwable t = e; + boolean shouldRetry = false; + while (t != null) + { + if (t instanceof ConcurrencyFailureException || + t instanceof DeadlockLoserDataAccessException || + t instanceof StaleObjectStateException || + t instanceof LockAcquisitionException) { - throw (RuntimeException)e; - } - lastException = (RuntimeException)e; - try - { - Thread.sleep(fRandom.nextInt(500 * count + 500)); - } - catch (InterruptedException ie) - { - // Do nothing. + shouldRetry = true; + try + { + Thread.sleep(fRandom.nextInt(500 * count + 500)); + } + catch (InterruptedException ie) + { + // Do nothing. + } + break; } + t = t.getCause(); + } + if (shouldRetry) + { continue; } - if (e instanceof RuntimeException) - { - throw (RuntimeException)e; - } - throw new AlfrescoRuntimeException("Failure in Transaction.", e); + throw e; } } fgLogger.error("Txn Failed after " + fMaxRetries + " retries:", lastException); diff --git a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java index af7b3c7b56..38ecf1cc9b 100644 --- a/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java +++ b/source/java/org/alfresco/repo/transaction/RetryingTransactionHelper.java @@ -5,7 +5,6 @@ package org.alfresco.repo.transaction; import java.util.Random; -import javax.transaction.RollbackException; import javax.transaction.Status; import javax.transaction.SystemException; import javax.transaction.UserTransaction; @@ -159,30 +158,35 @@ public class RetryingTransactionHelper throw new AlfrescoRuntimeException("Failure during rollback.", e1); } } - // This handles the case of an unexpected rollback in - // the UserTransaction. - if (e instanceof RollbackException) + lastException = (e instanceof RuntimeException) ? + (RuntimeException)e : new AlfrescoRuntimeException("Unknown Exception in Transaction.", e); + Throwable t = e; + boolean shouldRetry = false; + while (t != null) { - RollbackException re = (RollbackException)e; - e = re.getCause(); + // These are the 'OK' exceptions. These mean we can retry. + if (t instanceof ConcurrencyFailureException || + t instanceof DeadlockLoserDataAccessException || + t instanceof StaleObjectStateException || + t instanceof LockAcquisitionException) + { + shouldRetry = true; + // Sleep a random amount of time before retrying. + // The sleep interval increases with the number of retries. + try + { + Thread.sleep(fRandom.nextInt(500 * count + 500)); + } + catch (InterruptedException ie) + { + // Do nothing. + } + break; + } + t = t.getCause(); } - // These are the 'OK' exceptions. These mean we can retry. - if (e instanceof ConcurrencyFailureException || - e instanceof DeadlockLoserDataAccessException || - e instanceof StaleObjectStateException || - e instanceof LockAcquisitionException) + if (shouldRetry) { - lastException = (RuntimeException)e; - // Sleep a random amount of time before retrying. - // The sleep interval increases with the number of retries. - try - { - Thread.sleep(fRandom.nextInt(500 * count + 500)); - } - catch (InterruptedException ie) - { - // Do nothing. - } continue; } // It was a 'bad' exception.