Initial commit for ALF-10419:

- Bulk Filesystem Importer
- Adapted from work by Peter Monks (see http://code.google.com/p/alfresco-bulk-filesystem-import) and Romain Guinot
- Refactored to limit repeated code
- Refactored to use the Bulk Processor
- Currently limited to one bulk import at a time (enforced by lock service)
- Unit tests added
- Some fixes applied
- Performance tests; tried different filesystem "walking" strategies
- Still to do: yui dependencies - are these necessary?

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@31100 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Steven Glover
2011-10-10 18:45:00 +00:00
parent 717e89267e
commit 3d6b44bbdf
122 changed files with 14862 additions and 787 deletions

View File

@@ -0,0 +1,423 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.bulkimport.BulkFSImportEvent;
import org.alfresco.repo.bulkimport.BulkFilesystemImporter;
import org.alfresco.repo.bulkimport.BulkImportParameters;
import org.alfresco.repo.bulkimport.BulkImportStatus;
import org.alfresco.repo.bulkimport.DirectoryAnalyser;
import org.alfresco.repo.bulkimport.NodeImporter;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.policy.BehaviourFilter;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.model.FileFolderService;
import org.alfresco.service.cmr.model.FileInfo;
import org.alfresco.service.cmr.model.FileNotFoundException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.security.AccessStatus;
import org.alfresco.service.cmr.security.PermissionService;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
*
* @since 4.0
*
*/
/**
 * Abstract base class for bulk filesystem importers.
 *
 * Provides the infrastructure shared by concrete importers: job-lock handling
 * (only one bulk import may run at a time), import status tracking, Spring
 * application event publication, and validation of the import source and
 * target. Subclasses supply the actual import strategy via
 * {@link #bulkImportImpl(BulkImportParameters, NodeImporter, String)}.
 *
 * @since 4.0
 */
public abstract class AbstractBulkFilesystemImporter implements BulkFilesystemImporter, InitializingBean, ApplicationContextAware
{
    /** Lock qualified name used with the JobLockService to serialise bulk imports. */
    private static final QName LOCK = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "BatchFilesystemImport");

    protected static final Log logger = LogFactory.getLog(BulkFilesystemImporter.class);

    protected ApplicationContext applicationContext;

    protected FileFolderService fileFolderService;
    protected TransactionService transactionService;
    protected PermissionService permissionService;
    protected RetryingTransactionHelper transactionHelper;

    protected BulkImportStatusImpl importStatus;
    protected DirectoryAnalyser directoryAnalyser = null;
    protected JobLockService jobLockService;
    protected BehaviourFilter behaviourFilter;

    public void setBehaviourFilter(BehaviourFilter behaviourFilter)
    {
        this.behaviourFilter = behaviourFilter;
    }

    public void setJobLockService(JobLockService jobLockService)
    {
        this.jobLockService = jobLockService;
    }

    public void setImportStatus(BulkImportStatusImpl importStatus)
    {
        this.importStatus = importStatus;
    }

    public final void setDirectoryAnalyser(DirectoryAnalyser directoryAnalyser)
    {
        this.directoryAnalyser = directoryAnalyser;
    }

    public void setFileFolderService(FileFolderService fileFolderService)
    {
        this.fileFolderService = fileFolderService;
    }

    public void setTransactionService(TransactionService transactionService)
    {
        this.transactionService = transactionService;
    }

    public void setPermissionService(PermissionService permissionService)
    {
        this.permissionService = permissionService;
    }

    /**
     * @see org.alfresco.repo.bulkimport.BulkFilesystemImporter#getStatus()
     */
    public final BulkImportStatus getStatus()
    {
        return importStatus;
    }

    public void afterPropertiesSet() throws Exception
    {
        PropertyCheck.mandatory(this, "fileFolderService", fileFolderService);
        PropertyCheck.mandatory(this, "transactionService", transactionService);
        PropertyCheck.mandatory(this, "permissionService", permissionService);
        PropertyCheck.mandatory(this, "importStatus", importStatus);
        PropertyCheck.mandatory(this, "directoryAnalyser", directoryAnalyser);

        this.transactionHelper = transactionService.getRetryingTransactionHelper();
    }

    /**
     * The import strategy, supplied by subclasses.
     *
     * @param bulkImportParameters the parameters describing the import (target node, etc.)
     * @param nodeImporter the importer used to create nodes from filesystem entries
     * @param lockToken the already-acquired bulk import lock token (never <tt>null</tt>)
     * @throws Throwable on any import failure
     */
    protected abstract void bulkImportImpl(BulkImportParameters bulkImportParameters, NodeImporter nodeImporter, String lockToken) throws Throwable;

    /**
     * Attempts to get the lock. If the lock couldn't be taken, then <tt>null</tt> is returned.
     *
     * @return Returns the lock token or <tt>null</tt>
     */
    protected String getLock(long time)
    {
        try
        {
            return jobLockService.getLock(LOCK, time);
        }
        catch (LockAcquisitionException e)
        {
            return null;
        }
    }

    /**
     * Refreshes the held lock. If the refresh fails, the JobLockService marks the
     * current transaction for rollback.
     *
     * @param lockToken the existing lock token; must not be <tt>null</tt>
     * @param time the lock time, in milliseconds
     */
    protected void refreshLock(String lockToken, long time)
    {
        if (lockToken == null)
        {
            throw new IllegalArgumentException("Must provide existing lockToken");
        }
        jobLockService.refreshLock(lockToken, LOCK, time);
    }

    /**
     * Releases the held lock.
     *
     * @param lockToken the existing lock token; must not be <tt>null</tt>
     */
    protected void releaseLock(String lockToken)
    {
        if (lockToken == null)
        {
            throw new IllegalArgumentException("Must provide existing lockToken");
        }
        jobLockService.releaseLock(lockToken, LOCK);
    }

    /*
     * Renders a Map as a readable string, one "key = value" pair per line.
     * Hand-rolled because commons-lang ToStringBuilder doesn't seem to like
     * unmodifiable Maps.
     */
    protected final String mapToString(Map<?, ?> map)
    {
        if (map == null)
        {
            return "(null)";
        }

        StringBuilder result = new StringBuilder(128);
        result.append('[');
        if (!map.isEmpty())
        {
            for (Map.Entry<?, ?> entry : map.entrySet())
            {
                result.append(String.valueOf(entry.getKey()));
                result.append(" = ");
                result.append(String.valueOf(entry.getValue()));
                result.append(",\n");
            }
            // Delete the final dangling ",\n" separator (2 characters)
            result.delete(result.length() - 2, result.length());
        }
        result.append(']');
        return result.toString();
    }

    /**
     * Returns the repository path of the given node (e.g. "/Company Home/..."),
     * or <tt>null</tt> if the node is <tt>null</tt> or cannot be resolved.
     */
    protected final String getRepositoryPath(NodeRef nodeRef)
    {
        String result = null;
        if (nodeRef != null)
        {
            List<FileInfo> pathElements = null;
            try
            {
                pathElements = fileFolderService.getNamePath(null, nodeRef);
                if (pathElements != null && pathElements.size() > 0)
                {
                    StringBuilder temp = new StringBuilder();
                    for (FileInfo pathElement : pathElements)
                    {
                        temp.append("/");
                        temp.append(pathElement.getName());
                    }
                    result = temp.toString();
                }
            }
            catch (final FileNotFoundException fnfe)
            {
                // Node has no name path - fall through and return null
            }
        }
        return result;
    }

    /**
     * Validates that the given node exists, is writeable by the current user, and
     * is a folder (space).
     *
     * @param target the import target node; must not be <tt>null</tt>
     * @throws IllegalArgumentException if any of the checks fail
     */
    protected final void validateNodeRefIsWritableSpace(NodeRef target)
    {
        if (target == null)
        {
            throw new IllegalArgumentException("target must not be null.");
        }
        if (!fileFolderService.exists(target))
        {
            throw new IllegalArgumentException("Target '" + target.toString() + "' doesn't exist.");
        }
        if (AccessStatus.DENIED.equals(permissionService.hasPermission(target, PermissionService.ADD_CHILDREN)))
        {
            throw new IllegalArgumentException("Target '" + target.toString() + "' is not writeable.");
        }
        if (!fileFolderService.getFileInfo(target).isFolder())
        {
            throw new IllegalArgumentException("Target '" + target.toString() + "' is not a space.");
        }
    }

    protected String getFileName(File file)
    {
        return FileUtils.getFileName(file);
    }

    /**
     * Attempts to take out the bulk filesystem import lock, in its own transaction.
     *
     * @return the lock token, or <tt>null</tt> if the lock could not be acquired
     *         (e.g. another bulk import is in progress)
     */
    protected String getLockToken()
    {
        RetryingTransactionCallback<String> txnWork = new RetryingTransactionCallback<String>()
        {
            public String execute() throws Exception
            {
                return getLock(20000L);
            }
        };
        return transactionService.getRetryingTransactionHelper().doInTransaction(txnWork, false, true);
    }

    /**
     * Validates that the given file exists, is readable, and is a directory.
     *
     * @param source the import source; must not be <tt>null</tt>
     * @throws IllegalArgumentException if any of the checks fail
     */
    public void validateSourceIsReadableDirectory(File source)
    {
        try
        {
            if (source == null)
            {
                throw new IllegalArgumentException("source must not be null.");
            }
            if (!source.exists())
            {
                throw new IllegalArgumentException("Source '" + source.getCanonicalPath() + "' doesn't exist.");
            }
            if (!source.canRead())
            {
                throw new IllegalArgumentException("Source '" + source.getCanonicalPath() + "' is not readable.");
            }
            if (!source.isDirectory())
            {
                throw new IllegalArgumentException("Source '" + source.getCanonicalPath() + "' is not a directory.");
            }
        }
        catch (final IOException ioe)
        {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Runs {@link #bulkImport(BulkImportParameters, NodeImporter)} in a background
     * thread, executing as the currently-authenticated user.
     */
    public void asyncBulkImport(final BulkImportParameters bulkImportParameters, final NodeImporter nodeImporter)
    {
        final String currentUser = AuthenticationUtil.getFullyAuthenticatedUser();

        Runnable backgroundLogic = new Runnable()
        {
            public void run()
            {
                AuthenticationUtil.runAs(new RunAsWork<Object>()
                {
                    public Object doWork()
                    {
                        bulkImport(bulkImportParameters, nodeImporter);
                        return null;
                    }
                }, currentUser);
            }
        };

        // NOTE(review): left as a non-daemon thread (setDaemon(true) was previously
        // commented out) - confirm whether an in-flight import should block JVM shutdown.
        Thread backgroundThread = new Thread(backgroundLogic, "BulkFilesystemImport-BackgroundThread");
        backgroundThread.start();
    }

    /**
     * Performs a bulk filesystem import in the current thread. Takes out the
     * single-importer lock, tracks status, publishes start/end events, and
     * delegates the actual work to {@link #bulkImportImpl}.
     *
     * @see org.alfresco.repo.bulkimport.BulkFilesystemImporter#bulkImport(org.alfresco.repo.bulkimport.BulkImportParameters, org.alfresco.repo.bulkimport.NodeImporter)
     */
    public void bulkImport(final BulkImportParameters bulkImportParameters, final NodeImporter nodeImporter)
    {
        final File sourceFolder = nodeImporter.getSourceFolder();
        final BulkFilesystemImporter importer = this;

        transactionHelper.doInTransaction(new RetryingTransactionCallback<Void>()
        {
            @Override
            public Void execute() throws Throwable
            {
                String sourceDirectory = getFileName(sourceFolder);
                String targetSpace = getRepositoryPath(bulkImportParameters.getTarget());

                // Only one bulk import may run at a time. Fail fast if the lock is
                // unavailable; previously a null token was carried into the finally
                // block, where releaseLock(null) threw IllegalArgumentException and
                // masked the import's real outcome.
                String lockToken = getLockToken();
                if (lockToken == null)
                {
                    throw new AlfrescoRuntimeException(
                            "Bulk filesystem import failed: unable to acquire the import lock (another bulk import may be in progress)");
                }

                try
                {
                    importStatus.startImport(sourceDirectory, targetSpace);

                    BulkFSImportEvent bulkImportEvent = new BulkFSImportEvent(importer);
                    applicationContext.publishEvent(bulkImportEvent);

                    validateNodeRefIsWritableSpace(bulkImportParameters.getTarget());
                    validateSourceIsReadableDirectory(sourceFolder);

                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Bulk import started from '" + sourceFolder.getAbsolutePath() + "'...");
                    }

                    bulkImportImpl(bulkImportParameters, nodeImporter, lockToken);

                    importStatus.stopImport();

                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Bulk import from '" + getFileName(sourceFolder) + "' succeeded.");
                    }

                    return null;
                }
                catch (Throwable e)
                {
                    logger.error("Bulk import from '" + getFileName(sourceFolder) + "' failed.", e);
                    importStatus.stopImport(e);
                    throw new AlfrescoRuntimeException("Bulk filesystem import failed", e);
                }
                finally
                {
                    BulkFSImportEvent bulkImportEvent = new BulkFSImportEvent(importer);
                    applicationContext.publishEvent(bulkImportEvent);

                    releaseLock(lockToken);
                }
            }
        }, false, true);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        this.applicationContext = applicationContext;
    }
}

View File

@@ -0,0 +1,266 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import org.alfresco.model.ContentModel;
import org.alfresco.query.CannedQueryPageDetails;
import org.alfresco.query.PagingRequest;
import org.alfresco.query.PagingResults;
import org.alfresco.repo.content.MimetypeMap;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.service.cmr.model.FileFolderService;
import org.alfresco.service.cmr.model.FileInfo;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentService;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.ApplicationContextHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.springframework.context.ApplicationContext;
/**
* @since 4.0
*/
/**
 * Base class for bulk filesystem import integration tests. Starts the
 * repository application context, creates a fresh store and top-level test
 * folder per test, and provides assertion helpers for verifying the imported
 * folder/file structure.
 *
 * @since 4.0
 */
public class AbstractBulkImportTests
{
    protected static ApplicationContext ctx = null;

    protected FileFolderService fileFolderService;
    protected NodeService nodeService;
    protected TransactionService transactionService;
    protected ContentService contentService;
    protected UserTransaction txn = null;
    protected MultiThreadedBulkFilesystemImporter bulkImporter;
    protected NodeRef rootNodeRef;
    protected FileInfo topLevelFolder;

    protected static void startContext()
    {
        ctx = ApplicationContextHelper.getApplicationContext();
    }

    protected static void startContext(String[] configLocations)
    {
        ctx = ApplicationContextHelper.getApplicationContext(configLocations);
    }

    protected static void stopContext()
    {
        ApplicationContextHelper.closeApplicationContext();
    }

    /**
     * Creates a uniquely-named store and a top-level test folder, then leaves a
     * fresh user transaction open for the test body to use (committed in
     * {@link #teardown()}).
     */
    @Before
    public void setup() throws SystemException, NotSupportedException
    {
        try
        {
            nodeService = (NodeService) ctx.getBean("nodeService");
            fileFolderService = (FileFolderService) ctx.getBean("fileFolderService");
            transactionService = (TransactionService) ctx.getBean("transactionService");
            bulkImporter = (MultiThreadedBulkFilesystemImporter) ctx.getBean("bulkFilesystemImporter");
            contentService = (ContentService) ctx.getBean("contentService");

            AuthenticationUtil.setRunAsUserSystem();

            // Unique name so repeated runs don't collide on the store identifier
            String s = "BulkFilesystemImport" + System.currentTimeMillis();

            txn = transactionService.getUserTransaction();
            txn.begin();

            StoreRef storeRef = nodeService.createStore(StoreRef.PROTOCOL_WORKSPACE, s);
            rootNodeRef = nodeService.getRootNode(storeRef);
            NodeRef top = nodeService.createNode(rootNodeRef, ContentModel.ASSOC_CHILDREN, QName.createQName("{namespace}top"), ContentModel.TYPE_FOLDER).getChildRef();

            topLevelFolder = fileFolderService.create(top, s, ContentModel.TYPE_FOLDER);

            txn.commit();

            // Open a fresh transaction for the test body
            txn = transactionService.getUserTransaction();
            txn.begin();
        }
        catch (Throwable e)
        {
            // Rethrow with the cause attached; the previous fail(e.getMessage())
            // discarded the underlying stack trace, making setup failures opaque.
            throw new RuntimeException("Test setup failed", e);
        }
    }

    @After
    public void teardown() throws Exception
    {
        if (txn != null)
        {
            txn.commit();
        }
    }

    @AfterClass
    public static void afterTests()
    {
        stopContext();
    }

    /** Lists the immediate child folders of the given parent matching the pattern. */
    protected List<FileInfo> getFolders(NodeRef parent, String pattern)
    {
        PagingResults<FileInfo> page = fileFolderService.list(parent, false, true, pattern, null, null, new PagingRequest(CannedQueryPageDetails.DEFAULT_PAGE_SIZE));
        return page.getPage();
    }

    /** Lists the immediate child files of the given parent matching the pattern. */
    protected List<FileInfo> getFiles(NodeRef parent, String pattern)
    {
        PagingResults<FileInfo> page = fileFolderService.list(parent, true, false, pattern, null, null, new PagingRequest(CannedQueryPageDetails.DEFAULT_PAGE_SIZE));
        return page.getPage();
    }

    /** Indexes the given FileInfos by node name. */
    protected Map<String, FileInfo> toMap(List<FileInfo> list)
    {
        Map<String, FileInfo> map = new HashMap<String, FileInfo>(list.size());
        for (FileInfo fileInfo : list)
        {
            map.put(fileInfo.getName(), fileInfo);
        }
        return map;
    }

    /**
     * Asserts that exactly one child folder with the given name exists under
     * folderNode, then checks that folder's contents via
     * {@link #checkFiles(NodeRef, String, int, int, ExpectedFile[], ExpectedFolder[])}.
     */
    protected void checkFolder(NodeRef folderNode, String childFolderName, String pattern, int numExpectedFolders, int numExpectedFiles, ExpectedFolder[] expectedFolders, ExpectedFile[] expectedFiles)
    {
        List<FileInfo> folders = getFolders(folderNode, childFolderName);
        assertEquals("", 1, folders.size());
        NodeRef folder1 = folders.get(0).getNodeRef();
        checkFiles(folder1, pattern, numExpectedFolders, numExpectedFiles, expectedFiles, expectedFolders);
    }

    /**
     * Asserts the expected number of child folders/files under the parent, and -
     * where expectations are supplied - checks each expected file's mimetype and
     * (for plain text) content, and each expected folder's presence.
     */
    protected void checkFiles(NodeRef parent, String pattern, int expectedNumFolders, int expectedNumFiles,
            ExpectedFile[] expectedFiles, ExpectedFolder[] expectedFolders)
    {
        Map<String, FileInfo> folders = toMap(getFolders(parent, pattern));
        Map<String, FileInfo> files = toMap(getFiles(parent, pattern));

        assertEquals("", expectedNumFolders, folders.size());
        assertEquals("", expectedNumFiles, files.size());

        if (expectedFiles != null)
        {
            for (ExpectedFile expectedFile : expectedFiles)
            {
                FileInfo fileInfo = files.get(expectedFile.getName());
                assertNotNull("", fileInfo);
                assertNotNull("", fileInfo.getContentData());
                assertEquals(expectedFile.getMimeType(), fileInfo.getContentData().getMimetype());
                // Compare mimetypes with equals(), not ==: getMimetype() returns a
                // runtime string that is not guaranteed to be the interned constant.
                if (MimetypeMap.MIMETYPE_TEXT_PLAIN.equals(fileInfo.getContentData().getMimetype())
                        && expectedFile.getContentContains() != null)
                {
                    ContentReader reader = contentService.getReader(fileInfo.getNodeRef(), ContentModel.PROP_CONTENT);
                    String contentContains = expectedFile.getContentContains();
                    assertTrue("", reader.getContentString().contains(contentContains));
                }
            }
        }

        if (expectedFolders != null)
        {
            for (ExpectedFolder expectedFolder : expectedFolders)
            {
                FileInfo fileInfo = folders.get(expectedFolder.getName());
                assertNotNull("", fileInfo);
            }
        }
    }

    /** Asserts the file's name and mimetype. */
    protected void checkContent(FileInfo file, String name, String mimeType)
    {
        assertEquals("", name, file.getName());
        assertEquals("", mimeType, file.getContentData().getMimetype());
    }

    /** Expectation holder: a folder that should exist after an import. */
    protected static class ExpectedFolder
    {
        private String name;

        public ExpectedFolder(String name)
        {
            super();
            this.name = name;
        }

        public String getName()
        {
            return name;
        }
    }

    /** Expectation holder: a file that should exist after an import, with optional content check. */
    protected static class ExpectedFile
    {
        private String name;
        private String mimeType;
        private String contentContains = null;

        public ExpectedFile(String name, String mimeType, String contentContains)
        {
            this(name, mimeType);
            this.contentContains = contentContains;
        }

        public ExpectedFile(String name, String mimeType)
        {
            super();
            this.name = name;
            this.mimeType = mimeType;
        }

        public String getName()
        {
            return name;
        }

        public String getMimeType()
        {
            return mimeType;
        }

        public String getContentContains()
        {
            return contentContains;
        }
    }
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import java.io.FileFilter;
import org.alfresco.repo.bulkimport.AnalysedDirectory;
import org.alfresco.repo.bulkimport.DirectoryAnalyser;
import org.alfresco.repo.bulkimport.FilesystemTracker;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.util.PropertyCheck;
import org.apache.log4j.Logger;
/**
*
* @since 4.0
*
*/
/**
 * Common base for filesystem trackers: holds the DirectoryAnalyser and offers
 * convenience methods for analysing the importable contents of a directory.
 *
 * @since 4.0
 */
public abstract class AbstractFilesystemTracker implements FilesystemTracker
{
    protected static Logger logger = Logger.getLogger(FilesystemTracker.class);

    protected DirectoryAnalyser directoryAnalyser = null;

    public final void setDirectoryAnalyser(DirectoryAnalyser directoryAnalyser)
    {
        this.directoryAnalyser = directoryAnalyser;
    }

    public void afterPropertiesSet() throws Exception
    {
        PropertyCheck.mandatory(this, "directoryAnalyser", directoryAnalyser);
    }

    /**
     * Analyses every importable item (file or directory) in the given directory.
     */
    protected final AnalysedDirectory getImportableItemsInDirectory(ImportableItem directory)
    {
        return directoryAnalyser.analyseDirectory(directory, null);
    }

    /**
     * Analyses the sub-directories of the given directory, limited to at most
     * {@code count} entries; a count of -1 means "no limit".
     */
    protected final AnalysedDirectory getImportableDirectoriesInDirectory(ImportableItem directory, final int count)
    {
        FileFilter directoriesOnly = new FileFilter()
        {
            // Stateful countdown; only consulted when a limit was requested.
            private int remaining = count;

            @Override
            public boolean accept(File candidate)
            {
                if (!candidate.isDirectory())
                {
                    return false;
                }
                return count == -1 || remaining-- > 0;
            }
        };
        return directoryAnalyser.analyseDirectory(directory, directoriesOnly);
    }
}

View File

@@ -0,0 +1,442 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.bulkimport.BulkFilesystemImporter;
import org.alfresco.repo.bulkimport.DirectoryAnalyser;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.repo.bulkimport.MetadataLoader;
import org.alfresco.repo.bulkimport.NodeImporter;
import org.alfresco.repo.bulkimport.impl.BulkImportStatusImpl.NodeState;
import org.alfresco.repo.policy.BehaviourFilter;
import org.alfresco.repo.version.VersionModel;
import org.alfresco.service.cmr.model.FileExistsException;
import org.alfresco.service.cmr.model.FileFolderService;
import org.alfresco.service.cmr.repository.InvalidNodeRefException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.version.VersionService;
import org.alfresco.service.cmr.version.VersionType;
import org.alfresco.service.namespace.QName;
import org.alfresco.util.Triple;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Abstract base class for the node importer, containing helper methods for use by subclasses.
*
* @since 4.0
*
*/
public abstract class AbstractNodeImporter implements NodeImporter
{
protected final static Log logger = LogFactory.getLog(BulkFilesystemImporter.class);
protected FileFolderService fileFolderService;
protected NodeService nodeService;
protected MetadataLoader metadataLoader = null;
protected BulkImportStatusImpl importStatus;
protected VersionService versionService;
protected BehaviourFilter behaviourFilter;
/** Sets the version service used to create version histories for imported items. */
public void setVersionService(VersionService versionService)
{
this.versionService = versionService;
}
/** Sets the file/folder service used to find and create nodes. */
public void setFileFolderService(FileFolderService fileFolderService)
{
this.fileFolderService = fileFolderService;
}
/** Sets the node service. */
public void setNodeService(NodeService nodeService)
{
this.nodeService = nodeService;
}
/** Sets the loader used to read shadow metadata files for importable items. */
public void setMetadataLoader(MetadataLoader metadataLoader)
{
this.metadataLoader = metadataLoader;
}
/** Sets the import status tracker updated as nodes are imported. */
public void setImportStatus(BulkImportStatusImpl importStatus)
{
this.importStatus = importStatus;
}
/** Sets the behaviour filter (used by subclasses, e.g. to disable policies during import). */
public void setBehaviourFilter(BehaviourFilter behaviourFilter)
{
this.behaviourFilter = behaviourFilter;
}
/**
 * Imports a single importable item, supplied by subclasses.
 *
 * @param importableItem the item to import
 * @param replaceExisting whether an existing node of the same name should be replaced
 * @return the NodeRef of the imported node
 */
protected abstract NodeRef importImportableItemImpl(ImportableItem importableItem, boolean replaceExisting);
/**
 * Writes the given content and metadata onto the given node, supplied by subclasses.
 */
protected abstract void importContentAndMetadata(NodeRef nodeRef, ImportableItem.ContentAndMetadata contentAndMetadata, MetadataLoader.Metadata metadata);
/*
 * Renders a Map as a readable string, one "key = value" pair per line.
 * Hand-rolled because commons-lang ToStringBuilder doesn't seem to like
 * unmodifiable Maps.
 */
protected final String mapToString(Map<?, ?> map)
{
    if (map == null)
    {
        return "(null)";
    }

    StringBuilder result = new StringBuilder(128);
    result.append('[');
    if (!map.isEmpty())
    {
        // Iterate entrySet to avoid a second lookup per key
        for (Map.Entry<?, ?> entry : map.entrySet())
        {
            result.append(String.valueOf(entry.getKey()));
            result.append(" = ");
            result.append(String.valueOf(entry.getValue()));
            result.append(",\n");
        }
        // Delete the final dangling ",\n" separator (2 characters)
        result.delete(result.length() - 2, result.length());
    }
    result.append(']');
    return result.toString();
}
/**
 * Returns the name of the given importable item. This is the final name of the item, as it would appear in the repository,
 * after metadata renames are taken into account.
 *
 * @param importableItem The importable item whose name is wanted (may be <code>null</code>)
 * @param metadata The already-loaded metadata for the item (may be <code>null</code>)
 * @return the name of the given importable item, or <code>null</code> if it could not be determined
 */
protected final String getImportableItemName(ImportableItem importableItem, MetadataLoader.Metadata metadata)
{
String result = null;
// Step 1: attempt to get name from metadata
if (metadata != null)
{
result = (String)metadata.getProperties().get(ContentModel.PROP_NAME);
}
// Step 2: attempt to get name from metadata file
if (result == null &&
importableItem != null &&
importableItem.getHeadRevision() != null)
{
File metadataFile = importableItem.getHeadRevision().getMetadataFile();
if (metadataFile != null)
{
final String metadataFileName = metadataFile.getName();
// Strip the metadata suffix and extension to recover the item name.
// NOTE(review): assumes metadataLoader is non-null whenever a metadata
// file exists (the field defaults to null) - confirm wiring guarantees this.
result = metadataFileName.substring(0, metadataFileName.length() -
(MetadataLoader.METADATA_SUFFIX.length() + metadataLoader.getMetadataFileExtension().length()));
}
}
return(result);
}
/**
 * Imports the file content of an importable item onto an existing node: first
 * any prior versions (forcing the cm:versionable aspect on if versions exist),
 * then the head revision's content and metadata.
 *
 * @param nodeRef the node to import onto
 * @param importableItem the item being imported
 * @param metadata the item's already-loaded metadata
 * @return the number of properties written while importing prior versions
 *         (0 if the item has no version entries)
 */
protected final int importImportableItemFile(NodeRef nodeRef, ImportableItem importableItem, MetadataLoader.Metadata metadata)
{
int result = 0;
if (importableItem.hasVersionEntries())
{
// If cm:versionable isn't listed as one of the aspects for this node, add it - cm:versionable is required for nodes that have versions
if (!metadata.getAspects().contains(ContentModel.ASPECT_VERSIONABLE))
{
if (logger.isWarnEnabled()) logger.warn("Metadata for file '" + getFileName(importableItem.getHeadRevision().getContentFile()) + "' was missing the cm:versionable aspect, yet it has " + importableItem.getVersionEntries().size() + " versions. Adding cm:versionable.");
metadata.addAspect(ContentModel.ASPECT_VERSIONABLE);
}
result = importContentVersions(nodeRef, importableItem);
}
if (logger.isDebugEnabled()) logger.debug("Creating head revision of node " + nodeRef.toString());
importContentAndMetadata(nodeRef, importableItem.getHeadRevision(), metadata);
return(result);
}
/**
 * Imports each prior version of an importable item onto the given node,
 * creating a repository version per entry.
 *
 * @param nodeRef the node to create versions on
 * @param importableItem the item whose version entries are imported
 * @return the total number of properties written across all versions
 *         (each entry contributes its metadata property count plus 4)
 */
protected final int importContentVersions(NodeRef nodeRef, ImportableItem importableItem)
{
int result = 0;
for (final ImportableItem.VersionedContentAndMetadata versionEntry : importableItem.getVersionEntries())
{
Map<String, Serializable> versionProperties = new HashMap<String, Serializable>();
MetadataLoader.Metadata metadata = loadMetadata(versionEntry);
importContentAndMetadata(nodeRef, versionEntry, metadata);
if (logger.isDebugEnabled()) logger.debug("Creating v" + String.valueOf(versionEntry.getVersion()) + " of node '" + nodeRef.toString() + "' (note: version label in Alfresco will not be the same - it is not currently possible to explicitly force a particular version label).");
// Note: PROP_VERSION_LABEL is a "reserved" property, and cannot be modified by custom code.
// In other words, we can't use the version label on disk as the version label in Alfresco. :-(
// See: http://code.google.com/p/alfresco-bulk-filesystem-import/issues/detail?id=85
//versionProperties.put(ContentModel.PROP_VERSION_LABEL.toPrefixString(), String.valueOf(versionEntry.getVersion()));
versionProperties.put(VersionModel.PROP_VERSION_TYPE, VersionType.MAJOR); // Load every version as a major version for now - see http://code.google.com/p/alfresco-bulk-filesystem-import/issues/detail?id=84
versionService.createVersion(nodeRef, versionProperties);
result += metadata.getProperties().size() + 4; // Add 4 for "standard" metadata properties read from filesystem
}
return(result);
}
/**
 * Finds the node for the given importable item beneath <code>target</code>, creating it if it
 * doesn't already exist.
 * <p>
 * If the node already exists and <code>replaceExisting</code> is false, or if the item has no
 * content file (metadata-only item), the node is skipped and the returned NodeRef may be null.
 * If another process creates the node concurrently, the item is also skipped.
 *
 * @param target          The space in which to search for / create the node <i>(must not be null)</i>.
 * @param importableItem  The importable item being imported <i>(must not be null)</i>.
 * @param replaceExisting Whether an existing node of the same name should be replaced.
 * @param metadata        The metadata for the item, supplying the node name and type <i>(must not be null)</i>.
 * @return A triple of (NodeRef — null if the item was skipped, whether the item is a directory,
 *         the resulting NodeState) <i>(never null)</i>.
 * @throws IllegalStateException If a node name cannot be determined for the item.
 */
protected final Triple<NodeRef, Boolean, NodeState> createOrFindNode(NodeRef target, ImportableItem importableItem,
        boolean replaceExisting, MetadataLoader.Metadata metadata)
{
    Triple<NodeRef, Boolean, NodeState> result = null;
    boolean isDirectory = false;
    // Provisional state; refined below once we know whether the node exists / gets created.
    NodeState nodeState = replaceExisting ? NodeState.REPLACED : NodeState.SKIPPED;
    String nodeName = getImportableItemName(importableItem, metadata);
    NodeRef nodeRef = null;
    //####TODO: handle this more elegantly
    if (nodeName == null)
    {
        throw new IllegalStateException("Unable to determine node name for " + String.valueOf(importableItem));
    }
    if (logger.isDebugEnabled())
    {
        logger.debug("Searching for node with name '" + nodeName + "' within node '" + target.toString() + "'.");
    }
    nodeRef = fileFolderService.searchSimple(target, nodeName);
    // If we didn't find an existing item, create a new node in the repo.
    if (nodeRef == null)
    {
        // But only if the content file exists - we don't create new nodes based on metadata-only importableItems
        if (importableItem.getHeadRevision().contentFileExists())
        {
            isDirectory = ImportableItem.FileType.DIRECTORY.equals(importableItem.getHeadRevision().getContentFileType());
            try
            {
                if (logger.isDebugEnabled()) logger.debug("Creating new node of type '" + metadata.getType().toString() + "' with name '" + nodeName + "' within node '" + target.toString() + "'.");
                nodeRef = fileFolderService.create(target, nodeName, metadata.getType()).getNodeRef();
                nodeState = NodeState.CREATED;
            }
            catch (final FileExistsException fee)
            {
                // Another thread/process created the node between our search and our create.
                if (logger.isWarnEnabled()) logger.warn("Node with name '" + nodeName + "' within node '" + target.toString() + "' was created concurrently to the bulk import. Skipping importing it.", fee);
                nodeRef = null;
                nodeState = NodeState.SKIPPED;
            }
        }
        else
        {
            if (logger.isDebugEnabled()) logger.debug("Skipping creation of new node '" + nodeName + "' within node '" + target.toString() + "' since it doesn't have a content file.");
            nodeRef = null;
            nodeState = NodeState.SKIPPED;
        }
    }
    // We found the node in the repository. Make sure we return the NodeRef, so that recursive loading works (we need the NodeRef of all sub-spaces, even if we didn't create them).
    else
    {
        if (replaceExisting)
        {
            boolean targetNodeIsSpace = fileFolderService.getFileInfo(nodeRef).isFolder();
            if (importableItem.getHeadRevision().contentFileExists())
            {
                // If the source file exists, ensure that the target node is of the same type (i.e. file or folder) as it.
                isDirectory = ImportableItem.FileType.DIRECTORY.equals(importableItem.getHeadRevision().getContentFileType());
                if (isDirectory != targetNodeIsSpace)
                {
                    // File/folder mismatch between disk and repository - don't attempt a replace.
                    if (logger.isWarnEnabled()) logger.warn("Skipping replacement of " + (isDirectory ? "Directory " : "File ") +
                            "'" + getFileName(importableItem.getHeadRevision().getContentFile()) + "'. " +
                            "The target node in the repository is a " + (targetNodeIsSpace ? "space node" : "content node") + ".");
                    nodeState = NodeState.SKIPPED;
                }
            }
            else
            {
                // Metadata-only item: take the directory-ness from the existing repository node.
                isDirectory = targetNodeIsSpace;
            }
            if (nodeRef != null)
            {
                if (metadata.getType() != null)
                {
                    // Finally, specialise the type.
                    if (logger.isDebugEnabled()) logger.debug("Specialising type of node '" + nodeRef.toString() + "' to '" + String.valueOf(metadata.getType()) + "'.");
                    nodeService.setType(nodeRef, metadata.getType());
                }
                nodeState = NodeState.REPLACED;
            }
        }
        else
        {
            if (logger.isDebugEnabled()) logger.debug("Found content node '" + nodeRef.toString() + "', but replaceExisting=false, so skipping it.");
            nodeState = NodeState.SKIPPED;
        }
    }
    result = new Triple<NodeRef, Boolean, NodeState>(nodeRef, isDirectory, nodeState);
    return(result);
}
/**
 * Returns a human-readable name for the given file, suitable for log and error messages.
 *
 * @param file The file whose name is required.
 * @return The file name as produced by the bulk import file utilities.
 */
protected String getFileName(File file)
{
    final String fileName = FileUtils.getFileName(file);
    return fileName;
}
/**
 * Applies the given metadata (aspects and property values) to a node.
 *
 * @param nodeRef    The node to update <i>(must not be null)</i>.
 * @param parentFile The file the metadata came from - used only in error messages.
 * @param metadata   The aspects and properties to apply <i>(must not be null)</i>.
 * @throws IllegalStateException If the metadata references an invalid NodeRef
 *                               (e.g. a bad association target).
 */
protected final void importImportableItemMetadata(NodeRef nodeRef, File parentFile, MetadataLoader.Metadata metadata)
{
    // Attach aspects first.
    if (metadata.getAspects() != null)
    {
        for (final QName aspectQName : metadata.getAspects())
        {
            if (logger.isDebugEnabled())
            {
                logger.debug("Attaching aspect '" + aspectQName.toString() + "' to node '" + nodeRef.toString() + "'.");
            }
            // All property values (including aspect properties) are written in one call below,
            // hence the null property map here.
            nodeService.addAspect(nodeRef, aspectQName, null);
        }
    }
    // Then write the property values for both the type and any aspect(s).
    if (metadata.getProperties() != null)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Adding properties to node '" + nodeRef.toString() + "':\n" + mapToString(metadata.getProperties()));
        }
        try
        {
            nodeService.addProperties(nodeRef, metadata.getProperties());
        }
        catch (final InvalidNodeRefException e)
        {
            if (nodeRef.equals(e.getNodeRef()))
            {
                // The bad NodeRef is the node we're importing into - a logic bug in the importer itself.
                throw e;
            }
            // Otherwise the bad NodeRef came from the metadata file itself (e.g. an association target).
            throw new IllegalStateException("Invalid nodeRef found in metadata for '" + getFileName(parentFile) + "'. " +
                    "Probable cause: an association is being populated via metadata, but the " +
                    "NodeRef for the target of that association ('" + e.getNodeRef() + "') is invalid. " +
                    "Please double check your metadata file and try again.", e);
        }
    }
}
/**
 * Imports an importable item that represents a directory (space) by applying its metadata.
 * Version entries found for directories are ignored with a warning, since Alfresco does not
 * support versioned spaces.
 *
 * @param nodeRef        The space node to update <i>(must not be null)</i>.
 * @param importableItem The importable directory <i>(must not be null)</i>.
 * @param metadata       The metadata to apply <i>(must not be null)</i>.
 */
protected final void importImportableItemDirectory(NodeRef nodeRef, ImportableItem importableItem, MetadataLoader.Metadata metadata)
{
    final File contentFile = importableItem.getHeadRevision().getContentFile();
    if (importableItem.hasVersionEntries())
    {
        logger.warn("Skipping versions for directory '" + getFileName(contentFile) + "' - Alfresco does not support versioned spaces.");
    }
    // Attach aspects and set all properties on the space node itself.
    importImportableItemMetadata(nodeRef, contentFile, metadata);
}
/**
 * Builds the metadata for a content-and-metadata pair: "standard" properties (name, title,
 * created/modified dates) are derived from the file on disk, then the configured metadata
 * loader (if any) is given the chance to add or override values.
 *
 * @param contentAndMetadata The source file pair (may be null or metadata-only).
 * @return The populated metadata <i>(never null)</i>.
 */
protected final MetadataLoader.Metadata loadMetadata(ImportableItem.ContentAndMetadata contentAndMetadata)
{
    final MetadataLoader.Metadata metadata = new MetadataLoader.Metadata();
    // Load "standard" metadata from the filesystem, if a content file exists.
    if (contentAndMetadata != null && contentAndMetadata.contentFileExists())
    {
        final File contentFile = contentAndMetadata.getContentFile();
        // Strip off the version suffix (if any) so versioned files keep their base name.
        final String filename = contentFile.getName().trim().replaceFirst(DirectoryAnalyser.VERSION_SUFFIX_REGEX, "");
        final Date modified = new Date(contentFile.lastModified());
        final Date created = modified; //TODO: determine proper file creation time (awaiting JDK 1.7 NIO2 library)
        if (ImportableItem.FileType.FILE.equals(contentAndMetadata.getContentFileType()))
        {
            metadata.setType(ContentModel.TYPE_CONTENT);
        }
        else
        {
            metadata.setType(ContentModel.TYPE_FOLDER);
        }
        metadata.addProperty(ContentModel.PROP_NAME, filename);
        metadata.addProperty(ContentModel.PROP_TITLE, filename);
        metadata.addProperty(ContentModel.PROP_CREATED, created);
        metadata.addProperty(ContentModel.PROP_MODIFIED, modified);
    }
    // Allow the configured loader to contribute custom metadata (may overwrite the above).
    if (metadataLoader != null)
    {
        metadataLoader.loadMetadata(contentAndMetadata, metadata);
    }
    return metadata;
}
/**
 * Imports a single importable item and records the resulting NodeRef on the item.
 *
 * @param importableItem  The item to import <i>(must not be null)</i>.
 * @param replaceExisting Whether an existing node of the same name should be replaced.
 * @return The NodeRef of the imported node (may be null if the item was skipped).
 */
public NodeRef importImportableItem(ImportableItem importableItem, boolean replaceExisting)
{
    if (logger.isDebugEnabled())
    {
        logger.debug("Importing " + String.valueOf(importableItem));
    }
    final NodeRef nodeRef = importImportableItemImpl(importableItem, replaceExisting);
    // Record the resulting node on the item itself for later use.
    importableItem.setNodeRef(nodeRef);
    return nodeRef;
}
/**
 * Records that an importable item was skipped because it already exists in the repository
 * and 'replace existing' is false.  Common implementation for files and directories.
 *
 * @param importableItem The item being skipped <i>(must not be null)</i>.
 * @param isDirectory    <code>true</code> if the item is a directory (space), <code>false</code> for a file.
 */
private void skipImportableItem(ImportableItem importableItem, boolean isDirectory)
{
    if (logger.isInfoEnabled())
    {
        logger.info("Skipping '" + getFileName(importableItem.getHeadRevision().getContentFile()) + "' as it already exists in the repository and 'replace existing' is false.");
    }
    importStatus.incrementImportableItemsSkipped(importableItem, isDirectory);
}

/**
 * Records that an importable directory was skipped (already exists, 'replace existing' is false).
 *
 * @param importableItem The importable directory being skipped <i>(must not be null)</i>.
 */
protected void skipImportableDirectory(ImportableItem importableItem)
{
    skipImportableItem(importableItem, true);
}

/**
 * Records that an importable file was skipped (already exists, 'replace existing' is false).
 *
 * @param importableItem The importable file being skipped <i>(must not be null)</i>.
 */
protected void skipImportableFile(ImportableItem importableItem)
{
    skipImportableItem(importableItem, false);
}
}

View File

@@ -0,0 +1,76 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import org.alfresco.repo.bulkimport.MetadataLoader;
import org.alfresco.repo.policy.BehaviourFilter;
import org.alfresco.service.cmr.model.FileFolderService;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.version.VersionService;
/**
 * Base class for node importer factories.  Holds the repository services and collaborators
 * (injected by Spring via the setters below) that concrete factories pass on to the node
 * importers they create.
 *
 * @since 4.0
 *
 */
public class AbstractNodeImporterFactory
{
    // Repository services shared by all node importers created by this factory.
    protected FileFolderService fileFolderService;
    protected NodeService nodeService;
    // Optional custom metadata loader; null means only "standard" filesystem metadata is used.
    protected MetadataLoader metadataLoader = null;
    // Collector for import progress statistics.
    protected BulkImportStatusImpl importStatus;
    protected VersionService versionService;
    // Used to selectively disable repository behaviours/policies during import.
    protected BehaviourFilter behaviourFilter;

    public void setFileFolderService(FileFolderService fileFolderService)
    {
        this.fileFolderService = fileFolderService;
    }

    public void setNodeService(NodeService nodeService)
    {
        this.nodeService = nodeService;
    }

    public void setMetadataLoader(MetadataLoader metadataLoader)
    {
        this.metadataLoader = metadataLoader;
    }

    public void setImportStatus(BulkImportStatusImpl importStatus)
    {
        this.importStatus = importStatus;
    }

    public void setVersionService(VersionService versionService)
    {
        this.versionService = versionService;
    }

    public void setBehaviourFilter(BehaviourFilter behaviourFilter)
    {
        this.behaviourFilter = behaviourFilter;
    }
}

View File

@@ -0,0 +1,638 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.bulkimport.BulkImportStatus;
import org.alfresco.repo.bulkimport.ImportableItem;
/**
 * Thread-safe implementation of Bulk Import Status.  All counters are atomics, so individual
 * updates and reads are safe from any thread; aggregate views (e.g. {@link #toString()}) are
 * best-effort snapshots.
 *
 * @since 4.0
 *
 * @see org.alfresco.repo.bulkimport.BulkImportStatus
 */
public class BulkImportStatusImpl implements BulkImportStatus
{
    /**
     * The outcome of writing a single node during the import.
     */
    public enum NodeState
    {
        SKIPPED,
        CREATED,
        REPLACED
    }

    // General information
    private int numThreads;
    private int batchSize;
    private AtomicBoolean inProgress = new AtomicBoolean();
    private String sourceDirectory = null;
    private String targetSpace = null;
    private Date startDate = null;
    private Date endDate = null;
    private Long startNs = null;
    private Long endNs = null;
    private Throwable lastException = null;
    private AtomicLong numberOfBatchesCompleted = new AtomicLong();

    // Read-side information
    private AtomicLong numberOfFoldersScanned = new AtomicLong();
    private AtomicLong numberOfFilesScanned = new AtomicLong();
    private AtomicLong numberOfUnreadableEntries = new AtomicLong();
    private AtomicLong numberOfContentFilesRead = new AtomicLong();
    private AtomicLong numberOfContentBytesRead = new AtomicLong();
    private AtomicLong numberOfMetadataFilesRead = new AtomicLong();
    private AtomicLong numberOfMetadataBytesRead = new AtomicLong();
    private AtomicLong numberOfContentVersionFilesRead = new AtomicLong();
    private AtomicLong numberOfContentVersionBytesRead = new AtomicLong();
    private AtomicLong numberOfMetadataVersionFilesRead = new AtomicLong();
    private AtomicLong numberOfMetadataVersionBytesRead = new AtomicLong();

    // Write-side information
    private AtomicLong numberOfSpaceNodesCreated = new AtomicLong();
    private AtomicLong numberOfSpaceNodesReplaced = new AtomicLong();
    private AtomicLong numberOfSpaceNodesSkipped = new AtomicLong();
    private AtomicLong numberOfSpacePropertiesWritten = new AtomicLong();
    private AtomicLong numberOfContentNodesCreated = new AtomicLong();
    private AtomicLong numberOfContentNodesReplaced = new AtomicLong();
    private AtomicLong numberOfContentNodesSkipped = new AtomicLong();
    private AtomicLong numberOfContentBytesWritten = new AtomicLong();
    private AtomicLong numberOfContentPropertiesWritten = new AtomicLong();
    private AtomicLong numberOfContentVersionsCreated = new AtomicLong();
    private AtomicLong numberOfContentVersionBytesWritten = new AtomicLong();
    private AtomicLong numberOfContentVersionPropertiesWritten = new AtomicLong();

    public BulkImportStatusImpl()
    {
        inProgress.set(false);
    }

    // General information
    public String getSourceDirectory() { return(sourceDirectory); }
    public String getTargetSpace() { return(targetSpace); }
    public Date getStartDate() { return(copyDate(startDate)); }
    public Date getEndDate() { return(copyDate(endDate)); }

    /**
     * @return The duration of the import in nanoseconds - the elapsed time so far if the import
     *         is still running - or null if the import hasn't started.
     */
    public Long getDurationInNs()
    {
        Long result = null;
        if (startNs != null)
        {
            if (endNs != null)
            {
                // Import has completed.
                result = Long.valueOf(endNs - startNs);
            }
            else
            {
                // Import is still in progress - report elapsed time so far.
                result = Long.valueOf(System.nanoTime() - startNs);
            }
        }
        return(result);
    }

    /**
     * @return The duration of the import in whole seconds, or null if the import hasn't started
     *         or less than one second has elapsed.  Returning null for sub-second durations lets
     *         the per-second rate methods below avoid dividing by zero.
     */
    public Long getDuration()
    {
        long duration = 0;
        Long durationNS = getDurationInNs();
        if (durationNS != null)
        {
            duration = durationNS / (1000 * 1000 * 1000);
            if (duration == 0)
            {
                return null;
            }
        }
        return Long.valueOf(duration);
    }

    public Throwable getLastException()
    {
        return(lastException);
    }

    /**
     * @return The full stack trace of the last exception as a string, or null if none was recorded.
     */
    public String getLastExceptionAsString()
    {
        String result = null;
        if (lastException != null)
        {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw, true);
            lastException.printStackTrace(pw);
            pw.flush();
            sw.flush();
            result = sw.toString();
        }
        return(result);
    }

    public boolean inProgress()
    {
        return(inProgress.get());
    }

    public long getNumberOfBatchesCompleted()
    {
        return(numberOfBatchesCompleted.get());
    }

    public void incrementNumberOfBatchesCompleted()
    {
        numberOfBatchesCompleted.incrementAndGet();
    }

    /**
     * Marks the start of an import and resets all counters.
     *
     * @param sourceDirectory The filesystem directory being imported.
     * @param targetSpace     The repository space being imported into.
     * @throws AlfrescoRuntimeException If an import is already in progress.
     */
    public void startImport(final String sourceDirectory, final String targetSpace)
    {
        // compareAndSet ensures only one import can be started at a time.
        if (!inProgress.compareAndSet(false, true))
        {
            throw new AlfrescoRuntimeException("Import already in progress.");
        }

        // General information
        this.sourceDirectory = sourceDirectory;
        this.targetSpace = targetSpace;
        this.startDate = new Date();
        this.endDate = null;
        this.lastException = null;
        this.numberOfBatchesCompleted.set(0);

        // Read-side information
        this.numberOfFoldersScanned.set(1); // We set this to one to count the initial starting directory (which doesn't otherwise get counted)
        this.numberOfFilesScanned.set(0);
        this.numberOfUnreadableEntries.set(0);
        this.numberOfContentFilesRead.set(0);
        this.numberOfContentBytesRead.set(0);
        this.numberOfMetadataFilesRead.set(0);
        this.numberOfMetadataBytesRead.set(0);
        this.numberOfContentVersionFilesRead.set(0);
        this.numberOfContentVersionBytesRead.set(0);
        this.numberOfMetadataVersionFilesRead.set(0);
        this.numberOfMetadataVersionBytesRead.set(0);

        // Write-side information
        this.numberOfSpaceNodesCreated.set(0);
        this.numberOfSpaceNodesReplaced.set(0);
        this.numberOfSpaceNodesSkipped.set(0);
        this.numberOfSpacePropertiesWritten.set(0);
        this.numberOfContentNodesCreated.set(0);
        this.numberOfContentNodesReplaced.set(0);
        this.numberOfContentNodesSkipped.set(0);
        this.numberOfContentBytesWritten.set(0);
        this.numberOfContentPropertiesWritten.set(0);
        this.numberOfContentVersionsCreated.set(0);
        this.numberOfContentVersionBytesWritten.set(0);
        this.numberOfContentVersionPropertiesWritten.set(0);

        // Set the start timestamp last, once everything else has been reset.
        this.startNs = System.nanoTime();
        this.endNs = null;
    }

    /**
     * Marks the end of the import.
     *
     * @throws AlfrescoRuntimeException If no import is in progress.
     */
    public void stopImport()
    {
        if (!inProgress.compareAndSet(true, false))
        {
            // Consistent with startImport (was a raw RuntimeException).
            throw new AlfrescoRuntimeException("Import not in progress.");
        }
        endNs = System.nanoTime();
        endDate = new Date();
    }

    /**
     * Marks the end of the import, recording the exception that terminated it.
     *
     * @param lastException The exception that caused the import to stop.
     */
    public void stopImport(final Throwable lastException)
    {
        stopImport();
        this.lastException = lastException;
    }

    // Read-side information
    public long getNumberOfFoldersScanned() { return(numberOfFoldersScanned.longValue()); }
    public long getNumberOfFilesScanned() { return(numberOfFilesScanned.longValue()); }
    public long getNumberOfUnreadableEntries() { return(numberOfUnreadableEntries.longValue()); }
    public long getNumberOfContentFilesRead() { return(numberOfContentFilesRead.longValue()); }
    public long getNumberOfContentBytesRead() { return(numberOfContentBytesRead.longValue()); }
    public long getNumberOfMetadataFilesRead() { return(numberOfMetadataFilesRead.longValue()); }
    public long getNumberOfMetadataBytesRead() { return(numberOfMetadataBytesRead.longValue()); }
    public long getNumberOfContentVersionFilesRead() { return(numberOfContentVersionFilesRead.longValue()); }
    public long getNumberOfContentVersionBytesRead() { return(numberOfContentVersionBytesRead.longValue()); }
    public long getNumberOfMetadataVersionFilesRead() { return(numberOfMetadataVersionFilesRead.longValue()); }
    public long getNumberOfMetadataVersionBytesRead() { return(numberOfMetadataVersionBytesRead.longValue()); }

    /**
     * Accumulates read-side statistics (file counts and byte counts, including version entries)
     * for a single importable item.
     *
     * @param importableItem The item that was read <i>(must not be null)</i>.
     * @param isDirectory    <code>true</code> if the item is a directory.
     */
    public void incrementImportableItemsRead(final ImportableItem importableItem, final boolean isDirectory)
    {
        if (importableItem.getHeadRevision().contentFileExists())
        {
            // Directories contribute no content bytes.
            if (!isDirectory)
            {
                numberOfContentFilesRead.incrementAndGet();
                numberOfContentBytesRead.addAndGet(importableItem.getHeadRevision().getContentFileSize());
            }
        }
        if (importableItem.getHeadRevision().metadataFileExists())
        {
            numberOfMetadataFilesRead.incrementAndGet();
            numberOfMetadataBytesRead.addAndGet(importableItem.getHeadRevision().getMetadataFileSize());
        }
        if (!isDirectory && importableItem.hasVersionEntries())
        {
            for (final ImportableItem.ContentAndMetadata versionEntry : importableItem.getVersionEntries())
            {
                if (versionEntry.contentFileExists())
                {
                    numberOfContentVersionFilesRead.incrementAndGet();
                    numberOfContentVersionBytesRead.addAndGet(versionEntry.getContentFileSize());
                }
                if (versionEntry.metadataFileExists())
                {
                    numberOfMetadataVersionFilesRead.incrementAndGet();
                    numberOfMetadataVersionBytesRead.addAndGet(versionEntry.getMetadataFileSize());
                }
            }
        }
    }

    public void incrementNumberOfFilesScanned()
    {
        numberOfFilesScanned.incrementAndGet();
    }

    public void incrementNumberOfFoldersScanned()
    {
        numberOfFoldersScanned.incrementAndGet();
    }

    public void incrementNumberOfUnreadableEntries()
    {
        numberOfUnreadableEntries.incrementAndGet();
    }

    /**
     * Records that an importable item was skipped on the write side.
     *
     * @param importableItem The item that was skipped <i>(must not be null)</i>.
     * @param isDirectory    <code>true</code> if the item is a directory (space).
     */
    public void incrementImportableItemsSkipped(final ImportableItem importableItem, final boolean isDirectory)
    {
        if (importableItem.getHeadRevision().contentFileExists())
        {
            if (isDirectory)
            {
                numberOfSpaceNodesSkipped.incrementAndGet();
            }
            else
            {
                numberOfContentNodesSkipped.incrementAndGet();
            }
        }
        // We don't track the number of properties or version entries skipped
    }

    // Write-side information
    public long getNumberOfSpaceNodesCreated() { return(numberOfSpaceNodesCreated.longValue()); }
    public long getNumberOfSpaceNodesReplaced() { return(numberOfSpaceNodesReplaced.longValue()); }
    public long getNumberOfSpaceNodesSkipped() { return(numberOfSpaceNodesSkipped.longValue()); }
    public long getNumberOfSpacePropertiesWritten() { return(numberOfSpacePropertiesWritten.longValue()); }
    public long getNumberOfContentNodesCreated() { return(numberOfContentNodesCreated.longValue()); }
    public long getNumberOfContentNodesReplaced() { return(numberOfContentNodesReplaced.longValue()); }
    public long getNumberOfContentNodesSkipped() { return(numberOfContentNodesSkipped.longValue()); }
    public long getNumberOfContentBytesWritten() { return(numberOfContentBytesWritten.longValue()); }
    public long getNumberOfContentPropertiesWritten() { return(numberOfContentPropertiesWritten.longValue()); }
    public long getNumberOfContentVersionsCreated() { return(numberOfContentVersionsCreated.longValue()); }
    public long getNumberOfContentVersionBytesWritten() { return(numberOfContentVersionBytesWritten.longValue()); }
    public long getNumberOfContentVersionPropertiesWritten() { return(numberOfContentVersionPropertiesWritten.longValue()); }

    /**
     * Records the content bytes written for a node that was created or replaced.  Skipped nodes
     * contribute nothing.
     *
     * @param importableItem The item that was written <i>(must not be null)</i>.
     * @param isSpace        Unused - spaces carry no content bytes, and the head revision size
     *                       drives the count in all cases.
     * @param nodeState      The outcome of writing the node.
     */
    public void incrementContentBytesWritten(final ImportableItem importableItem, final boolean isSpace,
            final NodeState nodeState)
    {
        if (importableItem.getHeadRevision().contentFileExists())
        {
            switch (nodeState)
            {
                // CREATED and REPLACED are counted identically.
                case CREATED:
                case REPLACED:
                    numberOfContentBytesWritten.addAndGet(importableItem.getHeadRevision().getContentFileSize());
                    break;
            }
        }
    }

    /**
     * Records node, property and version statistics for a node that was written (or skipped).
     *
     * @param importableItem       The item that was written <i>(must not be null)</i>.
     * @param isSpace              <code>true</code> if the node is a space (folder).
     * @param nodeState            The outcome of writing the node.
     * @param numProperties        The number of properties written to the head revision.
     * @param numVersionProperties The number of properties written across version entries.
     */
    public void incrementNodesWritten(final ImportableItem importableItem,
            final boolean isSpace,
            final NodeState nodeState,
            final long numProperties,
            final long numVersionProperties)
    {
        long ignored;  // incrementAndGet/addAndGet return values are not needed here
        if (importableItem.getHeadRevision().contentFileExists())
        {
            switch (nodeState)
            {
                case SKIPPED:
                    ignored = isSpace ? numberOfSpaceNodesSkipped.incrementAndGet() : numberOfContentNodesSkipped.incrementAndGet();
                    break;
                case CREATED:
                    ignored = isSpace ? numberOfSpaceNodesCreated.incrementAndGet() : numberOfContentNodesCreated.incrementAndGet();
                    break;
                case REPLACED:
                    ignored = isSpace ? numberOfSpaceNodesReplaced.incrementAndGet() : numberOfContentNodesReplaced.incrementAndGet();
                    break;
            }
        }
        switch (nodeState)
        {
            case SKIPPED:
                // We don't track the number of properties skipped
                break;
            case CREATED:
            case REPLACED:
                ignored = isSpace ? numberOfSpacePropertiesWritten.addAndGet(numProperties) : numberOfContentPropertiesWritten.addAndGet(numProperties);
                break;
        }
        if (!isSpace && importableItem.hasVersionEntries())
        {
            numberOfContentVersionPropertiesWritten.addAndGet(numVersionProperties);
            for (final ImportableItem.ContentAndMetadata versionEntry : importableItem.getVersionEntries())
            {
                if (versionEntry.contentFileExists())
                {
                    switch (nodeState)
                    {
                        case SKIPPED:
                            // We only track the number of items skipped on the read side
                            break;
                        case CREATED:
                        case REPLACED:
                            numberOfContentVersionsCreated.incrementAndGet();
                            numberOfContentVersionBytesWritten.addAndGet(versionEntry.getContentFileSize());
                            break;
                    }
                }
            }
        }
    }

    /**
     * @return Files read per second (content + metadata + version files), or null if less than
     *         one second has elapsed.
     */
    public Long getFilesReadPerSecond()
    {
        Long duration = getDuration();
        if (duration != null)
        {
            long totalFilesRead = numberOfContentFilesRead.longValue() + numberOfMetadataFilesRead.longValue() +
                    numberOfContentVersionFilesRead.longValue() + numberOfMetadataVersionFilesRead.longValue();
            return totalFilesRead / duration;
        }
        else
        {
            return null;
        }
    }

    /**
     * @return Bytes read per second (content + metadata + version bytes), or null if less than
     *         one second has elapsed.
     */
    public Long getBytesReadPerSecond()
    {
        Long duration = getDuration();
        if (duration != null)
        {
            long totalDataRead = numberOfContentBytesRead.longValue() + numberOfMetadataBytesRead.longValue() +
                    numberOfContentVersionBytesRead.longValue() + numberOfMetadataVersionBytesRead.longValue();
            return totalDataRead / duration;
        }
        else
        {
            return null;
        }
    }

    /**
     * @return Filesystem entries (files + folders) scanned per second, or null if less than one
     *         second has elapsed.
     */
    public Long getEntriesScannedPerSecond()
    {
        Long duration = getDuration();
        if (duration != null)
        {
            return (numberOfFilesScanned.longValue() + numberOfFoldersScanned.longValue()) / duration;
        }
        else
        {
            return null;
        }
    }

    /**
     * @return Content bytes written per second (head + versions), or null if less than one
     *         second has elapsed.
     */
    public Long getBytesWrittenPerSecond()
    {
        Long duration = getDuration();
        if (duration != null)
        {
            long totalDataWritten = numberOfContentBytesWritten.longValue() + numberOfContentVersionBytesWritten.longValue();
            return totalDataWritten / duration;
        }
        else
        {
            return null;
        }
    }

    /**
     * @return Nodes created/replaced per second (versions count as nodes), or null if less than
     *         one second has elapsed.
     */
    public Long getNodesCreatedPerSecond()
    {
        Long duration = getDuration();
        if (duration != null)
        {
            // Note: we count versions as a node
            long totalNodesWritten = numberOfSpaceNodesCreated.longValue() + numberOfSpaceNodesReplaced.longValue() +
                    numberOfContentNodesCreated.longValue() + numberOfContentNodesReplaced.longValue() + numberOfContentVersionsCreated.longValue();
            return totalNodesWritten / duration;
        }
        else
        {
            return null;
        }
    }

    // Private helper methods

    /**
     * Defensively copies a date so callers cannot mutate our internal state.
     */
    private Date copyDate(final Date date)
    {
        Date result = null;
        if (date != null)
        {
            result = new Date(date.getTime());
        }
        return(result);
    }

    public int getNumThreads()
    {
        return numThreads;
    }

    public int getBatchSize()
    {
        return batchSize;
    }

    public void setNumThreads(int numThreads)
    {
        this.numThreads = numThreads;
    }

    public void setBatchSize(int batchSize)
    {
        this.batchSize = batchSize;
    }

    /**
     * @return A human-readable, multi-line report of the current import statistics.
     */
    public String toString()
    {
        StringBuilder sb = new StringBuilder();
        sb.append("Bulk Import Status\n");
        sb.append("Source directory: ");
        sb.append(getSourceDirectory());
        sb.append("\nTarget space: ");
        sb.append(getTargetSpace());
        sb.append("\nStart date : ");
        sb.append(getStartDate());
        sb.append("\nEnd date : ");
        sb.append(getEndDate());
        sb.append("\nNum threads : ");
        sb.append(getNumThreads());
        sb.append("\nBatch size : ");
        sb.append(getBatchSize());
        if (inProgress())
        {
            sb.append("\n\nIn progress");
        }
        else
        {
            sb.append("\n\nNot in progress");
        }
        sb.append("\nBytes written/sec : ");
        sb.append(getBytesWrittenPerSecond());
        sb.append("\nBytes read/sec : ");
        sb.append(getBytesReadPerSecond());
        sb.append("\nEntries scanned/sec : ");
        sb.append(getEntriesScannedPerSecond());
        sb.append("\nFiles read/sec : ");
        sb.append(getFilesReadPerSecond());
        sb.append("\nNodes created/sec : ");
        sb.append(getNodesCreatedPerSecond());
        sb.append("\nNumber of files scanned : ");
        sb.append(getNumberOfFilesScanned());
        sb.append("\nNumber of folders scanned : ");
        sb.append(getNumberOfFoldersScanned());
        sb.append("\nNumber of content files read : ");
        sb.append(getNumberOfContentFilesRead());
        sb.append("\nNumber of content version files read : ");
        sb.append(getNumberOfContentVersionFilesRead());
        sb.append("\nNumber of metadata files read : ");
        sb.append(getNumberOfMetadataFilesRead());
        sb.append("\nNumber of metadata version files read : ");
        sb.append(getNumberOfMetadataVersionFilesRead());
        sb.append("\nNumber of unreadable entries : ");
        sb.append(getNumberOfUnreadableEntries());
        sb.append("\nNumber of content nodes created : ");
        sb.append(getNumberOfContentNodesCreated());
        sb.append("\nNumber of content nodes replaced : ");
        sb.append(getNumberOfContentNodesReplaced());
        sb.append("\nNumber of content nodes skipped : ");
        sb.append(getNumberOfContentNodesSkipped());
        sb.append("\nNumber of content properties written : ");
        sb.append(getNumberOfContentPropertiesWritten());
        sb.append("\nNumber of content version properties written : ");
        sb.append(getNumberOfContentVersionPropertiesWritten());
        sb.append("\nNumber of content versions created : ");
        sb.append(getNumberOfContentVersionsCreated());
        sb.append("\nNumber of space nodes created : ");
        sb.append(getNumberOfSpaceNodesCreated());
        sb.append("\nNumber of space nodes replaced : ");
        sb.append(getNumberOfSpaceNodesReplaced());
        sb.append("\nNumber of space nodes skipped : ");
        sb.append(getNumberOfSpaceNodesSkipped());
        sb.append("\nNumber of space properties written : ");
        sb.append(getNumberOfSpacePropertiesWritten());
        sb.append("\nNumber of bytes read : ");
        sb.append(getNumberOfContentBytesRead());
        sb.append("\nNumber of metadata bytes read : ");
        sb.append(getNumberOfMetadataBytesRead());
        sb.append("\nNumber of content version bytes read : ");
        sb.append(getNumberOfContentVersionBytesRead());
        sb.append("\nNumber of metadata version bytes read : ");
        sb.append(getNumberOfMetadataVersionBytesRead());
        sb.append("\nNumber of batches completed : ");
        sb.append(getNumberOfBatchesCompleted());
        sb.append("\nNumber of bytes written : ");
        sb.append(getNumberOfContentBytesWritten());
        sb.append("\nNumber of content version bytes written : ");
        sb.append(getNumberOfContentVersionBytesWritten());
        return sb.toString();
    }
}

View File

@@ -0,0 +1,152 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.List;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import org.alfresco.repo.bulkimport.BulkImportParameters;
import org.alfresco.repo.bulkimport.NodeImporter;
import org.alfresco.repo.content.MimetypeMap;
import org.alfresco.service.cmr.model.FileInfo;
import org.alfresco.service.cmr.repository.NodeRef;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.util.ResourceUtils;
/**
* @since 4.0
*/
public class BulkImportTests extends AbstractBulkImportTests
{
private StreamingNodeImporterFactory streamingNodeImporterFactory;
@BeforeClass
public static void beforeTests()
{
startContext();
}
@Before
public void setup() throws SystemException, NotSupportedException
{
super.setup();
streamingNodeImporterFactory = (StreamingNodeImporterFactory)ctx.getBean("streamingNodeImporterFactory");
}
@Test
public void testCopyImportStriping()
{
NodeRef folderNode = topLevelFolder.getNodeRef();
try
{
NodeImporter nodeImporter = streamingNodeImporterFactory.getNodeImporter(ResourceUtils.getFile("classpath:bulkimport"));
BulkImportParameters bulkImportParameters = new BulkImportParameters();
bulkImportParameters.setTarget(folderNode);
bulkImportParameters.setReplaceExisting(true);
bulkImportParameters.setBatchSize(40);
bulkImporter.bulkImport(bulkImportParameters, nodeImporter);
}
catch(Throwable e)
{
fail(e.getMessage());
}
System.out.println(bulkImporter.getStatus());
checkFiles(folderNode, null, 2, 9,
new ExpectedFile[]
{
new ExpectedFile("quickImg1.xls", MimetypeMap.MIMETYPE_EXCEL),
new ExpectedFile("quickImg1.doc", MimetypeMap.MIMETYPE_WORD),
new ExpectedFile("quick.txt", MimetypeMap.MIMETYPE_TEXT_PLAIN, "The quick brown fox jumps over the lazy dog"),
},
new ExpectedFolder[]
{
new ExpectedFolder("folder1"),
new ExpectedFolder("folder2")
});
List<FileInfo> folders = getFolders(folderNode, "folder1");
assertEquals("", 1, folders.size());
NodeRef folder1 = folders.get(0).getNodeRef();
checkFiles(folder1, null, 1, 0, null,
new ExpectedFolder[]
{
new ExpectedFolder("folder1.1")
});
folders = getFolders(folderNode, "folder2");
assertEquals("", 1, folders.size());
NodeRef folder2 = folders.get(0).getNodeRef();
checkFiles(folder2, null, 1, 0,
new ExpectedFile[]
{
},
new ExpectedFolder[]
{
new ExpectedFolder("folder2.1")
});
folders = getFolders(folder1, "folder1.1");
assertEquals("", 1, folders.size());
NodeRef folder1_1 = folders.get(0).getNodeRef();
checkFiles(folder1_1, null, 2, 12,
new ExpectedFile[]
{
new ExpectedFile("quick.txt", MimetypeMap.MIMETYPE_TEXT_PLAIN, "The quick brown fox jumps over the lazy dog"),
new ExpectedFile("quick.sxw", MimetypeMap.MIMETYPE_OPENOFFICE1_WRITER),
new ExpectedFile("quick.tar", "application/x-gtar"),
},
new ExpectedFolder[]
{
new ExpectedFolder("folder1.1.1"),
new ExpectedFolder("folder1.1.2")
});
folders = getFolders(folder2, "folder2.1");
assertEquals("", 1, folders.size());
NodeRef folder2_1 = folders.get(0).getNodeRef();
checkFiles(folder2_1, null, 0, 17,
new ExpectedFile[]
{
new ExpectedFile("quick.png", MimetypeMap.MIMETYPE_IMAGE_PNG),
new ExpectedFile("quick.pdf", MimetypeMap.MIMETYPE_PDF),
new ExpectedFile("quick.odt", MimetypeMap.MIMETYPE_OPENDOCUMENT_TEXT),
},
new ExpectedFolder[]
{
});
}
}

View File

@@ -0,0 +1,414 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.alfresco.repo.bulkimport.AnalysedDirectory;
import org.alfresco.repo.bulkimport.DirectoryAnalyser;
import org.alfresco.repo.bulkimport.ImportFilter;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.repo.bulkimport.ImportableItem.FileType;
import org.alfresco.repo.bulkimport.MetadataLoader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * This class provides the implementation for directory analysis, the process by
 * which a directory listing of files is broken up into ImportableItems.
 *
 * @since 4.0
 *
 */
public class DirectoryAnalyserImpl implements DirectoryAnalyser
{
    private final static Log log = LogFactory.getLog(DirectoryAnalyserImpl.class);

    // Matches any file name that carries the version suffix.
    // NOTE(review): assumes VERSION_SUFFIX_REGEX (inherited from the DirectoryAnalyser
    // interface) contains exactly one capturing group holding the version number -
    // see getVersionNumber() below. Confirm against the interface definition.
    private final static Pattern VERSION_SUFFIX_PATTERN = Pattern.compile(".+" + VERSION_SUFFIX_REGEX);

    private MetadataLoader metadataLoader;
    private BulkImportStatusImpl importStatus;
    private List<ImportFilter> importFilters;

    public DirectoryAnalyserImpl(MetadataLoader metadataLoader, BulkImportStatusImpl importStatus, List<ImportFilter> importFilters)
    {
        this.metadataLoader = metadataLoader;
        this.importStatus = importStatus;
        // Route through the setter so that a null argument is normalised to an
        // empty list, consistent with setter-based (Spring) configuration.
        setImportFilters(importFilters);
    }

    public DirectoryAnalyserImpl()
    {
    }

    public void setMetadataLoader(MetadataLoader metadataLoader)
    {
        this.metadataLoader = metadataLoader;
    }

    public void setImportStatus(BulkImportStatusImpl status)
    {
        importStatus = status;
    }

    public final void setImportFilters(List<ImportFilter> importFilters)
    {
        if (importFilters != null)
        {
            this.importFilters = importFilters;
        }
        else
        {
            this.importFilters = new ArrayList<ImportFilter>();
        }
    }

    /**
     * Returns true if any configured {@link ImportFilter} votes to exclude the
     * given importable item from the import.
     */
    protected boolean shouldFilter(ImportableItem importableItem)
    {
        boolean filterImportableItem = false;

        if (importFilters != null && importFilters.size() > 0)
        {
            for (ImportFilter filter : importFilters)
            {
                if (filter.shouldFilter(importableItem))
                {
                    filterImportableItem = true;
                    break;
                }
            }
        }

        return filterImportableItem;
    }

    /**
     * Analyses the given directory, bucketing its entries into importable items
     * (content files, version files, metadata files and sub-directories) and
     * updating the import status counters as it goes.
     *
     * @see org.alfresco.repo.bulkimport.DirectoryAnalyser#analyseDirectory(org.alfresco.repo.bulkimport.ImportableItem, java.io.FileFilter)
     */
    public AnalysedDirectory analyseDirectory(ImportableItem directory, FileFilter filter)
    {
        File directoryFile = directory.getHeadRevision().getContentFile();
        AnalysedDirectory result = null;

        if (filter == null)
        {
            result = new AnalysedDirectory(directoryFile.listFiles());
        }
        else
        {
            result = new AnalysedDirectory(directoryFile.listFiles(filter));
        }

        if (log.isDebugEnabled())
        {
            log.debug("Analysing directory " + FileUtils.getFileName(directoryFile) + "...");
        }

        // Build up the list of ImportableItems from the directory listing
        for (File file : result.getOriginalListing())
        {
            if (log.isTraceEnabled())
            {
                log.trace("Scanning file " + FileUtils.getFileName(file) + "...");
            }

            if (file.canRead())
            {
                if (isVersionFile(file))
                {
                    addVersionFile(directory, result, file);
                    importStatus.incrementNumberOfFilesScanned();
                }
                else if (isMetadataFile(file))
                {
                    addMetadataFile(directory, result, file);
                    importStatus.incrementNumberOfFilesScanned();
                }
                else
                {
                    boolean isDirectory = addParentFile(directory, result, file);
                    if (isDirectory)
                    {
                        importStatus.incrementNumberOfFoldersScanned();
                    }
                    else
                    {
                        importStatus.incrementNumberOfFilesScanned();
                    }
                }
            }
            else
            {
                if (log.isWarnEnabled())
                {
                    log.warn("Skipping unreadable file '" + FileUtils.getFileName(file) + "'.");
                }
                importStatus.incrementNumberOfUnreadableEntries();
            }
        }

        // Finally, remove any items from the list that aren't valid (don't have either a
        // contentFile or a metadataFile)
        Iterator<ImportableItem> iter = result.getImportableItems().iterator();
        while (iter.hasNext())
        {
            ImportableItem importableItem = iter.next();
            if (!importableItem.isValid())
            {
                iter.remove();
            }
        }

        iter = result.getImportableDirectories().iterator();
        while (iter.hasNext())
        {
            ImportableItem importableItem = iter.next();
            if (!importableItem.isValid())
            {
                iter.remove();
            }
        }

        if (log.isDebugEnabled())
        {
            log.debug("Finished analysing directory " + FileUtils.getFileName(directoryFile) + ".");
        }

        return result;
    }

    /** Returns true if the file name matches the version-suffix pattern. */
    private boolean isVersionFile(File file)
    {
        Matcher matcher = VERSION_SUFFIX_PATTERN.matcher(file.getName());
        return matcher.matches();
    }

    /**
     * Returns true if the file name carries the metadata suffix for the
     * configured metadata loader. Always false when no loader is configured.
     */
    private boolean isMetadataFile(File file)
    {
        boolean result = false;
        if (metadataLoader != null)
        {
            result = file.getName().endsWith(MetadataLoader.METADATA_SUFFIX + metadataLoader.getMetadataFileExtension());
        }
        return(result);
    }

    /**
     * Registers a version file against its parent content item, as either a
     * versioned content file or a versioned metadata file.
     */
    private void addVersionFile(ImportableItem parent, AnalysedDirectory analysedDirectory, File versionFile)
    {
        File parentContentFile = getParentOfVersionFile(versionFile);
        boolean isContentVersion = false;

        // A version file whose "parent" is itself a metadata file is a versioned
        // metadata file; otherwise it is a versioned content file.
        if (isMetadataFile(parentContentFile))
        {
            parentContentFile = getParentOfMetadatafile(parentContentFile);
            isContentVersion = false;
        }
        else
        {
            isContentVersion = true;
        }

        ImportableItem importableItem = findOrCreateImportableItem(parent, analysedDirectory, parentContentFile);
        int version = getVersionNumber(versionFile);
        ImportableItem.VersionedContentAndMetadata versionEntry = findOrCreateVersionEntry(importableItem, version);

        if (isContentVersion)
        {
            versionEntry.setContentFile(versionFile);
        }
        else
        {
            versionEntry.setMetadataFile(versionFile);
        }
    }

    /** Attaches a metadata file to the head revision of its parent content item. */
    private void addMetadataFile(ImportableItem parent, AnalysedDirectory analysedDirectory, File metadataFile)
    {
        File parentContentfile = getParentOfMetadatafile(metadataFile);
        ImportableItem importableItem = findOrCreateImportableItem(parent, analysedDirectory, parentContentfile);
        importableItem.getHeadRevision().setMetadataFile(metadataFile);
    }

    /**
     * Registers a plain content file (or sub-directory) as the head revision of
     * an importable item.
     *
     * @return true if the file is a directory
     */
    private boolean addParentFile(ImportableItem parent, AnalysedDirectory analysedDirectory, File contentFile)
    {
        ImportableItem importableItem = findOrCreateImportableItem(parent, analysedDirectory, contentFile);
        importableItem.getHeadRevision().setContentFile(contentFile);
        return(importableItem.getHeadRevision().getContentFileType() == FileType.DIRECTORY);
    }

    /**
     * Finds the importable item keyed by the given content file, creating (and,
     * unless filtered, registering) a new one if none exists yet.
     */
    private ImportableItem findOrCreateImportableItem(ImportableItem parent, AnalysedDirectory analysedDirectory, File contentFile)
    {
        ImportableItem result = findImportableItem(analysedDirectory, contentFile);

        // We didn't find it, so create it
        if (result == null)
        {
            result = new ImportableItem();
            result.setParent(parent);
            result.getHeadRevision().setContentFile(contentFile);
            if (!shouldFilter(result))
            {
                analysedDirectory.addImportableItem(result);
            }
        }

        return(result);
    }

    private ImportableItem findImportableItem(AnalysedDirectory analysedDirectory, File contentFile)
    {
        ImportableItem result = null;

        if (contentFile == null)
        {
            throw new IllegalStateException("Cannot call findOrCreateImportableItem with null key");
        }

        result = analysedDirectory.findImportableItem(contentFile);
        return(result);
    }

    /**
     * Finds the version entry with the given version number on the item,
     * creating and registering a new one if none exists yet.
     */
    private ImportableItem.VersionedContentAndMetadata findOrCreateVersionEntry(ImportableItem importableItem, int version)
    {
        ImportableItem.VersionedContentAndMetadata result = findVersionEntry(importableItem, version);

        if (result == null)
        {
            result = importableItem.new VersionedContentAndMetadata(version);
            importableItem.addVersionEntry(result);
        }

        return (result);
    }

    private ImportableItem.VersionedContentAndMetadata findVersionEntry(ImportableItem importableItem, int version)
    {
        ImportableItem.VersionedContentAndMetadata result = null;

        if (importableItem.hasVersionEntries())
        {
            for (ImportableItem.VersionedContentAndMetadata versionEntry : importableItem.getVersionEntries())
            {
                if (version == versionEntry.getVersion())
                {
                    result = versionEntry;
                    break;
                }
            }
        }

        return(result);
    }

    /**
     * Extracts the version number embedded in a version file's name.
     *
     * @throws IllegalStateException if the file is not a version file, or the
     *         version number cannot be extracted from its name
     */
    private int getVersionNumber(File versionFile)
    {
        int result = -1;

        if (!isVersionFile(versionFile))
        {
            throw new IllegalStateException(FileUtils.getFileName(versionFile) + " is not a version file.");
        }

        Matcher matcher = VERSION_SUFFIX_PATTERN.matcher(versionFile.getName());
        String versionStr = null;

        if (matcher.matches())
        {
            versionStr = matcher.group(1);
        }
        else
        {
            // Defensive: isVersionFile uses the same pattern, so this branch should be unreachable.
            throw new IllegalStateException("Unable to extract version number from version file " + FileUtils.getFileName(versionFile));
        }

        result = Integer.parseInt(versionStr);

        return(result);
    }

    /**
     * Derives the content file a version file refers to, by stripping the
     * version suffix from its name.
     */
    private File getParentOfVersionFile(File versionFile)
    {
        File result = null;

        if (!isVersionFile(versionFile))
        {
            throw new IllegalStateException(FileUtils.getFileName(versionFile) + " is not a version file.");
        }

        String parentFilename = versionFile.getName().replaceFirst(VERSION_SUFFIX_REGEX, "");
        result = new File(versionFile.getParent(), parentFilename);

        return(result);
    }

    /**
     * Derives the content file a metadata file refers to, by stripping the
     * metadata suffix and extension from its name.
     */
    private File getParentOfMetadatafile(File metadataFile)
    {
        File result = null;

        if (!isMetadataFile(metadataFile))
        {
            throw new IllegalStateException(FileUtils.getFileName(metadataFile) + " is not a metadata file.");
        }

        String name = metadataFile.getName();
        String contentName = name.substring(0, name.length() - (MetadataLoader.METADATA_SUFFIX + metadataLoader.getMetadataFileExtension()).length());
        result = new File(metadataFile.getParent(), contentName);

        return(result);
    }
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import java.io.IOException;
/**
 * Small helper for deriving a human-readable name for a {@link File}.
 *
 * @since 4.0
 *
 * TODO move to core project
 */
public class FileUtils
{
    /**
     * Returns the canonical path of the given file, falling back to
     * {@link File#toString()} when the canonical path cannot be resolved,
     * or <code>null</code> when the file itself is <code>null</code>.
     *
     * @param file the file to name (may be <code>null</code>)
     * @return the best available name for the file, or <code>null</code>
     */
    public static String getFileName(final File file)
    {
        if (file == null)
        {
            return null;
        }

        try
        {
            return file.getCanonicalPath();
        }
        catch (final IOException ioe)
        {
            // Canonicalisation failed (I/O error); fall back to the default representation.
            return file.toString();
        }
    }
}

View File

@@ -0,0 +1,176 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.bulkimport.ContentDataFactory;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.content.encoding.ContentCharsetFinder;
import org.alfresco.service.cmr.repository.ContentData;
import org.alfresco.service.cmr.repository.MimetypeService;
import org.alfresco.service.namespace.QName;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
/**
 *
 * Factory that creates {@link ContentData} out of :
 * <ul>
 * <li> a {@link ContentStore}
 * <li> a {@link File} located within that store's root
 * </ul><br>
 *
 * The mimetype will be guessed from the file extension, or fall back to binary.
 * The encoding will be guessed from the file itself, or fall back to {@link #defaultEncoding}.
 *
 * @since 4.0
 */
public class FilesystemContentDataFactory implements ContentDataFactory, InitializingBean
{
    private static final Log logger = LogFactory.getLog(FilesystemContentDataFactory.class);

    private static final String PROTOCOL_DELIMITER = ContentStore.PROTOCOL_DELIMITER;
    // Platform-specific separator, used when computing the store-relative path.
    private static final String OS_FILE_SEPARATOR = File.separator;

    private MimetypeService mimetypeService;
    private String defaultEncoding;
    private String storeProtocol;

    public void setMimetypeService(MimetypeService mimetypeService)
    {
        this.mimetypeService = mimetypeService;
    }

    /** Encoding used when the file's charset cannot be detected. */
    public void setDefaultEncoding(String defaultEncoding)
    {
        this.defaultEncoding = defaultEncoding;
    }

    /** Protocol prefix used when constructing content URLs (e.g. "store"). */
    public void setStoreProtocol(String storeProtocol)
    {
        this.storeProtocol = storeProtocol;
    }

    public void afterPropertiesSet() throws Exception
    {
        PropertyCheck.mandatory(this, "mimetypeService", mimetypeService);
        PropertyCheck.mandatory(this, "defaultEncoding", defaultEncoding);
        PropertyCheck.mandatory(this, "storeProtocol", storeProtocol);
    }

    /**
     * Create a {@link ContentData} by combining the given {@link ContentStore}'s root location and the {@link File}'s path within that store.
     * The given file must therefore be accessible within the content store's configured root location.
     * The encoding and mimetype will be guessed from the given file.
     *
     * @param store The {@link ContentStore} in which the file should be
     * @param contentFile The {@link File} to check
     * @return the constructed {@link ContentData}
     * @throws IllegalArgumentException if the file is not located under the store's root
     */
    public ContentData createContentData(ContentStore store, File contentFile)
    {
        if (!contentIsInStore(contentFile, store))
        {
            throw new IllegalArgumentException("Can't create content URL : file '" + contentFile.getAbsolutePath() +
                    "' is not located within the store's tree ! The store's root is : '" + store.getRootLocation() + "'");
        }

        // Path of the file relative to the store root, used as the content URL's path part.
        String relativeFilePath = contentFile.getAbsolutePath().replace(store.getRootLocation() + OS_FILE_SEPARATOR, "");
        String mimetype = mimetypeService.guessMimetype(contentFile.getName());

        String encoding = defaultEncoding;
        if (!contentFile.isDirectory())
        {
            encoding = guessEncoding(contentFile, mimetype);
        }

        return new ContentData(storeProtocol + PROTOCOL_DELIMITER + relativeFilePath, mimetype, contentFile.length(), encoding);
    }

    /**
     * Check if file is in the store's tree, by checking if the file path starts
     * with the store's configured root location.
     *
     * @param contentFile The {@link File} to check
     * @param store The {@link ContentStore} in which the file should be
     * @return boolean : whether or not the file is in the expected file tree
     */
    private boolean contentIsInStore(File contentFile, ContentStore store)
    {
        return contentFile.getAbsolutePath().startsWith(store.getRootLocation());
    }

    /**
     * Attempt to guess file encoding. fall back to {@link #defaultEncoding} otherwise.
     *
     * @param file the {@link java.io.File} to test
     * @param mimetype the file mimetype. used to first distinguish between binary and text files
     * @return the encoding as a {@link String}
     */
    private String guessEncoding(File file, String mimetype)
    {
        String encoding = defaultEncoding; // fallback default

        if (file.isDirectory())
            return defaultEncoding; // not necessary to guess folder encoding

        InputStream is = null;
        ContentCharsetFinder charsetFinder = mimetypeService.getContentCharsetFinder();
        try
        {
            is = new BufferedInputStream(new FileInputStream(file));
            encoding = charsetFinder.getCharset(is, mimetype).name();
        }
        catch (Throwable e)
        {
            // Include the cause so that detection failures can be diagnosed.
            if (logger.isWarnEnabled())
                logger.warn("Failed to guess character encoding of file: '" + file.getName() + "'. Falling back to configured default encoding (" + defaultEncoding + ")", e);
        }
        finally
        {
            if (is != null)
            {
                try { is.close(); } catch (Throwable e) {}
            }
        }
        return encoding;
    }
}

View File

@@ -0,0 +1,153 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.batch.BatchProcessWorkProvider;
import org.alfresco.repo.batch.BatchProcessor;
import org.alfresco.repo.bulkimport.BulkFilesystemImporter;
import org.alfresco.repo.bulkimport.BulkImportParameters;
import org.alfresco.repo.bulkimport.FilesystemTracker;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.repo.bulkimport.NodeImporter;
import org.alfresco.service.cmr.repository.NodeRef;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Performs a filesystem import into the repository using the {@link BatchProcessor} in multiple threads.
 *
 * @since 4.0
 *
 */
public abstract class MultiThreadedBulkFilesystemImporter extends AbstractBulkFilesystemImporter
{
    // NOTE(review): logs under the BulkFilesystemImporter interface, so all importer
    // implementations share one logger name - confirm this is intentional.
    protected final static Log logger = LogFactory.getLog(BulkFilesystemImporter.class);

    protected int defaultBatchSize;
    protected int defaultNumThreads;
    protected int defaultLoggingInterval = 100;

    /** Logging interval from the parameters, falling back to the configured default. */
    protected int getLoggingInterval(BulkImportParameters bulkImportParameters)
    {
        return bulkImportParameters.getLoggingInterval() != null ? bulkImportParameters.getLoggingInterval() : defaultLoggingInterval;
    }

    /** Batch size from the parameters, falling back to the configured default. */
    protected int getBatchSize(BulkImportParameters bulkImportParameters)
    {
        return bulkImportParameters.getBatchSize() != null ? bulkImportParameters.getBatchSize() : defaultBatchSize;
    }

    /** Thread count from the parameters, falling back to the configured default. */
    protected int getNumThreads(BulkImportParameters bulkImportParameters)
    {
        return bulkImportParameters.getNumThreads() != null ? bulkImportParameters.getNumThreads() : defaultNumThreads;
    }

    /**
     * Builds the batch worker that imports a single {@link ImportableItem}: it
     * refreshes the import lock before each batch, disables the auditable aspect's
     * behaviours around each item (so creation/modification dates can be set) and
     * notifies the tracker of each imported node.
     */
    protected BatchProcessor.BatchProcessWorker<ImportableItem> getWorker(final BulkImportParameters bulkImportParameters, final String lockToken,
            final NodeImporter nodeImporter, final FilesystemTracker filesystemTracker)
    {
        // Use the shared defaulting logic rather than duplicating it inline.
        final int batchSize = getBatchSize(bulkImportParameters);

        BatchProcessor.BatchProcessWorker<ImportableItem> worker = new BatchProcessor.BatchProcessWorker<ImportableItem>()
        {
            public String getIdentifier(ImportableItem importableItem)
            {
                return importableItem.toString();
            }

            public void beforeProcess() throws Throwable
            {
                // Keep the import lock alive while the batch runs.
                refreshLock(lockToken, batchSize * 250L);

                // TODO this throws exception txn not started??
                // Disable the auditable aspect's behaviours for this transaction only, to allow creation & modification dates to be set
                //behaviourFilter.disableBehaviour(ContentModel.ASPECT_AUDITABLE);
            }

            public void afterProcess() throws Throwable
            {
                importStatus.incrementNumberOfBatchesCompleted();
                //behaviourFilter.enableBehaviour(ContentModel.ASPECT_AUDITABLE);
            }

            public void process(final ImportableItem importableItem) throws Throwable
            {
                try
                {
                    behaviourFilter.disableBehaviour(ContentModel.ASPECT_AUDITABLE);
                    NodeRef nodeRef = nodeImporter.importImportableItem(importableItem, bulkImportParameters.isReplaceExisting());
                    filesystemTracker.itemImported(nodeRef, importableItem);
                    // importableItem.setNodeRef(nodeRef);
                }
                finally
                {
                    behaviourFilter.enableBehaviour(ContentModel.ASPECT_AUDITABLE);
                }
            }
        };

        return worker;
    }

    /**
     * Constructs a {@link BatchProcessor} over the given work provider, recording
     * the effective thread count and batch size on the import status.
     */
    protected BatchProcessor<ImportableItem> getBatchProcessor(final BulkImportParameters bulkImportParameters,
            final BatchProcessWorkProvider<ImportableItem> workProvider, final int loggingInterval)
    {
        final int numThreads = getNumThreads(bulkImportParameters);
        final int batchSize = getBatchSize(bulkImportParameters);
        importStatus.setNumThreads(numThreads);
        importStatus.setBatchSize(batchSize);

        BatchProcessor<ImportableItem> batchProcessor = new BatchProcessor<ImportableItem>(
                "Bulk Filesystem Import",
                transactionHelper,
                workProvider,
                numThreads, batchSize,
                applicationContext,
                logger, loggingInterval);

        return batchProcessor;
    }

    public void setDefaultNumThreads(int defaultNumThreads)
    {
        this.defaultNumThreads = defaultNumThreads;
    }

    public void setDefaultBatchSize(int defaultBatchSize)
    {
        this.defaultBatchSize = defaultBatchSize;
    }

    public int getDefaultNumThreads()
    {
        return defaultNumThreads;
    }

    public int getDefaultBatchSize()
    {
        return defaultBatchSize;
    }
}

View File

@@ -0,0 +1,155 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.repo.bulkimport.MetadataLoader;
import org.alfresco.repo.bulkimport.NodeImporter;
import org.alfresco.repo.bulkimport.impl.BulkImportStatusImpl.NodeState;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.util.Triple;
/**
 * Factory for {@link NodeImporter}s that stream file content from the local
 * filesystem into the repository.
 *
 * @since 4.0
 *
 */
public class StreamingNodeImporterFactory extends AbstractNodeImporterFactory
{
    /**
     * Builds a streaming importer wired with this factory's collaborators
     * (node/fileFolder/version services, behaviour filter, metadata loader,
     * import status) and rooted at the given source folder.
     *
     * @param sourceFolder the local directory the import reads from
     * @return a configured {@link NodeImporter}
     */
    public NodeImporter getNodeImporter(File sourceFolder)
    {
        StreamingNodeImporter nodeImporter = new StreamingNodeImporter();
        nodeImporter.setNodeService(nodeService);
        nodeImporter.setBehaviourFilter(behaviourFilter);
        nodeImporter.setFileFolderService(fileFolderService);
        nodeImporter.setMetadataLoader(metadataLoader);
        nodeImporter.setVersionService(versionService);
        nodeImporter.setImportStatus(importStatus);
        nodeImporter.setSourceFolder(sourceFolder);
        return nodeImporter;
    }

    /**
     * Importer that streams each file's bytes into a {@link ContentWriter} and
     * then applies the item's metadata.
     *
     * @since 4.0
     *
     */
    private static class StreamingNodeImporter extends AbstractNodeImporter
    {
        // Root directory on the local filesystem that this import reads from.
        private File sourceFolder;

        public void setSourceFolder(File sourceFolder)
        {
            this.sourceFolder = sourceFolder;
        }

        /**
         * Streams the content file (if one exists) into the node, then attaches
         * the item's aspects and properties.
         */
        protected final void importContentAndMetadata(NodeRef nodeRef, ImportableItem.ContentAndMetadata contentAndMetadata, MetadataLoader.Metadata metadata)
        {
            // Write the content of the file
            if (contentAndMetadata.contentFileExists())
            {
                String filename = getFileName(contentAndMetadata.getContentFile());

                if (logger.isDebugEnabled())
                {
                    logger.debug("Streaming contents of file '" + filename + "' into node '" + nodeRef.toString() + "'.");
                }

                ContentWriter writer = fileFolderService.getWriter(nodeRef);
                writer.putContent(contentAndMetadata.getContentFile());
            }
            else
            {
                if (logger.isDebugEnabled()) logger.debug("No content to stream into node '" + nodeRef.toString() + "' - importing metadata only.");
            }

            // Attach aspects and set all properties
            importImportableItemMetadata(nodeRef, contentAndMetadata.getContentFile(), metadata);
        }

        /**
         * Imports one importable item under its parent's node: creates or finds
         * the node, loads content and metadata for files or directories, and
         * updates the import status counters (or records a skip).
         *
         * @param importableItem the item to import; its parent must already have a NodeRef
         * @param replaceExisting whether an existing node with the same name is replaced
         * @return the imported node, or null if nothing was created
         * @throws AlfrescoRuntimeException if the parent's node is not yet known (caller may retry)
         */
        protected NodeRef importImportableItemImpl(ImportableItem importableItem, boolean replaceExisting)
        {
            NodeRef target = importableItem.getParent().getNodeRef();
            if(target == null)
            {
                // the parent has not been created yet, retry
                throw new AlfrescoRuntimeException("Bulk importer: target is not known for importable item: " + importableItem.getParent());
            }

            NodeRef result = null;
            MetadataLoader.Metadata metadata = loadMetadata(importableItem.getHeadRevision());

            Triple<NodeRef, Boolean, NodeState> node = createOrFindNode(target, importableItem, replaceExisting, metadata);
            boolean isDirectory = node.getSecond() == null ? false : node.getSecond(); // Watch out for NPEs during unboxing!
            NodeState nodeState = node.getThird();
            result = node.getFirst();

            if (result != null && nodeState != NodeState.SKIPPED)
            {
                int numVersionProperties = 0;

                importStatus.incrementImportableItemsRead(importableItem, isDirectory);

                // Load the item
                if (isDirectory)
                {
                    importImportableItemDirectory(result, importableItem, metadata);
                }
                else
                {
                    numVersionProperties = importImportableItemFile(result, importableItem, metadata);
                }

                // NOTE(review): the "+ 4" appears to account for properties beyond the
                // loaded metadata (e.g. name/content) - confirm against createOrFindNode.
                importStatus.incrementNodesWritten(importableItem, isDirectory, nodeState, metadata.getProperties().size() + 4, numVersionProperties);
                importStatus.incrementContentBytesWritten(importableItem, isDirectory, nodeState);
            }
            else
            {
                // Node was skipped (or not created) - record the skip for reporting.
                if(isDirectory)
                {
                    skipImportableDirectory(importableItem);
                }
                else
                {
                    skipImportableFile(importableItem);
                }
            }

            return(result);
        }

        @Override
        public File getSourceFolder()
        {
            return sourceFolder;
        }
    }
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import org.alfresco.repo.batch.BatchProcessor;
import org.alfresco.repo.bulkimport.BulkImportParameters;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.repo.bulkimport.NodeImporter;
/**
 * A multi threaded bulk importer that imports by striping across filesystem levels.
 *
 * @since 4.0
 *
 */
public class StripingBulkFilesystemImporter extends MultiThreadedBulkFilesystemImporter
{
    /**
     * Method that does the work of importing a filesystem using the BatchProcessor.
     *
     * @param bulkImportParameters import configuration: the target space plus optional batch size,
     *        thread count and logging interval (defaults are applied by the superclass accessors)
     * @param nodeImporter importer that writes each importable item into the repository and
     *        supplies the source directory on the local filesystem
     * @param lockToken token of the lock held for the duration of the import; the batch worker
     *        refreshes it as batches run
     */
    @Override
    protected void bulkImportImpl(final BulkImportParameters bulkImportParameters, final NodeImporter nodeImporter, final String lockToken) throws Throwable
    {
        final File sourceFolder = nodeImporter.getSourceFolder();
        final int batchSize = getBatchSize(bulkImportParameters);
        final int loggingInterval = getLoggingInterval(bulkImportParameters);
        // The tracker walks the source tree one level at a time and acts as the work provider.
        final StripingFilesystemTracker tracker = new StripingFilesystemTracker(directoryAnalyser, bulkImportParameters.getTarget(), sourceFolder, batchSize);
        final BatchProcessor<ImportableItem> batchProcessor = getBatchProcessor(bulkImportParameters, tracker.getWorkProvider(), loggingInterval);
        final BatchProcessor.BatchProcessWorker<ImportableItem> worker = getWorker(bulkImportParameters, lockToken, nodeImporter, tracker);

        // Process one filesystem level per iteration until the tracker reports no deeper levels.
        do
        {
            batchProcessor.process(worker, true);
        }
        while(tracker.moreLevels());
    }
}

View File

@@ -0,0 +1,208 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.alfresco.repo.batch.BatchProcessWorkProvider;
import org.alfresco.repo.bulkimport.AnalysedDirectory;
import org.alfresco.repo.bulkimport.DirectoryAnalyser;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.service.cmr.repository.NodeRef;
/**
 * A filesystem walker that returns all files and directories in subsequent levels of a filesystem tree; it returns all directories
 * and files in a given level, at which point it moves on to the next level and starts returning directories and files in that level.
 *
 * @since 4.0
 *
 */
public class StripingFilesystemTracker extends AbstractFilesystemTracker
{
    private ImportableItem rootFolder;
    private int currentLevel = 0;
    private final int batchSize;

    // Directories awaiting analysis, keyed by tree depth (level 0 = the root folder).
    // TODO choose most appropriate list type
    private Map<Integer, List<ImportableItem>> directoriesToProcess = new HashMap<Integer, List<ImportableItem>>(10);

    // Items discovered in the current level but not yet handed out to the batch processor.
    private List<ImportableItem> toProcess = new ArrayList<ImportableItem>();

    public StripingFilesystemTracker(DirectoryAnalyser directoryAnalyser, NodeRef target, File sourceFolder, int batchSize)
    {
        this.directoryAnalyser = directoryAnalyser;
        this.batchSize = batchSize;

        // not really an importable item but the interface requires it to be in this form
        rootFolder = new ImportableItem();
        rootFolder.getHeadRevision().setContentFile(sourceFolder);
        rootFolder.setNodeRef(target);
        addDirectoryToProcess(rootFolder, currentLevel);
    }

    /**
     * Queues the given directories for analysis at the given level.
     */
    protected void addDirectoriesToProcess(Collection<ImportableItem> dirsToAdd, int level)
    {
        List<ImportableItem> dirs = getDirectoriesToProcess(level);
        dirs.addAll(dirsToAdd);
    }

    /**
     * Queues a single directory for analysis at the given level.
     */
    protected void addDirectoryToProcess(ImportableItem dir, int level)
    {
        List<ImportableItem> dirs = getDirectoriesToProcess(level);
        dirs.add(dir);
    }

    /**
     * Returns the (mutable) list of directories still to be analysed at the given level,
     * creating and registering an empty list on first access.
     */
    protected List<ImportableItem> getDirectoriesToProcess(int level)
    {
        // Integer.valueOf uses the cache for small values instead of allocating
        // a fresh Integer on every call (new Integer(..) is wasteful and deprecated).
        List<ImportableItem> dirs = directoriesToProcess.get(Integer.valueOf(level));
        if(dirs == null)
        {
            dirs = new ArrayList<ImportableItem>();
            directoriesToProcess.put(Integer.valueOf(level), dirs);
        }
        return dirs;
    }

    /**
     * @return a rough estimate of the number of items remaining in the current level
     */
    public int count()
    {
        // Note: this is an estimate of the number of directories and files in the current level
        // TODO guess - multiplier of number of directories to process in the current directory
        return numDirectoriesToProcess() * 100;
    }

    /**
     * Moves the tracker on to the next (deeper) filesystem level.
     */
    protected void incrementLevel()
    {
        currentLevel++;
    }

    public void itemImported(NodeRef nodeRef, ImportableItem importableItem)
    {
        // nothing to do
    }

    protected void addItemsToProcess(Collection<ImportableItem> items)
    {
        toProcess.addAll(items);
    }

    /**
     * Removes and returns the next unanalysed directory in the current level, or
     * {@code null} when the current level has no directories left.
     */
    protected ImportableItem getDirectoryToProcess()
    {
        List<ImportableItem> dirs = getDirectoriesToProcess(currentLevel);
        if(!dirs.isEmpty())
        {
            return dirs.remove(0);
        }
        else
        {
            return null;
        }
    }

    /**
     * @return true if the current level still has directories to analyse
     */
    public boolean moreLevels()
    {
        return !getDirectoriesToProcess(currentLevel).isEmpty();
    }

    public int numDirectoriesToProcess()
    {
        return getDirectoriesToProcess(currentLevel).size();
    }

    /**
     * Returns up to {@code count} importable items from the current level, analysing
     * queued directories on demand. Newly discovered sub-directories are queued for the
     * next level. When the current level is exhausted (an empty result), the tracker
     * automatically advances to the next level.
     *
     * @param count maximum number of items to return
     * @return at most {@code count} importable items; empty when the level is exhausted
     */
    protected List<ImportableItem> getImportableItems(int count)
    {
        // Analyse directories until we have enough items or the level runs dry.
        while(toProcess.size() < count)
        {
            ImportableItem directory = getDirectoryToProcess();
            if(directory != null)
            {
                AnalysedDirectory analysedDirectory = getImportableItemsInDirectory(directory);
                addItemsToProcess(analysedDirectory.getImportableDirectories());
                addItemsToProcess(analysedDirectory.getImportableItems());

                // add new directories to process in next level
                getDirectoriesToProcess(currentLevel+1).addAll(analysedDirectory.getImportableDirectories());
            }
            else
            {
                break;
            }
        }

        int size = (toProcess.size() >= count ? count : toProcess.size());
        List<ImportableItem> result = new ArrayList<ImportableItem>(size);
        int i = size;
        while(i > 0)
        {
            // we can assume that there are items in toProcess to remove because the size has been pre-calculated above
            ImportableItem importableItem = toProcess.remove(0);
            if(importableItem != null)
            {
                result.add(importableItem);
                i--;
            }
            else
            {
                logger.warn("Unexpected empty toProcess queue");
            }
        }

        if(result.isEmpty())
        {
            // this level has been exhausted, increment level
            incrementLevel();
        }

        return result;
    }

    @Override
    public BatchProcessWorkProvider<ImportableItem> getWorkProvider()
    {
        BatchProcessWorkProvider<ImportableItem> provider = new BatchProcessWorkProvider<ImportableItem>()
        {
            @Override
            public int getTotalEstimatedWorkSize()
            {
                return count();
            }

            @Override
            public Collection<ImportableItem> getNextWork()
            {
                // TODO perhaps some multiple of the batchSize to limit calls
                // to getNextWork? Add this to repository.properties.
                return getImportableItems(batchSize*1000);
            }
        };
        return provider;
    }
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import org.alfresco.repo.bulkimport.DirectoryAnalyser;
import org.alfresco.repo.bulkimport.ImportableItem;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.util.ApplicationContextHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ResourceUtils;
/**
 * Tests that the {@link StripingFilesystemTracker} hands back the expected number of
 * importable items for each successive level of the test fixture directory tree.
 *
 * @since 4.0
 *
 */
public class StripingFilesystemTrackerTest
{
    private DirectoryAnalyser directoryAnalyser;

    private static ApplicationContext ctx = ApplicationContextHelper.getApplicationContext();

    @Before
    public void setup() throws Exception
    {
        directoryAnalyser = (DirectoryAnalyser)ctx.getBean("bfsiDirectoryAnalyser");
    }

    @After
    public void teardown() throws Exception
    {
    }

    @Test
    public void test1() throws FileNotFoundException
    {
        final File sourceFolder = ResourceUtils.getFile("classpath:bulkimport");
        final StripingFilesystemTracker tracker = new StripingFilesystemTracker(directoryAnalyser, new NodeRef("workspace", "SpacesStore", "123"), sourceFolder, Integer.MAX_VALUE);

        // Descriptive messages so a failure pinpoints which level's count is wrong.
        List<ImportableItem> items = tracker.getImportableItems(Integer.MAX_VALUE);
        assertEquals("Unexpected number of importable items at level 0", 11, items.size());

        tracker.incrementLevel();
        items = tracker.getImportableItems(Integer.MAX_VALUE);
        assertEquals("Unexpected number of importable items at level 1", 2, items.size());

        tracker.incrementLevel();
        items = tracker.getImportableItems(Integer.MAX_VALUE);
        assertEquals("Unexpected number of importable items at level 2", 31, items.size());
    }
}

View File

@@ -0,0 +1,121 @@
/*
* Copyright (C) 2005-2010 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl.stores;
import java.util.Iterator;
import java.util.Map;
import org.alfresco.repo.bulkimport.ContentStoreMapProvider;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.content.filestore.FileContentStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
/**
* Common elements of the role of a {@link ContentStoreMapProvider}.
* Extending classes should implement {@link #setUpStoreMap()} to initialize the {@link Map<String,ContentStore>}.
*
* @since 4.0
*
*/
public abstract class AbstractContentStoreMapProvider extends AbstractLifecycleBean implements ContentStoreMapProvider
{
private final static Log logger = LogFactory.getLog(AbstractContentStoreMapProvider.class);
protected ContentStore contentStore;
protected Map<String,ContentStore> storeMap;
protected abstract void setUpStoreMap();
/**
* set up the map on startup. see {@link #setUpStoreMap()}.
*/
protected void onBootstrap(ApplicationEvent event)
{
setUpStoreMap();
}
protected void onShutdown(ApplicationEvent event)
{
// nothing particular to do
}
/**
* Check that the given store name is in the list.
* Also check it's an instance of {@link FileContentStore}. If it's not, output a warning
* as non-file-based implementations have not been tested and may be unsupported.
*
* @param storeName the store name to check
*/
public ContentStore checkAndGetStore(String storeName)
{
ContentStore store = storeMap.get(storeName);
if(store == null)
{
String validStores ="";
Iterator<String> it = storeMap.keySet().iterator();
while (it.hasNext())
{
validStores += "'" + it.next() + "'" + (it.hasNext() ? " , " : "");
}
throw new IllegalArgumentException("given store name : '" + storeName + "' is not part of the registered stores : " + validStores);
}
if(!(store instanceof FileContentStore))
{
// letting you off with a warning :)
// some people may have a custom content store for which the import could work in this case too ...
if(logger.isWarnEnabled())
{
logger.warn("selected store '" + storeName + "' is not a FileContentStore. Is the implementation based on local files ?");
}
}
return store;
}
/**
* see {@link ContentStoreMapProvider#getStoreMap()}
*/
public Map<String,ContentStore> getStoreMap()
{
return storeMap;
}
public ContentStore getContentStore()
{
return contentStore;
}
public void setContentStore(ContentStore contentStore)
{
this.contentStore = contentStore;
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright (C) 2005-2010 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have received a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.bulkimport.impl.stores;
import java.util.HashMap;
import org.alfresco.repo.content.ContentStore;
/**
* Provides a default {@link Map<String, ContentStore>()} of registered content stores.
* Use when the Content Store Selector is not available (e.g on community releases).
*
* @since 4.0
*
*/
public class DefaultContentStoreMapProvider extends AbstractContentStoreMapProvider
{
/**
* the default store name, should match the default store defined by the content store selector aspect.
*/
private String defaultStoreName;
/**
* Default implementation, relies on the default {@link ContentStore}.
*/
protected void setUpStoreMap()
{
storeMap = new HashMap<String, ContentStore>();
storeMap.put(defaultStoreName, contentStore);
}
// boilerplate setters
public void setDefaultStoreName(String defaultStoreName)
{
this.defaultStoreName = defaultStoreName;
}
}