From c11e6c76108f13c29f11eebb7bf6f3d1f614f212 Mon Sep 17 00:00:00 2001 From: "Brian M. Long" Date: Fri, 14 Feb 2025 15:51:53 -0500 Subject: [PATCH] clear/unsubscribe old process definitions --- .../mq/MQProcessDefinitionMonitor.java | 14 ++-- .../activiti/mq/MqExecutionService.java | 68 ++++++++++++++----- .../activiti/mq/MqSubscriptionService.java | 19 ++++-- 3 files changed, 74 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java b/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java index f406eb2..2c28c58 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java +++ b/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java @@ -240,7 +240,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) { this.logger.debug("Triggered by process definition addition: {}", entity); - this.unsubscribeMqSubscribeTasks(entity.getId()); + this.unsubscribeOtherMqSubscribeTasks(entity); ServiceTask task = this.findMqStartSubscribeTask(entity.getId()); if (task == null) @@ -260,7 +260,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic for (ProcessDefinitionEntity procDefEntity : procDefEntities) { this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName()); - this.unsubscribeMqSubscribeTasks(procDefEntity.getId()); + this.unsubscribeOtherMqSubscribeTasks(procDefEntity); ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId()); if (task == null) @@ -273,7 +273,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) { this.logger.debug("Triggered by process definition removal: {}", entity); - this.unsubscribeMqSubscribeTasks(entity.getId()); + this.unsubscribeOtherMqSubscribeTasks(entity); ServiceTask task = this.findMqStartSubscribeTask(entity.getId()); if (task == null) @@ -282,9 +282,13 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic this.deloopMqSubscribeTask(entity.getId()); } - protected void unsubscribeMqSubscribeTasks(String procDefId) { + protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) { + this.unsubscribeOtherMqSubscribeTasks(procDef.getId()); + } + + protected void unsubscribeOtherMqSubscribeTasks(String procDefId) { try { - Set executionIds = this.subscriptionService.clear(procDefId); + Set executionIds = this.subscriptionService.clearOtherVersions(procDefId); if (this.logger.isDebugEnabled()) { this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds); } else { diff --git a/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java b/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java index a37e19c..bc4ece5 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java @@ -5,18 +5,44 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.DelegateExecution; +import org.activiti.engine.repository.ProcessDefinition; import org.apache.commons.collections4.MultiValuedMap; import org.apache.commons.collections4.multimap.HashSetValuedHashMap; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MqExecutionService { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ProcessEngine services; + + /** + * The size of the keys is limited to the number of process definitions + * series are defined. This excludes versions. It would actually only + * contain ones with an MQ subscribe task. It would never be a significant + * memory hog. + * + * The size of the values is limited to the number of active process + * definition versions exist. So it would never be a significant memory + * hog. + * + * The size of the keys/values have nothing to do with the number of + * process instances or executions. + * + * This means it does not need to be trimmed. However, it is a good idea + * to remove process definition identifiers (values) that have no active + * executions. You could do the same with active activities, but cleaning + * up process definitions will clean those up too. + */ + private MultiValuedMap processDefinitionKeyMap = new HashSetValuedHashMap<>(); /** * The size of the keys is limited to the number of process definitions @@ -55,9 +81,9 @@ public class MqExecutionService { private MultiValuedMap, String> activityExecutionMap = new HashSetValuedHashMap<>(); public synchronized void executing(DelegateExecution execution) { - this.processDefinitionActivityMap.put(execution.getProcessDefinitionId(), execution.getCurrentActivityId()); - Pair key = this.toKey(execution); + this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft()); + this.processDefinitionActivityMap.put(key.getLeft(), key.getRight()); this.activityExecutionMap.put(key, execution.getId()); } @@ -67,31 +93,39 @@ public class MqExecutionService { } /** - * @param processDefinitionId A process definition identifier. + * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @return A set of execution identifiers that were in the now cleared map. */ - public synchronized Set clear(String processDefinitionId) throws Exception { - Collection activityIds = this.processDefinitionActivityMap.get(processDefinitionId); - if (activityIds == null) { - this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId); - return Collections.emptySet(); - } - + public synchronized Set clearOtherVersions(String latestProcessDefinitionId) throws Exception { + ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId); + String processDefinitionKey = latestProcessDefinition.getKey(); + Set executionIds = new HashSet<>(); - for (String activityId : activityIds) { - this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); - Pair key = this.toKey(processDefinitionId, activityId); - Collection activityExecutionIds = this.activityExecutionMap.get(key); - if (activityExecutionIds != null) - executionIds.addAll(activityExecutionIds); + Collection processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey); + for (String processDefinitionId : processDefinitionIds) { + this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId); + + Collection activityIds = this.processDefinitionActivityMap.remove(processDefinitionId); + if (activityIds == null) { + this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId); + return Collections.emptySet(); + } + + for (String activityId : activityIds) { + this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); + Pair key = this.toKey(processDefinitionId, activityId); + Collection activityExecutionIds = this.activityExecutionMap.remove(key); + if (activityExecutionIds != null) + executionIds.addAll(activityExecutionIds); + } } return executionIds; } protected Pair toKey(DelegateExecution execution) { - return this.toKey(execution.getProcessDefinitionId(), execution.getCurrentActivityId()); + return Pair.of(execution.getProcessDefinitionId(), execution.getCurrentActivityId()); } protected Pair toKey(String processDefinitionId, String activityId) { diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java index b974e44..7427823 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java @@ -4,15 +4,21 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.DelegateExecution; +import org.activiti.engine.repository.ProcessDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MqSubscriptionService extends MqExecutionService { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ProcessEngine services; /** * The size of the map has no limit. It will grow with the number of @@ -40,18 +46,21 @@ public class MqSubscriptionService extends MqExecutionService { } /** - * @param processDefinitionId A process definition identifier. + * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended. */ @Override - public synchronized Set clear(String processDefinitionId) throws Exception { - Set executionIds = super.clear(processDefinitionId); + public synchronized Set clearOtherVersions(String latestProcessDefinitionId) throws Exception { + Set executionIds = super.clearOtherVersions(latestProcessDefinitionId); + + ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId); + String processDefinitionKey = latestProcessDefinition.getKey(); for (String executionId : executionIds) { - this.logger.trace("Clearing process definition execution: {}: {}", processDefinitionId, executionId); + this.logger.trace("Clearing process definition execution: {}: {}", latestProcessDefinitionId, executionId); AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); if (consumer != null) { - this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionId, executionId, consumer); + this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer); consumer.close(); } }