diff --git a/source/java/org/alfresco/repo/action/AsynchronousActionExecutionQueueImpl.java b/source/java/org/alfresco/repo/action/AsynchronousActionExecutionQueueImpl.java index 4654e14fc0..3903b101fb 100644 --- a/source/java/org/alfresco/repo/action/AsynchronousActionExecutionQueueImpl.java +++ b/source/java/org/alfresco/repo/action/AsynchronousActionExecutionQueueImpl.java @@ -25,48 +25,30 @@ package org.alfresco.repo.action; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.alfresco.repo.rule.RuleServiceImpl; import org.alfresco.repo.security.authentication.AuthenticationComponent; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; -import org.alfresco.repo.transaction.TransactionUtil; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.action.Action; import org.alfresco.service.cmr.action.ActionServiceException; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.transaction.TransactionService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * The asynchronous action execution queue implementation * * @author Roy Wetherall */ -public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor implements - AsynchronousActionExecutionQueue +public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionExecutionQueue { - /** - * Default pool values - */ - private static final int CORE_POOL_SIZE = 2; - - private static final int MAX_POOL_SIZE = 5; - - private static final long KEEP_ALIVE = 30; - - private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - - private static final int MAX_QUEUE_SIZE = 500; - - /** - * The transaction service - */ + private static Log logger = LogFactory.getLog(AsynchronousActionExecutionQueueImpl.class); + + private ThreadPoolExecutor threadPoolExecutor; private TransactionService transactionService; - - /** - * The authentication component - */ private AuthenticationComponent authenticationComponent; /** @@ -74,15 +56,23 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp */ public AsynchronousActionExecutionQueueImpl() { - super(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE, TIME_UNIT, new ArrayBlockingQueue(MAX_QUEUE_SIZE, - true)); + } + + /** + * Set the thread pool, which may be shared with other components, that will be used + * to run the actions. + * + * @param threadPoolExecutor the thread pool + */ + public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) + { + this.threadPoolExecutor = threadPoolExecutor; } /** * Set the transaction service * - * @param transactionService - * the transaction service + * @param transactionService the transaction service */ public void setTransactionService(TransactionService transactionService) { @@ -92,8 +82,7 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp /** * Set the authentication component * - * @param authenticationComponent - * the authentication component + * @param authenticationComponent the authentication component */ public void setAuthenticationComponent(AuthenticationComponent authenticationComponent) { @@ -101,8 +90,7 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp } /** - * @see org.alfresco.repo.action.AsynchronousActionExecutionQueue#executeAction(org.alfresco.service.cmr.repository.NodeRef, - * org.alfresco.service.cmr.action.Action, boolean) + * {@inheritDoc} */ public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef, boolean checkConditions, Set actionChain) @@ -111,38 +99,23 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp } /** - * @see org.alfresco.repo.action.AsynchronousActionExecutionQueue#executeAction(org.alfresco.service.cmr.repository.NodeRef, - * org.alfresco.service.cmr.action.Action, boolean, - * org.alfresco.service.cmr.repository.NodeRef) + * {@inheritDoc} */ @SuppressWarnings("unchecked") public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef, boolean checkConditions, Set actionChain, NodeRef actionExecutionHistoryNodeRef) { Set executedRules = - (Set) AlfrescoTransactionSupport.getResource("RuleServiceImpl.ExecutedRules"); - execute(new ActionExecutionWrapper(actionService, transactionService, authenticationComponent, action, - actionedUponNodeRef, checkConditions, actionExecutionHistoryNodeRef, actionChain, executedRules)); - } - - /** - * @see java.util.concurrent.ThreadPoolExecutor#beforeExecute(java.lang.Thread, - * java.lang.Runnable) - */ - @Override - protected void beforeExecute(Thread thread, Runnable runnable) - { - super.beforeExecute(thread, runnable); - } - - /** - * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, - * java.lang.Throwable) - */ - @Override - protected void afterExecute(Runnable thread, Throwable runnable) - { - super.afterExecute(thread, runnable); + (Set) AlfrescoTransactionSupport.getResource("RuleServiceImpl.ExecutedRules"); + Runnable runnable = new ActionExecutionWrapper( + actionService, + action, + actionedUponNodeRef, + checkConditions, + actionExecutionHistoryNodeRef, + actionChain, + executedRules); + threadPoolExecutor.execute(runnable); } /** @@ -150,69 +123,34 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp */ private class ActionExecutionWrapper implements Runnable { - /** - * Runtime action service - */ private RuntimeActionService actionService; - /** - * The transaction service - */ - private TransactionService transactionService; - - /** - * The authentication component - */ - private AuthenticationComponent authenticationComponent; - - /** - * The action - */ private Action action; - - /** - * The actioned upon node reference - */ private NodeRef actionedUponNodeRef; - - /** - * The check conditions value - */ private boolean checkConditions; - - /** - * The action execution history node reference - */ private NodeRef actionExecutionHistoryNodeRef; - - /** - * The action chain - */ private Set actionChain; - - /** - * List of executed list, helps to prevent loop scenarios with async rules - */ private Set executedRules; /** - * Constructor - * - * @param actionService - * @param transactionService - * @param authenticationComponent - * @param action - * @param actionedUponNodeRef - * @param checkConditions - * @param actionExecutionHistoryNodeRef + * @param actionService the action service + * @param action the action to perform + * @param actionedUponNodeRef the node to perform the action on + * @param checkConditions the check conditions + * @param actionExecutionHistoryNodeRef the action execution history node reference + * @param actionChain the action chain + * @param executedRules list of executions done to helps to prevent loop scenarios with async rules */ - public ActionExecutionWrapper(RuntimeActionService actionService, TransactionService transactionService, - AuthenticationComponent authenticationComponent, Action action, NodeRef actionedUponNodeRef, - boolean checkConditions, NodeRef actionExecutionHistoryNodeRef, Set actionChain, Set executedRules) + public ActionExecutionWrapper( + RuntimeActionService actionService, + Action action, + NodeRef actionedUponNodeRef, + boolean checkConditions, + NodeRef actionExecutionHistoryNodeRef, + Set actionChain, + Set executedRules) { this.actionService = actionService; - this.transactionService = transactionService; - this.authenticationComponent = authenticationComponent; this.actionedUponNodeRef = actionedUponNodeRef; this.action = action; this.checkConditions = checkConditions; @@ -287,38 +225,38 @@ public class AsynchronousActionExecutionQueueImpl extends ThreadPoolExecutor imp throw new ActionServiceException("Cannot execute action asynchronously since run as user is 'null'"); } - ActionExecutionWrapper.this.authenticationComponent.setCurrentUser(userName); + authenticationComponent.setCurrentUser(userName); try { - TransactionUtil.executeInNonPropagatingUserTransaction(this.transactionService, - new TransactionUtil.TransactionWork() + RetryingTransactionCallback actionCallback = new RetryingTransactionCallback() + { + public Object execute() + { + if (ActionExecutionWrapper.this.executedRules != null) { - public Object doWork() - { - if (ActionExecutionWrapper.this.executedRules != null) - { - AlfrescoTransactionSupport.bindResource("RuleServiceImpl.ExecutedRules", ActionExecutionWrapper.this.executedRules); - } - - ActionExecutionWrapper.this.actionService.executeActionImpl( - ActionExecutionWrapper.this.action, - ActionExecutionWrapper.this.actionedUponNodeRef, - ActionExecutionWrapper.this.checkConditions, true, - ActionExecutionWrapper.this.actionChain); - - return null; - } - }); + AlfrescoTransactionSupport.bindResource("RuleServiceImpl.ExecutedRules", ActionExecutionWrapper.this.executedRules); + } + + ActionExecutionWrapper.this.actionService.executeActionImpl( + ActionExecutionWrapper.this.action, + ActionExecutionWrapper.this.actionedUponNodeRef, + ActionExecutionWrapper.this.checkConditions, true, + ActionExecutionWrapper.this.actionChain); + + return null; + } + }; + transactionService.getRetryingTransactionHelper().doInTransaction(actionCallback); } finally { - ActionExecutionWrapper.this.authenticationComponent.clearCurrentSecurityContext(); + authenticationComponent.clearCurrentSecurityContext(); } } catch (Throwable exception) { - exception.printStackTrace(); + logger.error("Failed to execute asynchronous action: " + action, exception); } } }