mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-06-30 18:15:39 +00:00
40713: Merged BRANCHES/DEV/BELARUS/V4.1-BUG-FIX-2012_08_15 to BRANCHES/DEV/V4.1-BUG-FIX: 40604: ALF-15274 "I'm following" filter of "My Activities" dashlet doesn't work (PostgreSQL) 40727: Fix for ALF-15469 from Alex Malinovsky - Cannot Edit Online files with special characters in names 40733: Merged DEV (ALF-12358 and ALF-14496) to V4.1-BUG-FIX 38973: DEV for ALF-12358 (upgrades and build plans) 38975: Part of refactoring around the handling of deleted nodes. - Deleted nodes are now treated as real nodes by the NodeDAO 38977: Fixed up queries related to bulk-loading 38978: Fixed up Alfresco side of SOLR tracking APIs to handle removal of alf_node.node_deleted (ALF-12358) 38979: Removed potential contention on cm:name during random file creation 38980: Initial setup for patching of ALF-12358 38981: Merged DEV/BELARUS/ALF-12358-4 to DEV/DEREK/ALF-12358-4 36052: ALF-12358: Concurrency: Possible to create association references to deleted nodes .NodeDeleted. upgrade SQL patch for PostgreSQL is implemented: - SQL create scripts are updated to do not create .alf_node.deleted. column and its indexes; - schema references associated with .alf_node.deleted. column are updated; - Subscriptions DAO and schema reference are updated to use .sys:deleted. type instead of .alf_node.deleted. column; - .NodeStatus. receiving template was modified to receive .typeQNameId. for correct .deleted. state determination; - some other minor fixes 36287: ALF-12358: Concurrency: Possible to create association references to deleted nodes 'NodeDeleted' patch has been implemented for MySQL InnoDB, Alfresco Oracle 9, Alfresco SQL Server and PostgreSQL dialects. Not implemented for DB2 dialect! - DB creating scripts are modified in accordance with removal of 'alf_node.node_deleted' column and respective indexes; - iBATIS schema references are modified in accordance with removal of 'alf_node.node_deleted' column and respective indexes; - the code for handling subscriptions on deleted nodes removed; - subscriptions DAO schema reference is corrected respectively 37284: ALF-12358: Concurrency: Possible to create association references to deleted nodes 'NodeDeletd' updating patch for 4.0 version has been modified to recreate 'alf_node' table for all supported dialects. 'SubscriptionServiceImplTest' has been extended to test whether subscriptions are being removed if node is archived. The new test creates fake user node and applies a custom aspect with 'archive=true' attribute 37905: ALF-12358: Concurrency: Possible to create association references to deleted nodes Maintenance of the .idx_alf_node_txn_type. index has been added into the patch for all the dialects. SQL formatting has been changed to more compact and visually comfortable. Some minor changes for MySQL dialec Also: - Started move to 4.1 scripts - Fixed Schema reference files for alf_node column ordering 38982: ALF-12358: Concurrency: Possible to create association references to deleted nodes - Moving scripts to V4.1 code base - Fixed upgrade with sequences for introduction of 'deleted' qname 38983: Migration to 4.1 fix for ALF-12358 38995: Fix scripts for ALF-12358 - Fixed index removal for indexes that could not possibly have existed - Fixed ALF schema mappings to reflect new index names - Fixed PostgreSQL PK name check 39027: Added in missing index idx_alf_node_txn_type (ALF-12358) - Merge note: Removed redundant index alf_node.fk_alf_node_txn 39028: Fixed fallout from node deletion strategy (ALF-12358) 39222: Minor test enhancements for diagnostics 40738: ALF-11297: resurrect system-build-test (not plugged in yet, still a few failures) 40740: Follow-up for DB2 upgrade for ALF-12358: New alf_node table ID column autoincrement value set 40770: Merged DEV (ALF-12358) to V4.1-BUG-FIX 39223: Merged 3.4.6HF to DEV (ALF-12358) 39218: ALF-15109: Improved fix - must fire cascaded secondary association deletions at DbNodeServiceImpl level to ensure appropriate index events are fired and prevent out of sync indexes! 39259: Merged V3.4.6HF to DEV (ALF-12358) 39240: ALF-15109: Another attempt. Now we are firing all the right events on cascade removal of secondary associations a lot of things are coming out in the wash! - Cascade delete secondary associations in a first recursive pass - Use a List of Pairs rather than a Map to avoid missing multiple associations to the same child 39271: Added policy invocations for peer association removal when an aspect is removed 39401: Utility class to walk a node hierarchy (primary) and gather all association data - Data gathered has to include secondary association pointing out of the hierarchy - Source and target associations gathered as well - TODO: Bulk queries for above 39402: Follow up to ALF-15109: Break node deletion by removing deleteAssocsToAndFrom - TODO: Use NodeHierarchyWalker to gather data, fire policies and execute deletes 39456: NodeHierarchyWalker: tests and fixes 39457: ALF-12358: Remove in-txn manual recording of deleted and new nodes 39917: ALF-12358: Use NodeHierarchyWalker to pick up all associations for a node hierarchy, fire policies and perform deletes - NOTE: Currently, in-process links back to the hierarchy prevent certain tests from passing. - TODO: Add detection for nodes that are about to be deleted 40000: ALF-12358: Added support for 'beforeDeleteNodeAssociationPolicy' 40001: ALF-12358: A node hierarchy walker to predetermine data required for deleting a hierarchy 40002: ALF-12358: Spoof aspect 'sys:pendingDelete' that appears on all nodes within hierarchies being deleted 40003: ALF-12358: Changes to prevent hierarchy modification during delete - The entire hierarchy is read before actual delete starts - All policies (including previously-missing assoc policies) are triggered from the recorded information - Nodes in the delete record cannot have new associations added / removed - All deletes are done on the same information so any underlying data shift causes concurrency violations - Archival: - Archival is a full copy of the hierarchy but all outbound and secondary associations are removed - Archival is followed by a full delete of the original hierarchy 40128: ALF-12358: Test for linking to deleted nodes now fail even after having tested the recovery code - Recovery code shows this when activated: ...ERROR [...NodeDAOImpl] ALF-13066: Orphan child node has been re-homed under lost_found: (49179, ...) 40129: ALF-12358: Added a more verbose message when association deletes don't find required rows 40130: ALF-12358: Avoid incidental removal of associations when removing aspects if the associations are already scheduled for deletion 40131: ALF-12358: Fix fallout for rules linking to avoid multiple deletions of the same association 40371: ALF-12358: Fire beforeDeleteNode even when archiving 40772: Merged DEV (ALF-12358) to V4.1-BUG-FIX 40372: ALF-12358: Fallout in ML code 40397: Fallout from ALF-12358: IMAP pre-commit handling must check for nodes having been deleted - Also fixed some TODOs and line endings for test 40403: PersonService: Reinstated new getPeopleFilteredByProperty method - Also fixed test to rollback transaction after forced catch of exception 40404: Fixed line endings, updated deprecated calls and removed unused code 40494: ALF-12358: Fixed missing before- and after-create policy calls for the archive store 40504: Fixed bug in rev 40494: ALF-12358: Fixed missing before- and after-create policy calls for the archive store - Used incorrect child node reference when calling policies and notifying indexer 40529: ALF-12358: Fixed in-txn holding of nodes pending delete to cater for deletes triggering more deletes 40530: Fallout from ALF-12358: Actions: Association act:scheduledAction multiplicity was not being enforced - act:actionSchedule nodes were not cleaned up when associated actions were deleted - Added onDeleteAssociation handling to clean up act:actionSchedule node - Fixed tests appropriately 40556: Fallout from ALF-12358: Split out negative tests for deleteSite, which were absorbing exceptions 40569: Tagging's beforeCommit behaviour was not checking for nodes having been deleted. - Added 'nodeServiceInternal' and used that to double-check that nodes still exist - Also removed heavily-used call to check if auditing is on 40618: ALF-12358 fallout: Fixed policy callback details for associations of archived nodes - Also some more details when throwing concurrency violation when deleting associations 40673: Fixed fallout from ALF-12358: Multilingual behaviours fixed - Listen to the association being removed from the ML container to the translation - Keep track of containers that must be deleted before committing rather than attempting to delete them immediately; this avoids attempts to delete associations that are about to be deleted (and the thing that ALF-12358 actually fixes). 40680: Follow-up to rev 40673 (ALF-12358): Forgot to remove commented-out code 40781: ALF-15587: Merged PATCHES/V4.0.2 to V4.1-BUG-FIX 40780: Merged DEV to PATCHES/V4.0.2 40777: ALF-15385 : Unable to set bpm:assingee and other properties in Activiti task via JS Added the initialization of runtimeService property. 40787: Merge V4.1 (4.1) to V4.1-BUG-FIX (4.1.1) 40782: Fix ALF-15420: Move: child files/subfolders aren't synced after moving from parent folder and updating in Alfresco on-premise/Cloud - Corrected the handling of moving a sub-folder out of its synced parent 40718: Fixes: ALF-15498: Creates new nodeLock indicator (overrides locked) to cope with differences between content models when a node is locked directly using a nodeLock and when a node is locked due to it being a working copy. 40790: Merged V3.4-BUG-FIX to V4.1-BUG-FIX 40789: ALF-15598: Merged PATCHES/V3.4.9 to V3.4-BUG-FIX 40671: Merged DEV to V3.4.9 (3.4.9.6) 40658: ALF-15505: Build-up of lucene folder segments following CMIS queries (un-closed ResultSet objects?) - Close unclosed ResultSet. - Remove kind="org.alfresco.cmiskind" parameter from query.get and queries.post webscripts and now they use CMISQueryWebScript as implementation. 40795: Fixed txn handling in the event of cleanup failure of test 40797: Fix for ALF-15602 - XSS issue in OpenSearch Explorer webscript - unescaped search terms displayed in page 40810: ALF-12358: Possible build fix (Derek, Neil and Jan to review) - Reinstate invokeBeforeMoveNode which seems to have disappeared in the refactor - Due to extra cascaded calls to onDeleteAssociation, SyncChangeMonitor must ignore certain events 40827: ALF-12358: Possible build fix (Derek, Neil and Jan to review) - SyncChangeMonitor must ignore onDeleteAssociation calls on both sides of the association when a node is deleted 40843: Stop deploying XAM connector to maven repo, it's gone 40845: ALF-15406 Index Tracker seems not to gracefully stop upon shutdown keeping all other threads in waiting - Don't allow Quartz scheduler jobs for the OOoDirect subsystem, checking the connection to backup in a queue. Avoids multiple timeouts. Should just get one now. There is no need for multiple threads to be running anyway. - ALF-610 changes should stop the OOoDirect subsystem from running any of these Quartz jobs in the first place when using the default configuration in an enterprise release. So no timeout? 40848: Merged BRANCHES/DEV/V3.4-BUG-FIX to BRANCHES/DEV/V4.1-BUG-FIX 40847: Fix for ALF-15189 - Incorrect trimming of the date on the Advanced search by date range 40887: ALF-15596: Deadlocks in DescriptorServiceImpl / LicenseComponent threads - Problem discovered by Gab - The two classes are mutually dependent and can end up deadlocking - Removed excessive synchronization from DescriptorServiceImpl - Now two key synchronization points - bootstrap and currentRepoDescriptor updates - Bootstrap synchronization controlled outside this class - no need to defend against it other than throwing IllegalStateException if accessed before bootstrapped - currentRepoDescriptorLock added to regulate currentRepoDescriptor accesses / updates - Uncovered problem in bootstrapping order - descriptorComponent must be bootstrapped before multiTenantBootstrap 40889: ALF-15691: Poor cluster performance in user dashboard due to unnecessary cache replication 40899: ALF-15691: Corrected duplicate property 40900: ALF-12358 / ALF-15688: Finish the job! Make SOLR tracking work again and process deletes (Derek, Andy please review) - select_Txn_Nodes now uses a COALESCE query to substitute the original node ID when recorded in the PROP_ORIGINAL_ID property - NodesGet webscript extended so that it detects deleted nodes in the new way and also includes the noderef - CoreTracker avoids trying to retrieve the metadata of deleted nodes (possible because of NodesGet noderef extension) - SOLRTrackingComponentImpl doesn't barf when getNodesMetadata called for a cascade deleted node by CoreTracker.updateDescendantAuxDocs() 40902: ALF-12358 / ALF-15688: Fixed unit test - Don't expect meta data for deleted nodes anymore (as this is generated on client side) - Also removed stray line of code from CoreTracker 40917: ALF-13750: Merged V3.4-BUG-FIX to V4.1-BUG-FIX 40915: ALF-15708: Trailing whitespace should be trimmed from properties. - Implemented custom properties persister to trim trailing whitespace from properties. 40925: RUSSIAN: Translation updates based on EN r40357 git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@40935 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
1674 lines
59 KiB
Java
1674 lines
59 KiB
Java
/*
|
|
* Copyright (C) 2009-2010 Alfresco Software Limited.
|
|
*
|
|
* This file is part of Alfresco
|
|
*
|
|
* Alfresco is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Alfresco is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
package org.alfresco.repo.transfer;
|
|
|
|
import java.io.BufferedOutputStream;
|
|
import java.io.File;
|
|
import java.io.FileOutputStream;
|
|
import java.io.InputStream;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStreamWriter;
|
|
import java.io.Serializable;
|
|
import java.text.SimpleDateFormat;
|
|
import java.util.Collections;
|
|
import java.util.Date;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import javax.xml.parsers.SAXParser;
|
|
import javax.xml.parsers.SAXParserFactory;
|
|
|
|
import org.alfresco.model.ContentModel;
|
|
import org.alfresco.repo.cache.SimpleCache;
|
|
import org.alfresco.repo.content.ContentServicePolicies;
|
|
import org.alfresco.repo.copy.CopyBehaviourCallback;
|
|
import org.alfresco.repo.copy.CopyDetails;
|
|
import org.alfresco.repo.copy.CopyServicePolicies;
|
|
import org.alfresco.repo.copy.DefaultCopyBehaviourCallback;
|
|
import org.alfresco.repo.lock.JobLockService;
|
|
import org.alfresco.repo.lock.LockAcquisitionException;
|
|
import org.alfresco.repo.node.NodeServicePolicies;
|
|
import org.alfresco.repo.policy.Behaviour.NotificationFrequency;
|
|
import org.alfresco.repo.policy.BehaviourFilter;
|
|
import org.alfresco.repo.policy.ClassPolicyDelegate;
|
|
import org.alfresco.repo.policy.JavaBehaviour;
|
|
import org.alfresco.repo.policy.PolicyComponent;
|
|
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
|
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
|
|
import org.alfresco.repo.tenant.TenantService;
|
|
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
|
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
|
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
|
import org.alfresco.repo.transfer.ChangeCapturingProgressMonitor.TransferChangesRecord;
|
|
import org.alfresco.repo.transfer.manifest.TransferManifestProcessor;
|
|
import org.alfresco.repo.transfer.manifest.XMLTransferManifestReader;
|
|
import org.alfresco.repo.transfer.requisite.XMLTransferRequsiteWriter;
|
|
import org.alfresco.service.cmr.action.Action;
|
|
import org.alfresco.service.cmr.action.ActionService;
|
|
import org.alfresco.service.cmr.repository.ChildAssociationRef;
|
|
import org.alfresco.service.cmr.repository.NodeRef;
|
|
import org.alfresco.service.cmr.repository.NodeService;
|
|
import org.alfresco.service.cmr.repository.StoreRef;
|
|
import org.alfresco.service.cmr.rule.RuleService;
|
|
import org.alfresco.service.cmr.search.ResultSet;
|
|
import org.alfresco.service.cmr.search.SearchService;
|
|
import org.alfresco.service.cmr.transfer.TransferException;
|
|
import org.alfresco.service.cmr.transfer.TransferProgress;
|
|
import org.alfresco.service.cmr.transfer.TransferProgress.Status;
|
|
import org.alfresco.service.cmr.transfer.TransferReceiver;
|
|
import org.alfresco.service.cmr.transfer.TransferServicePolicies;
|
|
import org.alfresco.service.cmr.transfer.TransferServicePolicies.BeforeStartInboundTransferPolicy;
|
|
import org.alfresco.service.cmr.transfer.TransferServicePolicies.OnEndInboundTransferPolicy;
|
|
import org.alfresco.service.cmr.transfer.TransferServicePolicies.OnStartInboundTransferPolicy;
|
|
import org.alfresco.service.cmr.transfer.TransferVersion;
|
|
import org.alfresco.service.descriptor.Descriptor;
|
|
import org.alfresco.service.descriptor.DescriptorService;
|
|
import org.alfresco.service.namespace.NamespaceService;
|
|
import org.alfresco.service.namespace.QName;
|
|
import org.alfresco.service.namespace.RegexQNamePattern;
|
|
import org.alfresco.service.transaction.TransactionService;
|
|
import org.alfresco.util.PropertyCheck;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.springframework.util.FileCopyUtils;
|
|
|
|
/**
|
|
* The Repo Transfer Receiver is the "Back-End" for transfer subsystem.
|
|
* <p>
|
|
* Provides the implementation of the transfer commands on the destination repository.
|
|
* <p>
|
|
* Provides callback handlers for Aliens and Transferred Aspects.
|
|
* <p>
|
|
* Calls transfer policies.
|
|
* <p>
|
|
* Co-ordinates locking and logging as the transfer progresses.
|
|
*
|
|
* @author brian
|
|
*/
|
|
public class RepoTransferReceiverImpl implements TransferReceiver,
|
|
NodeServicePolicies.OnCreateChildAssociationPolicy,
|
|
NodeServicePolicies.BeforeDeleteNodePolicy,
|
|
NodeServicePolicies.OnRestoreNodePolicy,
|
|
NodeServicePolicies.OnMoveNodePolicy,
|
|
ContentServicePolicies.OnContentUpdatePolicy
|
|
{
|
|
/**
|
|
* This embedded class is used to push requests for asynchronous commits onto a different thread
|
|
*
|
|
* @author Brian
|
|
*
|
|
*/
|
|
public class AsyncCommitCommand implements Runnable
|
|
{
|
|
|
|
private String transferId;
|
|
private String runAsUser;
|
|
|
|
public AsyncCommitCommand(String transferId)
|
|
{
|
|
this.transferId = transferId;
|
|
this.runAsUser = AuthenticationUtil.getFullyAuthenticatedUser();
|
|
}
|
|
|
|
public void run()
|
|
{
|
|
RunAsWork<Object> actionRunAs = new RunAsWork<Object>()
|
|
{
|
|
public Object doWork() throws Exception
|
|
{
|
|
return transactionService.getRetryingTransactionHelper().doInTransaction(
|
|
new RetryingTransactionCallback<Object>()
|
|
{
|
|
public Object execute()
|
|
{
|
|
commit(transferId);
|
|
return null;
|
|
}
|
|
}, false, true);
|
|
}
|
|
};
|
|
AuthenticationUtil.runAs(actionRunAs, runAsUser);
|
|
}
|
|
|
|
}
|
|
|
|
private final static Log log = LogFactory.getLog(RepoTransferReceiverImpl.class);
|
|
|
|
private static final String MSG_FAILED_TO_CREATE_STAGING_FOLDER = "transfer_service.receiver.failed_to_create_staging_folder";
|
|
private static final String MSG_ERROR_WHILE_STARTING = "transfer_service.receiver.error_start";
|
|
private static final String MSG_TRANSFER_TEMP_FOLDER_NOT_FOUND = "transfer_service.receiver.temp_folder_not_found";
|
|
private static final String MSG_TRANSFER_LOCK_UNAVAILABLE = "transfer_service.receiver.lock_unavailable";
|
|
private static final String MSG_INBOUND_TRANSFER_FOLDER_NOT_FOUND = "transfer_service.receiver.record_folder_not_found";
|
|
|
|
private static final String MSG_ERROR_WHILE_ENDING_TRANSFER = "transfer_service.receiver.error_ending_transfer";
|
|
private static final String MSG_ERROR_WHILE_STAGING_SNAPSHOT = "transfer_service.receiver.error_staging_snapshot";
|
|
private static final String MSG_ERROR_WHILE_STAGING_CONTENT = "transfer_service.receiver.error_staging_content";
|
|
private static final String MSG_NO_SNAPSHOT_RECEIVED = "transfer_service.receiver.no_snapshot_received";
|
|
private static final String MSG_ERROR_WHILE_COMMITTING_TRANSFER = "transfer_service.receiver.error_committing_transfer";
|
|
private static final String MSG_ERROR_WHILE_GENERATING_REQUISITE = "transfer_service.receiver.error_generating_requisite";
|
|
private static final String MSG_LOCK_TIMED_OUT = "transfer_service.receiver.lock_timed_out";
|
|
private static final String MSG_LOCK_NOT_FOUND = "transfer_service.receiver.lock_not_found";
|
|
private static final String MSG_TRANSFER_TO_SELF = "transfer_service.receiver.error.transfer_to_self";
|
|
private static final String MSG_INCOMPATIBLE_VERSIONS = "transfer_service.incompatible_versions";
|
|
|
|
private static final String SNAPSHOT_FILE_NAME = "snapshot.xml";
|
|
|
|
private NodeService nodeService;
|
|
private SearchService searchService;
|
|
private TransactionService transactionService;
|
|
private String transferLockFolderPath;
|
|
private String inboundTransferRecordsPath;
|
|
private String rootStagingDirectory;
|
|
private String transferTempFolderPath;
|
|
private ManifestProcessorFactory manifestProcessorFactory;
|
|
private BehaviourFilter behaviourFilter;
|
|
private ChangeCapturingProgressMonitor progressMonitor;
|
|
private ActionService actionService;
|
|
private TenantService tenantService;
|
|
private RuleService ruleService;
|
|
private PolicyComponent policyComponent;
|
|
private DescriptorService descriptorService;
|
|
private AlienProcessor alienProcessor;
|
|
private JobLockService jobLockService;
|
|
private TransferVersionChecker transferVersionChecker;
|
|
|
|
// note: cache is tenant-aware (if using EhCacheAdapter shared cache)
|
|
private SimpleCache<String, NodeRef> singletonCache; // eg. for transfer temp folder nodeRef
|
|
private final String KEY_TRANSFER_TEMP_NODEREF = "key.transferTempFolder.noderef"; // where temp files are stored
|
|
private final String KEY_INBOUND_TRANSFER_RECORDS_NODEREF = "key.inboundTransferRecordsFolder.noderef"; // where the destination side transfer report is generated
|
|
|
|
private ClassPolicyDelegate<BeforeStartInboundTransferPolicy> beforeStartInboundTransferDelegate;
|
|
private ClassPolicyDelegate<OnStartInboundTransferPolicy> onStartInboundTransferDelegate;
|
|
private ClassPolicyDelegate<OnEndInboundTransferPolicy> onEndInboundTransferDelegate;
|
|
|
|
/**
|
|
* Locks for the transfers in progress
|
|
* <p>
|
|
* TransferId, Lock
|
|
*/
|
|
private Map<String, Lock> locks = new ConcurrentHashMap<String, Lock>();
|
|
|
|
/**
|
|
* How many mS before refreshing the lock?
|
|
*/
|
|
private long lockRefreshTime = 60000;
|
|
|
|
/**
|
|
* How many times to retry to obtain the lock
|
|
*/
|
|
private int lockRetryCount = 2;
|
|
|
|
/**
|
|
* How long to wait between retries
|
|
*/
|
|
private long lockRetryWait = 100;
|
|
|
|
/**
|
|
* How long in mS to keep the lock before giving up and ending the transfer,
|
|
* possibly the client has terminated?
|
|
*/
|
|
private long lockTimeOut = 3600000;
|
|
|
|
public void init()
|
|
{
|
|
PropertyCheck.mandatory(this, "nodeService", nodeService);
|
|
PropertyCheck.mandatory(this, "searchService", searchService);
|
|
PropertyCheck.mandatory(this, "ruleService", ruleService);
|
|
PropertyCheck.mandatory(this, "actionService", actionService);
|
|
PropertyCheck.mandatory(this, "behaviourFilter", behaviourFilter);
|
|
PropertyCheck.mandatory(this, "tennantService", tenantService);
|
|
PropertyCheck.mandatory(this, "transactionService", transactionService);
|
|
PropertyCheck.mandatory(this, "transferLockFolderPath", transferLockFolderPath);
|
|
PropertyCheck.mandatory(this, "inboundTransferRecordsPath", inboundTransferRecordsPath);
|
|
PropertyCheck.mandatory(this, "rootStagingDirectory", rootStagingDirectory);
|
|
PropertyCheck.mandatory(this, "policyComponent", policyComponent);
|
|
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
|
|
PropertyCheck.mandatory(this, "alienProcessor", alienProcessor);
|
|
PropertyCheck.mandatory(this, "jobLockService", getJobLockService());
|
|
PropertyCheck.mandatory(this, "transferVersionChecker", getTransferVersionChecker());
|
|
|
|
beforeStartInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.BeforeStartInboundTransferPolicy.class);
|
|
onStartInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.OnStartInboundTransferPolicy.class);
|
|
onEndInboundTransferDelegate = policyComponent.registerClassPolicy(TransferServicePolicies.OnEndInboundTransferPolicy.class);
|
|
|
|
/**
|
|
* For every new child of a node with the trx:transferred aspect run this.onCreateChildAssociation
|
|
*/
|
|
this.getPolicyComponent().bindAssociationBehaviour(
|
|
NodeServicePolicies.OnCreateChildAssociationPolicy.QNAME,
|
|
TransferModel.ASPECT_TRANSFERRED,
|
|
new JavaBehaviour(this, "onCreateChildAssociation", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every update of a transferred node
|
|
*/
|
|
this.getPolicyComponent().bindClassBehaviour(
|
|
ContentServicePolicies.OnContentUpdatePolicy.QNAME,
|
|
TransferModel.ASPECT_TRANSFERRED,
|
|
new JavaBehaviour(this, "onContentUpdate", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every copy of a transferred node run onCopyTransferred
|
|
*/
|
|
this.getPolicyComponent().bindClassBehaviour(
|
|
CopyServicePolicies.OnCopyNodePolicy.QNAME,
|
|
TransferModel.ASPECT_TRANSFERRED,
|
|
new JavaBehaviour(this, "onCopyTransferred", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every new child of a node with the trx:alien aspect run this.onCreateChildAssociation
|
|
*/
|
|
this.getPolicyComponent().bindAssociationBehaviour(
|
|
NodeServicePolicies.OnCreateChildAssociationPolicy.QNAME,
|
|
TransferModel.ASPECT_ALIEN,
|
|
new JavaBehaviour(this, "onCreateChildAssociation", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every node with the trx:alien aspect run this.beforeDeleteNode
|
|
*/
|
|
this.getPolicyComponent().bindClassBehaviour(
|
|
NodeServicePolicies.BeforeDeleteNodePolicy.QNAME,
|
|
TransferModel.ASPECT_ALIEN,
|
|
new JavaBehaviour(this, "beforeDeleteNode", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every restore of a node with the trx:alien aspect
|
|
*/
|
|
this.getPolicyComponent().bindClassBehaviour(
|
|
NodeServicePolicies.OnRestoreNodePolicy.QNAME,
|
|
TransferModel.ASPECT_ALIEN,
|
|
new JavaBehaviour(this, "onRestoreNode", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every move of a node with the trx:alien aspect.
|
|
*/
|
|
this.getPolicyComponent().bindClassBehaviour(
|
|
NodeServicePolicies.OnMoveNodePolicy.QNAME,
|
|
TransferModel.ASPECT_ALIEN,
|
|
new JavaBehaviour(this, "onMoveNode", NotificationFrequency.EVERY_EVENT));
|
|
|
|
/**
|
|
* For every copy of an alien node remove the alien aspect
|
|
*/
|
|
this.getPolicyComponent().bindClassBehaviour(
|
|
CopyServicePolicies.OnCopyNodePolicy.QNAME,
|
|
TransferModel.ASPECT_ALIEN,
|
|
new JavaBehaviour(this, "onCopyAlien", NotificationFrequency.EVERY_EVENT));
|
|
}
|
|
|
|
/*
|
|
* (non-Javadoc)
|
|
*
|
|
* @see
|
|
* org.alfresco.repo.web.scripts.transfer.TransferReceiver#getStagingFolder(org.alfresco.service.cmr.repository.
|
|
* NodeRef)
|
|
*/
|
|
public File getStagingFolder(String transferId)
|
|
{
|
|
if (transferId == null)
|
|
{
|
|
throw new IllegalArgumentException("transferId = " + transferId);
|
|
}
|
|
NodeRef transferNodeRef = new NodeRef(transferId);
|
|
File tempFolder;
|
|
String tempFolderPath = rootStagingDirectory + "/" + transferNodeRef.getId();
|
|
tempFolder = new File(tempFolderPath);
|
|
if (!tempFolder.exists())
|
|
{
|
|
if (!tempFolder.mkdirs())
|
|
{
|
|
tempFolder = null;
|
|
throw new TransferException(MSG_FAILED_TO_CREATE_STAGING_FOLDER, new Object[] { transferId });
|
|
}
|
|
}
|
|
return tempFolder;
|
|
|
|
}
|
|
|
|
public NodeRef getTempFolder(String transferId)
|
|
{
|
|
NodeRef transferTempFolder = singletonCache.get(KEY_TRANSFER_TEMP_NODEREF);
|
|
|
|
// Have we already resolved the node that is the temp folder?
|
|
// If not then do so.
|
|
if (transferTempFolder == null)
|
|
{
|
|
ResultSet rs = null;
|
|
|
|
try
|
|
{
|
|
rs = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, SearchService.LANGUAGE_XPATH,
|
|
transferTempFolderPath);
|
|
if (rs.length() > 0)
|
|
{
|
|
transferTempFolder = rs.getNodeRef(0);
|
|
singletonCache.put(KEY_TRANSFER_TEMP_NODEREF, transferTempFolder);
|
|
}
|
|
else
|
|
{
|
|
throw new TransferException(MSG_TRANSFER_TEMP_FOLDER_NOT_FOUND, new Object[] { transferId,
|
|
transferTempFolderPath });
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
if (rs != null) {rs.close();}
|
|
}
|
|
}
|
|
|
|
NodeRef transferNodeRef = new NodeRef(transferId);
|
|
String tempTransferFolderName = transferNodeRef.getId();
|
|
NodeRef tempFolderNode = null;
|
|
QName folderName = QName.createQName(NamespaceService.APP_MODEL_1_0_URI, tempTransferFolderName);
|
|
|
|
// Do we already have a temp folder for this transfer?
|
|
List<ChildAssociationRef> tempChildren = nodeService.getChildAssocs(transferTempFolder,
|
|
RegexQNamePattern.MATCH_ALL, folderName);
|
|
if (tempChildren.isEmpty())
|
|
{
|
|
// No, we don't have a temp folder for this transfer yet. Create it...
|
|
Map<QName, Serializable> props = new HashMap<QName, Serializable>();
|
|
props.put(ContentModel.PROP_NAME, tempTransferFolderName);
|
|
tempFolderNode = nodeService.createNode(transferTempFolder, ContentModel.ASSOC_CONTAINS, folderName,
|
|
TransferModel.TYPE_TEMP_TRANSFER_STORE, props).getChildRef();
|
|
}
|
|
else
|
|
{
|
|
// Yes, we do have a temp folder for this transfer already. Return it.
|
|
tempFolderNode = tempChildren.get(0).getChildRef();
|
|
}
|
|
return tempFolderNode;
|
|
|
|
}
|
|
|
|
/*
|
|
* (non-Javadoc)
|
|
*
|
|
* @see org.alfresco.repo.web.scripts.transfer.TransferReceiver#start()
|
|
*/
|
|
public String start(String fromRepositoryId, boolean transferToSelf, TransferVersion fromVersion)
|
|
{
|
|
log.debug("Start transfer");
|
|
|
|
/**
|
|
* Check that transfer is allowed to this repository
|
|
*/
|
|
checkTransfer(fromRepositoryId, transferToSelf);
|
|
|
|
/**
|
|
* Check that the versions are compatible
|
|
*/
|
|
TransferVersion toVersion = getVersion();
|
|
|
|
if(!getTransferVersionChecker().checkTransferVersions(fromVersion, toVersion))
|
|
{
|
|
throw new TransferException(MSG_INCOMPATIBLE_VERSIONS, new Object[] {"None", fromVersion, toVersion});
|
|
}
|
|
|
|
/**
|
|
* First get the transfer lock for this domain
|
|
*/
|
|
String tenantDomain = tenantService.getUserDomain(AuthenticationUtil.getRunAsUser());
|
|
String lockStr = tenantDomain.isEmpty() ? "transfer.server.default" : "transfer.server.tenant." + tenantDomain;
|
|
QName lockQName = QName.createQName(TransferModel.TRANSFER_MODEL_1_0_URI, lockStr);
|
|
Lock lock = new Lock(lockQName);
|
|
|
|
try
|
|
{
|
|
TransferServicePolicies.BeforeStartInboundTransferPolicy beforeStartPolicy =
|
|
beforeStartInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
|
|
beforeStartPolicy.beforeStartInboundTransfer();
|
|
|
|
lock.makeLock();
|
|
|
|
/**
|
|
* Transfer Lock held if we get this far
|
|
*/
|
|
String transferId = null;
|
|
|
|
try
|
|
{
|
|
/**
|
|
* Now create a transfer record and use its NodeRef as the transfer id
|
|
*/
|
|
RetryingTransactionHelper txHelper = transactionService.getRetryingTransactionHelper();
|
|
|
|
transferId = txHelper.doInTransaction(
|
|
new RetryingTransactionHelper.RetryingTransactionCallback<String>()
|
|
{
|
|
public String execute() throws Throwable
|
|
{
|
|
final NodeRef relatedTransferRecord = createTransferRecord();
|
|
String transferId = relatedTransferRecord.toString();
|
|
getTempFolder(transferId);
|
|
getStagingFolder(transferId);
|
|
|
|
TransferServicePolicies.OnStartInboundTransferPolicy onStartPolicy =
|
|
onStartInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
|
|
onStartPolicy.onStartInboundTransfer(transferId);
|
|
|
|
return transferId;
|
|
}
|
|
}, false, true);
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
log.debug("Exception while staring transfer", e);
|
|
log.debug("releasing lock - we never created the transfer id");
|
|
lock.releaseLock();
|
|
throw new TransferException(MSG_ERROR_WHILE_STARTING, e);
|
|
}
|
|
|
|
/**
|
|
* Here if we have begun a transfer and have a valid transfer id
|
|
*/
|
|
lock.transferId = transferId;
|
|
locks.put(transferId, lock);
|
|
log.info("transfer started:" + transferId);
|
|
lock.enableLockTimeout();
|
|
return transferId;
|
|
|
|
}
|
|
catch (LockAcquisitionException lae)
|
|
{
|
|
log.debug("transfer lock is already taken", lae);
|
|
// lock is already taken.
|
|
throw new TransferException(MSG_TRANSFER_LOCK_UNAVAILABLE);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return
|
|
*/
|
|
private NodeRef createTransferRecord()
|
|
{
|
|
log.debug("Receiver createTransferRecord");
|
|
NodeRef inboundTransferRecordsFolder = singletonCache.get(KEY_INBOUND_TRANSFER_RECORDS_NODEREF);
|
|
|
|
if (inboundTransferRecordsFolder == null)
|
|
{
|
|
log.debug("Trying to find transfer records folder: " + inboundTransferRecordsPath);
|
|
ResultSet rs = null;
|
|
|
|
try
|
|
{
|
|
rs = searchService.query(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, SearchService.LANGUAGE_XPATH,
|
|
inboundTransferRecordsPath);
|
|
if (rs.length() > 0)
|
|
{
|
|
inboundTransferRecordsFolder = rs.getNodeRef(0);
|
|
singletonCache.put(KEY_INBOUND_TRANSFER_RECORDS_NODEREF, inboundTransferRecordsFolder);
|
|
log.debug("Found inbound transfer records folder: " + inboundTransferRecordsFolder);
|
|
}
|
|
else
|
|
{
|
|
throw new TransferException(MSG_INBOUND_TRANSFER_FOLDER_NOT_FOUND,
|
|
new Object[] { inboundTransferRecordsPath });
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
if (rs != null) {rs.close();}
|
|
}
|
|
}
|
|
|
|
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmssSSSZ");
|
|
String timeNow = format.format(new Date());
|
|
String name = timeNow + ".xml";
|
|
|
|
QName recordName = QName.createQName(NamespaceService.APP_MODEL_1_0_URI, name);
|
|
|
|
Map<QName, Serializable> props = new HashMap<QName, Serializable>();
|
|
props.put(ContentModel.PROP_NAME, name);
|
|
props.put(TransferModel.PROP_PROGRESS_POSITION, 0);
|
|
props.put(TransferModel.PROP_PROGRESS_ENDPOINT, 1);
|
|
props.put(TransferModel.PROP_TRANSFER_STATUS, TransferProgress.Status.PRE_COMMIT.toString());
|
|
|
|
log.debug("Creating transfer record with name: " + name);
|
|
ChildAssociationRef assoc = nodeService.createNode(inboundTransferRecordsFolder, ContentModel.ASSOC_CONTAINS,
|
|
recordName, TransferModel.TYPE_TRANSFER_RECORD, props);
|
|
log.debug("<-createTransferRecord: " + assoc.getChildRef());
|
|
|
|
return assoc.getChildRef();
|
|
}
|
|
|
|
/**
|
|
* Timeout a transfer. Called after the lock has been released via a timeout.
|
|
*
|
|
* This is the last chance to clean up.
|
|
*
|
|
* @param transferId
|
|
*/
|
|
private void timeout(final String transferId)
|
|
{
|
|
log.info("Inbound Transfer has timed out transferId:" + transferId);
|
|
/*
|
|
* There is no transaction or authentication context in this method since it is called via a
|
|
* timer thread.
|
|
*/
|
|
final RetryingTransactionCallback<Void> timeoutCB = new RetryingTransactionCallback<Void>() {
|
|
|
|
|
|
public Void execute() throws Throwable
|
|
{
|
|
TransferProgress progress = getProgressMonitor().getProgress(transferId);
|
|
|
|
if (progress.getStatus().equals(TransferProgress.Status.PRE_COMMIT))
|
|
{
|
|
log.warn("Inbound Transfer Lock Timeout - transferId:" + transferId);
|
|
/**
|
|
* Did not get out of PRE_COMMIT. The client has probably "gone away" after calling
|
|
* "start", but before calling commit, cancel or error.
|
|
*/
|
|
locks.remove(transferId);
|
|
removeTempFolders(transferId);
|
|
Object[] msgParams = { transferId };
|
|
getProgressMonitor().logException(transferId, "transfer timeout", new TransferException(MSG_LOCK_TIMED_OUT, msgParams));
|
|
getProgressMonitor().updateStatus(transferId, TransferProgress.Status.ERROR);
|
|
}
|
|
else
|
|
{
|
|
// We got beyond PRE_COMMIT, therefore leave the clean up to either
|
|
// commit, cancel or error command, since there may still be "in-flight"
|
|
// transfer in another thread. Although why, in that case, are we here?
|
|
log.warn("Inbound Transfer Lock Timeout - already past PRE-COMMIT - do no cleanup transferId:" + transferId);
|
|
}
|
|
return null;
|
|
}
|
|
};
|
|
|
|
AuthenticationUtil.runAs(new AuthenticationUtil.RunAsWork<String>()
|
|
{
|
|
public String doWork() throws Exception
|
|
{
|
|
transactionService.getRetryingTransactionHelper().doInTransaction(timeoutCB, false, true);
|
|
return null;
|
|
}
|
|
}, AuthenticationUtil.getSystemUserName());
|
|
}
|
|
|
|
/*
|
|
* (non-Javadoc)
|
|
*
|
|
* @see org.alfresco.repo.web.scripts.transfer.TransferReceiver#end(org.alfresco.service.cmr.repository.NodeRef)
|
|
*/
|
|
public void end(final String transferId)
|
|
{
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Request to end transfer " + transferId);
|
|
}
|
|
if (transferId == null)
|
|
{
|
|
throw new IllegalArgumentException("transferId = null");
|
|
}
|
|
|
|
try
|
|
{
|
|
Lock lock = locks.get(transferId);
|
|
if(lock != null)
|
|
{
|
|
log.debug("releasing lock:" + lock.lockToken);
|
|
lock.releaseLock();
|
|
locks.remove(transferId);
|
|
}
|
|
|
|
removeTempFolders(transferId);
|
|
|
|
|
|
//Fire the OnEndInboundTransfer policy
|
|
Set<NodeRef> createdNodes = Collections.emptySet();
|
|
Set<NodeRef> updatedNodes = Collections.emptySet();
|
|
Set<NodeRef> deletedNodes = Collections.emptySet();
|
|
TransferChangesRecord changesRecord = progressMonitor.removeChangeRecord(transferId);
|
|
if (changesRecord != null)
|
|
{
|
|
createdNodes = new HashSet<NodeRef>(changesRecord.getCreatedNodes());
|
|
updatedNodes = new HashSet<NodeRef>(changesRecord.getUpdatedNodes());
|
|
deletedNodes = new HashSet<NodeRef>(changesRecord.getDeletedNodes());
|
|
}
|
|
TransferServicePolicies.OnEndInboundTransferPolicy onEndPolicy =
|
|
onEndInboundTransferDelegate.get(TransferModel.TYPE_TRANSFER_RECORD);
|
|
onEndPolicy.onEndInboundTransfer(transferId, createdNodes, updatedNodes, deletedNodes);
|
|
}
|
|
catch (TransferException ex)
|
|
{
|
|
throw ex;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
throw new TransferException(MSG_ERROR_WHILE_ENDING_TRANSFER, new Object[] {transferId}, ex);
|
|
}
|
|
}
|
|
|
|
private void removeTempFolders(final String transferId)
|
|
{
|
|
NodeRef tempStoreNode = null;
|
|
try
|
|
{
|
|
log.debug("Deleting temporary store node...");
|
|
tempStoreNode = getTempFolder(transferId);
|
|
nodeService.deleteNode(tempStoreNode);
|
|
log.debug("Deleted temporary store node.");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
log.warn("Failed to delete temp store node for transfer id " + transferId +
|
|
"\nTemp store noderef = " + tempStoreNode);
|
|
}
|
|
|
|
File stagingFolder = null;
|
|
try
|
|
{
|
|
log.debug("delete staging folder " + transferId);
|
|
// Delete the staging folder.
|
|
stagingFolder = getStagingFolder(transferId);
|
|
deleteFile(stagingFolder);
|
|
log.debug("Staging folder deleted");
|
|
}
|
|
catch(Exception ex)
|
|
{
|
|
log.warn("Failed to delete staging folder for transfer id " + transferId +
|
|
"\nStaging folder = " + stagingFolder.toString());
|
|
}
|
|
}
|
|
|
|
|
|
public void cancel(String transferId) throws TransferException
|
|
{
|
|
// no need to check the lock
|
|
TransferProgress progress = getProgressMonitor().getProgress(transferId);
|
|
getProgressMonitor().updateStatus(transferId, TransferProgress.Status.CANCELLED);
|
|
if (progress.getStatus().equals(TransferProgress.Status.PRE_COMMIT))
|
|
{
|
|
end(transferId);
|
|
}
|
|
}
|
|
|
|
public void prepare(String transferId) throws TransferException
|
|
{
|
|
// Check that this transfer still owns the lock
|
|
Lock lock = checkLock(transferId);
|
|
try
|
|
{
|
|
|
|
}
|
|
finally
|
|
{
|
|
lock.enableLockTimeout();
|
|
}
|
|
|
|
}
|
|
|
|
/**
|
|
* @param stagingFolder
|
|
*/
|
|
private void deleteFile(File file)
|
|
{
|
|
if (file.isDirectory())
|
|
{
|
|
File[] fileList = file.listFiles();
|
|
if (fileList != null)
|
|
{
|
|
for (File currentFile : fileList)
|
|
{
|
|
deleteFile(currentFile);
|
|
}
|
|
}
|
|
}
|
|
file.delete();
|
|
}
|
|
|
|
public Lock checkLock(final String transferId) throws TransferException
|
|
{
|
|
if (transferId == null)
|
|
{
|
|
throw new IllegalArgumentException("nudgeLock: transferId = null");
|
|
}
|
|
|
|
Lock lock = locks.get(transferId);
|
|
if(lock != null)
|
|
{
|
|
if(lock.isActive())
|
|
{
|
|
lock.suspendLockTimeout();
|
|
return lock;
|
|
}
|
|
else
|
|
{
|
|
// lock is no longer active
|
|
log.debug("lock not active");
|
|
throw new TransferException(MSG_LOCK_TIMED_OUT, new Object[]{transferId});
|
|
|
|
}
|
|
}
|
|
else
|
|
{
|
|
log.debug("lock not found");
|
|
throw new TransferException(MSG_LOCK_NOT_FOUND, new Object[]{transferId});
|
|
// lock not found
|
|
}
|
|
}
|
|
|
|
public void saveSnapshot(String transferId, InputStream openStream) throws TransferException
|
|
{
|
|
// Check that this transfer still owns the lock
|
|
Lock lock = checkLock(transferId);
|
|
try
|
|
{
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Saving snapshot for transferId =" + transferId);
|
|
}
|
|
|
|
File snapshotFile = new File(getStagingFolder(transferId), SNAPSHOT_FILE_NAME);
|
|
try
|
|
{
|
|
if (snapshotFile.createNewFile())
|
|
{
|
|
FileCopyUtils.copy(openStream, new BufferedOutputStream(new FileOutputStream(snapshotFile)));
|
|
}
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Saved snapshot for transferId =" + transferId);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
throw new TransferException(MSG_ERROR_WHILE_STAGING_SNAPSHOT, new Object[]{transferId}, ex);
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
lock.enableLockTimeout();
|
|
}
|
|
}
|
|
|
|
public void saveContent(String transferId, String contentFileId, InputStream contentStream)
|
|
throws TransferException
|
|
{
|
|
Lock lock = checkLock(transferId);
|
|
try
|
|
{
|
|
File stagedFile = new File(getStagingFolder(transferId), contentFileId);
|
|
if (stagedFile.createNewFile())
|
|
{
|
|
FileCopyUtils.copy(contentStream, new BufferedOutputStream(new FileOutputStream(stagedFile)));
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
throw new TransferException(MSG_ERROR_WHILE_STAGING_CONTENT, new Object[]{transferId, contentFileId}, ex);
|
|
}
|
|
finally
|
|
{
|
|
lock.enableLockTimeout();
|
|
}
|
|
}
|
|
|
|
public void commitAsync(String transferId)
|
|
{
|
|
/**
|
|
* A side-effect of checking the lock here is that the lock timeout is suspended.
|
|
*
|
|
*/
|
|
Lock lock = checkLock(transferId);
|
|
try
|
|
{
|
|
progressMonitor.updateStatus(transferId, Status.COMMIT_REQUESTED);
|
|
Action commitAction = actionService.createAction(TransferCommitActionExecuter.NAME);
|
|
commitAction.setParameterValue(TransferCommitActionExecuter.PARAM_TRANSFER_ID, transferId);
|
|
commitAction.setExecuteAsynchronously(true);
|
|
actionService.executeAction(commitAction, new NodeRef(transferId));
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Registered transfer commit for asynchronous execution: " + transferId);
|
|
}
|
|
}
|
|
catch (Exception error)
|
|
{
|
|
/**
|
|
* Error somewhere in the action service?
|
|
*/
|
|
//TODO consider whether the methods in this class should be retried/retryable..
|
|
|
|
// need to re-enable the lock timeout otherwise we will hold the lock forever...
|
|
lock.enableLockTimeout();
|
|
|
|
throw new TransferException(MSG_ERROR_WHILE_COMMITTING_TRANSFER, new Object[]{transferId}, error);
|
|
}
|
|
|
|
/**
|
|
* Lock intentionally not re-enabled here
|
|
*/
|
|
}
|
|
|
|
public void commit(final String transferId) throws TransferException
|
|
{
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Committing transferId=" + transferId);
|
|
}
|
|
|
|
/**
|
|
* A side-effect of checking the lock here is that it ensures that the lock timeout is suspended.
|
|
*/
|
|
checkLock(transferId);
|
|
|
|
/**
|
|
* Turn off rules while transfer is being committed.
|
|
*/
|
|
boolean rulesEnabled = ruleService.isEnabled();
|
|
ruleService.disableRules();
|
|
|
|
try
|
|
{
|
|
/* lock is going to be released */ checkLock(transferId);
|
|
progressMonitor.updateStatus(transferId, Status.COMMITTING);
|
|
|
|
RetryingTransactionHelper.RetryingTransactionCallback<Object> commitWork = new RetryingTransactionCallback<Object>()
|
|
{
|
|
public Object execute() throws Throwable
|
|
{
|
|
AlfrescoTransactionSupport.bindListener(new TransferCommitTransactionListener(transferId,
|
|
RepoTransferReceiverImpl.this));
|
|
|
|
List<TransferManifestProcessor> commitProcessors = manifestProcessorFactory.getCommitProcessors(
|
|
RepoTransferReceiverImpl.this, transferId);
|
|
|
|
SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
|
|
SAXParser parser = saxParserFactory.newSAXParser();
|
|
File snapshotFile = getSnapshotFile(transferId);
|
|
|
|
if (snapshotFile.exists())
|
|
{
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Processing manifest file:" + snapshotFile.getAbsolutePath());
|
|
}
|
|
// We parse the file as many times as we have processors
|
|
for (TransferManifestProcessor processor : commitProcessors)
|
|
{
|
|
XMLTransferManifestReader reader = new XMLTransferManifestReader(processor);
|
|
|
|
//behaviourFilter.disableBehaviour(ContentModel.ASPECT_AUDITABLE);
|
|
behaviourFilter.disableBehaviour();
|
|
|
|
try
|
|
{
|
|
parser.parse(snapshotFile, reader);
|
|
}
|
|
finally
|
|
{
|
|
behaviourFilter.enableBehaviour();
|
|
}
|
|
parser.reset();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
progressMonitor.logException(transferId, "Unable to start commit. No snapshot file received",
|
|
new TransferException(MSG_NO_SNAPSHOT_RECEIVED, new Object[]{transferId}));
|
|
}
|
|
return null;
|
|
}
|
|
};
|
|
|
|
transactionService.getRetryingTransactionHelper().doInTransaction(commitWork, false, true);
|
|
|
|
Throwable error = progressMonitor.getProgress(transferId).getError();
|
|
if (error != null)
|
|
{
|
|
if (TransferException.class.isAssignableFrom(error.getClass()))
|
|
{
|
|
throw (TransferException) error;
|
|
}
|
|
else
|
|
{
|
|
throw new TransferException(MSG_ERROR_WHILE_COMMITTING_TRANSFER, new Object[]{transferId}, error);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Successfully committed
|
|
*/
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("Commit success transferId=" + transferId);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
if (TransferException.class.isAssignableFrom(ex.getClass()))
|
|
{
|
|
throw (TransferException) ex;
|
|
}
|
|
else
|
|
{
|
|
throw new TransferException(MSG_ERROR_WHILE_COMMITTING_TRANSFER, ex);
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
if(rulesEnabled)
|
|
{
|
|
/**
|
|
* Turn rules back on if we turned them off earlier.
|
|
*/
|
|
ruleService.enableRules();
|
|
}
|
|
|
|
/**
|
|
* Clean up at the end of the transfer
|
|
*/
|
|
try
|
|
{
|
|
end(transferId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
log.error("Failed to clean up transfer. Lock may still be in place: " + transferId, ex);
|
|
}
|
|
}
|
|
}
|
|
|
|
public TransferProgress getStatus(String transferId) throws TransferException
|
|
{
|
|
return getProgressMonitor().getProgress(transferId);
|
|
}
|
|
|
|
private File getSnapshotFile(String transferId)
|
|
{
|
|
return new File(getStagingFolder(transferId), SNAPSHOT_FILE_NAME);
|
|
}
|
|
|
|
/**
|
|
* @param searchService
|
|
* the searchService to set
|
|
*/
|
|
public void setSearchService(SearchService searchService)
|
|
{
|
|
this.searchService = searchService;
|
|
}
|
|
|
|
/**
|
|
* @param transactionService
|
|
* the transactionService to set
|
|
*/
|
|
public void setTransactionService(TransactionService transactionService)
|
|
{
|
|
this.transactionService = transactionService;
|
|
}
|
|
|
|
public void setTenantService(TenantService tenantService)
|
|
{
|
|
this.tenantService = tenantService;
|
|
}
|
|
|
|
public void setSingletonCache(SimpleCache<String, NodeRef> singletonCache)
|
|
{
|
|
this.singletonCache = singletonCache;
|
|
}
|
|
|
|
/**
|
|
* @param transferLockFolderPath
|
|
* the transferLockFolderPath to set
|
|
*/
|
|
public void setTransferLockFolderPath(String transferLockFolderPath)
|
|
{
|
|
this.transferLockFolderPath = transferLockFolderPath;
|
|
}
|
|
|
|
/**
|
|
* @param transferTempFolderPath
|
|
* the transferTempFolderPath to set
|
|
*/
|
|
public void setTransferTempFolderPath(String transferTempFolderPath)
|
|
{
|
|
this.transferTempFolderPath = transferTempFolderPath;
|
|
}
|
|
|
|
/**
|
|
* @param rootStagingDirectory
|
|
* the rootTransferFolder to set
|
|
*/
|
|
public void setRootStagingDirectory(String rootStagingDirectory)
|
|
{
|
|
this.rootStagingDirectory = rootStagingDirectory;
|
|
}
|
|
|
|
/**
|
|
* @param inboundTransferRecordsPath
|
|
* the inboundTransferRecordsPath to set
|
|
*/
|
|
public void setInboundTransferRecordsPath(String inboundTransferRecordsPath)
|
|
{
|
|
this.inboundTransferRecordsPath = inboundTransferRecordsPath;
|
|
}
|
|
|
|
/**
|
|
* @param nodeService
|
|
* the nodeService to set
|
|
*/
|
|
public void setNodeService(NodeService nodeService)
|
|
{
|
|
this.nodeService = nodeService;
|
|
}
|
|
|
|
/**
|
|
* @param manifestProcessorFactory
|
|
* the manifestProcessorFactory to set
|
|
*/
|
|
public void setManifestProcessorFactory(ManifestProcessorFactory manifestProcessorFactory)
|
|
{
|
|
this.manifestProcessorFactory = manifestProcessorFactory;
|
|
}
|
|
|
|
/**
|
|
* @param behaviourFilter
|
|
* the behaviourFilter to set
|
|
*/
|
|
public void setBehaviourFilter(BehaviourFilter behaviourFilter)
|
|
{
|
|
this.behaviourFilter = behaviourFilter;
|
|
}
|
|
|
|
/**
|
|
* @return the progressMonitor
|
|
*/
|
|
public TransferProgressMonitor getProgressMonitor()
|
|
{
|
|
return progressMonitor;
|
|
}
|
|
|
|
/**
|
|
* @param progressMonitor
|
|
* the progressMonitor to set
|
|
*/
|
|
public void setProgressMonitor(TransferProgressMonitor progressMonitor)
|
|
{
|
|
this.progressMonitor = new ChangeCapturingProgressMonitor(progressMonitor);
|
|
}
|
|
|
|
public void setActionService(ActionService actionService)
|
|
{
|
|
this.actionService = actionService;
|
|
}
|
|
|
|
/**
|
|
* Set the ruleService
|
|
* @param ruleService
|
|
* the ruleService to set
|
|
*/
|
|
public void setRuleService(RuleService ruleService)
|
|
{
|
|
this.ruleService = ruleService;
|
|
}
|
|
|
|
/**
|
|
* Get the rule service
|
|
* @return the rule service
|
|
*/
|
|
public RuleService getRuleService()
|
|
{
|
|
return this.ruleService;
|
|
}
|
|
|
|
/**
|
|
* Generate the requsite
|
|
*/
|
|
public void generateRequsite(String transferId, OutputStream out) throws TransferException
|
|
{
|
|
log.debug("Generate Requsite for transfer:" + transferId);
|
|
try
|
|
{
|
|
File snapshotFile = getSnapshotFile(transferId);
|
|
|
|
if (snapshotFile.exists())
|
|
{
|
|
log.debug("snapshot does exist");
|
|
SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
|
|
SAXParser parser = saxParserFactory.newSAXParser();
|
|
OutputStreamWriter dest = new OutputStreamWriter(out, "UTF-8");
|
|
|
|
XMLTransferRequsiteWriter writer = new XMLTransferRequsiteWriter(dest);
|
|
TransferManifestProcessor processor = manifestProcessorFactory.getRequsiteProcessor(
|
|
RepoTransferReceiverImpl.this,
|
|
transferId,
|
|
writer);
|
|
|
|
XMLTransferManifestReader reader = new XMLTransferManifestReader(processor);
|
|
|
|
/**
|
|
* Now run the parser
|
|
*/
|
|
parser.parse(snapshotFile, reader);
|
|
|
|
/**
|
|
* And flush the destination in case any content remains in the writer.
|
|
*/
|
|
dest.flush();
|
|
|
|
}
|
|
log.debug("Generate Requsite done transfer:" + transferId);
|
|
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
if (TransferException.class.isAssignableFrom(ex.getClass()))
|
|
{
|
|
throw (TransferException) ex;
|
|
}
|
|
else
|
|
{
|
|
throw new TransferException(MSG_ERROR_WHILE_GENERATING_REQUISITE, ex);
|
|
}
|
|
}
|
|
}
|
|
|
|
public InputStream getTransferReport(String transferId)
|
|
{
|
|
return progressMonitor.getLogInputStream(transferId);
|
|
}
|
|
|
|
public void setPolicyComponent(PolicyComponent policyComponent)
|
|
{
|
|
this.policyComponent = policyComponent;
|
|
}
|
|
|
|
public PolicyComponent getPolicyComponent()
|
|
{
|
|
return policyComponent;
|
|
}
|
|
|
|
/**
|
|
* When a new node is created as a child of a Transferred or Alien node then
|
|
* the new node needs to be marked as an alien.
|
|
* <p>
|
|
* Then the tree needs to be walked upwards to mark all parent
|
|
* transferred nodes as alien.
|
|
*/
|
|
public void onCreateChildAssociation(ChildAssociationRef childAssocRef,
|
|
boolean isNewNode)
|
|
{
|
|
|
|
log.debug("on create child association to transferred node");
|
|
|
|
final String localRepositoryId = descriptorService.getCurrentRepositoryDescriptor().getId();
|
|
alienProcessor.onCreateChild(childAssocRef, localRepositoryId, isNewNode);
|
|
}
|
|
|
|
/**
|
|
* When an alien node is deleted the it may be the last alien invader
|
|
* <p>
|
|
* Walk the tree checking the invasion status!
|
|
*/
|
|
public void beforeDeleteNode(NodeRef deletedNodeRef)
|
|
{
|
|
log.debug("on delete node - need to check for transferred node");
|
|
alienProcessor.beforeDeleteAlien(deletedNodeRef, null);
|
|
}
|
|
|
|
/**
|
|
* When a transferred node is restored it may be a new invader or it may no
|
|
* longer be an invader.
|
|
* <p>
|
|
* Walk the tree checking the invasion status!
|
|
*/
|
|
public void onRestoreNode(ChildAssociationRef childAssocRef)
|
|
{
|
|
log.debug("on restore node");
|
|
log.debug("restoredAssocRef:" + childAssocRef);
|
|
alienProcessor.afterMoveAlien(childAssocRef);
|
|
}
|
|
|
|
/**
|
|
* When an alien node is moved it may un-invade its old location and invade a new
|
|
* location. The node may also cease to be alien.
|
|
*/
|
|
public void onMoveNode(ChildAssociationRef oldChildAssocRef,
|
|
ChildAssociationRef newChildAssocRef)
|
|
{
|
|
|
|
log.debug("onMoveNode");
|
|
log.debug("oldChildAssocRef:" + oldChildAssocRef);
|
|
log.debug("newChildAssocRef:" + newChildAssocRef);
|
|
|
|
NodeRef oldParentRef = oldChildAssocRef.getParentRef();
|
|
NodeRef newParentRef = newChildAssocRef.getParentRef();
|
|
|
|
if(newParentRef.equals(oldParentRef))
|
|
{
|
|
log.debug("old parent and new parent are the same - this is a rename, do nothing");
|
|
}
|
|
else
|
|
{
|
|
if(log.isDebugEnabled())
|
|
{
|
|
log.debug("moving node from oldParentRef:" + oldParentRef +" to:" + newParentRef);
|
|
}
|
|
alienProcessor.beforeDeleteAlien(newChildAssocRef.getChildRef(), oldChildAssocRef);
|
|
alienProcessor.afterMoveAlien(newChildAssocRef);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* When a transferred node is copied, don't copy the transferred aspect.
|
|
*/
|
|
public CopyBehaviourCallback onCopyTransferred(QName classRef,
|
|
CopyDetails copyDetails)
|
|
{
|
|
return TransferredAspectCopyBehaviourCallback.INSTANCE;
|
|
}
|
|
|
|
/**
|
|
* When an alien node is copied, don't copy the alien aspect.
|
|
*/
|
|
public CopyBehaviourCallback onCopyAlien(QName classRef,
|
|
CopyDetails copyDetails)
|
|
{
|
|
return AlienAspectCopyBehaviourCallback.INSTANCE;
|
|
}
|
|
|
|
/**
|
|
* Extends the default copy behaviour to prevent copying of transferred aspect and properties.
|
|
*
|
|
* @author Mark Rogers
|
|
* @since 3.4
|
|
*/
|
|
private static class TransferredAspectCopyBehaviourCallback extends DefaultCopyBehaviourCallback
|
|
{
|
|
private static final CopyBehaviourCallback INSTANCE = new TransferredAspectCopyBehaviourCallback();
|
|
|
|
/**
|
|
* @return Returns an empty map
|
|
*/
|
|
@Override
|
|
public Map<QName, Serializable> getCopyProperties(
|
|
QName classQName, CopyDetails copyDetails, Map<QName, Serializable> properties)
|
|
{
|
|
return Collections.emptyMap();
|
|
}
|
|
|
|
/**
|
|
* Don't copy the transferred aspect.
|
|
*
|
|
* @return Returns <tt>true</tt> always
|
|
*/
|
|
@Override
|
|
public boolean getMustCopy(QName classQName, CopyDetails copyDetails)
|
|
{
|
|
if(classQName.equals(TransferModel.ASPECT_TRANSFERRED))
|
|
{
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Extends the default copy behaviour to prevent copying of alien aspect and properties.
|
|
*
|
|
* @author Mark Rogers
|
|
* @since 3.4
|
|
*/
|
|
private static class AlienAspectCopyBehaviourCallback extends DefaultCopyBehaviourCallback
|
|
{
|
|
private static final CopyBehaviourCallback INSTANCE = new AlienAspectCopyBehaviourCallback();
|
|
|
|
/**
|
|
* @return Returns an empty map
|
|
*/
|
|
@Override
|
|
public Map<QName, Serializable> getCopyProperties(
|
|
QName classQName, CopyDetails copyDetails, Map<QName, Serializable> properties)
|
|
{
|
|
return Collections.emptyMap();
|
|
}
|
|
|
|
/**
|
|
* Don't copy the transferred aspect.
|
|
*
|
|
* @return Returns <tt>true</tt> always
|
|
*/
|
|
@Override
|
|
public boolean getMustCopy(QName classQName, CopyDetails copyDetails)
|
|
{
|
|
if(classQName.equals(TransferModel.ASPECT_ALIEN))
|
|
{
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
public void setDescriptorService(DescriptorService descriptorService)
|
|
{
|
|
this.descriptorService = descriptorService;
|
|
}
|
|
|
|
public DescriptorService getDescriptorService()
|
|
{
|
|
return descriptorService;
|
|
}
|
|
|
|
public void setAlienProcessor(AlienProcessor alienProcessor)
|
|
{
|
|
this.alienProcessor = alienProcessor;
|
|
}
|
|
|
|
public AlienProcessor getAlienProcessor()
|
|
{
|
|
return alienProcessor;
|
|
}
|
|
|
|
@Override
|
|
public void onContentUpdate(NodeRef nodeRef, boolean newContent)
|
|
{
|
|
/**
|
|
* On update of a transferred node remove the from content from property.
|
|
*/
|
|
log.debug("on content update called:" + nodeRef);
|
|
if(newContent)
|
|
{
|
|
log.debug("new content remove PROP_FROM_CONTENT from node:" + nodeRef);
|
|
nodeService.setProperty(nodeRef, TransferModel.PROP_FROM_CONTENT, null);
|
|
}
|
|
}
|
|
|
|
public void setJobLockService(JobLockService jobLockService)
|
|
{
|
|
this.jobLockService = jobLockService;
|
|
}
|
|
|
|
public JobLockService getJobLockService()
|
|
{
|
|
return jobLockService;
|
|
}
|
|
|
|
public void setLockRetryCount(int lockRetryCount)
|
|
{
|
|
this.lockRetryCount = lockRetryCount;
|
|
}
|
|
|
|
public int getLockRetryCount()
|
|
{
|
|
return lockRetryCount;
|
|
}
|
|
|
|
public void setLockRetryWait(long lockRetryWait)
|
|
{
|
|
this.lockRetryWait = lockRetryWait;
|
|
}
|
|
|
|
public long getLockRetryWait()
|
|
{
|
|
return lockRetryWait;
|
|
}
|
|
|
|
public void setLockTimeOut(long lockTimeOut)
|
|
{
|
|
this.lockTimeOut = lockTimeOut;
|
|
}
|
|
|
|
public long getLockTimeOut()
|
|
{
|
|
return lockTimeOut;
|
|
}
|
|
|
|
public void setLockRefreshTime(long lockRefreshTime)
|
|
{
|
|
this.lockRefreshTime = lockRefreshTime;
|
|
}
|
|
|
|
public long getLockRefreshTime()
|
|
{
|
|
return lockRefreshTime;
|
|
}
|
|
|
|
/**
|
|
* A Transfer Lock
|
|
*/
|
|
private class Lock implements JobLockService.JobLockRefreshCallback
|
|
{
|
|
/**
|
|
* The name of the lock - unique for each domain
|
|
*/
|
|
QName lockQName;
|
|
|
|
/**
|
|
* The unique token for this lock instance.
|
|
*/
|
|
String lockToken;
|
|
|
|
/**
|
|
* The transfer that this lock belongs to.
|
|
*/
|
|
String transferId;
|
|
|
|
/**
|
|
* Is the lock active ?
|
|
*/
|
|
private boolean active = false;
|
|
|
|
/**
|
|
* Is the server processing ?
|
|
*/
|
|
private boolean processing = false;
|
|
|
|
/**
|
|
* When did we last check whether the lock is active
|
|
*/
|
|
long lastActive = System.currentTimeMillis();
|
|
|
|
public Lock(QName lockQName)
|
|
{
|
|
this.lockQName = lockQName;
|
|
}
|
|
|
|
|
|
/**
|
|
* Make the lock - called on main thread
|
|
*
|
|
* @throws LockAquisitionException
|
|
*/
|
|
public synchronized void makeLock()
|
|
{
|
|
if(log.isDebugEnabled())
|
|
{
|
|
log.debug("makeLock" + lockQName);
|
|
}
|
|
|
|
lockToken = getJobLockService().getLock(lockQName, getLockRefreshTime(), getLockRetryWait(), getLockRetryCount());
|
|
|
|
// Got the lock, so mark as active
|
|
active = true;
|
|
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("lock taken: name" + lockQName + " token:" +lockToken);
|
|
log.debug("register lock callback, target lock refresh time :" + getLockRefreshTime());
|
|
}
|
|
getJobLockService().refreshLock(lockToken, lockQName, getLockRefreshTime(), this);
|
|
if (log.isDebugEnabled())
|
|
{
|
|
log.debug("refreshLock callback registered");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check that the lock is still active
|
|
*
|
|
* Called on main transfer thread as transfer proceeds.
|
|
* @throws TransferException (Lock timeout)
|
|
*/
|
|
public synchronized void suspendLockTimeout()
|
|
{
|
|
log.debug("suspend lock called");
|
|
if (active)
|
|
{
|
|
processing = true;
|
|
}
|
|
else
|
|
{
|
|
// lock is no longer active
|
|
log.debug("lock not active, throw timed out exception");
|
|
throw new TransferException(MSG_LOCK_TIMED_OUT);
|
|
}
|
|
}
|
|
|
|
public synchronized void enableLockTimeout()
|
|
{
|
|
long now = System.currentTimeMillis();
|
|
// Update lastActive to 1S boundary
|
|
if(now > lastActive + 1000L)
|
|
{
|
|
lastActive = now;
|
|
log.debug("start waiting : lastActive:" + lastActive);
|
|
}
|
|
|
|
processing = false;
|
|
}
|
|
|
|
/**
|
|
* Release the lock
|
|
*
|
|
* Called on main thread
|
|
*/
|
|
public synchronized void releaseLock()
|
|
{
|
|
if(log.isDebugEnabled())
|
|
{
|
|
log.debug("transfer service about to releaseLock : " + lockQName);
|
|
}
|
|
|
|
if (active)
|
|
{
|
|
active = false;
|
|
getJobLockService().releaseLock(lockToken, lockQName);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Called by Job Lock Service to determine whether the lock is still active
|
|
*/
|
|
@Override
|
|
public synchronized boolean isActive()
|
|
{
|
|
long now = System.currentTimeMillis();
|
|
|
|
if(active)
|
|
{
|
|
if(!processing)
|
|
{
|
|
if(now > lastActive + getLockTimeOut())
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
if(log.isDebugEnabled())
|
|
{
|
|
log.debug("transfer service callback isActive: " + active);
|
|
}
|
|
|
|
return active;
|
|
}
|
|
|
|
/**
|
|
* Called by Job Lock Service on release of the lock after time-out
|
|
*/
|
|
@Override
|
|
public synchronized void lockReleased()
|
|
{
|
|
if(active)
|
|
{
|
|
active = false;
|
|
log.info("transfer service: lock has timed out, timeout :" + lockQName);
|
|
timeout(transferId);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check Whether transfer is allowed from the specified repository.
|
|
* Called prior to "begin".
|
|
*/
|
|
|
|
private void checkTransfer(String fromRepository, boolean transferToSelf)
|
|
{
|
|
if(log.isDebugEnabled())
|
|
{
|
|
log.debug("checkTransfer fromRepository:" + fromRepository + ", transferToSelf:" + transferToSelf );
|
|
}
|
|
final String localRepositoryId = descriptorService.getCurrentRepositoryDescriptor().getId();
|
|
|
|
if(!transferToSelf)
|
|
{
|
|
if(fromRepository != null)
|
|
{
|
|
if(fromRepository.equalsIgnoreCase(localRepositoryId))
|
|
{
|
|
throw new TransferException(MSG_TRANSFER_TO_SELF);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
throw new TransferException("from repository id is missing");
|
|
}
|
|
}
|
|
}
|
|
|
|
public void setTransferVersionChecker(TransferVersionChecker transferVersionChecker)
|
|
{
|
|
this.transferVersionChecker = transferVersionChecker;
|
|
}
|
|
|
|
public TransferVersionChecker getTransferVersionChecker()
|
|
{
|
|
return transferVersionChecker;
|
|
}
|
|
|
|
@Override
|
|
public TransferVersion getVersion()
|
|
{
|
|
Descriptor d = descriptorService.getServerDescriptor();
|
|
// needs to be serverDescriptor to pick up versionEdition
|
|
return new TransferVersionImpl(d);
|
|
}
|
|
|
|
public void setTransferRootNode(String rootFileSystem)
|
|
{
|
|
//just ignore, no relevant for transferring on file system
|
|
}
|
|
}
|