Files
mq-activiti-ext/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java
2025-03-10 15:27:04 -04:00

242 lines
11 KiB
Java

package com.inteligr8.activiti.mq;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqExecutionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ProcessEngine services;
/**
* The size of the keys is limited to the number of process definitions
* series defined. This excludes versions. It would actually only contain
* ones with an MQ subscribe task. It would never be a significant memory
* hog.
*
* The size of the values is limited to the number of active process
* definition versions that exist. So it would never be a significant
* memory hog.
*
* The size of the keys/values have nothing to do with the number of
* process instances or executions.
*
* This means it does not need to be trimmed. However, it is a good idea
* to remove process definition identifiers (values) that have no active
* executions. You could do the same with active activities, but cleaning
* up process definitions will clean those up too.
*/
private MultiValuedMap<String, String> processDefinitionKeyMap = new HashSetValuedHashMap<>();
/**
* The size of the keys is limited to the number of process definitions
* defined. It would actually only contain ones with an MQ subscribe task.
* Even if we kept versioned or inactive process definitions in the map, it
* would never be a significant memory hog.
*
* The size of the values is limited to the number of MQ subscribe tasks
* defined in each process definition. So it would never be a significant
* memory hog.
*
* The size of the keys/values have nothing to do with the number of
* process instances or executions.
*
* This means it does not need to be trimmed. However, it is a good idea
* to remove process definition keys that have no active executions. You
* could do the same with active activities, but cleaning up process
* definitions will clean those up too.
*/
private MultiValuedMap<String, String> processDefinitionActivityMap = new HashSetValuedHashMap<>();
/**
* The size of the keys is limited to the number of MQ subscribe tasks
* defined in all process definitions. So it would never be a significant
* memory hog.
*
* The size of the values has no limit. It will grow with the number of
* executions (related to process instances).
*
* This means the map values need to be trimmed. When an MQ subscribe task
* is completed, it is paramount to remove the execution from the values of
* this map. It is also a good idea to remove the activity key when it is
* removed from the `processDefinitionActivityMap` map; and to propagate
* the removal of executions from the `executionSubscriptionMap` map.
*/
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
public synchronized void executing(DelegateExecution execution) {
ProcessDefinition procDef = this.services.getRepositoryService().createProcessDefinitionQuery().processDefinitionId(execution.getProcessDefinitionId()).singleResult();
this.processDefinitionKeyMap.put(procDef.getKey(), execution.getProcessDefinitionId());
Pair<String, String> key = this.toKey(execution);
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
this.activityExecutionMap.put(key, execution.getId());
}
public synchronized void executed(DelegateExecution execution) {
Pair<String, String> key = this.toKey(execution);
this.activityExecutionMap.removeMapping(key, execution.getId());
}
public synchronized final boolean cancelled(String executionId) {
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
return this.cancelled(execution);
}
public synchronized boolean cancelled(Execution execution) {
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
if (execution.getActivityId() != null) {
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), execution.getActivityId());
return this.activityExecutionMap.removeMapping(key, execution.getId());
} else {
this.logger.trace("No activity discovered, so checking all activities in the process definition: {}: {}", pi.getProcessDefinitionId(), execution.getId());
Collection<String> activityIds = this.processDefinitionActivityMap.get(pi.getProcessDefinitionId());
boolean removed = false;
for (String activityId : activityIds) {
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), activityId);
removed = this.activityExecutionMap.removeMapping(key, execution.getId()) || removed;
}
return removed;
}
}
public synchronized final boolean cancelledJob(String jobId) {
Job job = this.services.getManagementService().createJobQuery().jobId(jobId).singleResult();
return this.cancelledJob(job);
}
public synchronized final boolean cancelledJob(Job job) {
return this.cancelled(job.getExecutionId());
}
/**
* This method cancels all the executions active on the specified process
* definition.
*
* @param executionId An execution unique identifier.
* @return `true` if execution was cached; `false` otherwise.
*/
public boolean cancel(String executionId) throws Exception {
this.logger.trace("cancel({})", executionId);
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
if (execution == null) {
this.logger.debug("No execution to cancel: {}", executionId);
return false;
}
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
if (pi == null) {
this.logger.debug("No process instance to cancel: {}", executionId);
return false;
}
return this.cancel(pi.getProcessDefinitionId(), executionId);
}
/**
* This method cancels all the executions active on the specified process
* definition.
*
* @param processDefinitionId A process definition unique identifier.
* @return A set of execution identifiers that were cancelled.
*/
public Set<String> cancelAll(String processDefinitionId) throws Exception {
this.logger.trace("cancelAll({})", processDefinitionId);
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
String processDefinitionKey = processDefinition.getKey();
return this.cancelAll(processDefinitionId, processDefinitionKey);
}
/**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers that were in the now cleared map.
*/
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
this.logger.trace("cancelAllOtherVersions({})", latestProcessDefinitionId);
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
if (latestProcessDefinition == null)
return Collections.emptySet();
String processDefinitionKey = latestProcessDefinition.getKey();
Set<String> executionIds = new HashSet<>();
Collection<String> processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey);
for (String processDefinitionId : processDefinitionIds)
executionIds.addAll(this.cancelAll(processDefinitionId, processDefinitionKey));
return executionIds;
}
private synchronized boolean cancel(String processDefinitionId, String executionId) throws Exception {
this.logger.trace("Cancelling execution: {}: {}", processDefinitionId, executionId);
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return false;
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
if (activityExecutionIds.remove(executionId))
return true;
}
return false;
}
private synchronized Set<String> cancelAll(String processDefinitionId, String processDefinitionKey) throws Exception {
Set<String> executionIds = new HashSet<>();
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
if (activityExecutionIds != null)
executionIds.addAll(activityExecutionIds);
}
return executionIds;
}
protected Pair<String, String> toKey(DelegateExecution execution) {
return Pair.of(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
}
protected Pair<String, String> toKey(String processDefinitionId, String activityId) {
return Pair.of(processDefinitionId, activityId);
}
}