added execution cancellation support; listening for job deletion

This commit is contained in:
2025-02-20 13:09:57 -05:00
parent 41b5271617
commit 5aef2d4446
3 changed files with 80 additions and 6 deletions

View File

@@ -28,6 +28,7 @@ import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
import org.activiti.engine.repository.ProcessDefinition;
@@ -234,6 +235,17 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
default:
}
// we only want to deal with ProcessDefinition entities
} else if (aaevent.getEntity() instanceof JobEntity) {
this.logger.trace("Triggered by job state change: {}", aaevent.getEntity());
switch (aaevent.getType()) {
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onJobRemoveEvent((JobEntity) aaevent.getEntity());
break;
default:
}
}
}
@@ -282,6 +294,12 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
this.deloopMqSubscribeTask(entity.getId());
}
protected void onJobRemoveEvent(JobEntity entity) {
this.logger.trace("Triggered by job removal: {}", entity);
this.subscriptionService.cancelled(entity.getExecutionId());
this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId());
}
protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
}

View File

@@ -8,6 +8,9 @@ import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.tuple.Pair;
@@ -26,13 +29,13 @@ public class MqExecutionService {
/**
* The size of the keys is limited to the number of process definitions
* series are defined. This excludes versions. It would actually only
* contain ones with an MQ subscribe task. It would never be a significant
* memory hog.
* series defined. This excludes versions. It would actually only contain
* ones with an MQ subscribe task. It would never be a significant memory
* hog.
*
* The size of the values is limited to the number of active process
* definition versions exist. So it would never be a significant memory
* hog.
* definition versions that exist. So it would never be a significant
* memory hog.
*
* The size of the keys/values have nothing to do with the number of
* process instances or executions.
@@ -81,8 +84,10 @@ public class MqExecutionService {
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
public synchronized void executing(DelegateExecution execution) {
ProcessDefinition procDef = this.services.getRepositoryService().createProcessDefinitionQuery().processDefinitionId(execution.getProcessDefinitionId()).singleResult();
this.processDefinitionKeyMap.put(procDef.getKey(), execution.getProcessDefinitionId());
Pair<String, String> key = this.toKey(execution);
this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft());
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
this.activityExecutionMap.put(key, execution.getId());
}
@@ -92,6 +97,39 @@ public class MqExecutionService {
this.activityExecutionMap.removeMapping(key, execution.getId());
}
public synchronized final boolean cancelled(String executionId) {
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
return this.cancelled(execution);
}
public synchronized boolean cancelled(Execution execution) {
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
if (execution.getActivityId() != null) {
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), execution.getActivityId());
return this.activityExecutionMap.removeMapping(key, execution.getId());
} else {
this.logger.trace("No activity discovered, so checking all activities in the process definition: {}: {}", pi.getProcessDefinitionId(), execution.getId());
Collection<String> activityIds = this.processDefinitionActivityMap.get(pi.getProcessDefinitionId());
boolean removed = false;
for (String activityId : activityIds) {
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), activityId);
removed = this.activityExecutionMap.removeMapping(key, execution.getId()) || removed;
}
return removed;
}
}
public synchronized final boolean cancelledJob(String jobId) {
Job job = this.services.getManagementService().createJobQuery().jobId(jobId).singleResult();
return this.cancelledJob(job);
}
public synchronized final boolean cancelledJob(Job job) {
return this.cancelled(job.getExecutionId());
}
/**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers that were in the now cleared map.

View File

@@ -7,6 +7,7 @@ import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -45,6 +46,23 @@ public class MqSubscriptionService extends MqExecutionService {
this.executed(execution);
}
@Override
public synchronized boolean cancelled(Execution execution) {
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable == null) {
this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId());
return false;
}
// this will eventually lead to a call to "consumed() above"
try {
cachedConsumerCloseable.close();
return super.cancelled(execution);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
/**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.