diff --git a/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkFilesystemImporter.java b/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkFilesystemImporter.java index 1da4c9bb23..6700396d0b 100644 --- a/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkFilesystemImporter.java +++ b/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkFilesystemImporter.java @@ -146,7 +146,7 @@ public abstract class AbstractBulkFilesystemImporter implements BulkFilesystemIm this.transactionHelper = transactionService.getRetryingTransactionHelper(); } - protected abstract void bulkImportImpl(BulkImportParameters bulkImportParameters, NodeImporter nodeImporter, String lockToken) throws Throwable; + protected abstract void bulkImportImpl(BulkImportParameters bulkImportParameters, NodeImporter nodeImporter, String lockToken); /** * Attempts to get the lock. If the lock couldn't be taken, then null is returned. @@ -357,7 +357,6 @@ public abstract class AbstractBulkFilesystemImporter implements BulkFilesystemIm }; Thread backgroundThread = new Thread(backgroundLogic, "BulkFilesystemImport-BackgroundThread"); - //backgroundThread.setDaemon(true); backgroundThread.start(); } @@ -374,51 +373,51 @@ public abstract class AbstractBulkFilesystemImporter implements BulkFilesystemIm @Override public Void execute() throws Throwable { - String sourceDirectory = getFileName(sourceFolder); - String targetSpace = getRepositoryPath(bulkImportParameters.getTarget()); + final String sourceDirectory = getFileName(sourceFolder); + final String targetSpace = getRepositoryPath(bulkImportParameters.getTarget()); + final String lockToken = getLockToken(); - String lockToken = getLockToken(); + try + { + importStatus.startImport(sourceDirectory, targetSpace); - try - { - importStatus.startImport(sourceDirectory, targetSpace); - - BulkFSImportEvent bulkImportEvent = new BulkFSImportEvent(importer); - applicationContext.publishEvent(bulkImportEvent); - - validateNodeRefIsWritableSpace(bulkImportParameters.getTarget()); - validateSourceIsReadableDirectory(sourceFolder); - - if(logger.isDebugEnabled()) - { - logger.debug("Bulk import started from '" + sourceFolder.getAbsolutePath() + "'..."); - } - - bulkImportImpl(bulkImportParameters, nodeImporter, lockToken); - - importStatus.stopImport(); - - if(logger.isDebugEnabled()) - { - logger.debug("Bulk import from '" + getFileName(sourceFolder) + "' succeeded."); - } - - return null; - } - catch(Throwable e) - { - logger.error("Bulk import from '" + getFileName(sourceFolder) + "' failed.", e); - importStatus.stopImport(e); - throw new AlfrescoRuntimeException("Bulk filesystem import failed", e); - } - finally - { - BulkFSImportEvent bulkImportEvent = new BulkFSImportEvent(importer); - applicationContext.publishEvent(bulkImportEvent); + BulkFSImportEvent bulkImportEvent = new BulkFSImportEvent(importer); + applicationContext.publishEvent(bulkImportEvent); - releaseLock(lockToken); - } - } + validateNodeRefIsWritableSpace(bulkImportParameters.getTarget()); + validateSourceIsReadableDirectory(sourceFolder); + + if(logger.isDebugEnabled()) + { + logger.debug("Bulk import started from '" + sourceFolder.getAbsolutePath() + "'..."); + } + + + bulkImportImpl(bulkImportParameters, nodeImporter, lockToken); + + importStatus.stopImport(); + + if(logger.isDebugEnabled()) + { + logger.debug("Bulk import from '" + getFileName(sourceFolder) + "' succeeded."); + } + + return null; + } + catch(Throwable e) + { + logger.error("Bulk import from '" + getFileName(sourceFolder) + "' failed.", e); + importStatus.stopImport(e); + throw new AlfrescoRuntimeException("Bulk filesystem import failed", e); + } + finally + { + BulkFSImportEvent bulkImportEvent = new BulkFSImportEvent(importer); + applicationContext.publishEvent(bulkImportEvent); + + releaseLock(lockToken); + } + } }, false, true); } diff --git a/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkImportTests.java b/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkImportTests.java index b8cf1c1a60..410ec724bd 100644 --- a/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkImportTests.java +++ b/source/java/org/alfresco/repo/bulkimport/impl/AbstractBulkImportTests.java @@ -43,6 +43,7 @@ import org.alfresco.query.PagingRequest; import org.alfresco.query.PagingResults; import org.alfresco.repo.content.MimetypeMap; import org.alfresco.repo.security.authentication.AuthenticationUtil; +import org.alfresco.service.cmr.action.ActionService; import org.alfresco.service.cmr.model.FileFolderService; import org.alfresco.service.cmr.model.FileInfo; import org.alfresco.service.cmr.repository.ContentReader; @@ -50,6 +51,7 @@ import org.alfresco.service.cmr.repository.ContentService; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeService; import org.alfresco.service.cmr.repository.StoreRef; +import org.alfresco.service.cmr.rule.RuleService; import org.alfresco.service.namespace.QName; import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.ApplicationContextHelper; @@ -70,10 +72,13 @@ public class AbstractBulkImportTests protected TransactionService transactionService; protected ContentService contentService; protected UserTransaction txn = null; + protected RuleService ruleService; + protected ActionService actionService; protected MultiThreadedBulkFilesystemImporter bulkImporter; protected NodeRef rootNodeRef; protected FileInfo topLevelFolder; + protected NodeRef top; protected static void startContext() { @@ -100,22 +105,26 @@ public class AbstractBulkImportTests transactionService = (TransactionService)ctx.getBean("transactionService"); bulkImporter = (MultiThreadedBulkFilesystemImporter)ctx.getBean("bulkFilesystemImporter"); contentService = (ContentService)ctx.getBean("contentService"); - - AuthenticationUtil.setRunAsUserSystem(); + actionService = (ActionService)ctx.getBean("actionService"); + ruleService = (RuleService)ctx.getBean("ruleService"); + + AuthenticationUtil.setFullyAuthenticatedUser(AuthenticationUtil.getAdminUserName()); String s = "BulkFilesystemImport" + System.currentTimeMillis(); txn = transactionService.getUserTransaction(); txn.begin(); + + AuthenticationUtil.pushAuthentication(); + AuthenticationUtil.setFullyAuthenticatedUser(AuthenticationUtil.getAdminUserName()); + StoreRef storeRef = nodeService.createStore(StoreRef.PROTOCOL_WORKSPACE, s); rootNodeRef = nodeService.getRootNode(storeRef); - NodeRef top = nodeService.createNode(rootNodeRef, ContentModel.ASSOC_CHILDREN, QName.createQName("{namespace}top"), ContentModel.TYPE_FOLDER).getChildRef(); + top = nodeService.createNode(rootNodeRef, ContentModel.ASSOC_CHILDREN, QName.createQName("{namespace}top"), ContentModel.TYPE_FOLDER).getChildRef(); topLevelFolder = fileFolderService.create(top, s, ContentModel.TYPE_FOLDER); - txn.commit(); - txn = transactionService.getUserTransaction(); - txn.begin(); + txn.commit(); } catch(Throwable e) { @@ -126,6 +135,7 @@ public class AbstractBulkImportTests @After public void teardown() throws Exception { + AuthenticationUtil.popAuthentication(); if(txn != null) { txn.commit(); diff --git a/source/java/org/alfresco/repo/bulkimport/impl/BulkImportTest.java b/source/java/org/alfresco/repo/bulkimport/impl/BulkImportTest.java index 2de8af0397..97bda8aa4a 100644 --- a/source/java/org/alfresco/repo/bulkimport/impl/BulkImportTest.java +++ b/source/java/org/alfresco/repo/bulkimport/impl/BulkImportTest.java @@ -27,16 +27,27 @@ package org.alfresco.repo.bulkimport.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.transaction.NotSupportedException; import javax.transaction.SystemException; +import org.alfresco.model.ContentModel; +import org.alfresco.repo.action.evaluator.NoConditionEvaluator; +import org.alfresco.repo.action.executer.CopyActionExecuter; +import org.alfresco.repo.action.executer.MoveActionExecuter; import org.alfresco.repo.bulkimport.BulkImportParameters; import org.alfresco.repo.bulkimport.NodeImporter; import org.alfresco.repo.content.MimetypeMap; +import org.alfresco.service.cmr.action.Action; +import org.alfresco.service.cmr.action.ActionCondition; import org.alfresco.service.cmr.model.FileInfo; import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.rule.Rule; +import org.alfresco.service.cmr.rule.RuleType; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -63,8 +74,11 @@ public class BulkImportTest extends AbstractBulkImportTests } @Test - public void testCopyImportStriping() + public void testCopyImportStriping() throws Throwable { + txn = transactionService.getUserTransaction(); + txn.begin(); + NodeRef folderNode = topLevelFolder.getNodeRef(); try @@ -73,6 +87,7 @@ public class BulkImportTest extends AbstractBulkImportTests BulkImportParameters bulkImportParameters = new BulkImportParameters(); bulkImportParameters.setTarget(folderNode); bulkImportParameters.setReplaceExisting(true); + bulkImportParameters.setDisableRulesService(true); bulkImportParameters.setBatchSize(40); bulkImporter.bulkImport(bulkImportParameters, nodeImporter); } @@ -148,5 +163,110 @@ public class BulkImportTest extends AbstractBulkImportTests { }); } + + protected Rule createCopyRule(NodeRef targetNode, boolean isAppliedToChildren) + { + Rule rule = new Rule(); + rule.setRuleType(RuleType.INBOUND); + String title = "rule title " + System.currentTimeMillis(); + rule.setTitle(title); + rule.setDescription(title); + rule.applyToChildren(isAppliedToChildren); + + Map params = new HashMap(1); + params.put(MoveActionExecuter.PARAM_DESTINATION_FOLDER, targetNode); + + Action action = actionService.createAction(CopyActionExecuter.NAME, params); + ActionCondition condition = actionService.createActionCondition(NoConditionEvaluator.NAME); + action.addActionCondition(condition); + rule.setAction(action); + + return rule; + } + + @Test + public void testImportWithRules() throws Throwable + { + NodeRef folderNode = topLevelFolder.getNodeRef(); + NodeImporter nodeImporter = null; + + txn = transactionService.getUserTransaction(); + txn.begin(); + + NodeRef targetNode = fileFolderService.create(top, "target", ContentModel.TYPE_FOLDER).getNodeRef(); + + // Create a rule on the node into which we're importing + Rule newRule = createCopyRule(targetNode, false); + this.ruleService.saveRule(folderNode, newRule); + + txn.commit(); + + txn = transactionService.getUserTransaction(); + txn.begin(); + + nodeImporter = streamingNodeImporterFactory.getNodeImporter(ResourceUtils.getFile("classpath:bulkimport")); + + BulkImportParameters bulkImportParameters = new BulkImportParameters(); + bulkImportParameters.setTarget(folderNode); + bulkImportParameters.setReplaceExisting(true); + bulkImportParameters.setDisableRulesService(false); + bulkImportParameters.setBatchSize(40); + bulkImporter.bulkImport(bulkImportParameters, nodeImporter); + + System.out.println(bulkImporter.getStatus()); + + assertEquals("", 74, bulkImporter.getStatus().getNumberOfContentNodesCreated()); + + checkFiles(folderNode, null, 2, 9, new ExpectedFile[] { + new ExpectedFile("quickImg1.xls", MimetypeMap.MIMETYPE_EXCEL), + new ExpectedFile("quickImg1.doc", MimetypeMap.MIMETYPE_WORD), + new ExpectedFile("quick.txt", MimetypeMap.MIMETYPE_TEXT_PLAIN, "The quick brown fox jumps over the lazy dog"), + }, + new ExpectedFolder[] { + new ExpectedFolder("folder1"), + new ExpectedFolder("folder2") + }); + + List folders = getFolders(folderNode, "folder1"); + assertEquals("", 1, folders.size()); + NodeRef folder1 = folders.get(0).getNodeRef(); + checkFiles(folder1, null, 1, 0, null, new ExpectedFolder[] { + new ExpectedFolder("folder1.1") + }); + + folders = getFolders(folderNode, "folder2"); + assertEquals("", 1, folders.size()); + NodeRef folder2 = folders.get(0).getNodeRef(); + checkFiles(folder2, null, 1, 0, new ExpectedFile[] { + }, + new ExpectedFolder[] { + new ExpectedFolder("folder2.1") + }); + + folders = getFolders(folder1, "folder1.1"); + assertEquals("", 1, folders.size()); + NodeRef folder1_1 = folders.get(0).getNodeRef(); + checkFiles(folder1_1, null, 2, 12, new ExpectedFile[] { + new ExpectedFile("quick.txt", MimetypeMap.MIMETYPE_TEXT_PLAIN, "The quick brown fox jumps over the lazy dog"), + new ExpectedFile("quick.sxw", MimetypeMap.MIMETYPE_OPENOFFICE1_WRITER), + new ExpectedFile("quick.tar", "application/x-gtar"), + }, + new ExpectedFolder[] { + new ExpectedFolder("folder1.1.1"), + new ExpectedFolder("folder1.1.2") + }); + + folders = getFolders(folder2, "folder2.1"); + assertEquals("", 1, folders.size()); + NodeRef folder2_1 = folders.get(0).getNodeRef(); + + checkFiles(folder2_1, null, 0, 17, new ExpectedFile[] { + new ExpectedFile("quick.png", MimetypeMap.MIMETYPE_IMAGE_PNG), + new ExpectedFile("quick.pdf", MimetypeMap.MIMETYPE_PDF), + new ExpectedFile("quick.odt", MimetypeMap.MIMETYPE_OPENDOCUMENT_TEXT), + }, + new ExpectedFolder[] { + }); + } } diff --git a/source/java/org/alfresco/repo/bulkimport/impl/MultiThreadedBulkFilesystemImporter.java b/source/java/org/alfresco/repo/bulkimport/impl/MultiThreadedBulkFilesystemImporter.java index 928b8e24b0..18f48388b6 100644 --- a/source/java/org/alfresco/repo/bulkimport/impl/MultiThreadedBulkFilesystemImporter.java +++ b/source/java/org/alfresco/repo/bulkimport/impl/MultiThreadedBulkFilesystemImporter.java @@ -32,13 +32,13 @@ import org.alfresco.repo.bulkimport.BulkImportParameters; import org.alfresco.repo.bulkimport.FilesystemTracker; import org.alfresco.repo.bulkimport.ImportableItem; import org.alfresco.repo.bulkimport.NodeImporter; -import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; +import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.service.cmr.repository.NodeRef; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** - * Performs a filesystem import into the repository using the {@link BatchProcessor} in multiple threads. + * Performs a multi-threaded filesystem import into the repository using the {@link BatchProcessor}. * * @since 4.0 * @@ -65,17 +65,13 @@ public abstract class MultiThreadedBulkFilesystemImporter extends AbstractBulkFi { return bulkImportParameters.getNumThreads() != null ? bulkImportParameters.getNumThreads() : defaultNumThreads; } - - protected void handleRuleService(final BulkImportParameters bulkImportParameters) - { - - } protected BatchProcessor.BatchProcessWorker getWorker(final BulkImportParameters bulkImportParameters, final String lockToken, final NodeImporter nodeImporter, final FilesystemTracker filesystemTracker) { final int batchSize = bulkImportParameters.getBatchSize() != null ? bulkImportParameters.getBatchSize() : defaultBatchSize; final boolean rulesEnabled = ruleService.isEnabled(); + final String currentUser = AuthenticationUtil.getFullyAuthenticatedUser(); BatchProcessor.BatchProcessWorker worker = new BatchProcessor.BatchProcessWorker() { @@ -86,22 +82,14 @@ public abstract class MultiThreadedBulkFilesystemImporter extends AbstractBulkFi public void beforeProcess() throws Throwable { + // Run as the correct user + AuthenticationUtil.setRunAsUser(currentUser); + refreshLock(lockToken, batchSize * 250L); if(bulkImportParameters.isDisableRulesService() && rulesEnabled) { ruleService.disableRules(); } - - // Disable the auditable aspect's behaviours for this transaction only, to allow creation & modification dates to be set - transactionHelper.doInTransaction(new RetryingTransactionCallback() - { - @Override - public Void execute() throws Throwable - { - behaviourFilter.disableBehaviour(ContentModel.ASPECT_AUDITABLE); - return null; - } - }); } public void afterProcess() throws Throwable @@ -113,21 +101,22 @@ public abstract class MultiThreadedBulkFilesystemImporter extends AbstractBulkFi importStatus.incrementNumberOfBatchesCompleted(); - transactionHelper.doInTransaction(new RetryingTransactionCallback() - { - @Override - public Void execute() throws Throwable - { - behaviourFilter.enableBehaviour(ContentModel.ASPECT_AUDITABLE); - return null; - } - }); + AuthenticationUtil.clearCurrentSecurityContext(); } public void process(final ImportableItem importableItem) throws Throwable { - NodeRef nodeRef = nodeImporter.importImportableItem(importableItem, bulkImportParameters.isReplaceExisting()); - filesystemTracker.itemImported(nodeRef, importableItem); + try + { + behaviourFilter.disableBehaviour(ContentModel.ASPECT_AUDITABLE); + + NodeRef nodeRef = nodeImporter.importImportableItem(importableItem, bulkImportParameters.isReplaceExisting()); + filesystemTracker.itemImported(nodeRef, importableItem); + } + finally + { + behaviourFilter.enableBehaviour(ContentModel.ASPECT_AUDITABLE); + } } }; @@ -173,5 +162,22 @@ public abstract class MultiThreadedBulkFilesystemImporter extends AbstractBulkFi { return defaultBatchSize; } + + /** + * Method that does the work of importing a filesystem using the BatchProcessor. + * + * @param bulkImportParameters The bulk import parameters to apply to this bulk import. + * @param nodeImporter The node importer implementation that will import each node. + * @param lockToken The lock token to use during the bulk import. + */ + @Override + protected void bulkImportImpl(final BulkImportParameters bulkImportParameters, final NodeImporter nodeImporter, final String lockToken) + { + final int batchSize = getBatchSize(bulkImportParameters); + final int numThreads = getNumThreads(bulkImportParameters); + + importStatus.setNumThreads(numThreads); + importStatus.setBatchSize(batchSize); + } } diff --git a/source/java/org/alfresco/repo/bulkimport/impl/StripingBulkFilesystemImporter.java b/source/java/org/alfresco/repo/bulkimport/impl/StripingBulkFilesystemImporter.java index 08e448d65b..f315f317c0 100644 --- a/source/java/org/alfresco/repo/bulkimport/impl/StripingBulkFilesystemImporter.java +++ b/source/java/org/alfresco/repo/bulkimport/impl/StripingBulkFilesystemImporter.java @@ -42,14 +42,15 @@ public class StripingBulkFilesystemImporter extends MultiThreadedBulkFilesystemI /** * Method that does the work of importing a filesystem using the BatchProcessor. * - * @param target The target space to ingest the content into (must not be null and must be a valid, writable space in the repository). - * @param sourceFolder The original directory from which this import was initiated (must not be null). - * @param source The source directory on the local filesystem to read content from (must not be null and must be a valid, readable directory on the local filesystem). - * @param replaceExisting A flag indicating whether to replace (true) or skip (false) files that are already in the repository. + * @param bulkImportParameters The bulk import parameters to apply to this bulk import. + * @param nodeImporter The node importer implementation that will import each node. + * @param lockToken The lock token to use during the bulk import. */ @Override - protected void bulkImportImpl(final BulkImportParameters bulkImportParameters, final NodeImporter nodeImporter, final String lockToken) throws Throwable + protected void bulkImportImpl(final BulkImportParameters bulkImportParameters, final NodeImporter nodeImporter, final String lockToken) { + super.bulkImportImpl(bulkImportParameters, nodeImporter, lockToken); + final File sourceFolder = nodeImporter.getSourceFolder(); final int batchSize = getBatchSize(bulkImportParameters); final int loggingInterval = getLoggingInterval(bulkImportParameters);