receive null when closed; treat reply subscription different

This commit is contained in:
2025-03-11 08:58:09 -04:00
parent 2bf612cac2
commit b8d75dd95f
6 changed files with 103 additions and 58 deletions

View File

@@ -86,11 +86,11 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
try {
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
communicator.receive(destination,
if (communicator.receive(destination,
new MqSubscriptionListener() {
@Override
public void consuming(AutoCloseable consumerCloseable) {
subscriptionService.consuming(execution, consumerCloseable);
subscriptionService.autoconsuming(execution, consumerCloseable);
}
@Override
@@ -113,7 +113,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
}
);
) == null) {
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
throw new BpmnError("cancelled", "MQ subscription cancelled gracefully");
}
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());

View File

@@ -86,7 +86,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
try {
MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());
communicator.receive(destination, correlationId,
if (communicator.receive(destination, correlationId,
new MqSubscriptionListener() {
@Override
public void consuming(AutoCloseable consumerCloseable) {
@@ -112,7 +112,10 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
mqExecution.setStatusQueueName(message.getStatusQueueName());
}
}
);
) == null) {
this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}", mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
throw new BpmnError("cancelled", "MQ subscription cancelled gracefully");
}
} catch (TimeoutException te) {
this.logger.error("MQ connection or communication timed out: " + te.getMessage(), te);
throw new BpmnError("timeout", "MQ connection or communication timed out: " + te.getMessage());

View File

@@ -8,6 +8,7 @@ import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,16 +31,37 @@ public class MqSubscriptionService extends MqExecutionService {
* also necessary to remove the execution when it is removed from the
* `activityExecutionMap` map.
*/
private Map<String, AutoCloseable> executionSubscriptionMap = new HashMap<>();
private Map<String, Pair<Boolean, AutoCloseable>> executionSubscriptionMap = new HashMap<>();
/**
* This method registers a consuming execution; an execution that was not
* automatically started as part of the framework. This is for reply
* subscriptions and not starter subscriptions.
*
* @param execution An Activiti execution.
* @param consumerCloseable A closeable MQ consumer.
*/
public synchronized void consuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
this.executing(execution);
this.executionSubscriptionMap.put(execution.getId(), consumerCloseable);
this.executionSubscriptionMap.put(execution.getId(), Pair.of(false, consumerCloseable));
}
/**
* This method registers an auto-consuming execution; an execution that was
* automatically started as part of the framework. This is for starter
* subscriptions and not reply subscriptions.
*
* @param execution An Activiti execution.
* @param consumerCloseable A closeable MQ consumer.
*/
public synchronized void autoconsuming(DelegateExecution execution, AutoCloseable consumerCloseable) {
this.executing(execution);
this.executionSubscriptionMap.put(execution.getId(), Pair.of(true, consumerCloseable));
}
public synchronized void consumed(DelegateExecution execution, AutoCloseable consumerCloseable) {
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable != consumerCloseable)
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable != null && cachedConsumerCloseable.getRight() != consumerCloseable)
throw new IllegalStateException("The consumer objects were expected to be identical");
this.executionSubscriptionMap.remove(execution.getId());
@@ -48,7 +70,7 @@ public class MqSubscriptionService extends MqExecutionService {
@Override
public synchronized boolean cancelled(Execution execution) {
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
Pair<Boolean, AutoCloseable> cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
if (cachedConsumerCloseable == null) {
this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId());
return false;
@@ -56,7 +78,7 @@ public class MqSubscriptionService extends MqExecutionService {
// this will eventually lead to a call to "consumed() above"
try {
cachedConsumerCloseable.close();
cachedConsumerCloseable.getRight().close();
return super.cancelled(execution);
} catch (Exception e) {
throw new IllegalStateException(e);
@@ -75,10 +97,10 @@ public class MqSubscriptionService extends MqExecutionService {
return false;
this.logger.trace("Removing MQ subscription execution: {}", executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer);
consumer.close();
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}", executionId, consumer.getRight());
consumer.getRight().close();
}
return true;
@@ -99,11 +121,11 @@ public class MqSubscriptionService extends MqExecutionService {
String processDefinitionKey = processDefinition.getKey();
for (String executionId : executionIds) {
this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
consumer.close();
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null && consumer.getLeft()) {
this.logger.trace("Removing MQ subscription execution: {}: {}", processDefinitionId, executionId);
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
consumer.getRight().close();
}
}
@@ -122,11 +144,11 @@ public class MqSubscriptionService extends MqExecutionService {
String processDefinitionKey = latestProcessDefinition.getKey();
for (String executionId : executionIds) {
this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId);
AutoCloseable consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null) {
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer);
consumer.close();
Pair<Boolean, AutoCloseable> consumer = this.executionSubscriptionMap.remove(executionId);
if (consumer != null && consumer.getLeft()) {
this.logger.trace("Removing MQ subscription execution: {}: {}", latestProcessDefinitionId, executionId);
this.logger.debug("Closing MessageConsumer to terminate a subscription early: {}: {}: {}", processDefinitionKey, executionId, consumer.getRight());
consumer.getRight().close();
}
}

View File

@@ -143,37 +143,43 @@ public class JmsCommunicator implements MqCommunicator {
public <BodyType> JmsDeliveredMessage<BodyType> receive(Session session, GenericDestination destination, long timeoutInMillis, String correlationId, MqSubscriptionListener listener) throws JMSException, TimeoutException {
String messageSelector = correlationId == null ? null : ("JMSCorrelationID='" + correlationId + "'");
Message receivedMessage = null;
MessageConsumer messenger = session.createConsumer(destination.toJmsQueue(session), messageSelector);
try {
if (listener != null)
listener.consuming(messenger);
if (timeoutInMillis < 0L) {
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
return JmsDeliveredMessage.transform(messenger.receive());
} else if (timeoutInMillis == 0L) {
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
return JmsDeliveredMessage.transform(messenger.receiveNoWait());
} else {
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
try {
long startTime = System.currentTimeMillis();
Message message = messenger.receive(timeoutInMillis);
if (timeoutInMillis < 0L) {
this.logger.debug("Waiting for message indefinitely: {}", destination.getQueueName());
receivedMessage = messenger.receive();
} else if (timeoutInMillis == 0L) {
this.logger.debug("Checking for message without waiting: {}", destination.getQueueName());
receivedMessage = messenger.receiveNoWait();
} else {
this.logger.debug("Waiting for message for {} ms: {}", timeoutInMillis, destination.getQueueName());
receivedMessage = messenger.receive(timeoutInMillis);
}
long elapsedTime = System.currentTimeMillis() - startTime;
if (message != null) {
if (receivedMessage != null) {
this.logger.debug("Received message after {} ms: {}", elapsedTime, destination.getQueueName());
return JmsDeliveredMessage.transform(message);
return JmsDeliveredMessage.transform(receivedMessage);
}
if (timeoutInMillis > 0L) {
if (elapsedTime >= timeoutInMillis)
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
this.logger.debug("Done waiting for message after {} minutes: {}", elapsedTime / 60000L, destination.getQueueName());
}
if (elapsedTime < timeoutInMillis) {
throw new JMSException("The reading of the queue ended prematurely");
} else {
throw new TimeoutException("A timeout of " + timeoutInMillis + " ms was reached");
}
return null;
} finally {
if (listener != null)
listener.consumed(messenger);
}
} finally {
if (listener != null)
listener.consumed(messenger);
messenger.close();
}
}

View File

@@ -1,13 +0,0 @@
@baseUrl = "http://localhost:8080/activiti-app/api"
@username = "admin@app.activiti.com"
@password = "admin"
# @name getPis
POST {{baseUrl}}/enterprise/historic-process-instances/query
Authorization: BASIC "{{username}}:{{password}}"
Content-type: application/json
{
"finished": false
}

24
src/test/vscode/test.http Normal file
View File

@@ -0,0 +1,24 @@
@baseUrl = http://localhost:8080/activiti-app
@username = admin@app.activiti.com
@password = admin
@basic = YWRtaW5AYXBwLmFjdGl2aXRpLmNvbTphZG1pbg==
###
# @name getPis
POST {{baseUrl}}/api/enterprise/historic-process-instances/query
Authorization: BASIC {{username}}:{{password}}
Content-type: application/json
{
"finished": false
}
###
# @name getPis2
GET {{baseUrl}}/api/runtime/process-instances?tenantId=tenant_1
Authorization: BASIC {{username}}:{{password}}
###
# @name getExecs
GET {{baseUrl}}/api/runtime/executions?tenantId=tenant_1
Authorization: BASIC {{username}}:{{password}}