mirror of
				https://github.com/Alfresco/alfresco-community-repo.git
				synced 2025-10-22 15:12:38 +00:00 
			
		
		
		
	107541: Merged 5.0.N (5.0.3) to HEAD-BUG-FIX (5.1/Cloud) (PARTIAL MERGE)
      107413: Merged DEV to 5.0.N (5.0.3)
         106858 : MNT-13545: JavaDoc : Inconsistencies between the Java doc and the actual code
            - Cleaning of Javadoc,
   107565: MNT-13545 Fix compilation after merge of Javadoc
git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@107633 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
		
	
		
			
				
	
	
		
			451 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
			
		
		
	
	
			451 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
| /*
 | |
|  * Copyright (C) 2005-2013 Alfresco Software Limited.
 | |
|  *
 | |
|  * This file is part of Alfresco
 | |
|  *
 | |
|  * Alfresco is free software: you can redistribute it and/or modify
 | |
|  * it under the terms of the GNU Lesser General Public License as published by
 | |
|  * the Free Software Foundation, either version 3 of the License, or
 | |
|  * (at your option) any later version.
 | |
|  *
 | |
|  * Alfresco is distributed in the hope that it will be useful,
 | |
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|  * GNU Lesser General Public License for more details.
 | |
|  *
 | |
|  * You should have received a copy of the GNU Lesser General Public License
 | |
|  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
 | |
|  */
 | |
| package org.alfresco.repo.action;
 | |
| 
 | |
| import java.util.List;
 | |
| import java.util.Map;
 | |
| import java.util.Map.Entry;
 | |
| import java.util.Set;
 | |
| import java.util.Vector;
 | |
| import java.util.concurrent.ConcurrentHashMap;
 | |
| import java.util.concurrent.ThreadPoolExecutor;
 | |
| 
 | |
| import org.alfresco.error.AlfrescoRuntimeException;
 | |
| import org.alfresco.error.StackTraceUtil;
 | |
| import org.alfresco.repo.action.AsynchronousActionExecutionQueuePolicies.OnAsyncActionExecute;
 | |
| import org.alfresco.repo.policy.ClassPolicyDelegate;
 | |
| import org.alfresco.repo.policy.PolicyComponent;
 | |
| import org.alfresco.repo.rule.RuleServiceImpl;
 | |
| import org.alfresco.repo.security.authentication.AuthenticationContext;
 | |
| import org.alfresco.repo.tenant.TenantUtil;
 | |
| import org.alfresco.repo.tenant.TenantUtil.TenantRunAsWork;
 | |
| import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
 | |
| import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
 | |
| import org.alfresco.repo.transaction.TransactionListenerAdapter;
 | |
| import org.alfresco.service.cmr.action.Action;
 | |
| import org.alfresco.service.cmr.action.ActionServiceException;
 | |
| import org.alfresco.service.cmr.repository.NodeRef;
 | |
| import org.alfresco.service.transaction.TransactionService;
 | |
| import org.apache.commons.logging.Log;
 | |
| import org.apache.commons.logging.LogFactory;
 | |
| 
 | |
| /**
 | |
|  * The asynchronous action execution queue implementation
 | |
|  * 
 | |
|  * @author Roy Wetherall
 | |
|  */
 | |
| public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionExecutionQueue
 | |
| {
 | |
|     private static Log logger = LogFactory.getLog(AsynchronousActionExecutionQueueImpl.class);
 | |
|     
 | |
|     /** Services */
 | |
|     private ActionServiceImpl  actionServiceImpl;
 | |
|     private ThreadPoolExecutor threadPoolExecutor;
 | |
|     private TransactionService transactionService;
 | |
|     private PolicyComponent policyComponent;
 | |
|     private Map<String, AbstractAsynchronousActionFilter>
 | |
|             actionFilters = new ConcurrentHashMap<String, AbstractAsynchronousActionFilter>();
 | |
|     private String id;
 | |
| 
 | |
| 	/**
 | |
| 	 * We keep a record of ongoing asynchronous actions (this includes those being executed and
 | |
| 	 * those that are in the queue).
 | |
| 	 * This needs to be thread-safe - hence the Vector.
 | |
| 	 */
 | |
|     List<OngoingAsyncAction> ongoingActions = new Vector<OngoingAsyncAction>();
 | |
|     
 | |
|     // Policy delegates
 | |
|     private ClassPolicyDelegate<OnAsyncActionExecute> onAsyncActionExecuteDelegate; 
 | |
| 
 | |
|     /**
 | |
|      * Default constructor
 | |
|      */
 | |
|     public AsynchronousActionExecutionQueueImpl()
 | |
|     {
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * Init method.  Registers the policies.
 | |
|      */
 | |
|     public void init()
 | |
|     {
 | |
|         // Register the execution queue with the ActionService
 | |
|         actionServiceImpl.registerAsynchronousActionExecutionQueue(id, this);
 | |
|         
 | |
|         // Register the policies
 | |
|         onAsyncActionExecuteDelegate = policyComponent.registerClassPolicy(OnAsyncActionExecute.class);
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * @since Thor Phase 2 Sprint 2
 | |
|      */
 | |
|     public void setActionServiceImpl(ActionServiceImpl serviceImpl)
 | |
|     {
 | |
|         this.actionServiceImpl = serviceImpl;
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * @since Thor Phase 2 Sprint 2
 | |
|      */
 | |
|     public void setId(String id)
 | |
|     {
 | |
|         this.id = id;
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * Set the thread pool, which may be shared with other components, that will be used
 | |
|      * to run the actions.
 | |
|      * 
 | |
|      * @param threadPoolExecutor            the thread pool
 | |
|      */
 | |
|     public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor)
 | |
|     {
 | |
|         this.threadPoolExecutor = threadPoolExecutor;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Set the transaction service
 | |
|      * 
 | |
|      * @param transactionService            the transaction service
 | |
|      */
 | |
|     public void setTransactionService(TransactionService transactionService)
 | |
|     {
 | |
|         this.transactionService = transactionService;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * @deprecated Not used since 3.4
 | |
|      */
 | |
|     public void setAuthenticationContext(AuthenticationContext authenticationContext)
 | |
|     {
 | |
|         logger.warn("Property 'authenticationContext' is no longer required.");
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Set the policy component
 | |
|      * 
 | |
|      * @param policyComponent   policy component
 | |
|      */
 | |
|     public void setPolicyComponent(PolicyComponent policyComponent)
 | |
|     {
 | |
|         this.policyComponent = policyComponent;
 | |
|     }
 | |
|     
 | |
|     private void invokeOnAsyncActionExecutePolicy(Action action, NodeRef actionedUponNodeRef)
 | |
|     {
 | |
|         // Execute the policy, passing it all details, firing as a general action case
 | |
|         AsynchronousActionExecutionQueuePolicies.OnAsyncActionExecute policy = 
 | |
|            onAsyncActionExecuteDelegate.get(actionedUponNodeRef, ActionModel.TYPE_ACTION);
 | |
|         policy.onAsyncActionExecute(action, actionedUponNodeRef);
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * This method registers an action filter, which can be used to prevent unwanted or unnecessary
 | |
|      * asynchronous actions from being scheduled for execution.
 | |
|      * 
 | |
|      * @param filter the filter implementation.
 | |
|      */
 | |
|     public void registerActionFilter(AbstractAsynchronousActionFilter filter)
 | |
|     {
 | |
|     	String filterName = filter.getName();
 | |
|     	
 | |
|     	if (logger.isDebugEnabled())
 | |
|     	{
 | |
|     		StringBuilder msg = new StringBuilder();
 | |
|     		msg.append("Registered asynchronous action filter ")
 | |
|     		    .append(filter.getName()).append(" for action ")
 | |
|     		    .append(filter.getActionDefinitionName());
 | |
|     		logger.debug(msg.toString());
 | |
|     	}
 | |
|     	
 | |
|     	AbstractAsynchronousActionFilter existingFilter = actionFilters.get(filterName);
 | |
| 		if (logger.isDebugEnabled() && existingFilter != null)
 | |
|     	{
 | |
|     		StringBuilder msg = new StringBuilder();
 | |
| 			msg.append("This replaces previous filter ")
 | |
|     		    .append(existingFilter.getName());
 | |
|     		logger.debug(msg.toString());
 | |
|     	}
 | |
| 		
 | |
|     	this.actionFilters.put(filter.getName(), filter);
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * {@inheritDoc}
 | |
|      */
 | |
|     public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef,
 | |
|             boolean checkConditions, Set<String> actionChain)
 | |
|     {
 | |
|         executeAction(actionService, action, actionedUponNodeRef, checkConditions, actionChain, null);
 | |
|     }
 | |
| 
 | |
|     @SuppressWarnings("unchecked")
 | |
| 	public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef,
 | |
|             boolean checkConditions, Set<String> actionChain, NodeRef actionExecutionHistoryNodeRef)
 | |
|     {
 | |
|     	if (logger.isDebugEnabled())
 | |
|     	{
 | |
|     	    StringBuilder msg = new StringBuilder();
 | |
|     	    msg.append("Received request to execute async action ").append(action.getActionDefinitionName())
 | |
|     	        .append(" on ").append(actionedUponNodeRef);
 | |
|     	    logger.debug(msg.toString());
 | |
| 
 | |
|     	    msg = new StringBuilder();
 | |
|     	    msg.append("ThreadPool's active count = ").append(this.threadPoolExecutor.getActiveCount());
 | |
|     	    logger.debug(msg.toString());
 | |
| 
 | |
|     	    msg = new StringBuilder();
 | |
|     	    msg.append("ThreadPool's queue size = ").append(this.threadPoolExecutor.getQueue().size());
 | |
|     	    logger.debug(msg.toString());
 | |
|     	}
 | |
| 
 | |
|     	Set<RuleServiceImpl.ExecutedRuleData> executedRules =
 | |
|             (Set<RuleServiceImpl.ExecutedRuleData>) AlfrescoTransactionSupport.getResource("RuleServiceImpl.ExecutedRules");
 | |
|         Runnable runnable = new ActionExecutionWrapper(
 | |
|                 actionService,
 | |
|                 action,
 | |
|                 actionedUponNodeRef,
 | |
|                 checkConditions,
 | |
|                 actionChain,
 | |
|                 executedRules);
 | |
|         
 | |
|         // Consider whether this action should be filtered out by one of the registered filters.
 | |
|         boolean newActionShouldBeFilteredOut = false;
 | |
|         OngoingAsyncAction nodeBeingNewlyActioned = new OngoingAsyncAction(actionedUponNodeRef, action);
 | |
|         
 | |
|         for (Entry<String, AbstractAsynchronousActionFilter> entry : actionFilters.entrySet())
 | |
|         {
 | |
|         	AbstractAsynchronousActionFilter comparator = entry.getValue();
 | |
|         	String actionDefinitionName = comparator.getActionDefinitionName();
 | |
|         	
 | |
|         	if (actionDefinitionName.equals(action.getActionDefinitionName()) == false)
 | |
|         	{
 | |
|         		// We're only interested in registered actions with the same name as this one.
 | |
|         		continue;
 | |
|         	}
 | |
|         	else
 | |
|         	{
 | |
|         		// Now we've found a registered action that matches the current one.
 | |
|         		// So we'll go through the actions that are ongoing and consider them for matches with this one.
 | |
|         		// Need to synchronize to prevent changes to ongoingActions whilst iterating. Assume that ongoingActions
 | |
|         		// is not going to be too big and the loop will execute quite quickly, so that the synchronization 
 | |
|         		// will not impact concurrency too much.
 | |
|         		synchronized(this.ongoingActions)
 | |
|         		{
 | |
| 	        		for (OngoingAsyncAction ongoingAction : this.ongoingActions)
 | |
| 	        		{
 | |
| 	        			if (comparator.compare(ongoingAction, nodeBeingNewlyActioned) == 0)
 | |
| 	        			{
 | |
| 	        				newActionShouldBeFilteredOut = true;
 | |
| 	        				break;
 | |
| 	        			}
 | |
| 	        		}
 | |
|         		}
 | |
|         	}
 | |
|         }
 | |
|         if (newActionShouldBeFilteredOut)
 | |
|         {
 | |
|         	if (logger.isDebugEnabled())
 | |
|         	{
 | |
|         		StringBuilder msg = new StringBuilder();
 | |
|         		msg.append("Dropping action ").append(action).append(" as equivalent is ongoing.");
 | |
|         		logger.debug(msg.toString());
 | |
|         	}
 | |
|         	return;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|         	if (logger.isDebugEnabled())
 | |
|         	{
 | |
|         		StringBuilder msg = new StringBuilder();
 | |
|         		msg.append("Executing action ").append(action);
 | |
|         		logger.debug(msg.toString());
 | |
|         	}
 | |
|         	
 | |
|         	// Queue it and do it.
 | |
|         	ongoingActions.add(nodeBeingNewlyActioned);
 | |
|         	threadPoolExecutor.execute(runnable);
 | |
|         }
 | |
| 
 | |
|         // Done
 | |
|         if (logger.isDebugEnabled())
 | |
|         {
 | |
|             // get the stack trace
 | |
|             Exception e = new Exception();
 | |
|             e.fillInStackTrace();
 | |
|             StackTraceElement[] trace = e.getStackTrace();
 | |
|             StringBuilder sb = new StringBuilder();
 | |
|             sb.append("\n")
 | |
|               .append("Placed action on execution queue: \n")
 | |
|               .append("   Action:     " + action);
 | |
|             String msg = sb.toString();
 | |
|             sb = new StringBuilder();
 | |
|             StackTraceUtil.buildStackTrace(msg, trace, sb, -1);
 | |
|             logger.debug(sb);
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     private void handleAsyncActionIsCompleted(NodeRef n, Action action) {
 | |
|     	if (logger.isDebugEnabled())
 | |
|     	{
 | |
|     		StringBuilder msg = new StringBuilder();
 | |
|     		msg.append("Completed action ").append(action);
 | |
|     		logger.debug(msg.toString());
 | |
|     	}
 | |
|     	OngoingAsyncAction ongoing = new OngoingAsyncAction(n, action);
 | |
|     	ongoingActions.remove(ongoing);
 | |
|     }
 | |
|     
 | |
|     /**
 | |
|      * Transaction listener used to invoke callback policies
 | |
|      */
 | |
|     public class CallbackTransactionListener extends TransactionListenerAdapter
 | |
|     {
 | |
|         private Action action;
 | |
|         private NodeRef actionedUponNodeRef;
 | |
|         
 | |
|         /**
 | |
|          * Constructor
 | |
|          * 
 | |
|          * @param action                    action
 | |
|          * @param actionedUponNodeRef       actioned upon node reference
 | |
|          */
 | |
|         public CallbackTransactionListener(Action action, NodeRef actionedUponNodeRef)
 | |
|         {
 | |
|             this.action = action;
 | |
|             this.actionedUponNodeRef = actionedUponNodeRef;
 | |
|         }
 | |
| 
 | |
|         /**
 | |
|          * @see org.alfresco.repo.transaction.TransactionListenerAdapter#afterCommit()
 | |
|          */
 | |
|         @Override
 | |
|         public void afterCommit()
 | |
|         {
 | |
|             // Invoke the execute complete policy
 | |
|             invokeOnAsyncActionExecutePolicy(action, actionedUponNodeRef);           
 | |
|         }        
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Runnable class to wrap the execution of the action.
 | |
|      */
 | |
|     private class ActionExecutionWrapper implements Runnable
 | |
|     {
 | |
|         private RuntimeActionService actionService;
 | |
| 
 | |
|         private Action action;
 | |
|         private NodeRef actionedUponNodeRef;
 | |
|         private boolean checkConditions;
 | |
|         private Set<String> actionChain;
 | |
|         private Set<RuleServiceImpl.ExecutedRuleData> executedRules;
 | |
| 
 | |
|         /**
 | |
|          * @param actionService                     the action service
 | |
|          * @param action                            the action to perform
 | |
|          * @param actionedUponNodeRef               the node to perform the action on
 | |
|          * @param checkConditions                   the check conditions
 | |
|          * @param actionChain                       the action chain
 | |
|          * @param executedRules                     list of executions done to helps to prevent loop scenarios with async rules
 | |
|          */
 | |
|         public ActionExecutionWrapper(
 | |
|                 RuntimeActionService actionService,
 | |
|                 Action action,
 | |
|                 NodeRef actionedUponNodeRef,
 | |
|                 boolean checkConditions,
 | |
|                 Set<String> actionChain,
 | |
|                 Set<RuleServiceImpl.ExecutedRuleData> executedRules)
 | |
|         {
 | |
|             this.actionService = actionService;
 | |
|             this.actionedUponNodeRef = actionedUponNodeRef;
 | |
|             this.action = action;
 | |
|             this.checkConditions = checkConditions;
 | |
|             this.actionChain = actionChain;
 | |
|             this.executedRules = executedRules;
 | |
|         }
 | |
| 
 | |
|         /**
 | |
|          * Executes the action via the action runtime service
 | |
|          * 
 | |
|          * @see java.lang.Runnable#run()
 | |
|          */
 | |
|         public void run()
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 // Get the run as user name
 | |
|                 final String userName = ((ActionImpl)ActionExecutionWrapper.this.action).getRunAsUser();
 | |
|                 if (userName == null)
 | |
|                 {
 | |
|                     throw new ActionServiceException("Cannot execute action asynchronously since run as user is 'null'");
 | |
|                 }
 | |
|                 // Get the tenant the action was submitted from
 | |
|                 final String tenantId = ((ActionImpl)ActionExecutionWrapper.this.action).getTenantId();
 | |
|                 
 | |
|                 // import the content
 | |
|                 TenantRunAsWork<Object> actionRunAs = new TenantRunAsWork<Object>()
 | |
|                 {
 | |
|                     public Object doWork() throws Exception
 | |
|                     {
 | |
|                         RetryingTransactionCallback<Object> actionCallback = new RetryingTransactionCallback<Object>()
 | |
|                         {
 | |
|                             public Object execute()
 | |
|                             {   
 | |
|                                 // If we have rules, apply them
 | |
|                                 if (ActionExecutionWrapper.this.executedRules != null)
 | |
|                                 {
 | |
|                                     AlfrescoTransactionSupport.bindResource("RuleServiceImpl.ExecutedRules", ActionExecutionWrapper.this.executedRules);
 | |
|                                 }
 | |
|                                 
 | |
|                                 // Allow other classes to know when this action completes
 | |
|                                 AlfrescoTransactionSupport.bindListener(new CallbackTransactionListener(
 | |
|                                       ActionExecutionWrapper.this.action,
 | |
|                                       ActionExecutionWrapper.this.actionedUponNodeRef
 | |
|                                 ));
 | |
|                                 
 | |
|                                 // Have the action run
 | |
|                                 ActionExecutionWrapper.this.actionService.executeActionImpl(
 | |
|                                         ActionExecutionWrapper.this.action,
 | |
|                                         ActionExecutionWrapper.this.actionedUponNodeRef,
 | |
|                                         ActionExecutionWrapper.this.checkConditions, true,
 | |
|                                         ActionExecutionWrapper.this.actionChain);
 | |
| 
 | |
|                                 return null;
 | |
|                             }
 | |
|                         };
 | |
|                         return transactionService.getRetryingTransactionHelper().doInTransaction(actionCallback);
 | |
|                     }
 | |
|                 };
 | |
|                 TenantUtil.runAsUserTenant(actionRunAs, userName, tenantId);
 | |
|             }
 | |
|             catch (Throwable e)
 | |
|             {
 | |
|                 Throwable rootCause = (e instanceof AlfrescoRuntimeException) ? ((AlfrescoRuntimeException)e).getRootCause() : null;
 | |
|                 String message = (rootCause == null ? null : rootCause.getMessage());
 | |
|                 message = "Failed to execute asynchronous action: " + action+ (message == null ? "" : ": "+message);
 | |
|                 if(!ActionExecutionWrapper.this.actionService.onLogException(action, logger, rootCause, message))
 | |
|                 {
 | |
|                     //if not handled by the executor just show in the log
 | |
|                     logger.error(message, e);
 | |
|                 }
 | |
|             }
 | |
|             handleAsyncActionIsCompleted(actionedUponNodeRef, action);
 | |
|         }
 | |
|     }
 | |
| } |