Merge branch 'develop' into stable

This commit is contained in:
2025-02-14 11:25:41 -05:00
11 changed files with 317 additions and 53 deletions

View File

@@ -33,7 +33,7 @@ public abstract class AbstractMqDelegate implements JavaDelegate {
private Map<String, MqCommunicator> communicators = new HashMap<>();
protected synchronized MqCommunicator getConnection(String connectorId) throws JMSException, TimeoutException, IOException {
protected synchronized MqCommunicator getCommunicator(String connectorId) throws JMSException, TimeoutException, IOException {
MqCommunicator communicator = this.communicators.get(connectorId);
if (communicator == null) {
EndpointConfiguration endpointConfig = this.endpointService.getConfigurationByName(connectorId);

View File

@@ -5,6 +5,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -63,6 +64,9 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
@Autowired
private TenantFinderService tenantFinderService;
@Autowired
private MqSubscriptionService subscriptionService;
private Map<String, AbstractActivityListener> activeListeners = new HashMap<>();
/**
@@ -235,11 +239,13 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
protected void onProcessDefinitionAddEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition addition: {}", entity);
this.unsubscribeMqSubscribeTasks(entity.getId());
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null)
return;
this.loopMqSubscribeTask(entity.getId(), task);
}
@@ -253,6 +259,8 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
for (ProcessDefinitionEntity procDefEntity : procDefEntities) {
this.logger.debug("Inspecting process definition: {}: {}: {}", procDefEntity.getId(), procDefEntity.getKey(), procDefEntity.getName());
this.unsubscribeMqSubscribeTasks(procDefEntity.getId());
ServiceTask task = this.findMqStartSubscribeTask(procDefEntity.getId());
if (task == null)
@@ -264,9 +272,29 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
protected void onProcessDefinitionRemoveEvent(ProcessDefinitionEntity entity) {
this.logger.debug("Triggered by process definition removal: {}", entity);
this.unsubscribeMqSubscribeTasks(entity.getId());
ServiceTask task = this.findMqStartSubscribeTask(entity.getId());
if (task == null)
return;
this.deloopMqSubscribeTask(entity.getId());
}
protected void unsubscribeMqSubscribeTasks(String procDefId) {
try {
Set<String> executionIds = this.subscriptionService.clear(procDefId);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Subscription executions ended early: {}: {}", procDefId, executionIds);
} else {
this.logger.info("Subscriptions ended early: {}: {}", procDefId, executionIds.size());
}
} catch (Exception e) {
this.logger.error("The subscriptions could not be cleared: " + procDefId, e);
}
}
protected synchronized void loopMqSubscribeTask(String processDefId, ServiceTask task) {
// start a process instance on the process
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);

View File

@@ -14,33 +14,49 @@ public interface MqCommunicator {
<BodyType> String send(GenericDestination destination, PreparedMessage<BodyType> message) throws JMSException, IOException, TimeoutException;
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, null, null);
return this.receive(destination, -1L, null, null, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, null, listener, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis) throws JMSException, IOException, TimeoutException {
return this.receive(destination, timeoutInMillis, null, null);
return this.receive(destination, timeoutInMillis, null, null, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
return this.receive(destination, timeoutInMillis, null, listener, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, correlationId, null);
return this.receive(destination, -1L, correlationId, null, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, correlationId, listener, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId) throws JMSException, IOException, TimeoutException {
return this.receive(destination, timeoutInMillis, correlationId, null);
return this.receive(destination, timeoutInMillis, correlationId, null, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, null, handler);
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, IOException, TimeoutException {
return this.receive(destination, timeoutInMillis, correlationId, listener, null);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
return this.receive(destination, timeoutInMillis, null, handler);
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, null, listener, handler);
}
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, correlationId, handler);
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
return this.receive(destination, timeoutInMillis, null, listener, handler);
}
<BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException;
default <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
return this.receive(destination, -1L, correlationId, listener, handler);
}
<BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException;
}

View File

@@ -0,0 +1,101 @@
package com.inteligr8.activiti.mq;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.activiti.engine.delegate.DelegateExecution;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class MqExecutionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* The size of the keys is limited to the number of process definitions
* defined. It would actually only contain ones with an MQ subscribe task.
* Even if we kept versioned or inactive process definitions in the map, it
* would never be a significant memory hog.
*
* The size of the values is limited to the number of MQ subscribe tasks
* defined in each process definition. So it would never be a significant
* memory hog.
*
* The size of the keys/values have nothing to do with the number of
* process instances or executions.
*
* This means it does not need to be trimmed. However, it is a good idea
* to remove process definition keys that have no active executions. You
* could do the same with active activities, but cleaning up process
* definitions will clean those up too.
*/
private MultiValuedMap<String, String> processDefinitionActivityMap = new HashSetValuedHashMap<>();
/**
* The size of the keys is limited to the number of MQ subscribe tasks
* defined in all process definitions. So it would never be a significant
* memory hog.
*
* The size of the values has no limit. It will grow with the number of
* executions (related to process instances).
*
* This means the map values need to be trimmed. When an MQ subscribe task
* is completed, it is paramount to remove the execution from the values of
* this map. It is also a good idea to remove the activity key when it is
* removed from the `processDefinitionActivityMap` map; and to propagate
* the removal of executions from the `executionSubscriptionMap` map.
*/
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
public synchronized void executing(DelegateExecution execution) {
this.processDefinitionActivityMap.put(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
Pair<String, String> key = this.toKey(execution);
this.activityExecutionMap.put(key, execution.getId());
}
public synchronized void executed(DelegateExecution execution) {
Pair<String, String> key = this.toKey(execution);
this.activityExecutionMap.removeMapping(key, execution.getId());
}
/**
* @param processDefinitionId A process definition identifier.
* @return A set of execution identifiers that were in the now cleared map.
*/
public synchronized Set<String> clear(String processDefinitionId) throws Exception {
Collection<String> activityIds = this.processDefinitionActivityMap.get(processDefinitionId);
if (activityIds == null) {
this.logger.debug("No activities/executions to clear for process definition: {}", processDefinitionId);
return Collections.emptySet();
}
Set<String> executionIds = new HashSet<>();
for (String activityId : activityIds) {
this.logger.trace("Clearing process definition activity: {}: {}", processDefinitionId, activityId);
Pair<String, String> key = this.toKey(processDefinitionId, activityId);
Collection<String> activityExecutionIds = this.activityExecutionMap.get(key);
if (activityExecutionIds != null)
executionIds.addAll(activityExecutionIds);
}
return executionIds;
}
protected Pair<String, String> toKey(DelegateExecution execution) {
return this.toKey(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
}
protected Pair<String, String> toKey(String processDefinitionId, String activityId) {
return Pair.of(processDefinitionId, activityId);
}
}

View File

@@ -70,7 +70,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
destination.setQueueName(mqExecution.getQueueNameFromModel());
try {
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
PreparedMessage<String> message = communicator.createPreparedMessage();
if (mqExecution.getStatusQueueNameFromModel() != null)

View File

@@ -32,6 +32,9 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
@Autowired
private MqServiceTaskService msts;
@Autowired
private MqSubscriptionService subscriptionService;
/**
* This delegate listens for messages on an MQ queue.
@@ -70,7 +73,7 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
* @see mqSubscribeReplyDelegate#delegate
*/
@Override
public void execute(DelegateExecution execution) {
public void execute(final DelegateExecution execution) {
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, this.msts, execution);
mqExecution.validate();
@@ -80,22 +83,35 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
this.logger.debug("Will look for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
try {
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
communicator.receive(destination, new TransactionalMessageHandler<String>() {
@Override
public void onMessage(DeliveredMessage<String> message) throws IOException {
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
if (message.getMessageId() != null)
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
mqExecution.setCorrelationId(message.getCorrelationId());
mqExecution.setDeliveryTime(message.getDeliveryTime());
mqExecution.setPriority(message.getPriority());
mqExecution.setPayload(message.getContent(), message.getContentType());
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
});
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
communicator.receive(destination,
new MqSubscriptionListener() {
@Override
public void consuming(AutoCloseable consumerCloseable) {
subscriptionService.consuming(execution, consumerCloseable);
}
@Override
public void consumed(AutoCloseable consumerCloseable) {
subscriptionService.consumed(execution, consumerCloseable);
}
},
new TransactionalMessageHandler<String>() {
@Override
public void onMessage(DeliveredMessage<String> message) throws IOException {
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
if (message.getMessageId() != null)
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
mqExecution.setCorrelationId(message.getCorrelationId());
mqExecution.setDeliveryTime(message.getDeliveryTime());
mqExecution.setPriority(message.getPriority());
mqExecution.setPayload(message.getContent(), message.getContentType());
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
}
);
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());

View File

@@ -33,6 +33,9 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
@Autowired
private MqServiceTaskService msts;
@Autowired
private MqSubscriptionService subscriptionService;
/**
* This method listens for a reply message on an MQ queue.
*
@@ -80,21 +83,34 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
this.logger.debug("Will look only for MQ message: {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), correlationId);
try {
MqCommunicator communicator = this.getConnection(mqExecution.getConnectorIdFromModel());
communicator.receive(destination, correlationId, new TransactionalMessageHandler<String>() {
@Override
public void onMessage(DeliveredMessage<String> message) {
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
if (message.getMessageId() != null)
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
mqExecution.setDeliveryTime(message.getDeliveryTime());
mqExecution.setPriority(message.getPriority());
mqExecution.setPayload(message.getContent(), message.getContentType());
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
});
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
communicator.receive(destination, correlationId,
new MqSubscriptionListener() {
@Override
public void consuming(AutoCloseable consumerCloseable) {
subscriptionService.consuming(execution, consumerCloseable);
}
@Override
public void consumed(AutoCloseable consumerCloseable) {
subscriptionService.consumed(execution, consumerCloseable);
}
},
new TransactionalMessageHandler<String>() {
@Override
public void onMessage(DeliveredMessage<String> message) {
logger.debug("Received MQ message: {} => {} => {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(), message.getCorrelationId(), message.getMessageId());
if (message.getMessageId() != null)
mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
mqExecution.setDeliveryTime(message.getDeliveryTime());
mqExecution.setPriority(message.getPriority());
mqExecution.setPayload(message.getContent(), message.getContentType());
mqExecution.setReplyToQueueName(message.getReplyToQueueName());
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
}
);
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());

View File

@@ -0,0 +1,11 @@
package com.inteligr8.activiti.mq;
public interface MqSubscriptionListener {
default void consuming(AutoCloseable consumerCloseable) {
}
default void consumed(AutoCloseable consumerCloseable) {
}
}

View File

@@ -0,0 +1,62 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.activiti.engine.delegate.DelegateExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class MqSubscriptionService extends MqExecutionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* The size of the map has no limit. It will grow with the number of
* executions (related to process instances).
*
* This means the map needs to be trimmed. When an MQ subscribe task is
* completed, it is paramount to remove the execution from the map. It is
* also necessary to remove the execution when it is removed from the
* `activityExecutionMap` map.
*/
private Map<String, AutoCloseable> executionSubscriptionMap = new HashMap<>();
public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
this.executing(execution);
this.executionSubscriptionMap.put(execution.getId(), consumerCloseable);
}
public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) {
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable != consumerCloseable)
throw new IllegalStateException("The consumer objects were expected to be identical");
this.executionSubscriptionMap.remove(execution.getId());
this.executed(execution);
}
/**
* @param processDefinitionId A process definition identifier.
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
*/
@Override
public synchronized Set<String> clear(String processDefinitionId) throws Exception {
Set<String> executionIds = super.clear(processDefinitionId);
for (String executionId : executionIds) {
this.logger.trace("Clearing process definition execution: {}: {}", processDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionId, executionId, consumer);
consumer.close();
}
}
return executionIds;
}
}

View File

@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
import com.inteligr8.activiti.mq.DeliveredMessage;
import com.inteligr8.activiti.mq.GenericDestination;
import com.inteligr8.activiti.mq.MqCommunicator;
import com.inteligr8.activiti.mq.MqSubscriptionListener;
import com.inteligr8.activiti.mq.PreparedMessage;
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
import com.rabbitmq.client.AMQP;
@@ -103,11 +104,14 @@ public class AmqpCommunicator implements MqCommunicator {
}
@Override
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, final String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, final String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, IOException, TimeoutException {
Connection con = this.connect();
try {
Channel channel = con.createChannel();
try {
if (listener != null)
listener.consuming(channel);
channel.queueDeclare(destination.getQueueName(), true, false, false, Collections.emptyMap());
channel.basicConsume(
destination.getQueueName(),
@@ -115,6 +119,7 @@ public class AmqpCommunicator implements MqCommunicator {
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// FIXME we need a better solution here; maybe one consumer that distributes them baced on correlationId
if (correlationId != null && !correlationId.equals(message.getProperties().getCorrelationId()))
channel.basicNack(message.getEnvelope().getDeliveryTag(), true, true);
@@ -129,6 +134,9 @@ public class AmqpCommunicator implements MqCommunicator {
}
);
} finally {
if (listener != null)
listener.consumed(channel);
channel.close();
}
} finally {

View File

@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import com.inteligr8.activiti.mq.DeliveredMessage;
import com.inteligr8.activiti.mq.GenericDestination;
import com.inteligr8.activiti.mq.MqCommunicator;
import com.inteligr8.activiti.mq.MqSubscriptionListener;
import com.inteligr8.activiti.mq.PreparedMessage;
import com.inteligr8.activiti.mq.TransactionalMessageHandler;
@@ -109,21 +110,21 @@ public class JmsCommunicator implements MqCommunicator {
}
@Override
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
public <BodyType> DeliveredMessage<BodyType> receive(GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
Connection con = this.connect();
try {
return this.receive(con, destination, timeoutInMillis, correlationId, handler);
return this.receive(con, destination, timeoutInMillis, correlationId, listener, handler);
} finally {
con.close();
}
}
protected <BodyType> JmsDeliveredMessage<BodyType> receive(Connection con, GenericDestination destination, long timeoutInMillis, String correlationId, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
protected <BodyType> JmsDeliveredMessage<BodyType> receive(Connection con, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener, TransactionalMessageHandler<BodyType> handler) throws JMSException, TimeoutException, IOException {
con.start();
try {
Session session = this.start(con);
try {
JmsDeliveredMessage<BodyType> message = this.receive(session, destination, timeoutInMillis, correlationId);
JmsDeliveredMessage<BodyType> message = this.receive(session, destination, timeoutInMillis, correlationId, listener);
if (message == null)
return null;
@@ -140,10 +141,13 @@ public class JmsCommunicator implements MqCommunicator {
}
}
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId) throws JMSException, TimeoutException {
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException {
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
try {
if (listener != null)
listener.consuming(messenger);
if (timeoutInMillis < 0L) {
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
return JmsDeliveredMessage.transform(messenger.receive());
@@ -167,8 +171,10 @@ public class JmsCommunicator implements MqCommunicator {
}
}
} finally {
if (listener != null)
listener.consumed(messenger);
messenger.close();
}
}
}