Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
fd142e709e | |||
98ee188595 | |||
5d0a77f623 | |||
bdbda54743 | |||
8d91166a01 | |||
4fe113b8a7 | |||
370b385aea | |||
82b4c500d5 | |||
9762f74e89 |
2
pom.xml
2
pom.xml
@@ -11,7 +11,7 @@
|
||||
</parent>
|
||||
|
||||
<artifactId>mq-activiti-ext</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<version>1.0.3</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>MQ Activiti Extension</name>
|
||||
|
@@ -1186,6 +1186,40 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "mq_prioritypackage",
|
||||
"properties": [
|
||||
{
|
||||
"id": "mq_priority",
|
||||
"type": "String",
|
||||
"title": "MQ Priority",
|
||||
"value": "",
|
||||
"description": "MQ message priority; depends on protocol specification",
|
||||
"popular": false,
|
||||
"custom": {
|
||||
"includeInXML": true,
|
||||
"xmlPropertyName": "mq_priority"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "mq_concurrencypackage",
|
||||
"properties": [
|
||||
{
|
||||
"id": "mq_concurrency",
|
||||
"type": "String",
|
||||
"title": "MQ Concurrency (positive number)",
|
||||
"value": "",
|
||||
"description": "Number of MQ subscription threads",
|
||||
"popular": false,
|
||||
"custom": {
|
||||
"includeInXML": true,
|
||||
"xmlPropertyName": "mq_concurrency"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "mq_payloadpackage",
|
||||
"properties": [
|
||||
@@ -3050,10 +3084,11 @@
|
||||
"mq_connectorIdpackage",
|
||||
"mq_queueNamepackage",
|
||||
"mq_messageNamepackage",
|
||||
"mq_prioritypackage",
|
||||
"mq_payloadpackage",
|
||||
"mq_replyQueueNamepackage",
|
||||
"mq_statusQueueNamepackage",
|
||||
"mq_metadataProcessScopepackage",
|
||||
"mq_payloadpackage"
|
||||
"mq_metadataProcessScopepackage"
|
||||
],
|
||||
"hiddenPropertyPackages": [
|
||||
"multiinstance_typepackage",
|
||||
@@ -3153,6 +3188,7 @@
|
||||
"mq_connectorIdpackage",
|
||||
"mq_queueNamepackage",
|
||||
"mq_messageNamepackage",
|
||||
"mq_concurrencypackage",
|
||||
"mq_metadataProcessScopepackage"
|
||||
],
|
||||
"hiddenPropertyPackages": [
|
||||
|
@@ -33,7 +33,7 @@ public abstract class AbstractMqDelegate implements JavaDelegate {
|
||||
|
||||
private Map<String, MqCommunicator> communicators = new HashMap<>();
|
||||
|
||||
protected synchronized MqCommunicator getConnection(String connectorId) throws JMSException, TimeoutException, IOException {
|
||||
protected synchronized MqCommunicator getCommunicator(String connectorId) throws JMSException, TimeoutException, IOException {
|
||||
MqCommunicator communicator = this.communicators.get(connectorId);
|
||||
if (communicator == null) {
|
||||
EndpointConfiguration endpointConfig = this.endpointService.getConfigurationByName(connectorId);
|
||||
|
@@ -9,6 +9,8 @@ public class Constants {
|
||||
public static final String FIELD_CONNECTOR_ID = "mq_connectorId";
|
||||
public static final String FIELD_QUEUE_NAME = "mq_queueName";
|
||||
public static final String FIELD_MESSAGE_NAME = "mq_messageName";
|
||||
public static final String FIELD_CONCURRENCY = "mq_concurrency";
|
||||
public static final String FIELD_PRIORITY = "mq_priority";
|
||||
public static final String FIELD_REPLY_QUEUE_NAME = "mq_replyQueueName";
|
||||
public static final String FIELD_STATUS_QUEUE_NAME = "mq_statusQueueName";
|
||||
public static final String FIELD_PAYLOAD = "mq_payload";
|
||||
|
@@ -5,6 +5,7 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@@ -12,6 +13,7 @@ import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.activiti.bpmn.model.BpmnModel;
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.SequenceFlow;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
@@ -24,11 +26,15 @@ import org.activiti.engine.delegate.event.ActivitiEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventListener;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||
import org.activiti.engine.impl.context.Context;
|
||||
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.repository.ProcessDefinitionQuery;
|
||||
import org.activiti.engine.runtime.Execution;
|
||||
import org.activiti.engine.runtime.ExecutionQuery;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -58,6 +64,9 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
@Autowired
|
||||
private TenantFinderService tenantFinderService;
|
||||
|
||||
@Autowired
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
private Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
|
||||
|
||||
/**
|
||||
@@ -73,30 +82,86 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
}
|
||||
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
|
||||
Tenant tenant = this.tenantFinderService.findTenant();
|
||||
|
||||
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery().active();
|
||||
if (tenant == null) {
|
||||
procDefQuery.processDefinitionWithoutTenantId();
|
||||
} else {
|
||||
String tenantId = this.tenantFinderService.transform(tenant.getId());
|
||||
procDefQuery.processDefinitionTenantId(tenantId);
|
||||
}
|
||||
|
||||
List<ProcessDefinition> procDefs = procDefQuery.list();
|
||||
String tenantId = this.findTenantId();
|
||||
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
|
||||
this.logger.debug("Found {} active process definitions", procDefs.size());
|
||||
for (ProcessDefinition procDef : procDefs) {
|
||||
this.logger.debug("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
|
||||
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(procDef.getId());
|
||||
if (task == null)
|
||||
continue;
|
||||
|
||||
this.loopMqSubscribeTask(procDef.getId(), task);
|
||||
int concurrency = this.determineConcurrency(task);
|
||||
this.logger.debug("Process definition MQ subscription is configured for concurrency: {}: {}", procDef.getId(), concurrency);
|
||||
|
||||
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, procDef.getId(), task);
|
||||
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), procDef.getId());
|
||||
|
||||
if (execs.size() < concurrency) {
|
||||
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), procDef.getId());
|
||||
for (int thread = execs.size(); thread < concurrency; thread++) {
|
||||
this.loopMqSubscribeTask(procDef.getId(), task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected String findTenantId() {
|
||||
Tenant tenant = this.tenantFinderService.findTenant();
|
||||
return this.tenantFinderService.transform(tenant);
|
||||
}
|
||||
|
||||
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
|
||||
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
|
||||
.latestVersion()
|
||||
.active();
|
||||
if (tenantId == null) {
|
||||
procDefQuery.processDefinitionWithoutTenantId();
|
||||
} else {
|
||||
procDefQuery.processDefinitionTenantId(tenantId);
|
||||
}
|
||||
|
||||
return procDefQuery.list();
|
||||
}
|
||||
|
||||
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
|
||||
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
|
||||
.processDefinitionId(processDefinitionId)
|
||||
.activityId(task.getId());
|
||||
if (tenantId == null) {
|
||||
execQuery.executionWithoutTenantId();
|
||||
} else {
|
||||
execQuery.executionTenantId(tenantId);
|
||||
}
|
||||
|
||||
return execQuery.list();
|
||||
}
|
||||
|
||||
protected Integer findConcurrency(ServiceTask task) {
|
||||
for (FieldExtension fieldext : task.getFieldExtensions()) {
|
||||
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
|
||||
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
|
||||
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected int determineConcurrency(ServiceTask task) {
|
||||
Integer concurrency = this.findConcurrency(task);
|
||||
if (concurrency == null) {
|
||||
return 1;
|
||||
} else if (concurrency.intValue() < 1) {
|
||||
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
|
||||
return 1;
|
||||
} else {
|
||||
return concurrency.intValue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will start monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
@@ -174,11 +239,13 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
|
||||
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition addition: {}", entity);
|
||||
|
||||
this.unsubscribeMqSubscribeTasks(entity.getId());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
|
||||
this.loopMqSubscribeTask(entity.getId(), task);
|
||||
}
|
||||
|
||||
@@ -192,6 +259,8 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
|
||||
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
|
||||
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
|
||||
|
||||
this.unsubscribeMqSubscribeTasks(procDefEntity.getId());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
|
||||
if (task == null)
|
||||
@@ -203,18 +272,37 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
|
||||
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition removal: {}", entity);
|
||||
|
||||
this.unsubscribeMqSubscribeTasks(entity.getId());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
this.deloopMqSubscribeTask(entity.getId());
|
||||
}
|
||||
|
||||
protected void unsubscribeMqSubscribeTasks(String procDefId) {
|
||||
try {
|
||||
Set<String> executionIds = this.subscriptionService.clear(procDefId);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cleared: " + procDefId, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) {
|
||||
// start a process instance on the process
|
||||
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);
|
||||
this.services.getRuntimeService().startProcessInstanceById(processDefId);
|
||||
|
||||
if (this.activeListeners.containsKey(processDefId)) {
|
||||
this.logger.debug("The process definition already has a looping listener: {}", processDefId);
|
||||
if (this.activeListeners.containsKey(processDefId))
|
||||
// one listener, no matter how many instances are subscribed
|
||||
return;
|
||||
}
|
||||
|
||||
AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) {
|
||||
@Override
|
||||
@@ -268,9 +356,18 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
return true;
|
||||
}
|
||||
|
||||
private BpmnModel getBpmnModel(String processDefId) {
|
||||
if (Context.getProcessDefinitionHelper() == null) {
|
||||
// typically during initial startup
|
||||
return this.services.getRepositoryService().getBpmnModel(processDefId);
|
||||
} else {
|
||||
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
|
||||
return ProcessDefinitionUtil.getBpmnModel(processDefId);
|
||||
}
|
||||
}
|
||||
|
||||
protected ServiceTask findMqStartSubscribeTask(String processDefId) {
|
||||
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
|
||||
BpmnModel model = ProcessDefinitionUtil.getBpmnModel(processDefId);
|
||||
BpmnModel model = this.getBpmnModel(processDefId);
|
||||
StartEvent startElement = this.findMqPlainStartEvent(processDefId, model);
|
||||
if (startElement == null)
|
||||
return null;
|
||||
@@ -324,7 +421,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
||||
return null;
|
||||
}
|
||||
|
||||
this.logger.debug("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
|
||||
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return task;
|
||||
}
|
||||
|
||||
|
@@ -14,33 +14,49 @@ public interface MqCommunicator {
|
||||
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, null);
|
||||
return this.receive(destination, -1L, null, null, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, listener, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, null, null);
|
||||
return this.receive(destination, timeoutInMillis, null, null, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, null, listener, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, correlationId, null);
|
||||
return this.receive(destination, -1L, correlationId, null, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, correlationId, listener, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, correlationId, null);
|
||||
return this.receive(destination, timeoutInMillis, correlationId, null, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, handler);
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, correlationId, listener, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, null, handler);
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, listener, handler);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, correlationId, handler);
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, null, listener, handler);
|
||||
}
|
||||
|
||||
<BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException;
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, correlationId, listener, handler);
|
||||
}
|
||||
|
||||
<BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
}
|
||||
|
@@ -2,92 +2,35 @@ package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.OverridingMethodsMustInvokeSuper;
|
||||
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.activiti.engine.delegate.Expression;
|
||||
import org.activiti.engine.impl.context.Context;
|
||||
import org.activiti.engine.impl.el.ExpressionManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MqDelegateExecution {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
protected final ProcessEngine services;
|
||||
protected final DelegateExecution execution;
|
||||
protected final ServiceTask task;
|
||||
protected final Map<String, FieldExtension> fieldMap = new HashMap<>();
|
||||
private boolean metadataToProcessScope = false;
|
||||
protected final MqServiceTask task;
|
||||
|
||||
public MqDelegateExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
this.services = services;
|
||||
public MqDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
|
||||
this.execution = execution;
|
||||
|
||||
FlowElement flowElement = execution.getCurrentFlowElement();
|
||||
if (!(flowElement instanceof ServiceTask))
|
||||
throw new IllegalStateException("This should never happen");
|
||||
this.task = (ServiceTask) flowElement;
|
||||
this.logger.trace("Discovered task: {}: {}", this.task.getId(), this.task.getName());
|
||||
|
||||
this.logger.trace("Indexing {} fields", this.task.getFieldExtensions().size());
|
||||
for (FieldExtension field : this.task.getFieldExtensions()) {
|
||||
this.logger.trace("Discovering field: {}: {}: {}", field.getId(), field.getFieldName(), field.getStringValue());
|
||||
|
||||
switch (field.getFieldName()) {
|
||||
case Constants.FIELD_METADATA_PROCESS_SCOPE:
|
||||
this.metadataToProcessScope = Boolean.valueOf(field.getStringValue());
|
||||
break;
|
||||
default:
|
||||
this.fieldMap.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
this.task = msts.get(execution.getProcessDefinitionId(), execution.getCurrentFlowElement());
|
||||
}
|
||||
|
||||
@OverridingMethodsMustInvokeSuper
|
||||
public void validate() {
|
||||
if (this.fieldMap.get(Constants.FIELD_CONNECTOR_ID) == null)
|
||||
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Connector ID'");
|
||||
if (this.fieldMap.get(Constants.FIELD_QUEUE_NAME) == null)
|
||||
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Queue Name'");
|
||||
}
|
||||
|
||||
public String getTaskId() {
|
||||
return this.task.getId();
|
||||
}
|
||||
|
||||
public boolean doWriteToProcessScope() {
|
||||
return this.metadataToProcessScope;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Unlike MqServiceTask, this allows for expression expansion based on the variables
|
||||
*/
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type) {
|
||||
return this.getFieldValueFromModel(fieldName, type, false);
|
||||
return this.task.getFieldValueFromModel(fieldName, type, this.execution, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
/**
|
||||
* Unlike MqServiceTask, this allows for expression expansion based on the variables; even partial values
|
||||
*/
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, boolean forceExpressionProcessing) {
|
||||
FieldExtension field = this.fieldMap.get(fieldName);
|
||||
if (field == null) {
|
||||
return null;
|
||||
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getExpression());
|
||||
return (T) expr.getValue(this.execution);
|
||||
} else if (forceExpressionProcessing) {
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getStringValue());
|
||||
return (T) expr.getValue(this.execution);
|
||||
} else {
|
||||
return (T) field.getStringValue();
|
||||
}
|
||||
return this.task.getFieldValueFromModel(fieldName, type, this.execution, forceExpressionProcessing);
|
||||
}
|
||||
|
||||
public String getConnectorIdFromModel() {
|
||||
@@ -112,7 +55,7 @@ public class MqDelegateExecution {
|
||||
|
||||
protected void setMqVariable(String varName, Object value) {
|
||||
varName = this.formulateVariableName(varName);
|
||||
if (this.doWriteToProcessScope()) {
|
||||
if (this.task.doWriteToProcessScope()) {
|
||||
this.execution.setVariable(varName, value);
|
||||
} else {
|
||||
this.execution.setVariableLocal(varName, value);
|
||||
|
101
src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java
Normal file
101
src/main/java/com/inteligr8/activiti/mq/MqExecutionService.java
Normal file
@@ -0,0 +1,101 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.apache.commons.collections4.MultiValuedMap;
|
||||
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MqExecutionService {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
/**
|
||||
* The size of the keys is limited to the number of process definitions
|
||||
* defined. It would actually only contain ones with an MQ subscribe task.
|
||||
* Even if we kept versioned or inactive process definitions in the map, it
|
||||
* would never be a significant memory hog.
|
||||
*
|
||||
* The size of the values is limited to the number of MQ subscribe tasks
|
||||
* defined in each process definition. So it would never be a significant
|
||||
* memory hog.
|
||||
*
|
||||
* The size of the keys/values have nothing to do with the number of
|
||||
* process instances or executions.
|
||||
*
|
||||
* This means it does not need to be trimmed. However, it is a good idea
|
||||
* to remove process definition keys that have no active executions. You
|
||||
* could do the same with active activities, but cleaning up process
|
||||
* definitions will clean those up too.
|
||||
*/
|
||||
private MultiValuedMap<String, String> processDefinitionActivityMap = new HashSetValuedHashMap<>();
|
||||
|
||||
/**
|
||||
* The size of the keys is limited to the number of MQ subscribe tasks
|
||||
* defined in all process definitions. So it would never be a significant
|
||||
* memory hog.
|
||||
*
|
||||
* The size of the values has no limit. It will grow with the number of
|
||||
* executions (related to process instances).
|
||||
*
|
||||
* This means the map values need to be trimmed. When an MQ subscribe task
|
||||
* is completed, it is paramount to remove the execution from the values of
|
||||
* this map. It is also a good idea to remove the activity key when it is
|
||||
* removed from the `processDefinitionActivityMap` map; and to propagate
|
||||
* the removal of executions from the `executionSubscriptionMap` map.
|
||||
*/
|
||||
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
|
||||
|
||||
public synchronized void executing(DelegateExecution execution) {
|
||||
this.processDefinitionActivityMap.put(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
|
||||
|
||||
Pair<String, String> key = this.toKey(execution);
|
||||
this.activityExecutionMap.put(key, execution.getId());
|
||||
}
|
||||
|
||||
public synchronized void executed(DelegateExecution execution) {
|
||||
Pair<String, String> key = this.toKey(execution);
|
||||
this.activityExecutionMap.removeMapping(key, execution.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param processDefinitionId A process definition identifier.
|
||||
* @return A set of execution identifiers that were in the now cleared map.
|
||||
*/
|
||||
public synchronized Set<String> clear(String processDefinitionId) throws Exception {
|
||||
Collection<String> activityIds = this.processDefinitionActivityMap.get(processDefinitionId);
|
||||
if (activityIds == null) {
|
||||
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
||||
for (String activityId : activityIds) {
|
||||
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
|
||||
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
|
||||
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
|
||||
if (activityExecutionIds != null)
|
||||
executionIds.addAll(activityExecutionIds);
|
||||
}
|
||||
|
||||
return executionIds;
|
||||
}
|
||||
|
||||
protected Pair<String, String> toKey(DelegateExecution execution) {
|
||||
return this.toKey(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
|
||||
}
|
||||
|
||||
protected Pair<String, String> toKey(String processDefinitionId, String activityId) {
|
||||
return Pair.of(processDefinitionId, activityId);
|
||||
}
|
||||
|
||||
}
|
@@ -30,6 +30,9 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
private MqServiceTaskService msts;
|
||||
|
||||
/**
|
||||
* This method sends a message to an MQ queue.
|
||||
*
|
||||
@@ -46,6 +49,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol.
|
||||
* @field mq_payload [optional] The body of the MQ message. May include expressions.
|
||||
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
|
||||
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
|
||||
@@ -59,19 +63,20 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
*/
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqPublishDelegateExecution mqExecution = new MqPublishDelegateExecution(this.services, execution);
|
||||
MqPublishDelegateExecution mqExecution = new MqPublishDelegateExecution(this.services, this.msts, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
destination.setQueueName(mqExecution.getQueueNameFromModel());
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
|
||||
PreparedMessage<String> message = communicator.createPreparedMessage();
|
||||
if (mqExecution.getStatusQueueNameFromModel() != null)
|
||||
message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel());
|
||||
message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel());
|
||||
message.setPriority(mqExecution.getPriorityFromModel());
|
||||
|
||||
String payload = mqExecution.getPayloadFromModel();
|
||||
if (payload != null) {
|
||||
|
@@ -5,8 +5,12 @@ import org.activiti.engine.delegate.DelegateExecution;
|
||||
|
||||
public class MqPublishDelegateExecution extends MqDelegateExecution {
|
||||
|
||||
public MqPublishDelegateExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
super(services, execution);
|
||||
public MqPublishDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
|
||||
super(services, msts, execution);
|
||||
}
|
||||
|
||||
public Integer getPriorityFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_PRIORITY, Integer.class);
|
||||
}
|
||||
|
||||
public String getReplyQueueNameFromModel() {
|
||||
|
96
src/main/java/com/inteligr8/activiti/mq/MqServiceTask.java
Normal file
96
src/main/java/com/inteligr8/activiti/mq/MqServiceTask.java
Normal file
@@ -0,0 +1,96 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.OverridingMethodsMustInvokeSuper;
|
||||
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.Expression;
|
||||
import org.activiti.engine.delegate.VariableScope;
|
||||
import org.activiti.engine.impl.context.Context;
|
||||
import org.activiti.engine.impl.el.ExpressionManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MqServiceTask {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
protected final ProcessEngine services;
|
||||
protected final ServiceTask task;
|
||||
protected final Map<String, FieldExtension> fieldMap = new HashMap<>();
|
||||
private boolean metadataToProcessScope = false;
|
||||
|
||||
public MqServiceTask(ProcessEngine services, ServiceTask task) {
|
||||
this.services = services;
|
||||
this.task = task;
|
||||
|
||||
this.logger.trace("Indexing {} fields", this.task.getFieldExtensions().size());
|
||||
for (FieldExtension field : this.task.getFieldExtensions()) {
|
||||
this.logger.trace("Discovering field: {}: {}: {}", field.getId(), field.getFieldName(), field.getStringValue());
|
||||
|
||||
switch (field.getFieldName()) {
|
||||
case Constants.FIELD_METADATA_PROCESS_SCOPE:
|
||||
this.metadataToProcessScope = Boolean.valueOf(field.getStringValue());
|
||||
break;
|
||||
default:
|
||||
this.fieldMap.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OverridingMethodsMustInvokeSuper
|
||||
public void validate() {
|
||||
if (this.fieldMap.get(Constants.FIELD_CONNECTOR_ID) == null)
|
||||
throw new IllegalStateException("The '" + this.task.getId() + "' activity must define an 'MQ Connector ID'");
|
||||
if (this.fieldMap.get(Constants.FIELD_QUEUE_NAME) == null)
|
||||
throw new IllegalStateException("The '" + this.task.getId() + "' activity must define an 'MQ Queue Name'");
|
||||
}
|
||||
|
||||
public String getTaskId() {
|
||||
return this.task.getId();
|
||||
}
|
||||
|
||||
public boolean doWriteToProcessScope() {
|
||||
return this.metadataToProcessScope;
|
||||
}
|
||||
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type) {
|
||||
return this.getFieldValueFromModel(fieldName, type, null, false);
|
||||
}
|
||||
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope) {
|
||||
return this.getFieldValueFromModel(fieldName, type, varscope, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope, boolean forceExpressionProcessing) {
|
||||
FieldExtension field = this.fieldMap.get(fieldName);
|
||||
if (field == null) {
|
||||
return null;
|
||||
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getExpression());
|
||||
return (T) expr.getValue(varscope);
|
||||
} else if (forceExpressionProcessing) {
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getStringValue());
|
||||
return (T) expr.getValue(varscope);
|
||||
} else if (String.class.isAssignableFrom(type)) {
|
||||
return (T) field.getStringValue();
|
||||
} else {
|
||||
try {
|
||||
Method method = type.getMethod("valueOf", String.class);
|
||||
return (T) method.invoke(null, field.getStringValue());
|
||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,82 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.activiti.bpmn.model.BpmnModel;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MqServiceTaskService {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(MqServiceTaskService.class);
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
/**
|
||||
* This is an indefinite cache with no updates or removals.
|
||||
* We should be fine here.
|
||||
*/
|
||||
private Map<String, MqServiceTask> taskCache = new HashMap<>();
|
||||
|
||||
public MqServiceTask get(String processDefinitionId, String taskId) {
|
||||
String key = processDefinitionId + "|" + taskId;
|
||||
if (this.taskCache.containsKey(key))
|
||||
return this.taskCache.get(key);
|
||||
|
||||
BpmnModel model = this.services.getRepositoryService().getBpmnModel(processDefinitionId);
|
||||
if (model == null)
|
||||
throw new IllegalArgumentException("The process definition '" + processDefinitionId + "' could not be found");
|
||||
|
||||
FlowElement element = model.getFlowElement(taskId);
|
||||
if (element == null)
|
||||
throw new IllegalArgumentException("The service task '" + taskId + "' could not be found in the process definition '" + processDefinitionId + "' model");
|
||||
|
||||
if (!(element instanceof ServiceTask))
|
||||
throw new IllegalArgumentException("The model element '" + taskId + "' is not a service task as expected");
|
||||
|
||||
ServiceTask task = (ServiceTask) element;
|
||||
this.logger.trace("Discovered service task: {}: {}", task.getId(), task.getName());
|
||||
|
||||
MqServiceTask mqtask = new MqServiceTask(this.services, task);
|
||||
mqtask.validate();
|
||||
this.taskCache.put(key, mqtask);
|
||||
return mqtask;
|
||||
}
|
||||
|
||||
public MqServiceTask get(String processDefinitionId, FlowElement taskElement) {
|
||||
String key = processDefinitionId + "|" + taskElement.getId();
|
||||
if (this.taskCache.containsKey(key))
|
||||
return this.taskCache.get(key);
|
||||
|
||||
if (!(taskElement instanceof ServiceTask))
|
||||
throw new IllegalArgumentException("The model element '" + taskElement.getId() + "' is not a service task as expected");
|
||||
|
||||
ServiceTask task = (ServiceTask) taskElement;
|
||||
this.logger.trace("Discovered service task: {}: {}", task.getId(), task.getName());
|
||||
|
||||
MqServiceTask mqtask = new MqServiceTask(this.services, task);
|
||||
mqtask.validate();
|
||||
this.taskCache.put(key, mqtask);
|
||||
return mqtask;
|
||||
}
|
||||
|
||||
public MqServiceTask get(String processDefinitionId, ServiceTask task) {
|
||||
String key = processDefinitionId + "|" + task.getId();
|
||||
if (this.taskCache.containsKey(key))
|
||||
return this.taskCache.get(key);
|
||||
|
||||
MqServiceTask mqtask = new MqServiceTask(this.services, task);
|
||||
mqtask.validate();
|
||||
this.taskCache.put(key, mqtask);
|
||||
return mqtask;
|
||||
}
|
||||
|
||||
}
|
@@ -29,6 +29,12 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
private MqServiceTaskService msts;
|
||||
|
||||
@Autowired
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
@@ -51,6 +57,7 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_concurrency The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted.
|
||||
* @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
@@ -66,8 +73,8 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
* @see mqSubscribeReplyDelegate#delegate
|
||||
*/
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, execution);
|
||||
public void execute(final DelegateExecution execution) {
|
||||
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, this.msts, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
@@ -76,22 +83,35 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
this.logger.debug("Will look for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, new TransactionalMessageHandler<String>() {
|
||||
@Override
|
||||
public void onMessage(DeliveredMessage<String> message) throws IOException {
|
||||
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
|
||||
|
||||
if (message.getMessageId() != null)
|
||||
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
|
||||
mqExecution.setCorrelationId(message.getCorrelationId());
|
||||
mqExecution.setDeliveryTime(message.getDeliveryTime());
|
||||
mqExecution.setPriority(message.getPriority());
|
||||
mqExecution.setPayload(message.getContent(), message.getContentType());
|
||||
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
});
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination,
|
||||
new MqSubscriptionListener() {
|
||||
@Override
|
||||
public void consuming(AutoCloseable consumerCloseable) {
|
||||
subscriptionService.consuming(execution, consumerCloseable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumed(AutoCloseable consumerCloseable) {
|
||||
subscriptionService.consumed(execution, consumerCloseable);
|
||||
}
|
||||
},
|
||||
new TransactionalMessageHandler<String>() {
|
||||
@Override
|
||||
public void onMessage(DeliveredMessage<String> message) throws IOException {
|
||||
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
|
||||
|
||||
if (message.getMessageId() != null)
|
||||
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
|
||||
mqExecution.setCorrelationId(message.getCorrelationId());
|
||||
mqExecution.setDeliveryTime(message.getDeliveryTime());
|
||||
mqExecution.setPriority(message.getPriority());
|
||||
mqExecution.setPayload(message.getContent(), message.getContentType());
|
||||
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
@@ -103,5 +123,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
throw new BpmnError("mq", "JMS communication failed: " + je.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getConcurrency(DelegateExecution execution) {
|
||||
MqServiceTask task = this.msts.get(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
|
||||
return task == null ? 1 : task.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -5,8 +5,12 @@ import org.activiti.engine.delegate.DelegateExecution;
|
||||
|
||||
public class MqSubscribeDelegateExecution extends MqDelegateExecution {
|
||||
|
||||
public MqSubscribeDelegateExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
super(services, execution);
|
||||
public MqSubscribeDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
|
||||
super(services, msts, execution);
|
||||
}
|
||||
|
||||
public Integer getConcurrencyFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -30,6 +30,12 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
private MqServiceTaskService msts;
|
||||
|
||||
@Autowired
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
/**
|
||||
* This method listens for a reply message on an MQ queue.
|
||||
*
|
||||
@@ -65,7 +71,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
*/
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, execution);
|
||||
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, this.msts, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
@@ -77,21 +83,34 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
this.logger.debug("Will look only for MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, correlationId, new TransactionalMessageHandler<String>() {
|
||||
@Override
|
||||
public void onMessage(DeliveredMessage<String> message) {
|
||||
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
|
||||
|
||||
if (message.getMessageId() != null)
|
||||
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
|
||||
mqExecution.setDeliveryTime(message.getDeliveryTime());
|
||||
mqExecution.setPriority(message.getPriority());
|
||||
mqExecution.setPayload(message.getContent(), message.getContentType());
|
||||
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
});
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, correlationId,
|
||||
new MqSubscriptionListener() {
|
||||
@Override
|
||||
public void consuming(AutoCloseable consumerCloseable) {
|
||||
subscriptionService.consuming(execution, consumerCloseable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumed(AutoCloseable consumerCloseable) {
|
||||
subscriptionService.consumed(execution, consumerCloseable);
|
||||
}
|
||||
},
|
||||
new TransactionalMessageHandler<String>() {
|
||||
@Override
|
||||
public void onMessage(DeliveredMessage<String> message) {
|
||||
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
|
||||
|
||||
if (message.getMessageId() != null)
|
||||
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
|
||||
mqExecution.setDeliveryTime(message.getDeliveryTime());
|
||||
mqExecution.setPriority(message.getPriority());
|
||||
mqExecution.setPayload(message.getContent(), message.getContentType());
|
||||
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
|
@@ -0,0 +1,11 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
public interface MqSubscriptionListener {
|
||||
|
||||
default void consuming(AutoCloseable consumerCloseable) {
|
||||
}
|
||||
|
||||
default void consumed(AutoCloseable consumerCloseable) {
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,62 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MqSubscriptionService extends MqExecutionService {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
/**
|
||||
* The size of the map has no limit. It will grow with the number of
|
||||
* executions (related to process instances).
|
||||
*
|
||||
* This means the map needs to be trimmed. When an MQ subscribe task is
|
||||
* completed, it is paramount to remove the execution from the map. It is
|
||||
* also necessary to remove the execution when it is removed from the
|
||||
* `activityExecutionMap` map.
|
||||
*/
|
||||
private Map<String, AutoCloseable> executionSubscriptionMap = new HashMap<>();
|
||||
|
||||
public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
this.executing(execution);
|
||||
this.executionSubscriptionMap.put(execution.getId(), consumerCloseable);
|
||||
}
|
||||
|
||||
public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable != consumerCloseable)
|
||||
throw new IllegalStateException("The consumer objects were expected to be identical");
|
||||
|
||||
this.executionSubscriptionMap.remove(execution.getId());
|
||||
this.executed(execution);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param processDefinitionId A process definition identifier.
|
||||
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
|
||||
*/
|
||||
@Override
|
||||
public synchronized Set<String> clear(String processDefinitionId) throws Exception {
|
||||
Set<String> executionIds = super.clear(processDefinitionId);
|
||||
|
||||
for (String executionId : executionIds) {
|
||||
this.logger.trace("Clearing process definition execution: {}: {}", processDefinitionId, executionId);
|
||||
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null) {
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionId, executionId, consumer);
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
return executionIds;
|
||||
}
|
||||
|
||||
}
|
@@ -34,6 +34,14 @@ public interface PreparedMessage<BodyType> extends Message {
|
||||
}
|
||||
}
|
||||
|
||||
default Integer getPriority() {
|
||||
return this.getProperty("priority");
|
||||
}
|
||||
|
||||
default void setPriority(Integer priority) {
|
||||
this.setProperty("priority", priority);
|
||||
}
|
||||
|
||||
default String getReplyToQueueName() {
|
||||
return this.getProperty("replyToQueueName");
|
||||
}
|
||||
|
@@ -64,6 +64,10 @@ public class TenantFinderService implements ApplicationListener<ApplicationStart
|
||||
return tenantId == null ? null : ("tenant_" + tenantId);
|
||||
}
|
||||
|
||||
public String transform(Tenant tenant) {
|
||||
return tenant == null ? null : this.transform(tenant.getId());
|
||||
}
|
||||
|
||||
public Long transform(String tenantId) {
|
||||
return tenantId == null ? null : Long.parseLong(tenantId.substring("tenant_".length()));
|
||||
}
|
||||
|
@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
@@ -103,11 +104,14 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, final String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, final String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
Channel channel = con.createChannel();
|
||||
try {
|
||||
if (listener != null)
|
||||
listener.consuming(channel);
|
||||
|
||||
channel.queueDeclare(destination.getQueueName(), true, false, false, Collections.emptyMap());
|
||||
channel.basicConsume(
|
||||
destination.getQueueName(),
|
||||
@@ -115,6 +119,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
new DeliverCallback() {
|
||||
@Override
|
||||
public void handle(String consumerTag, Delivery message) throws IOException {
|
||||
// FIXME we need a better solution here; maybe one consumer that distributes them baced on correlationId
|
||||
if (correlationId != null && !correlationId.equals(message.getProperties().getCorrelationId()))
|
||||
channel.basicNack(message.getEnvelope().getDeliveryTag(), true, true);
|
||||
|
||||
@@ -129,6 +134,9 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
if (listener != null)
|
||||
listener.consumed(channel);
|
||||
|
||||
channel.close();
|
||||
}
|
||||
} finally {
|
||||
|
@@ -45,7 +45,7 @@ public class AmqpPreparedMessage<BodyType> extends AbstractMessage implements Pr
|
||||
"utf-8",
|
||||
this.getProperties(),
|
||||
DeliveryMode.PERSISTENT, // yes, this is a JMS class, but the values are the same; survive restarts
|
||||
null, // default priority
|
||||
this.getPriority(),
|
||||
correlationId,
|
||||
this.getReplyToQueueName() != null ? this.getReplyToQueueName() : null,
|
||||
null, // expiration
|
||||
|
@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
|
||||
@@ -109,21 +110,21 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
|
||||
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return this.receive(con, destination, timeoutInMillis, correlationId, handler);
|
||||
return this.receive(con, destination, timeoutInMillis, correlationId, listener, handler);
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected <BodyType> JmsDeliveredMessage<BodyType> receive(Connection con, GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
|
||||
protected <BodyType> JmsDeliveredMessage<BodyType> receive(Connection con, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
|
||||
con.start();
|
||||
try {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
JmsDeliveredMessage<BodyType> message = this.receive(session, destination, timeoutInMillis, correlationId);
|
||||
JmsDeliveredMessage<BodyType> message = this.receive(session, destination, timeoutInMillis, correlationId, listener);
|
||||
if (message == null)
|
||||
return null;
|
||||
|
||||
@@ -140,10 +141,13 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
}
|
||||
|
||||
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId) throws JMSException, TimeoutException {
|
||||
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException {
|
||||
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
|
||||
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
|
||||
try {
|
||||
if (listener != null)
|
||||
listener.consuming(messenger);
|
||||
|
||||
if (timeoutInMillis < 0L) {
|
||||
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receive());
|
||||
@@ -167,8 +171,10 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (listener != null)
|
||||
listener.consumed(messenger);
|
||||
|
||||
messenger.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -56,6 +56,9 @@ public class JmsPreparedMessage<BodyType> extends AbstractMessage implements Pre
|
||||
if (this.getReplyToQueueName() != null)
|
||||
message.setJMSReplyTo(session.createQueue(this.getReplyToQueueName()));
|
||||
|
||||
if (this.getPriority() != null)
|
||||
message.setJMSPriority(this.getPriority());
|
||||
|
||||
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); // survive restarts
|
||||
return message;
|
||||
}
|
||||
|
Reference in New Issue
Block a user