From b8d75dd95fd6b49e78e77df96b20d8643b24b1c6 Mon Sep 17 00:00:00 2001 From: "Brian M. Long" Date: Tue, 11 Mar 2025 08:58:09 -0400 Subject: [PATCH] receive null when closed; treat reply subscription different --- .../activiti/mq/MqSubscribeDelegate.java | 9 ++- .../activiti/mq/MqSubscribeReplyDelegate.java | 7 ++- .../activiti/mq/MqSubscriptionService.java | 62 +++++++++++++------ .../activiti/mq/jms/JmsCommunicator.java | 46 ++++++++------ src/test/vscode/pi.http | 13 ---- src/test/vscode/test.http | 24 +++++++ 6 files changed, 103 insertions(+), 58 deletions(-) delete mode 100644 src/test/vscode/pi.http create mode 100644 src/test/vscode/test.http diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java index e037bc6..9090e8a 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java @@ -86,11 +86,11 @@ public class MqSubscribeDelegate extends AbstractMqDelegate { try { MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel()); - communicator.receive(destination, + if (communicator.receive(destination, new MqSubscriptionListener() { @Override public void consuming(AutoCloseable consumerCloseable) { - subscriptionService.consuming(execution, consumerCloseable); + subscriptionService.autoconsuming(execution, consumerCloseable); } @Override @@ -113,7 +113,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate { mqExecution.setStatusQueueName(message.getStatusQueueName()); } } - ); + ) == null) { + this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel()); + throw new BpmnError("cancelled", "MQ subscription cancelled gracefully"); + } } catch (TimeoutException te) { this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te); throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage()); diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscribeReplyDelegate.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscribeReplyDelegate.java index 62949f2..ca80606 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqSubscribeReplyDelegate.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscribeReplyDelegate.java @@ -86,7 +86,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate { try { MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel()); - communicator.receive(destination, correlationId, + if (communicator.receive(destination, correlationId, new MqSubscriptionListener() { @Override public void consuming(AutoCloseable consumerCloseable) { @@ -112,7 +112,10 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate { mqExecution.setStatusQueueName(message.getStatusQueueName()); } } - ); + ) == null) { + this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel()); + throw new BpmnError("cancelled", "MQ subscription cancelled gracefully"); + } } catch (TimeoutException te) { this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te); throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage()); diff --git a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java index 8328a34..6e27716 100644 --- a/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java +++ b/src/main/java/com/inteligr8/activiti/mq/MqSubscriptionService.java @@ -8,6 +8,7 @@ import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.repository.ProcessDefinition; import org.activiti.engine.runtime.Execution; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -30,16 +31,37 @@ public class MqSubscriptionService extends MqExecutionService { * also necessary to remove the execution when it is removed from the * `activityExecutionMap` map. */ - private Map executionSubscriptionMap = new HashMap<>(); - + private Map> executionSubscriptionMap = new HashMap<>(); + + /** + * This method registers a consuming execution; an execution that was not + * automatically started as part of the framework. This is for reply + * subscriptions and not starter subscriptions. + * + * @param execution An Activiti execution. + * @param consumerCloseable A closeable MQ consumer. + */ public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) { this.executing(execution); - this.executionSubscriptionMap.put(execution.getId(), consumerCloseable); + this.executionSubscriptionMap.put(execution.getId(), Pair.of(false, consumerCloseable)); + } + + /** + * This method registers an auto-consuming execution; an execution that was + * automatically started as part of the framework. This is for starter + * subscriptions and not reply subscriptions. + * + * @param execution An Activiti execution. + * @param consumerCloseable A closeable MQ consumer. + */ + public synchronized void autoconsuming(DelegateExecution execution, AutoCloseable consumerCloseable) { + this.executing(execution); + this.executionSubscriptionMap.put(execution.getId(), Pair.of(true, consumerCloseable)); } public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) { - AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId()); - if (cachedConsumerCloseable != consumerCloseable) + Pair cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId()); + if (cachedConsumerCloseable != null && cachedConsumerCloseable.getRight() != consumerCloseable) throw new IllegalStateException("The consumer objects were expected to be identical"); this.executionSubscriptionMap.remove(execution.getId()); @@ -48,7 +70,7 @@ public class MqSubscriptionService extends MqExecutionService { @Override public synchronized boolean cancelled(Execution execution) { - AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId()); + Pair cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId()); if (cachedConsumerCloseable == null) { this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId()); return false; @@ -56,7 +78,7 @@ public class MqSubscriptionService extends MqExecutionService { // this will eventually lead to a call to "consumed() above" try { - cachedConsumerCloseable.close(); + cachedConsumerCloseable.getRight().close(); return super.cancelled(execution); } catch (Exception e) { throw new IllegalStateException(e); @@ -75,10 +97,10 @@ public class MqSubscriptionService extends MqExecutionService { return false; this.logger.trace("Removing MQ subscription execution: {}", executionId); - AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); + Pair consumer = this.executionSubscriptionMap.remove(executionId); if (consumer != null) { - this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer); - consumer.close(); + this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer.getRight()); + consumer.getRight().close(); } return true; @@ -99,11 +121,11 @@ public class MqSubscriptionService extends MqExecutionService { String processDefinitionKey = processDefinition.getKey(); for (String executionId : executionIds) { - this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId); - AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); - if (consumer != null) { - this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer); - consumer.close(); + Pair consumer = this.executionSubscriptionMap.remove(executionId); + if (consumer != null && consumer.getLeft()) { + this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId); + this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight()); + consumer.getRight().close(); } } @@ -122,11 +144,11 @@ public class MqSubscriptionService extends MqExecutionService { String processDefinitionKey = latestProcessDefinition.getKey(); for (String executionId : executionIds) { - this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId); - AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId); - if (consumer != null) { - this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer); - consumer.close(); + Pair consumer = this.executionSubscriptionMap.remove(executionId); + if (consumer != null && consumer.getLeft()) { + this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId); + this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight()); + consumer.getRight().close(); } } diff --git a/src/main/java/com/inteligr8/activiti/mq/jms/JmsCommunicator.java b/src/main/java/com/inteligr8/activiti/mq/jms/JmsCommunicator.java index e5db86e..01cb954 100644 --- a/src/main/java/com/inteligr8/activiti/mq/jms/JmsCommunicator.java +++ b/src/main/java/com/inteligr8/activiti/mq/jms/JmsCommunicator.java @@ -143,37 +143,43 @@ public class JmsCommunicator implements MqCommunicator { public JmsDeliveredMessage receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException { String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'"); + Message receivedMessage = null; + MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector); try { if (listener != null) listener.consuming(messenger); - - if (timeoutInMillis < 0L) { - this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName()); - return JmsDeliveredMessage.transform(messenger.receive()); - } else if (timeoutInMillis == 0L) { - this.logger.debug("Checking for message without waiting: {}", destination.getQueueName()); - return JmsDeliveredMessage.transform(messenger.receiveNoWait()); - } else { - this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName()); + try { long startTime = System.currentTimeMillis(); - Message message = messenger.receive(timeoutInMillis); + if (timeoutInMillis < 0L) { + this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName()); + receivedMessage = messenger.receive(); + } else if (timeoutInMillis == 0L) { + this.logger.debug("Checking for message without waiting: {}", destination.getQueueName()); + receivedMessage = messenger.receiveNoWait(); + } else { + this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName()); + receivedMessage = messenger.receive(timeoutInMillis); + } + long elapsedTime = System.currentTimeMillis() - startTime; - if (message != null) { + if (receivedMessage != null) { this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName()); - return JmsDeliveredMessage.transform(message); + return JmsDeliveredMessage.transform(receivedMessage); + } + + if (timeoutInMillis > 0L) { + if (elapsedTime >= timeoutInMillis) + throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached"); + this.logger.debug("Done waiting for message after {} minutes: {}", elapsedTime / 60000L, destination.getQueueName()); } - if (elapsedTime < timeoutInMillis) { - throw new JMSException("The reading of the queue ended prematurely"); - } else { - throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached"); - } + return null; + } finally { + if (listener != null) + listener.consumed(messenger); } } finally { - if (listener != null) - listener.consumed(messenger); - messenger.close(); } } diff --git a/src/test/vscode/pi.http b/src/test/vscode/pi.http deleted file mode 100644 index de1a907..0000000 --- a/src/test/vscode/pi.http +++ /dev/null @@ -1,13 +0,0 @@ -@baseUrl = "http://localhost:8080/activiti-app/api" -@username = "admin@app.activiti.com" -@password = "admin" - -# @name getPis -POST {{baseUrl}}/enterprise/historic-process-instances/query -Authorization: BASIC "{{username}}:{{password}}" -Content-type: application/json - -{ - "finished": false -} - diff --git a/src/test/vscode/test.http b/src/test/vscode/test.http new file mode 100644 index 0000000..fbe29df --- /dev/null +++ b/src/test/vscode/test.http @@ -0,0 +1,24 @@ +@baseUrl = http://localhost:8080/activiti-app +@username = admin@app.activiti.com +@password = admin +@basic = YWRtaW5AYXBwLmFjdGl2aXRpLmNvbTphZG1pbg== + +### +# @name getPis +POST {{baseUrl}}/api/enterprise/historic-process-instances/query +Authorization: BASIC {{username}}:{{password}} +Content-type: application/json + +{ + "finished": false +} + +### +# @name getPis2 +GET {{baseUrl}}/api/runtime/process-instances?tenantId=tenant_1 +Authorization: BASIC {{username}}:{{password}} + +### +# @name getExecs +GET {{baseUrl}}/api/runtime/executions?tenantId=tenant_1 +Authorization: BASIC {{username}}:{{password}}