Fix enabler for ALF-1990: BatchProcessWorkProvider calls are made in new, read-only transactions

- Allows work provider to have visibility of changes made by worker threads
 - Prevents large queries from being held by the controlling transaction


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@23266 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2010-10-26 08:06:55 +00:00
parent a23f30a7f7
commit 7ac35ae7fd

View File

@@ -275,7 +275,7 @@ public class BatchProcessor<T> implements BatchMonitor
*/ */
public synchronized String getPercentComplete() public synchronized String getPercentComplete()
{ {
int totalResults = this.workProvider.getTotalEstimatedWorkSize(); int totalResults = getTotalResults();
int processed = this.successfullyProcessedEntries + this.totalErrors; int processed = this.successfullyProcessedEntries + this.totalErrors;
return processed <= totalResults ? NumberFormat.getPercentInstance().format( return processed <= totalResults ? NumberFormat.getPercentInstance().format(
totalResults == 0 ? 1.0F : (float) processed / totalResults) : "Unknown"; totalResults == 0 ? 1.0F : (float) processed / totalResults) : "Unknown";
@@ -294,7 +294,15 @@ public class BatchProcessor<T> implements BatchMonitor
*/ */
public int getTotalResults() public int getTotalResults()
{ {
return this.workProvider.getTotalEstimatedWorkSize(); RetryingTransactionCallback<Integer> callback = new RetryingTransactionCallback<Integer>()
{
@Override
public Integer execute() throws Throwable
{
return workProvider.getTotalEstimatedWorkSize();
}
};
return retryingTransactionHelper.doInTransaction(callback, true, true);
} }
/** /**
@@ -330,7 +338,7 @@ public class BatchProcessor<T> implements BatchMonitor
@SuppressWarnings("serial") @SuppressWarnings("serial")
public int process(final BatchProcessWorker<T> worker, final boolean splitTxns) public int process(final BatchProcessWorker<T> worker, final boolean splitTxns)
{ {
int count = workProvider.getTotalEstimatedWorkSize(); int count = getTotalResults();
synchronized (this) synchronized (this)
{ {
this.startTime = new Date(); this.startTime = new Date();
@@ -373,7 +381,7 @@ public class BatchProcessor<T> implements BatchMonitor
threadFactory) : null; threadFactory) : null;
try try
{ {
Iterator<T> iterator = new WorkProviderIterator<T>(this.workProvider); Iterator<T> iterator = new WorkProviderIterator<T>(workProvider, retryingTransactionHelper);
List<T> batch = new ArrayList<T>(this.batchSize); List<T> batch = new ArrayList<T>(this.batchSize);
while (iterator.hasNext()) while (iterator.hasNext())
{ {
@@ -453,7 +461,7 @@ public class BatchProcessor<T> implements BatchMonitor
{ {
StringBuilder message = new StringBuilder(100).append(getProcessName()).append(": Processed ").append( StringBuilder message = new StringBuilder(100).append(getProcessName()).append(": Processed ").append(
processed).append(" entries"); processed).append(" entries");
int totalResults = this.workProvider.getTotalEstimatedWorkSize(); int totalResults = getTotalResults();
if (totalResults >= processed) if (totalResults >= processed)
{ {
message.append(" out of ").append(totalResults).append(". ").append( message.append(" out of ").append(totalResults).append(". ").append(
@@ -543,11 +551,13 @@ public class BatchProcessor<T> implements BatchMonitor
private static class WorkProviderIterator<T> implements Iterator<T> private static class WorkProviderIterator<T> implements Iterator<T>
{ {
private BatchProcessWorkProvider<T> workProvider; private BatchProcessWorkProvider<T> workProvider;
private final RetryingTransactionHelper txnHelper;
private Iterator<T> currentIterator; private Iterator<T> currentIterator;
private WorkProviderIterator(BatchProcessWorkProvider<T> workProvider) private WorkProviderIterator(BatchProcessWorkProvider<T> workProvider, RetryingTransactionHelper txnHelper)
{ {
this.workProvider = workProvider; this.workProvider = workProvider;
this.txnHelper = txnHelper;
} }
public boolean hasNext() public boolean hasNext()
@@ -570,7 +580,15 @@ public class BatchProcessor<T> implements BatchMonitor
// go and get more results // go and get more results
if (!hasNext) if (!hasNext)
{ {
Collection<T> nextWork = workProvider.getNextWork(); RetryingTransactionCallback<Collection<T>> callback = new RetryingTransactionCallback<Collection<T>>()
{
@Override
public Collection<T> execute() throws Throwable
{
return workProvider.getNextWork();
}
};
Collection<T> nextWork = txnHelper.doInTransaction(callback, true, true);
if (nextWork == null) if (nextWork == null)
{ {
throw new RuntimeException("BatchProcessWorkProvider returned 'null' work: " + workProvider); throw new RuntimeException("BatchProcessWorkProvider returned 'null' work: " + workProvider);