Merge branch 'develop' into stable
This commit is contained in:
@@ -6,6 +6,10 @@
|
||||
|
||||
This delegate sends a message to an MQ queue.
|
||||
|
||||
The process instance will continue after a successful publish of the message by this delegate to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqSubscribeReplyDelegate` task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
|
||||
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
|
||||
|
||||
@@ -20,21 +24,25 @@ ${mqPublishDelegate}
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This method sends a message to an MQ queue.
|
||||
|
||||
It does not wait for a response and will continue after a successful publish of the message to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
|
||||
This method makes this bean an Activiti delegate.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set the `mq_messageId` result variable in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_priority` | | [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol. |
|
||||
| BPMN Field | `mq_payload` | | [optional] The body of the MQ message. May include expressions. |
|
||||
| BPMN Field | `mq_payloadMimeType` | | [optional] The MIME type of the body of the MQ message. |
|
||||
| BPMN Field | `mq_replyQueueName` | | [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply. |
|
||||
| BPMN Field | `mq_statusQueueName` | | [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates. |
|
||||
| Activiti Variable | `mq_correlationId` | | [optional] The correlationId of the message to send. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
| Activiti Variable | `mq_correlationId` | The correlationId of the message sent. |
|
||||
| Activiti Variable | `mq_messageId` | The messageId of the message sent. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
@@ -6,6 +6,14 @@
|
||||
|
||||
This delegate listens for messages on an MQ queue.
|
||||
|
||||
This is expected to exist in a Service Task immediately after a plain start activity. This will cause process instances to automatically be created in order to maintain the MQ subscription as messages are received. If used in any other way, it will error and the process will fail validation.
|
||||
|
||||
This does not wait for a response to a specific message, but instead to all messages put on an MQ queue. That is for the `mqSubscribeReplyDelegate` delegate.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
This requires a long running connection to MQ. It runs in a long running Activiti job/execution. If there is a failure or the server is restarted, the Activiti job will fail and automatically retry per the Activiti standard features. After exhausting retries, it may eventually dead-letter. Retry the job to continue the subscription.
|
||||
|
||||
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
|
||||
|
||||
@@ -20,22 +28,15 @@ ${mqSubscribeDelegate}
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This delegate listens for messages on an MQ queue.
|
||||
|
||||
It does not wait for a response to a specific message put on an MQ queue. That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
|
||||
|
||||
When used, this task must be the first task after a plain start event in a process. If used in any other way, it will error and the process will fail validation.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
TODO map response to variables
|
||||
This method makes this bean an Activiti delegate.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_concurrency` | | The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
@@ -43,7 +44,10 @@ TODO map response to variables
|
||||
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
|
||||
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
|
||||
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
|
||||
| Activiti Variable | `mq_payload` | The body of the MQ message. May include expressions. May be `null`. |
|
||||
| Activiti Variable | `mq_payloadMimeType` | The MIME type of the body of the MQ message. May be `null`. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Activiti Variable | `mq_statusQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
@@ -4,10 +4,16 @@
|
||||
*Since Version*: 1.0
|
||||
|
||||
|
||||
This delegate listens for a reply message on an MQ queue.
|
||||
This method listens for a reply message on an MQ queue.
|
||||
|
||||
*See Also*: mqPublishDelegate
|
||||
*See Also*: mqSubscribeDelegate
|
||||
The process instance will block until a corresponding reply message is received. It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
This requires a long running connection to MQ. It runs in a long running Activiti job/execution. If there is a failure or the server is restarted, the Activiti job will fail and automatically retry per the Activiti standard features. After exhausting retries, it may eventually dead-letter. Retry the job to continue the subscription.
|
||||
|
||||
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
|
||||
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
|
||||
|
||||
## <a name="delegate"></a> Delegate Expression Uses
|
||||
|
||||
@@ -20,20 +26,14 @@ ${mqSubscribeReplyDelegate}
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This method listens for a reply message on an MQ queue.
|
||||
|
||||
It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
|
||||
|
||||
TODO map response to variables
|
||||
This method makes this bean an Activiti delegate.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
|
||||
| Activiti Variable | `mq_correlationId` | | The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
@@ -41,7 +41,10 @@ TODO map response to variables
|
||||
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
|
||||
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
|
||||
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
|
||||
| Activiti Variable | `mq_payload` | The body of the MQ message. May include expressions. May be `null`. |
|
||||
| Activiti Variable | `mq_payloadMimeType` | The MIME type of the body of the MQ message. May be `null`. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Activiti Variable | `mq_statusQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Thrown BPMN Error | `correlation` | `mq_correlationId` is required. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
|
@@ -23,7 +23,7 @@ It is important to note that Activiti expressions use the JUEL language and Acti
|
||||
| -------------------------------- | ------------------------- |
|
||||
| [`mqPublishDelegate`](bean-mqPublishDelegate.md) | This delegate sends a message to an MQ queue. |
|
||||
| [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md) | This delegate listens for messages on an MQ queue. |
|
||||
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This delegate listens for a reply message on an MQ queue. |
|
||||
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This method listens for a reply message on an MQ queue. |
|
||||
|
||||
---
|
||||
|
||||
|
2
pom.xml
2
pom.xml
@@ -27,7 +27,7 @@
|
||||
<spring.version>5.3.31</spring.version>
|
||||
<activiti.version>7.11.1</activiti.version>
|
||||
<tomcat-rad.version>9-2.2</tomcat-rad.version>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin</aps.tomcat.opts>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin -Dvalidator.editor.dmn.espression=false -Dvalidator.editor.bpmn.disable.scripttask=false -Djavascript.secure-scripting.enabled=false -Djavascript.secure-scripting.enable-class-whitelisting=false -Dbeans.whitelisting.enabled=false -Del.whitelisting.enabled=false -Dshell.whitelisting.enabled=false</aps.tomcat.opts>
|
||||
|
||||
<!-- reloads in APS are slower than a restart -->
|
||||
<aps.hotswap.enabled>false</aps.hotswap.enabled>
|
||||
|
@@ -94,11 +94,11 @@ public class ActivitiEntityEventMonitor implements ActivitiEventListener, Applic
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(ActivitiEvent event) {
|
||||
this.logger.trace("Triggered by event: {}", event);
|
||||
this.logger.trace("Triggered by event: {}: {}; pdId: {}; piId: {}; execId: {}", event.getType(), event.getClass(), event.getProcessDefinitionId(), event.getProcessInstanceId(), event.getExecutionId());
|
||||
|
||||
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
|
||||
ActivitiEntityEvent aaevent = (ActivitiEntityEvent) event;
|
||||
if (listener.ofEntityType((Entity) aaevent.getEntity()))
|
||||
if (aaevent.getEntity() instanceof Entity && listener.ofEntityType((Entity) aaevent.getEntity()))
|
||||
listener.onEntityEvent(aaevent);
|
||||
}
|
||||
}
|
||||
|
@@ -5,13 +5,16 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public interface MqCommunicator {
|
||||
|
||||
boolean validateConnection();
|
||||
|
||||
<BodyType> PreparedMessage<BodyType> createPreparedMessage();
|
||||
|
||||
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
<BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, null, null);
|
||||
|
@@ -54,7 +54,12 @@ public class MqDelegateExecution {
|
||||
}
|
||||
|
||||
protected void setMqVariable(String varName, Object value) {
|
||||
varName = this.formulateVariableName(varName);
|
||||
if (this.task.doWriteToProcessScope()) {
|
||||
this.execution.setVariable(varName, value);
|
||||
} else {
|
||||
this.execution.setVariableLocal(varName, value);
|
||||
}
|
||||
}
|
||||
|
||||
public String formulateVariableName(String varName) {
|
||||
|
@@ -73,9 +73,9 @@ public class MqDeploymentEventListener implements ActivitiEntityEventListener<De
|
||||
try {
|
||||
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
|
||||
this.logger.debug("Subscription starter executions ended early: {}: {}", procDefId, executionIds);
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
this.logger.info("Subscriptions starter ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
|
@@ -138,8 +138,17 @@ public class MqExecutionService {
|
||||
* @return `true` if execution was cached; `false` otherwise.
|
||||
*/
|
||||
public boolean cancel(String executionId) throws Exception {
|
||||
this.logger.trace("cancel({})", executionId);
|
||||
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
|
||||
if (execution == null) {
|
||||
this.logger.debug("No execution to cancel: {}", executionId);
|
||||
return false;
|
||||
}
|
||||
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
|
||||
if (pi == null) {
|
||||
this.logger.debug("No process instance to cancel: {}", executionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return this.cancel(pi.getProcessDefinitionId(), executionId);
|
||||
}
|
||||
@@ -152,6 +161,7 @@ public class MqExecutionService {
|
||||
* @return A set of execution identifiers that were cancelled.
|
||||
*/
|
||||
public Set<String> cancelAll(String processDefinitionId) throws Exception {
|
||||
this.logger.trace("cancelAll({})", processDefinitionId);
|
||||
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
|
||||
String processDefinitionKey = processDefinition.getKey();
|
||||
|
||||
@@ -163,7 +173,10 @@ public class MqExecutionService {
|
||||
* @return A set of execution identifiers that were in the now cleared map.
|
||||
*/
|
||||
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
|
||||
this.logger.trace("cancelAllOtherVersions({})", latestProcessDefinitionId);
|
||||
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
|
||||
if (latestProcessDefinition == null)
|
||||
return Collections.emptySet();
|
||||
String processDefinitionKey = latestProcessDefinition.getKey();
|
||||
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
@@ -8,6 +8,9 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This needs to be changed to detect just process versioning (deactivation?)
|
||||
*/
|
||||
@Component
|
||||
public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity> {
|
||||
|
||||
@@ -31,14 +34,13 @@ public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity
|
||||
String executionId, JobEntity entity) {
|
||||
this.logger.trace("Triggered by job event: {}", entity);
|
||||
|
||||
switch (eventType) {
|
||||
case ENTITY_DELETED:
|
||||
case ENTITY_SUSPENDED:
|
||||
// we need to stop the listener
|
||||
this.onJobRemoveEvent(entity);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
// switch (eventType) {
|
||||
// case ENTITY_DELETED:
|
||||
// case ENTITY_SUSPENDED:
|
||||
// this.onJobRemoveEvent(entity);
|
||||
// break;
|
||||
// default:
|
||||
// }
|
||||
}
|
||||
|
||||
protected void onJobRemoveEvent(JobEntity entity) {
|
||||
|
@@ -2,6 +2,7 @@ package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.engine.ActivitiObjectNotFoundException;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
@@ -51,8 +52,9 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
this.logger.trace("Triggered by process definition event: {}", entity);
|
||||
|
||||
switch (eventType) {
|
||||
case ENTITY_CREATED:
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_CREATED or ENTITY_INITIALIZED as we need the BpmnModel and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
break;
|
||||
case ENTITY_DELETED:
|
||||
@@ -61,8 +63,6 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) entity);
|
||||
break;
|
||||
default:
|
||||
// we need to start a listener
|
||||
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) entity);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -70,11 +70,15 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition addition: {}", entity);
|
||||
|
||||
try {
|
||||
// cancel subscriptions on all legacy version process definitions
|
||||
this.unsubscribeOtherMqSubscribeTasks(entity.getId());
|
||||
|
||||
if (this.registry.isMqStart(entity.getId()))
|
||||
this.looper.loop(entity.getId());
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
this.logger.debug("Added process definition could not be found; because of orders of operation or transaction isolation: {}: {}", entity.getId(), aonfe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
|
||||
@@ -95,6 +99,8 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
throw aonfe;
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
@@ -108,6 +114,8 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
throw aonfe;
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
|
@@ -13,9 +13,21 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
/**
|
||||
* This delegate sends a message to an MQ queue.
|
||||
*
|
||||
* The process instance will continue after a successful publish of the message
|
||||
* by this delegate to the specified queue. Use `mqSubscribeReplyDelegate` in
|
||||
* a subsequent task to handle the response asynchronously.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding
|
||||
* `mqSubscribeReplyDelegate` task. The resultant variables will then include
|
||||
* that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqSubscribeReplyDelegate#
|
||||
@@ -34,28 +46,21 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
private MqServiceTaskService msts;
|
||||
|
||||
/**
|
||||
* This method sends a message to an MQ queue.
|
||||
*
|
||||
* It does not wait for a response and will continue after a successful
|
||||
* publish of the message to the specified queue. Use
|
||||
* `mqSubscribeReplyDelegate` in a subsequent task to handle the response
|
||||
* asynchronously.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task.
|
||||
* This method makes this bean an Activiti delegate.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set the `mq_messageId` result variable in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Supported range depends on MQ protocol.
|
||||
* @field mq_payload [optional] The body of the MQ message. May include expressions.
|
||||
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
|
||||
* @field mq_payloadMimeType [optional] The MIME type of the `mq_payload`.
|
||||
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
|
||||
* @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates.
|
||||
* @varIn mq_correlationId [optional] The correlationId of the message to send.
|
||||
* @varOut mq_correlationId The correlationId of the message sent.
|
||||
* @varOut mq_messageId The messageId of the message sent.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
@@ -91,10 +96,11 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
|
||||
DeliveredMessage<?> deliveredMessage = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), deliveredMessage.getMessageId(), deliveredMessage.getCorrelationId());
|
||||
|
||||
mqExecution.setCorrelationId(correlationId);
|
||||
mqExecution.setMessageId(deliveredMessage.getMessageId());
|
||||
mqExecution.setCorrelationId(deliveredMessage.getCorrelationId());
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
|
@@ -13,9 +13,30 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
* This is expected to exist in a Service Task immediately after a plain start
|
||||
* activity. This will cause process instances to automatically be created in
|
||||
* order to maintain the MQ subscription as messages are received. If used in
|
||||
* any other way, it will error and the process will fail validation.
|
||||
*
|
||||
* This does not wait for a response to a specific message, but instead to all
|
||||
* messages put on an MQ queue. That is for the `mqSubscribeReplyDelegate`
|
||||
* delegate.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this task. The resultant variables will then
|
||||
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* This requires a long running connection to MQ. It runs in a long running
|
||||
* Activiti job/execution. If there is a failure or the server is restarted,
|
||||
* the Activiti job will fail and automatically retry per the Activiti standard
|
||||
* features. After exhausting retries, it may eventually dead-letter. Retry
|
||||
* the job to continue the subscription.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqPublishDelegate#
|
||||
@@ -37,35 +58,22 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
* It does not wait for a response to a specific message put on an MQ queue.
|
||||
* That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
|
||||
*
|
||||
* When used, this task must be the first task after a plain start event in a
|
||||
* process. If used in any other way, it will error and the process will fail
|
||||
* validation.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this task. The resultant variables will then
|
||||
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* TODO map response to variables
|
||||
* This method makes this bean an Activiti delegate.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_concurrency The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted.
|
||||
* @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
|
||||
* @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message; may be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
|
||||
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
@@ -84,11 +92,11 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination,
|
||||
if (communicator.receive(destination,
|
||||
new MqSubscriptionListener() {
|
||||
@Override
|
||||
public void consuming(AutoCloseable consumerCloseable) {
|
||||
subscriptionService.consuming(execution, consumerCloseable);
|
||||
subscriptionService.autoconsuming(execution, consumerCloseable);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -111,7 +119,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
}
|
||||
);
|
||||
) == null) {
|
||||
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
|
||||
this.services.getRuntimeService().deleteProcessInstance(execution.getProcessInstanceId(), "MQ subscription cancelled gracefully");
|
||||
}
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
@@ -124,9 +135,4 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getConcurrency(DelegateExecution execution) {
|
||||
MqServiceTask task = this.msts.get(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
|
||||
return task == null ? 1 : task.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -9,15 +9,6 @@ public class MqSubscribeDelegateExecution extends MqDelegateExecution {
|
||||
super(services, msts, execution);
|
||||
}
|
||||
|
||||
protected void setMqVariable(String varName, Object value) {
|
||||
varName = this.formulateVariableName(varName);
|
||||
if (this.task.doWriteToProcessScope()) {
|
||||
this.execution.setVariable(varName, value);
|
||||
} else {
|
||||
this.execution.setVariableLocal(varName, value);
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getConcurrencyFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||
}
|
||||
|
@@ -78,9 +78,15 @@ public class MqSubscribeLooper {
|
||||
}
|
||||
|
||||
private void loop(String tenantId, String processDefinitionId) {
|
||||
ProcessDefinition procDef = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
|
||||
if (procDef == null)
|
||||
throw new IllegalArgumentException("The process definition does not exist: " + processDefinitionId);
|
||||
if (procDef.isSuspended())
|
||||
throw new IllegalStateException("The process definition is suspended: " + processDefinitionId);
|
||||
|
||||
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
|
||||
if (task == null)
|
||||
throw new IllegalArgumentException();
|
||||
throw new IllegalArgumentException("The process definition does not qualify for MQ subscription looping: " + processDefinitionId);
|
||||
|
||||
int concurrency = this.determineConcurrency(task);
|
||||
this.logger.debug("Process definition MQ subscription task is configured for concurrency: {}: {}", processDefinitionId, concurrency);
|
||||
|
@@ -13,8 +13,28 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
/**
|
||||
* This delegate listens for a reply message on an MQ queue.
|
||||
* This method listens for a reply message on an MQ queue.
|
||||
*
|
||||
* The process instance will block until a corresponding reply message is
|
||||
* received. It uses the `mq_correlationId` variable to select the
|
||||
* corresponding reply message. That variable is automatically set by the
|
||||
* `mqPublishDelegate` task. This is meant to be used after
|
||||
* `mqPublishDelegate` and not just by itself. If you want to start processes
|
||||
* with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task. The resultant variables will then include that message name (e.g.
|
||||
* `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* This requires a long running connection to MQ. It runs in a long running
|
||||
* Activiti job/execution. If there is a failure or the server is restarted,
|
||||
* the Activiti job will fail and automatically retry per the Activiti standard
|
||||
* features. After exhausting retries, it may eventually dead-letter. Retry
|
||||
* the job to continue the subscription.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
@@ -37,33 +57,21 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
/**
|
||||
* This method listens for a reply message on an MQ queue.
|
||||
*
|
||||
* It uses the `mq_correlationId` variable to select the corresponding reply
|
||||
* message. That variable is automatically set by the `mqPublishDelegate`
|
||||
* task. This is meant to be used after `mqPublishDelegate` and not just by
|
||||
* itself. If you want to start processes with an MQ subscription see the
|
||||
* `mqSubscribeDelegate` task.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task.
|
||||
*
|
||||
* TODO map response to variables
|
||||
* This method makes this bean an Activiti delegate.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
|
||||
* @varIn mq_correlationId The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
|
||||
* @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message; may be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
|
||||
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
|
||||
* @throws BPMNError correlation `mq_correlationId` is required.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
@@ -84,7 +92,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, correlationId,
|
||||
if (communicator.receive(destination, correlationId,
|
||||
new MqSubscriptionListener() {
|
||||
@Override
|
||||
public void consuming(AutoCloseable consumerCloseable) {
|
||||
@@ -110,7 +118,10 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
}
|
||||
);
|
||||
) == null) {
|
||||
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
|
||||
throw new BpmnError("cancelled", "MQ subscription cancelled gracefully");
|
||||
}
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
|
@@ -8,6 +8,7 @@ import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.runtime.Execution;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -30,16 +31,37 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
* also necessary to remove the execution when it is removed from the
|
||||
* `activityExecutionMap` map.
|
||||
*/
|
||||
private Map<String, AutoCloseable> executionSubscriptionMap = new HashMap<>();
|
||||
private Map<String, Pair<Boolean, AutoCloseable>> executionSubscriptionMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This method registers a consuming execution; an execution that was not
|
||||
* automatically started as part of the framework. This is for reply
|
||||
* subscriptions and not starter subscriptions.
|
||||
*
|
||||
* @param execution An Activiti execution.
|
||||
* @param consumerCloseable A closeable MQ consumer.
|
||||
*/
|
||||
public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
this.executing(execution);
|
||||
this.executionSubscriptionMap.put(execution.getId(), consumerCloseable);
|
||||
this.executionSubscriptionMap.put(execution.getId(), Pair.of(false, consumerCloseable));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method registers an auto-consuming execution; an execution that was
|
||||
* automatically started as part of the framework. This is for starter
|
||||
* subscriptions and not reply subscriptions.
|
||||
*
|
||||
* @param execution An Activiti execution.
|
||||
* @param consumerCloseable A closeable MQ consumer.
|
||||
*/
|
||||
public synchronized void autoconsuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
this.executing(execution);
|
||||
this.executionSubscriptionMap.put(execution.getId(), Pair.of(true, consumerCloseable));
|
||||
}
|
||||
|
||||
public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable != consumerCloseable)
|
||||
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable != null && cachedConsumerCloseable.getRight() != consumerCloseable)
|
||||
throw new IllegalStateException("The consumer objects were expected to be identical");
|
||||
|
||||
this.executionSubscriptionMap.remove(execution.getId());
|
||||
@@ -48,7 +70,7 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
|
||||
@Override
|
||||
public synchronized boolean cancelled(Execution execution) {
|
||||
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable == null) {
|
||||
this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId());
|
||||
return false;
|
||||
@@ -56,7 +78,7 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
|
||||
// this will eventually lead to a call to "consumed() above"
|
||||
try {
|
||||
cachedConsumerCloseable.close();
|
||||
cachedConsumerCloseable.getRight().close();
|
||||
return super.cancelled(execution);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
@@ -75,10 +97,10 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
return false;
|
||||
|
||||
this.logger.trace("Removing MQ subscription execution: {}", executionId);
|
||||
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null) {
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer);
|
||||
consumer.close();
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer.getRight());
|
||||
consumer.getRight().close();
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -99,11 +121,11 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
String processDefinitionKey = processDefinition.getKey();
|
||||
|
||||
for (String executionId : executionIds) {
|
||||
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null && consumer.getLeft()) {
|
||||
this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId);
|
||||
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null) {
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
|
||||
consumer.close();
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
|
||||
consumer.getRight().close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,11 +144,11 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
String processDefinitionKey = latestProcessDefinition.getKey();
|
||||
|
||||
for (String executionId : executionIds) {
|
||||
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null && consumer.getLeft()) {
|
||||
this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId);
|
||||
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null) {
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
|
||||
consumer.close();
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
|
||||
consumer.getRight().close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -13,6 +13,7 @@ import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.SequenceFlow;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.bpmn.model.StartEvent;
|
||||
import org.activiti.engine.ActivitiObjectNotFoundException;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||
@@ -47,8 +48,12 @@ public class ProcessDefinitionRegistry {
|
||||
if (this.processDefinitionMqSubscribeTasks.containsKey(processDefinitionId)) {
|
||||
return this.processDefinitionMqSubscribeTasks.get(processDefinitionId) != null;
|
||||
} else {
|
||||
try {
|
||||
// not yet cached; cache it; then return
|
||||
return this.findMqStartSubscribeTask(processDefinitionId) != null;
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -2,6 +2,8 @@ package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
public interface TransactionalMessageHandler<BodyType> {
|
||||
|
||||
void onMessage(DeliveredMessage<BodyType> message) throws IOException;
|
||||
|
@@ -13,12 +13,12 @@ import javax.jms.JMSException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
@@ -72,7 +72,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
String correlationId = message.getCorrelationId() != null ? message.getCorrelationId() : UUID.randomUUID().toString();
|
||||
|
||||
AMQP.BasicProperties bprops = new AMQP.BasicProperties(null, null,
|
||||
@@ -82,7 +82,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
correlationId,
|
||||
message.getReplyToQueueName() != null ? message.getReplyToQueueName() : null,
|
||||
null, // expiration
|
||||
null, // messageId
|
||||
UUID.randomUUID().toString(), // messageId
|
||||
null, // timestamp
|
||||
null, // type
|
||||
null, null, null);
|
||||
@@ -100,7 +100,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
con.close();
|
||||
}
|
||||
|
||||
return correlationId;
|
||||
return AmqpDeliveredMessage.transform(bprops);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -8,9 +8,10 @@ import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.ByteArrayChannel;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
/**
|
||||
@@ -47,6 +48,19 @@ public class AmqpDeliveredMessage<BodyType> extends AbstractMessage implements D
|
||||
this.getProperties().putAll(message.getProperties().getHeaders());
|
||||
}
|
||||
|
||||
public AmqpDeliveredMessage(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
|
||||
this.setMessageId(props.getMessageId());
|
||||
this.setCorrelationId(props.getCorrelationId());
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(props, null);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(props, bodyType);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(Delivery message) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(message, null);
|
||||
}
|
||||
|
@@ -12,8 +12,8 @@ import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
|
@@ -14,12 +14,12 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public class JmsCommunicator implements MqCommunicator {
|
||||
|
||||
@@ -68,7 +68,7 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return this.send(con, destination, (JmsPreparedMessage<BodyType>) message);
|
||||
@@ -77,18 +77,18 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
}
|
||||
|
||||
protected <BodyType> String send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
protected <BodyType> DeliveredMessage<BodyType> send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
String correlationId = this.send(session, destination, message);
|
||||
DeliveredMessage<BodyType> dmessage = this.send(session, destination, message);
|
||||
session.commit();
|
||||
return correlationId;
|
||||
return dmessage;
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
public <BodyType> String send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
MessageProducer messenger = session.createProducer(destination.toJmsQueue(session));
|
||||
try {
|
||||
if (destination.getDeliveryMode() != null)
|
||||
@@ -103,7 +103,7 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
this.logger.debug("Sending message to queue: {}", destination.getQueueName());
|
||||
messenger.send(jmsmsg);
|
||||
|
||||
return jmsmsg.getJMSCorrelationID();
|
||||
return JmsDeliveredMessage.transform(jmsmsg);
|
||||
} finally {
|
||||
messenger.close();
|
||||
}
|
||||
@@ -143,37 +143,43 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
|
||||
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException {
|
||||
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
|
||||
Message receivedMessage = null;
|
||||
|
||||
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
|
||||
try {
|
||||
if (listener != null)
|
||||
listener.consuming(messenger);
|
||||
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
if (timeoutInMillis < 0L) {
|
||||
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receive());
|
||||
receivedMessage = messenger.receive();
|
||||
} else if (timeoutInMillis == 0L) {
|
||||
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receiveNoWait());
|
||||
receivedMessage = messenger.receiveNoWait();
|
||||
} else {
|
||||
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
|
||||
long startTime = System.currentTimeMillis();
|
||||
Message message = messenger.receive(timeoutInMillis);
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (message != null) {
|
||||
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(message);
|
||||
receivedMessage = messenger.receive(timeoutInMillis);
|
||||
}
|
||||
|
||||
if (elapsedTime < timeoutInMillis) {
|
||||
throw new JMSException("The reading of the queue ended prematurely");
|
||||
} else {
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (receivedMessage != null) {
|
||||
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(receivedMessage);
|
||||
}
|
||||
|
||||
if (timeoutInMillis > 0L) {
|
||||
if (elapsedTime >= timeoutInMillis)
|
||||
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
|
||||
this.logger.debug("Done waiting for message after {} minutes: {}", elapsedTime / 60000L, destination.getQueueName());
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
} finally {
|
||||
if (listener != null)
|
||||
listener.consumed(messenger);
|
||||
|
||||
}
|
||||
} finally {
|
||||
messenger.close();
|
||||
}
|
||||
}
|
||||
|
@@ -20,8 +20,8 @@ import org.apache.commons.io.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
public class JmsDeliveredMessage<BodyType> extends AbstractMessage implements DeliveredMessage<BodyType> {
|
||||
|
||||
|
@@ -14,8 +14,8 @@ import javax.jms.StreamMessage;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public class JmsPreparedMessage<BodyType> extends AbstractMessage implements PreparedMessage<BodyType> {
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
public interface Message {
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@@ -7,8 +7,6 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -17,6 +15,9 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
@Component
|
||||
public class MQDockerIT {
|
||||
|
||||
@@ -106,15 +107,18 @@ public class MQDockerIT {
|
||||
OffsetDateTime beforeSend = OffsetDateTime.now();
|
||||
this.logger.debug("Timestamp before send: {}", beforeSend);
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(correlationId);
|
||||
DeliveredMessage<String> sentMessage = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(sentMessage);
|
||||
Assertions.assertNotNull(sentMessage.getMessageId());
|
||||
Assertions.assertNotNull(sentMessage.getCorrelationId());
|
||||
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, correlationId);
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, sentMessage.getCorrelationId());
|
||||
Assertions.assertNotNull(receivedMessage);
|
||||
this.logger.debug("Timestamp of delivery: {}", receivedMessage.getDeliveryTime());
|
||||
|
||||
Assertions.assertNotNull(receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(correlationId, receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(sentMessage.getMessageId(), receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(sentMessage.getCorrelationId(), receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(message.getReplyToQueueName(), receivedMessage.getReplyToQueueName());
|
||||
Assertions.assertEquals("{}", receivedMessage.getContent());
|
||||
Assertions.assertNotNull(receivedMessage.getDeliveryTime());
|
||||
|
3
src/test/resources/activiti/whitelisted-scripts.conf
Normal file
3
src/test/resources/activiti/whitelisted-scripts.conf
Normal file
@@ -0,0 +1,3 @@
|
||||
javascript
|
||||
js
|
||||
ecmascript
|
24
src/test/vscode/test.http
Normal file
24
src/test/vscode/test.http
Normal file
@@ -0,0 +1,24 @@
|
||||
@baseUrl = http://localhost:8080/activiti-app
|
||||
@username = admin@app.activiti.com
|
||||
@password = admin
|
||||
@basic = YWRtaW5AYXBwLmFjdGl2aXRpLmNvbTphZG1pbg==
|
||||
|
||||
###
|
||||
# @name getPis
|
||||
POST {{baseUrl}}/api/enterprise/historic-process-instances/query
|
||||
Authorization: BASIC {{username}}:{{password}}
|
||||
Content-type: application/json
|
||||
|
||||
{
|
||||
"finished": false
|
||||
}
|
||||
|
||||
###
|
||||
# @name getPis2
|
||||
GET {{baseUrl}}/api/runtime/process-instances?tenantId=tenant_1
|
||||
Authorization: BASIC {{username}}:{{password}}
|
||||
|
||||
###
|
||||
# @name getExecs
|
||||
GET {{baseUrl}}/api/runtime/executions?tenantId=tenant_1
|
||||
Authorization: BASIC {{username}}:{{password}}
|
Reference in New Issue
Block a user