BatchProcessor is fed work by a BatchProcessWorkProvider

- Avoids necessity to supply full or clever Collection


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@20752 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2010-06-22 13:38:09 +00:00
parent 7530774bba
commit db40093488
2 changed files with 191 additions and 61 deletions

View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2005-2010 Alfresco Software Limited.
*
* This file is part of Alfresco
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.repo.batch;
import java.util.Collection;
/**
* An interface that provides work loads to the {@link BatchProcessor}.
*
* @author Derek Hulley
* @since 3.4
*/
public interface BatchProcessWorkProvider<T>
{
/**
* Get an estimate of the total number of objects that will be provided by this instance.
* Instances can provide accurate answers on each call, but only if the answer can be
* provided quickly and efficiently; usually it is enough to to cache the result after
* providing an initial estimate.
*
* @return a total work size estimate
*/
int getTotalEstimatedWorkSize();
/**
* Get the next lot of work for the batch processor. Implementations should return
* the largest number of entries possible; the {@link BatchProcessor} will keep calling
* this method until it has enough work for the individual worker threads to process
* or until the work load is empty.
*
* @return the next set of work object to process or an empty collection
* if there is no more work remaining.
*/
Collection<T> getNextWork();
}

View File

@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -65,8 +66,8 @@ public class BatchProcessor<T> implements BatchMonitor
/** The retrying transaction helper. */
private final RetryingTransactionHelper retryingTransactionHelper;
/** The collection. */
private final Collection<T> collection;
/** The source of the work being done. */
private BatchProcessWorkProvider<T> workProvider;
/** The process name. */
private final String processName;
@@ -77,7 +78,7 @@ public class BatchProcessor<T> implements BatchMonitor
/** The number of worker threads. */
private final int workerThreads;
/** The number of entries we process at a time in a transaction *. */
/** The number of entries we process at a time in a transaction. */
private final int batchSize;
/** The current entry id. */
@@ -104,26 +105,6 @@ public class BatchProcessor<T> implements BatchMonitor
/** The end time. */
private Date endTime;
/**
* Instantiates a new batch processor using a the default logger, which references
* this class as the log category.
*
* @see #BatchProcessor(String, RetryingTransactionHelper, Collection, int, int, ApplicationEventPublisher, Log, int)
*/
public BatchProcessor(
String processName,
RetryingTransactionHelper retryingTransactionHelper,
Collection<T> collection,
int workerThreads, int batchSize)
{
this(
processName,
retryingTransactionHelper,
collection,
workerThreads,
batchSize, null, null, 1);
}
/**
* Instantiates a new batch processor.
*
@@ -143,11 +124,62 @@ public class BatchProcessor<T> implements BatchMonitor
* the logger to use (may be <tt>null</tt>)
* @param loggingInterval
* the number of entries to process before reporting progress
*
* @deprecated Since 3.4, use the {@link BatchProcessWorkProvider} instead of the <tt>Collection</tt>
*/
public BatchProcessor(
String processName,
RetryingTransactionHelper retryingTransactionHelper,
Collection<T> collection,
final Collection<T> collection,
int workerThreads, int batchSize,
ApplicationEventPublisher applicationEventPublisher,
Log logger,
int loggingInterval)
{
this(
processName,
retryingTransactionHelper,
new BatchProcessWorkProvider<T>()
{
public int getTotalEstimatedWorkSize()
{
return collection.size();
}
public Collection<T> getNextWork()
{
return collection;
}
},
workerThreads, batchSize,
applicationEventPublisher, logger, loggingInterval);
}
/**
* Instantiates a new batch processor.
*
* @param processName
* the process name
* @param retryingTransactionHelper
* the retrying transaction helper
* @param workProvider
* the object providing the work packets
* @param workerThreads
* the number of worker threads
* @param batchSize
* the number of entries we process at a time in a transaction
* @param applicationEventPublisher
* the application event publisher (may be <tt>null</tt>)
* @param logger
* the logger to use (may be <tt>null</tt>)
* @param loggingInterval
* the number of entries to process before reporting progress
*
* @since 3.4
*/
public BatchProcessor(
String processName,
RetryingTransactionHelper retryingTransactionHelper,
BatchProcessWorkProvider<T> workProvider,
int workerThreads, int batchSize,
ApplicationEventPublisher applicationEventPublisher,
Log logger,
@@ -159,7 +191,7 @@ public class BatchProcessor<T> implements BatchMonitor
this.processName = processName;
this.retryingTransactionHelper = retryingTransactionHelper;
this.collection = collection;
this.workProvider = workProvider;
this.workerThreads = workerThreads;
this.batchSize = batchSize;
if (logger == null)
@@ -179,18 +211,16 @@ public class BatchProcessor<T> implements BatchMonitor
}
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getCurrentEntryId()
/**
* {@inheritDoc}
*/
public synchronized String getCurrentEntryId()
{
return this.currentEntryId;
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getLastError()
/**
* {@inheritDoc}
*/
public synchronized String getLastError()
{
@@ -205,75 +235,67 @@ public class BatchProcessor<T> implements BatchMonitor
return buff.toString();
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getLastErrorEntryId()
/**
* {@inheritDoc}
*/
public synchronized String getLastErrorEntryId()
{
return this.lastErrorEntryId;
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getBatchType()
/**
* {@inheritDoc}
*/
public synchronized String getProcessName()
{
return this.processName;
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getSuccessfullyProcessedResults()
/**
* {@inheritDoc}
*/
public synchronized int getSuccessfullyProcessedEntries()
{
return this.successfullyProcessedEntries;
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getPercentComplete()
/**
* {@inheritDoc}
*/
public synchronized String getPercentComplete()
{
int totalResults = this.collection.size();
int totalResults = this.workProvider.getTotalEstimatedWorkSize();
int processed = this.successfullyProcessedEntries + this.totalErrors;
return processed <= totalResults ? NumberFormat.getPercentInstance().format(
totalResults == 0 ? 1.0F : (float) processed / totalResults) : "Unknown";
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getTotalErrors()
/**
* {@inheritDoc}
*/
public synchronized int getTotalErrors()
{
return this.totalErrors;
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getTotalResults()
/**
* {@inheritDoc}
*/
public int getTotalResults()
{
return this.collection.size();
return this.workProvider.getTotalEstimatedWorkSize();
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getEndTime()
/**
* {@inheritDoc}
*/
public synchronized Date getEndTime()
{
return this.endTime;
}
/*
* (non-Javadoc)
* @see org.alfresco.repo.security.sync.BatchMonitor#getStartTime()
/**
* {@inheritDoc}
*/
public synchronized Date getStartTime()
{
@@ -297,7 +319,7 @@ public class BatchProcessor<T> implements BatchMonitor
@SuppressWarnings("serial")
public int process(final BatchProcessWorker<T> worker, final boolean splitTxns)
{
int count = this.collection.size();
int count = workProvider.getTotalEstimatedWorkSize();
synchronized (this)
{
this.startTime = new Date();
@@ -340,7 +362,7 @@ public class BatchProcessor<T> implements BatchMonitor
threadFactory) : null;
try
{
Iterator<T> iterator = this.collection.iterator();
Iterator<T> iterator = new WorkProviderIterator<T>(this.workProvider);
List<T> batch = new ArrayList<T>(this.batchSize);
while (iterator.hasNext())
{
@@ -420,7 +442,7 @@ public class BatchProcessor<T> implements BatchMonitor
{
StringBuilder message = new StringBuilder(100).append(getProcessName()).append(": Processed ").append(
processed).append(" entries");
int totalResults = this.collection.size();
int totalResults = this.workProvider.getTotalEstimatedWorkSize();
if (totalResults >= processed)
{
message.append(" out of ").append(totalResults).append(". ").append(
@@ -502,6 +524,63 @@ public class BatchProcessor<T> implements BatchMonitor
}
}
/**
* Small iterator that repeatedly gets the next batch of work from a {@link BatchProcessWorkProvider}
* @author Derek Hulley
*/
private static class WorkProviderIterator<T> implements Iterator<T>
{
private BatchProcessWorkProvider<T> workProvider;
private Iterator<T> currentIterator;
private WorkProviderIterator(BatchProcessWorkProvider<T> workProvider)
{
this.workProvider = workProvider;
}
public boolean hasNext()
{
if (currentIterator == null)
{
if (workProvider != null)
{
Collection<T> nextWork = workProvider.getNextWork();
if (nextWork == null)
{
throw new RuntimeException("BatchProcessWorkProvider returned 'null' work: " + workProvider);
}
currentIterator = nextWork.iterator();
}
else
{
// The null workProvider indicates that it was exhausted
}
}
boolean hasNext = (currentIterator == null) ? false : currentIterator.hasNext();
if (!hasNext)
{
workProvider = null; // No more work to get
currentIterator = null;
}
return hasNext;
}
public T next()
{
if (!hasNext())
{
throw new NoSuchElementException();
}
return currentIterator.next();
}
public void remove()
{
throw new UnsupportedOperationException();
}
}
/**
* A callback that invokes a worker on a batch, optionally in a new transaction.
*/