Merged 5.2.N (5.2.1) to HEAD (5.2)

126003 aleahu: Merged 5.1.N (5.1.2) to 5.2.N (5.2.1)
      125891 adragoi: Merged 5.0.N (5.0.4) to 5.1.N (5.1.2)
         125841 rmunteanu: Merged V4.2-BUG-FIX (4.2.7) to 5.0.N (5.0.4)
            125680 sglover: Merged BRANCHES/DEV/SG/MNT15914 to V4.2-BUG-FIX (4.2.7)
                  125540 sglover MNT-15914 "Add multi-threaded workers for FixedAclUpdater"


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@127834 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Alan Davis
2016-06-06 08:34:20 +00:00
parent 61837639c8
commit 640535fd09
4 changed files with 287 additions and 196 deletions

View File

@@ -26,17 +26,21 @@
package org.alfresco.repo.domain.permissions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.batch.BatchProcessWorkProvider;
import org.alfresco.repo.batch.BatchProcessor;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.domain.node.NodeDAO.NodeRefQueryCallback;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
@@ -46,17 +50,20 @@ import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.Pair;
import org.alfresco.util.VmShutdownListener.VmShutdownException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* Finds nodes with ASPECT_PENDING_FIX_ACL aspect and sets fixed ACLs for them
*
* @author Andreea Dragoi
* @author sglover
* @since 4.2.7
*/
public class FixedAclUpdater extends TransactionListenerAdapter
public class FixedAclUpdater extends TransactionListenerAdapter implements ApplicationContextAware
{
private static final Log log = LogFactory.getLog(FixedAclUpdater.class);
private static final Set<QName> PENDING_FIX_ACL_ASPECT_PROPS = pendingFixAclAspectProps();
@@ -64,6 +71,7 @@ public class FixedAclUpdater extends TransactionListenerAdapter
public static final String FIXED_ACL_ASYNC_REQUIRED_KEY = "FIXED_ACL_ASYNC_REQUIRED";
public static final String FIXED_ACL_ASYNC_CALL_KEY = "FIXED_ACL_ASYNC_CALL";
private ApplicationContext applicationContext;
private JobLockService jobLockService;
private TransactionService transactionService;
private AccessControlListDAO accessControlListDAO;
@@ -71,7 +79,20 @@ public class FixedAclUpdater extends TransactionListenerAdapter
private QName lockQName = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "FixedAclUpdater");
private long lockTimeToLive = 10000;
private long lockRefreshTime = lockTimeToLive / 2;
private int maxItemBatchSize = 100;
private int numThreads = 4;
public void setNumThreads(int numThreads)
{
this.numThreads = numThreads;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
this.applicationContext = applicationContext;
};
public void setJobLockService(JobLockService jobLockService)
{
@@ -104,161 +125,160 @@ public class FixedAclUpdater extends TransactionListenerAdapter
this.lockRefreshTime = lockTimeToLive / 2;
}
private static Set<QName> pendingFixAclAspectProps()
private class GetNodesWithAspects
{
Set<QName> props = new HashSet<>();
props.add(ContentModel.PROP_SHARED_ACL_TO_REPLACE);
props.add(ContentModel.PROP_INHERIT_FROM_ACL);
return props;
}
private Set<QName> aspects;
private int workSize;
private GetNodesWithAspectCallback getNodesCallback;
private int findAndUpdateAcl(FixedAclUpdaterJobLockRefreshCallback jobCallback)
{
final Set<QName> aspects = new HashSet<>(1);
aspects.add(ContentModel.ASPECT_PENDING_FIX_ACL);
List<NodeRef> nodesToUpdate = getNodesWithAspects(aspects);
int processedNodes = 0;
// loop over results
for (final NodeRef nodeRef : nodesToUpdate)
GetNodesWithAspects(Set<QName> aspects)
{
// Check if we have been terminated
if (!jobCallback.isActive.get())
{
if (log.isDebugEnabled())
{
log.debug(String.format("Processing node failed %s. Job not active", nodeRef));
}
// terminate
break;
}
if (log.isDebugEnabled())
{
log.debug(String.format("Processing node %s", nodeRef));
}
final Long nodeId = nodeDAO.getNodePair(nodeRef).getFirst();
this.aspects = aspects;
try
{
transactionService.getRetryingTransactionHelper().doInTransaction(new RetryingTransactionCallback<Void>()
{
public Void execute() throws Throwable
this.getNodesCallback = new GetNodesWithAspectCallback();
this.workSize = countNodesWithAspects();
}
int getWorkSize()
{
return workSize;
}
List<NodeRef> getNodesWithAspects()
{
List<NodeRef> nodes = transactionService.getRetryingTransactionHelper()
.doInTransaction(new RetryingTransactionCallback<List<NodeRef>>()
{
// retrieve acl properties from node
Long inheritFrom = (Long) nodeDAO.getNodeProperty(nodeId, ContentModel.PROP_INHERIT_FROM_ACL);
Long sharedAclToReplace = (Long) nodeDAO.getNodeProperty(nodeId, ContentModel.PROP_SHARED_ACL_TO_REPLACE);
// set inheritance using retrieved prop
accessControlListDAO.setInheritanceForChildren(nodeRef, inheritFrom, sharedAclToReplace, true);
nodeDAO.removeNodeAspects(nodeId, aspects);
nodeDAO.removeNodeProperties(nodeId, PENDING_FIX_ACL_ASPECT_PROPS);
if (log.isDebugEnabled())
@Override
public List<NodeRef> execute() throws Throwable
{
log.debug(String.format("Node processed ", nodeRef));
getNodesCallback.init();
nodeDAO.getNodesWithAspects(aspects, getNodesCallback.getMinNodeId(), null, getNodesCallback);
getNodesCallback.done();
return getNodesCallback.getNodes();
}
return null;
}, false, true);
return nodes;
}
int countNodesWithAspects()
{
final CountNodesWithAspectCallback countNodesCallback = new CountNodesWithAspectCallback();
int count = transactionService.getRetryingTransactionHelper()
.doInTransaction(new RetryingTransactionCallback<Integer>()
{
@Override
public Integer execute() throws Throwable
{
nodeDAO.getNodesWithAspects(aspects, 0l, null, countNodesCallback);
return countNodesCallback.getCount();
}
}, false, true);
return count;
}
}
private class AclWorkProvider implements BatchProcessWorkProvider<NodeRef>
{
private GetNodesWithAspects getNodesWithAspects;
AclWorkProvider()
{
getNodesWithAspects = new GetNodesWithAspects(Collections.singleton(ContentModel.ASPECT_PENDING_FIX_ACL));
}
@Override
public int getTotalEstimatedWorkSize()
{
int workSize = getNodesWithAspects.getWorkSize();
return workSize;
}
@Override
public Collection<NodeRef> getNextWork()
{
return getNodesWithAspects.getNodesWithAspects();
}
}
private class AclWorker implements BatchProcessor.BatchProcessWorker<NodeRef>
{
private Set<QName> aspects = new HashSet<>(1);
AclWorker()
{
aspects.add(ContentModel.ASPECT_PENDING_FIX_ACL);
}
public String getIdentifier(NodeRef nodeRef)
{
return String.valueOf(nodeRef.toString());
}
public void beforeProcess() throws Throwable
{
}
public void afterProcess() throws Throwable
{
}
public void process(final NodeRef nodeRef) throws Throwable
{
RunAsWork<Void> findAndUpdateAclRunAsWork = new RunAsWork<Void>()
{
@Override
public Void doWork() throws Exception
{
if (log.isDebugEnabled())
{
log.debug(String.format("Processing node %s", nodeRef));
}
}, false, true);
}
catch (Exception e)
{
if (log.isDebugEnabled())
{
log.debug(String.format("Could not process node ", nodeRef), e);
final Long nodeId = nodeDAO.getNodePair(nodeRef).getFirst();
// retrieve acl properties from node
Long inheritFrom = (Long) nodeDAO.getNodeProperty(nodeId,
ContentModel.PROP_INHERIT_FROM_ACL);
Long sharedAclToReplace = (Long) nodeDAO.getNodeProperty(nodeId,
ContentModel.PROP_SHARED_ACL_TO_REPLACE);
// set inheritance using retrieved prop
accessControlListDAO.setInheritanceForChildren(nodeRef, inheritFrom, sharedAclToReplace,
true);
nodeDAO.removeNodeAspects(nodeId, aspects);
nodeDAO.removeNodeProperties(nodeId, PENDING_FIX_ACL_ASPECT_PROPS);
if (log.isDebugEnabled())
{
log.debug(String.format("Node processed %s", nodeRef));
}
return null;
}
}
processedNodes++;
};
AuthenticationUtil.runAs(findAndUpdateAclRunAsWork, AuthenticationUtil.getSystemUserName());
}
if (log.isDebugEnabled())
{
log.debug(String.format("Nodes found %s; nodes processed %s", nodesToUpdate.size(), processedNodes));
}
return processedNodes;
}
public void execute()
{
String lockToken = null;
FixedAclUpdaterJobLockRefreshCallback callback = new FixedAclUpdaterJobLockRefreshCallback();
try
{
RunAsWork<Integer> findAndUpdateAclRunAsWork = findAndUpdateAclRunAsWork(callback);
lockToken = jobLockService.getLock(lockQName, lockTimeToLive, 0, 1);
while (true)
{
jobLockService.refreshLock(lockToken, lockQName, lockRefreshTime, callback);
Integer processed = AuthenticationUtil.runAs(findAndUpdateAclRunAsWork, AuthenticationUtil.getSystemUserName());
if (processed.intValue() == 0)
{
// There is no more to process
break;
}
// There is still more to process, so continue
}
}
catch (LockAcquisitionException e)
{
// already running
}
catch (VmShutdownException e)
{
if (log.isDebugEnabled())
{
log.debug("FixedAclUpdater aborted");
}
}
finally
{
callback.isActive.set(false);
jobLockService.releaseLock(lockToken, lockQName);
}
}
private RunAsWork<Integer> findAndUpdateAclRunAsWork(final FixedAclUpdaterJobLockRefreshCallback callback)
{
final RetryingTransactionCallback<Integer> findAndUpdateAclWork = new RetryingTransactionCallback<Integer>()
{
public Integer execute() throws Exception
{
return findAndUpdateAcl(callback);
}
};
// execute as system user to ensure fast, accurate results
RunAsWork<Integer> findAndUpdateAclRunAsWork = new RunAsWork<Integer>()
{
@Override
public Integer doWork() throws Exception
{
return transactionService.getRetryingTransactionHelper().doInTransaction(findAndUpdateAclWork, false, true);
}
};
return findAndUpdateAclRunAsWork;
}
private List<NodeRef> getNodesWithAspects(final Set<QName> aspects)
{
return transactionService.getRetryingTransactionHelper().doInTransaction(new RetryingTransactionCallback<List<NodeRef>>()
{
@Override
public List<NodeRef> execute() throws Throwable
{
GetNodesWithAspectCallback callback = new GetNodesWithAspectCallback();
nodeDAO.getNodesWithAspects(aspects, 1L, null, callback);
return callback.getNodes();
}
}, false, true);
}
};
private class GetNodesWithAspectCallback implements NodeRefQueryCallback
{
private List<NodeRef> nodes = new ArrayList<>();
private long minNodeId;
private long maxNodeId;
void init()
{
nodes.clear();
}
void done()
{
this.minNodeId = maxNodeId + 1;
}
@Override
public boolean handle(Pair<Long, NodeRef> nodePair)
@@ -266,17 +286,40 @@ public class FixedAclUpdater extends TransactionListenerAdapter
if (nodes.size() < maxItemBatchSize)
{
nodes.add(nodePair.getSecond());
maxNodeId = nodePair.getFirst();
return true;
}
return false;
}
long getMinNodeId()
{
return minNodeId;
}
public List<NodeRef> getNodes()
{
return nodes;
}
}
private class CountNodesWithAspectCallback implements NodeRefQueryCallback
{
private int count = 0;
@Override
public boolean handle(Pair<Long, NodeRef> nodePair)
{
count++;
return true;
}
public int getCount()
{
return count;
}
}
private static class FixedAclUpdaterJobLockRefreshCallback implements JobLockRefreshCallback
{
public AtomicBoolean isActive = new AtomicBoolean(true);
@@ -294,17 +337,54 @@ public class FixedAclUpdater extends TransactionListenerAdapter
}
}
private static Set<QName> pendingFixAclAspectProps()
{
Set<QName> props = new HashSet<>();
props.add(ContentModel.PROP_SHARED_ACL_TO_REPLACE);
props.add(ContentModel.PROP_INHERIT_FROM_ACL);
return props;
}
public int execute()
{
String lockToken = null;
FixedAclUpdaterJobLockRefreshCallback jobLockRefreshCallback = new FixedAclUpdaterJobLockRefreshCallback();
try
{
lockToken = jobLockService.getLock(lockQName, lockTimeToLive, 0, 1);
jobLockService.refreshLock(lockToken, lockQName, lockRefreshTime, jobLockRefreshCallback);
AclWorkProvider provider = new AclWorkProvider();
AclWorker worker = new AclWorker();
BatchProcessor<NodeRef> bp = new BatchProcessor<>(
"FixedAclUpdater",
transactionService.getRetryingTransactionHelper(),
provider,
numThreads, maxItemBatchSize,
applicationContext,
log, 100);
int count = bp.process(worker, true);
return count;
}
catch (LockAcquisitionException e)
{
// already running
return 0;
}
finally
{
jobLockRefreshCallback.isActive.set(false);
if(lockToken != null)
{
jobLockService.releaseLock(lockToken, lockQName);
}
}
}
@Override
public void afterCommit()
{
Thread t = new Thread(new Runnable()
{
@Override
public void run()
{
execute();
}
});
t.start();
execute();
}
}