mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-06-30 18:15:39 +00:00
99377: BENCH-369: BM-0004: API and internals for Alfresco server - Move commons Math3 to 'core' project for general reuse - Clone NormalDistributionHelper class from Benchmark projects to Alfresco 'core' - API added: http://localhost:8080/alfresco/s/api/model/filefolder/load JSON: { "folderPath":"/Sites/t2/documentLibrary", "fileCount":"1", "minFileSize":"1024", "maxFileSize":"2048", "maxUniqueDocuments":"10000" } - Above JSON will create 1 file in the 't2' site document library with spoofed plain text - Change away from deprecated API for TransactionListenerAdapter - Fix imports and neatness - Improve FileNotFoundException details - Disable timestamp propagation on the parent folder to reduce CPU overhead - Document changes relating to the addition of cm:description properties - Add options to control generation of MLText cm:description fields - descriptionCount: number of cm:description translations to include - descriptionSize: size in bytes of each cm:description translation - Use released 'alfresco-text-gen' V1.1 - Use fixed text-gen component to prevent ArrayIndexOutOfBOunds - Tighten up error message when errors occur on reading content strings - Fix random seed generation bug git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@99503 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
900 lines
32 KiB
Java
900 lines
32 KiB
Java
/*
|
|
* Copyright (C) 2005-2010 Alfresco Software Limited.
|
|
*
|
|
* This file is part of Alfresco
|
|
*
|
|
* Alfresco is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Alfresco is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
package org.alfresco.repo.batch;
|
|
|
|
import java.io.PrintWriter;
|
|
import java.io.StringWriter;
|
|
import java.io.Writer;
|
|
import java.text.NumberFormat;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Date;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.NoSuchElementException;
|
|
import java.util.SortedSet;
|
|
import java.util.TreeSet;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import org.alfresco.error.AlfrescoRuntimeException;
|
|
import org.alfresco.repo.node.integrity.IntegrityException;
|
|
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
|
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
|
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
|
import org.alfresco.util.TraceableThreadFactory;
|
|
import org.alfresco.util.transaction.TransactionListenerAdapter;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
|
|
/**
|
|
* A <code>BatchProcessor</code> manages the running and monitoring of a potentially long-running transactional batch
|
|
* process. It iterates over a collection, and queues jobs that fire a worker on a batch of members. The queued jobs
|
|
* handle progress / error reporting, transaction delineation and retrying. They are processed in parallel by a pool of
|
|
* threads of a configurable size. The job processing is designed to be fault tolerant and will continue in the event of
|
|
* errors. When the batch is complete a summary of the number of errors and the last error stack trace will be logged at
|
|
* ERROR level. Each individual error is logged at WARN level and progress information is logged at INFO level. Through
|
|
* the {@link BatchMonitor} interface, it also supports the real-time monitoring of batch metrics (e.g. over JMX in the
|
|
* Enterprise Edition).
|
|
*
|
|
* @author dward
|
|
*/
|
|
public class BatchProcessor<T> implements BatchMonitor
|
|
{
|
|
/** The factory for all new threads */
|
|
private TraceableThreadFactory threadFactory;
|
|
|
|
/** The logger to use. */
|
|
private final Log logger;
|
|
|
|
/** The retrying transaction helper. */
|
|
private final RetryingTransactionHelper retryingTransactionHelper;
|
|
|
|
/** The source of the work being done. */
|
|
private BatchProcessWorkProvider<T> workProvider;
|
|
|
|
/** The process name. */
|
|
private final String processName;
|
|
|
|
/** The number of entries to process before reporting progress. */
|
|
private final int loggingInterval;
|
|
|
|
/** The number of worker threads. */
|
|
private final int workerThreads;
|
|
|
|
/** The number of entries we process at a time in a transaction. */
|
|
private final int batchSize;
|
|
|
|
/** The current entry id. */
|
|
private String currentEntryId;
|
|
|
|
/** The number of batches currently executing. */
|
|
private int executingCount;
|
|
|
|
/** What transactions need to be retried?. We do these single-threaded in order to avoid cross-dependency issues */
|
|
private SortedSet<Integer> retryTxns = new TreeSet<Integer>();
|
|
|
|
/** The last error. */
|
|
private Throwable lastError;
|
|
|
|
/** The last error entry id. */
|
|
private String lastErrorEntryId;
|
|
|
|
/** The total number of errors. */
|
|
private int totalErrors;
|
|
|
|
/** The number of successfully processed entries. */
|
|
private int successfullyProcessedEntries;
|
|
|
|
/** The start time. */
|
|
private Date startTime;
|
|
|
|
/** The end time. */
|
|
private Date endTime;
|
|
|
|
/**
|
|
* Instantiates a new batch processor.
|
|
*
|
|
* @param processName
|
|
* the process name
|
|
* @param retryingTransactionHelper
|
|
* the retrying transaction helper
|
|
* @param collection
|
|
* the collection
|
|
* @param workerThreads
|
|
* the number of worker threads
|
|
* @param batchSize
|
|
* the number of entries we process at a time in a transaction
|
|
* @param applicationEventPublisher
|
|
* the application event publisher (may be <tt>null</tt>)
|
|
* @param logger
|
|
* the logger to use (may be <tt>null</tt>)
|
|
* @param loggingInterval
|
|
* the number of entries to process before reporting progress
|
|
*
|
|
* @deprecated Since 3.4, use the {@link BatchProcessWorkProvider} instead of the <tt>Collection</tt>
|
|
*/
|
|
public BatchProcessor(
|
|
String processName,
|
|
RetryingTransactionHelper retryingTransactionHelper,
|
|
final Collection<T> collection,
|
|
int workerThreads, int batchSize,
|
|
ApplicationEventPublisher applicationEventPublisher,
|
|
Log logger,
|
|
int loggingInterval)
|
|
{
|
|
this(
|
|
processName,
|
|
retryingTransactionHelper,
|
|
new BatchProcessWorkProvider<T>()
|
|
{
|
|
boolean hasMore = true;
|
|
public int getTotalEstimatedWorkSize()
|
|
{
|
|
return collection.size();
|
|
}
|
|
public Collection<T> getNextWork()
|
|
{
|
|
// Only return the collection once
|
|
if (hasMore)
|
|
{
|
|
hasMore = false;
|
|
return collection;
|
|
}
|
|
else
|
|
{
|
|
return Collections.emptyList();
|
|
}
|
|
}
|
|
},
|
|
workerThreads, batchSize,
|
|
applicationEventPublisher, logger, loggingInterval);
|
|
}
|
|
|
|
/**
|
|
* Instantiates a new batch processor.
|
|
*
|
|
* @param processName
|
|
* the process name
|
|
* @param retryingTransactionHelper
|
|
* the retrying transaction helper
|
|
* @param workProvider
|
|
* the object providing the work packets
|
|
* @param workerThreads
|
|
* the number of worker threads
|
|
* @param batchSize
|
|
* the number of entries we process at a time in a transaction
|
|
* @param applicationEventPublisher
|
|
* the application event publisher (may be <tt>null</tt>)
|
|
* @param logger
|
|
* the logger to use (may be <tt>null</tt>)
|
|
* @param loggingInterval
|
|
* the number of entries to process before reporting progress
|
|
*
|
|
* @since 3.4
|
|
*/
|
|
public BatchProcessor(
|
|
String processName,
|
|
RetryingTransactionHelper retryingTransactionHelper,
|
|
BatchProcessWorkProvider<T> workProvider,
|
|
int workerThreads, int batchSize,
|
|
ApplicationEventPublisher applicationEventPublisher,
|
|
Log logger,
|
|
int loggingInterval)
|
|
{
|
|
this.threadFactory = new TraceableThreadFactory();
|
|
this.threadFactory.setNamePrefix(processName);
|
|
this.threadFactory.setThreadDaemon(true);
|
|
|
|
this.processName = processName;
|
|
this.retryingTransactionHelper = retryingTransactionHelper;
|
|
this.workProvider = workProvider;
|
|
this.workerThreads = workerThreads;
|
|
this.batchSize = batchSize;
|
|
if (logger == null)
|
|
{
|
|
this.logger = LogFactory.getLog(this.getClass());
|
|
}
|
|
else
|
|
{
|
|
this.logger = logger;
|
|
}
|
|
this.loggingInterval = loggingInterval;
|
|
|
|
// Let the (enterprise) monitoring side know of our presence
|
|
if (applicationEventPublisher != null)
|
|
{
|
|
applicationEventPublisher.publishEvent(new BatchMonitorEvent(this));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized String getCurrentEntryId()
|
|
{
|
|
return this.currentEntryId;
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized String getLastError()
|
|
{
|
|
if (this.lastError == null)
|
|
{
|
|
return null;
|
|
}
|
|
Writer buff = new StringWriter(1024);
|
|
PrintWriter out = new PrintWriter(buff);
|
|
this.lastError.printStackTrace(out);
|
|
out.close();
|
|
return buff.toString();
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized String getLastErrorEntryId()
|
|
{
|
|
return this.lastErrorEntryId;
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized String getProcessName()
|
|
{
|
|
return this.processName;
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized int getSuccessfullyProcessedEntries()
|
|
{
|
|
return this.successfullyProcessedEntries;
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized String getPercentComplete()
|
|
{
|
|
int totalResults = this.workProvider.getTotalEstimatedWorkSize();
|
|
int processed = this.successfullyProcessedEntries + this.totalErrors;
|
|
return processed <= totalResults ? NumberFormat.getPercentInstance().format(
|
|
totalResults == 0 ? 1.0F : (float) processed / totalResults) : "Unknown";
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized int getTotalErrors()
|
|
{
|
|
return this.totalErrors;
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public int getTotalResults()
|
|
{
|
|
return this.workProvider.getTotalEstimatedWorkSize();
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized Date getEndTime()
|
|
{
|
|
return this.endTime;
|
|
}
|
|
|
|
/**
|
|
* {@inheritDoc}
|
|
*/
|
|
public synchronized Date getStartTime()
|
|
{
|
|
return this.startTime;
|
|
}
|
|
|
|
/**
|
|
* Invokes the worker for each entry in the collection, managing transactions and collating success / failure
|
|
* information.
|
|
*
|
|
* @param worker
|
|
* the worker
|
|
* @param splitTxns
|
|
* Can the modifications to Alfresco be split across multiple transactions for maximum performance? If
|
|
* <code>true</code>, worker invocations are isolated in separate transactions in batches for
|
|
* increased performance. If <code>false</code>, all invocations are performed in the current
|
|
* transaction. This is required if calling synchronously (e.g. in response to an authentication event in
|
|
* the same transaction).
|
|
* @return the number of invocations
|
|
*/
|
|
@SuppressWarnings("serial")
|
|
public int process(final BatchProcessWorker<T> worker, final boolean splitTxns)
|
|
{
|
|
int count = workProvider.getTotalEstimatedWorkSize();
|
|
synchronized (this)
|
|
{
|
|
this.startTime = new Date();
|
|
if (this.logger.isInfoEnabled())
|
|
{
|
|
if (count >= 0)
|
|
{
|
|
this.logger.info(getProcessName() + ": Commencing batch of " + count + " entries");
|
|
}
|
|
else
|
|
{
|
|
this.logger.info(getProcessName() + ": Commencing batch");
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create a thread pool executor with the specified number of threads and a finite blocking queue of jobs
|
|
ExecutorService executorService = splitTxns && this.workerThreads > 1 ?
|
|
new ThreadPoolExecutor(
|
|
this.workerThreads, this.workerThreads, 0L, TimeUnit.MILLISECONDS,
|
|
new ArrayBlockingQueue<Runnable>(this.workerThreads * this.batchSize * 10)
|
|
{
|
|
// Add blocking behaviour to work queue
|
|
@Override
|
|
public boolean offer(Runnable o)
|
|
{
|
|
try
|
|
{
|
|
put(o);
|
|
}
|
|
catch (InterruptedException e)
|
|
{
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
},
|
|
threadFactory) : null;
|
|
try
|
|
{
|
|
Iterator<T> iterator = new WorkProviderIterator<T>(this.workProvider);
|
|
int id=0;
|
|
List<T> batch = new ArrayList<T>(this.batchSize);
|
|
while (iterator.hasNext())
|
|
{
|
|
batch.add(iterator.next());
|
|
boolean hasNext = iterator.hasNext();
|
|
if (batch.size() >= this.batchSize || !hasNext)
|
|
{
|
|
final TxnCallback callback = new TxnCallback(id++, worker, batch, splitTxns);
|
|
if (hasNext)
|
|
{
|
|
batch = new ArrayList<T>(this.batchSize);
|
|
}
|
|
|
|
if (executorService == null)
|
|
{
|
|
callback.run();
|
|
}
|
|
else
|
|
{
|
|
executorService.execute(callback);
|
|
}
|
|
}
|
|
}
|
|
return count;
|
|
}
|
|
finally
|
|
{
|
|
if (executorService != null)
|
|
{
|
|
executorService.shutdown();
|
|
try
|
|
{
|
|
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
|
|
}
|
|
catch (InterruptedException e)
|
|
{
|
|
}
|
|
}
|
|
synchronized (this)
|
|
{
|
|
reportProgress(true);
|
|
this.endTime = new Date();
|
|
if (this.logger.isInfoEnabled())
|
|
{
|
|
if (count >= 0)
|
|
{
|
|
this.logger.info(getProcessName() + ": Completed batch of " + count + " entries");
|
|
}
|
|
else
|
|
{
|
|
this.logger.info(getProcessName() + ": Completed batch");
|
|
|
|
}
|
|
}
|
|
if (this.totalErrors > 0 && this.logger.isErrorEnabled())
|
|
{
|
|
this.logger.error(getProcessName() + ": " + this.totalErrors
|
|
+ " error(s) detected. Last error from entry \"" + this.lastErrorEntryId + "\"",
|
|
this.lastError);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Reports the current progress.
|
|
*
|
|
* @param last
|
|
* Have all jobs been processed? If <code>false</code> then progress is only reported after the number of
|
|
* entries indicated by {@link #loggingInterval}. If <code>true</code> then progress is reported if this
|
|
* is not one of the entries indicated by {@link #loggingInterval}.
|
|
*/
|
|
private synchronized void reportProgress(boolean last)
|
|
{
|
|
int processed = this.successfullyProcessedEntries + this.totalErrors;
|
|
if (processed % this.loggingInterval == 0 ^ last)
|
|
{
|
|
StringBuilder message = new StringBuilder(100).append(getProcessName()).append(": Processed ").append(
|
|
processed).append(" entries");
|
|
int totalResults = this.workProvider.getTotalEstimatedWorkSize();
|
|
if (totalResults >= processed)
|
|
{
|
|
message.append(" out of ").append(totalResults).append(". ").append(
|
|
NumberFormat.getPercentInstance().format(
|
|
totalResults == 0 ? 1.0F : (float) processed / totalResults)).append(" complete");
|
|
}
|
|
long duration = System.currentTimeMillis() - this.startTime.getTime();
|
|
if (duration > 0)
|
|
{
|
|
message.append(". Rate: ").append(processed * 1000L / duration).append(" per second");
|
|
}
|
|
message.append(". " + this.totalErrors + " failures detected.");
|
|
this.logger.info(message);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* An interface for workers to be invoked by the {@link BatchProcessor}.
|
|
*/
|
|
public interface BatchProcessWorker<T>
|
|
{
|
|
/**
|
|
* Gets an identifier for the given entry (for monitoring / logging purposes).
|
|
*
|
|
* @param entry
|
|
* the entry
|
|
* @return the identifier
|
|
*/
|
|
public String getIdentifier(T entry);
|
|
|
|
/**
|
|
* Callback to allow thread initialization before the work entries are
|
|
* {@link #process(Object) processed}. Typically, this will include authenticating
|
|
* as a valid user and disbling or enabling any system flags that might affect the
|
|
* entry processing.
|
|
*/
|
|
public void beforeProcess() throws Throwable;
|
|
|
|
/**
|
|
* Processes the given entry.
|
|
*
|
|
* @param entry
|
|
* the entry
|
|
* @throws Throwable
|
|
* on any error
|
|
*/
|
|
public void process(T entry) throws Throwable;
|
|
|
|
/**
|
|
* Callback to allow thread cleanup after the work entries have been
|
|
* {@link #process(Object) processed}.
|
|
* Typically, this will involve cleanup of authentication and resetting any
|
|
* system flags previously set.
|
|
* <p/>
|
|
* This call is made regardless of the outcome of the entry processing.
|
|
*/
|
|
public void afterProcess() throws Throwable;
|
|
}
|
|
|
|
/**
|
|
* Adaptor that allows implementations to only implement {@link #process(Object)}
|
|
*/
|
|
public static abstract class BatchProcessWorkerAdaptor<TT> implements BatchProcessWorker<TT>
|
|
{
|
|
/**
|
|
* @return Returns the <code>toString()</code> of the entry
|
|
*/
|
|
public String getIdentifier(TT entry)
|
|
{
|
|
return entry.toString();
|
|
}
|
|
/** No-op */
|
|
public void beforeProcess() throws Throwable
|
|
{
|
|
}
|
|
/** No-op */
|
|
public void afterProcess() throws Throwable
|
|
{
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Small iterator that repeatedly gets the next batch of work from a {@link BatchProcessWorkProvider}
|
|
|
|
* @author Derek Hulley
|
|
*/
|
|
private static class WorkProviderIterator<T> implements Iterator<T>
|
|
{
|
|
private BatchProcessWorkProvider<T> workProvider;
|
|
private Iterator<T> currentIterator;
|
|
|
|
private WorkProviderIterator(BatchProcessWorkProvider<T> workProvider)
|
|
{
|
|
this.workProvider = workProvider;
|
|
}
|
|
|
|
public boolean hasNext()
|
|
{
|
|
boolean hasNext = false;
|
|
if (workProvider == null)
|
|
{
|
|
// The workProvider was exhausted
|
|
hasNext = false;
|
|
}
|
|
else
|
|
{
|
|
if (currentIterator != null)
|
|
{
|
|
// See if there there is any more on this specific iterator
|
|
hasNext = currentIterator.hasNext();
|
|
}
|
|
|
|
// If we don't have a next (remember that the workProvider is still available)
|
|
// go and get more results
|
|
if (!hasNext)
|
|
{
|
|
Collection<T> nextWork = workProvider.getNextWork();
|
|
if (nextWork == null)
|
|
{
|
|
throw new RuntimeException("BatchProcessWorkProvider returned 'null' work: " + workProvider);
|
|
}
|
|
// Check that there are some results at all
|
|
if (nextWork.size() == 0)
|
|
{
|
|
// An empty collection indicates that there are no more results
|
|
workProvider = null;
|
|
currentIterator = null;
|
|
hasNext = false;
|
|
}
|
|
else
|
|
{
|
|
// There were some results, so get a new iterator
|
|
currentIterator = nextWork.iterator();
|
|
hasNext = currentIterator.hasNext();
|
|
}
|
|
}
|
|
}
|
|
return hasNext;
|
|
}
|
|
|
|
public T next()
|
|
{
|
|
if (!hasNext())
|
|
{
|
|
throw new NoSuchElementException();
|
|
}
|
|
return currentIterator.next();
|
|
}
|
|
|
|
public void remove()
|
|
{
|
|
throw new UnsupportedOperationException();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A callback that invokes a worker on a batch, optionally in a new transaction.
|
|
*/
|
|
class TxnCallback extends TransactionListenerAdapter implements RetryingTransactionCallback<Object>, Runnable
|
|
{
|
|
|
|
/**
|
|
* Instantiates a new callback.
|
|
*
|
|
* @param worker
|
|
* the worker
|
|
* @param batch
|
|
* the batch to process
|
|
* @param splitTxns
|
|
* If <code>true</code>, the worker invocation is made in a new transaction.
|
|
*/
|
|
public TxnCallback(int id, BatchProcessWorker<T> worker, List<T> batch, boolean splitTxns)
|
|
{
|
|
this.id = id;
|
|
this.worker = worker;
|
|
this.batch = batch;
|
|
this.splitTxns = splitTxns;
|
|
}
|
|
|
|
private final int id;
|
|
|
|
/** The worker. */
|
|
private final BatchProcessWorker<T> worker;
|
|
|
|
/** The batch. */
|
|
private final List<T> batch;
|
|
|
|
/** If <code>true</code>, the worker invocation is made in a new transaction. */
|
|
private final boolean splitTxns;
|
|
|
|
/** The total number of errors. */
|
|
private int txnErrors;
|
|
|
|
/** The number of successfully processed entries. */
|
|
private int txnSuccesses;
|
|
|
|
/** The current entry being processed in the transaction */
|
|
private String txnEntryId;
|
|
|
|
/** The last error. */
|
|
private Throwable txnLastError;
|
|
|
|
/** The last error entry id. */
|
|
private String txnLastErrorEntryId;
|
|
|
|
public Object execute() throws Throwable
|
|
{
|
|
reset();
|
|
if (this.batch.isEmpty())
|
|
{
|
|
return null;
|
|
}
|
|
|
|
// Bind this instance to the transaction
|
|
AlfrescoTransactionSupport.bindListener(this);
|
|
|
|
synchronized (BatchProcessor.this)
|
|
{
|
|
if (BatchProcessor.this.logger.isDebugEnabled())
|
|
{
|
|
BatchProcessor.this.logger.debug("RETRY TXNS: " + BatchProcessor.this.retryTxns);
|
|
}
|
|
// If we are retrying after failure, assume there are cross-dependencies and wait for other
|
|
// executing batches to complete
|
|
while (!BatchProcessor.this.retryTxns.isEmpty()
|
|
&& (BatchProcessor.this.retryTxns.first() < this.id || BatchProcessor.this.retryTxns.first() == this.id
|
|
&& BatchProcessor.this.executingCount > 0)
|
|
&& BatchProcessor.this.retryTxns.last() >= this.id)
|
|
{
|
|
if (BatchProcessor.this.logger.isDebugEnabled())
|
|
{
|
|
BatchProcessor.this.logger.debug(Thread.currentThread().getName()
|
|
+ " Recoverable failure: waiting for other batches to complete");
|
|
}
|
|
BatchProcessor.this.wait();
|
|
}
|
|
if (BatchProcessor.this.logger.isDebugEnabled())
|
|
{
|
|
BatchProcessor.this.logger.debug(Thread.currentThread().getName() + " ready to execute");
|
|
}
|
|
BatchProcessor.this.currentEntryId = this.worker.getIdentifier(this.batch.get(0));
|
|
BatchProcessor.this.executingCount++;
|
|
}
|
|
|
|
for (T entry : this.batch)
|
|
{
|
|
this.txnEntryId = this.worker.getIdentifier(entry);
|
|
try
|
|
{
|
|
this.worker.process(entry);
|
|
this.txnSuccesses++;
|
|
}
|
|
catch (Throwable t)
|
|
{
|
|
if (RetryingTransactionHelper.extractRetryCause(t) == null)
|
|
{
|
|
if (BatchProcessor.this.logger.isWarnEnabled())
|
|
{
|
|
BatchProcessor.this.logger.warn(getProcessName() + ": Failed to process entry \""
|
|
+ this.txnEntryId + "\".", t);
|
|
}
|
|
this.txnLastError = t;
|
|
this.txnLastErrorEntryId = this.txnEntryId;
|
|
this.txnErrors++;
|
|
}
|
|
else
|
|
{
|
|
// Next time we retry, we will wait for other executing batches to complete
|
|
throw t;
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
public void run()
|
|
{
|
|
try
|
|
{
|
|
}
|
|
catch (Throwable e)
|
|
{
|
|
BatchProcessor.this.logger.error("Failed to cleanup Worker after processing.", e);
|
|
}
|
|
|
|
|
|
final BatchProcessor<T>.TxnCallback callback = this;
|
|
try
|
|
{
|
|
Throwable tt = null;
|
|
worker.beforeProcess();
|
|
try
|
|
{
|
|
BatchProcessor.this.retryingTransactionHelper.doInTransaction(callback, false, splitTxns);
|
|
}
|
|
catch (Throwable t)
|
|
{
|
|
// Keep this and rethrow
|
|
tt = t;
|
|
}
|
|
worker.afterProcess();
|
|
// Throw if there was a processing exception
|
|
if (tt != null)
|
|
{
|
|
throw tt;
|
|
}
|
|
}
|
|
catch (Throwable t)
|
|
{
|
|
// If the callback was in its own transaction, it must have run out of retries
|
|
if (this.splitTxns)
|
|
{
|
|
this.txnLastError = t;
|
|
this.txnLastErrorEntryId = (t instanceof IntegrityException) ? "unknown" : this.txnEntryId;
|
|
this.txnErrors++;
|
|
if (BatchProcessor.this.logger.isWarnEnabled())
|
|
{
|
|
String message = (t instanceof IntegrityException) ? ": Failed on batch commit." : ": Failed to process entry \""
|
|
+ this.txnEntryId + "\".";
|
|
BatchProcessor.this.logger.warn(getProcessName() + message, t);
|
|
}
|
|
}
|
|
// Otherwise, we have a retryable exception that we should propagate
|
|
else
|
|
{
|
|
if (t instanceof RuntimeException)
|
|
{
|
|
throw (RuntimeException) t;
|
|
}
|
|
if (t instanceof Error)
|
|
{
|
|
throw (Error) t;
|
|
}
|
|
throw new AlfrescoRuntimeException("Transactional error during " + getProcessName(), t);
|
|
}
|
|
}
|
|
|
|
commitProgress();
|
|
}
|
|
|
|
/**
|
|
* Resets the callback state for a retry.
|
|
*/
|
|
private void reset()
|
|
{
|
|
this.txnLastError = null;
|
|
this.txnLastErrorEntryId = null;
|
|
this.txnSuccesses = this.txnErrors = 0;
|
|
}
|
|
|
|
/**
|
|
* Commits progress from this transaction after a successful commit.
|
|
*/
|
|
private void commitProgress()
|
|
{
|
|
synchronized (BatchProcessor.this)
|
|
{
|
|
if (this.txnErrors > 0)
|
|
{
|
|
int processed = BatchProcessor.this.successfullyProcessedEntries + BatchProcessor.this.totalErrors;
|
|
int currentIncrement = processed % BatchProcessor.this.loggingInterval;
|
|
int newErrors = BatchProcessor.this.totalErrors + this.txnErrors;
|
|
// Work out the number of logging intervals we will cross and report them
|
|
int intervals = (this.txnErrors + currentIncrement) / BatchProcessor.this.loggingInterval;
|
|
if (intervals > 0)
|
|
{
|
|
BatchProcessor.this.totalErrors += BatchProcessor.this.loggingInterval - currentIncrement;
|
|
reportProgress(false);
|
|
while (--intervals > 0)
|
|
{
|
|
BatchProcessor.this.totalErrors += BatchProcessor.this.loggingInterval;
|
|
reportProgress(false);
|
|
}
|
|
}
|
|
BatchProcessor.this.totalErrors = newErrors;
|
|
}
|
|
|
|
if (this.txnSuccesses > 0)
|
|
{
|
|
int processed = BatchProcessor.this.successfullyProcessedEntries + BatchProcessor.this.totalErrors;
|
|
int currentIncrement = processed % BatchProcessor.this.loggingInterval;
|
|
int newSuccess = BatchProcessor.this.successfullyProcessedEntries + this.txnSuccesses;
|
|
// Work out the number of logging intervals we will cross and report them
|
|
int intervals = (this.txnSuccesses + currentIncrement) / BatchProcessor.this.loggingInterval;
|
|
if (intervals > 0)
|
|
{
|
|
BatchProcessor.this.successfullyProcessedEntries += BatchProcessor.this.loggingInterval
|
|
- currentIncrement;
|
|
reportProgress(false);
|
|
while (--intervals > 0)
|
|
{
|
|
BatchProcessor.this.successfullyProcessedEntries += BatchProcessor.this.loggingInterval;
|
|
reportProgress(false);
|
|
}
|
|
}
|
|
BatchProcessor.this.successfullyProcessedEntries = newSuccess;
|
|
}
|
|
|
|
if (this.txnLastError != null)
|
|
{
|
|
BatchProcessor.this.lastError = this.txnLastError;
|
|
BatchProcessor.this.lastErrorEntryId = this.txnLastErrorEntryId;
|
|
}
|
|
|
|
reset();
|
|
|
|
// Make sure we don't wait for a failing transaction
|
|
BatchProcessor.this.retryTxns.remove(this.id);
|
|
BatchProcessor.this.notifyAll();
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void afterCommit()
|
|
{
|
|
// Wake up any waiting batches
|
|
synchronized (BatchProcessor.this)
|
|
{
|
|
BatchProcessor.this.executingCount--;
|
|
// We do the final notifications in commitProgress so we can handle a transaction ending in a rollback
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void afterRollback()
|
|
{
|
|
// Wake up any waiting batches
|
|
synchronized (BatchProcessor.this)
|
|
{
|
|
BatchProcessor.this.executingCount--;
|
|
BatchProcessor.this.retryTxns.add(this.id);
|
|
BatchProcessor.this.notifyAll();
|
|
}
|
|
}
|
|
}
|
|
}
|