Merge branch 'develop' into stable

This commit is contained in:
2025-02-21 09:36:12 -05:00
11 changed files with 966 additions and 493 deletions

View File

@@ -0,0 +1,69 @@
package com.inteligr8.activiti.mq;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
public interface ActivitiEntityEventListener<T extends Entity> {
/**
* This method checks to see if this listener is for the specified entity.
*
* @param entity An Activiti entity.
* @return `true` if this listener supports the entity; `false` otherwise.
*/
default boolean ofEntityType(Entity entity) {
return entity == null ? false : this.ofEntityType(entity.getClass());
}
/**
* This method checks to see if this listener is for the specified entity
* class.
*
* @param entityType An Activiti entity class.
* @return `true` if this listener supports the entity type; `false` otherwise.
*/
boolean ofEntityType(Class<? extends Entity> entityType);
/**
* This method allows for initialization on application startup. Any
* resources this listener should subscribe to, should be connected or
* opened.
*/
default void onApplicationStartup() {
}
/**
* This method allows for uninitialization on application shutdown. Any
* resources this listener subscribes to should be disconnected or closed.
*
* The `ActivitiEntityEventListener#onEvent()` method may still be called
* before startup or after shutdown.
*/
default void onApplicationShutdown() {
}
/**
* This method is fired every time there is a qualifying entity event.
*
* @param aaevent An Activiti entity event.
*/
default void onEntityEvent(ActivitiEntityEvent aaevent) {
@SuppressWarnings("unchecked")
T entity = (T) aaevent.getEntity();
this.onEntityEvent(aaevent.getType(), aaevent.getProcessDefinitionId(), aaevent.getProcessInstanceId(), aaevent.getExecutionId(), entity);
}
/**
* Thi smethod is fired every time there is a qualifying entity event.
*
* @param eventType An Activiti event type; limited to `ENTITY_*`.
* @param processDefinitionId The unique identifier of the process definition subject to the event.
* @param processInstanceId The unique identifier of the process instance subject to the event.
* @param executionId The unique identifier of the execution subject to the event.
* @param entity The Activiti entity.
*/
void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, String executionId, T entity);
}

View File

@@ -0,0 +1,111 @@
package com.inteligr8.activiti.mq;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.stereotype.Component;
@Component
public class ActivitiEntityEventMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
private final Logger logger = LoggerFactory.getLogger(ActivitiEntityEventMonitor.class);
@Autowired
private List<ActivitiEntityEventListener<? extends Entity>> listeners;
@Autowired
private ProcessEngine services;
/**
* This method is fired by Spring, the framework behind Activiti. This
* forwards the application event to each listener so they can initialize
* or uninitialize.
*/
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
this.logger.debug("Application context refresh/stop/close detected; shutting down listeners: {}", event);
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
listener.onApplicationShutdown();
}
}
// the listener cannot be active until the context is initialized
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
this.logger.debug("Application context refresh/start detected; starting up listeners: {}", event);
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
listener.onApplicationStartup();
}
}
}
/**
* This method will start monitoring for changes in deployment, app, and
* process definitions.
*/
@PostConstruct
protected void init() {
this.logger.debug("Bean initialized; starting to listen for any entity event ...");
this.services.getRuntimeService().addEventListener(this,
ActivitiEventType.ENTITY_CREATED,
ActivitiEventType.ENTITY_INITIALIZED,
ActivitiEventType.ENTITY_UPDATED,
ActivitiEventType.ENTITY_ACTIVATED,
ActivitiEventType.ENTITY_SUSPENDED,
ActivitiEventType.ENTITY_DELETED);
}
/**
* This method will stop monitoring for changes in deployment, app, and
* process definitions.
*/
@PreDestroy
protected void uninit() {
this.logger.debug("Bean uninitialized; stopping listener for any entity event ...");
this.services.getRuntimeService().removeEventListener(this);
}
/**
* This method is fired by the Activiti platform. It is called on every
* entity event.
*
* @param event An Activiti event.
*/
@Override
public void onEvent(ActivitiEvent event) {
this.logger.trace("Triggered by event: {}", event);
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
ActivitiEntityEvent aaevent = (ActivitiEntityEvent) event;
if (listener.ofEntityType((Entity) aaevent.getEntity()))
listener.onEntityEvent(aaevent);
}
}
@Override
public boolean isFailOnException() {
return true;
}
}

View File

@@ -1,473 +0,0 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.SequenceFlow;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.bpmn.model.StartEvent;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.repository.ProcessDefinitionQuery;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.stereotype.Component;
import com.activiti.domain.idm.Tenant;
@Component
public class MQProcessDefinitionMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
private final Logger logger = LoggerFactory.getLogger(MQProcessDefinitionMonitor.class);
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
@Autowired
private ProcessEngine services;
@Autowired
private ApplicationContext context;
@Autowired
private TenantFinderService tenantFinderService;
@Autowired
private MqSubscriptionService subscriptionService;
private Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
/**
* This method will add/remove listeners to the specific process
* definitions that use MQ subscriptions to start process instances at
* application startup and shutdown.
*/
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
this.logger.debug("Discovered {} active process definitions to stop listening to", this.activeListeners.size());
this.deloopMqSubscribeTasks();
}
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
String tenantId = this.findTenantId();
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
this.logger.debug("Found {} active process definitions", procDefs.size());
for (ProcessDefinition procDef : procDefs) {
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
ServiceTask task = this.findMqStartSubscribeTask(procDef.getId());
if (task == null)
continue;
int concurrency = this.determineConcurrency(task);
this.logger.debug("Process definition MQ subscription is configured for concurrency: {}: {}", procDef.getId(), concurrency);
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, procDef.getId(), task);
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), procDef.getId());
if (execs.size() < concurrency) {
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), procDef.getId());
for (int thread = execs.size(); thread < concurrency; thread++) {
this.loopMqSubscribeTask(procDef.getId(), task);
}
}
}
}
}
protected String findTenantId() {
Tenant tenant = this.tenantFinderService.findTenant();
return this.tenantFinderService.transform(tenant);
}
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
.latestVersion()
.active();
if (tenantId == null) {
procDefQuery.processDefinitionWithoutTenantId();
} else {
procDefQuery.processDefinitionTenantId(tenantId);
}
return procDefQuery.list();
}
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(task.getId());
if (tenantId == null) {
execQuery.executionWithoutTenantId();
} else {
execQuery.executionTenantId(tenantId);
}
return execQuery.list();
}
protected Integer findConcurrency(ServiceTask task) {
for (FieldExtension fieldext : task.getFieldExtensions()) {
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
}
}
return null;
}
protected int determineConcurrency(ServiceTask task) {
Integer concurrency = this.findConcurrency(task);
if (concurrency == null) {
return 1;
} else if (concurrency.intValue() < 1) {
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
return 1;
} else {
return concurrency.intValue();
}
}
/**
* This method will start monitoring for changes in deployment, app, and
* process definitions.
*/
@PostConstruct
protected void init() {
this.services.getRuntimeService().addEventListener(this,
ActivitiEventType.ENTITY_INITIALIZED,
ActivitiEventType.ENTITY_UPDATED,
ActivitiEventType.ENTITY_ACTIVATED,
ActivitiEventType.ENTITY_SUSPENDED,
ActivitiEventType.ENTITY_DELETED);
this.logger.debug("Started listening for entity events");
}
/**
* This method will stop monitoring for changes in deployment, app, and
* process definitions.
*/
@PreDestroy
protected void uninit() {
this.logger.debug("Stopping listening for entity events ...");
this.services.getRuntimeService().removeEventListener(this);
}
/**
* This method is fired by the Activiti platform. It is called on every
* entity event except the creation event. You can see that in the init()
* method above.
*/
@Override
public void onEvent(ActivitiEvent event) {
this.logger.trace("Triggered by event: {}", event);
this.onEntityEvent((ActivitiEntityEvent) event);
}
@Override
public boolean isFailOnException() {
return true;
}
protected void onEntityEvent(ActivitiEntityEvent aaevent) {
this.logger.trace("Triggered by entity: {}", aaevent.getEntity());
if (aaevent.getEntity() instanceof ProcessDefinitionEntity) {
this.logger.debug("Triggered by process definition state change: {}", aaevent.getEntity());
switch (aaevent.getType()) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
break;
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) aaevent.getEntity());
break;
default:
// we need to start a listener
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) aaevent.getEntity());
}
// we only want to deal with ProcessDefinition entities
} else if (aaevent.getEntity() instanceof DeploymentEntity) {
switch (aaevent.getType()) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
this.onDeploymentAddEvent((DeploymentEntity) aaevent.getEntity());
break;
default:
}
// we only want to deal with ProcessDefinition entities
} else if (aaevent.getEntity() instanceof JobEntity) {
this.logger.trace("Triggered by job state change: {}", aaevent.getEntity());
switch (aaevent.getType()) {
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onJobRemoveEvent((JobEntity) aaevent.getEntity());
break;
default:
}
}
}
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition addition: {}", entity);
this.unsubscribeOtherMqSubscribeTasks(entity);
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null)
return;
this.loopMqSubscribeTask(entity.getId(), task);
}
protected void onDeploymentAddEvent(DeploymentEntity entity) {
this.logger.debug("Triggered by deployment addition: {}", entity);
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
if (procDefEntities == null)
return;
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
this.unsubscribeOtherMqSubscribeTasks(procDefEntity);
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
if (task == null)
return;
this.loopMqSubscribeTask(procDefEntity.getId(), task);
}
}
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition removal: {}", entity);
this.unsubscribeOtherMqSubscribeTasks(entity);
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null)
return;
this.deloopMqSubscribeTask(entity.getId());
}
protected void onJobRemoveEvent(JobEntity entity) {
this.logger.trace("Triggered by job removal: {}", entity);
this.subscriptionService.cancelled(entity.getExecutionId());
this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId());
}
protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.clearOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cleared: " + procDefId, e);
}
}
protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) {
// start a process instance on the process
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);
this.services.getRuntimeService().startProcessInstanceById(processDefId);
if (this.activeListeners.containsKey(processDefId))
// one listener, no matter how many instances are subscribed
return;
AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) {
@Override
public void onEvent(ActivitiActivityEvent event) {
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
services.getRuntimeService().startProcessInstanceById(processDefId);
}
@Override
public boolean isFailOnException() {
return true;
}
};
// the ServiceTask will then execute, waiting for a message on the queue
// when a message is received, it will be responsible for starting a new process instance
// that new process instance will wait for the next message on the queue
// repeat
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
// register the listener so we can possibly remove it later
this.activeListeners.put(processDefId, listener);
}
protected synchronized int deloopMqSubscribeTasks() {
int count = 0;
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
while (i.hasNext()) {
Entry<String, AbstractActivityListener> listener = i.next();
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", listener.getKey());
this.services.getRuntimeService().removeEventListener(listener.getValue());
i.remove();
count++;
}
return count;
}
protected synchronized boolean deloopMqSubscribeTask(String processDefId) {
AbstractActivityListener listener = this.activeListeners.remove(processDefId);
if (listener == null) {
this.logger.trace("No MQ subscription listener found; no listener to stop");
return false;
}
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefId);
this.services.getRuntimeService().removeEventListener(listener);
return true;
}
private BpmnModel getBpmnModel(String processDefId) {
if (Context.getProcessDefinitionHelper() == null) {
// typically during initial startup
return this.services.getRepositoryService().getBpmnModel(processDefId);
} else {
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
return ProcessDefinitionUtil.getBpmnModel(processDefId);
}
}
protected ServiceTask findMqStartSubscribeTask(String processDefId) {
BpmnModel model = this.getBpmnModel(processDefId);
StartEvent startElement = this.findMqPlainStartEvent(processDefId, model);
if (startElement == null)
return null;
List<SequenceFlow> flows = startElement.getOutgoingFlows();
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefId, startElement.getId());
if (flows.isEmpty()) {
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefId);
return null;
} else if (flows.size() > 1) {
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefId);
return null;
}
SequenceFlow flow = flows.iterator().next();
FlowElement targetFlowElement = flow.getTargetFlowElement();
if (targetFlowElement == null) {
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefId);
return null;
}
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefId);
if (!(targetFlowElement instanceof ServiceTask)) {
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefId, targetFlowElement.getId(), targetFlowElement.getName());
return null;
}
ServiceTask task = (ServiceTask) targetFlowElement;
if (!"delegateExpression".equals(task.getImplementationType())) {
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
return null;
}
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
if (!matcher.find()) {
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefId, task.getId(), task.getName());
return null;
}
String beanId = matcher.group(1).trim();
this.logger.trace("Looking up bean: {}", beanId);
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
if (delegateBean == null) {
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
return null;
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
return null;
}
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
return task;
}
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
this.logger.trace("Finding all start elements in process: {}", processDefId);
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
for (StartEvent startElement : startElements) {
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
continue;
}
return startElement;
}
return null;
}
protected ServiceTask castToServiceTask(FlowElement flowElement) {
if (!(flowElement instanceof ServiceTask))
return null;
return (ServiceTask) flowElement;
}
}

View File

@@ -0,0 +1,85 @@
package com.inteligr8.activiti.mq;
import java.util.List;
import java.util.Set;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqDeploymentEventListener implements ActivitiEntityEventListener<DeploymentEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected MqSubscribeLooper looper;
@Autowired
protected ProcessDefinitionRegistry registry;
@Autowired
private MqSubscriptionService subscriptionService;
@Override
public boolean ofEntityType(Class<? extends Entity> entityType) {
return DeploymentEntity.class.isAssignableFrom(entityType);
}
@Override
public boolean ofEntityType(Entity entity) {
return entity instanceof DeploymentEntity;
}
@Override
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
String executionId, DeploymentEntity entity) {
this.logger.trace("Triggered by deployment event: {}", entity);
switch (eventType) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
this.onDeploymentAddEvent((DeploymentEntity) entity);
break;
default:
}
}
protected void onDeploymentAddEvent(DeploymentEntity entity) {
this.logger.debug("Triggered by deployment addition: {}", entity);
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
if (procDefEntities == null)
return;
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
this.unsubscribeOtherMqSubscribeTasks(procDefEntity.getId());
if (this.registry.isMqStart(procDefEntity.getId()))
this.looper.loop(procDefEntity.getId());
}
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
}
}
}

View File

@@ -130,33 +130,88 @@ public class MqExecutionService {
return this.cancelled(job.getExecutionId()); return this.cancelled(job.getExecutionId());
} }
/**
* This method cancels all the executions active on the specified process
* definition.
*
* @param executionId An execution unique identifier.
* @return `true` if execution was cached; `false` otherwise.
*/
public boolean cancel(String executionId) throws Exception {
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
return this.cancel(pi.getProcessDefinitionId(), executionId);
}
/**
* This method cancels all the executions active on the specified process
* definition.
*
* @param processDefinitionId A process definition unique identifier.
* @return A set of execution identifiers that were cancelled.
*/
public Set<String> cancelAll(String processDefinitionId) throws Exception {
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
String processDefinitionKey = processDefinition.getKey();
return this.cancelAll(processDefinitionId, processDefinitionKey);
}
/** /**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers that were in the now cleared map. * @return A set of execution identifiers that were in the now cleared map.
*/ */
public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception { public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId); ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
String processDefinitionKey = latestProcessDefinition.getKey(); String processDefinitionKey = latestProcessDefinition.getKey();
Set<String> executionIds = new HashSet<>(); Set<String> executionIds = new HashSet<>();
Collection<String> processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey); Collection<String> processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey);
for (String processDefinitionId : processDefinitionIds) { for (String processDefinitionId : processDefinitionIds)
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId); executionIds.addAll(this.cancelAll(processDefinitionId, processDefinitionKey));
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId); return executionIds;
if (activityIds == null) { }
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet(); private synchronized boolean cancel(String processDefinitionId, String executionId) throws Exception {
} this.logger.trace("Cancelling execution: {}: {}", processDefinitionId, executionId);
for (String activityId : activityIds) { Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId); if (activityIds == null) {
Pair<String, String> key = this.toKey(processDefinitionId, activityId); this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key); return false;
if (activityExecutionIds != null) }
executionIds.addAll(activityExecutionIds);
} for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
if (activityExecutionIds.remove(executionId))
return true;
}
return false;
}
private synchronized Set<String> cancelAll(String processDefinitionId, String processDefinitionKey) throws Exception {
Set<String> executionIds = new HashSet<>();
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
if (activityExecutionIds != null)
executionIds.addAll(activityExecutionIds);
} }
return executionIds; return executionIds;

View File

@@ -0,0 +1,54 @@
package com.inteligr8.activiti.mq;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected MqSubscriptionService subscriptionService;
@Override
public boolean ofEntityType(Class<? extends Entity> entityType) {
return JobEntity.class.isAssignableFrom(entityType);
}
@Override
public boolean ofEntityType(Entity entity) {
return entity instanceof JobEntity;
}
@Override
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
String executionId, JobEntity entity) {
this.logger.trace("Triggered by job event: {}", entity);
switch (eventType) {
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onJobRemoveEvent(entity);
break;
default:
}
}
protected void onJobRemoveEvent(JobEntity entity) {
this.logger.trace("Triggered by job removal: {}", entity);
try {
this.subscriptionService.cancel(entity.getExecutionId());
this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId());
} catch (Exception e) {
this.logger.error("Subscription execution failed to be canceled after job removal: job: " + entity.getId() + ": exec: " + entity.getExecutionId(), e);
}
}
}

View File

@@ -0,0 +1,116 @@
package com.inteligr8.activiti.mq;
import java.util.Set;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqProcessDefinitionEventListener implements ActivitiEntityEventListener<ProcessDefinitionEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected MqSubscribeLooper looper;
@Autowired
protected ProcessDefinitionRegistry registry;
@Autowired
protected MqSubscriptionService subscriptionService;
@Override
public boolean ofEntityType(Class<? extends Entity> entityType) {
return ProcessDefinitionEntity.class.isAssignableFrom(entityType);
}
@Override
public boolean ofEntityType(Entity entity) {
return entity instanceof ProcessDefinitionEntity;
}
@Override
public void onApplicationStartup() {
this.looper.deloopAllMqProcessDefinitions();
this.looper.loopAllMqProcessDefinitions();
}
@Override
public void onApplicationShutdown() {
this.looper.deloopAllMqProcessDefinitions();
}
@Override
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
String executionId, ProcessDefinitionEntity entity) {
this.logger.trace("Triggered by process definition event: {}", entity);
switch (eventType) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
break;
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) entity);
break;
default:
// we need to start a listener
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) entity);
}
}
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition addition: {}", entity);
// cancel subscriptions on all legacy version process definitions
this.unsubscribeOtherMqSubscribeTasks(entity.getId());
if (this.registry.isMqStart(entity.getId()))
this.looper.loop(entity.getId());
}
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition removal: {}", entity);
// cancel subscriptions on this process definition
this.unsubscribeMqSubscribeTasks(entity.getId());
if (this.registry.isMqStart(entity.getId()))
this.looper.deloop(entity.getId());
}
protected void unsubscribeMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.cancelAll(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
}
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
}
}
}

View File

@@ -54,6 +54,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message. * @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply. * @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
* @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates. * @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates.
* @varIn mq_correlationId [optional] The correlationId of the message to send.
* @varOut mq_correlationId The correlationId of the message sent. * @varOut mq_correlationId The correlationId of the message sent.
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message. * @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
* @throws BPMNError network The MQ connection experienced network issues. * @throws BPMNError network The MQ connection experienced network issues.
@@ -73,6 +74,8 @@ public class MqPublishDelegate extends AbstractMqDelegate {
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel()); MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
PreparedMessage<String> message = communicator.createPreparedMessage(); PreparedMessage<String> message = communicator.createPreparedMessage();
if (mqExecution.getCorrelationId() != null)
message.setCorrelationId(mqExecution.getCorrelationId());
if (mqExecution.getStatusQueueNameFromModel() != null) if (mqExecution.getStatusQueueNameFromModel() != null)
message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel()); message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel());
message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel()); message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel());

View File

@@ -0,0 +1,268 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.repository.ProcessDefinitionQuery;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.activiti.domain.idm.Tenant;
@Component
public class MqSubscribeLooper {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected ProcessEngine services;
@Autowired
protected ProcessDefinitionRegistry registry;
@Autowired
protected TenantFinderService tenantFinderService;
@Autowired
protected MqSubscriptionService subscriptionService;
protected Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
/**
* This method starts the listening of activity completions on all the MQ
* subscribe tasks. This will start the infinite loop where new process
* instances are started every time another process instance receives a
* message.
*/
public void loopAllMqProcessDefinitions() {
String tenantId = this.findTenantId();
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
this.logger.debug("Found {} active process definitions", procDefs.size());
for (ProcessDefinition procDef : procDefs) {
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
if (this.registry.isMqStart(procDef.getId()))
this.loop(tenantId, procDef.getId());
}
}
/**
* This method starts the listening of activity completions on the MQ
* subscribe task in the specified process definition. This will start the
* infinite loop where new process instances are started every time another
* process instance receives a message.
*
* @param processDefinitionId The unique identifier of the process definition to loop.
*/
public void loop(String processDefinitionId) {
String tenantId = this.findTenantId();
this.loop(tenantId, processDefinitionId);
}
private void loop(String tenantId, String processDefinitionId) {
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
if (task == null)
throw new IllegalArgumentException();
int concurrency = this.determineConcurrency(task);
this.logger.debug("Process definition MQ subscription task is configured for concurrency: {}: {}", processDefinitionId, concurrency);
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, processDefinitionId, task);
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), processDefinitionId);
if (execs.size() < concurrency) {
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), processDefinitionId);
this.loop(tenantId, processDefinitionId, task, concurrency, execs);
}
}
private synchronized void loop(String tenantId, String processDefinitionId, ServiceTask task, int concurrency, List<Execution> execs) {
String key = processDefinitionId + "|" + task.getId();
if (this.activeListeners.containsKey(key))
// one listener, no matter how many instances are subscribed
return;
AbstractActivityListener listener = new AbstractActivityListener(processDefinitionId, task) {
@Override
public void onEvent(ActivitiActivityEvent event) {
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
services.getRuntimeService().startProcessInstanceById(event.getProcessDefinitionId());
}
@Override
public boolean isFailOnException() {
return true;
}
};
// the ServiceTask will then execute, waiting for a message on the queue
// when a message is received, it will be responsible for starting a new process instance
// that new process instance will wait for the next message on the queue
// repeat
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
// register the listener so we can possibly remove it later
this.activeListeners.put(key, listener);
for (int thread = execs.size(); thread < concurrency; thread++) {
// start a process instance on the process
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue/topic", processDefinitionId);
this.services.getRuntimeService().startProcessInstanceById(processDefinitionId);
}
}
/**
* This method stops the listening of activity completions. This will end
* the infinite loop where new process instances are started every time
* another process instance receives a message.
*
* It will also attempt to cancel/close any active MQ subscription. If
* they are already closed, it will be skipped. The `executionId` of
* successfully cancelled subscriptions will be returned.
*
* @return A set of unique identifiers for Activiti executions that were successfully cancelled.
*/
public synchronized Set<String> deloopAllMqProcessDefinitions() {
Set<String> executionIds = new HashSet<>();
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
while (i.hasNext()) {
Entry<String, AbstractActivityListener> listener = i.next();
String[] splitkey = listener.getKey().split("\\|");
String processDefinitionId = splitkey[0];
String activityId = splitkey[1];
executionIds.addAll(this.deloop(processDefinitionId, activityId, listener.getValue()));
i.remove();
}
return executionIds;
}
/**
* This method stops the listening of activity completions on the MQ
* subscribe task in the specified process definition. This will end the
* infinite loop where new process instances are started every time another
* process instance receives a message.
*
* @param processDefinitionId The unique identifier of the process definition to loop.
* @return A set of unique identifiers for Activiti executions that were successfully cancelled.
*/
public synchronized Set<String> deloop(String processDefinitionId) {
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
if (task == null)
throw new IllegalArgumentException();
String key = processDefinitionId + "|" + task.getId();
AbstractActivityListener listener = this.activeListeners.remove(key);
return this.deloop(processDefinitionId, task.getId(), listener);
}
private Set<String> deloop(String processDefinitionId, String activityId, AbstractActivityListener listener) {
if (listener == null) {
this.logger.trace("No MQ subscription listener was found; no listener to stop");
} else {
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefinitionId);
this.services.getRuntimeService().removeEventListener(listener);
}
Set<String> executionIds = new HashSet<>();
List<Execution> executions = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(activityId)
.list();
for (Execution execution : executions) {
this.logger.debug("An MQ subscription was found; stopping execution: {}: {}: {}", processDefinitionId, activityId, execution.getId());
if (this.subscriptionService.cancelled(execution)) {
this.logger.info("An MQ subscription was closed: {}", execution.getId());
executionIds.add(execution.getId());
} else {
this.logger.debug("An MQ subscription was already closed: {}", execution.getId());
}
}
return executionIds;
}
protected String findTenantId() {
Tenant tenant = this.tenantFinderService.findTenant();
return this.tenantFinderService.transform(tenant);
}
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
.latestVersion()
.active();
if (tenantId == null) {
procDefQuery.processDefinitionWithoutTenantId();
} else {
procDefQuery.processDefinitionTenantId(tenantId);
}
return procDefQuery.list();
}
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(task.getId());
if (tenantId == null) {
execQuery.executionWithoutTenantId();
} else {
execQuery.executionTenantId(tenantId);
}
return execQuery.list();
}
protected Integer findConcurrency(ServiceTask task) {
for (FieldExtension fieldext : task.getFieldExtensions()) {
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
}
}
return null;
}
protected int determineConcurrency(ServiceTask task) {
Integer concurrency = this.findConcurrency(task);
if (concurrency == null) {
return 1;
} else if (concurrency.intValue() < 1) {
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
return 1;
} else {
return concurrency.intValue();
}
}
protected ServiceTask castToServiceTask(FlowElement flowElement) {
if (!(flowElement instanceof ServiceTask))
return null;
return (ServiceTask) flowElement;
}
}

View File

@@ -63,19 +63,66 @@ public class MqSubscriptionService extends MqExecutionService {
} }
} }
/**
* This method cancel the MQ subscription on the specified execution.
*
* @param executionId An execution unique identifier.
* @return `true` if execution and MQ subscription were ended; `false` otherwise.
*/
@Override
public synchronized boolean cancel(String executionId) throws Exception {
if (!super.cancel(executionId))
return false;
this.logger.trace("Removing MQ subscription execution: {}", executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer);
consumer.close();
}
return true;
}
/**
* This method cancels all the MQ subscriptions active on the specified
* process definition.
*
* @param processDefinitionId A process definition version unique identifier.
* @return A set of execution identifiers subscribed to MQ that are now cancelled; all subscriptions now ended.
*/
@Override
public synchronized Set<String> cancelAll(String processDefinitionId) throws Exception {
Set<String> executionIds = super.cancelAll(processDefinitionId);
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
String processDefinitionKey = processDefinition.getKey();
for (String executionId : executionIds) {
this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
consumer.close();
}
}
return executionIds;
}
/** /**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared. * @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended. * @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
*/ */
@Override @Override
public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception { public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
Set<String> executionIds = super.clearOtherVersions(latestProcessDefinitionId); Set<String> executionIds = super.cancelAllOtherVersions(latestProcessDefinitionId);
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId); ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
String processDefinitionKey = latestProcessDefinition.getKey(); String processDefinitionKey = latestProcessDefinition.getKey();
for (String executionId : executionIds) { for (String executionId : executionIds) {
this.logger.trace("Clearing process definition execution: {}: {}", latestProcessDefinitionId, executionId); this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) { if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer); this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);

View File

@@ -0,0 +1,138 @@
package com.inteligr8.activiti.mq;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.SequenceFlow;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.bpmn.model.StartEvent;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class ProcessDefinitionRegistry {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
@Autowired
protected ProcessEngine services;
@Autowired
protected ApplicationContext context;
private Map<String, ServiceTask> processDefinitionMqSubscribeTasks;
public synchronized boolean isMqStart(String processDefinitionId) {
if (this.processDefinitionMqSubscribeTasks.containsKey(processDefinitionId)) {
return this.processDefinitionMqSubscribeTasks.get(processDefinitionId) != null;
} else {
// not yet cached; cache it; then return
return this.findMqStartSubscribeTask(processDefinitionId) != null;
}
}
public synchronized ServiceTask findMqStartSubscribeTask(String processDefinitionId) {
ServiceTask task = this.processDefinitionMqSubscribeTasks.get(processDefinitionId);
if (task != null)
return task;
BpmnModel model = this.getBpmnModel(processDefinitionId);
StartEvent startElement = this.findMqPlainStartEvent(processDefinitionId, model);
if (startElement == null)
return null;
List<SequenceFlow> flows = startElement.getOutgoingFlows();
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefinitionId, startElement.getId());
if (flows.isEmpty()) {
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefinitionId);
return null;
} else if (flows.size() > 1) {
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefinitionId);
return null;
}
SequenceFlow flow = flows.iterator().next();
FlowElement targetFlowElement = flow.getTargetFlowElement();
if (targetFlowElement == null) {
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefinitionId);
return null;
}
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefinitionId);
if (!(targetFlowElement instanceof ServiceTask)) {
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, targetFlowElement.getId(), targetFlowElement.getName());
return null;
}
task = (ServiceTask) targetFlowElement;
if (!"delegateExpression".equals(task.getImplementationType())) {
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
return null;
}
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
if (!matcher.find()) {
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefinitionId, task.getId(), task.getName());
return null;
}
String beanId = matcher.group(1).trim();
this.logger.trace("Looking up bean: {}", beanId);
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
if (delegateBean == null) {
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
return null;
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
return null;
}
this.processDefinitionMqSubscribeTasks.put(processDefinitionId, task);
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefinitionId, task.getId(), task.getName());
return task;
}
private BpmnModel getBpmnModel(String processDefId) {
if (Context.getProcessDefinitionHelper() == null) {
// typically during initial startup
return this.services.getRepositoryService().getBpmnModel(processDefId);
} else {
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
return ProcessDefinitionUtil.getBpmnModel(processDefId);
}
}
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
this.logger.trace("Finding all start elements in process: {}", processDefId);
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
for (StartEvent startElement : startElements) {
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
continue;
}
return startElement;
}
return null;
}
}