24 Commits

Author SHA1 Message Date
bf6ba664f5 v1.0.10 2025-03-18 14:35:29 -04:00
d5fe8e3176 Merge branch 'develop' into stable 2025-03-18 14:35:10 -04:00
7020a6569c jar-with-dependencies to shaded; to prevent conflicts 2025-03-18 14:34:55 -04:00
893596898b v1.0.9 pom 2025-03-11 12:22:44 -04:00
61c552fdd2 Merge branch 'develop' into stable 2025-03-11 12:22:26 -04:00
1d2250abd4 updated docs/errors 2025-03-11 12:22:12 -04:00
c1cc36afc2 terminate process instance when starter subscription ends 2025-03-11 12:21:47 -04:00
0d9b40f020 updated docs 2025-03-11 09:46:52 -04:00
b8d75dd95f receive null when closed; treat reply subscription different 2025-03-11 08:58:09 -04:00
2bf612cac2 major refactoring 2025-03-10 15:27:04 -04:00
e3d49c2811 v1.0.8 pom 2025-02-24 15:33:11 -05:00
5234aad45d Merge branch 'develop' into stable 2025-02-24 15:32:48 -05:00
6089c6c69a NPE fix 2025-02-24 15:32:36 -05:00
8b90e42614 v1.0.7 pom 2025-02-21 09:36:27 -05:00
1d98e90e97 Merge branch 'develop' into stable 2025-02-21 09:36:12 -05:00
740843908d refactored; added job listener 2025-02-21 09:36:01 -05:00
e593d50bb8 reuse correlationId, if avail, when sending 2025-02-20 15:45:29 -05:00
7a5065419a v1.0.6 pom 2025-02-20 13:10:24 -05:00
d3c79cac9c Merge branch 'develop' into stable 2025-02-20 13:10:05 -05:00
5aef2d4446 added execution cancellation support; listening for job deletion 2025-02-20 13:09:57 -05:00
dfb24cbd1f v1.0.5 pom 2025-02-19 11:59:39 -05:00
7a67634cfb Merge branch 'develop' into stable 2025-02-19 11:59:12 -05:00
41b5271617 added manual expression detection 2025-02-19 11:59:01 -05:00
cc14a59959 remove process scope field from publish task 2025-02-19 11:58:30 -05:00
37 changed files with 1434 additions and 666 deletions

1
.gitignore vendored
View File

@@ -1,6 +1,7 @@
# Maven
target
pom.xml.versionsBackup
*-pom.xml
# Eclipse
.project

View File

@@ -6,6 +6,10 @@
This delegate sends a message to an MQ queue.
The process instance will continue after a successful publish of the message by this delegate to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqSubscribeReplyDelegate` task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
@@ -20,21 +24,25 @@ ${mqPublishDelegate}
You may use it in a **Service Task**.
This method sends a message to an MQ queue.
It does not wait for a response and will continue after a successful publish of the message to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
This method makes this bean an Activiti delegate.
| Input Type | Name | Java Type | Documentation |
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set the `mq_messageId` result variable in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope. |
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
| BPMN Field | `mq_priority` | | [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol. |
| BPMN Field | `mq_payload` | | [optional] The body of the MQ message. May include expressions. |
| BPMN Field | `mq_payloadMimeType` | | [optional] The MIME type of the body of the MQ message. |
| BPMN Field | `mq_replyQueueName` | | [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply. |
| BPMN Field | `mq_statusQueueName` | | [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates. |
| Activiti Variable | `mq_correlationId` | | [optional] The correlationId of the message to send. |
| Result Type | Java Type, Name, or Error Code | Documentation |
| ------------------------ | ------------------------------------------------ | -------------------------------- |
| Activiti Variable | `mq_correlationId` | The correlationId of the message sent. |
| Activiti Variable | `mq_messageId` | The messageId of the message sent. |
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |

View File

@@ -6,6 +6,14 @@
This delegate listens for messages on an MQ queue.
This is expected to exist in a Service Task immediately after a plain start activity. This will cause process instances to automatically be created in order to maintain the MQ subscription as messages are received. If used in any other way, it will error and the process will fail validation.
This does not wait for a response to a specific message, but instead to all messages put on an MQ queue. That is for the `mqSubscribeReplyDelegate` delegate.
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
This requires a long running connection to MQ. It runs in a long running Activiti job/execution. If there is a failure or the server is restarted, the Activiti job will fail and automatically retry per the Activiti standard features. After exhausting retries, it may eventually dead-letter. Retry the job to continue the subscription.
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
@@ -20,22 +28,15 @@ ${mqSubscribeDelegate}
You may use it in a **Service Task**.
This delegate listens for messages on an MQ queue.
It does not wait for a response to a specific message put on an MQ queue. That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
When used, this task must be the first task after a plain start event in a process. If used in any other way, it will error and the process will fail validation.
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
TODO map response to variables
This method makes this bean an Activiti delegate.
| Input Type | Name | Java Type | Documentation |
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task. |
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
| BPMN Field | `mq_concurrency` | | The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted. |
| Result Type | Java Type, Name, or Error Code | Documentation |
| ------------------------ | ------------------------------------------------ | -------------------------------- |
@@ -43,7 +44,10 @@ TODO map response to variables
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
| Activiti Variable | `mq_payload` | The body of the MQ message. May include expressions. May be `null`. |
| Activiti Variable | `mq_payloadMimeType` | The MIME type of the body of the MQ message. May be `null`. |
| Activiti Variable | `mq_replyQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
| Activiti Variable | `mq_statusQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |

View File

@@ -4,10 +4,16 @@
*Since Version*: 1.0
This delegate listens for a reply message on an MQ queue.
This method listens for a reply message on an MQ queue.
*See Also*: mqPublishDelegate
*See Also*: mqSubscribeDelegate
The process instance will block until a corresponding reply message is received. It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
This requires a long running connection to MQ. It runs in a long running Activiti job/execution. If there is a failure or the server is restarted, the Activiti job will fail and automatically retry per the Activiti standard features. After exhausting retries, it may eventually dead-letter. Retry the job to continue the subscription.
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
## <a name="delegate"></a> Delegate Expression Uses
@@ -20,20 +26,14 @@ ${mqSubscribeReplyDelegate}
You may use it in a **Service Task**.
This method listens for a reply message on an MQ queue.
It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
TODO map response to variables
This method makes this bean an Activiti delegate.
| Input Type | Name | Java Type | Documentation |
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task. |
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
| Activiti Variable | `mq_correlationId` | | The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication. |
| Result Type | Java Type, Name, or Error Code | Documentation |
@@ -41,7 +41,10 @@ TODO map response to variables
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
| Activiti Variable | `mq_payload` | The body of the MQ message. May include expressions. May be `null`. |
| Activiti Variable | `mq_payloadMimeType` | The MIME type of the body of the MQ message. May be `null`. |
| Activiti Variable | `mq_replyQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
| Activiti Variable | `mq_statusQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
| Thrown BPMN Error | `correlation` | `mq_correlationId` is required. |
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |

View File

@@ -23,7 +23,7 @@ It is important to note that Activiti expressions use the JUEL language and Acti
| -------------------------------- | ------------------------- |
| [`mqPublishDelegate`](bean-mqPublishDelegate.md) | This delegate sends a message to an MQ queue. |
| [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md) | This delegate listens for messages on an MQ queue. |
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This delegate listens for a reply message on an MQ queue. |
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This method listens for a reply message on an MQ queue. |
---

43
pom.xml
View File

@@ -11,7 +11,7 @@
</parent>
<artifactId>mq-activiti-ext</artifactId>
<version>1.0.4</version>
<version>1.0.10</version>
<packaging>jar</packaging>
<name>MQ Activiti Extension</name>
@@ -27,7 +27,7 @@
<spring.version>5.3.31</spring.version>
<activiti.version>7.11.1</activiti.version>
<tomcat-rad.version>9-2.2</tomcat-rad.version>
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin</aps.tomcat.opts>
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin -Dvalidator.editor.dmn.espression=false -Dvalidator.editor.bpmn.disable.scripttask=false -Djavascript.secure-scripting.enabled=false -Djavascript.secure-scripting.enable-class-whitelisting=false -Dbeans.whitelisting.enabled=false -Del.whitelisting.enabled=false -Dshell.whitelisting.enabled=false</aps.tomcat.opts>
<!-- reloads in APS are slower than a restart -->
<aps.hotswap.enabled>false</aps.hotswap.enabled>
@@ -140,6 +140,7 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.10.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
@@ -189,16 +190,6 @@
<artifactId>maven-assembly-plugin</artifactId>
<version>3.7.1</version>
<executions>
<execution>
<id>assemble-jar</id>
<phase>package</phase>
<goals><goal>single</goal></goals>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
<execution>
<id>package-stencil</id>
<phase>package</phase>
@@ -223,6 +214,34 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<id>shade-deps</id>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<relocations>
<relocation>
<shadedPattern>shaded.mq.</shadedPattern>
<excludes>
<!-- anything in the root directory of a JAR -->
<exclude>*</exclude>
<!-- anything in the META-INF directory of a JAR -->
<exclude>META-INF/**</exclude>
<!-- anything developed in this extension -->
<exclude>com.activiti.extension.**</exclude>
<exclude>com.inteligr8.activiti.mq.**</exclude>
</excludes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>

View File

@@ -3087,8 +3087,7 @@
"mq_prioritypackage",
"mq_payloadpackage",
"mq_replyQueueNamepackage",
"mq_statusQueueNamepackage",
"mq_metadataProcessScopepackage"
"mq_statusQueueNamepackage"
],
"hiddenPropertyPackages": [
"multiinstance_typepackage",

View File

@@ -0,0 +1,69 @@
package com.inteligr8.activiti.mq;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
public interface ActivitiEntityEventListener<T extends Entity> {
/**
* This method checks to see if this listener is for the specified entity.
*
* @param entity An Activiti entity.
* @return `true` if this listener supports the entity; `false` otherwise.
*/
default boolean ofEntityType(Entity entity) {
return entity == null ? false : this.ofEntityType(entity.getClass());
}
/**
* This method checks to see if this listener is for the specified entity
* class.
*
* @param entityType An Activiti entity class.
* @return `true` if this listener supports the entity type; `false` otherwise.
*/
boolean ofEntityType(Class<? extends Entity> entityType);
/**
* This method allows for initialization on application startup. Any
* resources this listener should subscribe to, should be connected or
* opened.
*/
default void onApplicationStartup() {
}
/**
* This method allows for uninitialization on application shutdown. Any
* resources this listener subscribes to should be disconnected or closed.
*
* The `ActivitiEntityEventListener#onEvent()` method may still be called
* before startup or after shutdown.
*/
default void onApplicationShutdown() {
}
/**
* This method is fired every time there is a qualifying entity event.
*
* @param aaevent An Activiti entity event.
*/
default void onEntityEvent(ActivitiEntityEvent aaevent) {
@SuppressWarnings("unchecked")
T entity = (T) aaevent.getEntity();
this.onEntityEvent(aaevent.getType(), aaevent.getProcessDefinitionId(), aaevent.getProcessInstanceId(), aaevent.getExecutionId(), entity);
}
/**
* Thi smethod is fired every time there is a qualifying entity event.
*
* @param eventType An Activiti event type; limited to `ENTITY_*`.
* @param processDefinitionId The unique identifier of the process definition subject to the event.
* @param processInstanceId The unique identifier of the process instance subject to the event.
* @param executionId The unique identifier of the execution subject to the event.
* @param entity The Activiti entity.
*/
void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, String executionId, T entity);
}

View File

@@ -0,0 +1,111 @@
package com.inteligr8.activiti.mq;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.stereotype.Component;
@Component
public class ActivitiEntityEventMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
private final Logger logger = LoggerFactory.getLogger(ActivitiEntityEventMonitor.class);
@Autowired
private List<ActivitiEntityEventListener<? extends Entity>> listeners;
@Autowired
private ProcessEngine services;
/**
* This method is fired by Spring, the framework behind Activiti. This
* forwards the application event to each listener so they can initialize
* or uninitialize.
*/
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
this.logger.debug("Application context refresh/stop/close detected; shutting down listeners: {}", event);
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
listener.onApplicationShutdown();
}
}
// the listener cannot be active until the context is initialized
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
this.logger.debug("Application context refresh/start detected; starting up listeners: {}", event);
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
listener.onApplicationStartup();
}
}
}
/**
* This method will start monitoring for changes in deployment, app, and
* process definitions.
*/
@PostConstruct
protected void init() {
this.logger.debug("Bean initialized; starting to listen for any entity event ...");
this.services.getRuntimeService().addEventListener(this,
ActivitiEventType.ENTITY_CREATED,
ActivitiEventType.ENTITY_INITIALIZED,
ActivitiEventType.ENTITY_UPDATED,
ActivitiEventType.ENTITY_ACTIVATED,
ActivitiEventType.ENTITY_SUSPENDED,
ActivitiEventType.ENTITY_DELETED);
}
/**
* This method will stop monitoring for changes in deployment, app, and
* process definitions.
*/
@PreDestroy
protected void uninit() {
this.logger.debug("Bean uninitialized; stopping listener for any entity event ...");
this.services.getRuntimeService().removeEventListener(this);
}
/**
* This method is fired by the Activiti platform. It is called on every
* entity event.
*
* @param event An Activiti event.
*/
@Override
public void onEvent(ActivitiEvent event) {
this.logger.trace("Triggered by event: {}: {}; pdId: {}; piId: {}; execId: {}", event.getType(), event.getClass(), event.getProcessDefinitionId(), event.getProcessInstanceId(), event.getExecutionId());
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
ActivitiEntityEvent aaevent = (ActivitiEntityEvent) event;
if (aaevent.getEntity() instanceof Entity && listener.ofEntityType((Entity) aaevent.getEntity()))
listener.onEntityEvent(aaevent);
}
}
@Override
public boolean isFailOnException() {
return true;
}
}

View File

@@ -1,455 +0,0 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.SequenceFlow;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.bpmn.model.StartEvent;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.repository.ProcessDefinitionQuery;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.stereotype.Component;
import com.activiti.domain.idm.Tenant;
@Component
public class MQProcessDefinitionMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
private final Logger logger = LoggerFactory.getLogger(MQProcessDefinitionMonitor.class);
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
@Autowired
private ProcessEngine services;
@Autowired
private ApplicationContext context;
@Autowired
private TenantFinderService tenantFinderService;
@Autowired
private MqSubscriptionService subscriptionService;
private Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
/**
* This method will add/remove listeners to the specific process
* definitions that use MQ subscriptions to start process instances at
* application startup and shutdown.
*/
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
this.logger.debug("Discovered {} active process definitions to stop listening to", this.activeListeners.size());
this.deloopMqSubscribeTasks();
}
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
String tenantId = this.findTenantId();
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
this.logger.debug("Found {} active process definitions", procDefs.size());
for (ProcessDefinition procDef : procDefs) {
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
ServiceTask task = this.findMqStartSubscribeTask(procDef.getId());
if (task == null)
continue;
int concurrency = this.determineConcurrency(task);
this.logger.debug("Process definition MQ subscription is configured for concurrency: {}: {}", procDef.getId(), concurrency);
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, procDef.getId(), task);
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), procDef.getId());
if (execs.size() < concurrency) {
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), procDef.getId());
for (int thread = execs.size(); thread < concurrency; thread++) {
this.loopMqSubscribeTask(procDef.getId(), task);
}
}
}
}
}
protected String findTenantId() {
Tenant tenant = this.tenantFinderService.findTenant();
return this.tenantFinderService.transform(tenant);
}
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
.latestVersion()
.active();
if (tenantId == null) {
procDefQuery.processDefinitionWithoutTenantId();
} else {
procDefQuery.processDefinitionTenantId(tenantId);
}
return procDefQuery.list();
}
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(task.getId());
if (tenantId == null) {
execQuery.executionWithoutTenantId();
} else {
execQuery.executionTenantId(tenantId);
}
return execQuery.list();
}
protected Integer findConcurrency(ServiceTask task) {
for (FieldExtension fieldext : task.getFieldExtensions()) {
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
}
}
return null;
}
protected int determineConcurrency(ServiceTask task) {
Integer concurrency = this.findConcurrency(task);
if (concurrency == null) {
return 1;
} else if (concurrency.intValue() < 1) {
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
return 1;
} else {
return concurrency.intValue();
}
}
/**
* This method will start monitoring for changes in deployment, app, and
* process definitions.
*/
@PostConstruct
protected void init() {
this.services.getRuntimeService().addEventListener(this,
ActivitiEventType.ENTITY_INITIALIZED,
ActivitiEventType.ENTITY_UPDATED,
ActivitiEventType.ENTITY_ACTIVATED,
ActivitiEventType.ENTITY_SUSPENDED,
ActivitiEventType.ENTITY_DELETED);
this.logger.debug("Started listening for entity events");
}
/**
* This method will stop monitoring for changes in deployment, app, and
* process definitions.
*/
@PreDestroy
protected void uninit() {
this.logger.debug("Stopping listening for entity events ...");
this.services.getRuntimeService().removeEventListener(this);
}
/**
* This method is fired by the Activiti platform. It is called on every
* entity event except the creation event. You can see that in the init()
* method above.
*/
@Override
public void onEvent(ActivitiEvent event) {
this.logger.trace("Triggered by event: {}", event);
this.onEntityEvent((ActivitiEntityEvent) event);
}
@Override
public boolean isFailOnException() {
return true;
}
protected void onEntityEvent(ActivitiEntityEvent aaevent) {
this.logger.trace("Triggered by entity: {}", aaevent.getEntity());
if (aaevent.getEntity() instanceof ProcessDefinitionEntity) {
this.logger.debug("Triggered by process definition state change: {}", aaevent.getEntity());
switch (aaevent.getType()) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
break;
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) aaevent.getEntity());
break;
default:
// we need to start a listener
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) aaevent.getEntity());
}
// we only want to deal with ProcessDefinition entities
} else if (aaevent.getEntity() instanceof DeploymentEntity) {
switch (aaevent.getType()) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
this.onDeploymentAddEvent((DeploymentEntity) aaevent.getEntity());
break;
default:
}
// we only want to deal with ProcessDefinition entities
}
}
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition addition: {}", entity);
this.unsubscribeOtherMqSubscribeTasks(entity);
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null)
return;
this.loopMqSubscribeTask(entity.getId(), task);
}
protected void onDeploymentAddEvent(DeploymentEntity entity) {
this.logger.debug("Triggered by deployment addition: {}", entity);
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
if (procDefEntities == null)
return;
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
this.unsubscribeOtherMqSubscribeTasks(procDefEntity);
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
if (task == null)
return;
this.loopMqSubscribeTask(procDefEntity.getId(), task);
}
}
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition removal: {}", entity);
this.unsubscribeOtherMqSubscribeTasks(entity);
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null)
return;
this.deloopMqSubscribeTask(entity.getId());
}
protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.clearOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cleared: " + procDefId, e);
}
}
protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) {
// start a process instance on the process
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);
this.services.getRuntimeService().startProcessInstanceById(processDefId);
if (this.activeListeners.containsKey(processDefId))
// one listener, no matter how many instances are subscribed
return;
AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) {
@Override
public void onEvent(ActivitiActivityEvent event) {
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
services.getRuntimeService().startProcessInstanceById(processDefId);
}
@Override
public boolean isFailOnException() {
return true;
}
};
// the ServiceTask will then execute, waiting for a message on the queue
// when a message is received, it will be responsible for starting a new process instance
// that new process instance will wait for the next message on the queue
// repeat
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
// register the listener so we can possibly remove it later
this.activeListeners.put(processDefId, listener);
}
protected synchronized int deloopMqSubscribeTasks() {
int count = 0;
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
while (i.hasNext()) {
Entry<String, AbstractActivityListener> listener = i.next();
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", listener.getKey());
this.services.getRuntimeService().removeEventListener(listener.getValue());
i.remove();
count++;
}
return count;
}
protected synchronized boolean deloopMqSubscribeTask(String processDefId) {
AbstractActivityListener listener = this.activeListeners.remove(processDefId);
if (listener == null) {
this.logger.trace("No MQ subscription listener found; no listener to stop");
return false;
}
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefId);
this.services.getRuntimeService().removeEventListener(listener);
return true;
}
private BpmnModel getBpmnModel(String processDefId) {
if (Context.getProcessDefinitionHelper() == null) {
// typically during initial startup
return this.services.getRepositoryService().getBpmnModel(processDefId);
} else {
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
return ProcessDefinitionUtil.getBpmnModel(processDefId);
}
}
protected ServiceTask findMqStartSubscribeTask(String processDefId) {
BpmnModel model = this.getBpmnModel(processDefId);
StartEvent startElement = this.findMqPlainStartEvent(processDefId, model);
if (startElement == null)
return null;
List<SequenceFlow> flows = startElement.getOutgoingFlows();
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefId, startElement.getId());
if (flows.isEmpty()) {
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefId);
return null;
} else if (flows.size() > 1) {
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefId);
return null;
}
SequenceFlow flow = flows.iterator().next();
FlowElement targetFlowElement = flow.getTargetFlowElement();
if (targetFlowElement == null) {
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefId);
return null;
}
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefId);
if (!(targetFlowElement instanceof ServiceTask)) {
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefId, targetFlowElement.getId(), targetFlowElement.getName());
return null;
}
ServiceTask task = (ServiceTask) targetFlowElement;
if (!"delegateExpression".equals(task.getImplementationType())) {
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
return null;
}
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
if (!matcher.find()) {
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefId, task.getId(), task.getName());
return null;
}
String beanId = matcher.group(1).trim();
this.logger.trace("Looking up bean: {}", beanId);
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
if (delegateBean == null) {
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
return null;
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
return null;
}
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
return task;
}
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
this.logger.trace("Finding all start elements in process: {}", processDefId);
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
for (StartEvent startElement : startElements) {
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
continue;
}
return startElement;
}
return null;
}
protected ServiceTask castToServiceTask(FlowElement flowElement) {
if (!(flowElement instanceof ServiceTask))
return null;
return (ServiceTask) flowElement;
}
}

View File

@@ -5,13 +5,16 @@ import java.util.concurrent.TimeoutException;
import javax.jms.JMSException;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
public interface MqCommunicator {
boolean validateConnection();
<BodyType> PreparedMessage<BodyType> createPreparedMessage();
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
<BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, null, null, null);

View File

@@ -68,10 +68,18 @@ public class MqDelegateExecution {
return varName;
}
/**
* Always pull from the process scope.
* @param correlationId
*/
public String getCorrelationId() {
return this.execution.getVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), String.class);
}
/**
* Always set at process scope.
* @param correlationId
*/
public void setCorrelationId(String correlationId) {
this.execution.setVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), correlationId);
}

View File

@@ -0,0 +1,85 @@
package com.inteligr8.activiti.mq;
import java.util.List;
import java.util.Set;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqDeploymentEventListener implements ActivitiEntityEventListener<DeploymentEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected MqSubscribeLooper looper;
@Autowired
protected ProcessDefinitionRegistry registry;
@Autowired
private MqSubscriptionService subscriptionService;
@Override
public boolean ofEntityType(Class<? extends Entity> entityType) {
return DeploymentEntity.class.isAssignableFrom(entityType);
}
@Override
public boolean ofEntityType(Entity entity) {
return entity instanceof DeploymentEntity;
}
@Override
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
String executionId, DeploymentEntity entity) {
this.logger.trace("Triggered by deployment event: {}", entity);
switch (eventType) {
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
this.onDeploymentAddEvent((DeploymentEntity) entity);
break;
default:
}
}
protected void onDeploymentAddEvent(DeploymentEntity entity) {
this.logger.debug("Triggered by deployment addition: {}", entity);
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
if (procDefEntities == null)
return;
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
this.unsubscribeOtherMqSubscribeTasks(procDefEntity.getId());
if (this.registry.isMqStart(procDefEntity.getId()))
this.looper.loop(procDefEntity.getId());
}
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription starter executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions starter ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
}
}
}

View File

@@ -8,6 +8,9 @@ import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.tuple.Pair;
@@ -26,13 +29,13 @@ public class MqExecutionService {
/**
* The size of the keys is limited to the number of process definitions
* series are defined. This excludes versions. It would actually only
* contain ones with an MQ subscribe task. It would never be a significant
* memory hog.
* series defined. This excludes versions. It would actually only contain
* ones with an MQ subscribe task. It would never be a significant memory
* hog.
*
* The size of the values is limited to the number of active process
* definition versions exist. So it would never be a significant memory
* hog.
* definition versions that exist. So it would never be a significant
* memory hog.
*
* The size of the keys/values have nothing to do with the number of
* process instances or executions.
@@ -81,8 +84,10 @@ public class MqExecutionService {
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
public synchronized void executing(DelegateExecution execution) {
ProcessDefinition procDef = this.services.getRepositoryService().createProcessDefinitionQuery().processDefinitionId(execution.getProcessDefinitionId()).singleResult();
this.processDefinitionKeyMap.put(procDef.getKey(), execution.getProcessDefinitionId());
Pair<String, String> key = this.toKey(execution);
this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft());
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
this.activityExecutionMap.put(key, execution.getId());
}
@@ -92,33 +97,134 @@ public class MqExecutionService {
this.activityExecutionMap.removeMapping(key, execution.getId());
}
public synchronized final boolean cancelled(String executionId) {
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
return this.cancelled(execution);
}
public synchronized boolean cancelled(Execution execution) {
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
if (execution.getActivityId() != null) {
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), execution.getActivityId());
return this.activityExecutionMap.removeMapping(key, execution.getId());
} else {
this.logger.trace("No activity discovered, so checking all activities in the process definition: {}: {}", pi.getProcessDefinitionId(), execution.getId());
Collection<String> activityIds = this.processDefinitionActivityMap.get(pi.getProcessDefinitionId());
boolean removed = false;
for (String activityId : activityIds) {
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), activityId);
removed = this.activityExecutionMap.removeMapping(key, execution.getId()) || removed;
}
return removed;
}
}
public synchronized final boolean cancelledJob(String jobId) {
Job job = this.services.getManagementService().createJobQuery().jobId(jobId).singleResult();
return this.cancelledJob(job);
}
public synchronized final boolean cancelledJob(Job job) {
return this.cancelled(job.getExecutionId());
}
/**
* This method cancels all the executions active on the specified process
* definition.
*
* @param executionId An execution unique identifier.
* @return `true` if execution was cached; `false` otherwise.
*/
public boolean cancel(String executionId) throws Exception {
this.logger.trace("cancel({})", executionId);
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
if (execution == null) {
this.logger.debug("No execution to cancel: {}", executionId);
return false;
}
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
if (pi == null) {
this.logger.debug("No process instance to cancel: {}", executionId);
return false;
}
return this.cancel(pi.getProcessDefinitionId(), executionId);
}
/**
* This method cancels all the executions active on the specified process
* definition.
*
* @param processDefinitionId A process definition unique identifier.
* @return A set of execution identifiers that were cancelled.
*/
public Set<String> cancelAll(String processDefinitionId) throws Exception {
this.logger.trace("cancelAll({})", processDefinitionId);
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
String processDefinitionKey = processDefinition.getKey();
return this.cancelAll(processDefinitionId, processDefinitionKey);
}
/**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers that were in the now cleared map.
*/
public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception {
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
this.logger.trace("cancelAllOtherVersions({})", latestProcessDefinitionId);
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
if (latestProcessDefinition == null)
return Collections.emptySet();
String processDefinitionKey = latestProcessDefinition.getKey();
Set<String> executionIds = new HashSet<>();
Collection<String> processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey);
for (String processDefinitionId : processDefinitionIds) {
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
for (String processDefinitionId : processDefinitionIds)
executionIds.addAll(this.cancelAll(processDefinitionId, processDefinitionKey));
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
return executionIds;
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
if (activityExecutionIds != null)
executionIds.addAll(activityExecutionIds);
}
private synchronized boolean cancel(String processDefinitionId, String executionId) throws Exception {
this.logger.trace("Cancelling execution: {}: {}", processDefinitionId, executionId);
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return false;
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
if (activityExecutionIds.remove(executionId))
return true;
}
return false;
}
private synchronized Set<String> cancelAll(String processDefinitionId, String processDefinitionKey) throws Exception {
Set<String> executionIds = new HashSet<>();
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
if (activityExecutionIds != null)
executionIds.addAll(activityExecutionIds);
}
return executionIds;

View File

@@ -0,0 +1,56 @@
package com.inteligr8.activiti.mq;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* This needs to be changed to detect just process versioning (deactivation?)
*/
@Component
public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected MqSubscriptionService subscriptionService;
@Override
public boolean ofEntityType(Class<? extends Entity> entityType) {
return JobEntity.class.isAssignableFrom(entityType);
}
@Override
public boolean ofEntityType(Entity entity) {
return entity instanceof JobEntity;
}
@Override
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
String executionId, JobEntity entity) {
this.logger.trace("Triggered by job event: {}", entity);
// switch (eventType) {
// case ENTITY_DELETED:
// case ENTITY_SUSPENDED:
// this.onJobRemoveEvent(entity);
// break;
// default:
// }
}
protected void onJobRemoveEvent(JobEntity entity) {
this.logger.trace("Triggered by job removal: {}", entity);
try {
this.subscriptionService.cancel(entity.getExecutionId());
this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId());
} catch (Exception e) {
this.logger.error("Subscription execution failed to be canceled after job removal: job: " + entity.getId() + ": exec: " + entity.getExecutionId(), e);
}
}
}

View File

@@ -0,0 +1,124 @@
package com.inteligr8.activiti.mq;
import java.util.Set;
import org.activiti.engine.ActivitiObjectNotFoundException;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.persistence.entity.Entity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqProcessDefinitionEventListener implements ActivitiEntityEventListener<ProcessDefinitionEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected MqSubscribeLooper looper;
@Autowired
protected ProcessDefinitionRegistry registry;
@Autowired
protected MqSubscriptionService subscriptionService;
@Override
public boolean ofEntityType(Class<? extends Entity> entityType) {
return ProcessDefinitionEntity.class.isAssignableFrom(entityType);
}
@Override
public boolean ofEntityType(Entity entity) {
return entity instanceof ProcessDefinitionEntity;
}
@Override
public void onApplicationStartup() {
this.looper.deloopAllMqProcessDefinitions();
this.looper.loopAllMqProcessDefinitions();
}
@Override
public void onApplicationShutdown() {
this.looper.deloopAllMqProcessDefinitions();
}
@Override
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
String executionId, ProcessDefinitionEntity entity) {
this.logger.trace("Triggered by process definition event: {}", entity);
switch (eventType) {
case ENTITY_CREATED:
case ENTITY_INITIALIZED:
// we cannot use the ProcessDefinitionEntity for ENTITY_CREATED or ENTITY_INITIALIZED as we need the BpmnModel and it is not yet cached
// we must use DeploymentEntity and then dig down for the process definitions
break;
case ENTITY_DELETED:
case ENTITY_SUSPENDED:
// we need to stop the listener
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) entity);
break;
default:
}
}
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition addition: {}", entity);
try {
// cancel subscriptions on all legacy version process definitions
this.unsubscribeOtherMqSubscribeTasks(entity.getId());
if (this.registry.isMqStart(entity.getId()))
this.looper.loop(entity.getId());
} catch (ActivitiObjectNotFoundException aonfe) {
this.logger.debug("Added process definition could not be found; because of orders of operation or transaction isolation: {}: {}", entity.getId(), aonfe.getMessage());
}
}
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition removal: {}", entity);
// cancel subscriptions on this process definition
this.unsubscribeMqSubscribeTasks(entity.getId());
if (this.registry.isMqStart(entity.getId()))
this.looper.deloop(entity.getId());
}
protected void unsubscribeMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.cancelAll(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (ActivitiObjectNotFoundException aonfe) {
throw aonfe;
} catch (Exception e) {
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
}
}
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (ActivitiObjectNotFoundException aonfe) {
throw aonfe;
} catch (Exception e) {
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
}
}
}

View File

@@ -13,9 +13,21 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
/**
* This delegate sends a message to an MQ queue.
*
* The process instance will continue after a successful publish of the message
* by this delegate to the specified queue. Use `mqSubscribeReplyDelegate` in
* a subsequent task to handle the response asynchronously.
*
* If you have more than one MQ communication in a single process, set the
* `mq_messageName` field on this and the corresponding
* `mqSubscribeReplyDelegate` task. The resultant variables will then include
* that message name (e.g. `mq_messageId_{mq_messageName}`).
*
* @author brian@inteligr8.com
* @since 1.0
* @see mqSubscribeReplyDelegate#
@@ -34,27 +46,21 @@ public class MqPublishDelegate extends AbstractMqDelegate {
private MqServiceTaskService msts;
/**
* This method sends a message to an MQ queue.
*
* It does not wait for a response and will continue after a successful
* publish of the message to the specified queue. Use
* `mqSubscribeReplyDelegate` in a subsequent task to handle the response
* asynchronously.
*
* If you have more than one MQ communication in a single process, set the
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
* task.
* This method makes this bean an Activiti delegate.
*
* @param execution An Activiti delegate execution (source task or execution/task listener).
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol.
* @field mq_metadataProcessScope [optional] `true` to set the `mq_messageId` result variable in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope.
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Supported range depends on MQ protocol.
* @field mq_payload [optional] The body of the MQ message. May include expressions.
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
* @field mq_payloadMimeType [optional] The MIME type of the `mq_payload`.
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
* @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates.
* @varIn mq_correlationId [optional] The correlationId of the message to send.
* @varOut mq_correlationId The correlationId of the message sent.
* @varOut mq_messageId The messageId of the message sent.
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
* @throws BPMNError network The MQ connection experienced network issues.
* @throws BPMNError mq An unknown MQ issue occurred.
@@ -73,6 +79,8 @@ public class MqPublishDelegate extends AbstractMqDelegate {
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
PreparedMessage<String> message = communicator.createPreparedMessage();
if (mqExecution.getCorrelationId() != null)
message.setCorrelationId(mqExecution.getCorrelationId());
if (mqExecution.getStatusQueueNameFromModel() != null)
message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel());
message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel());
@@ -88,10 +96,11 @@ public class MqPublishDelegate extends AbstractMqDelegate {
}
}
String correlationId = communicator.send(destination, message);
this.logger.debug("Sent MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
DeliveredMessage<?> deliveredMessage = communicator.send(destination, message);
this.logger.debug("Sent MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), deliveredMessage.getMessageId(), deliveredMessage.getCorrelationId());
mqExecution.setCorrelationId(correlationId);
mqExecution.setMessageId(deliveredMessage.getMessageId());
mqExecution.setCorrelationId(deliveredMessage.getCorrelationId());
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());

View File

@@ -70,27 +70,63 @@ public class MqServiceTask {
@SuppressWarnings("unchecked")
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope, boolean forceExpressionProcessing) {
FieldExtension field = this.fieldMap.get(fieldName);
if (field == null) {
Object value = this.getFieldValueFromModel(fieldName, varscope, forceExpressionProcessing);
if (value == null) {
return null;
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getExpression());
return (T) expr.getValue(varscope);
} else if (forceExpressionProcessing) {
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getStringValue());
return (T) expr.getValue(varscope);
} else if (String.class.isAssignableFrom(type)) {
return (T) field.getStringValue();
if (value instanceof String) {
return (T) value;
} else {
return (T) value.toString();
}
} else {
try {
Method method = type.getMethod("valueOf", String.class);
return (T) method.invoke(null, field.getStringValue());
Method method = type.getMethod("valueOf", value.getClass());
return (T) method.invoke(null, value);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new IllegalArgumentException(e);
String strvalue;
if (value instanceof String) {
strvalue = (String) value;
} else {
strvalue = value.toString();
}
try {
Method method = type.getMethod("valueOf", String.class);
return (T) method.invoke(null, strvalue);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e2) {
throw new IllegalArgumentException("The target type '" + type + "' has no 'valueOf' method for String or type: " + value.getClass());
}
}
}
}
protected Object getFieldValueFromModel(String fieldName, VariableScope varscope, boolean forceExpressionProcessing) {
FieldExtension field = this.fieldMap.get(fieldName);
if (field == null) {
return null;
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
this.logger.trace("Field value is recognized as an expression by the Activity framework: {}: {}", fieldName, field.getExpression());
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getExpression());
return expr.getValue(varscope);
} else if (field.getStringValue() == null) {
this.logger.trace("Field value is null: {}", fieldName);
return null;
} else if (forceExpressionProcessing) {
this.logger.trace("Field value will be processed as potentially having expression(s): {}", fieldName);
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getStringValue());
return expr.getValue(varscope);
} else if (field.getStringValue().startsWith("${") && field.getStringValue().endsWith("}")) {
this.logger.trace("Field value is recognized as an expression by the MQ extension: {}: {}", fieldName, field.getStringValue());
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getStringValue());
return expr.getValue(varscope);
} else {
return field.getStringValue();
}
}
}

View File

@@ -13,9 +13,30 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
/**
* This delegate listens for messages on an MQ queue.
*
* This is expected to exist in a Service Task immediately after a plain start
* activity. This will cause process instances to automatically be created in
* order to maintain the MQ subscription as messages are received. If used in
* any other way, it will error and the process will fail validation.
*
* This does not wait for a response to a specific message, but instead to all
* messages put on an MQ queue. That is for the `mqSubscribeReplyDelegate`
* delegate.
*
* If you have more than one MQ communication in a single process, set the
* `mq_messageName` field on this task. The resultant variables will then
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
*
* This requires a long running connection to MQ. It runs in a long running
* Activiti job/execution. If there is a failure or the server is restarted,
* the Activiti job will fail and automatically retry per the Activiti standard
* features. After exhausting retries, it may eventually dead-letter. Retry
* the job to continue the subscription.
*
* @author brian@inteligr8.com
* @since 1.0
* @see mqPublishDelegate#
@@ -37,35 +58,22 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
private MqSubscriptionService subscriptionService;
/**
* This delegate listens for messages on an MQ queue.
*
* It does not wait for a response to a specific message put on an MQ queue.
* That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
*
* When used, this task must be the first task after a plain start event in a
* process. If used in any other way, it will error and the process will fail
* validation.
*
* If you have more than one MQ communication in a single process, set the
* `mq_messageName` field on this task. The resultant variables will then
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
*
* TODO map response to variables
* This method makes this bean an Activiti delegate.
*
* @param execution An Activiti delegate execution (source task or execution/task listener).
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
* @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
* @field mq_concurrency The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted.
* @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
* @varOut mq_messageId The unique message identifer of the message received.
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
* @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
* @varOut mq_payload The body of the MQ message; may be `null`.
* @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
* @throws BPMNError network The MQ connection experienced network issues.
* @throws BPMNError mq An unknown MQ issue occurred.
@@ -84,11 +92,11 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
try {
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
communicator.receive(destination,
if (communicator.receive(destination,
new MqSubscriptionListener() {
@Override
public void consuming(AutoCloseable consumerCloseable) {
subscriptionService.consuming(execution, consumerCloseable);
subscriptionService.autoconsuming(execution, consumerCloseable);
}
@Override
@@ -111,7 +119,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
}
);
) == null) {
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
this.services.getRuntimeService().deleteProcessInstance(execution.getProcessInstanceId(), "MQ subscription cancelled gracefully");
}
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
@@ -124,9 +135,4 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
}
}
public Integer getConcurrency(DelegateExecution execution) {
MqServiceTask task = this.msts.get(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
return task == null ? 1 : task.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
}
}

View File

@@ -0,0 +1,274 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.repository.ProcessDefinitionQuery;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.activiti.domain.idm.Tenant;
@Component
public class MqSubscribeLooper {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected ProcessEngine services;
@Autowired
protected ProcessDefinitionRegistry registry;
@Autowired
protected TenantFinderService tenantFinderService;
@Autowired
protected MqSubscriptionService subscriptionService;
protected Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
/**
* This method starts the listening of activity completions on all the MQ
* subscribe tasks. This will start the infinite loop where new process
* instances are started every time another process instance receives a
* message.
*/
public void loopAllMqProcessDefinitions() {
String tenantId = this.findTenantId();
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
this.logger.debug("Found {} active process definitions", procDefs.size());
for (ProcessDefinition procDef : procDefs) {
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
if (this.registry.isMqStart(procDef.getId()))
this.loop(tenantId, procDef.getId());
}
}
/**
* This method starts the listening of activity completions on the MQ
* subscribe task in the specified process definition. This will start the
* infinite loop where new process instances are started every time another
* process instance receives a message.
*
* @param processDefinitionId The unique identifier of the process definition to loop.
*/
public void loop(String processDefinitionId) {
String tenantId = this.findTenantId();
this.loop(tenantId, processDefinitionId);
}
private void loop(String tenantId, String processDefinitionId) {
ProcessDefinition procDef = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
if (procDef == null)
throw new IllegalArgumentException("The process definition does not exist: " + processDefinitionId);
if (procDef.isSuspended())
throw new IllegalStateException("The process definition is suspended: " + processDefinitionId);
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
if (task == null)
throw new IllegalArgumentException("The process definition does not qualify for MQ subscription looping: " + processDefinitionId);
int concurrency = this.determineConcurrency(task);
this.logger.debug("Process definition MQ subscription task is configured for concurrency: {}: {}", processDefinitionId, concurrency);
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, processDefinitionId, task);
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), processDefinitionId);
if (execs.size() < concurrency) {
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), processDefinitionId);
this.loop(tenantId, processDefinitionId, task, concurrency, execs);
}
}
private synchronized void loop(String tenantId, String processDefinitionId, ServiceTask task, int concurrency, List<Execution> execs) {
String key = processDefinitionId + "|" + task.getId();
if (this.activeListeners.containsKey(key))
// one listener, no matter how many instances are subscribed
return;
AbstractActivityListener listener = new AbstractActivityListener(processDefinitionId, task) {
@Override
public void onEvent(ActivitiActivityEvent event) {
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
services.getRuntimeService().startProcessInstanceById(event.getProcessDefinitionId());
}
@Override
public boolean isFailOnException() {
return true;
}
};
// the ServiceTask will then execute, waiting for a message on the queue
// when a message is received, it will be responsible for starting a new process instance
// that new process instance will wait for the next message on the queue
// repeat
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
// register the listener so we can possibly remove it later
this.activeListeners.put(key, listener);
for (int thread = execs.size(); thread < concurrency; thread++) {
// start a process instance on the process
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue/topic", processDefinitionId);
this.services.getRuntimeService().startProcessInstanceById(processDefinitionId);
}
}
/**
* This method stops the listening of activity completions. This will end
* the infinite loop where new process instances are started every time
* another process instance receives a message.
*
* It will also attempt to cancel/close any active MQ subscription. If
* they are already closed, it will be skipped. The `executionId` of
* successfully cancelled subscriptions will be returned.
*
* @return A set of unique identifiers for Activiti executions that were successfully cancelled.
*/
public synchronized Set<String> deloopAllMqProcessDefinitions() {
Set<String> executionIds = new HashSet<>();
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
while (i.hasNext()) {
Entry<String, AbstractActivityListener> listener = i.next();
String[] splitkey = listener.getKey().split("\\|");
String processDefinitionId = splitkey[0];
String activityId = splitkey[1];
executionIds.addAll(this.deloop(processDefinitionId, activityId, listener.getValue()));
i.remove();
}
return executionIds;
}
/**
* This method stops the listening of activity completions on the MQ
* subscribe task in the specified process definition. This will end the
* infinite loop where new process instances are started every time another
* process instance receives a message.
*
* @param processDefinitionId The unique identifier of the process definition to loop.
* @return A set of unique identifiers for Activiti executions that were successfully cancelled.
*/
public synchronized Set<String> deloop(String processDefinitionId) {
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
if (task == null)
throw new IllegalArgumentException();
String key = processDefinitionId + "|" + task.getId();
AbstractActivityListener listener = this.activeListeners.remove(key);
return this.deloop(processDefinitionId, task.getId(), listener);
}
private Set<String> deloop(String processDefinitionId, String activityId, AbstractActivityListener listener) {
if (listener == null) {
this.logger.trace("No MQ subscription listener was found; no listener to stop");
} else {
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefinitionId);
this.services.getRuntimeService().removeEventListener(listener);
}
Set<String> executionIds = new HashSet<>();
List<Execution> executions = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(activityId)
.list();
for (Execution execution : executions) {
this.logger.debug("An MQ subscription was found; stopping execution: {}: {}: {}", processDefinitionId, activityId, execution.getId());
if (this.subscriptionService.cancelled(execution)) {
this.logger.info("An MQ subscription was closed: {}", execution.getId());
executionIds.add(execution.getId());
} else {
this.logger.debug("An MQ subscription was already closed: {}", execution.getId());
}
}
return executionIds;
}
protected String findTenantId() {
Tenant tenant = this.tenantFinderService.findTenant();
return this.tenantFinderService.transform(tenant);
}
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
.latestVersion()
.active();
if (tenantId == null) {
procDefQuery.processDefinitionWithoutTenantId();
} else {
procDefQuery.processDefinitionTenantId(tenantId);
}
return procDefQuery.list();
}
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(task.getId());
if (tenantId == null) {
execQuery.executionWithoutTenantId();
} else {
execQuery.executionTenantId(tenantId);
}
return execQuery.list();
}
protected Integer findConcurrency(ServiceTask task) {
for (FieldExtension fieldext : task.getFieldExtensions()) {
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
}
}
return null;
}
protected int determineConcurrency(ServiceTask task) {
Integer concurrency = this.findConcurrency(task);
if (concurrency == null) {
return 1;
} else if (concurrency.intValue() < 1) {
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
return 1;
} else {
return concurrency.intValue();
}
}
protected ServiceTask castToServiceTask(FlowElement flowElement) {
if (!(flowElement instanceof ServiceTask))
return null;
return (ServiceTask) flowElement;
}
}

View File

@@ -13,8 +13,28 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
/**
* This delegate listens for a reply message on an MQ queue.
* This method listens for a reply message on an MQ queue.
*
* The process instance will block until a corresponding reply message is
* received. It uses the `mq_correlationId` variable to select the
* corresponding reply message. That variable is automatically set by the
* `mqPublishDelegate` task. This is meant to be used after
* `mqPublishDelegate` and not just by itself. If you want to start processes
* with an MQ subscription see the `mqSubscribeDelegate` task.
*
* If you have more than one MQ communication in a single process, set the
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
* task. The resultant variables will then include that message name (e.g.
* `mq_messageId_{mq_messageName}`).
*
* This requires a long running connection to MQ. It runs in a long running
* Activiti job/execution. If there is a failure or the server is restarted,
* the Activiti job will fail and automatically retry per the Activiti standard
* features. After exhausting retries, it may eventually dead-letter. Retry
* the job to continue the subscription.
*
* @author brian@inteligr8.com
* @since 1.0
@@ -37,33 +57,21 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
private MqSubscriptionService subscriptionService;
/**
* This method listens for a reply message on an MQ queue.
*
* It uses the `mq_correlationId` variable to select the corresponding reply
* message. That variable is automatically set by the `mqPublishDelegate`
* task. This is meant to be used after `mqPublishDelegate` and not just by
* itself. If you want to start processes with an MQ subscription see the
* `mqSubscribeDelegate` task.
*
* If you have more than one MQ communication in a single process, set the
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
* task.
*
* TODO map response to variables
* This method makes this bean an Activiti delegate.
*
* @param execution An Activiti delegate execution (source task or execution/task listener).
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
* @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
* @varIn mq_correlationId The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication.
* @varOut mq_messageId The unique message identifer of the message received.
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
* @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
* @varOut mq_payload The body of the MQ message; may be `null`.
* @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
* @throws BPMNError correlation `mq_correlationId` is required.
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
* @throws BPMNError network The MQ connection experienced network issues.
@@ -84,7 +92,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
try {
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
communicator.receive(destination, correlationId,
if (communicator.receive(destination, correlationId,
new MqSubscriptionListener() {
@Override
public void consuming(AutoCloseable consumerCloseable) {
@@ -110,7 +118,10 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
}
);
) == null) {
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
throw new BpmnError("cancelled", "MQ subscription cancelled gracefully");
}
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());

View File

@@ -7,6 +7,8 @@ import java.util.Set;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,39 +31,124 @@ public class MqSubscriptionService extends MqExecutionService {
* also necessary to remove the execution when it is removed from the
* `activityExecutionMap` map.
*/
private Map<String, AutoCloseable> executionSubscriptionMap = new HashMap<>();
private Map<String, Pair<Boolean, AutoCloseable>> executionSubscriptionMap = new HashMap<>();
/**
* This method registers a consuming execution; an execution that was not
* automatically started as part of the framework. This is for reply
* subscriptions and not starter subscriptions.
*
* @param execution An Activiti execution.
* @param consumerCloseable A closeable MQ consumer.
*/
public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
this.executing(execution);
this.executionSubscriptionMap.put(execution.getId(), consumerCloseable);
this.executionSubscriptionMap.put(execution.getId(), Pair.of(false, consumerCloseable));
}
/**
* This method registers an auto-consuming execution; an execution that was
* automatically started as part of the framework. This is for starter
* subscriptions and not reply subscriptions.
*
* @param execution An Activiti execution.
* @param consumerCloseable A closeable MQ consumer.
*/
public synchronized void autoconsuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
this.executing(execution);
this.executionSubscriptionMap.put(execution.getId(), Pair.of(true, consumerCloseable));
}
public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) {
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable != consumerCloseable)
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable != null && cachedConsumerCloseable.getRight() != consumerCloseable)
throw new IllegalStateException("The consumer objects were expected to be identical");
this.executionSubscriptionMap.remove(execution.getId());
this.executed(execution);
}
@Override
public synchronized boolean cancelled(Execution execution) {
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable == null) {
this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId());
return false;
}
// this will eventually lead to a call to "consumed() above"
try {
cachedConsumerCloseable.getRight().close();
return super.cancelled(execution);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
/**
* This method cancel the MQ subscription on the specified execution.
*
* @param executionId An execution unique identifier.
* @return `true` if execution and MQ subscription were ended; `false` otherwise.
*/
@Override
public synchronized boolean cancel(String executionId) throws Exception {
if (!super.cancel(executionId))
return false;
this.logger.trace("Removing MQ subscription execution: {}", executionId);
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer.getRight());
consumer.getRight().close();
}
return true;
}
/**
* This method cancels all the MQ subscriptions active on the specified
* process definition.
*
* @param processDefinitionId A process definition version unique identifier.
* @return A set of execution identifiers subscribed to MQ that are now cancelled; all subscriptions now ended.
*/
@Override
public synchronized Set<String> cancelAll(String processDefinitionId) throws Exception {
Set<String> executionIds = super.cancelAll(processDefinitionId);
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
String processDefinitionKey = processDefinition.getKey();
for (String executionId : executionIds) {
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null && consumer.getLeft()) {
this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId);
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
consumer.getRight().close();
}
}
return executionIds;
}
/**
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
*/
@Override
public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception {
Set<String> executionIds = super.clearOtherVersions(latestProcessDefinitionId);
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
Set<String> executionIds = super.cancelAllOtherVersions(latestProcessDefinitionId);
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
String processDefinitionKey = latestProcessDefinition.getKey();
for (String executionId : executionIds) {
this.logger.trace("Clearing process definition execution: {}: {}", latestProcessDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
consumer.close();
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null && consumer.getLeft()) {
this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId);
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
consumer.getRight().close();
}
}

View File

@@ -0,0 +1,151 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.SequenceFlow;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.bpmn.model.StartEvent;
import org.activiti.engine.ActivitiObjectNotFoundException;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class ProcessDefinitionRegistry {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
@Autowired
protected ProcessEngine services;
@Autowired
protected ApplicationContext context;
private Map<String, ServiceTask> processDefinitionMqSubscribeTasks;
@PostConstruct
private void init() {
this.processDefinitionMqSubscribeTasks = new HashMap<>();
}
public synchronized boolean isMqStart(String processDefinitionId) {
if (this.processDefinitionMqSubscribeTasks.containsKey(processDefinitionId)) {
return this.processDefinitionMqSubscribeTasks.get(processDefinitionId) != null;
} else {
try {
// not yet cached; cache it; then return
return this.findMqStartSubscribeTask(processDefinitionId) != null;
} catch (ActivitiObjectNotFoundException aonfe) {
return false;
}
}
}
public synchronized ServiceTask findMqStartSubscribeTask(String processDefinitionId) {
ServiceTask task = this.processDefinitionMqSubscribeTasks.get(processDefinitionId);
if (task != null)
return task;
BpmnModel model = this.getBpmnModel(processDefinitionId);
StartEvent startElement = this.findMqPlainStartEvent(processDefinitionId, model);
if (startElement == null)
return null;
List<SequenceFlow> flows = startElement.getOutgoingFlows();
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefinitionId, startElement.getId());
if (flows.isEmpty()) {
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefinitionId);
return null;
} else if (flows.size() > 1) {
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefinitionId);
return null;
}
SequenceFlow flow = flows.iterator().next();
FlowElement targetFlowElement = flow.getTargetFlowElement();
if (targetFlowElement == null) {
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefinitionId);
return null;
}
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefinitionId);
if (!(targetFlowElement instanceof ServiceTask)) {
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, targetFlowElement.getId(), targetFlowElement.getName());
return null;
}
task = (ServiceTask) targetFlowElement;
if (!"delegateExpression".equals(task.getImplementationType())) {
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
return null;
}
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
if (!matcher.find()) {
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefinitionId, task.getId(), task.getName());
return null;
}
String beanId = matcher.group(1).trim();
this.logger.trace("Looking up bean: {}", beanId);
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
if (delegateBean == null) {
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
return null;
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
return null;
}
this.processDefinitionMqSubscribeTasks.put(processDefinitionId, task);
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefinitionId, task.getId(), task.getName());
return task;
}
private BpmnModel getBpmnModel(String processDefId) {
if (Context.getProcessDefinitionHelper() == null) {
// typically during initial startup
return this.services.getRepositoryService().getBpmnModel(processDefId);
} else {
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
return ProcessDefinitionUtil.getBpmnModel(processDefId);
}
}
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
this.logger.trace("Finding all start elements in process: {}", processDefId);
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
for (StartEvent startElement : startElements) {
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
continue;
}
return startElement;
}
return null;
}
}

View File

@@ -2,6 +2,8 @@ package com.inteligr8.activiti.mq;
import java.io.IOException;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
public interface TransactionalMessageHandler<BodyType> {
void onMessage(DeliveredMessage<BodyType> message) throws IOException;

View File

@@ -13,12 +13,12 @@ import javax.jms.JMSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.inteligr8.activiti.mq.DeliveredMessage;
import com.inteligr8.activiti.mq.GenericDestination;
import com.inteligr8.activiti.mq.MqCommunicator;
import com.inteligr8.activiti.mq.MqSubscriptionListener;
import com.inteligr8.activiti.mq.PreparedMessage;
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
@@ -72,7 +72,7 @@ public class AmqpCommunicator implements MqCommunicator {
}
@Override
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
String correlationId = message.getCorrelationId() != null ? message.getCorrelationId() : UUID.randomUUID().toString();
AMQP.BasicProperties bprops = new AMQP.BasicProperties(null, null,
@@ -82,7 +82,7 @@ public class AmqpCommunicator implements MqCommunicator {
correlationId,
message.getReplyToQueueName() != null ? message.getReplyToQueueName() : null,
null, // expiration
null, // messageId
UUID.randomUUID().toString(), // messageId
null, // timestamp
null, // type
null, null, null);
@@ -100,7 +100,7 @@ public class AmqpCommunicator implements MqCommunicator {
con.close();
}
return correlationId;
return AmqpDeliveredMessage.transform(bprops);
}
@Override

View File

@@ -8,9 +8,10 @@ import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.inteligr8.activiti.mq.AbstractMessage;
import com.inteligr8.activiti.mq.ByteArrayChannel;
import com.inteligr8.activiti.mq.DeliveredMessage;
import com.inteligr8.activiti.mq.model.AbstractMessage;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
/**
@@ -47,6 +48,19 @@ public class AmqpDeliveredMessage<BodyType> extends AbstractMessage implements D
this.getProperties().putAll(message.getProperties().getHeaders());
}
public AmqpDeliveredMessage(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
this.setMessageId(props.getMessageId());
this.setCorrelationId(props.getCorrelationId());
}
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props) throws IOException {
return new AmqpDeliveredMessage<>(props, null);
}
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
return new AmqpDeliveredMessage<>(props, bodyType);
}
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(Delivery message) throws IOException {
return new AmqpDeliveredMessage<>(message, null);
}

View File

@@ -12,8 +12,8 @@ import org.apache.commons.io.IOUtils;
import org.jgroups.util.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.inteligr8.activiti.mq.AbstractMessage;
import com.inteligr8.activiti.mq.PreparedMessage;
import com.inteligr8.activiti.mq.model.AbstractMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;

View File

@@ -14,12 +14,12 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.inteligr8.activiti.mq.DeliveredMessage;
import com.inteligr8.activiti.mq.GenericDestination;
import com.inteligr8.activiti.mq.MqCommunicator;
import com.inteligr8.activiti.mq.MqSubscriptionListener;
import com.inteligr8.activiti.mq.PreparedMessage;
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
public class JmsCommunicator implements MqCommunicator {
@@ -68,7 +68,7 @@ public class JmsCommunicator implements MqCommunicator {
}
@Override
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
Connection con = this.connect();
try {
return this.send(con, destination, (JmsPreparedMessage<BodyType>) message);
@@ -77,18 +77,18 @@ public class JmsCommunicator implements MqCommunicator {
}
}
protected <BodyType> String send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
protected <BodyType> DeliveredMessage<BodyType> send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
Session session = this.start(con);
try {
String correlationId = this.send(session, destination, message);
DeliveredMessage<BodyType> dmessage = this.send(session, destination, message);
session.commit();
return correlationId;
return dmessage;
} finally {
session.close();
}
}
public <BodyType> String send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
public <BodyType> DeliveredMessage<BodyType> send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
MessageProducer messenger = session.createProducer(destination.toJmsQueue(session));
try {
if (destination.getDeliveryMode() != null)
@@ -103,7 +103,7 @@ public class JmsCommunicator implements MqCommunicator {
this.logger.debug("Sending message to queue: {}", destination.getQueueName());
messenger.send(jmsmsg);
return jmsmsg.getJMSCorrelationID();
return JmsDeliveredMessage.transform(jmsmsg);
} finally {
messenger.close();
}
@@ -143,37 +143,43 @@ public class JmsCommunicator implements MqCommunicator {
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException {
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
Message receivedMessage = null;
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
try {
if (listener != null)
listener.consuming(messenger);
if (timeoutInMillis < 0L) {
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
return JmsDeliveredMessage.transform(messenger.receive());
} else if (timeoutInMillis == 0L) {
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
return JmsDeliveredMessage.transform(messenger.receiveNoWait());
} else {
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
try {
long startTime = System.currentTimeMillis();
Message message = messenger.receive(timeoutInMillis);
long elapsedTime = System.currentTimeMillis() - startTime;
if (message != null) {
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
return JmsDeliveredMessage.transform(message);
if (timeoutInMillis < 0L) {
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
receivedMessage = messenger.receive();
} else if (timeoutInMillis == 0L) {
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
receivedMessage = messenger.receiveNoWait();
} else {
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
receivedMessage = messenger.receive(timeoutInMillis);
}
if (elapsedTime < timeoutInMillis) {
throw new JMSException("The reading of the queue ended prematurely");
} else {
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
long elapsedTime = System.currentTimeMillis() - startTime;
if (receivedMessage != null) {
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
return JmsDeliveredMessage.transform(receivedMessage);
}
if (timeoutInMillis > 0L) {
if (elapsedTime >= timeoutInMillis)
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
this.logger.debug("Done waiting for message after {} minutes: {}", elapsedTime / 60000L, destination.getQueueName());
}
return null;
} finally {
if (listener != null)
listener.consumed(messenger);
}
} finally {
if (listener != null)
listener.consumed(messenger);
messenger.close();
}
}

View File

@@ -20,8 +20,8 @@ import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.inteligr8.activiti.mq.AbstractMessage;
import com.inteligr8.activiti.mq.DeliveredMessage;
import com.inteligr8.activiti.mq.model.AbstractMessage;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
public class JmsDeliveredMessage<BodyType> extends AbstractMessage implements DeliveredMessage<BodyType> {

View File

@@ -14,8 +14,8 @@ import javax.jms.StreamMessage;
import org.apache.commons.io.IOUtils;
import org.jgroups.util.UUID;
import com.inteligr8.activiti.mq.AbstractMessage;
import com.inteligr8.activiti.mq.PreparedMessage;
import com.inteligr8.activiti.mq.model.AbstractMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
public class JmsPreparedMessage<BodyType> extends AbstractMessage implements PreparedMessage<BodyType> {

View File

@@ -1,4 +1,4 @@
package com.inteligr8.activiti.mq;
package com.inteligr8.activiti.mq.model;
import java.util.HashMap;
import java.util.Map;

View File

@@ -1,4 +1,4 @@
package com.inteligr8.activiti.mq;
package com.inteligr8.activiti.mq.model;
import java.time.OffsetDateTime;

View File

@@ -1,4 +1,4 @@
package com.inteligr8.activiti.mq;
package com.inteligr8.activiti.mq.model;
public interface Message {

View File

@@ -1,4 +1,4 @@
package com.inteligr8.activiti.mq;
package com.inteligr8.activiti.mq.model;
import java.util.HashMap;
import java.util.Map;

View File

@@ -7,8 +7,6 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -17,6 +15,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
import com.inteligr8.activiti.mq.model.PreparedMessage;
@Component
public class MQDockerIT {
@@ -106,15 +107,18 @@ public class MQDockerIT {
OffsetDateTime beforeSend = OffsetDateTime.now();
this.logger.debug("Timestamp before send: {}", beforeSend);
String correlationId = communicator.send(destination, message);
Assertions.assertNotNull(correlationId);
DeliveredMessage<String> sentMessage = communicator.send(destination, message);
Assertions.assertNotNull(sentMessage);
Assertions.assertNotNull(sentMessage.getMessageId());
Assertions.assertNotNull(sentMessage.getCorrelationId());
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, correlationId);
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, sentMessage.getCorrelationId());
Assertions.assertNotNull(receivedMessage);
this.logger.debug("Timestamp of delivery: {}", receivedMessage.getDeliveryTime());
Assertions.assertNotNull(receivedMessage.getMessageId());
Assertions.assertEquals(correlationId, receivedMessage.getCorrelationId());
Assertions.assertEquals(sentMessage.getMessageId(), receivedMessage.getMessageId());
Assertions.assertEquals(sentMessage.getCorrelationId(), receivedMessage.getCorrelationId());
Assertions.assertEquals(message.getReplyToQueueName(), receivedMessage.getReplyToQueueName());
Assertions.assertEquals("{}", receivedMessage.getContent());
Assertions.assertNotNull(receivedMessage.getDeliveryTime());

View File

@@ -0,0 +1,3 @@
javascript
js
ecmascript

24
src/test/vscode/test.http Normal file
View File

@@ -0,0 +1,24 @@
@baseUrl = http://localhost:8080/activiti-app
@username = admin@app.activiti.com
@password = admin
@basic = YWRtaW5AYXBwLmFjdGl2aXRpLmNvbTphZG1pbg==
###
# @name getPis
POST {{baseUrl}}/api/enterprise/historic-process-instances/query
Authorization: BASIC {{username}}:{{password}}
Content-type: application/json
{
"finished": false
}
###
# @name getPis2
GET {{baseUrl}}/api/runtime/process-instances?tenantId=tenant_1
Authorization: BASIC {{username}}:{{password}}
###
# @name getExecs
GET {{baseUrl}}/api/runtime/executions?tenantId=tenant_1
Authorization: BASIC {{username}}:{{password}}