Merge V3.2 To HEAD

17894 : Merge from NEILM to V3.2



git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@18233 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Roy Wetherall
2010-01-21 17:48:10 +00:00
parent ac8529fc24
commit 41e574f3d5
11 changed files with 600 additions and 29 deletions

View File

@@ -25,8 +25,13 @@
package org.alfresco.repo.action;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.Map.Entry;
import java.util.concurrent.ThreadPoolExecutor;
import org.alfresco.error.StackTraceUtil;
@@ -47,6 +52,7 @@ import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.EqualsHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -64,7 +70,17 @@ public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionE
private TransactionService transactionService;
private AuthenticationContext authenticationContext;
private PolicyComponent policyComponent;
private NodeService nodeService;
private Map<String, AbstractAsynchronousActionFilter>
actionFilters = new HashMap<String, AbstractAsynchronousActionFilter>();
private NodeService nodeService;
/**
* We keep a record of ongoing asynchronous actions (this includes those being executed and
* those that are in the queue).
* This needs to be thread-safe - hence the Vector.
*/
List<OngoingAsyncAction> ongoingActions = new Vector<OngoingAsyncAction>();
// Policy delegates
private ClassPolicyDelegate<OnAsyncActionExecute> onAsyncActionExecuteDelegate;
@@ -169,6 +185,37 @@ public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionE
return qnames;
}
/**
* This method registers an action filter, which can be used to prevent unwanted or unnecessary
* asynchronous actions from being scheduled for execution.
*
* @param filter the filter implementation.
*/
public void registerActionFilter(AbstractAsynchronousActionFilter filter)
{
String filterName = filter.getName();
if (logger.isDebugEnabled())
{
StringBuilder msg = new StringBuilder();
msg.append("Registered asynchronous action filter ")
.append(filter.getName()).append(" for action ")
.append(filter.getActionDefinitionName());
logger.debug(msg.toString());
}
AbstractAsynchronousActionFilter existingFilter = actionFilters.get(filterName);
if (logger.isDebugEnabled() && existingFilter != null)
{
StringBuilder msg = new StringBuilder();
msg.append("This replaces previous filter ")
.append(existingFilter.getName());
logger.debug(msg.toString());
}
this.actionFilters.put(filter.getName(), filter);
}
/**
* {@inheritDoc}
*/
@@ -185,6 +232,22 @@ public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionE
public void executeAction(RuntimeActionService actionService, Action action, NodeRef actionedUponNodeRef,
boolean checkConditions, Set<String> actionChain, NodeRef actionExecutionHistoryNodeRef)
{
if (logger.isDebugEnabled())
{
StringBuilder msg = new StringBuilder();
msg.append("Received request to execute async action ").append(action.getActionDefinitionName())
.append(" on ").append(actionedUponNodeRef);
logger.debug(msg.toString());
msg = new StringBuilder();
msg.append("ThreadPool's active count = ").append(this.threadPoolExecutor.getActiveCount());
logger.debug(msg.toString());
msg = new StringBuilder();
msg.append("ThreadPool's queue size = ").append(this.threadPoolExecutor.getQueue().size());
logger.debug(msg.toString());
}
Set<RuleServiceImpl.ExecutedRuleData> executedRules =
(Set<RuleServiceImpl.ExecutedRuleData>) AlfrescoTransactionSupport.getResource("RuleServiceImpl.ExecutedRules");
Runnable runnable = new ActionExecutionWrapper(
@@ -195,7 +258,60 @@ public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionE
actionExecutionHistoryNodeRef,
actionChain,
executedRules);
threadPoolExecutor.execute(runnable);
// Consider whether this action should be filtered out by one of the registered filters.
boolean newActionShouldBeFilteredOut = false;
OngoingAsyncAction nodeBeingNewlyActioned = new OngoingAsyncAction(actionedUponNodeRef, action);
for (Entry<String, AbstractAsynchronousActionFilter> entry : actionFilters.entrySet())
{
AbstractAsynchronousActionFilter comparator = entry.getValue();
String actionDefinitionName = comparator.getActionDefinitionName();
if (actionDefinitionName.equals(action.getActionDefinitionName()) == false)
{
// We're only interested in registered actions with the same name as this one.
continue;
}
else
{
// Now we've found a registered action that matches the current one.
// So we'll go through the actions that are ongoing and consider them for matches with this one.
for (OngoingAsyncAction ongoingAction : this.ongoingActions)
{
if (comparator.compare(ongoingAction, nodeBeingNewlyActioned) == 0)
{
newActionShouldBeFilteredOut = true;
break;
}
}
}
}
if (newActionShouldBeFilteredOut)
{
if (logger.isDebugEnabled())
{
StringBuilder msg = new StringBuilder();
msg.append("Dropping action ").append(action).append(" as equivalent is ongoing.");
logger.debug(msg.toString());
}
return;
}
else
{
if (logger.isDebugEnabled())
{
StringBuilder msg = new StringBuilder();
msg.append("Executing action ").append(action);
logger.debug(msg.toString());
}
// Queue it and do it.
ongoingActions.add(nodeBeingNewlyActioned);
threadPoolExecutor.execute(runnable);
}
// Done
if (logger.isDebugEnabled())
{
@@ -214,8 +330,19 @@ public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionE
}
}
private void handleAsyncActionIsCompleted(NodeRef n, Action action) {
if (logger.isDebugEnabled())
{
StringBuilder msg = new StringBuilder();
msg.append("Completed action ").append(action);
logger.debug(msg.toString());
}
OngoingAsyncAction ongoing = new OngoingAsyncAction(n, action);
ongoingActions.remove(ongoing);
}
/**
* Tansaction listener used to invoke callback policies
* Transaction listener used to invoke callback policies
*/
public class CallbackTransactionListener extends TransactionListenerAdapter
{
@@ -384,6 +511,7 @@ public class AsynchronousActionExecutionQueueImpl implements AsynchronousActionE
{
logger.error("Failed to execute asynchronous action: " + action, exception);
}
}
handleAsyncActionIsCompleted(actionedUponNodeRef, action);
}
}
}
}