From db400934883a74cd4a81e97a0abaf99fb6730d3b Mon Sep 17 00:00:00 2001 From: Derek Hulley Date: Tue, 22 Jun 2010 13:38:09 +0000 Subject: [PATCH] BatchProcessor is fed work by a BatchProcessWorkProvider - Avoids necessity to supply full or clever Collection git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@20752 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../repo/batch/BatchProcessWorkProvider.java | 51 +++++ .../alfresco/repo/batch/BatchProcessor.java | 201 ++++++++++++------ 2 files changed, 191 insertions(+), 61 deletions(-) create mode 100644 source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java diff --git a/source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java b/source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java new file mode 100644 index 0000000000..92f541da72 --- /dev/null +++ b/source/java/org/alfresco/repo/batch/BatchProcessWorkProvider.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2005-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. + */ +package org.alfresco.repo.batch; + +import java.util.Collection; + +/** + * An interface that provides work loads to the {@link BatchProcessor}.
+ * + * @author Derek Hulley + * @since 3.4 + */ +public interface BatchProcessWorkProvider +{ + /** + * Get an estimate of the total number of objects that will be provided by this instance. + * Instances can provide accurate answers on each call, but only if the answer can be + * provided quickly and efficiently; usually it is enough to cache the result after + * providing an initial estimate. + * + * @return a total work size estimate + */ + int getTotalEstimatedWorkSize(); + + /** + * Get the next lot of work for the batch processor. Implementations should return + * the largest number of entries possible; the {@link BatchProcessor} will keep calling + * this method until it has enough work for the individual worker threads to process + * or until the work load is empty. + * + * @return the next set of work objects to process or an empty collection + * if there is no more work remaining. + */ + Collection getNextWork(); +} diff --git a/source/java/org/alfresco/repo/batch/BatchProcessor.java b/source/java/org/alfresco/repo/batch/BatchProcessor.java index 3d4bf7930e..6f61e31ece 100644 --- a/source/java/org/alfresco/repo/batch/BatchProcessor.java +++ b/source/java/org/alfresco/repo/batch/BatchProcessor.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -65,8 +66,8 @@ public class BatchProcessor implements BatchMonitor /** The retrying transaction helper. */ private final RetryingTransactionHelper retryingTransactionHelper; - /** The collection. */ - private final Collection collection; + /** The source of the work being done. */ + private BatchProcessWorkProvider workProvider; /** The process name.
*/ private final String processName; @@ -77,9 +78,9 @@ public class BatchProcessor implements BatchMonitor /** The number of worker threads. */ private final int workerThreads; - /** The number of entries we process at a time in a transaction *. */ + /** The number of entries we process at a time in a transaction. */ private final int batchSize; - + /** The current entry id. */ private String currentEntryId; @@ -104,26 +105,6 @@ public class BatchProcessor implements BatchMonitor /** The end time. */ private Date endTime; - /** - * Instantiates a new batch processor using a the default logger, which references - * this class as the log category. - * - * @see #BatchProcessor(String, RetryingTransactionHelper, Collection, int, int, ApplicationEventPublisher, Log, int) - */ - public BatchProcessor( - String processName, - RetryingTransactionHelper retryingTransactionHelper, - Collection collection, - int workerThreads, int batchSize) - { - this( - processName, - retryingTransactionHelper, - collection, - workerThreads, - batchSize, null, null, 1); - } - /** * Instantiates a new batch processor. 
* @@ -143,11 +124,62 @@ public class BatchProcessor implements BatchMonitor * the logger to use (may be null) * @param loggingInterval * the number of entries to process before reporting progress + * + * @deprecated Since 3.4, use the {@link BatchProcessWorkProvider} instead of the Collection */ public BatchProcessor( String processName, RetryingTransactionHelper retryingTransactionHelper, - Collection collection, + final Collection collection, + int workerThreads, int batchSize, + ApplicationEventPublisher applicationEventPublisher, + Log logger, + int loggingInterval) + { + this( + processName, + retryingTransactionHelper, + new BatchProcessWorkProvider() + { + public int getTotalEstimatedWorkSize() + { + return collection.size(); + } + public Collection getNextWork() + { + return collection; + } + }, + workerThreads, batchSize, + applicationEventPublisher, logger, loggingInterval); + } + + /** + * Instantiates a new batch processor. + * + * @param processName + * the process name + * @param retryingTransactionHelper + * the retrying transaction helper + * @param workProvider + * the object providing the work packets + * @param workerThreads + * the number of worker threads + * @param batchSize + * the number of entries we process at a time in a transaction + * @param applicationEventPublisher + * the application event publisher (may be null) + * @param logger + * the logger to use (may be null) + * @param loggingInterval + * the number of entries to process before reporting progress + * + * @since 3.4 + */ + public BatchProcessor( + String processName, + RetryingTransactionHelper retryingTransactionHelper, + BatchProcessWorkProvider workProvider, int workerThreads, int batchSize, ApplicationEventPublisher applicationEventPublisher, Log logger, @@ -159,7 +191,7 @@ public class BatchProcessor implements BatchMonitor this.processName = processName; this.retryingTransactionHelper = retryingTransactionHelper; - this.collection = collection; + this.workProvider = 
workProvider; this.workerThreads = workerThreads; this.batchSize = batchSize; if (logger == null) @@ -179,18 +211,16 @@ public class BatchProcessor implements BatchMonitor } } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getCurrentEntryId() + /** + * {@inheritDoc} */ public synchronized String getCurrentEntryId() { return this.currentEntryId; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getLastError() + /** + * {@inheritDoc} */ public synchronized String getLastError() { @@ -205,75 +235,67 @@ public class BatchProcessor implements BatchMonitor return buff.toString(); } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getLastErrorEntryId() + /** + * {@inheritDoc} */ public synchronized String getLastErrorEntryId() { return this.lastErrorEntryId; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getBatchType() + /** + * {@inheritDoc} */ public synchronized String getProcessName() { return this.processName; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getSuccessfullyProcessedResults() + /** + * {@inheritDoc} */ public synchronized int getSuccessfullyProcessedEntries() { return this.successfullyProcessedEntries; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getPercentComplete() + /** + * {@inheritDoc} */ public synchronized String getPercentComplete() { - int totalResults = this.collection.size(); + int totalResults = this.workProvider.getTotalEstimatedWorkSize(); int processed = this.successfullyProcessedEntries + this.totalErrors; return processed <= totalResults ? NumberFormat.getPercentInstance().format( totalResults == 0 ? 
1.0F : (float) processed / totalResults) : "Unknown"; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getTotalErrors() + /** + * {@inheritDoc} */ public synchronized int getTotalErrors() { return this.totalErrors; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getTotalResults() + /** + * {@inheritDoc} */ public int getTotalResults() { - return this.collection.size(); + return this.workProvider.getTotalEstimatedWorkSize(); } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getEndTime() + /** + * {@inheritDoc} */ public synchronized Date getEndTime() { return this.endTime; } - /* - * (non-Javadoc) - * @see org.alfresco.repo.security.sync.BatchMonitor#getStartTime() + /** + * {@inheritDoc} */ public synchronized Date getStartTime() { @@ -297,7 +319,7 @@ public class BatchProcessor implements BatchMonitor @SuppressWarnings("serial") public int process(final BatchProcessWorker worker, final boolean splitTxns) { - int count = this.collection.size(); + int count = workProvider.getTotalEstimatedWorkSize(); synchronized (this) { this.startTime = new Date(); @@ -340,7 +362,7 @@ public class BatchProcessor implements BatchMonitor threadFactory) : null; try { - Iterator iterator = this.collection.iterator(); + Iterator iterator = new WorkProviderIterator(this.workProvider); List batch = new ArrayList(this.batchSize); while (iterator.hasNext()) { @@ -420,7 +442,7 @@ public class BatchProcessor implements BatchMonitor { StringBuilder message = new StringBuilder(100).append(getProcessName()).append(": Processed ").append( processed).append(" entries"); - int totalResults = this.collection.size(); + int totalResults = this.workProvider.getTotalEstimatedWorkSize(); if (totalResults >= processed) { message.append(" out of ").append(totalResults).append(". 
").append( @@ -501,6 +523,63 @@ public class BatchProcessor implements BatchMonitor { } } + + /** + * Small iterator that repeatedly gets the next batch of work from a {@link BatchProcessWorkProvider} + + * @author Derek Hulley + */ + private static class WorkProviderIterator implements Iterator + { + private BatchProcessWorkProvider workProvider; + private Iterator currentIterator; + + private WorkProviderIterator(BatchProcessWorkProvider workProvider) + { + this.workProvider = workProvider; + } + + public boolean hasNext() + { + if (currentIterator == null) + { + if (workProvider != null) + { + Collection nextWork = workProvider.getNextWork(); + if (nextWork == null) + { + throw new RuntimeException("BatchProcessWorkProvider returned 'null' work: " + workProvider); + } + currentIterator = nextWork.iterator(); + } + else + { + // The null workProvider indicates that it was exhausted + } + } + boolean hasNext = (currentIterator == null) ? false : currentIterator.hasNext(); + if (!hasNext) + { + workProvider = null; // No more work to get + currentIterator = null; + } + return hasNext; + } + + public T next() + { + if (!hasNext()) + { + throw new NoSuchElementException(); + } + return currentIterator.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } /** * A callback that invokes a worker on a batch, optionally in a new transaction.