clear/unsubscribe old process definitions

This commit is contained in:
Brian Long 2025-02-14 15:51:53 -05:00
parent 5d0a77f623
commit c11e6c7610
3 changed files with 74 additions and 27 deletions

View File

@ -240,7 +240,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) { protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition addition: {}", entity); this.logger.debug("Triggered by process definition addition: {}", entity);
this.unsubscribeMqSubscribeTasks(entity.getId()); this.unsubscribeOtherMqSubscribeTasks(entity);
ServiceTask task = this.findMqStartSubscribeTask(entity.getId()); ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null) if (task == null)
@ -260,7 +260,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
for (ProcessDefinitionEntity procDefEntity : procDefEntities) { for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName()); this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
this.unsubscribeMqSubscribeTasks(procDefEntity.getId()); this.unsubscribeOtherMqSubscribeTasks(procDefEntity);
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId()); ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
if (task == null) if (task == null)
@ -273,7 +273,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) { protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition removal: {}", entity); this.logger.debug("Triggered by process definition removal: {}", entity);
this.unsubscribeMqSubscribeTasks(entity.getId()); this.unsubscribeOtherMqSubscribeTasks(entity);
ServiceTask task = this.findMqStartSubscribeTask(entity.getId()); ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null) if (task == null)
@ -282,9 +282,13 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
this.deloopMqSubscribeTask(entity.getId()); this.deloopMqSubscribeTask(entity.getId());
} }
protected void unsubscribeMqSubscribeTasks(String procDefId) { protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try { try {
Set<String> executionIds = this.subscriptionService.clear(procDefId); Set<String> executionIds = this.subscriptionService.clearOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) { if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds); this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else { } else {

View File

@ -5,18 +5,44 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.apache.commons.collections4.MultiValuedMap; import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap; import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class MqExecutionService { public class MqExecutionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ProcessEngine services;
/**
* The size of the keys is limited to the number of process definitions
* series are defined. This excludes versions. It would actually only
* contain ones with an MQ subscribe task. It would never be a significant
* memory hog.
*
* The size of the values is limited to the number of active process
* definition versions exist. So it would never be a significant memory
* hog.
*
* The size of the keys/values have nothing to do with the number of
* process instances or executions.
*
* This means it does not need to be trimmed. However, it is a good idea
* to remove process definition identifiers (values) that have no active
* executions. You could do the same with active activities, but cleaning
* up process definitions will clean those up too.
*/
private MultiValuedMap<String, String> processDefinitionKeyMap = new HashSetValuedHashMap<>();
/** /**
* The size of the keys is limited to the number of process definitions * The size of the keys is limited to the number of process definitions
@ -55,9 +81,9 @@ public class MqExecutionService {
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>(); private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
public synchronized void executing(DelegateExecution execution) { public synchronized void executing(DelegateExecution execution) {
this.processDefinitionActivityMap.put(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
Pair<String, String> key = this.toKey(execution); Pair<String, String> key = this.toKey(execution);
this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft());
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
this.activityExecutionMap.put(key, execution.getId()); this.activityExecutionMap.put(key, execution.getId());
} }
@ -67,31 +93,39 @@ public class MqExecutionService {
} }
/** /**
* @param processDefinitionId A process definition identifier. * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers that were in the now cleared map. * @return A set of execution identifiers that were in the now cleared map.
*/ */
public synchronized Set<String> clear(String processDefinitionId) throws Exception { public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception {
Collection<String> activityIds = this.processDefinitionActivityMap.get(processDefinitionId); ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
if (activityIds == null) { String processDefinitionKey = latestProcessDefinition.getKey();
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
Set<String> executionIds = new HashSet<>(); Set<String> executionIds = new HashSet<>();
for (String activityId : activityIds) { Collection<String> processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey);
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); for (String processDefinitionId : processDefinitionIds) {
Pair<String, String> key = this.toKey(processDefinitionId, activityId); this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
if (activityExecutionIds != null) Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
executionIds.addAll(activityExecutionIds); if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
if (activityExecutionIds != null)
executionIds.addAll(activityExecutionIds);
}
} }
return executionIds; return executionIds;
} }
protected Pair<String, String> toKey(DelegateExecution execution) { protected Pair<String, String> toKey(DelegateExecution execution) {
return this.toKey(execution.getProcessDefinitionId(), execution.getCurrentActivityId()); return Pair.of(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
} }
protected Pair<String, String> toKey(String processDefinitionId, String activityId) { protected Pair<String, String> toKey(String processDefinitionId, String activityId) {

View File

@ -4,15 +4,21 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class MqSubscriptionService extends MqExecutionService { public class MqSubscriptionService extends MqExecutionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ProcessEngine services;
/** /**
* The size of the map has no limit. It will grow with the number of * The size of the map has no limit. It will grow with the number of
@ -40,18 +46,21 @@ public class MqSubscriptionService extends MqExecutionService {
} }
/** /**
* @param processDefinitionId A process definition identifier. * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended. * @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
*/ */
@Override @Override
public synchronized Set<String> clear(String processDefinitionId) throws Exception { public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception {
Set<String> executionIds = super.clear(processDefinitionId); Set<String> executionIds = super.clearOtherVersions(latestProcessDefinitionId);
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
String processDefinitionKey = latestProcessDefinition.getKey();
for (String executionId : executionIds) { for (String executionId : executionIds) {
this.logger.trace("Clearing process definition execution: {}: {}", processDefinitionId, executionId); this.logger.trace("Clearing process definition execution: {}: {}", latestProcessDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) { if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionId, executionId, consumer); this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
consumer.close(); consumer.close();
} }
} }