mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Transfer Service:
- Added a few transfer-specific policies that are raised on the target side at the beginning and end of a transfer. git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@22427 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -26,16 +26,14 @@ import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Serializable;
|
||||
import java.io.Writer;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Vector;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.xml.parsers.SAXParser;
|
||||
@@ -46,20 +44,19 @@ import org.alfresco.repo.copy.CopyBehaviourCallback;
|
||||
import org.alfresco.repo.copy.CopyDetails;
|
||||
import org.alfresco.repo.copy.CopyServicePolicies;
|
||||
import org.alfresco.repo.copy.DefaultCopyBehaviourCallback;
|
||||
import org.alfresco.repo.copy.CopyServicePolicies.BeforeCopyPolicy;
|
||||
import org.alfresco.repo.node.NodeServicePolicies;
|
||||
import org.alfresco.repo.node.NodeServicePolicies.OnMoveNodePolicy;
|
||||
import org.alfresco.repo.policy.BehaviourFilter;
|
||||
import org.alfresco.repo.policy.ClassPolicyDelegate;
|
||||
import org.alfresco.repo.policy.JavaBehaviour;
|
||||
import org.alfresco.repo.policy.PolicyComponent;
|
||||
import org.alfresco.repo.policy.Behaviour.NotificationFrequency;
|
||||
import org.alfresco.repo.rule.RuleModel;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
|
||||
import org.alfresco.repo.tenant.TenantService;
|
||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
||||
import org.alfresco.repo.transfer.ChangeCapturingProgressMonitor.TransferChangesRecord;
|
||||
import org.alfresco.repo.transfer.manifest.TransferManifestProcessor;
|
||||
import org.alfresco.repo.transfer.manifest.XMLTransferManifestReader;
|
||||
import org.alfresco.repo.transfer.requisite.XMLTransferRequsiteWriter;
|
||||
@@ -76,7 +73,11 @@ import org.alfresco.service.cmr.search.SearchService;
|
||||
import org.alfresco.service.cmr.transfer.TransferException;
|
||||
import org.alfresco.service.cmr.transfer.TransferProgress;
|
||||
import org.alfresco.service.cmr.transfer.TransferReceiver;
|
||||
import org.alfresco.service.cmr.transfer.TransferServicePolicies;
|
||||
import org.alfresco.service.cmr.transfer.TransferProgress.Status;
|
||||
import org.alfresco.service.cmr.transfer.TransferServicePolicies.BeforeStartInboundTransferPolicy;
|
||||
import org.alfresco.service.cmr.transfer.TransferServicePolicies.OnEndInboundTransferPolicy;
|
||||
import org.alfresco.service.cmr.transfer.TransferServicePolicies.OnStartInboundTransferPolicy;
|
||||
import org.alfresco.service.descriptor.DescriptorService;
|
||||
import org.alfresco.service.namespace.NamespaceService;
|
||||
import org.alfresco.service.namespace.QName;
|
||||
@@ -166,7 +167,7 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
private String transferTempFolderPath;
|
||||
private ManifestProcessorFactory manifestProcessorFactory;
|
||||
private BehaviourFilter behaviourFilter;
|
||||
private TransferProgressMonitor progressMonitor;
|
||||
private ChangeCapturingProgressMonitor progressMonitor;
|
||||
private ActionService actionService;
|
||||
private TenantService tenantService;
|
||||
private RuleService ruleService;
|
||||
@@ -180,6 +181,10 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
private Map<String,NodeRef> transferTempFolderMap = new ConcurrentHashMap<String, NodeRef>();
|
||||
private Map<String,NodeRef> inboundTransferRecordsFolderMap = new ConcurrentHashMap<String, NodeRef>();
|
||||
|
||||
private ClassPolicyDelegate<BeforeStartInboundTransferPolicy> beforeStartInboundTransferDelegate;
|
||||
private ClassPolicyDelegate<OnStartInboundTransferPolicy> onStartInboundTransferDelegate;
|
||||
private ClassPolicyDelegate<OnEndInboundTransferPolicy> onEndInboundTransferDelegate;
|
||||
|
||||
public void init()
|
||||
{
|
||||
PropertyCheck.mandatory(this, "nodeService", nodeService);
|
||||
@@ -195,7 +200,11 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
PropertyCheck.mandatory(this, "policyComponent", policyComponent);
|
||||
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
|
||||
PropertyCheck.mandatory(this, "alienProcessor", alienProcessor);
|
||||
|
||||
|
||||
beforeStartInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.BeforeStartInboundTransferPolicy.class);
|
||||
onStartInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.OnStartInboundTransferPolicy.class);
|
||||
onEndInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.OnEndInboundTransferPolicy.class);
|
||||
|
||||
/**
|
||||
* For every new child of a node with the trx:transferred aspect run this.onCreateChildAssociation
|
||||
*/
|
||||
@@ -363,22 +372,27 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
public String start()
|
||||
{
|
||||
final NodeRef lockFolder = getLockFolder();
|
||||
NodeRef relatedTransferRecord = null;
|
||||
String transferId = null;
|
||||
|
||||
RetryingTransactionHelper txHelper = transactionService.getRetryingTransactionHelper();
|
||||
try
|
||||
{
|
||||
relatedTransferRecord = txHelper.doInTransaction(
|
||||
new RetryingTransactionHelper.RetryingTransactionCallback<NodeRef>()
|
||||
transferId = txHelper.doInTransaction(
|
||||
new RetryingTransactionHelper.RetryingTransactionCallback<String>()
|
||||
{
|
||||
public NodeRef execute() throws Throwable
|
||||
public String execute() throws Throwable
|
||||
{
|
||||
TransferServicePolicies.BeforeStartInboundTransferPolicy beforeStartPolicy =
|
||||
beforeStartInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
|
||||
beforeStartPolicy.beforeStartInboundTransfer();
|
||||
|
||||
final NodeRef relatedTransferRecord = createTransferRecord();
|
||||
getTempFolder(relatedTransferRecord.toString());
|
||||
String transferId = relatedTransferRecord.toString();
|
||||
getTempFolder(transferId);
|
||||
|
||||
Map<QName, Serializable> props = new HashMap<QName, Serializable>();
|
||||
props.put(ContentModel.PROP_NAME, LOCK_FILE_NAME);
|
||||
props.put(TransferModel.PROP_TRANSFER_ID, relatedTransferRecord.toString());
|
||||
props.put(TransferModel.PROP_TRANSFER_ID, transferId);
|
||||
|
||||
if (log.isInfoEnabled())
|
||||
{
|
||||
@@ -393,7 +407,12 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
{
|
||||
log.info("Transfer lock created as node " + assoc.getChildRef());
|
||||
}
|
||||
return relatedTransferRecord;
|
||||
|
||||
TransferServicePolicies.OnStartInboundTransferPolicy onStartPolicy =
|
||||
onStartInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
|
||||
onStartPolicy.onStartInboundTransfer(transferId);
|
||||
|
||||
return transferId;
|
||||
}
|
||||
}, false, true);
|
||||
}
|
||||
@@ -403,7 +422,6 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
// lock is already taken.
|
||||
throw new TransferException(MSG_TRANSFER_LOCK_UNAVAILABLE);
|
||||
}
|
||||
String transferId = relatedTransferRecord.toString();
|
||||
getStagingFolder(transferId);
|
||||
return transferId;
|
||||
}
|
||||
@@ -448,6 +466,7 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
ChildAssociationRef assoc = nodeService.createNode(inboundTransferRecordsFolder, ContentModel.ASSOC_CONTAINS,
|
||||
recordName, TransferModel.TYPE_TRANSFER_RECORD, props);
|
||||
log.debug("<-createTransferRecord: " + assoc.getChildRef());
|
||||
|
||||
return assoc.getChildRef();
|
||||
}
|
||||
|
||||
@@ -502,14 +521,39 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.warn("Failed to delete temp store node for transfer id " + transferId + "\nTemp store noderef = " + tempStoreNode);
|
||||
log.warn("Failed to delete temp store node for transfer id " + transferId +
|
||||
"\nTemp store noderef = " + tempStoreNode);
|
||||
}
|
||||
|
||||
log.debug("delete staging folder " + transferId);
|
||||
// Delete the staging folder.
|
||||
File stagingFolder = getStagingFolder(transferId);
|
||||
deleteFile(stagingFolder);
|
||||
log.debug("Staging folder deleted");
|
||||
File stagingFolder = null;
|
||||
try
|
||||
{
|
||||
log.debug("delete staging folder " + transferId);
|
||||
// Delete the staging folder.
|
||||
stagingFolder = getStagingFolder(transferId);
|
||||
deleteFile(stagingFolder);
|
||||
log.debug("Staging folder deleted");
|
||||
}
|
||||
catch(Exception ex)
|
||||
{
|
||||
log.warn("Failed to delete staging folder for transfer id " + transferId +
|
||||
"\nStaging folder = " + stagingFolder.toString());
|
||||
}
|
||||
|
||||
//Fire the OnEndInboundTransfer policy
|
||||
Set<NodeRef> createdNodes = Collections.emptySet();
|
||||
Set<NodeRef> updatedNodes = Collections.emptySet();
|
||||
Set<NodeRef> deletedNodes = Collections.emptySet();
|
||||
TransferChangesRecord changesRecord = progressMonitor.removeChangeRecord(transferId);
|
||||
if (changesRecord != null)
|
||||
{
|
||||
createdNodes = new HashSet<NodeRef>(changesRecord.getCreatedNodes());
|
||||
updatedNodes = new HashSet<NodeRef>(changesRecord.getUpdatedNodes());
|
||||
deletedNodes = new HashSet<NodeRef>(changesRecord.getDeletedNodes());
|
||||
}
|
||||
TransferServicePolicies.OnEndInboundTransferPolicy onEndPolicy =
|
||||
onEndInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
|
||||
onEndPolicy.onEndInboundTransfer(transferId, createdNodes, updatedNodes, deletedNodes);
|
||||
}
|
||||
catch (TransferException ex)
|
||||
{
|
||||
@@ -913,7 +957,7 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
*/
|
||||
public void setProgressMonitor(TransferProgressMonitor progressMonitor)
|
||||
{
|
||||
this.progressMonitor = progressMonitor;
|
||||
this.progressMonitor = new ChangeCapturingProgressMonitor(progressMonitor);
|
||||
}
|
||||
|
||||
public void setActionService(ActionService actionService)
|
||||
@@ -1044,7 +1088,6 @@ public class RepoTransferReceiverImpl implements TransferReceiver,
|
||||
public void onRestoreNode(ChildAssociationRef childAssocRef)
|
||||
{
|
||||
log.debug("on restore node");
|
||||
final String localRepositoryId = descriptorService.getCurrentRepositoryDescriptor().getId();
|
||||
log.debug("restoredAssocRef:" + childAssocRef);
|
||||
alienProcessor.afterMoveAlien(childAssocRef);
|
||||
}
|
||||
|
Reference in New Issue
Block a user