diff --git a/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java b/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java index 2c28c58..ded9a7f 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java +++ b/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java @@ -28,6 +28,7 @@ import org.activiti.engine.delegate.event.ActivitiEventType; import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior; import org.activiti.engine.impl.context.Context; import org.activiti.engine.impl.persistence.entity.DeploymentEntity; +import org.activiti.engine.impl.persistence.entity.JobEntity; import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity; import org.activiti.engine.impl.util.ProcessDefinitionUtil; import org.activiti.engine.repository.ProcessDefinition; @@ -234,6 +235,17 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic default: } // we only want to deal with ProcessDefinition entities + } else if (aaevent.getEntity() instanceof JobEntity) { + this.logger.trace("Triggered by job state change: {}", aaevent.getEntity()); + + switch (aaevent.getType()) { + case ENTITY_DELETED: + case ENTITY_SUSPENDED: + // we need to stop the listener + this.onJobRemoveEvent((JobEntity) aaevent.getEntity()); + break; + default: + } } } @@ -282,6 +294,12 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic this.deloopMqSubscribeTask(entity.getId()); } + protected void onJobRemoveEvent(JobEntity entity) { + this.logger.trace("Triggered by job removal: {}", entity); + this.subscriptionService.cancelled(entity.getExecutionId()); + this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId()); + } + protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) { this.unsubscribeOtherMqSubscribeTasks(procDef.getId()); } diff --git a/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java b/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java index bc4ece5..4f7cf77 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java @@ -8,6 +8,9 @@ import java.util.Set; import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.repository.ProcessDefinition; +import org.activiti.engine.runtime.Execution; +import org.activiti.engine.runtime.Job; +import org.activiti.engine.runtime.ProcessInstance; import org.apache.commons.collections4.MultiValuedMap; import org.apache.commons.collections4.multimap.HashSetValuedHashMap; import org.apache.commons.lang3.tuple.Pair; @@ -26,13 +29,13 @@ public class MqExecutionService { /** * The size of the keys is limited to the number of process definitions - * series are defined. This excludes versions. It would actually only - * contain ones with an MQ subscribe task. It would never be a significant - * memory hog. + * series defined. This excludes versions. It would actually only contain + * ones with an MQ subscribe task. It would never be a significant memory + * hog. * * The size of the values is limited to the number of active process - * definition versions exist. So it would never be a significant memory - * hog. + * definition versions that exist. So it would never be a significant + * memory hog. * * The size of the keys/values have nothing to do with the number of * process instances or executions. @@ -81,8 +84,10 @@ public class MqExecutionService { private MultiValuedMap, String> activityExecutionMap = new HashSetValuedHashMap<>(); public synchronized void executing(DelegateExecution execution) { + ProcessDefinition procDef = this.services.getRepositoryService().createProcessDefinitionQuery().processDefinitionId(execution.getProcessDefinitionId()).singleResult(); + this.processDefinitionKeyMap.put(procDef.getKey(), execution.getProcessDefinitionId()); + Pair key = this.toKey(execution); - this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft()); this.processDefinitionActivityMap.put(key.getLeft(), key.getRight()); this.activityExecutionMap.put(key, execution.getId()); } @@ -92,6 +97,39 @@ public class MqExecutionService { this.activityExecutionMap.removeMapping(key, execution.getId()); } + public synchronized final boolean cancelled(String executionId) { + Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult(); + return this.cancelled(execution); + } + + public synchronized boolean cancelled(Execution execution) { + ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult(); + if (execution.getActivityId() != null) { + Pair key = this.toKey(pi.getProcessDefinitionId(), execution.getActivityId()); + return this.activityExecutionMap.removeMapping(key, execution.getId()); + } else { + this.logger.trace("No activity discovered, so checking all activities in the process definition: {}: {}", pi.getProcessDefinitionId(), execution.getId()); + Collection activityIds = this.processDefinitionActivityMap.get(pi.getProcessDefinitionId()); + boolean removed = false; + + for (String activityId : activityIds) { + Pair key = this.toKey(pi.getProcessDefinitionId(), activityId); + removed = this.activityExecutionMap.removeMapping(key, execution.getId()) || removed; + } + + return removed; + } + } + + public synchronized final boolean cancelledJob(String jobId) { + Job job = this.services.getManagementService().createJobQuery().jobId(jobId).singleResult(); + return this.cancelledJob(job); + } + + public synchronized final boolean cancelledJob(Job job) { + return this.cancelled(job.getExecutionId()); + } + /** * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @return A set of execution identifiers that were in the now cleared map. diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java index 7427823..060c2ca 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java @@ -7,6 +7,7 @@ import java.util.Set; import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.repository.ProcessDefinition; +import org.activiti.engine.runtime.Execution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -45,6 +46,23 @@ public class MqSubscriptionService extends MqExecutionService { this.executed(execution); } + @Override + public synchronized boolean cancelled(Execution execution) { + AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId()); + if (cachedConsumerCloseable == null) { + this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId()); + return false; + } + + // this will eventually lead to a call to "consumed() above" + try { + cachedConsumerCloseable.close(); + return super.cancelled(execution); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + /** * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.