mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Merged BRANCHES/V3.2 to HEAD:
18363: WCM clustering - ETHREEOH-3962 (duplicate root node entry) 19091: Fix Part 1 ALF-726: v3.1.x Content Cleaner Job needs to be ported to v3.2 19159: Fixed ALF-726: Migrate pre-3.2 content URLs to new format and pick up tag existing orphaned content 19169: Fix fallout from 19159 for ALF-726: Migrate pre-3.2 content URLs to new format and pick up tag existing orphaned content 19262: ALF-726 Multithreading for content URL conversion git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@19267 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -34,15 +34,12 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.alfresco.error.AlfrescoRuntimeException;
|
||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
|
||||
import org.alfresco.repo.tenant.TenantService;
|
||||
import org.alfresco.repo.tenant.TenantUserService;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
||||
import org.alfresco.repo.transaction.TransactionListenerAdapter;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
||||
import org.alfresco.service.cmr.rule.RuleService;
|
||||
import org.alfresco.util.TraceableThreadFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
|
||||
/**
|
||||
@@ -59,20 +56,15 @@ import org.springframework.context.ApplicationEventPublisher;
|
||||
*/
|
||||
public class BatchProcessor<T> implements BatchMonitor
|
||||
{
|
||||
/** The factory for all new threads */
|
||||
private TraceableThreadFactory threadFactory;
|
||||
|
||||
/** The logger to use. */
|
||||
private final Log logger;
|
||||
|
||||
/** The retrying transaction helper. */
|
||||
private final RetryingTransactionHelper retryingTransactionHelper;
|
||||
|
||||
/** The rule service. */
|
||||
private final RuleService ruleService;
|
||||
|
||||
/** The tenant user service. */
|
||||
private final TenantUserService tenantUserService;
|
||||
|
||||
private final String tenantDomain;
|
||||
|
||||
/** The collection. */
|
||||
private final Collection<T> collection;
|
||||
|
||||
@@ -113,84 +105,78 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
private Date endTime;
|
||||
|
||||
/**
|
||||
* Instantiates a new batch processor.
|
||||
* Instantiates a new batch processor using a the default logger, which references
|
||||
* this class as the log category.
|
||||
*
|
||||
* @param logger
|
||||
* the logger to use
|
||||
* @param retryingTransactionHelper
|
||||
* the retrying transaction helper
|
||||
* @param ruleService
|
||||
* the rule service
|
||||
* @param collection
|
||||
* the collection
|
||||
* @param processName
|
||||
* the process name
|
||||
* @param loggingInterval
|
||||
* the number of entries to process before reporting progress
|
||||
* @param applicationEventPublisher
|
||||
* the application event publisher
|
||||
* @param workerThreads
|
||||
* the number of worker threads
|
||||
* @param batchSize
|
||||
* the number of entries we process at a time in a transaction
|
||||
* @see #BatchProcessor(String, RetryingTransactionHelper, Collection, int, int, ApplicationEventPublisher, Log, int)
|
||||
*/
|
||||
public BatchProcessor(Log logger, RetryingTransactionHelper retryingTransactionHelper, RuleService ruleService,
|
||||
ApplicationEventPublisher applicationEventPublisher, Collection<T> collection, String processName,
|
||||
int loggingInterval, int workerThreads, int batchSize)
|
||||
public BatchProcessor(
|
||||
String processName,
|
||||
RetryingTransactionHelper retryingTransactionHelper,
|
||||
Collection<T> collection,
|
||||
int workerThreads, int batchSize)
|
||||
{
|
||||
this(logger, retryingTransactionHelper, ruleService, null, applicationEventPublisher, collection, processName,
|
||||
loggingInterval, workerThreads, batchSize);
|
||||
this(
|
||||
processName,
|
||||
retryingTransactionHelper,
|
||||
collection,
|
||||
workerThreads,
|
||||
batchSize, null, null, 1);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Instantiates a new batch processor.
|
||||
*
|
||||
* @param logger
|
||||
* the logger to use
|
||||
* @param retryingTransactionHelper
|
||||
* the retrying transaction helper
|
||||
* @param ruleService
|
||||
* the rule service
|
||||
* @param tenantUserService
|
||||
* the tenant user service
|
||||
* @param collection
|
||||
* the collection
|
||||
* @param processName
|
||||
* the process name
|
||||
* @param loggingInterval
|
||||
* the number of entries to process before reporting progress
|
||||
* @param applicationEventPublisher
|
||||
* the application event publisher
|
||||
* @param retryingTransactionHelper
|
||||
* the retrying transaction helper
|
||||
* @param collection
|
||||
* the collection
|
||||
* @param workerThreads
|
||||
* the number of worker threads
|
||||
* @param batchSize
|
||||
* the number of entries we process at a time in a transaction
|
||||
* @param applicationEventPublisher
|
||||
* the application event publisher (may be <tt>null</tt>)
|
||||
* @param logger
|
||||
* the logger to use (may be <tt>null</tt>)
|
||||
* @param loggingInterval
|
||||
* the number of entries to process before reporting progress
|
||||
*/
|
||||
public BatchProcessor(Log logger, RetryingTransactionHelper retryingTransactionHelper, RuleService ruleService,
|
||||
TenantUserService tenantUserService, ApplicationEventPublisher applicationEventPublisher, Collection<T> collection, String processName,
|
||||
int loggingInterval, int workerThreads, int batchSize)
|
||||
public BatchProcessor(
|
||||
String processName,
|
||||
RetryingTransactionHelper retryingTransactionHelper,
|
||||
Collection<T> collection,
|
||||
int workerThreads, int batchSize,
|
||||
ApplicationEventPublisher applicationEventPublisher,
|
||||
Log logger,
|
||||
int loggingInterval)
|
||||
{
|
||||
this.logger = logger;
|
||||
this.retryingTransactionHelper = retryingTransactionHelper;
|
||||
this.ruleService = ruleService;
|
||||
this.tenantUserService = tenantUserService;
|
||||
this.collection = collection;
|
||||
this.threadFactory = new TraceableThreadFactory();
|
||||
this.threadFactory.setNamePrefix(processName);
|
||||
this.threadFactory.setThreadDaemon(true);
|
||||
|
||||
this.processName = processName;
|
||||
this.loggingInterval = loggingInterval;
|
||||
this.retryingTransactionHelper = retryingTransactionHelper;
|
||||
this.collection = collection;
|
||||
this.workerThreads = workerThreads;
|
||||
this.batchSize = batchSize;
|
||||
|
||||
if (tenantUserService != null)
|
||||
if (logger == null)
|
||||
{
|
||||
this.tenantDomain = tenantUserService.getUserDomain(AuthenticationUtil.getRunAsUser());
|
||||
this.logger = LogFactory.getLog(this.getClass());
|
||||
}
|
||||
else
|
||||
{
|
||||
this.tenantDomain = TenantService.DEFAULT_DOMAIN;
|
||||
this.logger = logger;
|
||||
}
|
||||
this.loggingInterval = loggingInterval;
|
||||
|
||||
// Let the (enterprise) monitoring side know of our presence
|
||||
applicationEventPublisher.publishEvent(new BatchMonitorEvent(this));
|
||||
if (applicationEventPublisher != null)
|
||||
{
|
||||
applicationEventPublisher.publishEvent(new BatchMonitorEvent(this));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -302,14 +288,14 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
* the worker
|
||||
* @param splitTxns
|
||||
* Can the modifications to Alfresco be split across multiple transactions for maximum performance? If
|
||||
* <code>true</code>, worker invocations are isolated in separate transactions in batches of 10 for
|
||||
* <code>true</code>, worker invocations are isolated in separate transactions in batches for
|
||||
* increased performance. If <code>false</code>, all invocations are performed in the current
|
||||
* transaction. This is required if calling synchronously (e.g. in response to an authentication event in
|
||||
* the same transaction).
|
||||
* @return the number of invocations
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public int process(final Worker<T> worker, final boolean splitTxns)
|
||||
public int process(final BatchProcessWorker<T> worker, final boolean splitTxns)
|
||||
{
|
||||
int count = this.collection.size();
|
||||
synchronized (this)
|
||||
@@ -330,9 +316,10 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
}
|
||||
|
||||
// Create a thread pool executor with the specified number of threads and a finite blocking queue of jobs
|
||||
ExecutorService executorService = splitTxns && this.workerThreads > 1 ? new ThreadPoolExecutor(
|
||||
this.workerThreads, this.workerThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(
|
||||
this.workerThreads * this.batchSize * 10)
|
||||
ExecutorService executorService = splitTxns && this.workerThreads > 1 ?
|
||||
new ThreadPoolExecutor(
|
||||
this.workerThreads, this.workerThreads, 0L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<Runnable>(this.workerThreads * this.batchSize * 10)
|
||||
{
|
||||
// Add blocking behaviour to work queue
|
||||
@Override
|
||||
@@ -349,7 +336,8 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
return true;
|
||||
}
|
||||
|
||||
}) : null;
|
||||
},
|
||||
threadFactory) : null;
|
||||
try
|
||||
{
|
||||
Iterator<T> iterator = this.collection.iterator();
|
||||
@@ -452,9 +440,8 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
/**
|
||||
* An interface for workers to be invoked by the {@link BatchProcessor}.
|
||||
*/
|
||||
public interface Worker<T>
|
||||
public interface BatchProcessWorker<T>
|
||||
{
|
||||
|
||||
/**
|
||||
* Gets an identifier for the given entry (for monitoring / logging purposes).
|
||||
*
|
||||
@@ -464,6 +451,14 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
*/
|
||||
public String getIdentifier(T entry);
|
||||
|
||||
/**
|
||||
* Callback to allow thread initialization before the work entries are
|
||||
* {@link #process(Object) processed}. Typically, this will include authenticating
|
||||
* as a valid user and disbling or enabling any system flags that might affect the
|
||||
* entry processing.
|
||||
*/
|
||||
public void beforeProcess() throws Throwable;
|
||||
|
||||
/**
|
||||
* Processes the given entry.
|
||||
*
|
||||
@@ -473,6 +468,38 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
* on any error
|
||||
*/
|
||||
public void process(T entry) throws Throwable;
|
||||
|
||||
/**
|
||||
* Callback to allow thread cleanup after the work entries have been
|
||||
* {@link #process(Object) processed}.
|
||||
* Typically, this will involve cleanup of authentication and resetting any
|
||||
* system flags previously set.
|
||||
* <p/>
|
||||
* This call is made regardless of the outcome of the entry processing.
|
||||
*/
|
||||
public void afterProcess() throws Throwable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adaptor that allows implementations to only implement {@link #process(Object)}
|
||||
*/
|
||||
public static abstract class BatchProcessWorkerAdaptor<TT> implements BatchProcessWorker<TT>
|
||||
{
|
||||
/**
|
||||
* @return Returns the <code>toString()</code> of the entry
|
||||
*/
|
||||
public String getIdentifier(TT entry)
|
||||
{
|
||||
return entry.toString();
|
||||
}
|
||||
/** No-op */
|
||||
public void beforeProcess() throws Throwable
|
||||
{
|
||||
}
|
||||
/** No-op */
|
||||
public void afterProcess() throws Throwable
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -491,7 +518,7 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
* @param splitTxns
|
||||
* If <code>true</code>, the worker invocation is made in a new transaction.
|
||||
*/
|
||||
public TxnCallback(Worker<T> worker, List<T> batch, boolean splitTxns)
|
||||
public TxnCallback(BatchProcessWorker<T> worker, List<T> batch, boolean splitTxns)
|
||||
{
|
||||
this.worker = worker;
|
||||
this.batch = batch;
|
||||
@@ -499,7 +526,7 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
}
|
||||
|
||||
/** The worker. */
|
||||
private final Worker<T> worker;
|
||||
private final BatchProcessWorker<T> worker;
|
||||
|
||||
/** The batch. */
|
||||
private final List<T> batch;
|
||||
@@ -602,26 +629,21 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
*/
|
||||
public void run()
|
||||
{
|
||||
// Disable rules for this thread
|
||||
BatchProcessor.this.ruleService.disableRules();
|
||||
try
|
||||
{
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
BatchProcessor.this.logger.error("Failed to cleanup Worker after processing.", e);
|
||||
}
|
||||
|
||||
|
||||
final BatchProcessor<T>.TxnCallback callback = this;
|
||||
try
|
||||
{
|
||||
String systemUser = AuthenticationUtil.getSystemUserName();
|
||||
if (tenantUserService != null)
|
||||
{
|
||||
systemUser = tenantUserService.getDomainUser(AuthenticationUtil.getSystemUserName(), tenantDomain);
|
||||
}
|
||||
|
||||
AuthenticationUtil.runAs(new RunAsWork<Void>()
|
||||
{
|
||||
public Void doWork() throws Exception
|
||||
{
|
||||
BatchProcessor.this.retryingTransactionHelper.doInTransaction(callback, false, splitTxns);
|
||||
return null;
|
||||
}
|
||||
}, systemUser);
|
||||
worker.beforeProcess();
|
||||
BatchProcessor.this.retryingTransactionHelper.doInTransaction(callback, false, splitTxns);
|
||||
worker.afterProcess();
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
@@ -651,11 +673,6 @@ public class BatchProcessor<T> implements BatchMonitor
|
||||
throw new AlfrescoRuntimeException("Transactional error during " + getProcessName(), t);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Re-enable rules
|
||||
BatchProcessor.this.ruleService.enableRules();
|
||||
}
|
||||
|
||||
commitProgress();
|
||||
}
|
||||
|
Reference in New Issue
Block a user