ACS-7587 Reimplement BulkStatusUpdater [ags][tas]

This commit is contained in:
Damian Ujma
2024-05-16 17:16:47 +02:00
parent 592dc35b6d
commit 0de1aca0f6
6 changed files with 99 additions and 113 deletions

View File

@@ -43,7 +43,6 @@ import org.alfresco.service.transaction.TransactionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationEventPublisher;
/**
* A base class for executing bulk operations on nodes based on search query results
@@ -57,7 +56,6 @@ public abstract class BulkBaseService<T> implements InitializingBean
protected TransactionService transactionService;
protected SearchMapper searchMapper;
protected BulkMonitor<T> bulkMonitor;
protected ApplicationEventPublisher applicationEventPublisher;
protected int threadCount;
protected int batchSize;
@@ -96,30 +94,28 @@ public abstract class BulkBaseService<T> implements InitializingBean
bulkMonitor.registerProcess(nodeRef, processId);
BatchProcessWorker<NodeRef> batchProcessWorker = getWorkerProvider(nodeRef, bulkOperation);
BulkTaskContainer bulkTaskContainer = getBulkTaskContainer();
BulkStatusUpdater bulkStatusUpdater = getBulkStatusUpdater();
BatchProcessor<NodeRef> batchProcessor = new BatchProcessor<>(
processId,
transactionService.getRetryingTransactionHelper(),
getWorkProvider(bulkOperation, totalItems, bulkTaskContainer),
getWorkProvider(bulkOperation, totalItems, bulkStatusUpdater),
threadCount,
itemsPerTransaction,
applicationEventPublisher,
bulkStatusUpdater,
logger,
loggingIntervalMs);
runAsyncBatchProcessor(batchProcessor, batchProcessWorker, bulkTaskContainer);
runAsyncBatchProcessor(batchProcessor, batchProcessWorker, bulkStatusUpdater);
return initBulkStatus;
}
/**
* Run batch processor and set a bulk task
* Run batch processor
*/
protected void runAsyncBatchProcessor(BatchProcessor<NodeRef> batchProcessor,
BatchProcessWorker<NodeRef> batchProcessWorker, BulkTaskContainer bulkTaskContainer)
BatchProcessWorker<NodeRef> batchProcessWorker, BulkStatusUpdater bulkStatusUpdater)
{
bulkTaskContainer.setTask(getTask(batchProcessor, bulkMonitor));
Runnable backgroundLogic = () -> {
try
{
@@ -139,7 +135,7 @@ public abstract class BulkBaseService<T> implements InitializingBean
}
finally
{
bulkTaskContainer.runTask();
bulkStatusUpdater.update();
}
};
@@ -157,30 +153,22 @@ public abstract class BulkBaseService<T> implements InitializingBean
protected abstract T getInitBulkStatus(String processId, long totalItems);
/**
* Get bulk task container
* Get bulk status updater
*
* @return task container
*/
protected abstract BulkTaskContainer getBulkTaskContainer();
/**
* Get task
*
* @param batchProcessor
* @param monitor
* @return
*/
protected abstract Runnable getTask(BatchProcessor<NodeRef> batchProcessor, BulkMonitor<T> monitor);
protected abstract BulkStatusUpdater getBulkStatusUpdater();
/**
* Get work provider
*
* @param bulkOperation bulk operation
* @param totalItems total items
* @param bulkOperation bulk operation
* @param totalItems total items
* @param bulkStatusUpdater bulk status updater
* @return work provider
*/
protected abstract BatchProcessWorkProvider<NodeRef> getWorkProvider(BulkOperation bulkOperation, long totalItems,
BulkTaskContainer bulkTaskContainer);
BulkStatusUpdater bulkStatusUpdater);
/**
* Get worker provider
@@ -209,11 +197,6 @@ public abstract class BulkBaseService<T> implements InitializingBean
return searchService.query(searchParams);
}
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
{
this.applicationEventPublisher = applicationEventPublisher;
}
public void setServiceRegistry(ServiceRegistry serviceRegistry)
{
this.serviceRegistry = serviceRegistry;

View File

@@ -26,20 +26,15 @@
*/
package org.alfresco.module.org_alfresco_module_rm.bulk;
import org.springframework.context.ApplicationEventPublisher;
/**
* An interface for containing a bulk task
* An interface for updating the status of a bulk operation
*/
public interface BulkTaskContainer
public interface BulkStatusUpdater extends ApplicationEventPublisher
{
/**
* Run the task
* Update the bulk status
*/
void runTask();
/**
* Set the task
*
* @param task the task
*/
void setTask(Runnable task);
void update();
}

View File

@@ -31,4 +31,6 @@ import java.io.Serializable;
/**
* A simple immutable POJO to hold the details of a bulk hold process
*/
public record HoldBulkProcessDetails(String bulkStatusId, String creatorInstance) implements Serializable {}
public record HoldBulkProcessDetails(String bulkStatusId, String creatorInstance) implements Serializable
{
}

View File

@@ -35,14 +35,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.alfresco.model.ContentModel;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkBaseService;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkMonitor;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkOperation;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkTaskContainer;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkStatusUpdater;
import org.alfresco.module.org_alfresco_module_rm.capability.CapabilityService;
import org.alfresco.module.org_alfresco_module_rm.capability.RMPermissionModel;
import org.alfresco.module.org_alfresco_module_rm.hold.HoldService;
import org.alfresco.repo.batch.BatchProcessWorkProvider;
import org.alfresco.repo.batch.BatchProcessor;
import org.alfresco.repo.batch.BatchProcessor.BatchProcessWorker;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.permissions.AccessDeniedException;
@@ -80,28 +78,16 @@ public class HoldBulkServiceImpl extends BulkBaseService<HoldBulkStatus> impleme
}
@Override
protected BulkTaskContainer getBulkTaskContainer()
protected BulkStatusUpdater getBulkStatusUpdater()
{
return new HoldBulkTaskContainer();
}
@Override
protected Runnable getTask(BatchProcessor<NodeRef> batchProcessor,
BulkMonitor<HoldBulkStatus> monitor)
{
return () -> monitor.updateBulkStatus(
new HoldBulkStatus(batchProcessor.getProcessName(), batchProcessor.getStartTime(),
batchProcessor.getEndTime(),
batchProcessor.getSuccessfullyProcessedEntriesLong() + batchProcessor.getTotalErrorsLong(),
batchProcessor.getTotalErrorsLong(), batchProcessor.getTotalResultsLong(),
batchProcessor.getLastError()));
return new HoldBulkStatusUpdater((HoldBulkMonitor) bulkMonitor);
}
@Override
protected BatchProcessWorkProvider<NodeRef> getWorkProvider(BulkOperation bulkOperation, long totalItems,
BulkTaskContainer bulkTaskContainer)
BulkStatusUpdater bulkStatusUpdater)
{
return new AddToHoldWorkerProvider(new AtomicInteger(0), bulkOperation, totalItems, bulkTaskContainer);
return new AddToHoldWorkerProvider(new AtomicInteger(0), bulkOperation, totalItems, bulkStatusUpdater);
}
@Override
@@ -175,15 +161,15 @@ public class HoldBulkServiceImpl extends BulkBaseService<HoldBulkStatus> impleme
private final Query searchQuery;
private final String currentUser;
private final long totalItems;
private final BulkTaskContainer bulkTaskContainer;
private final BulkStatusUpdater bulkStatusUpdater;
public AddToHoldWorkerProvider(AtomicInteger currentNodeNumber, BulkOperation bulkOperation, long totalItems,
BulkTaskContainer bulkTaskContainer)
BulkStatusUpdater bulkStatusUpdater)
{
this.currentNodeNumber = currentNodeNumber;
this.searchQuery = bulkOperation.searchQuery();
this.totalItems = totalItems;
this.bulkTaskContainer = bulkTaskContainer;
this.bulkStatusUpdater = bulkStatusUpdater;
currentUser = AuthenticationUtil.getFullyAuthenticatedUser();
}
@@ -217,7 +203,7 @@ public class HoldBulkServiceImpl extends BulkBaseService<HoldBulkStatus> impleme
searchParams.getSkipCount(), result.getNumberFound());
}
currentNodeNumber.addAndGet(batchSize);
bulkTaskContainer.runTask();
bulkStatusUpdater.update();
return result.getNodeRefs();
}

View File

@@ -0,0 +1,68 @@
/*
* #%L
* Alfresco Records Management Module
* %%
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* -
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
* -
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* -
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
* -
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.module.org_alfresco_module_rm.bulk.hold;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkStatusUpdater;
import org.alfresco.repo.batch.BatchMonitor;
import org.alfresco.repo.batch.BatchMonitorEvent;
import org.alfresco.rm.rest.api.model.HoldBulkStatus;
/**
* An implementation of {@link BulkStatusUpdater} for the hold bulk operation
*/
public class HoldBulkStatusUpdater implements BulkStatusUpdater
{
private final Runnable task;
private BatchMonitor batchMonitor;
public HoldBulkStatusUpdater(HoldBulkMonitor holdBulkMonitor)
{
this.task = () -> holdBulkMonitor.updateBulkStatus(
new HoldBulkStatus(batchMonitor.getProcessName(), batchMonitor.getStartTime(),
batchMonitor.getEndTime(),
batchMonitor.getSuccessfullyProcessedEntriesLong() + batchMonitor.getTotalErrorsLong(),
batchMonitor.getTotalErrorsLong(), batchMonitor.getTotalResultsLong(),
batchMonitor.getLastError()));
}
public void update()
{
if (task != null)
{
task.run();
}
}
@Override
public void publishEvent(Object event)
{
if (event instanceof BatchMonitorEvent batchMonitorEvent)
{
batchMonitor = batchMonitorEvent.getBatchMonitor();
}
}
}

View File

@@ -1,48 +0,0 @@
/*
* #%L
* Alfresco Records Management Module
* %%
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* -
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
* -
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* -
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
* -
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.module.org_alfresco_module_rm.bulk.hold;
import org.alfresco.module.org_alfresco_module_rm.bulk.BulkTaskContainer;
/**
* An implementation of {@link BulkTaskContainer} for the hold bulk operation
*/
public class HoldBulkTaskContainer implements BulkTaskContainer
{
private Runnable task;
public void runTask()
{
task.run();
}
public void setTask(Runnable task)
{
this.task = task;
}
}