diff --git a/config/alfresco/opencmis-context.xml b/config/alfresco/opencmis-context.xml
index 6a854934ba..5e18dcc8b4 100644
--- a/config/alfresco/opencmis-context.xml
+++ b/config/alfresco/opencmis-context.xml
@@ -185,6 +185,10 @@
+
+
+
+
diff --git a/config/alfresco/repository.properties b/config/alfresco/repository.properties
index 48fef29dfc..9fd84e3d6c 100644
--- a/config/alfresco/repository.properties
+++ b/config/alfresco/repository.properties
@@ -709,6 +709,9 @@ opencmis.connector.default.objectsDefaultMaxItems=10000
opencmis.connector.default.objectsDefaultDepth=100
opencmis.connector.default.openHttpSession=false
opencmis.activities.enabled=true
+opencmis.bulkUpdateProperties.maxItemsSize=1000
+opencmis.bulkUpdateProperties.batchSize=20
+opencmis.bulkUpdateProperties.workerThreads=2
# IMAP
imap.server.enabled=false
diff --git a/source/java/org/alfresco/opencmis/AlfrescoCmisServiceImpl.java b/source/java/org/alfresco/opencmis/AlfrescoCmisServiceImpl.java
index 2c923aa56f..371d6f1220 100644
--- a/source/java/org/alfresco/opencmis/AlfrescoCmisServiceImpl.java
+++ b/source/java/org/alfresco/opencmis/AlfrescoCmisServiceImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2005-2014 Alfresco Software Limited.
+ * Copyright (C) 2005-2015 Alfresco Software Limited.
*
* This file is part of Alfresco
*
@@ -28,12 +28,15 @@ import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
@@ -47,12 +50,14 @@ import org.alfresco.opencmis.dictionary.PropertyDefinitionWrapper;
import org.alfresco.opencmis.dictionary.TypeDefinitionWrapper;
import org.alfresco.query.PagingRequest;
import org.alfresco.query.PagingResults;
+import org.alfresco.repo.batch.BatchProcessWorkProvider;
+import org.alfresco.repo.batch.BatchProcessor;
+import org.alfresco.repo.batch.BatchProcessor.BatchProcessWorker;
import org.alfresco.repo.content.encoding.ContentCharsetFinder;
import org.alfresco.repo.node.getchildren.GetChildrenCannedQuery;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.Authorization;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
-import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.repo.version.VersionModel;
import org.alfresco.service.cmr.model.FileInfo;
import org.alfresco.service.cmr.model.FileNotFoundException;
@@ -1755,119 +1760,184 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
final List addSecondaryTypeIds, final List removeSecondaryTypeIds, ExtensionsData extension)
{
checkRepositoryId(repositoryId);
-
- if(objectIdAndChangeTokens.size() > 1)
+
+ if(objectIdAndChangeTokens.size() > connector.getBulkMaxItems())
{
- throw new CmisConstraintException("Bulk update not supported for more than one object.");
+ throw new CmisConstraintException("Bulk update not supported for more than " + connector.getBulkMaxItems() + " objects.");
}
+ // WorkProvider
+ class WorkProvider implements BatchProcessWorkProvider
+ {
+ private final Iterator iterator;
+ private final int size;
+ private final int batchSize;
+ private BulkUpdateContext context;
+
+ public WorkProvider(List objectIdAndChangeTokens, BulkUpdateContext context, int batchSize)
+ {
+ this.iterator = objectIdAndChangeTokens.iterator();
+ this.size = objectIdAndChangeTokens.size();
+ this.context = context;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public synchronized int getTotalEstimatedWorkSize()
+ {
+ return size;
+ }
+
+ @Override
+ public synchronized Collection getNextWork()
+ {
+ Collection results = new ArrayList(batchSize);
+ while (results.size() < batchSize && iterator.hasNext())
+ {
+ results.add(new BulkEntry(context, iterator.next(), properties, addSecondaryTypeIds, removeSecondaryTypeIds, getContext().isObjectInfoRequired()));
+ }
+ return results;
+ }
+ }
+
BulkUpdateContext context = new BulkUpdateContext(objectIdAndChangeTokens.size());
RetryingTransactionHelper helper = connector.getRetryingTransactionHelper();
- for(BulkUpdateObjectIdAndChangeToken objectIdAndChangeToken : objectIdAndChangeTokens)
- {
- BulkUpdateCallback callback = new BulkUpdateCallback(context, objectIdAndChangeToken, properties, addSecondaryTypeIds, removeSecondaryTypeIds);
- helper.doInTransaction(callback, false, true);
- }
+ final String runAsUser = AuthenticationUtil.getRunAsUser();
+ // Worker
+ BatchProcessWorker worker = new BatchProcessWorker()
+ {
+ @Override
+ public void process(final BulkEntry entry) throws Throwable
+ {
+ entry.update();
+ }
+
+ public String getIdentifier(BulkEntry entry)
+ {
+ return entry.getObjectIdAndChangeToken().getId();
+ }
+
+ @Override
+ public void beforeProcess() throws Throwable
+ {
+ // Authentication
+ AuthenticationUtil.setFullyAuthenticatedUser(runAsUser);
+ }
+
+ @Override
+ public void afterProcess() throws Throwable
+ {
+ // Clear authentication
+ AuthenticationUtil.clearCurrentSecurityContext();
+ }
+ };
+
+ // Processor
+ BatchProcessor processor = new BatchProcessor(
+ "CMISbulkUpdateProperties",
+ helper,
+ new WorkProvider(objectIdAndChangeTokens, context, connector.getBulkBatchSize()),
+ connector.getBulkWorkerThreads(), connector.getBulkBatchSize(),
+ null,
+ logger, 100);
+ processor.process(worker, true);
+
for(CMISNodeInfo info : context.getSuccesses())
{
- NodeRef nodeRef = info.getNodeRef();
- connector.getActivityPoster().postFileFolderUpdated(info.isFolder(), nodeRef);
+ NodeRef nodeRef = info.getNodeRef();
+ connector.getActivityPoster().postFileFolderUpdated(info.isFolder(), nodeRef);
}
return context.getChanges();
}
- private class BulkUpdateCallback implements RetryingTransactionCallback
+ private class BulkEntry
{
- private String repositoryId;
- private BulkUpdateContext context;
+ private String repositoryId;
+ private BulkUpdateContext bulkUpdateContext;
- private BulkUpdateObjectIdAndChangeToken objectIdAndChangeToken;
- private Properties properties;
- private List addSecondaryTypeIds;
- private List removeSecondaryTypeIds;
+ private BulkUpdateObjectIdAndChangeToken objectIdAndChangeToken;
+ private Properties properties;
+ private List addSecondaryTypeIds;
+ private List removeSecondaryTypeIds;
+ private boolean isObjectInfoRequired;
- BulkUpdateCallback(BulkUpdateContext context, BulkUpdateObjectIdAndChangeToken objectIdAndChangeToken,
- Properties properties, List addSecondaryTypeIds, List removeSecondaryTypeIds)
- {
- this.context = context;
- this.objectIdAndChangeToken = objectIdAndChangeToken;
- this.properties = properties;
- this.addSecondaryTypeIds = addSecondaryTypeIds;
- this.removeSecondaryTypeIds = removeSecondaryTypeIds;
- }
-
- public Void execute() throws Exception
+ BulkEntry(BulkUpdateContext bulkUpdateContext, BulkUpdateObjectIdAndChangeToken objectIdAndChangeToken, Properties properties, List addSecondaryTypeIds,
+ List removeSecondaryTypeIds, boolean isObjectInfoRequired)
{
- try
- {
- String objectId = objectIdAndChangeToken.getId();
- final CMISNodeInfo info = getOrCreateNodeInfo(objectId, "Object");
-
- if(!info.isVariant(CMISObjectVariant.ASSOC) && !info.isVariant(CMISObjectVariant.VERSION))
- {
- final NodeRef nodeRef = info.getNodeRef();
-
- connector.setProperties(nodeRef, info.getType(), properties, new String[0]);
-
- boolean isObjectInfoRequired = getContext().isObjectInfoRequired();
- if (isObjectInfoRequired)
- {
- getObjectInfo(repositoryId, objectId, "*", IncludeRelationships.NONE);
- }
-
- connector.addSecondaryTypes(nodeRef, addSecondaryTypeIds);
- connector.removeSecondaryTypes(nodeRef, removeSecondaryTypeIds);
-
- if(properties.getProperties().size() > 0 || addSecondaryTypeIds.size() > 0 || removeSecondaryTypeIds.size() > 0)
- {
- context.success(info);
- }
- }
- }
- catch(Throwable t)
- {
- // catch all exceptions as per the CMIS specification. Only successful updates are recorded for return to the
- // client.
- }
+ this.bulkUpdateContext = bulkUpdateContext;
+ this.objectIdAndChangeToken = objectIdAndChangeToken;
+ this.properties = properties;
+ this.addSecondaryTypeIds = addSecondaryTypeIds;
+ this.removeSecondaryTypeIds = removeSecondaryTypeIds;
+ this.isObjectInfoRequired = isObjectInfoRequired;
+ }
- return null;
+ public void update()
+ {
+ String objectId = objectIdAndChangeToken.getId();
+ final CMISNodeInfo info = getOrCreateNodeInfo(objectId, "Object");
+
+ if (!info.isVariant(CMISObjectVariant.ASSOC) && !info.isVariant(CMISObjectVariant.VERSION))
+ {
+ final NodeRef nodeRef = info.getNodeRef();
+
+ connector.setProperties(nodeRef, info.getType(), properties, new String[0]);
+
+ if (isObjectInfoRequired)
+ {
+ getObjectInfo(repositoryId, objectId, "*", IncludeRelationships.NONE);
+ }
+
+ connector.addSecondaryTypes(nodeRef, addSecondaryTypeIds);
+ connector.removeSecondaryTypes(nodeRef, removeSecondaryTypeIds);
+
+ if (properties.getProperties().size() > 0 || addSecondaryTypeIds.size() > 0 || removeSecondaryTypeIds.size() > 0)
+ {
+ bulkUpdateContext.success(info);
+ }
+ }
};
+
+ public BulkUpdateObjectIdAndChangeToken getObjectIdAndChangeToken()
+ {
+ return objectIdAndChangeToken;
+ }
};
private static class BulkUpdateContext
{
- private List successes;
-
- BulkUpdateContext(int size)
- {
- this.successes = new ArrayList(size);
- }
-
- void success(CMISNodeInfo info)
- {
- successes.add(info);
- }
-
- List getSuccesses()
- {
- return successes;
- }
+ private Set successes;
+
+ BulkUpdateContext(int size)
+ {
+ this.successes = Collections.newSetFromMap(new ConcurrentHashMap());
+ }
+
+ void success(CMISNodeInfo info)
+ {
+ successes.add(info);
+ }
+
+ Set getSuccesses()
+ {
+ return successes;
+ }
- List getChanges()
- {
- List changes = new ArrayList(successes.size());
- for(CMISNodeInfo info : successes)
- {
- BulkUpdateObjectIdAndChangeTokenImpl a = new BulkUpdateObjectIdAndChangeTokenImpl();
- a.setId(info.getObjectId());
-// a.setNewId(info.getObjectId());
- changes.add(a);
- }
+ List getChanges()
+ {
+ List changes = new ArrayList(successes.size());
+ for(CMISNodeInfo info : successes)
+ {
+ BulkUpdateObjectIdAndChangeTokenImpl a = new BulkUpdateObjectIdAndChangeTokenImpl();
+ a.setId(info.getObjectId());
+// a.setNewId(info.getObjectId());
+ changes.add(a);
+ }
- return changes;
- }
+ return changes;
+ }
}
@Override
diff --git a/source/java/org/alfresco/opencmis/CMISConnector.java b/source/java/org/alfresco/opencmis/CMISConnector.java
index e76d49eb2c..5814fd92e4 100644
--- a/source/java/org/alfresco/opencmis/CMISConnector.java
+++ b/source/java/org/alfresco/opencmis/CMISConnector.java
@@ -351,6 +351,11 @@ public class CMISConnector implements ApplicationContextAware, ApplicationListen
private ObjectFilter objectFilter;
+ // Bulk update properties
+ private int bulkMaxItems = 1000;
+ private int bulkBatchSize = 20;
+ private int bulkWorkerThreads = 2;
+
// --------------------------------------------------------------
// Configuration
// --------------------------------------------------------------
@@ -764,6 +769,39 @@ public class CMISConnector implements ApplicationContextAware, ApplicationListen
return proxyUser;
}
+ /**
+ * Sets bulk update properties.
+ */
+ public void setBulkMaxItems(int size)
+ {
+ bulkMaxItems = size;
+ }
+
+ public int getBulkMaxItems()
+ {
+ return bulkMaxItems;
+ }
+
+ public void setBulkBatchSize(int size)
+ {
+ bulkBatchSize = size;
+ }
+
+ public int getBulkBatchSize()
+ {
+ return bulkBatchSize;
+ }
+
+ public void setBulkWorkerThreads(int threads)
+ {
+ bulkWorkerThreads = threads;
+ }
+
+ public int getBulkWorkerThreads()
+ {
+ return bulkWorkerThreads;
+ }
+
// --------------------------------------------------------------
// Lifecycle methods
// --------------------------------------------------------------