mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
ALF-4284 - allow cluster-wide visibility of pending actions, as well as running ones
git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@21707 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -113,7 +113,12 @@ public class ActionTrackingServiceImpl implements ActionTrackingService
|
||||
}
|
||||
public void recordActionPending(ActionImpl action)
|
||||
{
|
||||
// Set the status
|
||||
action.setExecutionStatus(ActionStatus.Pending);
|
||||
|
||||
// Have it put into the cache, so we can tell it
|
||||
// is waiting to be run
|
||||
placeActionInCache(action);
|
||||
}
|
||||
|
||||
public void recordActionComplete(Action action)
|
||||
@@ -152,10 +157,47 @@ public class ActionTrackingServiceImpl implements ActionTrackingService
|
||||
logger.debug("Action " + action + " has begun exection");
|
||||
}
|
||||
|
||||
// Grab what status it was before
|
||||
ActionStatus previousStatus = action.getExecutionStatus();
|
||||
|
||||
// Mark the action as starting
|
||||
action.setExecutionStartDate(new Date());
|
||||
action.setExecutionStatus(ActionStatus.Running);
|
||||
|
||||
|
||||
// If it's a synchronous execution, put it into the cache
|
||||
if(previousStatus != ActionStatus.Pending)
|
||||
{
|
||||
placeActionInCache(action);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If it's async, update the existing cache entry
|
||||
String key = generateCacheKey(action);
|
||||
ExecutionDetails details = executingActionsCache.get(key);
|
||||
|
||||
// Check it's really there, warn + fix if not
|
||||
if(details == null) {
|
||||
logger.warn(
|
||||
"Went to mark the start of execution of " +
|
||||
action + " but it wasn't in the running actions cache! " +
|
||||
"Your running actions cache is probably too small"
|
||||
);
|
||||
}
|
||||
|
||||
// Update and save into the cache
|
||||
details = buildExecutionDetails(action);
|
||||
executingActionsCache.put(key, details);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For an action that needs to go into the cache
|
||||
* (async action that is pending, or sync action
|
||||
* that is running), assign an execution instance
|
||||
* and put into the cache
|
||||
*/
|
||||
private void placeActionInCache(ActionImpl action)
|
||||
{
|
||||
// Assign it a (unique) execution ID
|
||||
// (Keep checking to see if the key is used as we
|
||||
// increase nextExecutionId until it isn't)
|
||||
|
@@ -73,6 +73,7 @@ public class ActionTrackingServiceImplTest extends TestCase
|
||||
private EhCacheAdapter<String, ExecutionDetails> executingActionsCache;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void setUp() throws Exception {
|
||||
this.nodeService = (NodeService)ctx.getBean("nodeService");
|
||||
this.actionService = (ActionService)ctx.getBean("actionService");
|
||||
@@ -225,21 +226,30 @@ public class ActionTrackingServiceImplTest extends TestCase
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
|
||||
|
||||
// Pending won't add it in either
|
||||
// Pending will add it, but with no start date
|
||||
actionTrackingService.recordActionPending(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Pending, action.getExecutionStatus());
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
ExecutionSummary s = ActionTrackingServiceImpl.buildExecutionSummary(action);
|
||||
ExecutionDetails d = actionTrackingService.getExecutionDetails(s);
|
||||
assertNotNull(d.getExecutionSummary());
|
||||
assertEquals("sleep-action", d.getActionType());
|
||||
assertEquals("1234", d.getActionId());
|
||||
assertEquals(1, d.getExecutionInstance());
|
||||
assertEquals(null, d.getPersistedActionRef());
|
||||
assertNull(null, d.getStartedAt());
|
||||
|
||||
|
||||
// Run it, will go into the cache
|
||||
// Run it, will be updated in the cache
|
||||
actionTrackingService.recordActionExecuting(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Running, action.getExecutionStatus());
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
ExecutionSummary s = ActionTrackingServiceImpl.buildExecutionSummary(action);
|
||||
ExecutionDetails d = actionTrackingService.getExecutionDetails(s);
|
||||
s = ActionTrackingServiceImpl.buildExecutionSummary(action);
|
||||
d = actionTrackingService.getExecutionDetails(s);
|
||||
assertNotNull(d.getExecutionSummary());
|
||||
assertEquals("sleep-action", d.getActionType());
|
||||
assertEquals("1234", d.getActionId());
|
||||
@@ -264,6 +274,26 @@ public class ActionTrackingServiceImplTest extends TestCase
|
||||
assertEquals(ActionStatus.Failed, action.getExecutionStatus());
|
||||
assertEquals("Testing", action.getExecutionFailureMessage());
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
|
||||
|
||||
// If run from new, i.e. not via pending, goes into the cache
|
||||
((ActionImpl)action).setExecutionStatus(ActionStatus.New);
|
||||
((ActionImpl)action).setExecutionStartDate(null);
|
||||
((ActionImpl)action).setExecutionEndDate(null);
|
||||
((ActionImpl)action).setExecutionFailureMessage(null);
|
||||
((ActionImpl)action).setExecutionInstance(-1);
|
||||
|
||||
actionTrackingService.recordActionExecuting(action);
|
||||
assertEquals(ActionStatus.Running, action.getExecutionStatus());
|
||||
assertTrue( ((ActionImpl)action).getExecutionInstance() != -1 );
|
||||
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
actionTrackingService.recordActionComplete(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Completed, action.getExecutionStatus());
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
}
|
||||
|
||||
/** Working actions go into the cache, then out */
|
||||
@@ -385,6 +415,86 @@ public class ActionTrackingServiceImplTest extends TestCase
|
||||
assertEquals(null, d);
|
||||
}
|
||||
|
||||
/** Ensure that pending actions behave properly */
|
||||
public void testPendingActions() throws Exception
|
||||
{
|
||||
// New ones won't be in the cache
|
||||
Action action = createWorkingSleepAction("1234");
|
||||
assertEquals(ActionStatus.New, action.getExecutionStatus());
|
||||
|
||||
String key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
|
||||
|
||||
// Ask for it to be pending, will go in
|
||||
actionTrackingService.recordActionPending(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Pending, action.getExecutionStatus());
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
ExecutionSummary s = ActionTrackingServiceImpl.buildExecutionSummary(action);
|
||||
ExecutionDetails d = actionTrackingService.getExecutionDetails(s);
|
||||
assertNotNull(d.getExecutionSummary());
|
||||
assertEquals("sleep-action", d.getActionType());
|
||||
assertEquals("1234", d.getActionId());
|
||||
assertEquals(1, d.getExecutionInstance());
|
||||
assertEquals(null, d.getPersistedActionRef());
|
||||
assertNull(null, d.getStartedAt());
|
||||
|
||||
|
||||
// Run it, will stay
|
||||
actionTrackingService.recordActionExecuting(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Running, action.getExecutionStatus());
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
s = ActionTrackingServiceImpl.buildExecutionSummary(action);
|
||||
d = actionTrackingService.getExecutionDetails(s);
|
||||
assertNotNull(d.getExecutionSummary());
|
||||
assertEquals("sleep-action", d.getActionType());
|
||||
assertEquals("1234", d.getActionId());
|
||||
assertEquals(1, d.getExecutionInstance());
|
||||
assertEquals(null, d.getPersistedActionRef());
|
||||
assertNotNull(d.getStartedAt());
|
||||
|
||||
|
||||
// Finish, goes
|
||||
actionTrackingService.recordActionComplete(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Completed, action.getExecutionStatus());
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
|
||||
|
||||
// Put another pending one in
|
||||
action = createWorkingSleepAction("1234");
|
||||
actionTrackingService.recordActionPending(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Pending, action.getExecutionStatus());
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
|
||||
// Remove it by hand
|
||||
executingActionsCache.remove(key);
|
||||
assertNull(null, executingActionsCache.get(key));
|
||||
int instanceId = ((ActionImpl)action).getExecutionInstance();
|
||||
|
||||
|
||||
// Run it, will go back in again, ID unchanged
|
||||
actionTrackingService.recordActionExecuting(action);
|
||||
assertEquals(key, ActionTrackingServiceImpl.generateCacheKey(action));
|
||||
assertEquals(instanceId, ((ActionImpl)action).getExecutionInstance());
|
||||
|
||||
assertEquals(ActionStatus.Running, action.getExecutionStatus());
|
||||
assertNotNull(null, executingActionsCache.get(key));
|
||||
|
||||
|
||||
// Finish, will go again
|
||||
actionTrackingService.recordActionComplete(action);
|
||||
key = ActionTrackingServiceImpl.generateCacheKey(action);
|
||||
assertEquals(ActionStatus.Completed, action.getExecutionStatus());
|
||||
assertEquals(null, executingActionsCache.get(key));
|
||||
}
|
||||
|
||||
/** Ensure that the listing functions work */
|
||||
public void testListings() throws Exception
|
||||
{
|
||||
|
@@ -98,6 +98,10 @@ public class ExecutionDetails implements Serializable {
|
||||
return runningOn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns when this action started executing, or
|
||||
* null if it is still pending
|
||||
*/
|
||||
public Date getStartedAt() {
|
||||
return startedAt;
|
||||
}
|
||||
|
Reference in New Issue
Block a user