Merged DEV/DEREK_2.1 to HEAD

- Removed Node.parentAssocs mapping
 - Added parentAssocs transactional cache to NodeDAO
 - Added concurrency detection to TransactionalCache
 - Fixed cluster sample config


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@5948 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2007-06-13 23:50:14 +00:00
parent 36ea25f822
commit fb069d1680
19 changed files with 940 additions and 186 deletions

View File

@@ -41,6 +41,7 @@ import org.alfresco.util.EqualsHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.util.Assert;
/**
@@ -54,12 +55,13 @@ import org.springframework.util.Assert;
* directly with the shared cache when no transaction is present. There is
* virtually no overhead when running out-of-transaction.
* <p>
* 3 caches are maintained.
* <ul>
* <li>Shared backing cache that should only be accessed by instances of this class</li>
* <li>Lazily created cache of updates made during the transaction</li>
* <li>Lazily created cache of deletions made during the transaction</li>
* </ul>
* The first phase of the commit ensures that any values written to the cache in the
* current transaction are not already superceded by values in the shared cache. In
* this case, the transaction is failed for concurrency reasons and will have to retry.
* The second phase occurs post-commit. We are sure that the transaction committed
* correctly, but things may have changed in the cache between the commit and post-commit.
* If this is the case, then the offending values are merely removed from the shared
* cache.
* <p>
* When the cache is {@link #clear() cleared}, a flag is set on the transaction.
* The shared cache, instead of being cleared itself, is just ignored for the remainder
@@ -69,7 +71,8 @@ import org.springframework.util.Assert;
* Because there is a limited amount of space available to the in-transaction caches,
* when either of these becomes full, the cleared flag is set. This ensures that
* the shared cache will not have stale data in the event of the transaction-local
* caches dropping items.
* caches dropping items. It is therefore important to size the transactional caches
* correctly.
*
* @author Derek Hulley
*/
@@ -77,15 +80,15 @@ public class TransactionalCache<K extends Serializable, V extends Object>
implements SimpleCache<K, V>, TransactionListener, InitializingBean
{
private static final String RESOURCE_KEY_TXN_DATA = "TransactionalCache.TxnData";
private static final String VALUE_DELETE = "TransactionalCache.DeleteMarker";
private static Log logger = LogFactory.getLog(TransactionalCache.class);
private static boolean isDebugEnabled = logger.isDebugEnabled();
/** a name used to uniquely identify the transactional caches */
private String name;
/** the shared cache that will get updated after commits */
private SimpleCache<Serializable, Object> sharedCache;
private SimpleCache<Serializable, V> sharedCache;
/** the manager to control Ehcache caches */
private CacheManager cacheManager;
@@ -96,6 +99,13 @@ public class TransactionalCache<K extends Serializable, V extends Object>
/** a unique string identifying this instance when binding resources */
private String resourceKeyTxnData;
/**
* Public constructor.
*/
public TransactionalCache()
{
}
/**
* @see #setName(String)
*/
@@ -133,7 +143,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
*
* @param sharedCache
*/
public void setSharedCache(SimpleCache<Serializable, Object> sharedCache)
public void setSharedCache(SimpleCache<Serializable, V> sharedCache)
{
this.sharedCache = sharedCache;
}
@@ -284,13 +294,13 @@ public class TransactionalCache<K extends Serializable, V extends Object>
TransactionData txnData = getTransactionData();
try
{
if (!txnData.isClearOn) // deletions cache is still reliable
if (!txnData.isClearOn) // deletions cache only useful before a clear
{
// check to see if the key is present in the transaction's removed items
if (txnData.removedItemsCache.get(key) != null)
{
// it has been removed in this transaction
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("get returning null - item has been removed from transactional cache: \n" +
" cache: " + this + "\n" +
@@ -304,15 +314,17 @@ public class TransactionalCache<K extends Serializable, V extends Object>
Element element = txnData.updatedItemsCache.get(key);
if (element != null)
{
CacheBucket<V> bucket = (CacheBucket<V>) element.getValue();
V value = bucket.getValue();
// element was found in transaction-specific updates/additions
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("Found item in transactional cache: \n" +
" cache: " + this + "\n" +
" key: " + key + "\n" +
" value: " + element.getValue());
" value: " + value);
}
return (V) element.getValue();
return value;
}
}
catch (CacheException e)
@@ -325,19 +337,20 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// no value found - must we ignore the shared cache?
if (!ignoreSharedCache)
{
V value = sharedCache.get(key);
// go to the shared cache
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("No value found in transaction - fetching instance from shared cache: \n" +
" cache: " + this + "\n" +
" key: " + key + "\n" +
" value: " + sharedCache.get(key));
" value: " + value);
}
return (V) sharedCache.get(key);
return value;
}
else // ignore shared cache
{
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("No value found in transaction and ignoring shared cache: \n" +
" cache: " + this + "\n" +
@@ -361,7 +374,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// no transaction
sharedCache.put(key, value);
// done
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("No transaction - adding item direct to shared cache: \n" +
" cache: " + this + "\n" +
@@ -381,12 +394,24 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// shared cache needs to be ignored for the rest of the transaction.
txnData.isClearOn = true;
}
Element element = new Element(key, value);
CacheBucket<V> bucket = null;
if (sharedCache.contains(key))
{
V existingValue = sharedCache.get(key);
// The value needs to be kept for later checks
bucket = new UpdateCacheBucket<V>(existingValue, value);
}
else
{
// The value didn't exist before
bucket = new NewCacheBucket<V>(value);
}
Element element = new Element(key, bucket);
txnData.updatedItemsCache.put(element);
// remove the item from the removed cache, if present
txnData.removedItemsCache.remove(key);
// done
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("In transaction - adding item direct to transactional update cache: \n" +
" cache: " + this + "\n" +
@@ -410,7 +435,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// no transaction
sharedCache.remove(key);
// done
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("No transaction - removing item from shared cache: \n" +
" cache: " + this + "\n" +
@@ -423,7 +448,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// is the shared cache going to be cleared?
if (txnData.isClearOn)
{
// don't store removals
// don't store removals if we're just going to clear it all out later
}
else
{
@@ -434,7 +459,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// data by clearing the shared cache after the transaction. Also, the
// shared cache needs to be ignored for the rest of the transaction.
txnData.isClearOn = true;
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("In transaction - removal cache reach capacity reached: \n" +
" cache: " + this + "\n" +
@@ -443,15 +468,24 @@ public class TransactionalCache<K extends Serializable, V extends Object>
}
else
{
// add it from the removed cache for this txn
Element element = new Element(key, VALUE_DELETE);
txnData.removedItemsCache.put(element);
V existingValue = sharedCache.get(key);
if (existingValue == null)
{
// There is no point doing a remove for a value that doesn't exist
}
else
{
// Create a bucket to remove the value from the shared cache
CacheBucket<V> removeBucket = new RemoveCacheBucket<V>(existingValue);
Element element = new Element(key, removeBucket);
txnData.removedItemsCache.put(element);
}
}
}
// remove the item from the udpated cache, if present
txnData.updatedItemsCache.remove(key);
// done
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("In transaction - adding item direct to transactional removed cache: \n" +
" cache: " + this + "\n" +
@@ -468,7 +502,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
// clear local caches
if (AlfrescoTransactionSupport.getTransactionId() != null)
{
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("In transaction clearing cache: \n" +
" cache: " + this + "\n" +
@@ -485,7 +519,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
}
else // no transaction
{
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("No transaction - clearing shared cache");
}
@@ -501,12 +535,40 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{
}
public void beforeCommit(boolean readOnly)
public void beforeCompletion()
{
}
public void beforeCompletion()
/**
* Check that the cache used is not out of date with the shared cache.
* Note that the clear flag is ignored as this would have removed all
* entries from the local caches anyway.
*/
@SuppressWarnings("unchecked")
public void beforeCommit(boolean readOnly)
{
if (isDebugEnabled)
{
logger.debug("Processing pre-commit");
}
TransactionData txnData = getTransactionData();
// Process all the updates or new entries
for (Object obj : txnData.updatedItemsCache.getKeys())
{
Serializable key = (Serializable) obj;
Element element = txnData.updatedItemsCache.get(key);
CacheBucket<V> bucket = (CacheBucket<V>) element.getValue();
bucket.doPreCommit(sharedCache, key);
}
// Process all the removals
for (Object obj : txnData.removedItemsCache.getKeys())
{
Serializable key = (Serializable) obj;
Element element = txnData.removedItemsCache.get(key);
CacheBucket<V> bucket = (CacheBucket<V>) element.getValue();
bucket.doPreCommit(sharedCache, key);
}
}
/**
@@ -515,9 +577,9 @@ public class TransactionalCache<K extends Serializable, V extends Object>
@SuppressWarnings("unchecked")
public void afterCommit()
{
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("Processing end of transaction commit");
logger.debug("Processing post-commit");
}
TransactionData txnData = getTransactionData();
@@ -527,7 +589,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{
// clear shared cache
sharedCache.clear();
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("Clear notification recieved at end of transaction - clearing shared cache");
}
@@ -542,7 +604,7 @@ public class TransactionalCache<K extends Serializable, V extends Object>
{
sharedCache.remove(key);
}
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("Removed " + keys.size() + " values from shared cache");
}
@@ -553,9 +615,10 @@ public class TransactionalCache<K extends Serializable, V extends Object>
for (Serializable key : keys)
{
Element element = txnData.updatedItemsCache.get(key);
sharedCache.put(key, element.getValue());
CacheBucket<V> bucket = (CacheBucket<V>) element.getObjectValue();
sharedCache.put(key, bucket.getValue());
}
if (logger.isDebugEnabled())
if (isDebugEnabled)
{
logger.debug("Added " + keys.size() + " values to shared cache");
}
@@ -591,6 +654,206 @@ public class TransactionalCache<K extends Serializable, V extends Object>
cacheManager.removeCache(txnData.removedItemsCache.getName());
}
/**
* Interface for the transactional cache buckets. These hold the actual values along
* with some state and behaviour around writing from the in-transaction caches to the
* shared.
*
* @author Derek Hulley
*/
private interface CacheBucket<BV extends Object> extends Serializable
{
/**
* @return Returns the bucket's value
*/
BV getValue();
/**
* Check that any new cache values have not been superceded by new values in the shared cache.
*
* @param sharedCache the cache to check
* @param key the key that the bucket was stored against
*/
void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key);
/**
* Flush the current bucket to the shared cache as far as possible.
*
* @param sharedCache the cache to flush to
* @param key the key that the bucket was stored against
*/
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key);
}
/**
* A bucket class to hold values for the caches.<br/>
* The cache ID and timestamp of the bucket is stored to ensure cache consistency.
*
* @author Derek Hulley
*/
private class NewCacheBucket<BV> implements CacheBucket<BV>
{
private static final long serialVersionUID = -8536386687213957425L;
private final BV value;
public NewCacheBucket(BV value)
{
this.value = value;
}
public BV getValue()
{
return value;
}
public void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
if (sharedCache.contains(key))
{
// The shared cache has a value where there wasn't one before. More than likely,
// this transaction would have used or modified that shared value.
throw new ConcurrencyFailureException(
"New cache bucket found new shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
if (sharedCache.contains(key))
{
// The shared cache has a value where there wasn't one before.
// Just lose both of them.
sharedCache.remove(key);
}
else
{
// There is nothing in the shared cache, so add this entry
sharedCache.put(key, getValue());
}
}
}
/**
* Data holder to keep track of a cached value's timestamps in order to detect stale
* shared cache values. This bucket assumes the presence of a pre-existing entry in
* the shared cache.
*/
private class UpdateCacheBucket<BV> extends NewCacheBucket<BV>
{
private static final long serialVersionUID = 7885689778259779578L;
private final BV originalValue;
public UpdateCacheBucket(BV originalValue, BV value)
{
super(value);
this.originalValue = originalValue;
}
protected BV getOriginalValue()
{
return originalValue;
}
public void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
if (sharedCache.contains(key))
{
BV sharedValue = sharedCache.get(key);
if (sharedValue == getOriginalValue())
{
// The cache entry is safe for writing
}
else
{
// The shared value has moved on since
throw new ConcurrencyFailureException(
"Update cache bucket found newer shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
else
{
// The key was removed from the shared cache. This instance cannot update the entry.
throw new ConcurrencyFailureException(
"Update cache bucket couldn't fine entry to update: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
BV sharedValue = sharedCache.get(key);
if (sharedValue != null)
{
if (sharedValue == getOriginalValue())
{
// The cache entry is safe for writing
sharedCache.put(key, getValue());
}
else
{
// The shared value has moved on since
sharedCache.remove(key);
}
}
else
{
// The shared cache no longer has a value
}
}
}
/**
* Data holder to keep track of cache removals. This bucket assumes the previous existence
* of an entry in the shared cache.
*/
private class RemoveCacheBucket<BV> extends UpdateCacheBucket<BV>
{
private static final long serialVersionUID = -7736719065158540252L;
public RemoveCacheBucket(BV originalValue)
{
super(originalValue, null);
}
public void doPreCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
BV sharedValue = sharedCache.get(key);
if (sharedValue != null)
{
if (sharedValue == getOriginalValue())
{
// The shared cache entry is the same as the one we were flagged to remove
}
else
{
// The shared value has moved on since
throw new ConcurrencyFailureException(
"Remove cache bucket found newer shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
else
{
// The shared cache no longer has the value. It is possible that the removal of the
// item is something that would have affected the behaviour of the current transaction.
throw new ConcurrencyFailureException(
"Remove cache bucket found new shared cache entry: \n" +
" Cache: " + name + "\n" +
" Key: " + key);
}
}
public void doPostCommit(SimpleCache<Serializable, BV> sharedCache, Serializable key)
{
if (sharedCache.contains(key))
{
// We remove the shared entry whether it has moved on or not
sharedCache.remove(key);
}
else
{
// The shared cache no longer has a value
}
}
}
/** Data holder to bind data to the transaction */
private class TransactionData
{