completed initial impl
This commit is contained in:
parent
3ba01ddd9f
commit
88dc27a9ee
2
.gitignore
vendored
2
.gitignore
vendored
@ -12,3 +12,5 @@ pom.xml.versionsBackup
|
||||
|
||||
# IntelliJ
|
||||
/.idea/
|
||||
/.apt_generated/
|
||||
/.apt_generated_tests/
|
||||
|
50
apidocs/bean-mqPublishDelegate.md
Normal file
50
apidocs/bean-mqPublishDelegate.md
Normal file
@ -0,0 +1,50 @@
|
||||
# API Documentation: MQ Activiti Extension: `mqPublishDelegate`
|
||||
|
||||
*Author*: [brian@inteligr8.com](mailto:brian@inteligr8.com)
|
||||
*Since Version*: 1.0
|
||||
|
||||
|
||||
This delegate sends a message to an MQ queue.
|
||||
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
|
||||
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md)
|
||||
|
||||
## <a name="delegate"></a> Delegate Expression Uses
|
||||
|
||||
This bean may be used as a *Delegate* using the "Delegate Expression" field as shown in the snippet below.
|
||||
|
||||
```javascript
|
||||
${mqPublishDelegate}
|
||||
```
|
||||
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This method sends a message to an MQ queue.
|
||||
|
||||
It does not wait for a response and will continue after a successful publish of the message to the specified queue. Use `mqSubscribeReplyDelegate` in a subsequent task to handle the response asynchronously.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
| Activiti Variable | `mq_correlationId` | The correlationId of the message sent. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md#delegate)
|
||||
*See Also*: [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md#delegate)
|
||||
|
||||
---
|
||||
|
||||
[Back to Index](index.md)
|
||||
|
||||
Generated by the [Inteligr8](https://inteligr8.com) [Activiti API Doclet](https://bitbucket.org/inteligr8/activiti-api-doclet).
|
||||
|
59
apidocs/bean-mqSubscribeDelegate.md
Normal file
59
apidocs/bean-mqSubscribeDelegate.md
Normal file
@ -0,0 +1,59 @@
|
||||
# API Documentation: MQ Activiti Extension: `mqSubscribeDelegate`
|
||||
|
||||
*Author*: [brian@inteligr8.com](mailto:brian@inteligr8.com)
|
||||
*Since Version*: 1.0
|
||||
|
||||
|
||||
This delegate listens for messages on an MQ queue.
|
||||
|
||||
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md)
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md)
|
||||
|
||||
## <a name="delegate"></a> Delegate Expression Uses
|
||||
|
||||
This bean may be used as a *Delegate* using the "Delegate Expression" field as shown in the snippet below.
|
||||
|
||||
```javascript
|
||||
${mqSubscribeDelegate}
|
||||
```
|
||||
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This delegate listens for messages on an MQ queue.
|
||||
|
||||
It does not wait for a response to a specific message put on an MQ queue. That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
|
||||
|
||||
When used, this task must be the first task after a plain start event in a process. If used in any other way, it will error and the process will fail validation.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this task. The resultant variables will then include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
|
||||
TODO map response to variables
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
| Activiti Variable | `mq_correlationId` | The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication. |
|
||||
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
|
||||
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
|
||||
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
||||
*See Also*: [`mqPublishDelegate`](bean-mqPublishDelegate.md#delegate)
|
||||
*See Also*: [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md#delegate)
|
||||
|
||||
---
|
||||
|
||||
[Back to Index](index.md)
|
||||
|
||||
Generated by the [Inteligr8](https://inteligr8.com) [Activiti API Doclet](https://bitbucket.org/inteligr8/activiti-api-doclet).
|
||||
|
55
apidocs/bean-mqSubscribeReplyDelegate.md
Normal file
55
apidocs/bean-mqSubscribeReplyDelegate.md
Normal file
@ -0,0 +1,55 @@
|
||||
# API Documentation: MQ Activiti Extension: `mqSubscribeReplyDelegate`
|
||||
|
||||
*Author*: [brian@inteligr8.com](mailto:brian@inteligr8.com)
|
||||
*Since Version*: 1.0
|
||||
|
||||
|
||||
This delegate listens for a reply message on an MQ queue.
|
||||
|
||||
*See Also*: mqPublishDelegate
|
||||
*See Also*: mqSubscribeDelegate
|
||||
|
||||
## <a name="delegate"></a> Delegate Expression Uses
|
||||
|
||||
This bean may be used as a *Delegate* using the "Delegate Expression" field as shown in the snippet below.
|
||||
|
||||
```javascript
|
||||
${mqSubscribeReplyDelegate}
|
||||
```
|
||||
|
||||
You may use it in a **Service Task**.
|
||||
|
||||
|
||||
This method listens for a reply message on an MQ queue.
|
||||
|
||||
It uses the `mq_correlationId` variable to select the corresponding reply message. That variable is automatically set by the `mqPublishDelegate` task. This is meant to be used after `mqPublishDelegate` and not just by itself. If you want to start processes with an MQ subscription see the `mqSubscribeDelegate` task.
|
||||
|
||||
If you have more than one MQ communication in a single process, set the `mq_messageName` field on this and the corresponding `mqPublishDelegate` task.
|
||||
|
||||
TODO map response to variables
|
||||
|
||||
| Input Type | Name | Java Type | Documentation |
|
||||
| ------------------------ | ------------------------ | ------------------------------------------------ | ---------------------------------- |
|
||||
| BPMN Field | `mq_connectorId` | | An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`. |
|
||||
| BPMN Field | `mq_queueName` | | The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created. |
|
||||
| BPMN Field | `mq_metadataProcessScope` | | [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task. |
|
||||
| BPMN Field | `mq_messageName` | | [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process. |
|
||||
| Activiti Variable | `mq_correlationId` | | The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication. |
|
||||
|
||||
| Result Type | Java Type, Name, or Error Code | Documentation |
|
||||
| ------------------------ | ------------------------------------------------ | -------------------------------- |
|
||||
| Activiti Variable | `mq_messageId` | The unique message identifer of the message received. |
|
||||
| Activiti Variable | `mq_deliveryTime` | The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti. |
|
||||
| Activiti Variable | `mq_priority` | An integer priority of the message; value depends on MQ protocol. |
|
||||
| Activiti Variable | `mq_replyQueueName` | The name of a queue or topic to use to reply to the received message. The reply message must have the same correlating identifier. |
|
||||
| Thrown BPMN Error | `correlation` | `mq_correlationId` is required. |
|
||||
| Thrown BPMN Error | `timeout` | The MQ connection timed out connecting or waiting for a message. |
|
||||
| Thrown BPMN Error | `network` | The MQ connection experienced network issues. |
|
||||
| Thrown BPMN Error | `mq` | An unknown MQ issue occurred. |
|
||||
|
||||
---
|
||||
|
||||
[Back to Index](index.md)
|
||||
|
||||
Generated by the [Inteligr8](https://inteligr8.com) [Activiti API Doclet](https://bitbucket.org/inteligr8/activiti-api-doclet).
|
||||
|
30
apidocs/index.md
Normal file
30
apidocs/index.md
Normal file
@ -0,0 +1,30 @@
|
||||
|
||||
# API Documentation: MQ Activiti Extension
|
||||
|
||||
This file documents the MQ Activiti Extension library. When this library is deployed in Activiti, including Alfresco Process Services, the following API can be utilized.
|
||||
|
||||
Activiti has a set of features that allow a model designer to call these Java-based APIs. These can be from various tasks, listeners, and expressions. The exposure of APIs may differ from method to method. Those details are in this documentation. See the table below for the various integrations between Activiti and these libraries.
|
||||
|
||||
| Activiti Feature | API Notes |
|
||||
| -------------------------------------- | --------- |
|
||||
| Delegate Expression | Any named Spring Bean that implements the `JavaDelegate` interface. |
|
||||
| Expression | Any named Spring Bean and one of its public methods. |
|
||||
| Script | Any named Spring Beans and their public methods. |
|
||||
| Execution Listener Delegate Expression | Any named Spring Bean that implements the `ExecutionListener` interface. |
|
||||
| Execution Listener Expression | Any named Spring Bean and its public method. |
|
||||
| Task Listener Delegate Expression | Any named Spring Bean that implements the `TaskListener` interface. |
|
||||
| Task Listener Expression | Any named Spring Bean and its public method. |
|
||||
|
||||
It is important to note that Activiti expressions use the JUEL language and Activiti scripts use the specified/configured language. Popular configured languages may be JavaScript or Groovy. In any case, the same API notation may be used. It is also worth noting that execution/task listeners may exist on most objects.
|
||||
|
||||
## Delegates
|
||||
|
||||
| Delegate ID | Brief Documentation |
|
||||
| -------------------------------- | ------------------------- |
|
||||
| [`mqPublishDelegate`](bean-mqPublishDelegate.md) | This delegate sends a message to an MQ queue. |
|
||||
| [`mqSubscribeDelegate`](bean-mqSubscribeDelegate.md) | This delegate listens for messages on an MQ queue. |
|
||||
| [`mqSubscribeReplyDelegate`](bean-mqSubscribeReplyDelegate.md) | This delegate listens for a reply message on an MQ queue. |
|
||||
|
||||
---
|
||||
|
||||
Generated by the [Inteligr8](https://inteligr8.com) [Activiti API Doclet](https://bitbucket.org/inteligr8/activiti-api-doclet).
|
79
parent/pom.xml
Normal file
79
parent/pom.xml
Normal file
@ -0,0 +1,79 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.inteligr8.activiti</groupId>
|
||||
<artifactId>mq-activiti-ext-parent</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<name>MQ Activiti Extension Parent</name>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
<maven.compiler.release>11</maven.compiler.release>
|
||||
|
||||
<activemq.version>5.18.6</activemq.version>
|
||||
<activemq.httpPort>8161</activemq.httpPort>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<!-- Using Docker to host the web application -->
|
||||
<plugin>
|
||||
<groupId>io.fabric8</groupId>
|
||||
<artifactId>docker-maven-plugin</artifactId>
|
||||
<version>0.45.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>start-mq</id>
|
||||
<!-- We had to put this in a parent project in order for MQ to start before APS -->
|
||||
<phase>pre-integration-test</phase>
|
||||
<goals><goal>start</goal></goals>
|
||||
<configuration>
|
||||
<images>
|
||||
<image>
|
||||
<name>apache/activemq-classic:${activemq.version}</name>
|
||||
<alias>mq</alias>
|
||||
<run>
|
||||
<skip>${skipTests}</skip>
|
||||
<env>
|
||||
<ACTIVEMQ_WEBCONSOLE_USE_DEFAULT_ADDRESS>false</ACTIVEMQ_WEBCONSOLE_USE_DEFAULT_ADDRESS>
|
||||
<ACTIVEMQ_USERNAME>alfresco</ACTIVEMQ_USERNAME>
|
||||
<ACTIVEMQ_PASSWORD>alfresco</ACTIVEMQ_PASSWORD>
|
||||
<ACTIVEMQ_WEBADMIN_USERNAME>admin</ACTIVEMQ_WEBADMIN_USERNAME>
|
||||
<ACTIVEMQ_WEBADMIN_PASSWORD>admin</ACTIVEMQ_WEBADMIN_PASSWORD>
|
||||
</env>
|
||||
<ports>
|
||||
<port>${activemq.httpPort}:8161</port>
|
||||
</ports>
|
||||
<network>
|
||||
<mode>custom</mode>
|
||||
<name>mq-activiti-ext</name>
|
||||
</network>
|
||||
</run>
|
||||
</image>
|
||||
</images>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>stop-mq</id>
|
||||
<phase>post-integration-test</phase>
|
||||
<goals><goal>stop</goal></goals>
|
||||
<configuration>
|
||||
<images>
|
||||
<image>
|
||||
<name>apache/activemq-classic:${activemq.version}</name>
|
||||
<alias>mq</alias>
|
||||
</image>
|
||||
</images>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
245
pom.xml
245
pom.xml
@ -2,8 +2,14 @@
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.inteligr8.activiti</groupId>
|
||||
<artifactId>mq-activiti-ext-parent</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<relativePath>parent/pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>com.inteligr8.activiti</groupId>
|
||||
<artifactId>mq-activiti-ext</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
@ -16,17 +22,20 @@
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
<maven.compiler.release>11</maven.compiler.release>
|
||||
|
||||
<aps.version>2.4.1</aps.version>
|
||||
<spring.version>5.3.29</spring.version>
|
||||
<activiti.version>7.11.0</activiti.version>
|
||||
<jackson.version>2.13.5</jackson.version>
|
||||
<aps.docker-tag>docker.inteligr8.com/inteligr8/alfresco-process-services</aps.docker-tag>
|
||||
<aps.version>2.4.6</aps.version>
|
||||
<spring.version>5.3.31</spring.version>
|
||||
<activiti.version>7.11.1</activiti.version>
|
||||
<tomcat-rad.version>9-2.2</tomcat-rad.version>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin</aps.tomcat.opts>
|
||||
|
||||
<rabbitmq.version>5.12.0</rabbitmq.version>
|
||||
<activemq.version>5.18.3</activemq.version>
|
||||
<!-- reloads in APS are slower than a restart -->
|
||||
<aps.hotswap.enabled>false</aps.hotswap.enabled>
|
||||
|
||||
<rabbitmq.version>5.18.0</rabbitmq.version>
|
||||
<activemq.version>5.18.6</activemq.version>
|
||||
<activemq.jms.version>2.31.2</activemq.jms.version>
|
||||
|
||||
<!-- for testin -->
|
||||
<cxf.version>3.0.12</cxf.version>
|
||||
<activemq.httpPort>8161</activemq.httpPort>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -64,20 +73,29 @@
|
||||
<groupId>org.activiti</groupId>
|
||||
<artifactId>activiti-spring</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<!-- avoiding old tomcat-* versions -->
|
||||
<groupId>org.apache.tomcat.embed</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<!-- This can help with debugging in IDE; never do RAD or compile with it -->
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>org.activiti</groupId>
|
||||
<artifactId>activiti-api-process-runtime</artifactId>
|
||||
<version>${activiti.version}</version>
|
||||
<groupId>com.activiti</groupId>
|
||||
<artifactId>activiti-app</artifactId>
|
||||
<version>${aps.version}</version>
|
||||
<classifier>classes</classifier>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.activiti</groupId>
|
||||
<artifactId>activiti-api-task-runtime</artifactId>
|
||||
<version>${activiti.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
-->
|
||||
|
||||
<!-- MQ -->
|
||||
<dependency>
|
||||
@ -90,19 +108,38 @@
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-client</artifactId>
|
||||
<version>${activemq.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client</artifactId>
|
||||
<version>${activemq.jms.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>commons-collections</groupId>
|
||||
<artifactId>commons-collections</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Testing only -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13.2</version>
|
||||
<scope>test</scope>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>5.10.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
@ -116,23 +153,60 @@
|
||||
<version>4.5.9</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.mattbertolini</groupId>
|
||||
<artifactId>liquibase-slf4j</artifactId>
|
||||
<version>5.1.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<!-- avoids log4j dependency -->
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.13.0</version>
|
||||
</plugin>
|
||||
<!-- avoids struts and vulnerable jetty dependency -->
|
||||
<plugin>
|
||||
<artifactId>maven-site-plugin</artifactId>
|
||||
<version>3.21.0</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>3.5.2</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<version>3.5.2</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<!-- avoids log4j dependency -->
|
||||
<!--
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.11.0</version>
|
||||
</plugin>
|
||||
<!-- avoids struts dependency -->
|
||||
<plugin>
|
||||
<artifactId>maven-site-plugin</artifactId>
|
||||
<version>3.12.1</version>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>package-stencil</id>
|
||||
<phase>package</phase>
|
||||
<goals><goal>jar</goal></goals>
|
||||
<configuration>
|
||||
<classesDirectory>${basedir}/src/main/activiti</classesDirectory>
|
||||
<classifier>stencil</classifier>
|
||||
<includes>
|
||||
<include>**/*</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
-->
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>3.6.0</version>
|
||||
<version>3.7.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>assemble-jar</id>
|
||||
@ -144,17 +218,49 @@
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>package-stencil</id>
|
||||
<phase>package</phase>
|
||||
<goals><goal>single</goal></goals>
|
||||
<configuration>
|
||||
<inlineDescriptors>
|
||||
<inlineDescriptor>
|
||||
<id>stencil</id>
|
||||
<formats>
|
||||
<format>zip</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<directory>src/main/activiti</directory>
|
||||
<outputDirectory>.</outputDirectory>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
</inlineDescriptor>
|
||||
</inlineDescriptors>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<configuration>
|
||||
<includes>
|
||||
<include>**/*IT.java</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>io.repaint.maven</groupId>
|
||||
<artifactId>tiles-maven-plugin</artifactId>
|
||||
<version>2.36</version>
|
||||
<version>2.40</version>
|
||||
<extensions>true</extensions>
|
||||
<configuration>
|
||||
<tiles>
|
||||
<!-- Documentation: https://bitbucket.org/inteligr8/ootbee-beedk/src/stable/beedk-aps-ext-rad-tile -->
|
||||
<tile>com.inteligr8.ootbee:beedk-aps-ext-rad-tile:[1.0.16,1.1)</tile>
|
||||
<tile>com.inteligr8.ootbee:beedk-aps-ext-rad-tile:[1.1,1.1.999)</tile>
|
||||
<!-- Documentation: https://bitbucket.org/inteligr8/ootbee-beedk/src/stable/beedk-aps-ext-it-tile -->
|
||||
<tile>com.inteligr8.ootbee:beedk-aps-ext-it-tile:[1.1,1.1.999)</tile>
|
||||
</tiles>
|
||||
</configuration>
|
||||
</plugin>
|
||||
@ -162,6 +268,66 @@
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>rad-mq</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>rad</name>
|
||||
</property>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
<!-- Using Docker to host the web application -->
|
||||
<plugin>
|
||||
<groupId>io.fabric8</groupId>
|
||||
<artifactId>docker-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>run-mq</id>
|
||||
<phase>process-test-resources</phase>
|
||||
<goals><goal>start</goal></goals>
|
||||
<configuration>
|
||||
<images>
|
||||
<image>
|
||||
<name>apache/activemq-classic:${activemq.version}</name>
|
||||
<alias>mq</alias>
|
||||
<run>
|
||||
<env>
|
||||
<ACTIVEMQ_WEBCONSOLE_USE_DEFAULT_ADDRESS>false</ACTIVEMQ_WEBCONSOLE_USE_DEFAULT_ADDRESS>
|
||||
<ACTIVEMQ_USERNAME>alfresco</ACTIVEMQ_USERNAME>
|
||||
<ACTIVEMQ_PASSWORD>alfresco</ACTIVEMQ_PASSWORD>
|
||||
<ACTIVEMQ_WEBADMIN_USERNAME>admin</ACTIVEMQ_WEBADMIN_USERNAME>
|
||||
<ACTIVEMQ_WEBADMIN_PASSWORD>admin</ACTIVEMQ_WEBADMIN_PASSWORD>
|
||||
</env>
|
||||
<ports>
|
||||
<port>${activemq.httpPort}:8161</port>
|
||||
</ports>
|
||||
<network>
|
||||
<mode>custom</mode>
|
||||
<name>${project.artifactId}</name>
|
||||
</network>
|
||||
</run>
|
||||
</image>
|
||||
</images>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>non-rad</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>!rad</name>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<docker.showLogs>true</docker.showLogs>
|
||||
<aps.timeout>90000</aps.timeout> <!-- tests do some delays -->
|
||||
</properties>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>apidoc</id>
|
||||
<build>
|
||||
@ -179,7 +345,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>3.4.1</version>
|
||||
<version>3.8.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>generate-doclet</id>
|
||||
@ -190,7 +356,7 @@
|
||||
<docletArtifact>
|
||||
<groupId>com.inteligr8.activiti</groupId>
|
||||
<artifactId>activiti-api-doclet</artifactId>
|
||||
<version>1.0.4</version>
|
||||
<version>1.1.0</version>
|
||||
</docletArtifact>
|
||||
<useStandardDocletOptions>false</useStandardDocletOptions>
|
||||
<destDir>apidocs</destDir>
|
||||
@ -199,6 +365,9 @@
|
||||
<additionalOption>--title 'API Documentation'</additionalOption>
|
||||
<additionalOption>--apiName '${project.name}</additionalOption>
|
||||
</additionalOptions>
|
||||
|
||||
<!-- This is required in maven-javadoc-plugin v3.5+ -->
|
||||
<sourceFileIncludes>**/*.java</sourceFileIncludes>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
|
8
rad.sh
8
rad.sh
@ -6,17 +6,17 @@ discoverArtifactId() {
|
||||
|
||||
rebuild() {
|
||||
echo "Rebuilding project ..."
|
||||
mvn process-classes
|
||||
mvn process-test-classes
|
||||
}
|
||||
|
||||
start() {
|
||||
echo "Rebuilding project and starting Docker containers to support rapid application development ..."
|
||||
mvn -Drad process-classes
|
||||
mvn -Drad process-test-classes
|
||||
}
|
||||
|
||||
start_log() {
|
||||
echo "Rebuilding project and starting Docker containers to support rapid application development ..."
|
||||
mvn -Drad -Ddocker.showLogs process-classes
|
||||
mvn -Drad -Ddocker.showLogs process-test-classes
|
||||
}
|
||||
|
||||
stop() {
|
||||
@ -31,6 +31,8 @@ stop() {
|
||||
|
||||
tail_logs() {
|
||||
discoverArtifactId
|
||||
echo "artifact: ${ARTIFACT_ID}"
|
||||
docker container ls -q --filter name=${ARTIFACT_ID}-$1
|
||||
docker container logs -f `docker container ls -q --filter name=${ARTIFACT_ID}-$1`
|
||||
}
|
||||
|
||||
|
3358
src/main/activiti/MQ.json
Normal file
3358
src/main/activiti/MQ.json
Normal file
File diff suppressed because one or more lines are too long
BIN
src/main/activiti/images/1-1.png
Normal file
BIN
src/main/activiti/images/1-1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 1.0 KiB |
BIN
src/main/activiti/images/2-2.png
Normal file
BIN
src/main/activiti/images/2-2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 1015 B |
BIN
src/main/activiti/images/3-3.png
Normal file
BIN
src/main/activiti/images/3-3.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 1015 B |
@ -2,11 +2,15 @@ package com.activiti.extension.conf;
|
||||
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.FullyQualifiedAnnotationBeanNameGenerator;
|
||||
|
||||
@Configuration
|
||||
@ComponentScan(basePackages = {
|
||||
@ComponentScan(
|
||||
basePackages = {
|
||||
"com.inteligr8.activiti.mq"
|
||||
})
|
||||
},
|
||||
nameGenerator = FullyQualifiedAnnotationBeanNameGenerator.class
|
||||
)
|
||||
public class MqSpringComponentScanner {
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,34 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import org.activiti.bpmn.model.Activity;
|
||||
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventListener;
|
||||
|
||||
public abstract class AbstractActivityListener implements ActivitiEventListener {
|
||||
|
||||
private final String processDefId;
|
||||
private final Activity activity;
|
||||
|
||||
public AbstractActivityListener(String processDefId, Activity activity) {
|
||||
this.processDefId = processDefId;
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
public abstract void onEvent(ActivitiActivityEvent event);
|
||||
|
||||
@Override
|
||||
public final void onEvent(ActivitiEvent event) {
|
||||
if (!event.getProcessDefinitionId().equals(this.processDefId))
|
||||
return;
|
||||
if (!(event instanceof ActivitiActivityEvent))
|
||||
return;
|
||||
|
||||
ActivitiActivityEvent aaevent = (ActivitiActivityEvent) event;
|
||||
if (!aaevent.getActivityId().equals(this.activity.getId()))
|
||||
return;
|
||||
|
||||
this.onEvent(aaevent);
|
||||
}
|
||||
|
||||
}
|
22
src/main/java/com/inteligr8/activiti/mq/AbstractMessage.java
Normal file
22
src/main/java/com/inteligr8/activiti/mq/AbstractMessage.java
Normal file
@ -0,0 +1,22 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class AbstractMessage implements Message {
|
||||
|
||||
private final Map<String, Object> map = new HashMap<>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> T getProperty(String key) {
|
||||
return (T) this.map.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(String key, Object value) {
|
||||
this.map.put(key, value);
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.core.env.Environment;
|
||||
|
||||
import com.activiti.domain.idm.EndpointConfiguration;
|
||||
import com.activiti.service.api.EndpointService;
|
||||
|
||||
public abstract class AbstractMqDelegate implements JavaDelegate {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private final String propertyPrefix = "inteligr8.mq";
|
||||
|
||||
@Autowired
|
||||
private Environment env;
|
||||
|
||||
@Autowired
|
||||
private EndpointService endpointService;
|
||||
|
||||
@Autowired
|
||||
private MqCommunicationService mqCommService;
|
||||
|
||||
private Map<String, MqCommunicator> communicators = new HashMap<>();
|
||||
|
||||
protected synchronized MqCommunicator getConnection(String connectorId) throws JMSException, TimeoutException, IOException {
|
||||
MqCommunicator communicator = this.communicators.get(connectorId);
|
||||
if (communicator == null) {
|
||||
EndpointConfiguration endpointConfig = this.endpointService.getConfigurationByName(connectorId);
|
||||
if (endpointConfig != null) {
|
||||
communicator = this.mqCommService.getCommunicator(endpointConfig);
|
||||
} else {
|
||||
this.logger.debug("Cannot find APS endpoint declaration; looking in properties for configuration: {}", connectorId);
|
||||
String propertyKeyPrefix = this.propertyPrefix + ".connectors." + connectorId;
|
||||
String urlPropertyKey = propertyKeyPrefix + ".url";
|
||||
String url = this.env.getProperty(urlPropertyKey);
|
||||
if (url == null)
|
||||
throw new IllegalStateException("The '" + connectorId + "' APS endpoint (" + connectorId + ") or URL property (" + urlPropertyKey + ") could not be found.");
|
||||
|
||||
String username = this.env.getProperty(propertyKeyPrefix + ".username");
|
||||
String password = this.env.getProperty(propertyKeyPrefix + ".password");
|
||||
communicator = this.mqCommService.getCommunicator(URI.create(url), username, password);
|
||||
}
|
||||
|
||||
this.communicators.put(connectorId, communicator);
|
||||
}
|
||||
|
||||
return communicator;
|
||||
}
|
||||
|
||||
protected Object mapValueToVariableValue(Object jmsValue) {
|
||||
// TODO
|
||||
return jmsValue;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
|
||||
public class ByteArrayChannel implements ReadableByteChannel {
|
||||
|
||||
private final ByteBuffer bytes;
|
||||
private boolean closed = false;
|
||||
|
||||
public ByteArrayChannel(byte[] bytes) {
|
||||
this.bytes = ByteBuffer.wrap(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return !this.closed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
if (!this.bytes.hasRemaining())
|
||||
return -1;
|
||||
if (!dst.hasRemaining())
|
||||
return 0;
|
||||
|
||||
int bytesToWrite = Math.min(this.bytes.remaining(), dst.remaining());
|
||||
this.bytes.limit(this.bytes.position() + bytesToWrite);
|
||||
dst.put(this.bytes);
|
||||
|
||||
return bytesToWrite;
|
||||
}
|
||||
|
||||
}
|
31
src/main/java/com/inteligr8/activiti/mq/Constants.java
Normal file
31
src/main/java/com/inteligr8/activiti/mq/Constants.java
Normal file
@ -0,0 +1,31 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
public class Constants {
|
||||
|
||||
/**
|
||||
* These are the BPMN field names set on the ServiceTask in the MQ stencil
|
||||
*/
|
||||
public static final String FIELD_METADATA_PROCESS_SCOPE = "mq_metadataProcessScope";
|
||||
public static final String FIELD_CONNECTOR_ID = "mq_connectorId";
|
||||
public static final String FIELD_QUEUE_NAME = "mq_queueName";
|
||||
public static final String FIELD_MESSAGE_NAME = "mq_messageName";
|
||||
public static final String FIELD_REPLY_QUEUE_NAME = "mq_replyQueueName";
|
||||
public static final String FIELD_STATUS_QUEUE_NAME = "mq_statusQueueName";
|
||||
public static final String FIELD_PAYLOAD = "mq_payload";
|
||||
public static final String FIELD_PAYLOAD_MIME_TYPE = "mq_payloadMimeType";
|
||||
|
||||
/**
|
||||
* These are the variable names set on tasks or process instances
|
||||
*
|
||||
* When an `mq_messageName` field is provided, it is appended to the end of these variable names (e.g. `mq_messageId_{mq_messageName}`)
|
||||
*/
|
||||
public static final String VARIABLE_CORRELATION_ID = "mq_correlationId";
|
||||
public static final String VARIABLE_MESSAGE_ID = "mq_messageId";
|
||||
public static final String VARIABLE_DELIVERY_TIME = "mq_deliveryTime";
|
||||
public static final String VARIABLE_PRIORITY = "mq_priority";
|
||||
public static final String VARIABLE_REPLY_QUEUE_NAME = "mq_replyQueueName";
|
||||
public static final String VARIABLE_STATUS_QUEUE_NAME = "mq_statusQueueName";
|
||||
public static final String VARIABLE_PAYLOAD = "mq_payload";
|
||||
public static final String VARIABLE_PAYLOAD_MIME_TYPE = "mq_payloadMimeType";
|
||||
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
||||
public interface DeliveredMessage<BodyType> extends PreparedMessage<BodyType> {
|
||||
|
||||
default String getMessageId() {
|
||||
return this.getProperty("messageId");
|
||||
}
|
||||
|
||||
default void setMessageId(String messageId) {
|
||||
this.setProperty("messageId", messageId);
|
||||
}
|
||||
|
||||
default Integer getPriority() {
|
||||
return this.getProperty("priority");
|
||||
}
|
||||
|
||||
default void setPriority(Integer priority) {
|
||||
this.setProperty("priority", priority);
|
||||
}
|
||||
|
||||
default OffsetDateTime getDeliveryTime() {
|
||||
return this.getProperty("deliveryTime");
|
||||
}
|
||||
|
||||
default void setDeliveryTime(OffsetDateTime deliveryTime) {
|
||||
this.setProperty("deliveryTime", deliveryTime);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
|
||||
public class GenericDestination {
|
||||
|
||||
private String queueName;
|
||||
private Integer deliveryMode;
|
||||
private Integer priority;
|
||||
private Long timeToLive;
|
||||
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
}
|
||||
|
||||
public void setQueueName(String queueName) {
|
||||
this.queueName = queueName;
|
||||
}
|
||||
|
||||
public Integer getDeliveryMode() {
|
||||
return deliveryMode;
|
||||
}
|
||||
|
||||
public void setDeliveryMode(Integer deliveryMode) {
|
||||
this.deliveryMode = deliveryMode;
|
||||
}
|
||||
|
||||
public Integer getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
public void setPriority(Integer priority) {
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public Long getTimeToLive() {
|
||||
return timeToLive;
|
||||
}
|
||||
|
||||
public void setTimeToLive(Long timeToLive) {
|
||||
this.timeToLive = timeToLive;
|
||||
}
|
||||
|
||||
public Queue toJmsQueue(Session session) throws JMSException {
|
||||
return session.createQueue(this.queueName);
|
||||
}
|
||||
|
||||
public Topic toJmsTopic(Session session) throws JMSException {
|
||||
return session.createTopic(this.queueName);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,354 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.activiti.bpmn.model.BpmnModel;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.SequenceFlow;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.bpmn.model.StartEvent;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.activiti.engine.delegate.event.ActivitiActivityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEvent;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventListener;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
|
||||
import org.activiti.engine.repository.ProcessDefinition;
|
||||
import org.activiti.engine.repository.ProcessDefinitionQuery;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ApplicationContextEvent;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.context.event.ContextStartedEvent;
|
||||
import org.springframework.context.event.ContextStoppedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.activiti.domain.idm.Tenant;
|
||||
|
||||
@Component
|
||||
public class MQProcessDefinitionMonitor implements ActivitiEventListener, ApplicationListener<ApplicationContextEvent> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(MQProcessDefinitionMonitor.class);
|
||||
private final Pattern expressionPattern = Pattern.compile("\\$\\{(.+)\\}");
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired
|
||||
private TenantFinderService tenantFinderService;
|
||||
|
||||
private Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This method will add/remove listeners to the specific process
|
||||
* definitions that use MQ subscriptions to start process instances at
|
||||
* application startup and shutdown.
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationContextEvent event) {
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent) {
|
||||
this.logger.debug("Discovered {} active process definitions to stop listening to", this.activeListeners.size());
|
||||
this.deloopMqSubscribeTasks();
|
||||
}
|
||||
|
||||
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
|
||||
Tenant tenant = this.tenantFinderService.findTenant();
|
||||
|
||||
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery().active();
|
||||
if (tenant == null) {
|
||||
procDefQuery.processDefinitionWithoutTenantId();
|
||||
} else {
|
||||
String tenantId = this.tenantFinderService.transform(tenant.getId());
|
||||
procDefQuery.processDefinitionTenantId(tenantId);
|
||||
}
|
||||
|
||||
List<ProcessDefinition> procDefs = procDefQuery.list();
|
||||
this.logger.debug("Found {} active process definitions", procDefs.size());
|
||||
for (ProcessDefinition procDef : procDefs) {
|
||||
this.logger.debug("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(procDef.getId());
|
||||
if (task == null)
|
||||
continue;
|
||||
|
||||
this.loopMqSubscribeTask(procDef.getId(), task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will start monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
*/
|
||||
@PostConstruct
|
||||
protected void init() {
|
||||
this.services.getRuntimeService().addEventListener(this,
|
||||
ActivitiEventType.ENTITY_INITIALIZED,
|
||||
ActivitiEventType.ENTITY_UPDATED,
|
||||
ActivitiEventType.ENTITY_ACTIVATED,
|
||||
ActivitiEventType.ENTITY_SUSPENDED,
|
||||
ActivitiEventType.ENTITY_DELETED);
|
||||
this.logger.debug("Started listening for entity events");
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will stop monitoring for changes in deployment, app, and
|
||||
* process definitions.
|
||||
*/
|
||||
@PreDestroy
|
||||
protected void uninit() {
|
||||
this.logger.debug("Stopping listening for entity events ...");
|
||||
this.services.getRuntimeService().removeEventListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is fired by the Activiti platform. It is called on every
|
||||
* entity event except the creation event. You can see that in the init()
|
||||
* method above.
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(ActivitiEvent event) {
|
||||
this.logger.trace("Triggered by event: {}", event);
|
||||
this.onEntityEvent((ActivitiEntityEvent) event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void onEntityEvent(ActivitiEntityEvent aaevent) {
|
||||
this.logger.trace("Triggered by entity: {}", aaevent.getEntity());
|
||||
|
||||
if (aaevent.getEntity() instanceof ProcessDefinitionEntity) {
|
||||
this.logger.debug("Triggered by process definition state change: {}", aaevent.getEntity());
|
||||
|
||||
switch (aaevent.getType()) {
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
break;
|
||||
case ENTITY_DELETED:
|
||||
case ENTITY_SUSPENDED:
|
||||
// we need to stop the listener
|
||||
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) aaevent.getEntity());
|
||||
break;
|
||||
default:
|
||||
// we need to start a listener
|
||||
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) aaevent.getEntity());
|
||||
}
|
||||
// we only want to deal with ProcessDefinition entities
|
||||
} else if (aaevent.getEntity() instanceof DeploymentEntity) {
|
||||
switch (aaevent.getType()) {
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
this.onDeploymentAddEvent((DeploymentEntity) aaevent.getEntity());
|
||||
break;
|
||||
default:
|
||||
}
|
||||
// we only want to deal with ProcessDefinition entities
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition addition: {}", entity);
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
this.loopMqSubscribeTask(entity.getId(), task);
|
||||
}
|
||||
|
||||
protected void onDeploymentAddEvent(DeploymentEntity entity) {
|
||||
this.logger.debug("Triggered by deployment addition: {}", entity);
|
||||
|
||||
List<ProcessDefinitionEntity> procDefEntities = entity.getDeployedArtifacts(ProcessDefinitionEntity.class);
|
||||
if (procDefEntities == null)
|
||||
return;
|
||||
this.logger.debug("Found {} process definitions in deployment: {}", procDefEntities.size(), entity.getId());
|
||||
|
||||
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
|
||||
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
|
||||
|
||||
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
|
||||
if (task == null)
|
||||
return;
|
||||
|
||||
this.loopMqSubscribeTask(procDefEntity.getId(), task);
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition removal: {}", entity);
|
||||
this.deloopMqSubscribeTask(entity.getId());
|
||||
}
|
||||
|
||||
protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) {
|
||||
// start a process instance on the process
|
||||
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);
|
||||
this.services.getRuntimeService().startProcessInstanceById(processDefId);
|
||||
|
||||
if (this.activeListeners.containsKey(processDefId)) {
|
||||
this.logger.debug("The process definition already has a looping listener: {}", processDefId);
|
||||
return;
|
||||
}
|
||||
|
||||
AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) {
|
||||
@Override
|
||||
public void onEvent(ActivitiActivityEvent event) {
|
||||
logger.debug("MQ message received; starting another process instance on process '{}' to re-subscribe to the MQ queue", event.getProcessDefinitionId());
|
||||
services.getRuntimeService().startProcessInstanceById(processDefId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// the ServiceTask will then execute, waiting for a message on the queue
|
||||
// when a message is received, it will be responsible for starting a new process instance
|
||||
// that new process instance will wait for the next message on the queue
|
||||
// repeat
|
||||
this.services.getRuntimeService().addEventListener(listener, ActivitiEventType.ACTIVITY_COMPLETED);
|
||||
|
||||
// register the listener so we can possibly remove it later
|
||||
this.activeListeners.put(processDefId, listener);
|
||||
}
|
||||
|
||||
protected synchronized int deloopMqSubscribeTasks() {
|
||||
int count = 0;
|
||||
|
||||
Iterator<Entry<String, AbstractActivityListener>> i = this.activeListeners.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Entry<String, AbstractActivityListener> listener = i.next();
|
||||
|
||||
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", listener.getKey());
|
||||
this.services.getRuntimeService().removeEventListener(listener.getValue());
|
||||
|
||||
i.remove();
|
||||
count++;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
protected synchronized boolean deloopMqSubscribeTask(String processDefId) {
|
||||
AbstractActivityListener listener = this.activeListeners.remove(processDefId);
|
||||
if (listener == null) {
|
||||
this.logger.trace("No MQ subscription listener found; no listener to stop");
|
||||
return false;
|
||||
}
|
||||
|
||||
this.logger.debug("An MQ subscription listener was found; removing/stopping: {}", processDefId);
|
||||
this.services.getRuntimeService().removeEventListener(listener);
|
||||
return true;
|
||||
}
|
||||
|
||||
protected ServiceTask findMqStartSubscribeTask(String processDefId) {
|
||||
// we cannot use the RepositoryService to get the BpmnModel as it is not yet cached in TX
|
||||
BpmnModel model = ProcessDefinitionUtil.getBpmnModel(processDefId);
|
||||
StartEvent startElement = this.findMqPlainStartEvent(processDefId, model);
|
||||
if (startElement == null)
|
||||
return null;
|
||||
|
||||
List<SequenceFlow> flows = startElement.getOutgoingFlows();
|
||||
this.logger.trace("Found {} outgoing flows in the process start element: {}: {}", flows.size(), processDefId, startElement.getId());
|
||||
if (flows.isEmpty()) {
|
||||
this.logger.warn("A start event is expected to always have an outgoing flow; skipping process: {}", processDefId);
|
||||
return null;
|
||||
} else if (flows.size() > 1) {
|
||||
this.logger.debug("A start event with multiple outgoing flows cannot have an MQ subscription; skipping process: {}", processDefId);
|
||||
return null;
|
||||
}
|
||||
|
||||
SequenceFlow flow = flows.iterator().next();
|
||||
FlowElement targetFlowElement = flow.getTargetFlowElement();
|
||||
if (targetFlowElement == null) {
|
||||
this.logger.warn("A start event outgoing flow is expected to always have a target element; skipping process: {}", processDefId);
|
||||
return null;
|
||||
}
|
||||
this.logger.trace("Found element '{}' after start event '{}' in process '{}'", targetFlowElement.getId(), startElement.getId(), processDefId);
|
||||
|
||||
if (!(targetFlowElement instanceof ServiceTask)) {
|
||||
this.logger.trace("A non-service task immediately after start event; skipping process: {}: {}: {}", processDefId, targetFlowElement.getId(), targetFlowElement.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
ServiceTask task = (ServiceTask) targetFlowElement;
|
||||
if (!"delegateExpression".equals(task.getImplementationType())) {
|
||||
this.logger.trace("A non-delegate service task immediately after start event; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
this.logger.trace("Found service task immediately after start event: {}: {}", task.getImplementationType(), task.getImplementation());
|
||||
|
||||
Matcher matcher = this.expressionPattern.matcher(task.getImplementation());
|
||||
if (!matcher.find()) {
|
||||
this.logger.warn("A service task delegate expression is in an unexpected format: {}; skipping process: {}: {}: {}", task.getImplementation(), processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
String beanId = matcher.group(1).trim();
|
||||
this.logger.trace("Looking up bean: {}", beanId);
|
||||
|
||||
JavaDelegate delegateBean = this.context.getBean(beanId, JavaDelegate.class);
|
||||
this.logger.trace("Found bean: {}: {}", beanId, delegateBean);
|
||||
if (delegateBean == null) {
|
||||
this.logger.trace("The service task delegate has no bean; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
} else if (!(delegateBean instanceof MqSubscribeDelegate)) {
|
||||
this.logger.trace("The service task delegate is not an MQ subscription; skipping process: {}: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
this.logger.debug("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
|
||||
return task;
|
||||
}
|
||||
|
||||
protected StartEvent findMqPlainStartEvent(String processDefId, BpmnModel model) {
|
||||
this.logger.trace("Finding all start elements in process: {}", processDefId);
|
||||
List<StartEvent> startElements = model.getMainProcess().findFlowElementsOfType(StartEvent.class);
|
||||
this.logger.trace("Found {} start elements in process: {}", startElements.size(), processDefId);
|
||||
for (StartEvent startElement : startElements) {
|
||||
// constrain to just 0..1 plain start events; ignore timer, signal, message, and error start events
|
||||
if (!(startElement.getBehavior() instanceof NoneStartEventActivityBehavior)) {
|
||||
this.logger.trace("Found non-plain start event; ignoring start event: {}: {}: {}", processDefId, startElement.getId(), startElement.getBehavior());
|
||||
continue;
|
||||
}
|
||||
|
||||
return startElement;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected ServiceTask castToServiceTask(FlowElement flowElement) {
|
||||
if (!(flowElement instanceof ServiceTask))
|
||||
return null;
|
||||
return (ServiceTask) flowElement;
|
||||
}
|
||||
|
||||
}
|
9
src/main/java/com/inteligr8/activiti/mq/Message.java
Normal file
9
src/main/java/com/inteligr8/activiti/mq/Message.java
Normal file
@ -0,0 +1,9 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
public interface Message {
|
||||
|
||||
<T> T getProperty(String key);
|
||||
|
||||
void setProperty(String key, Object value);
|
||||
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.activiti.domain.idm.EndpointConfiguration;
|
||||
import com.inteligr8.activiti.mq.amqp.AmqpCommunicator;
|
||||
import com.inteligr8.activiti.mq.jms.JmsCommunicator;
|
||||
|
||||
@Component
|
||||
public class MqCommunicationService {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private Map<String, MqCommunicator> communicators = new HashMap<>();
|
||||
|
||||
public MqCommunicator getCommunicator(EndpointConfiguration endpoint) throws JMSException {
|
||||
return this.getCommunicator(
|
||||
endpoint.getProtocol().getProtocol(),
|
||||
endpoint.getUrl(),
|
||||
endpoint.getBasicAuth().getUsername(),
|
||||
endpoint.getBasicAuth().getPassword());
|
||||
}
|
||||
|
||||
public MqCommunicator getCommunicator(URI url, String username, String password) throws JMSException {
|
||||
return this.getCommunicator(
|
||||
url.getScheme(),
|
||||
url.toString(),
|
||||
username,
|
||||
password);
|
||||
}
|
||||
|
||||
protected synchronized MqCommunicator getCommunicator(String protocol, String url, String username, String password) throws JMSException {
|
||||
String key = url + "@" + username;
|
||||
MqCommunicator communicator = this.communicators.get(key);
|
||||
if (communicator == null) {
|
||||
switch (protocol.toLowerCase()) {
|
||||
case "ampq":
|
||||
this.logger.debug("Creating AMQP connector: {}", url);
|
||||
communicator = new AmqpCommunicator(url, username, password);
|
||||
break;
|
||||
case "failover":
|
||||
case "tcp":
|
||||
case "http":
|
||||
this.logger.debug("Creating TCP OpenWire connector: {}", url);
|
||||
communicator = new JmsCommunicator(url, username, password);
|
||||
break;
|
||||
case "ssl":
|
||||
case "https":
|
||||
this.logger.debug("Creating SSL OpenWire connector: {}", url);
|
||||
communicator = new JmsCommunicator(url, username, password);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("The " + protocol + " is not supported by this library");
|
||||
}
|
||||
|
||||
this.communicators.put(key, communicator);
|
||||
}
|
||||
|
||||
return communicator;
|
||||
}
|
||||
|
||||
}
|
46
src/main/java/com/inteligr8/activiti/mq/MqCommunicator.java
Normal file
46
src/main/java/com/inteligr8/activiti/mq/MqCommunicator.java
Normal file
@ -0,0 +1,46 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
public interface MqCommunicator {
|
||||
|
||||
boolean validateConnection();
|
||||
|
||||
<BodyType> PreparedMessage<BodyType> createPreparedMessage();
|
||||
|
||||
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, null, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, correlationId, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, correlationId, null);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, handler);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, timeoutInMillis, null, handler);
|
||||
}
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, correlationId, handler);
|
||||
}
|
||||
|
||||
<BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
}
|
175
src/main/java/com/inteligr8/activiti/mq/MqDelegateExecution.java
Normal file
175
src/main/java/com/inteligr8/activiti/mq/MqDelegateExecution.java
Normal file
@ -0,0 +1,175 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.OverridingMethodsMustInvokeSuper;
|
||||
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.activiti.engine.delegate.Expression;
|
||||
import org.activiti.engine.impl.context.Context;
|
||||
import org.activiti.engine.impl.el.ExpressionManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MqDelegateExecution {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
protected final ProcessEngine services;
|
||||
protected final DelegateExecution execution;
|
||||
protected final ServiceTask task;
|
||||
protected final Map<String, FieldExtension> fieldMap = new HashMap<>();
|
||||
private boolean metadataToProcessScope = false;
|
||||
|
||||
public MqDelegateExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
this.services = services;
|
||||
this.execution = execution;
|
||||
|
||||
FlowElement flowElement = execution.getCurrentFlowElement();
|
||||
if (!(flowElement instanceof ServiceTask))
|
||||
throw new IllegalStateException("This should never happen");
|
||||
this.task = (ServiceTask) flowElement;
|
||||
this.logger.trace("Discovered task: {}: {}", this.task.getId(), this.task.getName());
|
||||
|
||||
this.logger.trace("Indexing {} fields", this.task.getFieldExtensions().size());
|
||||
for (FieldExtension field : this.task.getFieldExtensions()) {
|
||||
this.logger.trace("Discovering field: {}: {}: {}", field.getId(), field.getFieldName(), field.getStringValue());
|
||||
|
||||
switch (field.getFieldName()) {
|
||||
case Constants.FIELD_METADATA_PROCESS_SCOPE:
|
||||
this.metadataToProcessScope = Boolean.valueOf(field.getStringValue());
|
||||
break;
|
||||
default:
|
||||
this.fieldMap.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OverridingMethodsMustInvokeSuper
|
||||
public void validate() {
|
||||
if (this.fieldMap.get(Constants.FIELD_CONNECTOR_ID) == null)
|
||||
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Connector ID'");
|
||||
if (this.fieldMap.get(Constants.FIELD_QUEUE_NAME) == null)
|
||||
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Queue Name'");
|
||||
}
|
||||
|
||||
public String getTaskId() {
|
||||
return this.task.getId();
|
||||
}
|
||||
|
||||
public boolean doWriteToProcessScope() {
|
||||
return this.metadataToProcessScope;
|
||||
}
|
||||
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type) {
|
||||
return this.getFieldValueFromModel(fieldName, type, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, boolean forceExpressionProcessing) {
|
||||
FieldExtension field = this.fieldMap.get(fieldName);
|
||||
if (field == null) {
|
||||
return null;
|
||||
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getExpression());
|
||||
return (T) expr.getValue(this.execution);
|
||||
} else if (forceExpressionProcessing) {
|
||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||
Expression expr = exprman.createExpression(field.getStringValue());
|
||||
return (T) expr.getValue(this.execution);
|
||||
} else {
|
||||
return (T) field.getStringValue();
|
||||
}
|
||||
}
|
||||
|
||||
public String getConnectorIdFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_CONNECTOR_ID, String.class);
|
||||
}
|
||||
|
||||
public String getQueueNameFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_QUEUE_NAME, String.class);
|
||||
}
|
||||
|
||||
public String getMessageNameFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_MESSAGE_NAME, String.class);
|
||||
}
|
||||
|
||||
public String getPayloadFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_PAYLOAD, String.class, true);
|
||||
}
|
||||
|
||||
public String getPayloadMimeTypeFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_PAYLOAD_MIME_TYPE, String.class);
|
||||
}
|
||||
|
||||
protected void setMqVariable(String varName, Object value) {
|
||||
varName = this.formulateVariableName(varName);
|
||||
if (this.doWriteToProcessScope()) {
|
||||
this.execution.setVariable(varName, value);
|
||||
} else {
|
||||
this.execution.setVariableLocal(varName, value);
|
||||
}
|
||||
}
|
||||
|
||||
public String formulateVariableName(String varName) {
|
||||
if (this.getMessageNameFromModel() != null)
|
||||
varName += "_" + this.getMessageNameFromModel();
|
||||
return varName;
|
||||
}
|
||||
|
||||
public String getCorrelationId() {
|
||||
return this.execution.getVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), String.class);
|
||||
}
|
||||
|
||||
public void setCorrelationId(String correlationId) {
|
||||
this.execution.setVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), correlationId);
|
||||
}
|
||||
|
||||
public void setMessageId(String messageId) {
|
||||
this.setMqVariable(Constants.VARIABLE_MESSAGE_ID, messageId);
|
||||
}
|
||||
|
||||
public void setDeliveryTime(OffsetDateTime deliveryTime) {
|
||||
if (deliveryTime != null)
|
||||
this.setMqVariable(Constants.VARIABLE_DELIVERY_TIME, Date.from(deliveryTime.toInstant()));
|
||||
}
|
||||
|
||||
public void setPriority(Integer priority) {
|
||||
if (priority != null)
|
||||
this.setMqVariable(Constants.VARIABLE_PRIORITY, priority);
|
||||
}
|
||||
|
||||
public void setReplyToQueueName(String replyToQueueName) {
|
||||
if (replyToQueueName != null)
|
||||
this.setMqVariable(Constants.VARIABLE_REPLY_QUEUE_NAME, replyToQueueName);
|
||||
}
|
||||
|
||||
public void setStatusQueueName(String statusQueueName) {
|
||||
if (statusQueueName != null)
|
||||
this.setMqVariable(Constants.VARIABLE_STATUS_QUEUE_NAME, statusQueueName);
|
||||
}
|
||||
|
||||
public void setPayload(String payload, String mimeType) {
|
||||
this.setPayload(payload);
|
||||
this.setPayloadMimeType(mimeType);
|
||||
}
|
||||
|
||||
public void setPayload(String payload) {
|
||||
if (payload != null)
|
||||
this.setMqVariable(Constants.VARIABLE_PAYLOAD, payload);
|
||||
}
|
||||
|
||||
public void setPayloadMimeType(String payloadMimeType) {
|
||||
if (payloadMimeType != null)
|
||||
this.setMqVariable(Constants.VARIABLE_PAYLOAD_MIME_TYPE, payloadMimeType);
|
||||
}
|
||||
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.activiti.domain.idm.EndpointConfiguration;
|
||||
import com.activiti.service.api.EndpointService;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.BasicProperties;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
@Component
|
||||
public class MqPublishActivity implements JavaDelegate {
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
@Autowired
|
||||
private EndpointService endpointService;
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqPublishActivityExecution mqExecution = new MqPublishActivityExecution(this.services, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
try {
|
||||
Connection mqcon = this.getConnection(mqExecution.getMqConnectorId());
|
||||
Channel mqchannel = this.getChannel(mqcon);
|
||||
|
||||
BasicProperties props = new AMQP.BasicProperties();
|
||||
mqchannel.basicPublish(mqExecution.getMqOutgoingQueue(), null, null, message.toByteArray());
|
||||
} catch (TimeoutException te) {
|
||||
} catch (IOException ie) {
|
||||
}
|
||||
}
|
||||
|
||||
private Connection getConnection(String connectorId) throws TimeoutException, IOException {
|
||||
EndpointConfiguration endpointConfig = this.endpointService.getConfigurationByName(connectorId);
|
||||
if (endpointConfig == null)
|
||||
throw new IllegalStateException("The '" + connectorId + "' endpoint is not defined");
|
||||
|
||||
ConnectionFactory conFactory = new ConnectionFactory();
|
||||
conFactory.setUsername(endpointConfig.getBasicAuth().getUsername());
|
||||
conFactory.setPassword(endpointConfig.getBasicAuth().getPassword());
|
||||
try {
|
||||
conFactory.setUri(endpointConfig.getUrl());
|
||||
return conFactory.newConnection();
|
||||
} catch (KeyManagementException kme) {
|
||||
} catch (NoSuchAlgorithmException nsae) {
|
||||
} catch (URISyntaxException use) {
|
||||
}
|
||||
}
|
||||
|
||||
private Channel getChannel(Connection mqcon) throws IOException {
|
||||
return mqcon.createChannel();
|
||||
}
|
||||
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.activiti.bpmn.model.FieldExtension;
|
||||
import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
|
||||
public class MqPublishActivityExecution {
|
||||
|
||||
private final ProcessEngine services;
|
||||
private final DelegateExecution execution;
|
||||
private final ServiceTask task;
|
||||
private final Map<String, String> fieldMap = new HashMap<String, String>();
|
||||
|
||||
public MqPublishActivityExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
this.services = services;
|
||||
this.execution = execution;
|
||||
|
||||
FlowElement flowElement = this.execution.getCurrentFlowElement();
|
||||
if (!(flowElement instanceof ServiceTask))
|
||||
throw new IllegalStateException("This should never happen");
|
||||
this.task = (ServiceTask) flowElement;
|
||||
|
||||
for (FieldExtension field : this.task.getFieldExtensions())
|
||||
this.fieldMap.put(field.getId(), field.getStringValue());
|
||||
}
|
||||
|
||||
public void validate() {
|
||||
if (this.fieldMap.get("mqConnectorId") == null)
|
||||
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Connector ID'");
|
||||
if (this.fieldMap.get("mqOutgoingQueue") == null)
|
||||
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Outgoing Queue'");
|
||||
}
|
||||
|
||||
public String getMqConnectorId() {
|
||||
return this.fieldMap.get("mqConnectorId");
|
||||
}
|
||||
|
||||
public String getMqOutgoingQueue() {
|
||||
return this.fieldMap.get("mqOutgoingQueue");
|
||||
}
|
||||
|
||||
public String getMqOutgoingReplyQueue() {
|
||||
return this.fieldMap.get("mqOutgoingReplyQueue");
|
||||
}
|
||||
|
||||
}
|
102
src/main/java/com/inteligr8/activiti/mq/MqPublishDelegate.java
Normal file
102
src/main/java/com/inteligr8/activiti/mq/MqPublishDelegate.java
Normal file
@ -0,0 +1,102 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.BpmnError;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This delegate sends a message to an MQ queue.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqSubscribeReplyDelegate#
|
||||
* @see mqSubscribeDelegate#
|
||||
*/
|
||||
@Component(MqPublishDelegate.BEAN_ID)
|
||||
public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
|
||||
public static final String BEAN_ID = "mqPublishDelegate";
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
/**
|
||||
* This method sends a message to an MQ queue.
|
||||
*
|
||||
* It does not wait for a response and will continue after a successful
|
||||
* publish of the message to the specified queue. Use
|
||||
* `mqSubscribeReplyDelegate` in a subsequent task to handle the response
|
||||
* asynchronously.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task.
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_payload [optional] The body of the MQ message. May include expressions.
|
||||
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
|
||||
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
|
||||
* @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates.
|
||||
* @varOut mq_correlationId The correlationId of the message sent.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
* @see mqSubscribeReplyDelegate#delegate
|
||||
* @see mqSubscribeDelegate#delegate
|
||||
*/
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqPublishDelegateExecution mqExecution = new MqPublishDelegateExecution(this.services, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
destination.setQueueName(mqExecution.getQueueNameFromModel());
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
|
||||
|
||||
PreparedMessage<String> message = communicator.createPreparedMessage();
|
||||
if (mqExecution.getStatusQueueNameFromModel() != null)
|
||||
message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel());
|
||||
message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel());
|
||||
|
||||
String payload = mqExecution.getPayloadFromModel();
|
||||
if (payload != null) {
|
||||
String payloadMimeType = mqExecution.getPayloadMimeTypeFromModel();
|
||||
if (payloadMimeType != null) {
|
||||
message.setContent(payload, payloadMimeType);
|
||||
} else {
|
||||
message.setContent(payload);
|
||||
}
|
||||
}
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
|
||||
|
||||
mqExecution.setCorrelationId(correlationId);
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
} catch (IOException ie) {
|
||||
this.logger.error("MQ connection or network communication failed: " + ie.getMessage(), ie);
|
||||
throw new BpmnError("network", "MQ connection or network communication failed: " + ie.getMessage());
|
||||
} catch (JMSException je) {
|
||||
this.logger.error("JMS communication failed: " + je.getMessage(), je);
|
||||
throw new BpmnError("mq", "JMS communication failed: " + je.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
|
||||
public class MqPublishDelegateExecution extends MqDelegateExecution {
|
||||
|
||||
public MqPublishDelegateExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
super(services, execution);
|
||||
}
|
||||
|
||||
public String getReplyQueueNameFromModel() {
|
||||
return (String) this.getFieldValueFromModel(Constants.FIELD_REPLY_QUEUE_NAME, String.class);
|
||||
}
|
||||
|
||||
public String getStatusQueueNameFromModel() {
|
||||
return (String) this.getFieldValueFromModel(Constants.FIELD_STATUS_QUEUE_NAME, String.class);
|
||||
}
|
||||
|
||||
}
|
107
src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java
Normal file
107
src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java
Normal file
@ -0,0 +1,107 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.BpmnError;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqPublishDelegate#
|
||||
* @see mqSubscribeReplyDelegate#
|
||||
*/
|
||||
@Component(MqSubscribeDelegate.BEAN_ID)
|
||||
public class MqSubscribeDelegate extends AbstractMqDelegate {
|
||||
|
||||
public static final String BEAN_ID = "mqSubscribeDelegate";
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
* It does not wait for a response to a specific message put on an MQ queue.
|
||||
* That is for the `mqPublishDelegate` and `mqSubscribeReplyDelegate` tasks.
|
||||
*
|
||||
* When used, this task must be the first task after a plain start event in a
|
||||
* process. If used in any other way, it will error and the process will fail
|
||||
* validation.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this task. The resultant variables will then
|
||||
* include that message name (e.g. `mq_messageId_{mq_messageName}`).
|
||||
*
|
||||
* TODO map response to variables
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
|
||||
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
* @see mqPublishDelegate#delegate
|
||||
* @see mqSubscribeReplyDelegate#delegate
|
||||
*/
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
destination.setQueueName(mqExecution.getQueueNameFromModel());
|
||||
|
||||
this.logger.debug("Will look for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, new TransactionalMessageHandler<String>() {
|
||||
@Override
|
||||
public void onMessage(DeliveredMessage<String> message) throws IOException {
|
||||
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
|
||||
|
||||
if (message.getMessageId() != null)
|
||||
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
|
||||
mqExecution.setCorrelationId(message.getCorrelationId());
|
||||
mqExecution.setDeliveryTime(message.getDeliveryTime());
|
||||
mqExecution.setPriority(message.getPriority());
|
||||
mqExecution.setPayload(message.getContent(), message.getContentType());
|
||||
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
});
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
} catch (IOException ie) {
|
||||
this.logger.error("MQ connection or network communication failed: " + ie.getMessage(), ie);
|
||||
throw new BpmnError("network", "MQ connection or network communication failed: " + ie.getMessage());
|
||||
} catch (JMSException je) {
|
||||
this.logger.error("JMS communication failed: " + je.getMessage(), je);
|
||||
throw new BpmnError("mq", "JMS communication failed: " + je.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
|
||||
public class MqSubscribeDelegateExecution extends MqDelegateExecution {
|
||||
|
||||
public MqSubscribeDelegateExecution(ProcessEngine services, DelegateExecution execution) {
|
||||
super(services, execution);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,107 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.BpmnError;
|
||||
import org.activiti.engine.delegate.DelegateExecution;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This delegate listens for a reply message on an MQ queue.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
* @since 1.0
|
||||
* @see mqPublishDelegate#
|
||||
* @see mqSubscribeDelegate#
|
||||
*/
|
||||
@Component(MqSubscribeReplyDelegate.BEAN_ID)
|
||||
public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
|
||||
|
||||
public static final String BEAN_ID = "mqSubscribeReplyDelegate";
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
private ProcessEngine services;
|
||||
|
||||
/**
|
||||
* This method listens for a reply message on an MQ queue.
|
||||
*
|
||||
* It uses the `mq_correlationId` variable to select the corresponding reply
|
||||
* message. That variable is automatically set by the `mqPublishDelegate`
|
||||
* task. This is meant to be used after `mqPublishDelegate` and not just by
|
||||
* itself. If you want to start processes with an MQ subscription see the
|
||||
* `mqSubscribeDelegate` task.
|
||||
*
|
||||
* If you have more than one MQ communication in a single process, set the
|
||||
* `mq_messageName` field on this and the corresponding `mqPublishDelegate`
|
||||
* task.
|
||||
*
|
||||
* TODO map response to variables
|
||||
*
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @varIn mq_correlationId The correlating identifier of the message to receive. Used to correlate between sent/received messages; like a thread of communication.
|
||||
* @varOut mq_messageId The unique message identifer of the message received.
|
||||
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
|
||||
* @varOut mq_priority An integer priority of the message; value depends on MQ protocol.
|
||||
* @varOut mq_payload The body of the MQ message. May include expressions. May be `null`.
|
||||
* @varOut mq_payloadMimeType The MIME type of the body of the MQ message. May be `null`.
|
||||
* @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @varOut mq_statusQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
|
||||
* @throws BPMNError correlation `mq_correlationId` is required.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
*/
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, execution);
|
||||
mqExecution.validate();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
destination.setQueueName(mqExecution.getQueueNameFromModel());
|
||||
|
||||
String correlationId = mqExecution.getCorrelationId();
|
||||
if (correlationId == null)
|
||||
throw new BpmnError("correlation", "A correlation ID could not be found");
|
||||
this.logger.debug("Will look only for MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
|
||||
|
||||
try {
|
||||
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
|
||||
communicator.receive(destination, correlationId, new TransactionalMessageHandler<String>() {
|
||||
@Override
|
||||
public void onMessage(DeliveredMessage<String> message) {
|
||||
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
|
||||
|
||||
if (message.getMessageId() != null)
|
||||
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
|
||||
mqExecution.setDeliveryTime(message.getDeliveryTime());
|
||||
mqExecution.setPriority(message.getPriority());
|
||||
mqExecution.setPayload(message.getContent(), message.getContentType());
|
||||
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
|
||||
mqExecution.setStatusQueueName(message.getStatusQueueName());
|
||||
}
|
||||
});
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
} catch (IOException ie) {
|
||||
this.logger.error("MQ connection or network communication failed: " + ie.getMessage(), ie);
|
||||
throw new BpmnError("network", "MQ connection or network communication failed: " + ie.getMessage());
|
||||
} catch (JMSException je) {
|
||||
this.logger.error("JMS communication failed: " + je.getMessage(), je);
|
||||
throw new BpmnError("mq", "JMS communication failed: " + je.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
74
src/main/java/com/inteligr8/activiti/mq/PreparedMessage.java
Normal file
74
src/main/java/com/inteligr8/activiti/mq/PreparedMessage.java
Normal file
@ -0,0 +1,74 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
|
||||
@NotThreadSafe
|
||||
public interface PreparedMessage<BodyType> extends Message {
|
||||
|
||||
default String getCorrelationId() {
|
||||
return this.getProperty("correlationId");
|
||||
}
|
||||
|
||||
default void setCorrelationId(String correlationId) {
|
||||
this.setProperty("correlationId", correlationId);
|
||||
}
|
||||
|
||||
default Map<String, Object> getProperties() {
|
||||
return this.getProperty("properties");
|
||||
}
|
||||
|
||||
// not thread safe
|
||||
default void setProperty(String key, Object value) {
|
||||
Map<String, Object> properties = this.getProperties();
|
||||
if (properties == null)
|
||||
properties = new HashMap<>();
|
||||
this.setProperty("properties", properties);
|
||||
|
||||
if (value == null) {
|
||||
properties.remove(key);
|
||||
} else {
|
||||
properties.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
default String getReplyToQueueName() {
|
||||
return this.getProperty("replyToQueueName");
|
||||
}
|
||||
|
||||
default void setReplyToQueueName(String replyToQueueName) {
|
||||
this.setProperty("replyToQueueName", replyToQueueName);
|
||||
}
|
||||
|
||||
default String getStatusQueueName() {
|
||||
return this.getProperty("statusQueueName");
|
||||
}
|
||||
|
||||
default void setStatusQueueName(String statusQueueName) {
|
||||
this.setProperty("statusQueueName", statusQueueName);
|
||||
}
|
||||
|
||||
default BodyType getContent() {
|
||||
return this.getProperty("content");
|
||||
}
|
||||
|
||||
default void setContent(BodyType content) {
|
||||
this.setProperty("content", content);
|
||||
}
|
||||
|
||||
default String getContentType() {
|
||||
return this.getProperty("contentType");
|
||||
}
|
||||
|
||||
default void setContentType(String contentType) {
|
||||
this.setProperty("contentType", contentType);
|
||||
}
|
||||
|
||||
default void setContent(BodyType content, String contentType) {
|
||||
this.setContent(content);
|
||||
this.setContentType(contentType);
|
||||
}
|
||||
|
||||
}
|
122
src/main/java/com/inteligr8/activiti/mq/TenantFinderService.java
Normal file
122
src/main/java/com/inteligr8/activiti/mq/TenantFinderService.java
Normal file
@ -0,0 +1,122 @@
|
||||
/*
|
||||
* This program is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at your
|
||||
* option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
* more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along
|
||||
* with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.activiti.domain.idm.Tenant;
|
||||
import com.activiti.service.idm.TenantService;
|
||||
import com.activiti.service.license.LicenseService;
|
||||
|
||||
/**
|
||||
* This service simpler tenant meta-data access.
|
||||
*
|
||||
* @author brian@inteligr8.com
|
||||
*/
|
||||
@Component
|
||||
public class TenantFinderService implements ApplicationListener<ApplicationStartedEvent> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired(required = false)
|
||||
private LicenseService licenseService;
|
||||
|
||||
@Autowired(required = false)
|
||||
private TenantService tenantService;
|
||||
|
||||
@Value("${inteligr8.mq.tenant:#{null}}")
|
||||
private String tenant;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationStartedEvent event) {
|
||||
this.logger.debug("context: {}", event.getApplicationContext());
|
||||
Tenant tenant = this.findTenant();
|
||||
if (tenant == null) {
|
||||
this.logger.info("No default tenant");
|
||||
} else {
|
||||
this.logger.info("Default tenant: {}: {}", tenant.getId(), tenant.getName());
|
||||
}
|
||||
}
|
||||
|
||||
public String transform(Long tenantId) {
|
||||
return tenantId == null ? null : ("tenant_" + tenantId);
|
||||
}
|
||||
|
||||
public Long transform(String tenantId) {
|
||||
return tenantId == null ? null : Long.parseLong(tenantId.substring("tenant_".length()));
|
||||
}
|
||||
|
||||
public Long findTenantId() {
|
||||
Tenant tenant = this.findTenant();
|
||||
return tenant == null ? null : tenant.getId();
|
||||
}
|
||||
|
||||
public Tenant findTenant() {
|
||||
this.logger.trace("Checking for a single tenant ...");
|
||||
|
||||
String tenantName = null;
|
||||
if (this.tenant != null) {
|
||||
tenantName = this.tenant;
|
||||
} else {
|
||||
List<Object[]> tenants = this.tenantService.getAllTenants();
|
||||
if (tenants == null || tenants.isEmpty()) {
|
||||
this.logger.warn("No tenants found!");
|
||||
return null;
|
||||
} else if (tenants.size() == 1) {
|
||||
Object[] tenant = tenants.iterator().next();
|
||||
this.logger.debug("Only one tenant available; selecting it: {}", tenant[0]);
|
||||
return this.tenantService.getTenant((Long)tenant[0]);
|
||||
} else {
|
||||
tenantName = this.licenseService.getDefaultTenantName();
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.trace("Trying to find by tenant name: {}", tenantName);
|
||||
|
||||
List<Tenant> tenants = this.tenantService.findTenantsByName(tenantName);
|
||||
if (tenants == null || tenants.isEmpty()) {
|
||||
this.logger.warn("Named tenant not found");
|
||||
return null;
|
||||
}
|
||||
|
||||
this.logger.debug("Found {} tenants with name {}; selecting the first one", tenants.size(), tenantName);
|
||||
return tenants.iterator().next();
|
||||
}
|
||||
|
||||
public Collection<Tenant> getTenants() {
|
||||
List<Object[]> tenantObjs = this.tenantService.getAllTenants();
|
||||
|
||||
List<Tenant> tenants = new ArrayList<>(tenantObjs.size());
|
||||
for (Object[] tenantObj : tenantObjs) {
|
||||
if (tenantObj != null && tenantObj[0] != null) {
|
||||
Tenant tenant = this.tenantService.getTenant((Long)tenantObj[0]);
|
||||
tenants.add(tenant);
|
||||
}
|
||||
}
|
||||
|
||||
return tenants;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public interface TransactionalMessageHandler<BodyType> {
|
||||
|
||||
void onMessage(DeliveredMessage<BodyType> message) throws IOException;
|
||||
|
||||
}
|
@ -0,0 +1,141 @@
|
||||
package com.inteligr8.activiti.mq.amqp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
public class AmqpCommunicator implements MqCommunicator {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private final ConnectionFactory confactory;
|
||||
|
||||
public AmqpCommunicator(String url, String username, String password) {
|
||||
this.confactory = new ConnectionFactory();
|
||||
this.confactory.setUsername(username);
|
||||
this.confactory.setPassword(password);
|
||||
try {
|
||||
this.confactory.setUri(url);
|
||||
} catch (KeyManagementException kme) {
|
||||
throw new RuntimeException(kme);
|
||||
} catch (NoSuchAlgorithmException nsae) {
|
||||
throw new RuntimeException(nsae);
|
||||
} catch (URISyntaxException use) {
|
||||
throw new RuntimeException(use);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean validateConnection() {
|
||||
try {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return true;
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
} catch (TimeoutException | IOException e) {
|
||||
this.logger.debug("Connection found to be invalid: " + e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> PreparedMessage<BodyType> createPreparedMessage() {
|
||||
return new AmqpPreparedMessage<>();
|
||||
}
|
||||
|
||||
public Connection connect() throws TimeoutException, IOException {
|
||||
return this.confactory.newConnection();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
String correlationId = message.getCorrelationId() != null ? message.getCorrelationId() : UUID.randomUUID().toString();
|
||||
|
||||
AMQP.BasicProperties bprops = new AMQP.BasicProperties(null, null,
|
||||
message.getProperties(),
|
||||
destination.getDeliveryMode() != null ? destination.getDeliveryMode() : null,
|
||||
destination.getPriority() != null ? destination.getPriority() : null,
|
||||
correlationId,
|
||||
message.getReplyToQueueName() != null ? message.getReplyToQueueName() : null,
|
||||
null, // expiration
|
||||
null, // messageId
|
||||
null, // timestamp
|
||||
null, // type
|
||||
null, null, null);
|
||||
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
Channel channel = con.createChannel();
|
||||
try {
|
||||
channel.queueDeclare(destination.getQueueName(), true, false, false, Collections.emptyMap());
|
||||
channel.basicPublish(destination.getQueueName(), null, bprops, null);
|
||||
} finally {
|
||||
channel.close();
|
||||
}
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
|
||||
return correlationId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, final String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
Channel channel = con.createChannel();
|
||||
try {
|
||||
channel.queueDeclare(destination.getQueueName(), true, false, false, Collections.emptyMap());
|
||||
channel.basicConsume(
|
||||
destination.getQueueName(),
|
||||
false, // auto-ack
|
||||
new DeliverCallback() {
|
||||
@Override
|
||||
public void handle(String consumerTag, Delivery message) throws IOException {
|
||||
if (correlationId != null && !correlationId.equals(message.getProperties().getCorrelationId()))
|
||||
channel.basicNack(message.getEnvelope().getDeliveryTag(), true, true);
|
||||
|
||||
logger.debug("Received message '{}' from queue: {}", message.getProperties().getMessageId(), message.getEnvelope().getExchange());
|
||||
// TODO
|
||||
}
|
||||
},
|
||||
new CancelCallback() {
|
||||
@Override
|
||||
public void handle(String consumerTag) throws IOException {
|
||||
}
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
channel.close();
|
||||
}
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package com.inteligr8.activiti.mq.amqp;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.ByteArrayChannel;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
/**
|
||||
* FIXME this is completley untested
|
||||
*/
|
||||
public class AmqpDeliveredMessage<BodyType> extends AbstractMessage implements DeliveredMessage<BodyType> {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public AmqpDeliveredMessage(Delivery message, Class<BodyType> bodyType) throws IOException {
|
||||
this.setCorrelationId(message.getProperties().getCorrelationId());
|
||||
this.setMessageId(message.getProperties().getMessageId());
|
||||
this.setPriority(message.getProperties().getPriority());
|
||||
if (message.getProperties().getTimestamp() != null)
|
||||
this.setDeliveryTime(OffsetDateTime.ofInstant(message.getProperties().getTimestamp().toInstant(), ZoneOffset.systemDefault()));
|
||||
this.setReplyToQueueName(message.getProperties().getReplyTo());
|
||||
this.setContentType(message.getProperties().getContentType());
|
||||
|
||||
String encoding = message.getProperties().getContentEncoding() == null ? "utf-8" : message.getProperties().getContentEncoding();
|
||||
if (String.class.isAssignableFrom(bodyType.getClass())) {
|
||||
this.setContent((BodyType) new String(message.getBody(), encoding));
|
||||
} else if (byte[].class.isAssignableFrom(bodyType)) {
|
||||
this.setContent((BodyType) message.getBody());
|
||||
} else if (InputStream.class.isAssignableFrom(bodyType)) {
|
||||
this.setContent((BodyType) new ByteArrayInputStream(message.getBody()));
|
||||
} else if (ReadableByteChannel.class.isAssignableFrom(bodyType)) {
|
||||
this.setContent((BodyType) new ByteArrayChannel(message.getBody()));
|
||||
} else {
|
||||
String str = new String(message.getBody(), encoding);
|
||||
ObjectMapper om = new ObjectMapper();
|
||||
this.setContent(om.readValue(str, bodyType));
|
||||
}
|
||||
|
||||
if (message.getProperties().getHeaders() != null)
|
||||
this.getProperties().putAll(message.getProperties().getHeaders());
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(Delivery message) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(message, null);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(Delivery message, Class<BodyType> bodyType) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(message, bodyType);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
package com.inteligr8.activiti.mq.amqp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.util.Date;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
/**
|
||||
* FIXME this is completely untested
|
||||
*/
|
||||
public class AmqpPreparedMessage<BodyType> extends AbstractMessage implements PreparedMessage<BodyType> {
|
||||
|
||||
public Delivery transform(Class<BodyType> bodyType) throws IOException {
|
||||
if (this.getContentType() == null) {
|
||||
if (String.class.isAssignableFrom(bodyType)) {
|
||||
this.setContentType("text/plain");
|
||||
} else if (byte[].class.isAssignableFrom(bodyType)) {
|
||||
this.setContentType("application/octet-stream");
|
||||
} else if (InputStream.class.isAssignableFrom(bodyType)) {
|
||||
this.setContentType("application/octet-stream");
|
||||
} else {
|
||||
// if not set, we are going to force it to JSON using the JackSON mapper
|
||||
this.setContentType("application/json");
|
||||
}
|
||||
}
|
||||
|
||||
String correlationId = this.getCorrelationId();
|
||||
if (correlationId == null)
|
||||
correlationId = UUID.randomUUID().toString();
|
||||
|
||||
AMQP.BasicProperties bprops = new AMQP.BasicProperties(
|
||||
this.getContentType(),
|
||||
"utf-8",
|
||||
this.getProperties(),
|
||||
DeliveryMode.PERSISTENT, // yes, this is a JMS class, but the values are the same; survive restarts
|
||||
null, // default priority
|
||||
correlationId,
|
||||
this.getReplyToQueueName() != null ? this.getReplyToQueueName() : null,
|
||||
null, // expiration
|
||||
null, // messageId
|
||||
new Date(), // timestamp
|
||||
null, // type
|
||||
null, // userId
|
||||
null, // appId
|
||||
null // clusterId
|
||||
);
|
||||
|
||||
byte[] content = null;
|
||||
if (String.class.isAssignableFrom(bodyType)) {
|
||||
content = ((String) this.getContent()).getBytes("utf-8");
|
||||
} else if (byte[].class.isAssignableFrom(bodyType)) {
|
||||
content = (byte[]) this.getContent();
|
||||
} else if (InputStream.class.isAssignableFrom(bodyType)) {
|
||||
content = IOUtils.toByteArray((InputStream) this.getContent());
|
||||
} else if (ReadableByteChannel.class.isAssignableFrom(bodyType)) {
|
||||
InputStream istream = Channels.newInputStream((ReadableByteChannel) this.getContent());
|
||||
content = IOUtils.toByteArray(istream);
|
||||
} else {
|
||||
ObjectMapper om = new ObjectMapper();
|
||||
content = om.writeValueAsString(this.getContent()).getBytes("utf-8");
|
||||
}
|
||||
|
||||
return new Delivery(null, bprops, content);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
public class BytesMessageInputStream extends InputStream {
|
||||
|
||||
private final BytesMessage bmessage;
|
||||
|
||||
public BytesMessageInputStream(BytesMessage bmessage) {
|
||||
this.bmessage = bmessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
try {
|
||||
return this.bmessage.readByte();
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
try {
|
||||
return this.bmessage.readBytes(b);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
try {
|
||||
if (off > 0)
|
||||
// skip offset bytes
|
||||
this.bmessage.readBytes(new byte[off], off);
|
||||
return this.bmessage.readBytes(b, len);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
public class BytesMessageOutputStream extends OutputStream {
|
||||
|
||||
private final BytesMessage bmessage;
|
||||
|
||||
public BytesMessageOutputStream(BytesMessage bmessage) {
|
||||
this.bmessage = bmessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
try {
|
||||
this.bmessage.writeByte((byte) b);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
try {
|
||||
this.bmessage.writeBytes(b);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int offset, int length) throws IOException {
|
||||
try {
|
||||
this.bmessage.writeBytes(b, offset, length);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
174
src/main/java/com/inteligr8/activiti/mq/jms/JmsCommunicator.java
Normal file
174
src/main/java/com/inteligr8/activiti/mq/jms/JmsCommunicator.java
Normal file
@ -0,0 +1,174 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
|
||||
public class JmsCommunicator implements MqCommunicator {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private final ActiveMQConnectionFactory confactory;
|
||||
|
||||
public JmsCommunicator(String url, String username, String password) throws JMSException {
|
||||
this.confactory = new ActiveMQConnectionFactory();
|
||||
this.confactory.setUserName(username);
|
||||
this.confactory.setPassword(password);
|
||||
this.confactory.setBrokerURL(url);
|
||||
this.confactory.setClientIDPrefix("activiti-app");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean validateConnection() {
|
||||
try {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
return true;
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
} catch (JMSException je) {
|
||||
this.logger.debug("Connection found to be invalid: " + je.getMessage(), je);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> PreparedMessage<BodyType> createPreparedMessage() {
|
||||
return new JmsPreparedMessage<>();
|
||||
}
|
||||
|
||||
public Connection connect() throws JMSException {
|
||||
return this.confactory.createConnection();
|
||||
}
|
||||
|
||||
public Session start(Connection con) throws JMSException {
|
||||
return con.createSession(Session.SESSION_TRANSACTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return this.send(con, destination, (JmsPreparedMessage<BodyType>) message);
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected <BodyType> String send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
String correlationId = this.send(session, destination, message);
|
||||
session.commit();
|
||||
return correlationId;
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
public <BodyType> String send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
MessageProducer messenger = session.createProducer(destination.toJmsQueue(session));
|
||||
try {
|
||||
if (destination.getDeliveryMode() != null)
|
||||
messenger.setDeliveryMode(destination.getDeliveryMode());
|
||||
if (destination.getPriority() != null)
|
||||
messenger.setPriority(destination.getPriority());
|
||||
if (destination.getTimeToLive() != null)
|
||||
messenger.setTimeToLive(destination.getTimeToLive());
|
||||
|
||||
Message jmsmsg = message.transform(session);
|
||||
|
||||
this.logger.debug("Sending message to queue: {}", destination.getQueueName());
|
||||
messenger.send(jmsmsg);
|
||||
|
||||
return jmsmsg.getJMSCorrelationID();
|
||||
} finally {
|
||||
messenger.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return this.receive(con, destination, timeoutInMillis, correlationId, handler);
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected <BodyType> JmsDeliveredMessage<BodyType> receive(Connection con, GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
|
||||
con.start();
|
||||
try {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
JmsDeliveredMessage<BodyType> message = this.receive(session, destination, timeoutInMillis, correlationId);
|
||||
if (message == null)
|
||||
return null;
|
||||
|
||||
if (handler != null)
|
||||
handler.onMessage(message);
|
||||
|
||||
session.commit();
|
||||
return message;
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
} finally {
|
||||
con.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId) throws JMSException, TimeoutException {
|
||||
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
|
||||
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
|
||||
try {
|
||||
if (timeoutInMillis < 0L) {
|
||||
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receive());
|
||||
} else if (timeoutInMillis == 0L) {
|
||||
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(messenger.receiveNoWait());
|
||||
} else {
|
||||
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
|
||||
long startTime = System.currentTimeMillis();
|
||||
Message message = messenger.receive(timeoutInMillis);
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (message != null) {
|
||||
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
|
||||
return JmsDeliveredMessage.transform(message);
|
||||
}
|
||||
|
||||
if (elapsedTime < timeoutInMillis) {
|
||||
throw new JMSException("The reading of the queue ended prematurely");
|
||||
} else {
|
||||
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
messenger.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Enumeration;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
|
||||
public class JmsDeliveredMessage<BodyType> extends AbstractMessage implements DeliveredMessage<BodyType> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public JmsDeliveredMessage(Message message, Class<BodyType> bodyType) throws JMSException {
|
||||
this.setCorrelationId(message.getJMSCorrelationID());
|
||||
this.setMessageId(message.getJMSMessageID());
|
||||
this.setPriority(message.getJMSPriority());
|
||||
|
||||
if (message.getJMSDeliveryTime() > 0L) {
|
||||
this.setDeliveryTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(message.getJMSDeliveryTime()), ZoneOffset.UTC));
|
||||
} else if (message.getJMSTimestamp() > 0L) {
|
||||
this.setDeliveryTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(message.getJMSTimestamp()), ZoneOffset.UTC));
|
||||
}
|
||||
|
||||
if (message instanceof TextMessage) {
|
||||
if (bodyType == null || String.class.isAssignableFrom(bodyType)) {
|
||||
this.setContent((BodyType) message.getBody(String.class));
|
||||
} else {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
} else if (message instanceof BytesMessage) {
|
||||
if (bodyType == null || bodyType.isArray() && Byte.class.equals(bodyType.getComponentType())) {
|
||||
InputStream istream = new BytesMessageInputStream((BytesMessage) message);
|
||||
try {
|
||||
this.setContent((BodyType) IOUtils.toByteArray(istream));
|
||||
} catch (IOException ie) {
|
||||
throw (JMSException) ie.getCause();
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
} else if (message instanceof StreamMessage) {
|
||||
if (bodyType == null || InputStream.class.isAssignableFrom(bodyType)) {
|
||||
InputStream istream = new StreamMessageInputStream((StreamMessage) message);
|
||||
this.setContent((BodyType) istream);
|
||||
} else {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
} else {
|
||||
// message with no content
|
||||
String body = message.getBody(String.class);
|
||||
if (body != null && body.length() > 0) {
|
||||
this.logger.warn("A message with an unexpected content body was received: {}", message.getJMSMessageID());
|
||||
this.logger.debug("Message '{}' body: {}", message.getJMSMessageID(), body);
|
||||
}
|
||||
|
||||
this.setContent((BodyType) body);
|
||||
}
|
||||
|
||||
Destination destination = message.getJMSReplyTo();
|
||||
if (destination instanceof Queue) {
|
||||
this.setReplyToQueueName(((Queue) destination).getQueueName());
|
||||
} else if (destination instanceof Topic) {
|
||||
this.setReplyToQueueName(((Topic) destination).getTopicName());
|
||||
}
|
||||
|
||||
Enumeration<String> keys = message.getPropertyNames();
|
||||
while (keys.hasMoreElements()) {
|
||||
String key = keys.nextElement();
|
||||
this.setProperty(key, message.getObjectProperty(key));
|
||||
}
|
||||
}
|
||||
|
||||
public static <BodyType> JmsDeliveredMessage<BodyType> transform(Message message) throws JMSException {
|
||||
if (message == null)
|
||||
return null;
|
||||
return new JmsDeliveredMessage<BodyType>(message, null);
|
||||
}
|
||||
|
||||
public static <BodyType> JmsDeliveredMessage<BodyType> transform(Message message, Class<BodyType> bodyType) throws JMSException {
|
||||
if (message == null)
|
||||
return null;
|
||||
return new JmsDeliveredMessage<BodyType>(message, bodyType);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.StreamMessage;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
|
||||
public class JmsPreparedMessage<BodyType> extends AbstractMessage implements PreparedMessage<BodyType> {
|
||||
|
||||
public Message transform(Session session) throws JMSException {
|
||||
Message message = null;
|
||||
|
||||
try {
|
||||
if (this.getContent() == null) {
|
||||
message = session.createMessage();
|
||||
} else if (this.getContent() instanceof String) {
|
||||
message = session.createTextMessage((String) this.getContent());
|
||||
} else if (this.getContent() instanceof byte[]) {
|
||||
BytesMessage bmessage = session.createBytesMessage();
|
||||
IOUtils.write((byte[]) this.getContent(), new BytesMessageOutputStream(bmessage));
|
||||
message = bmessage;
|
||||
} else if (this.getContent() instanceof InputStream) {
|
||||
StreamMessage smessage = session.createStreamMessage();
|
||||
IOUtils.write((byte[]) this.getContent(), new StreamMessageOutputStream(smessage));
|
||||
message = smessage;
|
||||
} else {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
throw (JMSException) ie.getCause();
|
||||
}
|
||||
|
||||
if (this.getCorrelationId() == null) {
|
||||
message.setJMSCorrelationID(UUID.randomUUID().toString());
|
||||
} else {
|
||||
message.setJMSCorrelationID(this.getCorrelationId());
|
||||
}
|
||||
|
||||
if (this.getProperties() != null) {
|
||||
for (Entry<String, Object> prop : this.getProperties().entrySet())
|
||||
message.setObjectProperty(prop.getKey(), prop.getValue());
|
||||
}
|
||||
|
||||
if (this.getReplyToQueueName() != null)
|
||||
message.setJMSReplyTo(session.createQueue(this.getReplyToQueueName()));
|
||||
|
||||
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); // survive restarts
|
||||
return message;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.StreamMessage;
|
||||
|
||||
public class StreamMessageInputStream extends InputStream {
|
||||
|
||||
private final StreamMessage smessage;
|
||||
|
||||
public StreamMessageInputStream(StreamMessage smessage) {
|
||||
this.smessage = smessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
try {
|
||||
return this.smessage.readByte();
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
try {
|
||||
return this.smessage.readBytes(b);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package com.inteligr8.activiti.mq.jms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.StreamMessage;
|
||||
|
||||
public class StreamMessageOutputStream extends OutputStream {
|
||||
|
||||
private final StreamMessage smessage;
|
||||
|
||||
public StreamMessageOutputStream(StreamMessage smessage) {
|
||||
this.smessage = smessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
try {
|
||||
this.smessage.writeByte((byte) b);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
try {
|
||||
this.smessage.writeBytes(b);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int offset, int length) throws IOException {
|
||||
try {
|
||||
this.smessage.writeBytes(b, offset, length);
|
||||
} catch (JMSException je) {
|
||||
throw new IOException(je);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
126
src/test/java/com/inteligr8/activiti/mq/MQDockerIT.java
Normal file
126
src/test/java/com/inteligr8/activiti/mq/MQDockerIT.java
Normal file
@ -0,0 +1,126 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MQDockerIT {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
private MqCommunicationService commsService;
|
||||
|
||||
@PostConstruct
|
||||
private void init() throws JMSException, IOException, TimeoutException, InterruptedException {
|
||||
this.testDockerTcpConnection();
|
||||
this.testDockerTcpReceiveEmpty();
|
||||
this.testDockerTcpSendReceive();
|
||||
this.testDockerFailoverConnection();
|
||||
this.testDockerFailoverReceiveEmpty();
|
||||
this.testDockerFailoverSendReceive();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerTcpConnection() throws JMSException {
|
||||
this.testDockerConnection("tcp://mq-activiti-ext-mq:61616");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerTcpReceiveEmpty() throws JMSException, IOException {
|
||||
this.testDockerReceiveEmpty("tcp://mq-activiti-ext-mq:61616");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerTcpSendReceive() throws JMSException, IOException, TimeoutException, InterruptedException {
|
||||
this.testDockerSendReceive("tcp://mq-activiti-ext-mq:61616");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerFailoverConnection() throws JMSException {
|
||||
this.testDockerConnection("failover:(tcp://mq-activiti-ext-mq:61616)?timeout=5000");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerFailoverReceiveEmpty() throws JMSException, IOException {
|
||||
this.testDockerReceiveEmpty("failover:(tcp://mq-activiti-ext-mq:61616)?timeout=5000");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerFailoverSendReceive() throws JMSException, IOException, TimeoutException, InterruptedException {
|
||||
this.testDockerSendReceive("failover:(tcp://mq-activiti-ext-mq:61616)?timeout=5000");
|
||||
}
|
||||
|
||||
protected void testDockerConnection(String url) throws JMSException {
|
||||
this.logger.info("Testing MQ connection: {}", url);
|
||||
MqCommunicator communicator = this.commsService.getCommunicator(URI.create(url), "admin", "admin");
|
||||
Assertions.assertTrue(communicator.validateConnection());
|
||||
}
|
||||
|
||||
protected void testDockerReceiveEmpty(String url) throws JMSException, IOException {
|
||||
this.logger.info("Testing MQ empty queue receiving: {}", url);
|
||||
MqCommunicator communicator = this.commsService.getCommunicator(URI.create(url), "admin", "admin");
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
destination.setQueueName("com.inteligr8.mq.test");
|
||||
try {
|
||||
communicator.receive(destination, 3000L);
|
||||
Assertions.fail();
|
||||
} catch (TimeoutException te) {
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (elapsedTime < 3000L) {
|
||||
Assertions.fail("Timed out earlier than it should have: " + elapsedTime + " ms");
|
||||
} else if (elapsedTime > 3100L) {
|
||||
Assertions.fail("Timed out later than it should have: " + elapsedTime + " ms");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void testDockerSendReceive(String url) throws JMSException, IOException, TimeoutException, InterruptedException {
|
||||
this.logger.info("Testing MQ queue send/receive: {}", url);
|
||||
MqCommunicator communicator = this.commsService.getCommunicator(URI.create(url), "admin", "admin");
|
||||
|
||||
GenericDestination destination = new GenericDestination();
|
||||
destination.setQueueName("com.inteligr8.mq.test");
|
||||
|
||||
PreparedMessage<String> message = communicator.createPreparedMessage();
|
||||
message.setContent("{}", "application/json");
|
||||
message.setReplyToQueueName("com.inteligr8.mq.test.reply");
|
||||
|
||||
OffsetDateTime beforeSend = OffsetDateTime.now();
|
||||
this.logger.debug("Timestamp before send: {}", beforeSend);
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(correlationId);
|
||||
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, correlationId);
|
||||
Assertions.assertNotNull(receivedMessage);
|
||||
this.logger.debug("Timestamp of delivery: {}", receivedMessage.getDeliveryTime());
|
||||
|
||||
Assertions.assertNotNull(receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(correlationId, receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(message.getReplyToQueueName(), receivedMessage.getReplyToQueueName());
|
||||
Assertions.assertEquals("{}", receivedMessage.getContent());
|
||||
Assertions.assertNotNull(receivedMessage.getDeliveryTime());
|
||||
Assertions.assertFalse(receivedMessage.getDeliveryTime().isBefore(beforeSend));
|
||||
Assertions.assertFalse(receivedMessage.getDeliveryTime().isAfter(OffsetDateTime.now()));
|
||||
Assertions.assertNotNull(receivedMessage.getPriority());
|
||||
}
|
||||
|
||||
}
|
51
src/test/resources/log4j2-test.properties
Normal file
51
src/test/resources/log4j2-test.properties
Normal file
@ -0,0 +1,51 @@
|
||||
rootLogger.level=info
|
||||
rootLogger.appenderRef.stdout.ref=ConsoleAppender
|
||||
|
||||
###### Console appender definition #######
|
||||
# Direct log messages to stdout
|
||||
appender.console.type=Console
|
||||
appender.console.name=ConsoleAppender
|
||||
appender.console.layout.type=PatternLayout
|
||||
appender.console.layout.pattern=%highlight{%d{hh:mm:ss,SSS} [%t] %-5p} %c - %m%n
|
||||
|
||||
logger.apache.name=org.apache
|
||||
logger.apache.level=warn
|
||||
|
||||
logger.springframework.name=org.springframework
|
||||
logger.springframework.level=warn
|
||||
|
||||
logger.apache-ibatis.name=org.apache.ibatis
|
||||
logger.apache-ibatis.level=info
|
||||
|
||||
logger.springframework-security.name=org.springframework.security
|
||||
logger.springframework-security.level=warn
|
||||
|
||||
logger.hibernate.name=org.hibernate
|
||||
logger.hibernate.level=warn
|
||||
|
||||
logger.hibernate-validator.name=org.hibernate.validator
|
||||
logger.hibernate-validator.level=warn
|
||||
|
||||
logger.springframework-web.name=org.springframework.web
|
||||
logger.springframework-web.level=warn
|
||||
|
||||
logger.ryantenney.name=com.ryantenney
|
||||
logger.ryantenney.level=warn
|
||||
|
||||
logger.javax-activation.name=javax.activation
|
||||
logger.javax-activation.level=info
|
||||
|
||||
logger.hibernate-engine-internal.name=org.hibernate.engine.internal
|
||||
logger.hibernate-engine-internal.level=warn
|
||||
|
||||
logger.codahale-metrics.name=com.codahale.metrics
|
||||
logger.codahale-metrics.level=warn
|
||||
|
||||
logger.zaxxer.name=com.zaxxer
|
||||
logger.zaxxer.level=warn
|
||||
|
||||
logger.liquibase.name=liquibase
|
||||
logger.liquibase.level=WARN
|
||||
|
||||
logger.inteligr8-mq.name=com.inteligr8.activiti.mq
|
||||
logger.inteligr8-mq.level=TRACE
|
Loading…
x
Reference in New Issue
Block a user