major refactoring
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -27,7 +27,7 @@
|
||||
<spring.version>5.3.31</spring.version>
|
||||
<activiti.version>7.11.1</activiti.version>
|
||||
<tomcat-rad.version>9-2.2</tomcat-rad.version>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin</aps.tomcat.opts>
|
||||
<aps.tomcat.opts>-Dinteligr8.mq.connectors.docker-mq.url=failover:\(tcp://${project.artifactId}-mq:61616\)?timeout=3000 -Dinteligr8.mq.connectors.docker-mq.username=admin -Dinteligr8.mq.connectors.docker-mq.password=admin -Dvalidator.editor.dmn.espression=false -Dvalidator.editor.bpmn.disable.scripttask=false -Djavascript.secure-scripting.enabled=false -Djavascript.secure-scripting.enable-class-whitelisting=false -Dbeans.whitelisting.enabled=false -Del.whitelisting.enabled=false -Dshell.whitelisting.enabled=false</aps.tomcat.opts>
|
||||
|
||||
<!-- reloads in APS are slower than a restart -->
|
||||
<aps.hotswap.enabled>false</aps.hotswap.enabled>
|
||||
|
@@ -94,11 +94,11 @@ public class ActivitiEntityEventMonitor implements ActivitiEventListener, Applic
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(ActivitiEvent event) {
|
||||
this.logger.trace("Triggered by event: {}", event);
|
||||
this.logger.trace("Triggered by event: {}: {}; pdId: {}; piId: {}; execId: {}", event.getType(), event.getClass(), event.getProcessDefinitionId(), event.getProcessInstanceId(), event.getExecutionId());
|
||||
|
||||
for (ActivitiEntityEventListener<? extends Entity> listener : this.listeners) {
|
||||
ActivitiEntityEvent aaevent = (ActivitiEntityEvent) event;
|
||||
if (listener.ofEntityType((Entity) aaevent.getEntity()))
|
||||
if (aaevent.getEntity() instanceof Entity && listener.ofEntityType((Entity) aaevent.getEntity()))
|
||||
listener.onEntityEvent(aaevent);
|
||||
}
|
||||
}
|
||||
|
@@ -5,13 +5,16 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public interface MqCommunicator {
|
||||
|
||||
boolean validateConnection();
|
||||
|
||||
<BodyType> PreparedMessage<BodyType> createPreparedMessage();
|
||||
|
||||
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
<BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
|
||||
|
||||
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
|
||||
return this.receive(destination, -1L, null, null, null);
|
||||
|
@@ -54,7 +54,12 @@ public class MqDelegateExecution {
|
||||
}
|
||||
|
||||
protected void setMqVariable(String varName, Object value) {
|
||||
this.execution.setVariable(varName, value);
|
||||
varName = this.formulateVariableName(varName);
|
||||
if (this.task.doWriteToProcessScope()) {
|
||||
this.execution.setVariable(varName, value);
|
||||
} else {
|
||||
this.execution.setVariableLocal(varName, value);
|
||||
}
|
||||
}
|
||||
|
||||
public String formulateVariableName(String varName) {
|
||||
|
@@ -138,8 +138,17 @@ public class MqExecutionService {
|
||||
* @return `true` if execution was cached; `false` otherwise.
|
||||
*/
|
||||
public boolean cancel(String executionId) throws Exception {
|
||||
this.logger.trace("cancel({})", executionId);
|
||||
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
|
||||
if (execution == null) {
|
||||
this.logger.debug("No execution to cancel: {}", executionId);
|
||||
return false;
|
||||
}
|
||||
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
|
||||
if (pi == null) {
|
||||
this.logger.debug("No process instance to cancel: {}", executionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return this.cancel(pi.getProcessDefinitionId(), executionId);
|
||||
}
|
||||
@@ -152,6 +161,7 @@ public class MqExecutionService {
|
||||
* @return A set of execution identifiers that were cancelled.
|
||||
*/
|
||||
public Set<String> cancelAll(String processDefinitionId) throws Exception {
|
||||
this.logger.trace("cancelAll({})", processDefinitionId);
|
||||
ProcessDefinition processDefinition = this.services.getRepositoryService().getProcessDefinition(processDefinitionId);
|
||||
String processDefinitionKey = processDefinition.getKey();
|
||||
|
||||
@@ -163,7 +173,10 @@ public class MqExecutionService {
|
||||
* @return A set of execution identifiers that were in the now cleared map.
|
||||
*/
|
||||
public synchronized Set<String> cancelAllOtherVersions(String latestProcessDefinitionId) throws Exception {
|
||||
this.logger.trace("cancelAllOtherVersions({})", latestProcessDefinitionId);
|
||||
ProcessDefinition latestProcessDefinition = this.services.getRepositoryService().getProcessDefinition(latestProcessDefinitionId);
|
||||
if (latestProcessDefinition == null)
|
||||
return Collections.emptySet();
|
||||
String processDefinitionKey = latestProcessDefinition.getKey();
|
||||
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
|
@@ -8,6 +8,9 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This needs to be changed to detect just process versioning (deactivation?)
|
||||
*/
|
||||
@Component
|
||||
public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity> {
|
||||
|
||||
@@ -31,14 +34,13 @@ public class MqJobEventListener implements ActivitiEntityEventListener<JobEntity
|
||||
String executionId, JobEntity entity) {
|
||||
this.logger.trace("Triggered by job event: {}", entity);
|
||||
|
||||
switch (eventType) {
|
||||
case ENTITY_DELETED:
|
||||
case ENTITY_SUSPENDED:
|
||||
// we need to stop the listener
|
||||
this.onJobRemoveEvent(entity);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
// switch (eventType) {
|
||||
// case ENTITY_DELETED:
|
||||
// case ENTITY_SUSPENDED:
|
||||
// this.onJobRemoveEvent(entity);
|
||||
// break;
|
||||
// default:
|
||||
// }
|
||||
}
|
||||
|
||||
protected void onJobRemoveEvent(JobEntity entity) {
|
||||
|
@@ -2,6 +2,7 @@ package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.activiti.engine.ActivitiObjectNotFoundException;
|
||||
import org.activiti.engine.delegate.event.ActivitiEventType;
|
||||
import org.activiti.engine.impl.persistence.entity.Entity;
|
||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||
@@ -51,8 +52,9 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
this.logger.trace("Triggered by process definition event: {}", entity);
|
||||
|
||||
switch (eventType) {
|
||||
case ENTITY_CREATED:
|
||||
case ENTITY_INITIALIZED:
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_INITIALIZED as we need the BpmnModel later and it is not yet cached
|
||||
// we cannot use the ProcessDefinitionEntity for ENTITY_CREATED or ENTITY_INITIALIZED as we need the BpmnModel and it is not yet cached
|
||||
// we must use DeploymentEntity and then dig down for the process definitions
|
||||
break;
|
||||
case ENTITY_DELETED:
|
||||
@@ -61,8 +63,6 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
this.onProcessDefinitionRemoveEvent((ProcessDefinitionEntity) entity);
|
||||
break;
|
||||
default:
|
||||
// we need to start a listener
|
||||
this.onProcessDefinitionAddEvent((ProcessDefinitionEntity) entity);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -70,11 +70,15 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
|
||||
this.logger.debug("Triggered by process definition addition: {}", entity);
|
||||
|
||||
// cancel subscriptions on all legacy version process definitions
|
||||
this.unsubscribeOtherMqSubscribeTasks(entity.getId());
|
||||
|
||||
if (this.registry.isMqStart(entity.getId()))
|
||||
this.looper.loop(entity.getId());
|
||||
try {
|
||||
// cancel subscriptions on all legacy version process definitions
|
||||
this.unsubscribeOtherMqSubscribeTasks(entity.getId());
|
||||
|
||||
if (this.registry.isMqStart(entity.getId()))
|
||||
this.looper.loop(entity.getId());
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
this.logger.debug("Added process definition could not be found; because of orders of operation or transaction isolation: {}: {}", entity.getId(), aonfe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
|
||||
@@ -95,6 +99,8 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
throw aonfe;
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
@@ -108,6 +114,8 @@ public class MqProcessDefinitionEventListener implements ActivitiEntityEventList
|
||||
} else {
|
||||
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
|
||||
}
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
throw aonfe;
|
||||
} catch (Exception e) {
|
||||
this.logger.error("The subscriptions could not be cancelled: " + procDefId, e);
|
||||
}
|
||||
|
@@ -13,6 +13,9 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
/**
|
||||
* This delegate sends a message to an MQ queue.
|
||||
*
|
||||
@@ -48,6 +51,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
* @param execution An Activiti delegate execution (source task or execution/task listener).
|
||||
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
|
||||
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
|
||||
* @field mq_metadataProcessScope [optional] `true` to set the `mq_messageId` variable below in the process instance scope; otherwise the variable will be local to the task. `mq_correlationId` is always at the process scope.
|
||||
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
|
||||
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol.
|
||||
* @field mq_payload [optional] The body of the MQ message. May include expressions.
|
||||
@@ -56,6 +60,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
* @field mq_statusQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send status updates.
|
||||
* @varIn mq_correlationId [optional] The correlationId of the message to send.
|
||||
* @varOut mq_correlationId The correlationId of the message sent.
|
||||
* @varOut mq_messageId The messageId of the message sent.
|
||||
* @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
|
||||
* @throws BPMNError network The MQ connection experienced network issues.
|
||||
* @throws BPMNError mq An unknown MQ issue occurred.
|
||||
@@ -91,10 +96,11 @@ public class MqPublishDelegate extends AbstractMqDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
|
||||
DeliveredMessage<?> deliveredMessage = communicator.send(destination, message);
|
||||
this.logger.debug("Sent MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), deliveredMessage.getMessageId(), deliveredMessage.getCorrelationId());
|
||||
|
||||
mqExecution.setCorrelationId(correlationId);
|
||||
mqExecution.setMessageId(deliveredMessage.getMessageId());
|
||||
mqExecution.setCorrelationId(deliveredMessage.getCorrelationId());
|
||||
} catch (TimeoutException te) {
|
||||
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
|
||||
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());
|
||||
|
@@ -13,6 +13,8 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
/**
|
||||
* This delegate listens for messages on an MQ queue.
|
||||
*
|
||||
|
@@ -8,15 +8,6 @@ public class MqSubscribeDelegateExecution extends MqDelegateExecution {
|
||||
public MqSubscribeDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
|
||||
super(services, msts, execution);
|
||||
}
|
||||
|
||||
protected void setMqVariable(String varName, Object value) {
|
||||
varName = this.formulateVariableName(varName);
|
||||
if (this.task.doWriteToProcessScope()) {
|
||||
this.execution.setVariable(varName, value);
|
||||
} else {
|
||||
this.execution.setVariableLocal(varName, value);
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getConcurrencyFromModel() {
|
||||
return this.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||
|
@@ -13,6 +13,8 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
/**
|
||||
* This delegate listens for a reply message on an MQ queue.
|
||||
*
|
||||
|
@@ -13,6 +13,7 @@ import org.activiti.bpmn.model.FlowElement;
|
||||
import org.activiti.bpmn.model.SequenceFlow;
|
||||
import org.activiti.bpmn.model.ServiceTask;
|
||||
import org.activiti.bpmn.model.StartEvent;
|
||||
import org.activiti.engine.ActivitiObjectNotFoundException;
|
||||
import org.activiti.engine.ProcessEngine;
|
||||
import org.activiti.engine.delegate.JavaDelegate;
|
||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||
@@ -47,8 +48,12 @@ public class ProcessDefinitionRegistry {
|
||||
if (this.processDefinitionMqSubscribeTasks.containsKey(processDefinitionId)) {
|
||||
return this.processDefinitionMqSubscribeTasks.get(processDefinitionId) != null;
|
||||
} else {
|
||||
// not yet cached; cache it; then return
|
||||
return this.findMqStartSubscribeTask(processDefinitionId) != null;
|
||||
try {
|
||||
// not yet cached; cache it; then return
|
||||
return this.findMqStartSubscribeTask(processDefinitionId) != null;
|
||||
} catch (ActivitiObjectNotFoundException aonfe) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -2,6 +2,8 @@ package com.inteligr8.activiti.mq;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
public interface TransactionalMessageHandler<BodyType> {
|
||||
|
||||
void onMessage(DeliveredMessage<BodyType> message) throws IOException;
|
||||
|
@@ -13,12 +13,12 @@ import javax.jms.JMSException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
@@ -72,7 +72,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException {
|
||||
String correlationId = message.getCorrelationId() != null ? message.getCorrelationId() : UUID.randomUUID().toString();
|
||||
|
||||
AMQP.BasicProperties bprops = new AMQP.BasicProperties(null, null,
|
||||
@@ -82,7 +82,7 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
correlationId,
|
||||
message.getReplyToQueueName() != null ? message.getReplyToQueueName() : null,
|
||||
null, // expiration
|
||||
null, // messageId
|
||||
UUID.randomUUID().toString(), // messageId
|
||||
null, // timestamp
|
||||
null, // type
|
||||
null, null, null);
|
||||
@@ -99,8 +99,8 @@ public class AmqpCommunicator implements MqCommunicator {
|
||||
} finally {
|
||||
con.close();
|
||||
}
|
||||
|
||||
return correlationId;
|
||||
|
||||
return AmqpDeliveredMessage.transform(bprops);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -8,9 +8,10 @@ import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.ByteArrayChannel;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
/**
|
||||
@@ -47,6 +48,19 @@ public class AmqpDeliveredMessage<BodyType> extends AbstractMessage implements D
|
||||
this.getProperties().putAll(message.getProperties().getHeaders());
|
||||
}
|
||||
|
||||
public AmqpDeliveredMessage(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
|
||||
this.setMessageId(props.getMessageId());
|
||||
this.setCorrelationId(props.getCorrelationId());
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(props, null);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(AMQP.BasicProperties props, Class<BodyType> bodyType) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(props, bodyType);
|
||||
}
|
||||
|
||||
public static <BodyType> AmqpDeliveredMessage<BodyType> transform(Delivery message) throws IOException {
|
||||
return new AmqpDeliveredMessage<>(message, null);
|
||||
}
|
||||
|
@@ -12,8 +12,8 @@ import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Delivery;
|
||||
|
||||
|
@@ -14,12 +14,12 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.GenericDestination;
|
||||
import com.inteligr8.activiti.mq.MqCommunicator;
|
||||
import com.inteligr8.activiti.mq.MqSubscriptionListener;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public class JmsCommunicator implements MqCommunicator {
|
||||
|
||||
@@ -68,7 +68,7 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException {
|
||||
Connection con = this.connect();
|
||||
try {
|
||||
return this.send(con, destination, (JmsPreparedMessage<BodyType>) message);
|
||||
@@ -77,18 +77,18 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
}
|
||||
}
|
||||
|
||||
protected <BodyType> String send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
protected <BodyType> DeliveredMessage<BodyType> send(Connection con, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
Session session = this.start(con);
|
||||
try {
|
||||
String correlationId = this.send(session, destination, message);
|
||||
DeliveredMessage<BodyType> dmessage = this.send(session, destination, message);
|
||||
session.commit();
|
||||
return correlationId;
|
||||
return dmessage;
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
public <BodyType> String send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
public <BodyType> DeliveredMessage<BodyType> send(Session session, GenericDestination destination, JmsPreparedMessage<BodyType> message) throws JMSException {
|
||||
MessageProducer messenger = session.createProducer(destination.toJmsQueue(session));
|
||||
try {
|
||||
if (destination.getDeliveryMode() != null)
|
||||
@@ -103,7 +103,7 @@ public class JmsCommunicator implements MqCommunicator {
|
||||
this.logger.debug("Sending message to queue: {}", destination.getQueueName());
|
||||
messenger.send(jmsmsg);
|
||||
|
||||
return jmsmsg.getJMSCorrelationID();
|
||||
return JmsDeliveredMessage.transform(jmsmsg);
|
||||
} finally {
|
||||
messenger.close();
|
||||
}
|
||||
|
@@ -20,8 +20,8 @@ import org.apache.commons.io.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
|
||||
public class JmsDeliveredMessage<BodyType> extends AbstractMessage implements DeliveredMessage<BodyType> {
|
||||
|
||||
|
@@ -14,8 +14,8 @@ import javax.jms.StreamMessage;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.jgroups.util.UUID;
|
||||
|
||||
import com.inteligr8.activiti.mq.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.PreparedMessage;
|
||||
import com.inteligr8.activiti.mq.model.AbstractMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
public class JmsPreparedMessage<BodyType> extends AbstractMessage implements PreparedMessage<BodyType> {
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
public interface Message {
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package com.inteligr8.activiti.mq;
|
||||
package com.inteligr8.activiti.mq.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@@ -7,8 +7,6 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -17,6 +15,9 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.inteligr8.activiti.mq.model.DeliveredMessage;
|
||||
import com.inteligr8.activiti.mq.model.PreparedMessage;
|
||||
|
||||
@Component
|
||||
public class MQDockerIT {
|
||||
|
||||
@@ -106,15 +107,18 @@ public class MQDockerIT {
|
||||
OffsetDateTime beforeSend = OffsetDateTime.now();
|
||||
this.logger.debug("Timestamp before send: {}", beforeSend);
|
||||
|
||||
String correlationId = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(correlationId);
|
||||
DeliveredMessage<String> sentMessage = communicator.send(destination, message);
|
||||
Assertions.assertNotNull(sentMessage);
|
||||
Assertions.assertNotNull(sentMessage.getMessageId());
|
||||
Assertions.assertNotNull(sentMessage.getCorrelationId());
|
||||
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, correlationId);
|
||||
DeliveredMessage<String> receivedMessage = communicator.receive(destination, 25L, sentMessage.getCorrelationId());
|
||||
Assertions.assertNotNull(receivedMessage);
|
||||
this.logger.debug("Timestamp of delivery: {}", receivedMessage.getDeliveryTime());
|
||||
|
||||
Assertions.assertNotNull(receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(correlationId, receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(sentMessage.getMessageId(), receivedMessage.getMessageId());
|
||||
Assertions.assertEquals(sentMessage.getCorrelationId(), receivedMessage.getCorrelationId());
|
||||
Assertions.assertEquals(message.getReplyToQueueName(), receivedMessage.getReplyToQueueName());
|
||||
Assertions.assertEquals("{}", receivedMessage.getContent());
|
||||
Assertions.assertNotNull(receivedMessage.getDeliveryTime());
|
||||
|
3
src/test/resources/activiti/whitelisted-scripts.conf
Normal file
3
src/test/resources/activiti/whitelisted-scripts.conf
Normal file
@@ -0,0 +1,3 @@
|
||||
javascript
|
||||
js
|
||||
ecmascript
|
13
src/test/vscode/pi.http
Normal file
13
src/test/vscode/pi.http
Normal file
@@ -0,0 +1,13 @@
|
||||
@baseUrl = "http://localhost:8080/activiti-app/api"
|
||||
@username = "admin@app.activiti.com"
|
||||
@password = "admin"
|
||||
|
||||
# @name getPis
|
||||
POST {{baseUrl}}/enterprise/historic-process-instances/query
|
||||
Authorization: BASIC "{{username}}:{{password}}"
|
||||
Content-type: application/json
|
||||
|
||||
{
|
||||
"finished": false
|
||||
}
|
||||
|
Reference in New Issue
Block a user