diff --git a/source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java b/source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java
new file mode 100644
index 0000000000..92f541da72
--- /dev/null
+++ b/source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2005-2010 Alfresco Software Limited.
+ *
+ * This file is part of Alfresco
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ */
+package org.alfresco.repo.batch;
+
+import java.util.Collection;
+
+/**
+ * An interface that provides work loads to the {@link BatchProcessor}.
+ *
+ * @author Derek Hulley
+ * @since 3.4
+ */
+public interface BatchProcessWorkProvider
+{
+ /**
+ * Get an estimate of the total number of objects that will be provided by this instance.
+ * Instances can provide accurate answers on each call, but only if the answer can be
+ * provided quickly and efficiently; usually it is enough to to cache the result after
+ * providing an initial estimate.
+ *
+ * @return a total work size estimate
+ */
+ int getTotalEstimatedWorkSize();
+
+ /**
+ * Get the next lot of work for the batch processor. Implementations should return
+ * the largest number of entries possible; the {@link BatchProcessor} will keep calling
+ * this method until it has enough work for the individual worker threads to process
+ * or until the work load is empty.
+ *
+ * @return the next set of work object to process or an empty collection
+ * if there is no more work remaining.
+ */
+ Collection getNextWork();
+}
diff --git a/source/java/org/alfresco/repo/batch/BatchProcessor.java b/source/java/org/alfresco/repo/batch/BatchProcessor.java
index 3d4bf7930e..6f61e31ece 100644
--- a/source/java/org/alfresco/repo/batch/BatchProcessor.java
+++ b/source/java/org/alfresco/repo/batch/BatchProcessor.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -65,8 +66,8 @@ public class BatchProcessor implements BatchMonitor
/** The retrying transaction helper. */
private final RetryingTransactionHelper retryingTransactionHelper;
- /** The collection. */
- private final Collection collection;
+ /** The source of the work being done. */
+ private BatchProcessWorkProvider workProvider;
/** The process name. */
private final String processName;
@@ -77,9 +78,9 @@ public class BatchProcessor implements BatchMonitor
/** The number of worker threads. */
private final int workerThreads;
- /** The number of entries we process at a time in a transaction *. */
+ /** The number of entries we process at a time in a transaction. */
private final int batchSize;
-
+
/** The current entry id. */
private String currentEntryId;
@@ -104,26 +105,6 @@ public class BatchProcessor implements BatchMonitor
/** The end time. */
private Date endTime;
- /**
- * Instantiates a new batch processor using a the default logger, which references
- * this class as the log category.
- *
- * @see #BatchProcessor(String, RetryingTransactionHelper, Collection, int, int, ApplicationEventPublisher, Log, int)
- */
- public BatchProcessor(
- String processName,
- RetryingTransactionHelper retryingTransactionHelper,
- Collection collection,
- int workerThreads, int batchSize)
- {
- this(
- processName,
- retryingTransactionHelper,
- collection,
- workerThreads,
- batchSize, null, null, 1);
- }
-
/**
* Instantiates a new batch processor.
*
@@ -143,11 +124,62 @@ public class BatchProcessor implements BatchMonitor
* the logger to use (may be null)
* @param loggingInterval
* the number of entries to process before reporting progress
+ *
+ * @deprecated Since 3.4, use the {@link BatchProcessWorkProvider} instead of the Collection
*/
public BatchProcessor(
String processName,
RetryingTransactionHelper retryingTransactionHelper,
- Collection collection,
+ final Collection collection,
+ int workerThreads, int batchSize,
+ ApplicationEventPublisher applicationEventPublisher,
+ Log logger,
+ int loggingInterval)
+ {
+ this(
+ processName,
+ retryingTransactionHelper,
+ new BatchProcessWorkProvider()
+ {
+ public int getTotalEstimatedWorkSize()
+ {
+ return collection.size();
+ }
+ public Collection getNextWork()
+ {
+ return collection;
+ }
+ },
+ workerThreads, batchSize,
+ applicationEventPublisher, logger, loggingInterval);
+ }
+
+ /**
+ * Instantiates a new batch processor.
+ *
+ * @param processName
+ * the process name
+ * @param retryingTransactionHelper
+ * the retrying transaction helper
+ * @param workProvider
+ * the object providing the work packets
+ * @param workerThreads
+ * the number of worker threads
+ * @param batchSize
+ * the number of entries we process at a time in a transaction
+ * @param applicationEventPublisher
+ * the application event publisher (may be null)
+ * @param logger
+ * the logger to use (may be null)
+ * @param loggingInterval
+ * the number of entries to process before reporting progress
+ *
+ * @since 3.4
+ */
+ public BatchProcessor(
+ String processName,
+ RetryingTransactionHelper retryingTransactionHelper,
+ BatchProcessWorkProvider workProvider,
int workerThreads, int batchSize,
ApplicationEventPublisher applicationEventPublisher,
Log logger,
@@ -159,7 +191,7 @@ public class BatchProcessor implements BatchMonitor
this.processName = processName;
this.retryingTransactionHelper = retryingTransactionHelper;
- this.collection = collection;
+ this.workProvider = workProvider;
this.workerThreads = workerThreads;
this.batchSize = batchSize;
if (logger == null)
@@ -179,18 +211,16 @@ public class BatchProcessor implements BatchMonitor
}
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getCurrentEntryId()
+ /**
+ * {@inheritDoc}
*/
public synchronized String getCurrentEntryId()
{
return this.currentEntryId;
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getLastError()
+ /**
+ * {@inheritDoc}
*/
public synchronized String getLastError()
{
@@ -205,75 +235,67 @@ public class BatchProcessor implements BatchMonitor
return buff.toString();
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getLastErrorEntryId()
+ /**
+ * {@inheritDoc}
*/
public synchronized String getLastErrorEntryId()
{
return this.lastErrorEntryId;
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getBatchType()
+ /**
+ * {@inheritDoc}
*/
public synchronized String getProcessName()
{
return this.processName;
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getSuccessfullyProcessedResults()
+ /**
+ * {@inheritDoc}
*/
public synchronized int getSuccessfullyProcessedEntries()
{
return this.successfullyProcessedEntries;
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getPercentComplete()
+ /**
+ * {@inheritDoc}
*/
public synchronized String getPercentComplete()
{
- int totalResults = this.collection.size();
+ int totalResults = this.workProvider.getTotalEstimatedWorkSize();
int processed = this.successfullyProcessedEntries + this.totalErrors;
return processed <= totalResults ? NumberFormat.getPercentInstance().format(
totalResults == 0 ? 1.0F : (float) processed / totalResults) : "Unknown";
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getTotalErrors()
+ /**
+ * {@inheritDoc}
*/
public synchronized int getTotalErrors()
{
return this.totalErrors;
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getTotalResults()
+ /**
+ * {@inheritDoc}
*/
public int getTotalResults()
{
- return this.collection.size();
+ return this.workProvider.getTotalEstimatedWorkSize();
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getEndTime()
+ /**
+ * {@inheritDoc}
*/
public synchronized Date getEndTime()
{
return this.endTime;
}
- /*
- * (non-Javadoc)
- * @see org.alfresco.repo.security.sync.BatchMonitor#getStartTime()
+ /**
+ * {@inheritDoc}
*/
public synchronized Date getStartTime()
{
@@ -297,7 +319,7 @@ public class BatchProcessor implements BatchMonitor
@SuppressWarnings("serial")
public int process(final BatchProcessWorker worker, final boolean splitTxns)
{
- int count = this.collection.size();
+ int count = workProvider.getTotalEstimatedWorkSize();
synchronized (this)
{
this.startTime = new Date();
@@ -340,7 +362,7 @@ public class BatchProcessor implements BatchMonitor
threadFactory) : null;
try
{
- Iterator iterator = this.collection.iterator();
+ Iterator iterator = new WorkProviderIterator(this.workProvider);
List batch = new ArrayList(this.batchSize);
while (iterator.hasNext())
{
@@ -420,7 +442,7 @@ public class BatchProcessor implements BatchMonitor
{
StringBuilder message = new StringBuilder(100).append(getProcessName()).append(": Processed ").append(
processed).append(" entries");
- int totalResults = this.collection.size();
+ int totalResults = this.workProvider.getTotalEstimatedWorkSize();
if (totalResults >= processed)
{
message.append(" out of ").append(totalResults).append(". ").append(
@@ -501,6 +523,63 @@ public class BatchProcessor implements BatchMonitor
{
}
}
+
+ /**
+ * Small iterator that repeatedly gets the next batch of work from a {@link BatchProcessWorkProvider}
+
+ * @author Derek Hulley
+ */
+ private static class WorkProviderIterator implements Iterator
+ {
+ private BatchProcessWorkProvider workProvider;
+ private Iterator currentIterator;
+
+ private WorkProviderIterator(BatchProcessWorkProvider workProvider)
+ {
+ this.workProvider = workProvider;
+ }
+
+ public boolean hasNext()
+ {
+ if (currentIterator == null)
+ {
+ if (workProvider != null)
+ {
+ Collection nextWork = workProvider.getNextWork();
+ if (nextWork == null)
+ {
+ throw new RuntimeException("BatchProcessWorkProvider returned 'null' work: " + workProvider);
+ }
+ currentIterator = nextWork.iterator();
+ }
+ else
+ {
+ // The null workProvider indicates that it was exhausted
+ }
+ }
+ boolean hasNext = (currentIterator == null) ? false : currentIterator.hasNext();
+ if (!hasNext)
+ {
+ workProvider = null; // No more work to get
+ currentIterator = null;
+ }
+ return hasNext;
+ }
+
+ public T next()
+ {
+ if (!hasNext())
+ {
+ throw new NoSuchElementException();
+ }
+ return currentIterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
/**
* A callback that invokes a worker on a batch, optionally in a new transaction.