From 740843908dcb0ee6979236d8b6d187aaa7ffc5fb Mon Sep 17 00:00:00 2001 From: "Brian M. Long" Date: Fri, 21 Feb 2025 09:36:01 -0500 Subject: [PATCH] refactored; added job listener --- .../mq/ActivitiEntityEventListener.java | 69 +++ .../mq/ActivitiEntityEventMonitor.java | 111 ++++ .../mq/MQProcessDefinitionMonitor.java | 473 ------------------ .../mq/MqDeploymentEventListener.java | 85 ++++ .../activiti/mq/MqExecutionService.java | 89 +++- .../activiti/mq/MqJobEventListener.java | 54 ++ .../mq/MqProcessDefinitionEventListener.java | 116 +++++ .../activiti/mq/MqSubscribeLooper.java | 268 ++++++++++ .../activiti/mq/MqSubscriptionService.java | 53 +- .../mq/ProcessDefinitionRegistry.java | 138 +++++ 10 files changed, 963 insertions(+), 493 deletions(-) create mode 100644 src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventListener.java create mode 100644 src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventMonitor.java delete mode 100644 src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java create mode 100644 src/main/java/com/inteligr8/activiti/mq/MqDeploymentEventListener.java create mode 100644 src/main/java/com/inteligr8/activiti/mq/MqJobEventListener.java create mode 100644 src/main/java/com/inteligr8/activiti/mq/MqProcessDefinitionEventListener.java create mode 100644 src/main/java/com/inteligr8/activiti/mq/MqSubscribeLooper.java create mode 100644 src/main/java/com/inteligr8/activiti/mq/ProcessDefinitionRegistry.java diff --git a/src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventListener.java b/src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventListener.java new file mode 100644 index 0000000..a248767 --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventListener.java @@ -0,0 +1,69 @@ +package com.inteligr8.activiti.mq; + +import org.activiti.engine.delegate.event.ActivitiEntityEvent; +import org.activiti.engine.delegate.event.ActivitiEventType; +import org.activiti.engine.impl.persistence.entity.Entity; + +public interface ActivitiEntityEventListener { + + /** + * This method checks to see if this listener is for the specified entity. + * + * @param entity An Activiti entity. + * @return `true` if this listener supports the entity; `false` otherwise. + */ + default boolean ofEntityType(Entity entity) { + return entity == null ? false : this.ofEntityType(entity.getClass()); + } + + /** + * This method checks to see if this listener is for the specified entity + * class. + * + * @param entityType An Activiti entity class. + * @return `true` if this listener supports the entity type; `false` otherwise. + */ + boolean ofEntityType(Class entityType); + + /** + * This method allows for initialization on application startup. Any + * resources this listener should subscribe to, should be connected or + * opened. + */ + default void onApplicationStartup() { + } + + /** + * This method allows for uninitialization on application shutdown. Any + * resources this listener subscribes to should be disconnected or closed. + * + * The `ActivitiEntityEventListener#onEvent()` method may still be called + * before startup or after shutdown. + */ + default void onApplicationShutdown() { + } + + /** + * This method is fired every time there is a qualifying entity event. + * + * @param aaevent An Activiti entity event. + */ + default void onEntityEvent(ActivitiEntityEvent aaevent) { + @SuppressWarnings("unchecked") + T entity = (T) aaevent.getEntity(); + + this.onEntityEvent(aaevent.getType(), aaevent.getProcessDefinitionId(), aaevent.getProcessInstanceId(), aaevent.getExecutionId(), entity); + } + + /** + * Thi smethod is fired every time there is a qualifying entity event. + * + * @param eventType An Activiti event type; limited to `ENTITY_*`. + * @param processDefinitionId The unique identifier of the process definition subject to the event. + * @param processInstanceId The unique identifier of the process instance subject to the event. + * @param executionId The unique identifier of the execution subject to the event. + * @param entity The Activiti entity. + */ + void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, String executionId, T entity); + +} diff --git a/src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventMonitor.java b/src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventMonitor.java new file mode 100644 index 0000000..97cbe05 --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/ActivitiEntityEventMonitor.java @@ -0,0 +1,111 @@ +package com.inteligr8.activiti.mq; + +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.activiti.engine.ProcessEngine; +import org.activiti.engine.delegate.event.ActivitiEntityEvent; +import org.activiti.engine.delegate.event.ActivitiEvent; +import org.activiti.engine.delegate.event.ActivitiEventListener; +import org.activiti.engine.delegate.event.ActivitiEventType; +import org.activiti.engine.impl.persistence.entity.Entity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ApplicationContextEvent; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.ContextStartedEvent; +import org.springframework.context.event.ContextStoppedEvent; +import org.springframework.stereotype.Component; + +@Component +public class ActivitiEntityEventMonitor implements ActivitiEventListener, ApplicationListener { + + private final Logger logger = LoggerFactory.getLogger(ActivitiEntityEventMonitor.class); + + @Autowired + private List> listeners; + + @Autowired + private ProcessEngine services; + + /** + * This method is fired by Spring, the framework behind Activiti. This + * forwards the application event to each listener so they can initialize + * or uninitialize. + */ + @Override + public void onApplicationEvent(ApplicationContextEvent event) { + if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) { + this.logger.debug("Application context refresh/stop/close detected; shutting down listeners: {}", event); + + for (ActivitiEntityEventListener listener : this.listeners) { + listener.onApplicationShutdown(); + } + } + + // the listener cannot be active until the context is initialized + if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) { + this.logger.debug("Application context refresh/start detected; starting up listeners: {}", event); + + for (ActivitiEntityEventListener listener : this.listeners) { + listener.onApplicationStartup(); + } + } + } + + /** + * This method will start monitoring for changes in deployment, app, and + * process definitions. + */ + @PostConstruct + protected void init() { + this.logger.debug("Bean initialized; starting to listen for any entity event ..."); + + this.services.getRuntimeService().addEventListener(this, + ActivitiEventType.ENTITY_CREATED, + ActivitiEventType.ENTITY_INITIALIZED, + ActivitiEventType.ENTITY_UPDATED, + ActivitiEventType.ENTITY_ACTIVATED, + ActivitiEventType.ENTITY_SUSPENDED, + ActivitiEventType.ENTITY_DELETED); + } + + /** + * This method will stop monitoring for changes in deployment, app, and + * process definitions. + */ + @PreDestroy + protected void uninit() { + this.logger.debug("Bean uninitialized; stopping listener for any entity event ..."); + + this.services.getRuntimeService().removeEventListener(this); + } + + /** + * This method is fired by the Activiti platform. It is called on every + * entity event. + * + * @param event An Activiti event. + */ + @Override + public void onEvent(ActivitiEvent event) { + this.logger.trace("Triggered by event: {}", event); + + for (ActivitiEntityEventListener listener : this.listeners) { + ActivitiEntityEvent aaevent = (ActivitiEntityEvent) event; + if (listener.ofEntityType((Entity) aaevent.getEntity())) + listener.onEntityEvent(aaevent); + } + } + + @Override + public boolean isFailOnException() { + return true; + } + +} diff --git a/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java b/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java deleted file mode 100644 index ded9a7f..0000000 --- a/src/main/java/com/inteligr8/activiti/mq/MQProcessDefinitionMonitor.java +++ /dev/null @@ -1,473 +0,0 @@ -package com.inteligr8.activiti.mq; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.activiti.bpmn.model.BpmnModel; -import org.activiti.bpmn.model.FieldExtension; -import org.activiti.bpmn.model.FlowElement; -import org.activiti.bpmn.model.SequenceFlow; -import org.activiti.bpmn.model.ServiceTask; -import org.activiti.bpmn.model.StartEvent; -import org.activiti.engine.ProcessEngine; -import org.activiti.engine.delegate.JavaDelegate; -import org.activiti.engine.delegate.event.ActivitiActivityEvent; -import org.activiti.engine.delegate.event.ActivitiEntityEvent; -import org.activiti.engine.delegate.event.ActivitiEvent; -import org.activiti.engine.delegate.event.ActivitiEventListener; -import org.activiti.engine.delegate.event.ActivitiEventType; -import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior; -import org.activiti.engine.impl.context.Context; -import org.activiti.engine.impl.persistence.entity.DeploymentEntity; -import org.activiti.engine.impl.persistence.entity.JobEntity; -import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity; -import org.activiti.engine.impl.util.ProcessDefinitionUtil; -import org.activiti.engine.repository.ProcessDefinition; -import org.activiti.engine.repository.ProcessDefinitionQuery; -import org.activiti.engine.runtime.Execution; -import org.activiti.engine.runtime.ExecutionQuery; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ApplicationContextEvent; -import org.springframework.context.event.ContextClosedEvent; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.event.ContextStartedEvent; -import org.springframework.context.event.ContextStoppedEvent; -import org.springframework.stereotype.Component; - -import com.activiti.domain.idm.Tenant; - -@Component -public class MQProcessDefinitionMonitor implements ActivitiEventListener, ApplicationListener { - - private final Logger logger = LoggerFactory.getLogger(MQProcessDefinitionMonitor.class); - private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}"); - - @Autowired - private ProcessEngine services; - - @Autowired - private ApplicationContext context; - - @Autowired - private TenantFinderService tenantFinderService; - - @Autowired - private MqSubscriptionService subscriptionService; - - private Map activeListeners = new HashMap<>(); - - /** - * This method will add/remove listeners to the specific process - * definitions that use MQ subscriptions to start process instances at - * application startup and shutdown. - */ - @Override - public void onApplicationEvent(ApplicationContextEvent event) { - if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) { - this.logger.debug("Discovered {} active process definitions to stop listening to", this.activeListeners.size()); - this.deloopMqSubscribeTasks(); - } - - if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) { - String tenantId = this.findTenantId(); - List procDefs = this.findLatestActiveProcessDefinnitions(tenantId); - this.logger.debug("Found {} active process definitions", procDefs.size()); - for (ProcessDefinition procDef : procDefs) { - this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId()); - - ServiceTask task = this.findMqStartSubscribeTask(procDef.getId()); - if (task == null) - continue; - - int concurrency = this.determineConcurrency(task); - this.logger.debug("Process definition MQ subscription is configured for concurrency: {}: {}", procDef.getId(), concurrency); - - List execs = this.findExecutionsByServiceTask(tenantId, procDef.getId(), task); - this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), procDef.getId()); - - if (execs.size() < concurrency) { - this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), procDef.getId()); - for (int thread = execs.size(); thread < concurrency; thread++) { - this.loopMqSubscribeTask(procDef.getId(), task); - } - } - } - } - } - - protected String findTenantId() { - Tenant tenant = this.tenantFinderService.findTenant(); - return this.tenantFinderService.transform(tenant); - } - - protected List findLatestActiveProcessDefinnitions(String tenantId) { - ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery() - .latestVersion() - .active(); - if (tenantId == null) { - procDefQuery.processDefinitionWithoutTenantId(); - } else { - procDefQuery.processDefinitionTenantId(tenantId); - } - - return procDefQuery.list(); - } - - protected List findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) { - ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery() - .processDefinitionId(processDefinitionId) - .activityId(task.getId()); - if (tenantId == null) { - execQuery.executionWithoutTenantId(); - } else { - execQuery.executionTenantId(tenantId); - } - - return execQuery.list(); - } - - protected Integer findConcurrency(ServiceTask task) { - for (FieldExtension fieldext : task.getFieldExtensions()) { - if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) { - String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue()); - return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr); - } - } - - return null; - } - - protected int determineConcurrency(ServiceTask task) { - Integer concurrency = this.findConcurrency(task); - if (concurrency == null) { - return 1; - } else if (concurrency.intValue() < 1) { - this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId()); - return 1; - } else { - return concurrency.intValue(); - } - } - - /** - * This method will start monitoring for changes in deployment, app, and - * process definitions. - */ - @PostConstruct - protected void init() { - this.services.getRuntimeService().addEventListener(this, - ActivitiEventType.ENTITY_INITIALIZED, - ActivitiEventType.ENTITY_UPDATED, - ActivitiEventType.ENTITY_ACTIVATED, - ActivitiEventType.ENTITY_SUSPENDED, - ActivitiEventType.ENTITY_DELETED); - this.logger.debug("Started listening for entity events"); - } - - /** - * This method will stop monitoring for changes in deployment, app, and - * process definitions. - */ - @PreDestroy - protected void uninit() { - this.logger.debug("Stopping listening for entity events ..."); - this.services.getRuntimeService().removeEventListener(this); - } - - /** - * This method is fired by the Activiti platform. It is called on every - * entity event except the creation event. You can see that in the init() - * method above. - */ - @Override - public void onEvent(ActivitiEvent event) { - this.logger.trace("Triggered by event: {}", event); - this.onEntityEvent((ActivitiEntityEvent) event); - } - - @Override - public boolean isFailOnException() { - return true; - } - - protected void onEntityEvent(ActivitiEntityEvent aaevent) { - this.logger.trace("Triggered by entity: {}", aaevent.getEntity()); - - if (aaevent.getEntity() instanceof ProcessDefinitionEntity) { - this.logger.debug("Triggered by process definition state change: {}", aaevent.getEntity()); - - switch (aaevent.getType()) { - case ENTITY_INITIALIZED: - // we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached - // we must use DeploymentEntity and then dig down for the process definitions - break; - case ENTITY_DELETED: - case ENTITY_SUSPENDED: - // we need to stop the listener - this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) aaevent.getEntity()); - break; - default: - // we need to start a listener - this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) aaevent.getEntity()); - } - // we only want to deal with ProcessDefinition entities - } else if (aaevent.getEntity() instanceof DeploymentEntity) { - switch (aaevent.getType()) { - case ENTITY_INITIALIZED: - // we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached - // we must use DeploymentEntity and then dig down for the process definitions - this.onDeploymentAddEvent((DeploymentEntity) aaevent.getEntity()); - break; - default: - } - // we only want to deal with ProcessDefinition entities - } else if (aaevent.getEntity() instanceof JobEntity) { - this.logger.trace("Triggered by job state change: {}", aaevent.getEntity()); - - switch (aaevent.getType()) { - case ENTITY_DELETED: - case ENTITY_SUSPENDED: - // we need to stop the listener - this.onJobRemoveEvent((JobEntity) aaevent.getEntity()); - break; - default: - } - } - } - - protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) { - this.logger.debug("Triggered by process definition addition: {}", entity); - - this.unsubscribeOtherMqSubscribeTasks(entity); - - ServiceTask task = this.findMqStartSubscribeTask(entity.getId()); - if (task == null) - return; - - this.loopMqSubscribeTask(entity.getId(), task); - } - - protected void onDeploymentAddEvent(DeploymentEntity entity) { - this.logger.debug("Triggered by deployment addition: {}", entity); - - List procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class); - if (procDefEntities == null) - return; - this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId()); - - for (ProcessDefinitionEntity procDefEntity : procDefEntities) { - this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName()); - - this.unsubscribeOtherMqSubscribeTasks(procDefEntity); - - ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId()); - if (task == null) - return; - - this.loopMqSubscribeTask(procDefEntity.getId(), task); - } - } - - protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) { - this.logger.debug("Triggered by process definition removal: {}", entity); - - this.unsubscribeOtherMqSubscribeTasks(entity); - - ServiceTask task = this.findMqStartSubscribeTask(entity.getId()); - if (task == null) - return; - - this.deloopMqSubscribeTask(entity.getId()); - } - - protected void onJobRemoveEvent(JobEntity entity) { - this.logger.trace("Triggered by job removal: {}", entity); - this.subscriptionService.cancelled(entity.getExecutionId()); - this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId()); - } - - protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) { - this.unsubscribeOtherMqSubscribeTasks(procDef.getId()); - } - - protected void unsubscribeOtherMqSubscribeTasks(String procDefId) { - try { - Set executionIds = this.subscriptionService.clearOtherVersions(procDefId); - if (this.logger.isDebugEnabled()) { - this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds); - } else { - this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size()); - } - } catch (Exception e) { - this.logger.error("The subscriptions could not be cleared: " + procDefId, e); - } - } - - protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) { - // start a process instance on the process - this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId); - this.services.getRuntimeService().startProcessInstanceById(processDefId); - - if (this.activeListeners.containsKey(processDefId)) - // one listener, no matter how many instances are subscribed - return; - - AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) { - @Override - public void onEvent(ActivitiActivityEvent event) { - logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId()); - services.getRuntimeService().startProcessInstanceById(processDefId); - } - - @Override - public boolean isFailOnException() { - return true; - } - }; - - // the ServiceTask will then execute, waiting for a message on the queue - // when a message is received, it will be responsible for starting a new process instance - // that new process instance will wait for the next message on the queue - // repeat - this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED); - - // register the listener so we can possibly remove it later - this.activeListeners.put(processDefId, listener); - } - - protected synchronized int deloopMqSubscribeTasks() { - int count = 0; - - Iterator> i = this.activeListeners.entrySet().iterator(); - while (i.hasNext()) { - Entry listener = i.next(); - - this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", listener.getKey()); - this.services.getRuntimeService().removeEventListener(listener.getValue()); - - i.remove(); - count++; - } - - return count; - } - - protected synchronized boolean deloopMqSubscribeTask(String processDefId) { - AbstractActivityListener listener = this.activeListeners.remove(processDefId); - if (listener == null) { - this.logger.trace("No MQ subscription listener found; no listener to stop"); - return false; - } - - this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefId); - this.services.getRuntimeService().removeEventListener(listener); - return true; - } - - private BpmnModel getBpmnModel(String processDefId) { - if (Context.getProcessDefinitionHelper() == null) { - // typically during initial startup - return this.services.getRepositoryService().getBpmnModel(processDefId); - } else { - // we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX - return ProcessDefinitionUtil.getBpmnModel(processDefId); - } - } - - protected ServiceTask findMqStartSubscribeTask(String processDefId) { - BpmnModel model = this.getBpmnModel(processDefId); - StartEvent startElement = this.findMqPlainStartEvent(processDefId, model); - if (startElement == null) - return null; - - List flows = startElement.getOutgoingFlows(); - this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefId, startElement.getId()); - if (flows.isEmpty()) { - this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefId); - return null; - } else if (flows.size() > 1) { - this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefId); - return null; - } - - SequenceFlow flow = flows.iterator().next(); - FlowElement targetFlowElement = flow.getTargetFlowElement(); - if (targetFlowElement == null) { - this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefId); - return null; - } - this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefId); - - if (!(targetFlowElement instanceof ServiceTask)) { - this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefId, targetFlowElement.getId(), targetFlowElement.getName()); - return null; - } - - ServiceTask task = (ServiceTask) targetFlowElement; - if (!"delegateExpression".equals(task.getImplementationType())) { - this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName()); - return null; - } - this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation()); - - Matcher matcher = this.expressionPattern.matcher(task.getImplementation()); - if (!matcher.find()) { - this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefId, task.getId(), task.getName()); - return null; - } - - String beanId = matcher.group(1).trim(); - this.logger.trace("Looking up bean: {}", beanId); - - JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class); - this.logger.trace("Found bean: {}: {}", beanId, delegateBean); - if (delegateBean == null) { - this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName()); - return null; - } else if (!(delegateBean instanceof MqSubscribeDelegate)) { - this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName()); - return null; - } - - this.logger.info("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName()); - return task; - } - - protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) { - this.logger.trace("Finding all start elements in process: {}", processDefId); - List startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class); - this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId); - for (StartEvent startElement : startElements) { - // constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events - if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) { - this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior()); - continue; - } - - return startElement; - } - - return null; - } - - protected ServiceTask castToServiceTask(FlowElement flowElement) { - if (!(flowElement instanceof ServiceTask)) - return null; - return (ServiceTask) flowElement; - } - -} diff --git a/src/main/java/com/inteligr8/activiti/mq/MqDeploymentEventListener.java b/src/main/java/com/inteligr8/activiti/mq/MqDeploymentEventListener.java new file mode 100644 index 0000000..06f116f --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/MqDeploymentEventListener.java @@ -0,0 +1,85 @@ +package com.inteligr8.activiti.mq; + +import java.util.List; +import java.util.Set; + +import org.activiti.engine.delegate.event.ActivitiEventType; +import org.activiti.engine.impl.persistence.entity.DeploymentEntity; +import org.activiti.engine.impl.persistence.entity.Entity; +import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class MqDeploymentEventListener implements ActivitiEntityEventListener { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + protected MqSubscribeLooper looper; + + @Autowired + protected ProcessDefinitionRegistry registry; + + @Autowired + private MqSubscriptionService subscriptionService; + + @Override + public boolean ofEntityType(Class entityType) { + return DeploymentEntity.class.isAssignableFrom(entityType); + } + + @Override + public boolean ofEntityType(Entity entity) { + return entity instanceof DeploymentEntity; + } + + @Override + public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, + String executionId, DeploymentEntity entity) { + this.logger.trace("Triggered by deployment event: {}", entity); + + switch (eventType) { + case ENTITY_INITIALIZED: + // we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached + // we must use DeploymentEntity and then dig down for the process definitions + this.onDeploymentAddEvent((DeploymentEntity) entity); + break; + default: + } + } + + protected void onDeploymentAddEvent(DeploymentEntity entity) { + this.logger.debug("Triggered by deployment addition: {}", entity); + + List procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class); + if (procDefEntities == null) + return; + this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId()); + + for (ProcessDefinitionEntity procDefEntity : procDefEntities) { + this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName()); + + this.unsubscribeOtherMqSubscribeTasks(procDefEntity.getId()); + + if (this.registry.isMqStart(procDefEntity.getId())) + this.looper.loop(procDefEntity.getId()); + } + } + + protected void unsubscribeOtherMqSubscribeTasks(String procDefId) { + try { + Set executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId); + if (this.logger.isDebugEnabled()) { + this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds); + } else { + this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size()); + } + } catch (Exception e) { + this.logger.error("The subscriptions could not be cancelled: " + procDefId, e); + } + } + +} diff --git a/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java b/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java index 4f7cf77..663c06f 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java @@ -130,33 +130,88 @@ public class MqExecutionService { return this.cancelled(job.getExecutionId()); } + /** + * This method cancels all the executions active on the specified process + * definition. + * + * @param executionId An execution unique identifier. + * @return `true` if execution was cached; `false` otherwise. + */ + public boolean cancel(String executionId) throws Exception { + Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult(); + ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult(); + + return this.cancel(pi.getProcessDefinitionId(), executionId); + } + + /** + * This method cancels all the executions active on the specified process + * definition. + * + * @param processDefinitionId A process definition unique identifier. + * @return A set of execution identifiers that were cancelled. + */ + public Set cancelAll(String processDefinitionId) throws Exception { + ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId); + String processDefinitionKey = processDefinition.getKey(); + + return this.cancelAll(processDefinitionId, processDefinitionKey); + } + /** * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @return A set of execution identifiers that were in the now cleared map. */ - public synchronized Set clearOtherVersions(String latestProcessDefinitionId) throws Exception { + public synchronized Set cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception { ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId); String processDefinitionKey = latestProcessDefinition.getKey(); Set executionIds = new HashSet<>(); Collection processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey); - for (String processDefinitionId : processDefinitionIds) { - this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId); - - Collection activityIds = this.processDefinitionActivityMap.remove(processDefinitionId); - if (activityIds == null) { - this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId); - return Collections.emptySet(); - } - - for (String activityId : activityIds) { - this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); - Pair key = this.toKey(processDefinitionId, activityId); - Collection activityExecutionIds = this.activityExecutionMap.remove(key); - if (activityExecutionIds != null) - executionIds.addAll(activityExecutionIds); - } + for (String processDefinitionId : processDefinitionIds) + executionIds.addAll(this.cancelAll(processDefinitionId, processDefinitionKey)); + + return executionIds; + } + + private synchronized boolean cancel(String processDefinitionId, String executionId) throws Exception { + this.logger.trace("Cancelling execution: {}: {}", processDefinitionId, executionId); + + Collection activityIds = this.processDefinitionActivityMap.remove(processDefinitionId); + if (activityIds == null) { + this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId); + return false; + } + + for (String activityId : activityIds) { + this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); + Pair key = this.toKey(processDefinitionId, activityId); + Collection activityExecutionIds = this.activityExecutionMap.get(key); + if (activityExecutionIds.remove(executionId)) + return true; + } + + return false; + } + + private synchronized Set cancelAll(String processDefinitionId, String processDefinitionKey) throws Exception { + Set executionIds = new HashSet<>(); + + this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId); + + Collection activityIds = this.processDefinitionActivityMap.remove(processDefinitionId); + if (activityIds == null) { + this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId); + return Collections.emptySet(); + } + + for (String activityId : activityIds) { + this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); + Pair key = this.toKey(processDefinitionId, activityId); + Collection activityExecutionIds = this.activityExecutionMap.remove(key); + if (activityExecutionIds != null) + executionIds.addAll(activityExecutionIds); } return executionIds; diff --git a/src/main/java/com/inteligr8/activiti/mq/MqJobEventListener.java b/src/main/java/com/inteligr8/activiti/mq/MqJobEventListener.java new file mode 100644 index 0000000..0bc294b --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/MqJobEventListener.java @@ -0,0 +1,54 @@ +package com.inteligr8.activiti.mq; + +import org.activiti.engine.delegate.event.ActivitiEventType; +import org.activiti.engine.impl.persistence.entity.Entity; +import org.activiti.engine.impl.persistence.entity.JobEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class MqJobEventListener implements ActivitiEntityEventListener { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + protected MqSubscriptionService subscriptionService; + + @Override + public boolean ofEntityType(Class entityType) { + return JobEntity.class.isAssignableFrom(entityType); + } + + @Override + public boolean ofEntityType(Entity entity) { + return entity instanceof JobEntity; + } + + @Override + public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, + String executionId, JobEntity entity) { + this.logger.trace("Triggered by job event: {}", entity); + + switch (eventType) { + case ENTITY_DELETED: + case ENTITY_SUSPENDED: + // we need to stop the listener + this.onJobRemoveEvent(entity); + break; + default: + } + } + + protected void onJobRemoveEvent(JobEntity entity) { + this.logger.trace("Triggered by job removal: {}", entity); + try { + this.subscriptionService.cancel(entity.getExecutionId()); + this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId()); + } catch (Exception e) { + this.logger.error("Subscription execution failed to be canceled after job removal: job: " + entity.getId() + ": exec: " + entity.getExecutionId(), e); + } + } + +} diff --git a/src/main/java/com/inteligr8/activiti/mq/MqProcessDefinitionEventListener.java b/src/main/java/com/inteligr8/activiti/mq/MqProcessDefinitionEventListener.java new file mode 100644 index 0000000..60cb951 --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/MqProcessDefinitionEventListener.java @@ -0,0 +1,116 @@ +package com.inteligr8.activiti.mq; + +import java.util.Set; + +import org.activiti.engine.delegate.event.ActivitiEventType; +import org.activiti.engine.impl.persistence.entity.Entity; +import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class MqProcessDefinitionEventListener implements ActivitiEntityEventListener { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + protected MqSubscribeLooper looper; + + @Autowired + protected ProcessDefinitionRegistry registry; + + @Autowired + protected MqSubscriptionService subscriptionService; + + @Override + public boolean ofEntityType(Class entityType) { + return ProcessDefinitionEntity.class.isAssignableFrom(entityType); + } + + @Override + public boolean ofEntityType(Entity entity) { + return entity instanceof ProcessDefinitionEntity; + } + + @Override + public void onApplicationStartup() { + this.looper.deloopAllMqProcessDefinitions(); + this.looper.loopAllMqProcessDefinitions(); + } + + @Override + public void onApplicationShutdown() { + this.looper.deloopAllMqProcessDefinitions(); + } + + @Override + public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, + String executionId, ProcessDefinitionEntity entity) { + this.logger.trace("Triggered by process definition event: {}", entity); + + switch (eventType) { + case ENTITY_INITIALIZED: + // we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached + // we must use DeploymentEntity and then dig down for the process definitions + break; + case ENTITY_DELETED: + case ENTITY_SUSPENDED: + // we need to stop the listener + this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) entity); + break; + default: + // we need to start a listener + this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) entity); + } + + } + + protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) { + this.logger.debug("Triggered by process definition addition: {}", entity); + + // cancel subscriptions on all legacy version process definitions + this.unsubscribeOtherMqSubscribeTasks(entity.getId()); + + if (this.registry.isMqStart(entity.getId())) + this.looper.loop(entity.getId()); + } + + protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) { + this.logger.debug("Triggered by process definition removal: {}", entity); + + // cancel subscriptions on this process definition + this.unsubscribeMqSubscribeTasks(entity.getId()); + + if (this.registry.isMqStart(entity.getId())) + this.looper.deloop(entity.getId()); + } + + protected void unsubscribeMqSubscribeTasks(String procDefId) { + try { + Set executionIds = this.subscriptionService.cancelAll(procDefId); + if (this.logger.isDebugEnabled()) { + this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds); + } else { + this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size()); + } + } catch (Exception e) { + this.logger.error("The subscriptions could not be cancelled: " + procDefId, e); + } + } + + protected void unsubscribeOtherMqSubscribeTasks(String procDefId) { + try { + Set executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId); + if (this.logger.isDebugEnabled()) { + this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds); + } else { + this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size()); + } + } catch (Exception e) { + this.logger.error("The subscriptions could not be cancelled: " + procDefId, e); + } + } + +} diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscribeLooper.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscribeLooper.java new file mode 100644 index 0000000..8bd7da6 --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscribeLooper.java @@ -0,0 +1,268 @@ +package com.inteligr8.activiti.mq; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.activiti.bpmn.model.FieldExtension; +import org.activiti.bpmn.model.FlowElement; +import org.activiti.bpmn.model.ServiceTask; +import org.activiti.engine.ProcessEngine; +import org.activiti.engine.delegate.event.ActivitiActivityEvent; +import org.activiti.engine.delegate.event.ActivitiEventType; +import org.activiti.engine.repository.ProcessDefinition; +import org.activiti.engine.repository.ProcessDefinitionQuery; +import org.activiti.engine.runtime.Execution; +import org.activiti.engine.runtime.ExecutionQuery; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.activiti.domain.idm.Tenant; + +@Component +public class MqSubscribeLooper { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + protected ProcessEngine services; + + @Autowired + protected ProcessDefinitionRegistry registry; + + @Autowired + protected TenantFinderService tenantFinderService; + + @Autowired + protected MqSubscriptionService subscriptionService; + + protected Map activeListeners = new HashMap<>(); + + /** + * This method starts the listening of activity completions on all the MQ + * subscribe tasks. This will start the infinite loop where new process + * instances are started every time another process instance receives a + * message. + */ + public void loopAllMqProcessDefinitions() { + String tenantId = this.findTenantId(); + List procDefs = this.findLatestActiveProcessDefinnitions(tenantId); + this.logger.debug("Found {} active process definitions", procDefs.size()); + + for (ProcessDefinition procDef : procDefs) { + this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId()); + + if (this.registry.isMqStart(procDef.getId())) + this.loop(tenantId, procDef.getId()); + } + } + + /** + * This method starts the listening of activity completions on the MQ + * subscribe task in the specified process definition. This will start the + * infinite loop where new process instances are started every time another + * process instance receives a message. + * + * @param processDefinitionId The unique identifier of the process definition to loop. + */ + public void loop(String processDefinitionId) { + String tenantId = this.findTenantId(); + this.loop(tenantId, processDefinitionId); + } + + private void loop(String tenantId, String processDefinitionId) { + ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId); + if (task == null) + throw new IllegalArgumentException(); + + int concurrency = this.determineConcurrency(task); + this.logger.debug("Process definition MQ subscription task is configured for concurrency: {}: {}", processDefinitionId, concurrency); + + List execs = this.findExecutionsByServiceTask(tenantId, processDefinitionId, task); + this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), processDefinitionId); + + if (execs.size() < concurrency) { + this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), processDefinitionId); + this.loop(tenantId, processDefinitionId, task, concurrency, execs); + } + } + + private synchronized void loop(String tenantId, String processDefinitionId, ServiceTask task, int concurrency, List execs) { + String key = processDefinitionId + "|" + task.getId(); + if (this.activeListeners.containsKey(key)) + // one listener, no matter how many instances are subscribed + return; + + AbstractActivityListener listener = new AbstractActivityListener(processDefinitionId, task) { + @Override + public void onEvent(ActivitiActivityEvent event) { + logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId()); + services.getRuntimeService().startProcessInstanceById(event.getProcessDefinitionId()); + } + + @Override + public boolean isFailOnException() { + return true; + } + }; + + // the ServiceTask will then execute, waiting for a message on the queue + // when a message is received, it will be responsible for starting a new process instance + // that new process instance will wait for the next message on the queue + // repeat + this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED); + + // register the listener so we can possibly remove it later + this.activeListeners.put(key, listener); + + for (int thread = execs.size(); thread < concurrency; thread++) { + // start a process instance on the process + this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue/topic", processDefinitionId); + this.services.getRuntimeService().startProcessInstanceById(processDefinitionId); + } + } + + /** + * This method stops the listening of activity completions. This will end + * the infinite loop where new process instances are started every time + * another process instance receives a message. + * + * It will also attempt to cancel/close any active MQ subscription. If + * they are already closed, it will be skipped. The `executionId` of + * successfully cancelled subscriptions will be returned. + * + * @return A set of unique identifiers for Activiti executions that were successfully cancelled. + */ + public synchronized Set deloopAllMqProcessDefinitions() { + Set executionIds = new HashSet<>(); + + Iterator> i = this.activeListeners.entrySet().iterator(); + while (i.hasNext()) { + Entry listener = i.next(); + + String[] splitkey = listener.getKey().split("\\|"); + String processDefinitionId = splitkey[0]; + String activityId = splitkey[1]; + + executionIds.addAll(this.deloop(processDefinitionId, activityId, listener.getValue())); + + i.remove(); + } + + return executionIds; + } + + /** + * This method stops the listening of activity completions on the MQ + * subscribe task in the specified process definition. This will end the + * infinite loop where new process instances are started every time another + * process instance receives a message. + * + * @param processDefinitionId The unique identifier of the process definition to loop. + * @return A set of unique identifiers for Activiti executions that were successfully cancelled. + */ + public synchronized Set deloop(String processDefinitionId) { + ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId); + if (task == null) + throw new IllegalArgumentException(); + + String key = processDefinitionId + "|" + task.getId(); + AbstractActivityListener listener = this.activeListeners.remove(key); + return this.deloop(processDefinitionId, task.getId(), listener); + } + + private Set deloop(String processDefinitionId, String activityId, AbstractActivityListener listener) { + if (listener == null) { + this.logger.trace("No MQ subscription listener was found; no listener to stop"); + } else { + this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefinitionId); + this.services.getRuntimeService().removeEventListener(listener); + } + + Set executionIds = new HashSet<>(); + + List executions = this.services.getRuntimeService().createExecutionQuery() + .processDefinitionId(processDefinitionId) + .activityId(activityId) + .list(); + for (Execution execution : executions) { + this.logger.debug("An MQ subscription was found; stopping execution: {}: {}: {}", processDefinitionId, activityId, execution.getId()); + if (this.subscriptionService.cancelled(execution)) { + this.logger.info("An MQ subscription was closed: {}", execution.getId()); + executionIds.add(execution.getId()); + } else { + this.logger.debug("An MQ subscription was already closed: {}", execution.getId()); + } + } + + return executionIds; + } + + protected String findTenantId() { + Tenant tenant = this.tenantFinderService.findTenant(); + return this.tenantFinderService.transform(tenant); + } + + protected List findLatestActiveProcessDefinnitions(String tenantId) { + ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery() + .latestVersion() + .active(); + if (tenantId == null) { + procDefQuery.processDefinitionWithoutTenantId(); + } else { + procDefQuery.processDefinitionTenantId(tenantId); + } + + return procDefQuery.list(); + } + + protected List findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) { + ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery() + .processDefinitionId(processDefinitionId) + .activityId(task.getId()); + if (tenantId == null) { + execQuery.executionWithoutTenantId(); + } else { + execQuery.executionTenantId(tenantId); + } + + return execQuery.list(); + } + + protected Integer findConcurrency(ServiceTask task) { + for (FieldExtension fieldext : task.getFieldExtensions()) { + if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) { + String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue()); + return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr); + } + } + + return null; + } + + protected int determineConcurrency(ServiceTask task) { + Integer concurrency = this.findConcurrency(task); + if (concurrency == null) { + return 1; + } else if (concurrency.intValue() < 1) { + this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId()); + return 1; + } else { + return concurrency.intValue(); + } + } + + protected ServiceTask castToServiceTask(FlowElement flowElement) { + if (!(flowElement instanceof ServiceTask)) + return null; + return (ServiceTask) flowElement; + } + +} diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java index 060c2ca..8328a34 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java @@ -63,19 +63,66 @@ public class MqSubscriptionService extends MqExecutionService { } } + /** + * This method cancel the MQ subscription on the specified execution. + * + * @param executionId An execution unique identifier. + * @return `true` if execution and MQ subscription were ended; `false` otherwise. + */ + @Override + public synchronized boolean cancel(String executionId) throws Exception { + if (!super.cancel(executionId)) + return false; + + this.logger.trace("Removing MQ subscription execution: {}", executionId); + AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); + if (consumer != null) { + this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer); + consumer.close(); + } + + return true; + } + + /** + * This method cancels all the MQ subscriptions active on the specified + * process definition. + * + * @param processDefinitionId A process definition version unique identifier. + * @return A set of execution identifiers subscribed to MQ that are now cancelled; all subscriptions now ended. + */ + @Override + public synchronized Set cancelAll(String processDefinitionId) throws Exception { + Set executionIds = super.cancelAll(processDefinitionId); + + ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId); + String processDefinitionKey = processDefinition.getKey(); + + for (String executionId : executionIds) { + this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId); + AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); + if (consumer != null) { + this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer); + consumer.close(); + } + } + + return executionIds; + } + /** * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended. */ @Override - public synchronized Set clearOtherVersions(String latestProcessDefinitionId) throws Exception { - Set executionIds = super.clearOtherVersions(latestProcessDefinitionId); + public synchronized Set cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception { + Set executionIds = super.cancelAllOtherVersions(latestProcessDefinitionId); ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId); String processDefinitionKey = latestProcessDefinition.getKey(); for (String executionId : executionIds) { - this.logger.trace("Clearing process definition execution: {}: {}", latestProcessDefinitionId, executionId); + this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId); AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); if (consumer != null) { this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer); diff --git a/src/main/java/com/inteligr8/activiti/mq/ProcessDefinitionRegistry.java b/src/main/java/com/inteligr8/activiti/mq/ProcessDefinitionRegistry.java new file mode 100644 index 0000000..a077986 --- /dev/null +++ b/src/main/java/com/inteligr8/activiti/mq/ProcessDefinitionRegistry.java @@ -0,0 +1,138 @@ +package com.inteligr8.activiti.mq; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.activiti.bpmn.model.BpmnModel; +import org.activiti.bpmn.model.FlowElement; +import org.activiti.bpmn.model.SequenceFlow; +import org.activiti.bpmn.model.ServiceTask; +import org.activiti.bpmn.model.StartEvent; +import org.activiti.engine.ProcessEngine; +import org.activiti.engine.delegate.JavaDelegate; +import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior; +import org.activiti.engine.impl.context.Context; +import org.activiti.engine.impl.util.ProcessDefinitionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +@Component +public class ProcessDefinitionRegistry { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}"); + + @Autowired + protected ProcessEngine services; + + @Autowired + protected ApplicationContext context; + + private Map processDefinitionMqSubscribeTasks; + + public synchronized boolean isMqStart(String processDefinitionId) { + if (this.processDefinitionMqSubscribeTasks.containsKey(processDefinitionId)) { + return this.processDefinitionMqSubscribeTasks.get(processDefinitionId) != null; + } else { + // not yet cached; cache it; then return + return this.findMqStartSubscribeTask(processDefinitionId) != null; + } + } + + public synchronized ServiceTask findMqStartSubscribeTask(String processDefinitionId) { + ServiceTask task = this.processDefinitionMqSubscribeTasks.get(processDefinitionId); + if (task != null) + return task; + + BpmnModel model = this.getBpmnModel(processDefinitionId); + StartEvent startElement = this.findMqPlainStartEvent(processDefinitionId, model); + if (startElement == null) + return null; + + List flows = startElement.getOutgoingFlows(); + this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefinitionId, startElement.getId()); + if (flows.isEmpty()) { + this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefinitionId); + return null; + } else if (flows.size() > 1) { + this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefinitionId); + return null; + } + + SequenceFlow flow = flows.iterator().next(); + FlowElement targetFlowElement = flow.getTargetFlowElement(); + if (targetFlowElement == null) { + this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefinitionId); + return null; + } + this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefinitionId); + + if (!(targetFlowElement instanceof ServiceTask)) { + this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, targetFlowElement.getId(), targetFlowElement.getName()); + return null; + } + + task = (ServiceTask) targetFlowElement; + if (!"delegateExpression".equals(task.getImplementationType())) { + this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName()); + return null; + } + this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation()); + + Matcher matcher = this.expressionPattern.matcher(task.getImplementation()); + if (!matcher.find()) { + this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefinitionId, task.getId(), task.getName()); + return null; + } + + String beanId = matcher.group(1).trim(); + this.logger.trace("Looking up bean: {}", beanId); + + JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class); + this.logger.trace("Found bean: {}: {}", beanId, delegateBean); + if (delegateBean == null) { + this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName()); + return null; + } else if (!(delegateBean instanceof MqSubscribeDelegate)) { + this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName()); + return null; + } + + this.processDefinitionMqSubscribeTasks.put(processDefinitionId, task); + this.logger.info("Process starts with an MQ subscription: {}: {}", processDefinitionId, task.getId(), task.getName()); + return task; + } + + private BpmnModel getBpmnModel(String processDefId) { + if (Context.getProcessDefinitionHelper() == null) { + // typically during initial startup + return this.services.getRepositoryService().getBpmnModel(processDefId); + } else { + // we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX + return ProcessDefinitionUtil.getBpmnModel(processDefId); + } + } + + protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) { + this.logger.trace("Finding all start elements in process: {}", processDefId); + List startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class); + this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId); + for (StartEvent startElement : startElements) { + // constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events + if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) { + this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior()); + continue; + } + + return startElement; + } + + return null; + } + +}