Added retrying behaviour to asynchronous actions.

Logging of asynchronous action failures goes to the logger and not just console.
Injected ThreadPoolExecutor to be used for asynchronous actions.


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@5946 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2007-06-13 23:31:31 +00:00
parent 7956a08df8
commit 36ea25f822

View File

@@ -25,48 +25,30 @@
package org.alfresco.repo.action; package org.alfresco.repo.action;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.alfresco.repo.rule.RuleServiceImpl; import org.alfresco.repo.rule.RuleServiceImpl;
import org.alfresco.repo.security.authentication.AuthenticationComponent; import org.alfresco.repo.security.authentication.AuthenticationComponent;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.TransactionUtil; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.action.Action; import org.alfresco.service.cmr.action.Action;
import org.alfresco.service.cmr.action.ActionServiceException; import org.alfresco.service.cmr.action.ActionServiceException;
import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.transaction.TransactionService; import org.alfresco.service.transaction.TransactionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* The asynchronous action execution queue implementation * The asynchronous action execution queue implementation
* *
* @author Roy Wetherall * @author Roy Wetherall
*/ */
public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor implements public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionExecutionQueue
AsynchronousActionExecutionQueue
{ {
/** private static Log logger = LogFactory.getLog(AsynchronousActionExecutionQueueImpl.class);
* Default pool values
*/ private ThreadPoolExecutor threadPoolExecutor;
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 5;
private static final long KEEP_ALIVE = 30;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final int MAX_QUEUE_SIZE = 500;
/**
* The transaction service
*/
private TransactionService transactionService; private TransactionService transactionService;
/**
* The authentication component
*/
private AuthenticationComponent authenticationComponent; private AuthenticationComponent authenticationComponent;
/** /**
@@ -74,15 +56,23 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp
*/ */
public AsynchronousActionExecutionQueueImpl() public AsynchronousActionExecutionQueueImpl()
{ {
super(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE, TIME_UNIT, new ArrayBlockingQueue<Runnable>(MAX_QUEUE_SIZE, }
true));
/**
* Set the thread pool, which may be shared with other components, that will be used
* to run the actions.
*
* @param threadPoolExecutor the thread pool
*/
public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor)
{
this.threadPoolExecutor = threadPoolExecutor;
} }
/** /**
* Set the transaction service * Set the transaction service
* *
* @param transactionService * @param transactionService the transaction service
* the transaction service
*/ */
public void setTransactionService(TransactionService transactionService) public void setTransactionService(TransactionService transactionService)
{ {
@@ -92,8 +82,7 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp
/** /**
* Set the authentication component * Set the authentication component
* *
* @param authenticationComponent * @param authenticationComponent the authentication component
* the authentication component
*/ */
public void setAuthenticationComponent(AuthenticationComponent authenticationComponent) public void setAuthenticationComponent(AuthenticationComponent authenticationComponent)
{ {
@@ -101,8 +90,7 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp
} }
/** /**
* @see org.alfresco.repo.action.AsynchronousActionExecutionQueue#executeAction(org.alfresco.service.cmr.repository.NodeRef, * {@inheritDoc}
* org.alfresco.service.cmr.action.Action, boolean)
*/ */
public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef, public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef,
boolean checkConditions, Set<String> actionChain) boolean checkConditions, Set<String> actionChain)
@@ -111,38 +99,23 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp
} }
/** /**
* @see org.alfresco.repo.action.AsynchronousActionExecutionQueue#executeAction(org.alfresco.service.cmr.repository.NodeRef, * {@inheritDoc}
* org.alfresco.service.cmr.action.Action, boolean,
* org.alfresco.service.cmr.repository.NodeRef)
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef, public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef,
boolean checkConditions, Set<String> actionChain, NodeRef actionExecutionHistoryNodeRef) boolean checkConditions, Set<String> actionChain, NodeRef actionExecutionHistoryNodeRef)
{ {
Set<RuleServiceImpl.ExecutedRuleData> executedRules = Set<RuleServiceImpl.ExecutedRuleData> executedRules =
(Set<RuleServiceImpl.ExecutedRuleData>) AlfrescoTransactionSupport.getResource("RuleServiceImpl.ExecutedRules"); (Set<RuleServiceImpl.ExecutedRuleData>) AlfrescoTransactionSupport.getResource("RuleServiceImpl.ExecutedRules");
execute(new ActionExecutionWrapper(actionService, transactionService, authenticationComponent, action, Runnable runnable = new ActionExecutionWrapper(
actionedUponNodeRef, checkConditions, actionExecutionHistoryNodeRef, actionChain, executedRules)); actionService,
} action,
actionedUponNodeRef,
/** checkConditions,
* @see java.util.concurrent.ThreadPoolExecutor#beforeExecute(java.lang.Thread, actionExecutionHistoryNodeRef,
* java.lang.Runnable) actionChain,
*/ executedRules);
@Override threadPoolExecutor.execute(runnable);
protected void beforeExecute(Thread thread, Runnable runnable)
{
super.beforeExecute(thread, runnable);
}
/**
* @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable,
* java.lang.Throwable)
*/
@Override
protected void afterExecute(Runnable thread, Throwable runnable)
{
super.afterExecute(thread, runnable);
} }
/** /**
@@ -150,69 +123,34 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp
*/ */
private class ActionExecutionWrapper implements Runnable private class ActionExecutionWrapper implements Runnable
{ {
/**
* Runtime action service
*/
private RuntimeActionService actionService; private RuntimeActionService actionService;
/**
* The transaction service
*/
private TransactionService transactionService;
/**
* The authentication component
*/
private AuthenticationComponent authenticationComponent;
/**
* The action
*/
private Action action; private Action action;
/**
* The actioned upon node reference
*/
private NodeRef actionedUponNodeRef; private NodeRef actionedUponNodeRef;
/**
* The check conditions value
*/
private boolean checkConditions; private boolean checkConditions;
/**
* The action execution history node reference
*/
private NodeRef actionExecutionHistoryNodeRef; private NodeRef actionExecutionHistoryNodeRef;
/**
* The action chain
*/
private Set<String> actionChain; private Set<String> actionChain;
/**
* List of executed list, helps to prevent loop scenarios with async rules
*/
private Set<RuleServiceImpl.ExecutedRuleData> executedRules; private Set<RuleServiceImpl.ExecutedRuleData> executedRules;
/** /**
* Constructor * @param actionService the action service
* * @param action the action to perform
* @param actionService * @param actionedUponNodeRef the node to perform the action on
* @param transactionService * @param checkConditions the check conditions
* @param authenticationComponent * @param actionExecutionHistoryNodeRef the action execution history node reference
* @param action * @param actionChain the action chain
* @param actionedUponNodeRef * @param executedRules list of executions done to helps to prevent loop scenarios with async rules
* @param checkConditions
* @param actionExecutionHistoryNodeRef
*/ */
public ActionExecutionWrapper(RuntimeActionService actionService, TransactionService transactionService, public ActionExecutionWrapper(
AuthenticationComponent authenticationComponent, Action action, NodeRef actionedUponNodeRef, RuntimeActionService actionService,
boolean checkConditions, NodeRef actionExecutionHistoryNodeRef, Set<String> actionChain, Set<RuleServiceImpl.ExecutedRuleData> executedRules) Action action,
NodeRef actionedUponNodeRef,
boolean checkConditions,
NodeRef actionExecutionHistoryNodeRef,
Set<String> actionChain,
Set<RuleServiceImpl.ExecutedRuleData> executedRules)
{ {
this.actionService = actionService; this.actionService = actionService;
this.transactionService = transactionService;
this.authenticationComponent = authenticationComponent;
this.actionedUponNodeRef = actionedUponNodeRef; this.actionedUponNodeRef = actionedUponNodeRef;
this.action = action; this.action = action;
this.checkConditions = checkConditions; this.checkConditions = checkConditions;
@@ -287,38 +225,38 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp
throw new ActionServiceException("Cannot execute action asynchronously since run as user is 'null'"); throw new ActionServiceException("Cannot execute action asynchronously since run as user is 'null'");
} }
ActionExecutionWrapper.this.authenticationComponent.setCurrentUser(userName); authenticationComponent.setCurrentUser(userName);
try try
{ {
TransactionUtil.executeInNonPropagatingUserTransaction(this.transactionService, RetryingTransactionCallback<Object> actionCallback = new RetryingTransactionCallback<Object>()
new TransactionUtil.TransactionWork<Object>() {
public Object execute()
{
if (ActionExecutionWrapper.this.executedRules != null)
{ {
public Object doWork() AlfrescoTransactionSupport.bindResource("RuleServiceImpl.ExecutedRules", ActionExecutionWrapper.this.executedRules);
{ }
if (ActionExecutionWrapper.this.executedRules != null)
{ ActionExecutionWrapper.this.actionService.executeActionImpl(
AlfrescoTransactionSupport.bindResource("RuleServiceImpl.ExecutedRules", ActionExecutionWrapper.this.executedRules); ActionExecutionWrapper.this.action,
} ActionExecutionWrapper.this.actionedUponNodeRef,
ActionExecutionWrapper.this.checkConditions, true,
ActionExecutionWrapper.this.actionService.executeActionImpl( ActionExecutionWrapper.this.actionChain);
ActionExecutionWrapper.this.action,
ActionExecutionWrapper.this.actionedUponNodeRef, return null;
ActionExecutionWrapper.this.checkConditions, true, }
ActionExecutionWrapper.this.actionChain); };
transactionService.getRetryingTransactionHelper().doInTransaction(actionCallback);
return null;
}
});
} }
finally finally
{ {
ActionExecutionWrapper.this.authenticationComponent.clearCurrentSecurityContext(); authenticationComponent.clearCurrentSecurityContext();
} }
} }
catch (Throwable exception) catch (Throwable exception)
{ {
exception.printStackTrace(); logger.error("Failed to execute asynchronous action: " + action, exception);
} }
} }
} }