Compare commits
24 Commits
Author | SHA1 | Date | |
---|---|---|---|
bf6ba664f5 | |||
d5fe8e3176 | |||
7020a6569c | |||
893596898b | |||
61c552fdd2 | |||
1d2250abd4 | |||
c1cc36afc2 | |||
0d9b40f020 | |||
b8d75dd95f | |||
2bf612cac2 | |||
e3d49c2811 | |||
5234aad45d | |||
6089c6c69a | |||
8b90e42614 | |||
1d98e90e97 | |||
740843908d | |||
e593d50bb8 | |||
7a5065419a | |||
d3c79cac9c | |||
5aef2d4446 | |||
dfb24cbd1f | |||
7a67634cfb | |||
41b5271617 | |||
cc14a59959 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,6 +1,7 @@
|
||||
# Maven
|
||||
target
|
||||
pom.xml.versionsBackup
|
||||
*-pom.xml
|
||||
|
||||
# Eclipse
|
||||
.project
|
||||
|
@@ -6,6 +6,10 @@
|
||||
|
||||
This delegate sends a message to an MQ queue.
|
||||
|
||||
The process instance will continue after a successful publish of the message by this delegate to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqSubscribeReplyDelegate` task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
|
||||
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
|
||||
|
||||
@@ -20,21 +24,25 @@ ${mqPublishDelegate}
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This method sends a message to an MQ queue.
|
||||
|
||||
It does not wait for a response and will continue after a successful publish of the message to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
|
||||
This method makes this bean an Activiti delegate.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set the `mq_messageId` result variable in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_priority` | | [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol. |
|
||||
| BPMN Field | `mq_payload` | | [optional] The body of the MQ message. May include expressions. |
|
||||
| BPMN Field | `mq_payloadMimeType` | | [optional] The MIME type of the body of the MQ message. |
|
||||
| BPMN Field | `mq_replyQueueName` | | [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply. |
|
||||
| BPMN Field | `mq_statusQueueName` | | [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates. |
|
||||
| Activiti Variable | `mq_correlationId` | | [optional] The correlationId of the message to send. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
| Activiti Variable | `mq_correlationId` | The correlationId of the message sent. |
|
||||
| Activiti Variable | `mq_messageId` | The messageId of the message sent. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
@@ -6,6 +6,14 @@
|
||||
|
||||
This delegate listens for messages on an MQ queue.
|
||||
|
||||
This is expected to exist in a Service Task immediately after a plain start activity. This will cause process instances to automatically be created in order to maintain the MQ subscription as messages are received. If used in any other way, it will error and the process will fail validation.
|
||||
|
||||
This does not wait for a response to a specific message, but instead to all messages put on an MQ queue. That is for the `mqSubscribeReplyDelegate` delegate.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
This requires a long running connection to MQ. It runs in a long running Activiti job/execution. If there is a failure or the server is restarted, the Activiti job will fail and automatically retry per the Activiti standard features. After exhausting retries, it may eventually dead-letter. Retry the job to continue the subscription.
|
||||
|
||||
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
|
||||
|
||||
@@ -20,22 +28,15 @@ ${mqSubscribeDelegate}
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This delegate listens for messages on an MQ queue.
|
||||
|
||||
It does not wait for a response to a specific message put on an MQ queue. That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
|
||||
|
||||
When used, this task must be the first task after a plain start event in a process. If used in any other way, it will error and the process will fail validation.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
TODO map response to variables
|
||||
This method makes this bean an Activiti delegate.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_concurrency` | | The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
@@ -43,7 +44,10 @@ TODO map response to variables
|
||||
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
|
||||
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
|
||||
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
|
||||
| Activiti Variable | `mq_payload` | The body of the MQ message. May include expressions. May be `null`. |
|
||||
| Activiti Variable | `mq_payloadMimeType` | The MIME type of the body of the MQ message. May be `null`. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Activiti Variable | `mq_statusQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
@@ -4,10 +4,16 @@
|
||||
*Since Version*: 1.0
|
||||
|
||||
|
||||
This delegate listens for a reply message on an MQ queue.
|
||||
This method listens for a reply message on an MQ queue.
|
||||
|
||||
*See Also*: mqPublishDelegate
|
||||
*See Also*: mqSubscribeDelegate
|
||||
The process instance will block until a corresponding reply message is received. It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
This requires a long running connection to MQ. It runs in a long running Activiti job/execution. If there is a failure or the server is restarted, the Activiti job will fail and automatically retry per the Activiti standard features. After exhausting retries, it may eventually dead-letter. Retry the job to continue the subscription.
|
||||
|
||||
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
|
||||
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
|
||||
|
||||
## <a name="delegate"></a> Delegate Expression Uses
|
||||
|
||||
@@ -20,20 +26,14 @@ ${mqSubscribeReplyDelegate}
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This method listens for a reply message on an MQ queue.
|
||||
|
||||
It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
|
||||
|
||||
TODO map response to variables
|
||||
This method makes this bean an Activiti delegate.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process. |
|
||||
| Activiti Variable | `mq_correlationId` | | The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
@@ -41,7 +41,10 @@ TODO map response to variables
|
||||
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
|
||||
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
|
||||
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
|
||||
| Activiti Variable | `mq_payload` | The body of the MQ message. May include expressions. May be `null`. |
|
||||
| Activiti Variable | `mq_payloadMimeType` | The MIME type of the body of the MQ message. May be `null`. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Activiti Variable | `mq_statusQueueName` | The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`. |
|
||||
| Thrown BPMN Error | `correlation` | `mq_correlationId` is required. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
|
@@ -23,7 +23,7 @@ It is important to note that Activiti expressions use the JUEL language and Acti
|
||||
| -------------------------------- | ------------------------- |
|
||||
| [`mqPublishDelegate`](bean-mqPublishDelegate.md) | This delegate sends a message to an MQ queue. |
|
||||
| [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md) | This delegate listens for messages on an MQ queue. |
|
||||
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This delegate listens for a reply message on an MQ queue. |
|
||||
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This method listens for a reply message on an MQ queue. |
|
||||
|
||||
---
|
||||
|
||||
|
43
pom.xml
43
pom.xml
@@ -11,7 +11,7 @@
|
||||
</parent>
|
||||
|
||||
<artifactId>mq-activiti-ext</artifactId>
|
||||
<version>1.0.4</version>
|
||||
<version>1.0.10</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>MQ Activiti Extension</name>
|
||||
@@ -27,7 +27,7 @@
|
||||
<spring.version>5.3.31</spring.version>
|
||||
<activiti.version>7.11.1</activiti.version>
|
||||
<tomcat-rad.version>9-2.2</tomcat-rad.version>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin</aps.tomcat.opts>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin -Dvalidator.editor.dmn.espression=false -Dvalidator.editor.bpmn.disable.scripttask=false -Djavascript.secure-scripting.enabled=false -Djavascript.secure-scripting.enable-class-whitelisting=false -Dbeans.whitelisting.enabled=false -Del.whitelisting.enabled=false -Dshell.whitelisting.enabled=false</aps.tomcat.opts>
|
||||
|
||||
<!-- reloads in APS are slower than a restart -->
|
||||
<aps.hotswap.enabled>false</aps.hotswap.enabled>
|
||||
@@ -140,6 +140,7 @@
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>5.10.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
@@ -189,16 +190,6 @@
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>3.7.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>assemble-jar</id>
|
||||
<phase>package</phase>
|
||||
<goals><goal>single</goal></goals>
|
||||
<configuration>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>package-stencil</id>
|
||||
<phase>package</phase>
|
||||
@@ -223,6 +214,34 @@
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.6.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>shade-deps</id>
|
||||
<phase>package</phase>
|
||||
<goals><goal>shade</goal></goals>
|
||||
<configuration>
|
||||
<shadedArtifactAttached>true</shadedArtifactAttached>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<shadedPattern>shaded.mq.</shadedPattern>
|
||||
<excludes>
|
||||
<!-- anything in the root directory of a JAR -->
|
||||
<exclude>*</exclude>
|
||||
<!-- anything in the META-INF directory of a JAR -->
|
||||
<exclude>META-INF/**</exclude>
|
||||
<!-- anything developed in this extension -->
|
||||
<exclude>com.activiti.extension.**</exclude>
|
||||
<exclude>com.inteligr8.activiti.mq.**</exclude>
|
||||
</excludes>
|
||||
</relocation>
|
||||
</relocations>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<configuration>
|
||||
|
@@ -3087,8 +3087,7 @@
|
||||
"mq_prioritypackage",
|
||||
"mq_payloadpackage",
|
||||
"mq_replyQueueNamepackage",
|
||||
"mq_statusQueueNamepackage",
|
||||
"mq_metadataProcessScopepackage"
|
||||
"mq_statusQueueNamepackage"
|
||||
],
|
||||
"hiddenPropertyPackages": [
|
||||
"multiinstance_typepackage",
|
||||
|
@@ -0,0 +1,69 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
|
||||
public interface ActivitiEntityEventListener<T extends Entity> {
|
||||
|
||||
/**
|
||||
* This method checks to see if this listener is for the specified entity.
|
||||
*
|
||||
* @param entity An Activiti entity.
|
||||
* @return `true` if this listener supports the entity; `false` otherwise.
|
||||
*/
|
||||
default boolean ofEntityType(Entity entity) {
|
||||
return entity == null ? false : this.ofEntityType(entity.getClass());
|
||||
}
|
||||
|
||||
/**
|
||||
* This method checks to see if this listener is for the specified entity
|
||||
* class.
|
||||
*
|
||||
* @param entityType An Activiti entity class.
|
||||
* @return `true` if this listener supports the entity type; `false` otherwise.
|
||||
*/
|
||||
boolean ofEntityType(Class<? extends Entity> entityType);
|
||||
|
||||
/**
|
||||
* This method allows for initialization on application startup. Any
|
||||
* resources this listener should subscribe to, should be connected or
|
||||
* opened.
|
||||
*/
|
||||
default void onApplicationStartup() {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method allows for uninitialization on application shutdown. Any
|
||||
* resources this listener subscribes to should be disconnected or closed.
|
||||
*
|
||||
* The `ActivitiEntityEventListener#onEvent()` method may still be called
|
||||
* before startup or after shutdown.
|
||||
*/
|
||||
default void onApplicationShutdown() {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is fired every time there is a qualifying entity event.
|
||||
*
|
||||
* @param aaevent An Activiti entity event.
|
||||
*/
|
||||
default void onEntityEvent(ActivitiEntityEvent aaevent) {
|
||||
@SuppressWarnings("unchecked")
|
||||
T entity = (T) aaevent.getEntity();
|
||||
|
||||
this.onEntityEvent(aaevent.getType(), aaevent.getProcessDefinitionId(), aaevent.getProcessInstanceId(), aaevent.getExecutionId(), entity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Thi smethod is fired every time there is a qualifying entity event.
|
||||
*
|
||||
* @param eventType An Activiti event type; limited to `ENTITY_*`.
|
||||
* @param processDefinitionId The unique identifier of the process definition subject to the event.
|
||||
* @param processInstanceId The unique identifier of the process instance subject to the event.
|
||||
* @param executionId The unique identifier of the execution subject to the event.
|
||||
* @param entity The Activiti entity.
|
||||
*/
|
||||
void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId, String executionId, T entity);
|
||||
|
||||
}
|
@@ -0,0 +1,111 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventListener;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ApplicationContextEvent;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.context.event.ContextStartedEvent;
|
||||
import org.springframework.context.event.ContextStoppedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class ActivitiEntityEventMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(ActivitiEntityEventMonitor.class);
|
||||
|
||||
@Autowired
|
||||
private List<ActivitiEntityEventListener<? extends Entity>> listeners;
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
/**
|
||||
* This method is fired by Spring, the framework behind Activiti. This
|
||||
* forwards the application event to each listener so they can initialize
|
||||
* or uninitialize.
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationContextEvent event) {
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
|
||||
this.logger.debug("Application context refresh/stop/close detected; shutting down listeners: {}", event);
|
||||
|
||||
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
|
||||
listener.onApplicationShutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// the listener cannot be active until the context is initialized
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
|
||||
this.logger.debug("Application context refresh/start detected; starting up listeners: {}", event);
|
||||
|
||||
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
|
||||
listener.onApplicationStartup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will start monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
*/
|
||||
@PostConstruct
|
||||
protected void init() {
|
||||
this.logger.debug("Bean initialized; starting to listen for any entity event ...");
|
||||
|
||||
this.services.getRuntimeService().addEventListener(this,
|
||||
ActivitiEventType.ENTITY_CREATED,
|
||||
ActivitiEventType.ENTITY_INITIALIZED,
|
||||
ActivitiEventType.ENTITY_UPDATED,
|
||||
ActivitiEventType.ENTITY_ACTIVATED,
|
||||
ActivitiEventType.ENTITY_SUSPENDED,
|
||||
ActivitiEventType.ENTITY_DELETED);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will stop monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
*/
|
||||
@PreDestroy
|
||||
protected void uninit() {
|
||||
this.logger.debug("Bean uninitialized; stopping listener for any entity event ...");
|
||||
|
||||
this.services.getRuntimeService().removeEventListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is fired by the Activiti platform. It is called on every
|
||||
* entity event.
|
||||
*
|
||||
* @param event An Activiti event.
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(ActivitiEvent event) {
|
||||
this.logger.trace("Triggered by event: {}: {}; pdId: {}; piId: {}; execId: {}", event.getType(), event.getClass(), event.getProcessDefinitionId(), event.getProcessInstanceId(), event.getExecutionId());
|
||||
|
||||
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
|
||||
ActivitiEntityEvent aaevent = (ActivitiEntityEvent) event;
|
||||
if (aaevent.getEntity() instanceof Entity && listener.ofEntityType((Entity) aaevent.getEntity()))
|
||||
listener.onEntityEvent(aaevent);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@@ -1,455 +0,0 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.activiti.bpmn.model.BpmnModel;
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.SequenceFlow;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.bpmn.model.StartEvent;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventListener;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||
import org.activiti.engine.impl.context.Context;
|
||||
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.repository.ProcessDefinitionQuery;
|
||||
import org.activiti.engine.runtime.Execution;
|
||||
import org.activiti.engine.runtime.ExecutionQuery;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ApplicationContextEvent;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.context.event.ContextStartedEvent;
|
||||
import org.springframework.context.event.ContextStoppedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.activiti.domain.idm.Tenant;
|
||||
|
||||
@Component
|
||||
public class MQProcessDefinitionMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(MQProcessDefinitionMonitor.class);
|
||||
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired
|
||||
private TenantFinderService tenantFinderService;
|
||||
|
||||
@Autowired
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
private Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This method will add/remove listeners to the specific process
|
||||
* definitions that use MQ subscriptions to start process instances at
|
||||
* application startup and shutdown.
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationContextEvent event) {
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
|
||||
this.logger.debug("Discovered {} active process definitions to stop listening to", this.activeListeners.size());
|
||||
this.deloopMqSubscribeTasks();
|
||||
}
|
||||
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
|
||||
String tenantId = this.findTenantId();
|
||||
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
|
||||
this.logger.debug("Found {} active process definitions", procDefs.size());
|
||||
for (ProcessDefinition procDef : procDefs) {
|
||||
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(procDef.getId());
|
||||
if (task == null)
|
||||
continue;
|
||||
|
||||
int concurrency = this.determineConcurrency(task);
|
||||
this.logger.debug("Process definition MQ subscription is configured for concurrency: {}: {}", procDef.getId(), concurrency);
|
||||
|
||||
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, procDef.getId(), task);
|
||||
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), procDef.getId());
|
||||
|
||||
if (execs.size() < concurrency) {
|
||||
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), procDef.getId());
|
||||
for (int thread = execs.size(); thread < concurrency; thread++) {
|
||||
this.loopMqSubscribeTask(procDef.getId(), task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected String findTenantId() {
|
||||
Tenant tenant = this.tenantFinderService.findTenant();
|
||||
return this.tenantFinderService.transform(tenant);
|
||||
}
|
||||
|
||||
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
|
||||
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
|
||||
.latestVersion()
|
||||
.active();
|
||||
if (tenantId == null) {
|
||||
procDefQuery.processDefinitionWithoutTenantId();
|
||||
} else {
|
||||
procDefQuery.processDefinitionTenantId(tenantId);
|
||||
}
|
||||
|
||||
return procDefQuery.list();
|
||||
}
|
||||
|
||||
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
|
||||
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
|
||||
.processDefinitionId(processDefinitionId)
|
||||
.activityId(task.getId());
|
||||
if (tenantId == null) {
|
||||
execQuery.executionWithoutTenantId();
|
||||
} else {
|
||||
execQuery.executionTenantId(tenantId);
|
||||
}
|
||||
|
||||
return execQuery.list();
|
||||
}
|
||||
|
||||
protected Integer findConcurrency(ServiceTask task) {
|
||||
for (FieldExtension fieldext : task.getFieldExtensions()) {
|
||||
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
|
||||
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
|
||||
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected int determineConcurrency(ServiceTask task) {
|
||||
Integer concurrency = this.findConcurrency(task);
|
||||
if (concurrency == null) {
|
||||
return 1;
|
||||
} else if (concurrency.intValue() < 1) {
|
||||
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
|
||||
return 1;
|
||||
} else {
|
||||
return concurrency.intValue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will start monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
*/
|
||||
@PostConstruct
|
||||
protected void init() {
|
||||
this.services.getRuntimeService().addEventListener(this,
|
||||
ActivitiEventType.ENTITY_INITIALIZED,
|
||||
ActivitiEventType.ENTITY_UPDATED,
|
||||
ActivitiEventType.ENTITY_ACTIVATED,
|
||||
ActivitiEventType.ENTITY_SUSPENDED,
|
||||
ActivitiEventType.ENTITY_DELETED);
|
||||
this.logger.debug("Started listening for entity events");
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will stop monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
*/
|
||||
@PreDestroy
|
||||
protected void uninit() {
|
||||
this.logger.debug("Stopping listening for entity events ...");
|
||||
this.services.getRuntimeService().removeEventListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is fired by the Activiti platform. It is called on every
|
||||
* entity event except the creation event. You can see that in the init()
|
||||
* method above.
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(ActivitiEvent event) {
|
||||
this.logger.trace("Triggered by event: {}", event);
|
||||
this.onEntityEvent((ActivitiEntityEvent) event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void onEntityEvent(ActivitiEntityEvent aaevent) {
|
||||
this.logger.trace("Triggered by entity: {}", aaevent.getEntity());
|
||||
|
||||
if (aaevent.getEntity() instanceof ProcessDefinitionEntity) {
|
||||
this.logger.debug("Triggered by process definition state change: {}", aaevent.getEntity());
|
||||
|
||||
switch (aaevent.getType()) {
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
break;
|
||||
case ENTITY_DELETED:
|
||||
case ENTITY_SUSPENDED:
|
||||
// we need to stop the listener
|
||||
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) aaevent.getEntity());
|
||||
break;
|
||||
default:
|
||||
// we need to start a listener
|
||||
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) aaevent.getEntity());
|
||||
}
|
||||
// we only want to deal with ProcessDefinition entities
|
||||
} else if (aaevent.getEntity() instanceof DeploymentEntity) {
|
||||
switch (aaevent.getType()) {
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
this.onDeploymentAddEvent((DeploymentEntity) aaevent.getEntity());
|
||||
break;
|
||||
default:
|
||||
}
|
||||
// we only want to deal with ProcessDefinition entities
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition addition: {}", entity);
|
||||
|
||||
this.unsubscribeOtherMqSubscribeTasks(entity);
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
this.loopMqSubscribeTask(entity.getId(), task);
|
||||
}
|
||||
|
||||
protected void onDeploymentAddEvent(DeploymentEntity entity) {
|
||||
this.logger.debug("Triggered by deployment addition: {}", entity);
|
||||
|
||||
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
|
||||
if (procDefEntities == null)
|
||||
return;
|
||||
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
|
||||
|
||||
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
|
||||
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
|
||||
|
||||
this.unsubscribeOtherMqSubscribeTasks(procDefEntity);
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
this.loopMqSubscribeTask(procDefEntity.getId(), task);
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition removal: {}", entity);
|
||||
|
||||
this.unsubscribeOtherMqSubscribeTasks(entity);
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
this.deloopMqSubscribeTask(entity.getId());
|
||||
}
|
||||
|
||||
protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
|
||||
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
|
||||
}
|
||||
|
||||
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
|
||||
try {
|
||||
Set<String> executionIds = this.subscriptionService.clearOtherVersions(procDefId);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cleared: " + procDefId, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) {
|
||||
// start a process instance on the process
|
||||
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);
|
||||
this.services.getRuntimeService().startProcessInstanceById(processDefId);
|
||||
|
||||
if (this.activeListeners.containsKey(processDefId))
|
||||
// one listener, no matter how many instances are subscribed
|
||||
return;
|
||||
|
||||
AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) {
|
||||
@Override
|
||||
public void onEvent(ActivitiActivityEvent event) {
|
||||
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
|
||||
services.getRuntimeService().startProcessInstanceById(processDefId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// the ServiceTask will then execute, waiting for a message on the queue
|
||||
// when a message is received, it will be responsible for starting a new process instance
|
||||
// that new process instance will wait for the next message on the queue
|
||||
// repeat
|
||||
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
|
||||
|
||||
// register the listener so we can possibly remove it later
|
||||
this.activeListeners.put(processDefId, listener);
|
||||
}
|
||||
|
||||
protected synchronized int deloopMqSubscribeTasks() {
|
||||
int count = 0;
|
||||
|
||||
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Entry<String, AbstractActivityListener> listener = i.next();
|
||||
|
||||
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", listener.getKey());
|
||||
this.services.getRuntimeService().removeEventListener(listener.getValue());
|
||||
|
||||
i.remove();
|
||||
count++;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
protected synchronized boolean deloopMqSubscribeTask(String processDefId) {
|
||||
AbstractActivityListener listener = this.activeListeners.remove(processDefId);
|
||||
if (listener == null) {
|
||||
this.logger.trace("No MQ subscription listener found; no listener to stop");
|
||||
return false;
|
||||
}
|
||||
|
||||
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefId);
|
||||
this.services.getRuntimeService().removeEventListener(listener);
|
||||
return true;
|
||||
}
|
||||
|
||||
private BpmnModel getBpmnModel(String processDefId) {
|
||||
if (Context.getProcessDefinitionHelper() == null) {
|
||||
// typically during initial startup
|
||||
return this.services.getRepositoryService().getBpmnModel(processDefId);
|
||||
} else {
|
||||
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
|
||||
return ProcessDefinitionUtil.getBpmnModel(processDefId);
|
||||
}
|
||||
}
|
||||
|
||||
protected ServiceTask findMqStartSubscribeTask(String processDefId) {
|
||||
BpmnModel model = this.getBpmnModel(processDefId);
|
||||
StartEvent startElement = this.findMqPlainStartEvent(processDefId, model);
|
||||
if (startElement == null)
|
||||
return null;
|
||||
|
||||
List<SequenceFlow> flows = startElement.getOutgoingFlows();
|
||||
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefId, startElement.getId());
|
||||
if (flows.isEmpty()) {
|
||||
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefId);
|
||||
return null;
|
||||
} else if (flows.size() > 1) {
|
||||
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefId);
|
||||
return null;
|
||||
}
|
||||
|
||||
SequenceFlow flow = flows.iterator().next();
|
||||
FlowElement targetFlowElement = flow.getTargetFlowElement();
|
||||
if (targetFlowElement == null) {
|
||||
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefId);
|
||||
return null;
|
||||
}
|
||||
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefId);
|
||||
|
||||
if (!(targetFlowElement instanceof ServiceTask)) {
|
||||
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefId, targetFlowElement.getId(), targetFlowElement.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
ServiceTask task = (ServiceTask) targetFlowElement;
|
||||
if (!"delegateExpression".equals(task.getImplementationType())) {
|
||||
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
|
||||
|
||||
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
|
||||
if (!matcher.find()) {
|
||||
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
String beanId = matcher.group(1).trim();
|
||||
this.logger.trace("Looking up bean: {}", beanId);
|
||||
|
||||
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
|
||||
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
|
||||
if (delegateBean == null) {
|
||||
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
|
||||
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return task;
|
||||
}
|
||||
|
||||
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
|
||||
this.logger.trace("Finding all start elements in process: {}", processDefId);
|
||||
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
|
||||
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
|
||||
for (StartEvent startElement : startElements) {
|
||||
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
|
||||
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
|
||||
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
|
||||
continue;
|
||||
}
|
||||
|
||||
return startElement;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected ServiceTask castToServiceTask(FlowElement flowElement) {
|
||||
if (!(flowElement instanceof ServiceTask))
|
||||
return null;
|
||||
return (ServiceTask) flowElement;
|
||||
}
|
||||
|
||||
}
|
@@ -5,13 +5,16 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public interface MqCommunicator {
|
||||
|
||||
boolean validateConnection();
|
||||
|
||||
<BodyType> PreparedMessage<BodyType> createPreparedMessage();
|
||||
|
||||
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
<BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, null, null);
|
||||
|
@@ -67,11 +67,19 @@ public class MqDelegateExecution {
|
||||
varName += "_" + this.getMessageNameFromModel();
|
||||
return varName;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Always pull from the process scope.
|
||||
* @param correlationId
|
||||
*/
|
||||
public String getCorrelationId() {
|
||||
return this.execution.getVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), String.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Always set at process scope.
|
||||
* @param correlationId
|
||||
*/
|
||||
public void setCorrelationId(String correlationId) {
|
||||
this.execution.setVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), correlationId);
|
||||
}
|
||||
|
@@ -0,0 +1,85 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MqDeploymentEventListener implements ActivitiEntityEventListener<DeploymentEntity> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
protected MqSubscribeLooper looper;
|
||||
|
||||
@Autowired
|
||||
protected ProcessDefinitionRegistry registry;
|
||||
|
||||
@Autowired
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
@Override
|
||||
public boolean ofEntityType(Class<? extends Entity> entityType) {
|
||||
return DeploymentEntity.class.isAssignableFrom(entityType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean ofEntityType(Entity entity) {
|
||||
return entity instanceof DeploymentEntity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
|
||||
String executionId, DeploymentEntity entity) {
|
||||
this.logger.trace("Triggered by deployment event: {}", entity);
|
||||
|
||||
switch (eventType) {
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
this.onDeploymentAddEvent((DeploymentEntity) entity);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
protected void onDeploymentAddEvent(DeploymentEntity entity) {
|
||||
this.logger.debug("Triggered by deployment addition: {}", entity);
|
||||
|
||||
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
|
||||
if (procDefEntities == null)
|
||||
return;
|
||||
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
|
||||
|
||||
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
|
||||
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
|
||||
|
||||
this.unsubscribeOtherMqSubscribeTasks(procDefEntity.getId());
|
||||
|
||||
if (this.registry.isMqStart(procDefEntity.getId()))
|
||||
this.looper.loop(procDefEntity.getId());
|
||||
}
|
||||
}
|
||||
|
||||
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
|
||||
try {
|
||||
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Subscription starter executions ended early: {}: {}", procDefId, executionIds);
|
||||
} else {
|
||||
this.logger.info("Subscriptions starter ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -8,6 +8,9 @@ import java.util.Set;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.runtime.Execution;
|
||||
import org.activiti.engine.runtime.Job;
|
||||
import org.activiti.engine.runtime.ProcessInstance;
|
||||
import org.apache.commons.collections4.MultiValuedMap;
|
||||
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
@@ -26,13 +29,13 @@ public class MqExecutionService {
|
||||
|
||||
/**
|
||||
* The size of the keys is limited to the number of process definitions
|
||||
* series are defined. This excludes versions. It would actually only
|
||||
* contain ones with an MQ subscribe task. It would never be a significant
|
||||
* memory hog.
|
||||
* series defined. This excludes versions. It would actually only contain
|
||||
* ones with an MQ subscribe task. It would never be a significant memory
|
||||
* hog.
|
||||
*
|
||||
* The size of the values is limited to the number of active process
|
||||
* definition versions exist. So it would never be a significant memory
|
||||
* hog.
|
||||
* definition versions that exist. So it would never be a significant
|
||||
* memory hog.
|
||||
*
|
||||
* The size of the keys/values have nothing to do with the number of
|
||||
* process instances or executions.
|
||||
@@ -81,8 +84,10 @@ public class MqExecutionService {
|
||||
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
|
||||
|
||||
public synchronized void executing(DelegateExecution execution) {
|
||||
ProcessDefinition procDef = this.services.getRepositoryService().createProcessDefinitionQuery().processDefinitionId(execution.getProcessDefinitionId()).singleResult();
|
||||
this.processDefinitionKeyMap.put(procDef.getKey(), execution.getProcessDefinitionId());
|
||||
|
||||
Pair<String, String> key = this.toKey(execution);
|
||||
this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft());
|
||||
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
|
||||
this.activityExecutionMap.put(key, execution.getId());
|
||||
}
|
||||
@@ -92,33 +97,134 @@ public class MqExecutionService {
|
||||
this.activityExecutionMap.removeMapping(key, execution.getId());
|
||||
}
|
||||
|
||||
public synchronized final boolean cancelled(String executionId) {
|
||||
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
|
||||
return this.cancelled(execution);
|
||||
}
|
||||
|
||||
public synchronized boolean cancelled(Execution execution) {
|
||||
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
|
||||
if (execution.getActivityId() != null) {
|
||||
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), execution.getActivityId());
|
||||
return this.activityExecutionMap.removeMapping(key, execution.getId());
|
||||
} else {
|
||||
this.logger.trace("No activity discovered, so checking all activities in the process definition: {}: {}", pi.getProcessDefinitionId(), execution.getId());
|
||||
Collection<String> activityIds = this.processDefinitionActivityMap.get(pi.getProcessDefinitionId());
|
||||
boolean removed = false;
|
||||
|
||||
for (String activityId : activityIds) {
|
||||
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), activityId);
|
||||
removed = this.activityExecutionMap.removeMapping(key, execution.getId()) || removed;
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized final boolean cancelledJob(String jobId) {
|
||||
Job job = this.services.getManagementService().createJobQuery().jobId(jobId).singleResult();
|
||||
return this.cancelledJob(job);
|
||||
}
|
||||
|
||||
public synchronized final boolean cancelledJob(Job job) {
|
||||
return this.cancelled(job.getExecutionId());
|
||||
}
|
||||
|
||||
/**
|
||||
* This method cancels all the executions active on the specified process
|
||||
* definition.
|
||||
*
|
||||
* @param executionId An execution unique identifier.
|
||||
* @return `true` if execution was cached; `false` otherwise.
|
||||
*/
|
||||
public boolean cancel(String executionId) throws Exception {
|
||||
this.logger.trace("cancel({})", executionId);
|
||||
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
|
||||
if (execution == null) {
|
||||
this.logger.debug("No execution to cancel: {}", executionId);
|
||||
return false;
|
||||
}
|
||||
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
|
||||
if (pi == null) {
|
||||
this.logger.debug("No process instance to cancel: {}", executionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return this.cancel(pi.getProcessDefinitionId(), executionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method cancels all the executions active on the specified process
|
||||
* definition.
|
||||
*
|
||||
* @param processDefinitionId A process definition unique identifier.
|
||||
* @return A set of execution identifiers that were cancelled.
|
||||
*/
|
||||
public Set<String> cancelAll(String processDefinitionId) throws Exception {
|
||||
this.logger.trace("cancelAll({})", processDefinitionId);
|
||||
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
|
||||
String processDefinitionKey = processDefinition.getKey();
|
||||
|
||||
return this.cancelAll(processDefinitionId, processDefinitionKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
|
||||
* @return A set of execution identifiers that were in the now cleared map.
|
||||
*/
|
||||
public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception {
|
||||
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
|
||||
this.logger.trace("cancelAllOtherVersions({})", latestProcessDefinitionId);
|
||||
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
|
||||
if (latestProcessDefinition == null)
|
||||
return Collections.emptySet();
|
||||
String processDefinitionKey = latestProcessDefinition.getKey();
|
||||
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
||||
Collection<String> processDefinitionIds = this.processDefinitionKeyMap.get(processDefinitionKey);
|
||||
for (String processDefinitionId : processDefinitionIds) {
|
||||
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
|
||||
|
||||
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
|
||||
if (activityIds == null) {
|
||||
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
for (String activityId : activityIds) {
|
||||
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
|
||||
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
|
||||
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
|
||||
if (activityExecutionIds != null)
|
||||
executionIds.addAll(activityExecutionIds);
|
||||
}
|
||||
for (String processDefinitionId : processDefinitionIds)
|
||||
executionIds.addAll(this.cancelAll(processDefinitionId, processDefinitionKey));
|
||||
|
||||
return executionIds;
|
||||
}
|
||||
|
||||
private synchronized boolean cancel(String processDefinitionId, String executionId) throws Exception {
|
||||
this.logger.trace("Cancelling execution: {}: {}", processDefinitionId, executionId);
|
||||
|
||||
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
|
||||
if (activityIds == null) {
|
||||
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
for (String activityId : activityIds) {
|
||||
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
|
||||
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
|
||||
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
|
||||
if (activityExecutionIds.remove(executionId))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private synchronized Set<String> cancelAll(String processDefinitionId, String processDefinitionKey) throws Exception {
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
||||
this.processDefinitionKeyMap.removeMapping(processDefinitionKey, processDefinitionId);
|
||||
|
||||
Collection<String> activityIds = this.processDefinitionActivityMap.remove(processDefinitionId);
|
||||
if (activityIds == null) {
|
||||
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
for (String activityId : activityIds) {
|
||||
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
|
||||
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
|
||||
Collection<String> activityExecutionIds = this.activityExecutionMap.remove(key);
|
||||
if (activityExecutionIds != null)
|
||||
executionIds.addAll(activityExecutionIds);
|
||||
}
|
||||
|
||||
return executionIds;
|
||||
|
@@ -0,0 +1,56 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
import org.activiti.engine.impl.persistence.entity.JobEntity;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This needs to be changed to detect just process versioning (deactivation?)
|
||||
*/
|
||||
@Component
|
||||
public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
protected MqSubscriptionService subscriptionService;
|
||||
|
||||
@Override
|
||||
public boolean ofEntityType(Class<? extends Entity> entityType) {
|
||||
return JobEntity.class.isAssignableFrom(entityType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean ofEntityType(Entity entity) {
|
||||
return entity instanceof JobEntity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
|
||||
String executionId, JobEntity entity) {
|
||||
this.logger.trace("Triggered by job event: {}", entity);
|
||||
|
||||
// switch (eventType) {
|
||||
// case ENTITY_DELETED:
|
||||
// case ENTITY_SUSPENDED:
|
||||
// this.onJobRemoveEvent(entity);
|
||||
// break;
|
||||
// default:
|
||||
// }
|
||||
}
|
||||
|
||||
protected void onJobRemoveEvent(JobEntity entity) {
|
||||
this.logger.trace("Triggered by job removal: {}", entity);
|
||||
try {
|
||||
this.subscriptionService.cancel(entity.getExecutionId());
|
||||
this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId());
|
||||
} catch (Exception e) {
|
||||
this.logger.error("Subscription execution failed to be canceled after job removal: job: " + entity.getId() + ": exec: " + entity.getExecutionId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,124 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.engine.ActivitiObjectNotFoundException;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MqProcessDefinitionEventListener implements ActivitiEntityEventListener<ProcessDefinitionEntity> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
protected MqSubscribeLooper looper;
|
||||
|
||||
@Autowired
|
||||
protected ProcessDefinitionRegistry registry;
|
||||
|
||||
@Autowired
|
||||
protected MqSubscriptionService subscriptionService;
|
||||
|
||||
@Override
|
||||
public boolean ofEntityType(Class<? extends Entity> entityType) {
|
||||
return ProcessDefinitionEntity.class.isAssignableFrom(entityType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean ofEntityType(Entity entity) {
|
||||
return entity instanceof ProcessDefinitionEntity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationStartup() {
|
||||
this.looper.deloopAllMqProcessDefinitions();
|
||||
this.looper.loopAllMqProcessDefinitions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationShutdown() {
|
||||
this.looper.deloopAllMqProcessDefinitions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEntityEvent(ActivitiEventType eventType, String processDefinitionId, String processInstanceId,
|
||||
String executionId, ProcessDefinitionEntity entity) {
|
||||
this.logger.trace("Triggered by process definition event: {}", entity);
|
||||
|
||||
switch (eventType) {
|
||||
case ENTITY_CREATED:
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_CREATED or ENTITY_INITIALIZED as we need the BpmnModel and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
break;
|
||||
case ENTITY_DELETED:
|
||||
case ENTITY_SUSPENDED:
|
||||
// we need to stop the listener
|
||||
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) entity);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition addition: {}", entity);
|
||||
|
||||
try {
|
||||
// cancel subscriptions on all legacy version process definitions
|
||||
this.unsubscribeOtherMqSubscribeTasks(entity.getId());
|
||||
|
||||
if (this.registry.isMqStart(entity.getId()))
|
||||
this.looper.loop(entity.getId());
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
this.logger.debug("Added process definition could not be found; because of orders of operation or transaction isolation: {}: {}", entity.getId(), aonfe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition removal: {}", entity);
|
||||
|
||||
// cancel subscriptions on this process definition
|
||||
this.unsubscribeMqSubscribeTasks(entity.getId());
|
||||
|
||||
if (this.registry.isMqStart(entity.getId()))
|
||||
this.looper.deloop(entity.getId());
|
||||
}
|
||||
|
||||
protected void unsubscribeMqSubscribeTasks(String procDefId) {
|
||||
try {
|
||||
Set<String> executionIds = this.subscriptionService.cancelAll(procDefId);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
throw aonfe;
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void unsubscribeOtherMqSubscribeTasks(String procDefId) {
|
||||
try {
|
||||
Set<String> executionIds = this.subscriptionService.cancelAllOtherVersions(procDefId);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
throw aonfe;
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -13,9 +13,21 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
/**
|
||||
* This delegate sends a message to an MQ queue.
|
||||
*
|
||||
* The process instance will continue after a successful publish of the message
|
||||
* by this delegate to the specified queue. Use `mqSubscribeReplyDelegate` in
|
||||
* a subsequent task to handle the response asynchronously.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding
|
||||
* `mqSubscribeReplyDelegate` task. The resultant variables will then include
|
||||
* that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqSubscribeReplyDelegate#
|
||||
@@ -34,27 +46,21 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
private MqServiceTaskService msts;
|
||||
|
||||
/**
|
||||
* This method sends a message to an MQ queue.
|
||||
*
|
||||
* It does not wait for a response and will continue after a successful
|
||||
* publish of the message to the specified queue. Use
|
||||
* `mqSubscribeReplyDelegate` in a subsequent task to handle the response
|
||||
* asynchronously.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task.
|
||||
* This method makes this bean an Activiti delegate.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set the `mq_messageId` result variable in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Supported range depends on MQ protocol.
|
||||
* @field mq_payload [optional] The body of the MQ message. May include expressions.
|
||||
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
|
||||
* @field mq_payloadMimeType [optional] The MIME type of the `mq_payload`.
|
||||
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
|
||||
* @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates.
|
||||
* @varIn mq_correlationId [optional] The correlationId of the message to send.
|
||||
* @varOut mq_correlationId The correlationId of the message sent.
|
||||
* @varOut mq_messageId The messageId of the message sent.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
@@ -73,6 +79,8 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
|
||||
PreparedMessage<String> message = communicator.createPreparedMessage();
|
||||
if (mqExecution.getCorrelationId() != null)
|
||||
message.setCorrelationId(mqExecution.getCorrelationId());
|
||||
if (mqExecution.getStatusQueueNameFromModel() != null)
|
||||
message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel());
|
||||
message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel());
|
||||
@@ -88,10 +96,11 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
|
||||
DeliveredMessage<?> deliveredMessage = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), deliveredMessage.getMessageId(), deliveredMessage.getCorrelationId());
|
||||
|
||||
mqExecution.setCorrelationId(correlationId);
|
||||
mqExecution.setMessageId(deliveredMessage.getMessageId());
|
||||
mqExecution.setCorrelationId(deliveredMessage.getCorrelationId());
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
|
@@ -70,26 +70,62 @@ public class MqServiceTask {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope, boolean forceExpressionProcessing) {
|
||||
Object value = this.getFieldValueFromModel(fieldName, varscope, forceExpressionProcessing);
|
||||
|
||||
if (value == null) {
|
||||
return null;
|
||||
} else if (String.class.isAssignableFrom(type)) {
|
||||
if (value instanceof String) {
|
||||
return (T) value;
|
||||
} else {
|
||||
return (T) value.toString();
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
Method method = type.getMethod("valueOf", value.getClass());
|
||||
return (T) method.invoke(null, value);
|
||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
||||
String strvalue;
|
||||
if (value instanceof String) {
|
||||
strvalue = (String) value;
|
||||
} else {
|
||||
strvalue = value.toString();
|
||||
}
|
||||
|
||||
try {
|
||||
Method method = type.getMethod("valueOf", String.class);
|
||||
return (T) method.invoke(null, strvalue);
|
||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e2) {
|
||||
throw new IllegalArgumentException("The target type '" + type + "' has no 'valueOf' method for String or type: " + value.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Object getFieldValueFromModel(String fieldName, VariableScope varscope, boolean forceExpressionProcessing) {
|
||||
FieldExtension field = this.fieldMap.get(fieldName);
|
||||
if (field == null) {
|
||||
return null;
|
||||
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
|
||||
this.logger.trace("Field value is recognized as an expression by the Activity framework: {}: {}", fieldName, field.getExpression());
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getExpression());
|
||||
return (T) expr.getValue(varscope);
|
||||
return expr.getValue(varscope);
|
||||
} else if (field.getStringValue() == null) {
|
||||
this.logger.trace("Field value is null: {}", fieldName);
|
||||
return null;
|
||||
} else if (forceExpressionProcessing) {
|
||||
this.logger.trace("Field value will be processed as potentially having expression(s): {}", fieldName);
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getStringValue());
|
||||
return (T) expr.getValue(varscope);
|
||||
} else if (String.class.isAssignableFrom(type)) {
|
||||
return (T) field.getStringValue();
|
||||
return expr.getValue(varscope);
|
||||
} else if (field.getStringValue().startsWith("${") && field.getStringValue().endsWith("}")) {
|
||||
this.logger.trace("Field value is recognized as an expression by the MQ extension: {}: {}", fieldName, field.getStringValue());
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getStringValue());
|
||||
return expr.getValue(varscope);
|
||||
} else {
|
||||
try {
|
||||
Method method = type.getMethod("valueOf", String.class);
|
||||
return (T) method.invoke(null, field.getStringValue());
|
||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
return field.getStringValue();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -13,9 +13,30 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
* This is expected to exist in a Service Task immediately after a plain start
|
||||
* activity. This will cause process instances to automatically be created in
|
||||
* order to maintain the MQ subscription as messages are received. If used in
|
||||
* any other way, it will error and the process will fail validation.
|
||||
*
|
||||
* This does not wait for a response to a specific message, but instead to all
|
||||
* messages put on an MQ queue. That is for the `mqSubscribeReplyDelegate`
|
||||
* delegate.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this task. The resultant variables will then
|
||||
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* This requires a long running connection to MQ. It runs in a long running
|
||||
* Activiti job/execution. If there is a failure or the server is restarted,
|
||||
* the Activiti job will fail and automatically retry per the Activiti standard
|
||||
* features. After exhausting retries, it may eventually dead-letter. Retry
|
||||
* the job to continue the subscription.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqPublishDelegate#
|
||||
@@ -37,35 +58,22 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
* It does not wait for a response to a specific message put on an MQ queue.
|
||||
* That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
|
||||
*
|
||||
* When used, this task must be the first task after a plain start event in a
|
||||
* process. If used in any other way, it will error and the process will fail
|
||||
* validation.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this task. The resultant variables will then
|
||||
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* TODO map response to variables
|
||||
* This method makes this bean an Activiti delegate.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_concurrency The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted.
|
||||
* @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
|
||||
* @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message; may be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
|
||||
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
@@ -84,11 +92,11 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination,
|
||||
if (communicator.receive(destination,
|
||||
new MqSubscriptionListener() {
|
||||
@Override
|
||||
public void consuming(AutoCloseable consumerCloseable) {
|
||||
subscriptionService.consuming(execution, consumerCloseable);
|
||||
subscriptionService.autoconsuming(execution, consumerCloseable);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -111,7 +119,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
}
|
||||
);
|
||||
) == null) {
|
||||
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
|
||||
this.services.getRuntimeService().deleteProcessInstance(execution.getProcessInstanceId(), "MQ subscription cancelled gracefully");
|
||||
}
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
@@ -123,10 +134,5 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
throw new BpmnError("mq", "JMS communication failed: " + je.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getConcurrency(DelegateExecution execution) {
|
||||
MqServiceTask task = this.msts.get(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
|
||||
return task == null ? 1 : task.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
274
src/main/java/com/inteligr8/activiti/mq/MqSubscribeLooper.java
Normal file
274
src/main/java/com/inteligr8/activiti/mq/MqSubscribeLooper.java
Normal file
@@ -0,0 +1,274 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.repository.ProcessDefinitionQuery;
|
||||
import org.activiti.engine.runtime.Execution;
|
||||
import org.activiti.engine.runtime.ExecutionQuery;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.activiti.domain.idm.Tenant;
|
||||
|
||||
@Component
|
||||
public class MqSubscribeLooper {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
protected ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
protected ProcessDefinitionRegistry registry;
|
||||
|
||||
@Autowired
|
||||
protected TenantFinderService tenantFinderService;
|
||||
|
||||
@Autowired
|
||||
protected MqSubscriptionService subscriptionService;
|
||||
|
||||
protected Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This method starts the listening of activity completions on all the MQ
|
||||
* subscribe tasks. This will start the infinite loop where new process
|
||||
* instances are started every time another process instance receives a
|
||||
* message.
|
||||
*/
|
||||
public void loopAllMqProcessDefinitions() {
|
||||
String tenantId = this.findTenantId();
|
||||
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
|
||||
this.logger.debug("Found {} active process definitions", procDefs.size());
|
||||
|
||||
for (ProcessDefinition procDef : procDefs) {
|
||||
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
|
||||
|
||||
if (this.registry.isMqStart(procDef.getId()))
|
||||
this.loop(tenantId, procDef.getId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method starts the listening of activity completions on the MQ
|
||||
* subscribe task in the specified process definition. This will start the
|
||||
* infinite loop where new process instances are started every time another
|
||||
* process instance receives a message.
|
||||
*
|
||||
* @param processDefinitionId The unique identifier of the process definition to loop.
|
||||
*/
|
||||
public void loop(String processDefinitionId) {
|
||||
String tenantId = this.findTenantId();
|
||||
this.loop(tenantId, processDefinitionId);
|
||||
}
|
||||
|
||||
private void loop(String tenantId, String processDefinitionId) {
|
||||
ProcessDefinition procDef = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
|
||||
if (procDef == null)
|
||||
throw new IllegalArgumentException("The process definition does not exist: " + processDefinitionId);
|
||||
if (procDef.isSuspended())
|
||||
throw new IllegalStateException("The process definition is suspended: " + processDefinitionId);
|
||||
|
||||
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
|
||||
if (task == null)
|
||||
throw new IllegalArgumentException("The process definition does not qualify for MQ subscription looping: " + processDefinitionId);
|
||||
|
||||
int concurrency = this.determineConcurrency(task);
|
||||
this.logger.debug("Process definition MQ subscription task is configured for concurrency: {}: {}", processDefinitionId, concurrency);
|
||||
|
||||
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, processDefinitionId, task);
|
||||
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), processDefinitionId);
|
||||
|
||||
if (execs.size() < concurrency) {
|
||||
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), processDefinitionId);
|
||||
this.loop(tenantId, processDefinitionId, task, concurrency, execs);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void loop(String tenantId, String processDefinitionId, ServiceTask task, int concurrency, List<Execution> execs) {
|
||||
String key = processDefinitionId + "|" + task.getId();
|
||||
if (this.activeListeners.containsKey(key))
|
||||
// one listener, no matter how many instances are subscribed
|
||||
return;
|
||||
|
||||
AbstractActivityListener listener = new AbstractActivityListener(processDefinitionId, task) {
|
||||
@Override
|
||||
public void onEvent(ActivitiActivityEvent event) {
|
||||
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
|
||||
services.getRuntimeService().startProcessInstanceById(event.getProcessDefinitionId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// the ServiceTask will then execute, waiting for a message on the queue
|
||||
// when a message is received, it will be responsible for starting a new process instance
|
||||
// that new process instance will wait for the next message on the queue
|
||||
// repeat
|
||||
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
|
||||
|
||||
// register the listener so we can possibly remove it later
|
||||
this.activeListeners.put(key, listener);
|
||||
|
||||
for (int thread = execs.size(); thread < concurrency; thread++) {
|
||||
// start a process instance on the process
|
||||
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue/topic", processDefinitionId);
|
||||
this.services.getRuntimeService().startProcessInstanceById(processDefinitionId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method stops the listening of activity completions. This will end
|
||||
* the infinite loop where new process instances are started every time
|
||||
* another process instance receives a message.
|
||||
*
|
||||
* It will also attempt to cancel/close any active MQ subscription. If
|
||||
* they are already closed, it will be skipped. The `executionId` of
|
||||
* successfully cancelled subscriptions will be returned.
|
||||
*
|
||||
* @return A set of unique identifiers for Activiti executions that were successfully cancelled.
|
||||
*/
|
||||
public synchronized Set<String> deloopAllMqProcessDefinitions() {
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
||||
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Entry<String, AbstractActivityListener> listener = i.next();
|
||||
|
||||
String[] splitkey = listener.getKey().split("\\|");
|
||||
String processDefinitionId = splitkey[0];
|
||||
String activityId = splitkey[1];
|
||||
|
||||
executionIds.addAll(this.deloop(processDefinitionId, activityId, listener.getValue()));
|
||||
|
||||
i.remove();
|
||||
}
|
||||
|
||||
return executionIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method stops the listening of activity completions on the MQ
|
||||
* subscribe task in the specified process definition. This will end the
|
||||
* infinite loop where new process instances are started every time another
|
||||
* process instance receives a message.
|
||||
*
|
||||
* @param processDefinitionId The unique identifier of the process definition to loop.
|
||||
* @return A set of unique identifiers for Activiti executions that were successfully cancelled.
|
||||
*/
|
||||
public synchronized Set<String> deloop(String processDefinitionId) {
|
||||
ServiceTask task = this.registry.findMqStartSubscribeTask(processDefinitionId);
|
||||
if (task == null)
|
||||
throw new IllegalArgumentException();
|
||||
|
||||
String key = processDefinitionId + "|" + task.getId();
|
||||
AbstractActivityListener listener = this.activeListeners.remove(key);
|
||||
return this.deloop(processDefinitionId, task.getId(), listener);
|
||||
}
|
||||
|
||||
private Set<String> deloop(String processDefinitionId, String activityId, AbstractActivityListener listener) {
|
||||
if (listener == null) {
|
||||
this.logger.trace("No MQ subscription listener was found; no listener to stop");
|
||||
} else {
|
||||
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefinitionId);
|
||||
this.services.getRuntimeService().removeEventListener(listener);
|
||||
}
|
||||
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
||||
List<Execution> executions = this.services.getRuntimeService().createExecutionQuery()
|
||||
.processDefinitionId(processDefinitionId)
|
||||
.activityId(activityId)
|
||||
.list();
|
||||
for (Execution execution : executions) {
|
||||
this.logger.debug("An MQ subscription was found; stopping execution: {}: {}: {}", processDefinitionId, activityId, execution.getId());
|
||||
if (this.subscriptionService.cancelled(execution)) {
|
||||
this.logger.info("An MQ subscription was closed: {}", execution.getId());
|
||||
executionIds.add(execution.getId());
|
||||
} else {
|
||||
this.logger.debug("An MQ subscription was already closed: {}", execution.getId());
|
||||
}
|
||||
}
|
||||
|
||||
return executionIds;
|
||||
}
|
||||
|
||||
protected String findTenantId() {
|
||||
Tenant tenant = this.tenantFinderService.findTenant();
|
||||
return this.tenantFinderService.transform(tenant);
|
||||
}
|
||||
|
||||
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
|
||||
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
|
||||
.latestVersion()
|
||||
.active();
|
||||
if (tenantId == null) {
|
||||
procDefQuery.processDefinitionWithoutTenantId();
|
||||
} else {
|
||||
procDefQuery.processDefinitionTenantId(tenantId);
|
||||
}
|
||||
|
||||
return procDefQuery.list();
|
||||
}
|
||||
|
||||
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
|
||||
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
|
||||
.processDefinitionId(processDefinitionId)
|
||||
.activityId(task.getId());
|
||||
if (tenantId == null) {
|
||||
execQuery.executionWithoutTenantId();
|
||||
} else {
|
||||
execQuery.executionTenantId(tenantId);
|
||||
}
|
||||
|
||||
return execQuery.list();
|
||||
}
|
||||
|
||||
protected Integer findConcurrency(ServiceTask task) {
|
||||
for (FieldExtension fieldext : task.getFieldExtensions()) {
|
||||
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
|
||||
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
|
||||
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected int determineConcurrency(ServiceTask task) {
|
||||
Integer concurrency = this.findConcurrency(task);
|
||||
if (concurrency == null) {
|
||||
return 1;
|
||||
} else if (concurrency.intValue() < 1) {
|
||||
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
|
||||
return 1;
|
||||
} else {
|
||||
return concurrency.intValue();
|
||||
}
|
||||
}
|
||||
|
||||
protected ServiceTask castToServiceTask(FlowElement flowElement) {
|
||||
if (!(flowElement instanceof ServiceTask))
|
||||
return null;
|
||||
return (ServiceTask) flowElement;
|
||||
}
|
||||
|
||||
}
|
@@ -13,8 +13,28 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
/**
|
||||
* This delegate listens for a reply message on an MQ queue.
|
||||
* This method listens for a reply message on an MQ queue.
|
||||
*
|
||||
* The process instance will block until a corresponding reply message is
|
||||
* received. It uses the `mq_correlationId` variable to select the
|
||||
* corresponding reply message. That variable is automatically set by the
|
||||
* `mqPublishDelegate` task. This is meant to be used after
|
||||
* `mqPublishDelegate` and not just by itself. If you want to start processes
|
||||
* with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task. The resultant variables will then include that message name (e.g.
|
||||
* `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* This requires a long running connection to MQ. It runs in a long running
|
||||
* Activiti job/execution. If there is a failure or the server is restarted,
|
||||
* the Activiti job will fail and automatically retry per the Activiti standard
|
||||
* features. After exhausting retries, it may eventually dead-letter. Retry
|
||||
* the job to continue the subscription.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
@@ -37,33 +57,21 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
private MqSubscriptionService subscriptionService;
|
||||
|
||||
/**
|
||||
* This method listens for a reply message on an MQ queue.
|
||||
*
|
||||
* It uses the `mq_correlationId` variable to select the corresponding reply
|
||||
* message. That variable is automatically set by the `mqPublishDelegate`
|
||||
* task. This is meant to be used after `mqPublishDelegate` and not just by
|
||||
* itself. If you want to start processes with an MQ subscription see the
|
||||
* `mqSubscribeDelegate` task.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task.
|
||||
*
|
||||
* TODO map response to variables
|
||||
* This method makes this bean an Activiti delegate.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
|
||||
* @varIn mq_correlationId The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
|
||||
* @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message; may be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
|
||||
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
|
||||
* @throws BPMNError correlation `mq_correlationId` is required.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
@@ -84,7 +92,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, correlationId,
|
||||
if (communicator.receive(destination, correlationId,
|
||||
new MqSubscriptionListener() {
|
||||
@Override
|
||||
public void consuming(AutoCloseable consumerCloseable) {
|
||||
@@ -110,7 +118,10 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
}
|
||||
);
|
||||
) == null) {
|
||||
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
|
||||
throw new BpmnError("cancelled", "MQ subscription cancelled gracefully");
|
||||
}
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
|
@@ -7,6 +7,8 @@ import java.util.Set;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.runtime.Execution;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -29,39 +31,124 @@ public class MqSubscriptionService extends MqExecutionService {
|
||||
* also necessary to remove the execution when it is removed from the
|
||||
* `activityExecutionMap` map.
|
||||
*/
|
||||
private Map<String, AutoCloseable> executionSubscriptionMap = new HashMap<>();
|
||||
|
||||
private Map<String, Pair<Boolean, AutoCloseable>> executionSubscriptionMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This method registers a consuming execution; an execution that was not
|
||||
* automatically started as part of the framework. This is for reply
|
||||
* subscriptions and not starter subscriptions.
|
||||
*
|
||||
* @param execution An Activiti execution.
|
||||
* @param consumerCloseable A closeable MQ consumer.
|
||||
*/
|
||||
public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
this.executing(execution);
|
||||
this.executionSubscriptionMap.put(execution.getId(), consumerCloseable);
|
||||
this.executionSubscriptionMap.put(execution.getId(), Pair.of(false, consumerCloseable));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method registers an auto-consuming execution; an execution that was
|
||||
* automatically started as part of the framework. This is for starter
|
||||
* subscriptions and not reply subscriptions.
|
||||
*
|
||||
* @param execution An Activiti execution.
|
||||
* @param consumerCloseable A closeable MQ consumer.
|
||||
*/
|
||||
public synchronized void autoconsuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
this.executing(execution);
|
||||
this.executionSubscriptionMap.put(execution.getId(), Pair.of(true, consumerCloseable));
|
||||
}
|
||||
|
||||
public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) {
|
||||
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable != consumerCloseable)
|
||||
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable != null && cachedConsumerCloseable.getRight() != consumerCloseable)
|
||||
throw new IllegalStateException("The consumer objects were expected to be identical");
|
||||
|
||||
this.executionSubscriptionMap.remove(execution.getId());
|
||||
this.executed(execution);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean cancelled(Execution execution) {
|
||||
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||
if (cachedConsumerCloseable == null) {
|
||||
this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId());
|
||||
return false;
|
||||
}
|
||||
|
||||
// this will eventually lead to a call to "consumed() above"
|
||||
try {
|
||||
cachedConsumerCloseable.getRight().close();
|
||||
return super.cancelled(execution);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method cancel the MQ subscription on the specified execution.
|
||||
*
|
||||
* @param executionId An execution unique identifier.
|
||||
* @return `true` if execution and MQ subscription were ended; `false` otherwise.
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean cancel(String executionId) throws Exception {
|
||||
if (!super.cancel(executionId))
|
||||
return false;
|
||||
|
||||
this.logger.trace("Removing MQ subscription execution: {}", executionId);
|
||||
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null) {
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer.getRight());
|
||||
consumer.getRight().close();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method cancels all the MQ subscriptions active on the specified
|
||||
* process definition.
|
||||
*
|
||||
* @param processDefinitionId A process definition version unique identifier.
|
||||
* @return A set of execution identifiers subscribed to MQ that are now cancelled; all subscriptions now ended.
|
||||
*/
|
||||
@Override
|
||||
public synchronized Set<String> cancelAll(String processDefinitionId) throws Exception {
|
||||
Set<String> executionIds = super.cancelAll(processDefinitionId);
|
||||
|
||||
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
|
||||
String processDefinitionKey = processDefinition.getKey();
|
||||
|
||||
for (String executionId : executionIds) {
|
||||
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null && consumer.getLeft()) {
|
||||
this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId);
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
|
||||
consumer.getRight().close();
|
||||
}
|
||||
}
|
||||
|
||||
return executionIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
|
||||
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
|
||||
*/
|
||||
@Override
|
||||
public synchronized Set<String> clearOtherVersions(String latestProcessDefinitionId) throws Exception {
|
||||
Set<String> executionIds = super.clearOtherVersions(latestProcessDefinitionId);
|
||||
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
|
||||
Set<String> executionIds = super.cancelAllOtherVersions(latestProcessDefinitionId);
|
||||
|
||||
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
|
||||
String processDefinitionKey = latestProcessDefinition.getKey();
|
||||
|
||||
for (String executionId : executionIds) {
|
||||
this.logger.trace("Clearing process definition execution: {}: {}", latestProcessDefinitionId, executionId);
|
||||
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null) {
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
|
||||
consumer.close();
|
||||
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
|
||||
if (consumer != null && consumer.getLeft()) {
|
||||
this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId);
|
||||
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
|
||||
consumer.getRight().close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -0,0 +1,151 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import org.activiti.bpmn.model.BpmnModel;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.SequenceFlow;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.bpmn.model.StartEvent;
|
||||
import org.activiti.engine.ActivitiObjectNotFoundException;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||
import org.activiti.engine.impl.context.Context;
|
||||
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class ProcessDefinitionRegistry {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
|
||||
|
||||
@Autowired
|
||||
protected ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
protected ApplicationContext context;
|
||||
|
||||
private Map<String, ServiceTask> processDefinitionMqSubscribeTasks;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
this.processDefinitionMqSubscribeTasks = new HashMap<>();
|
||||
}
|
||||
|
||||
public synchronized boolean isMqStart(String processDefinitionId) {
|
||||
if (this.processDefinitionMqSubscribeTasks.containsKey(processDefinitionId)) {
|
||||
return this.processDefinitionMqSubscribeTasks.get(processDefinitionId) != null;
|
||||
} else {
|
||||
try {
|
||||
// not yet cached; cache it; then return
|
||||
return this.findMqStartSubscribeTask(processDefinitionId) != null;
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized ServiceTask findMqStartSubscribeTask(String processDefinitionId) {
|
||||
ServiceTask task = this.processDefinitionMqSubscribeTasks.get(processDefinitionId);
|
||||
if (task != null)
|
||||
return task;
|
||||
|
||||
BpmnModel model = this.getBpmnModel(processDefinitionId);
|
||||
StartEvent startElement = this.findMqPlainStartEvent(processDefinitionId, model);
|
||||
if (startElement == null)
|
||||
return null;
|
||||
|
||||
List<SequenceFlow> flows = startElement.getOutgoingFlows();
|
||||
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefinitionId, startElement.getId());
|
||||
if (flows.isEmpty()) {
|
||||
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefinitionId);
|
||||
return null;
|
||||
} else if (flows.size() > 1) {
|
||||
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefinitionId);
|
||||
return null;
|
||||
}
|
||||
|
||||
SequenceFlow flow = flows.iterator().next();
|
||||
FlowElement targetFlowElement = flow.getTargetFlowElement();
|
||||
if (targetFlowElement == null) {
|
||||
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefinitionId);
|
||||
return null;
|
||||
}
|
||||
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefinitionId);
|
||||
|
||||
if (!(targetFlowElement instanceof ServiceTask)) {
|
||||
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, targetFlowElement.getId(), targetFlowElement.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
task = (ServiceTask) targetFlowElement;
|
||||
if (!"delegateExpression".equals(task.getImplementationType())) {
|
||||
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
|
||||
|
||||
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
|
||||
if (!matcher.find()) {
|
||||
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefinitionId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
String beanId = matcher.group(1).trim();
|
||||
this.logger.trace("Looking up bean: {}", beanId);
|
||||
|
||||
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
|
||||
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
|
||||
if (delegateBean == null) {
|
||||
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
|
||||
return null;
|
||||
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
|
||||
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefinitionId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
this.processDefinitionMqSubscribeTasks.put(processDefinitionId, task);
|
||||
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefinitionId, task.getId(), task.getName());
|
||||
return task;
|
||||
}
|
||||
|
||||
private BpmnModel getBpmnModel(String processDefId) {
|
||||
if (Context.getProcessDefinitionHelper() == null) {
|
||||
// typically during initial startup
|
||||
return this.services.getRepositoryService().getBpmnModel(processDefId);
|
||||
} else {
|
||||
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
|
||||
return ProcessDefinitionUtil.getBpmnModel(processDefId);
|
||||
}
|
||||
}
|
||||
|
||||
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
|
||||
this.logger.trace("Finding all start elements in process: {}", processDefId);
|
||||
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
|
||||
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
|
||||
for (StartEvent startElement : startElements) {
|
||||
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
|
||||
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
|
||||
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
|
||||
continue;
|
||||
}
|
||||
|
||||
return startElement;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@@ -2,6 +2,8 @@ package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
public interface TransactionalMessageHandler<BodyType> {
|
||||
|
||||
void onMessage(DeliveredMessage<BodyType> message) throws IOException;
|
||||
|
@@ -13,12 +13,12 @@ import javax.jms.JMSException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
@@ -72,7 +72,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
String correlationId = message.getCorrelationId() != null ? message.getCorrelationId() : UUID.randomUUID().toString();
|
||||
|
||||
AMQP.BasicProperties bprops = new AMQP.BasicProperties(null, null,
|
||||
@@ -82,7 +82,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
correlationId,
|
||||
message.getReplyToQueueName() != null ? message.getReplyToQueueName() : null,
|
||||
null, // expiration
|
||||
null, // messageId
|
||||
UUID.randomUUID().toString(), // messageId
|
||||
null, // timestamp
|
||||
null, // type
|
||||
null, null, null);
|
||||
@@ -99,8 +99,8 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
|
||||
return correlationId;
|
||||
|
||||
return AmqpDeliveredMessage.transform(bprops);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -8,9 +8,10 @@ import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.ByteArrayChannel;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
/**
|
||||
@@ -47,6 +48,19 @@ public class AmqpDeliveredMessage<BodyType> extends AbstractMessage implements D
|
||||
this.getProperties().putAll(message.getProperties().getHeaders());
|
||||
}
|
||||
|
||||
public AmqpDeliveredMessage(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
|
||||
this.setMessageId(props.getMessageId());
|
||||
this.setCorrelationId(props.getCorrelationId());
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(props, null);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(props, bodyType);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(Delivery message) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(message, null);
|
||||
}
|
||||
|
@@ -12,8 +12,8 @@ import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
|
@@ -14,12 +14,12 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public class JmsCommunicator implements MqCommunicator {
|
||||
|
||||
@@ -68,7 +68,7 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return this.send(con, destination, (JmsPreparedMessage<BodyType>) message);
|
||||
@@ -77,18 +77,18 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
}
|
||||
|
||||
protected <BodyType> String send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
protected <BodyType> DeliveredMessage<BodyType> send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
String correlationId = this.send(session, destination, message);
|
||||
DeliveredMessage<BodyType> dmessage = this.send(session, destination, message);
|
||||
session.commit();
|
||||
return correlationId;
|
||||
return dmessage;
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
public <BodyType> String send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
MessageProducer messenger = session.createProducer(destination.toJmsQueue(session));
|
||||
try {
|
||||
if (destination.getDeliveryMode() != null)
|
||||
@@ -103,7 +103,7 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
this.logger.debug("Sending message to queue: {}", destination.getQueueName());
|
||||
messenger.send(jmsmsg);
|
||||
|
||||
return jmsmsg.getJMSCorrelationID();
|
||||
return JmsDeliveredMessage.transform(jmsmsg);
|
||||
} finally {
|
||||
messenger.close();
|
||||
}
|
||||
@@ -143,37 +143,43 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
|
||||
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException {
|
||||
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
|
||||
Message receivedMessage = null;
|
||||
|
||||
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
|
||||
try {
|
||||
if (listener != null)
|
||||
listener.consuming(messenger);
|
||||
|
||||
if (timeoutInMillis < 0L) {
|
||||
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receive());
|
||||
} else if (timeoutInMillis == 0L) {
|
||||
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receiveNoWait());
|
||||
} else {
|
||||
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
Message message = messenger.receive(timeoutInMillis);
|
||||
if (timeoutInMillis < 0L) {
|
||||
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
|
||||
receivedMessage = messenger.receive();
|
||||
} else if (timeoutInMillis == 0L) {
|
||||
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
|
||||
receivedMessage = messenger.receiveNoWait();
|
||||
} else {
|
||||
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
|
||||
receivedMessage = messenger.receive(timeoutInMillis);
|
||||
}
|
||||
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (message != null) {
|
||||
if (receivedMessage != null) {
|
||||
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(message);
|
||||
return JmsDeliveredMessage.transform(receivedMessage);
|
||||
}
|
||||
|
||||
if (timeoutInMillis > 0L) {
|
||||
if (elapsedTime >= timeoutInMillis)
|
||||
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
|
||||
this.logger.debug("Done waiting for message after {} minutes: {}", elapsedTime / 60000L, destination.getQueueName());
|
||||
}
|
||||
|
||||
if (elapsedTime < timeoutInMillis) {
|
||||
throw new JMSException("The reading of the queue ended prematurely");
|
||||
} else {
|
||||
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
if (listener != null)
|
||||
listener.consumed(messenger);
|
||||
}
|
||||
} finally {
|
||||
if (listener != null)
|
||||
listener.consumed(messenger);
|
||||
|
||||
messenger.close();
|
||||
}
|
||||
}
|
||||
|
@@ -20,8 +20,8 @@ import org.apache.commons.io.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
public class JmsDeliveredMessage<BodyType> extends AbstractMessage implements DeliveredMessage<BodyType> {
|
||||
|
||||
|
@@ -14,8 +14,8 @@ import javax.jms.StreamMessage;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public class JmsPreparedMessage<BodyType> extends AbstractMessage implements PreparedMessage<BodyType> {
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
public interface Message {
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@@ -7,8 +7,6 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -17,6 +15,9 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
@Component
|
||||
public class MQDockerIT {
|
||||
|
||||
@@ -106,15 +107,18 @@ public class MQDockerIT {
|
||||
OffsetDateTime beforeSend = OffsetDateTime.now();
|
||||
this.logger.debug("Timestamp before send: {}", beforeSend);
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(correlationId);
|
||||
DeliveredMessage<String> sentMessage = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(sentMessage);
|
||||
Assertions.assertNotNull(sentMessage.getMessageId());
|
||||
Assertions.assertNotNull(sentMessage.getCorrelationId());
|
||||
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, correlationId);
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, sentMessage.getCorrelationId());
|
||||
Assertions.assertNotNull(receivedMessage);
|
||||
this.logger.debug("Timestamp of delivery: {}", receivedMessage.getDeliveryTime());
|
||||
|
||||
Assertions.assertNotNull(receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(correlationId, receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(sentMessage.getMessageId(), receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(sentMessage.getCorrelationId(), receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(message.getReplyToQueueName(), receivedMessage.getReplyToQueueName());
|
||||
Assertions.assertEquals("{}", receivedMessage.getContent());
|
||||
Assertions.assertNotNull(receivedMessage.getDeliveryTime());
|
||||
|
3
src/test/resources/activiti/whitelisted-scripts.conf
Normal file
3
src/test/resources/activiti/whitelisted-scripts.conf
Normal file
@@ -0,0 +1,3 @@
|
||||
javascript
|
||||
js
|
||||
ecmascript
|
24
src/test/vscode/test.http
Normal file
24
src/test/vscode/test.http
Normal file
@@ -0,0 +1,24 @@
|
||||
@baseUrl = http://localhost:8080/activiti-app
|
||||
@username = admin@app.activiti.com
|
||||
@password = admin
|
||||
@basic = YWRtaW5AYXBwLmFjdGl2aXRpLmNvbTphZG1pbg==
|
||||
|
||||
###
|
||||
# @name getPis
|
||||
POST {{baseUrl}}/api/enterprise/historic-process-instances/query
|
||||
Authorization: BASIC {{username}}:{{password}}
|
||||
Content-type: application/json
|
||||
|
||||
{
|
||||
"finished": false
|
||||
}
|
||||
|
||||
###
|
||||
# @name getPis2
|
||||
GET {{baseUrl}}/api/runtime/process-instances?tenantId=tenant_1
|
||||
Authorization: BASIC {{username}}:{{password}}
|
||||
|
||||
###
|
||||
# @name getExecs
|
||||
GET {{baseUrl}}/api/runtime/executions?tenantId=tenant_1
|
||||
Authorization: BASIC {{username}}:{{password}}
|
Reference in New Issue
Block a user