mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
ALF-4476 - Make transfer definitions for replication execution read only (locked)
Also improve the testing of cancelling running replication jobs, and the debug output of the action tracking service git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22055 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -187,6 +187,7 @@ public class ActionImpl extends ParameterizedItemImpl implements Action
|
|||||||
if (action instanceof ActionImpl)
|
if (action instanceof ActionImpl)
|
||||||
{
|
{
|
||||||
ActionImpl actionImpl = (ActionImpl) action;
|
ActionImpl actionImpl = (ActionImpl) action;
|
||||||
|
this.executionInstance = actionImpl.getExecutionInstance();
|
||||||
this.runAsUserName = actionImpl.getRunAsUser();
|
this.runAsUserName = actionImpl.getRunAsUser();
|
||||||
this.actionChain = actionImpl.actionChain;
|
this.actionChain = actionImpl.actionChain;
|
||||||
}
|
}
|
||||||
|
@@ -154,7 +154,7 @@ public class ActionTrackingServiceImpl implements ActionTrackingService
|
|||||||
{
|
{
|
||||||
if (logger.isDebugEnabled() == true)
|
if (logger.isDebugEnabled() == true)
|
||||||
{
|
{
|
||||||
logger.debug("Action " + action + " has begun exection");
|
logger.debug("Action " + action + " with provisional key " + generateCacheKey(action) + " has begun exection");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grab what status it was before
|
// Grab what status it was before
|
||||||
@@ -179,7 +179,8 @@ public class ActionTrackingServiceImpl implements ActionTrackingService
|
|||||||
if(details == null) {
|
if(details == null) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Went to mark the start of execution of " +
|
"Went to mark the start of execution of " +
|
||||||
action + " but it wasn't in the running actions cache! " +
|
action + " with key " + key +
|
||||||
|
" but it wasn't in the running actions cache! " +
|
||||||
"Your running actions cache is probably too small"
|
"Your running actions cache is probably too small"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -227,6 +228,11 @@ public class ActionTrackingServiceImpl implements ActionTrackingService
|
|||||||
// Put it into the cache
|
// Put it into the cache
|
||||||
ExecutionDetails details = buildExecutionDetails(action);
|
ExecutionDetails details = buildExecutionDetails(action);
|
||||||
executingActionsCache.put(key, details);
|
executingActionsCache.put(key, details);
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled() == true)
|
||||||
|
{
|
||||||
|
logger.debug("Action " + action + " with key " + key + " placed into execution cache");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -332,10 +338,15 @@ public class ActionTrackingServiceImpl implements ActionTrackingService
|
|||||||
String key = generateCacheKey(action);
|
String key = generateCacheKey(action);
|
||||||
ExecutionDetails details = getExecutionDetails(buildExecutionSummary(key));
|
ExecutionDetails details = getExecutionDetails(buildExecutionSummary(key));
|
||||||
if(details == null) {
|
if(details == null) {
|
||||||
|
Exception e = new Exception("Cancellation status missing from cache");
|
||||||
|
e.fillInStackTrace();
|
||||||
|
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Unable to check cancellation status for running action " +
|
"Unable to check cancellation status for running action " +
|
||||||
action + " as it wasn't in the running actions cache! " +
|
action + " with execution key " + key +
|
||||||
"Your running actions cache is probably too small"
|
" as it wasn't in the running actions cache! " +
|
||||||
|
"Your running actions cache is probably too small",
|
||||||
|
e
|
||||||
);
|
);
|
||||||
|
|
||||||
// Re-generate
|
// Re-generate
|
||||||
|
@@ -162,17 +162,21 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
|||||||
new TransferDefinition();
|
new TransferDefinition();
|
||||||
transferDefinition.setNodes(toTransfer);
|
transferDefinition.setNodes(toTransfer);
|
||||||
transferDefinition.setSync(true);
|
transferDefinition.setSync(true);
|
||||||
// transferDefinition.setReadOnly(true); // TODO Make read only, but then need to fix tests
|
transferDefinition.setReadOnly(true);
|
||||||
|
|
||||||
return transferDefinition;
|
return transferDefinition;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void executeImpl(Action action, NodeRef actionedUponNodeRef) {
|
protected void executeImpl(Action action, NodeRef actionedUponNodeRef) {
|
||||||
// Specialise the action if needed, eg when loaded directly from
|
if(action instanceof ReplicationDefinition)
|
||||||
// the NodeRef without going via the replication service
|
|
||||||
if(action.getActionDefinitionName().equals(ReplicationDefinitionImpl.EXECUTOR_NAME))
|
|
||||||
{
|
{
|
||||||
|
// Already of the correct type
|
||||||
|
}
|
||||||
|
else if(action.getActionDefinitionName().equals(ReplicationDefinitionImpl.EXECUTOR_NAME))
|
||||||
|
{
|
||||||
|
// Specialise the action if needed, eg when loaded directly from
|
||||||
|
// the NodeRef without going via the replication service
|
||||||
action = new ReplicationDefinitionImpl(action);
|
action = new ReplicationDefinitionImpl(action);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -294,6 +298,7 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
|||||||
if(transferId != null)
|
if(transferId != null)
|
||||||
{
|
{
|
||||||
transferService.cancelAsync(transferId);
|
transferService.cancelAsync(transferId);
|
||||||
|
logger.debug("Replication cancel was requested for " + replicationDef.getReplicationQName());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -331,6 +336,12 @@ public class ReplicationActionExecutor extends ActionExecuterAbstractBase {
|
|||||||
6 // 6 times = wait up to 30 seconds
|
6 // 6 times = wait up to 30 seconds
|
||||||
);
|
);
|
||||||
} catch(LockAcquisitionException e) {
|
} catch(LockAcquisitionException e) {
|
||||||
|
logger.debug(
|
||||||
|
"Unable to get the replication job lock on " +
|
||||||
|
replicationDef.getReplicationQName() +
|
||||||
|
", retrying every " + (int)(retryTime/1000) + " seconds"
|
||||||
|
);
|
||||||
|
|
||||||
// Long try - every 30 seconds
|
// Long try - every 30 seconds
|
||||||
lockToken = jobLockService.getLock(
|
lockToken = jobLockService.getLock(
|
||||||
replicationDef.getReplicationQName(),
|
replicationDef.getReplicationQName(),
|
||||||
|
@@ -181,7 +181,11 @@ public class ReplicationServiceIntegrationTest extends TestCase
|
|||||||
if(folder2 != null) {
|
if(folder2 != null) {
|
||||||
nodeService.deleteNode(folder2);
|
nodeService.deleteNode(folder2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Zap the destination folder, which may well contain
|
||||||
|
// entries transfered over which are locked
|
||||||
if(destinationFolder != null) {
|
if(destinationFolder != null) {
|
||||||
|
lockService.unlock(destinationFolder, true);
|
||||||
nodeService.deleteNode(destinationFolder);
|
nodeService.deleteNode(destinationFolder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -598,6 +602,10 @@ public class ReplicationServiceIntegrationTest extends TestCase
|
|||||||
* Check that cancelling works.
|
* Check that cancelling works.
|
||||||
* Does this by taking a lock on the job, cancelling,
|
* Does this by taking a lock on the job, cancelling,
|
||||||
* releasing and seeing it abort.
|
* releasing and seeing it abort.
|
||||||
|
*
|
||||||
|
* Tests that when we ask for a replication task to be cancelled,
|
||||||
|
* that it starts, cancels, and the status is correctly recorded
|
||||||
|
* for it.
|
||||||
*/
|
*/
|
||||||
public void testReplicationExectionCancelling() throws Exception
|
public void testReplicationExectionCancelling() throws Exception
|
||||||
{
|
{
|
||||||
@@ -623,16 +631,32 @@ public class ReplicationServiceIntegrationTest extends TestCase
|
|||||||
txn.begin();
|
txn.begin();
|
||||||
actionService.executeAction(rd, replicationRoot, false, true);
|
actionService.executeAction(rd, replicationRoot, false, true);
|
||||||
assertEquals(ActionStatus.Pending, rd.getExecutionStatus());
|
assertEquals(ActionStatus.Pending, rd.getExecutionStatus());
|
||||||
txn.commit();
|
|
||||||
|
|
||||||
|
assertEquals(false, actionTrackingService.isCancellationRequested(rd));
|
||||||
|
actionTrackingService.requestActionCancellation(rd);
|
||||||
|
assertEquals(true, actionTrackingService.isCancellationRequested(rd));
|
||||||
|
|
||||||
|
txn.commit();
|
||||||
|
|
||||||
// Let it get going, will be waiting for the lock
|
// Let it get going, will be waiting for the lock
|
||||||
// having registered with the action tracking service
|
// having registered with the action tracking service
|
||||||
Thread.sleep(500);
|
for(int i=0; i<100; i++) {
|
||||||
assertEquals(ActionStatus.Running, rd.getExecutionStatus());
|
// Keep asking for it to be cancelled ASAP
|
||||||
|
actionTrackingService.requestActionCancellation(rd);
|
||||||
|
|
||||||
// Now request the cancel
|
if(rd.getExecutionStatus().equals(ActionStatus.Running)) {
|
||||||
actionTrackingService.requestActionCancellation(rd);
|
// Good, has started up
|
||||||
|
// Stop waiting and do the cancel
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// Still pending, wait a bit more
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure it started, and should shortly stop
|
||||||
|
assertEquals(ActionStatus.Running, rd.getExecutionStatus());
|
||||||
|
assertEquals(true, actionTrackingService.isCancellationRequested(rd));
|
||||||
|
|
||||||
// Release our lock, should allow the replication task
|
// Release our lock, should allow the replication task
|
||||||
// to get going and spot the cancel
|
// to get going and spot the cancel
|
||||||
@@ -856,7 +880,7 @@ public class ReplicationServiceIntegrationTest extends TestCase
|
|||||||
|
|
||||||
TransferDefinition td = replicationActionExecutor.buildTransferDefinition(rd, nodes);
|
TransferDefinition td = replicationActionExecutor.buildTransferDefinition(rd, nodes);
|
||||||
assertEquals(true, td.isSync());
|
assertEquals(true, td.isSync());
|
||||||
// assertEquals(true, td.isReadOnly());// TODO Make read only, and fix tests
|
assertEquals(true, td.isReadOnly());
|
||||||
assertEquals(2, td.getNodes().size());
|
assertEquals(2, td.getNodes().size());
|
||||||
assertEquals(true, td.getNodes().contains(folder1));
|
assertEquals(true, td.getNodes().contains(folder1));
|
||||||
assertEquals(true, td.getNodes().contains(content1_1));
|
assertEquals(true, td.getNodes().contains(content1_1));
|
||||||
@@ -1012,11 +1036,6 @@ public class ReplicationServiceIntegrationTest extends TestCase
|
|||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCancellation() throws Exception
|
|
||||||
{
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private NodeRef makeNode(NodeRef parent, QName nodeType)
|
private NodeRef makeNode(NodeRef parent, QName nodeType)
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user