Refinements to RetryingTransactionHelper to make it equivalent to

executeInUserTransaction().
Changed the on close callback for write listeners to use a RetryingTransaction.
The point of this exercise is to make it possible for clients of the core server
to ignore transient resource contention failures.  CIFS, for example, will be able 
to take advantage of this, since a transient error condition currently results in a dead
share.


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@4597 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Britt Park
2006-12-13 17:03:44 +00:00
parent 4834c89f5a
commit e76fc329b6
10 changed files with 172 additions and 68 deletions

View File

@@ -36,8 +36,8 @@ public class AVMCrawlTestP extends AVMServiceTestBase
public void testCrawl()
{
int n = 4; // Number of Threads.
int m = 16; // How many multiples of content to start with.
long runTime = 3600000; // 6 hours.
int m = 12; // How many multiples of content to start with.
long runTime = 7200000; // 6 hours.
fService.purgeAVMStore("main");
BulkLoader loader = new BulkLoader();
loader.setAvmService(fService);

View File

@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.service.cmr.avm.AVMException;
import org.alfresco.service.cmr.avm.AVMNodeDescriptor;
import org.alfresco.service.cmr.avm.AVMService;
@@ -232,10 +231,6 @@ class AVMCrawler implements Runnable
{
return;
}
if (e instanceof AlfrescoRuntimeException)
{
return;
}
throw new AVMException("Failure", e);
}
}

View File

@@ -27,6 +27,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.List;
import org.alfresco.error.StackTraceUtil;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transaction.TransactionUtil;
import org.alfresco.service.cmr.repository.ContentAccessor;
import org.alfresco.service.cmr.repository.ContentData;
@@ -57,7 +58,8 @@ public abstract class AbstractContentAccessor implements ContentAccessor
private StackTraceElement[] traceLoggerChannelAssignTrace;
/** when set, ensures that listeners are executed within a transaction */
private TransactionService transactionService;
// private TransactionService transactionService;
private RetryingTransactionHelper transactionHelper;
private String contentUrl;
private String mimetype;
@@ -120,21 +122,26 @@ public abstract class AbstractContentAccessor implements ContentAccessor
*
* @return Returns a source of user transactions
*/
protected TransactionService getTransactionService()
{
return transactionService;
}
// protected TransactionService getTransactionService()
// {
// return transactionService;
// }
/**
* Set the transaction provider to be used by {@link ContentStreamListener listeners}.
*
* @param transactionService the transaction service to wrap callback code in
*/
public void setTransactionService(TransactionService transactionService)
{
this.transactionService = transactionService;
}
// public void setTransactionService(TransactionService transactionService)
// {
// this.transactionService = transactionService;
// }
public void setRetryingTransactionHelper(RetryingTransactionHelper helper)
{
this.transactionHelper = helper;
}
/**
* Derived classes can call this method to ensure that necessary trace logging is performed
* when the IO Channel is opened.
@@ -237,28 +244,40 @@ public abstract class AbstractContentAccessor implements ContentAccessor
// nothing to do
return;
}
TransactionUtil.TransactionWork<Object> work = new TransactionUtil.TransactionWork<Object>()
{
public Object doWork()
{
// call the listeners
for (ContentStreamListener listener : listeners)
{
listener.contentStreamClosed();
}
return null;
}
};
if (transactionService != null)
RetryingTransactionHelper.Callback cb =
new RetryingTransactionHelper.Callback()
{
// just create a transaction
TransactionUtil.executeInUserTransaction(transactionService, work);
public Object execute()
{
for (ContentStreamListener listener : listeners)
{
listener.contentStreamClosed();
}
return null;
}
};
// TransactionUtil.TransactionWork<Object> work = new TransactionUtil.TransactionWork<Object>()
// {
// public Object doWork()
// {
// // call the listeners
// for (ContentStreamListener listener : listeners)
// {
// listener.contentStreamClosed();
// }
// return null;
// }
// };
if (transactionHelper != null)
{
// Execute in transaction.
transactionHelper.doInTransaction(cb, false);
}
else
{
try
{
work.doWork();
cb.execute();
}
catch (Exception e)
{
@@ -331,28 +350,44 @@ public abstract class AbstractContentAccessor implements ContentAccessor
// nothing to do
return;
}
TransactionUtil.TransactionWork<Object> work = new TransactionUtil.TransactionWork<Object>()
// We're now doing this in a retrying transaction, which means
// that the body of execute() must be idempotent.
RetryingTransactionHelper.Callback cb =
new RetryingTransactionHelper.Callback()
{
public Object execute()
{
for (ContentStreamListener listener : listeners)
{
public Object doWork()
{
// call the listeners
for (ContentStreamListener listener : listeners)
{
listener.contentStreamClosed();
}
return null;
}
};
if (transactionService != null)
listener.contentStreamClosed();
}
return null;
}
};
// TransactionUtil.TransactionWork<Object> work = new TransactionUtil.TransactionWork<Object>()
// {
// public Object doWork()
// {
// // call the listeners
// for (ContentStreamListener listener : listeners)
// {
// listener.contentStreamClosed();
// }
// return null;
// }
// };
// We're now doing this inside a Retrying transaction.
// NB
if (transactionHelper != null)
{
// just create a transaction
TransactionUtil.executeInUserTransaction(transactionService, work);
transactionHelper.doInTransaction(cb, false);
}
else
{
try
{
work.doWork();
cb.execute();
}
catch (Exception e)
{

View File

@@ -307,7 +307,7 @@ public abstract class AbstractContentReadWriteTest extends TestCase
streamClosed[0] = true;
}
};
writer.setTransactionService(new DummyTransactionService());
writer.setRetryingTransactionHelper(null);
writer.addListener(listener);
// write some content

View File

@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Set;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.avm.AVMNodeConverter;
import org.alfresco.repo.content.ContentServicePolicies.OnContentReadPolicy;
import org.alfresco.repo.content.ContentServicePolicies.OnContentUpdatePolicy;
@@ -34,6 +33,7 @@ import org.alfresco.repo.content.transform.magick.ImageMagickContentTransformer;
import org.alfresco.repo.policy.ClassPolicyDelegate;
import org.alfresco.repo.policy.JavaBehaviour;
import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.service.cmr.avm.AVMService;
import org.alfresco.service.cmr.dictionary.DataTypeDefinition;
import org.alfresco.service.cmr.dictionary.DictionaryService;
@@ -73,6 +73,7 @@ public class RoutingContentService implements ContentService
private DictionaryService dictionaryService;
private NodeService nodeService;
private AVMService avmService;
private RetryingTransactionHelper transactionHelper;
/** a registry of all available content transformers */
private ContentTransformerRegistry transformerRegistry;
@@ -106,6 +107,11 @@ public class RoutingContentService implements ContentService
this.transactionService = transactionService;
}
public void setRetryingTransactionHelper(RetryingTransactionHelper helper)
{
this.transactionHelper = helper;
}
public void setDictionaryService(DictionaryService dictionaryService)
{
this.dictionaryService = dictionaryService;
@@ -356,9 +362,9 @@ public class RoutingContentService implements ContentService
if (update)
{
// need a listener to update the node when the stream closes
WriteStreamListener listener = new WriteStreamListener(nodeService, nodeRef, propertyQName, writer);
WriteStreamListener listener = new WriteStreamListener(nodeService, avmService, nodeRef, propertyQName, writer);
writer.addListener(listener);
writer.setTransactionService(transactionService);
writer.setRetryingTransactionHelper(transactionHelper);
}
// give back to the client
@@ -458,17 +464,20 @@ public class RoutingContentService implements ContentService
private static class WriteStreamListener implements ContentStreamListener
{
private NodeService nodeService;
private AVMService avmService;
private NodeRef nodeRef;
private QName propertyQName;
private ContentWriter writer;
public WriteStreamListener(
NodeService nodeService,
AVMService avmService,
NodeRef nodeRef,
QName propertyQName,
ContentWriter writer)
{
this.nodeService = nodeService;
this.avmService = avmService;
this.nodeRef = nodeRef;
this.propertyQName = propertyQName;
this.writer = writer;
@@ -480,10 +489,19 @@ public class RoutingContentService implements ContentService
{
// set the full content property
ContentData contentData = writer.getContentData();
nodeService.setProperty(
// Bypass NodeService for avm stores.
if (nodeRef.getStoreRef().getProtocol().equals(StoreRef.PROTOCOL_AVM))
{
Pair<Integer, String> versionPath = AVMNodeConverter.ToAVMVersionPath(nodeRef);
avmService.setContentData(versionPath.getSecond(), contentData);
}
else
{
nodeService.setProperty(
nodeRef,
propertyQName,
contentData);
}
// done
if (logger.isDebugEnabled())
{

View File

@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.content.AbstractContentStore;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.service.cmr.repository.ContentIOException;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentStreamListener;
@@ -98,6 +99,7 @@ public class ReplicatingContentStore extends AbstractContentStore
private static Log logger = LogFactory.getLog(ReplicatingContentStore.class);
private TransactionService transactionService;
private RetryingTransactionHelper transactionHelper;
private ContentStore primaryStore;
private List<ContentStore> secondaryStores;
private boolean inbound;
@@ -130,6 +132,14 @@ public class ReplicatingContentStore extends AbstractContentStore
this.transactionService = transactionService;
}
/**
* Set the retrying transaction helper.
*/
public void setRetryingTransactionHelper(RetryingTransactionHelper helper)
{
this.transactionHelper = helper;
}
/**
* Set the primary store that content will be replicated to or from
*
@@ -291,7 +301,7 @@ public class ReplicatingContentStore extends AbstractContentStore
// attach the listener
ContentStreamListener listener = new ReplicatingWriteListener(secondaryStores, writer, outboundThreadPoolExecutor);
writer.addListener(listener);
writer.setTransactionService(transactionService); // mandatory when listeners are added
writer.setRetryingTransactionHelper(transactionHelper); // mandatory when listeners are added
}
// done

View File

@@ -5,6 +5,8 @@ package org.alfresco.repo.transaction;
import java.util.Random;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
@@ -79,25 +81,38 @@ public class RetryingTransactionHelper
* Execute a callback in a transaction until it succeeds, fails
* because of an error not the result of an optimistic locking failure,
* or a deadlock loser failure, or until a maximum number of retries have
* been attempted. NB that this ignores transaction status and relies entirely
* on thrown exceptions to decide to rollback. Also this is non-reentrant, not
* to be called within an existing transaction.
* been attempted.
* @param cb The callback containing the unit of work.
* @param readOnly Whether this is a read only transaction.
* @return The result of the unit of work.
*/
public Object doInTransaction(Callback cb, boolean readOnly)
{
// Track the last exception caught, so that we
// can throw it if we run out of retries.
RuntimeException lastException = null;
for (int count = 0; fMaxRetries < 0 || count < fMaxRetries; ++count)
{
UserTransaction txn = null;
boolean isNew = false;
try
{
txn = fTxnService.getNonPropagatingUserTransaction(readOnly);
txn.begin();
txn = fTxnService.getUserTransaction(readOnly);
// Do we need to handle transaction demarcation. If
// no, we cannot do retries, that will be up to the containing
// transaction.
isNew = txn.getStatus() == Status.STATUS_NO_TRANSACTION;
if (isNew)
{
txn.begin();
}
// Do the work.
Object result = cb.execute();
txn.commit();
// Only commit if we 'own' the transaction.
if (isNew)
{
txn.commit();
}
if (fgLogger.isDebugEnabled())
{
if (count != 0)
@@ -107,13 +122,29 @@ public class RetryingTransactionHelper
}
return result;
}
catch (Exception e)
catch (Throwable e)
{
// Somebody else 'owns' the transaction, so just rethrow.
if (!isNew)
{
if (e instanceof RuntimeException)
{
throw (RuntimeException)e;
}
else
{
throw new AlfrescoRuntimeException("Unknown Exception.", e);
}
}
// Rollback if we can.
if (txn != null)
{
try
{
txn.rollback();
if (txn.getStatus() != Status.STATUS_ROLLEDBACK)
{
txn.rollback();
}
}
catch (IllegalStateException e1)
{
@@ -128,12 +159,22 @@ public class RetryingTransactionHelper
throw new AlfrescoRuntimeException("Failure during rollback.", e1);
}
}
// This handles the case of an unexpected rollback in
// the UserTransaction.
if (e instanceof RollbackException)
{
RollbackException re = (RollbackException)e;
e = re.getCause();
}
// These are the 'OK' exceptions. These mean we can retry.
if (e instanceof ConcurrencyFailureException ||
e instanceof DeadlockLoserDataAccessException ||
e instanceof StaleObjectStateException ||
e instanceof LockAcquisitionException)
{
lastException = (RuntimeException)e;
// Sleep a random amount of time before retrying.
// The sleep interval increases with the number of retries.
try
{
Thread.sleep(fRandom.nextInt(500 * count + 500));
@@ -144,6 +185,7 @@ public class RetryingTransactionHelper
}
continue;
}
// It was a 'bad' exception.
if (e instanceof RuntimeException)
{
throw (RuntimeException)e;
@@ -151,6 +193,8 @@ public class RetryingTransactionHelper
throw new AlfrescoRuntimeException("Exception in Transaction.", e);
}
}
// We've worn out our welcome and retried the maximum number of times.
// So, fail.
throw lastException;
}
}