Files
mq-activiti-ext/src/main/java/com/inteligr8/activiti/mq/MqSubscribeDelegate.java

139 lines
7.3 KiB
Java

package com.inteligr8.activiti.mq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import javax.jms.JMSException;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.BpmnError;
import org.activiti.engine.delegate.DelegateExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.activiti.mq.model.DeliveredMessage;
/**
 * An Activiti delegate that subscribes to an MQ queue and consumes whatever
 * messages arrive on it.
 *
 * Place it in a Service Task directly after a plain start activity. Process
 * instances are then created automatically so that the MQ subscription is
 * maintained as messages are received. Any other placement results in an
 * error and the process failing validation.
 *
 * The delegate receives every message put on the MQ queue; it does not wait
 * for the response to one specific message. Use the `mqSubscribeReplyDelegate`
 * delegate for that.
 *
 * When a single process contains more than one MQ communication, set the
 * `mq_messageName` field on this task. The resulting variable names will then
 * include that message name (e.g. `mq_messageId_{mq_messageName}`).
 *
 * A long running connection to MQ is required, and it is held by a long
 * running Activiti job/execution. On a failure or a server restart, the
 * Activiti job fails and is retried automatically per the Activiti standard
 * features. Once retries are exhausted it may eventually dead-letter; retry
 * the job to continue the subscription.
 *
 * @author brian@inteligr8.com
 * @since 1.0
 * @see mqPublishDelegate#
 * @see mqSubscribeReplyDelegate#
 */
@Component(MqSubscribeDelegate.BEAN_ID)
public class MqSubscribeDelegate extends AbstractMqDelegate {

    /** The Spring bean name under which this delegate is registered. */
    public static final String BEAN_ID = "mqSubscribeDelegate";

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private ProcessEngine services;

    @Autowired
    private MqServiceTaskService msts;

    @Autowired
    private MqSubscriptionService subscriptionService;

    /**
     * This method makes this bean an Activiti delegate.
     *
     * It validates the task configuration, starts receiving from the configured
     * queue, and copies the metadata and payload of each received message into
     * Activiti variables. When the receive call returns {@code null}, the
     * process instance is deleted with a "cancelled gracefully" reason.
     *
     * @param execution An Activiti delegate execution (source task or execution/task listener).
     * @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. System properties support `url`, `username`, and `password`.
     * @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
     * @field mq_metadataProcessScope [optional] `true` to set all Activiti result variables in the process instance scope; otherwise the variables will be local to the task.
     * @field mq_messageName [optional] A unique identifier to append to Activiti result variable names when more than one MQ message publication/subscription is supported by a process.
     * @field mq_concurrency The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted.
     * @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
     * @varOut mq_messageId The unique message identifier of the message received.
     * @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
     * @varOut mq_priority An integer priority of the message; supported range depends on MQ protocol.
     * @varOut mq_payload The body of the MQ message; may be `null`.
     * @varOut mq_payloadMimeType The MIME type of `mq_payload`; may be `null`.
     * @varOut mq_replyQueueName The name of an MQ destination (queue or topic) to use to reply to the received message. The reply message must have the same correlating identifier. May be `null`.
     * @varOut mq_statusQueueName The name of an MQ destination (queue or topic) where status updates on the received message are expected to be sent. The status message must have the same correlating identifier. May be `null`.
     * @throws BPMNError timeout The MQ connection timed out connecting or waiting for a message.
     * @throws BPMNError network The MQ connection experienced network issues.
     * @throws BPMNError mq An unknown MQ issue occurred.
     * @see mqPublishDelegate#delegate
     * @see mqSubscribeReplyDelegate#delegate
     */
    @Override
    public void execute(final DelegateExecution execution) {
        final MqSubscribeDelegateExecution mqExecution =
                new MqSubscribeDelegateExecution(this.services, this.msts, execution);
        mqExecution.validate();

        final GenericDestination destination = new GenericDestination();
        destination.setQueueName(mqExecution.getQueueNameFromModel());

        this.logger.debug("Will look for MQ messages: {} => {}",
                mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());

        try {
            final MqCommunicator communicator = this.getCommunicator(mqExecution.getConnectorIdFromModel());

            // Callbacks are created only once a communicator has been obtained.
            final MqSubscriptionListener subscriptionTracker = this.newSubscriptionTracker(execution);
            final TransactionalMessageHandler<String> variableRecorder = this.newVariableRecorder(mqExecution);

            // A null result from receive() is treated as a graceful stop of the subscription.
            final boolean stoppedGracefully =
                    communicator.receive(destination, subscriptionTracker, variableRecorder) == null;
            if (stoppedGracefully) {
                this.logger.debug("Gracefully stopped looking for MQ messages: {} => {}",
                        mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel());
                this.services.getRuntimeService().deleteProcessInstance(
                        execution.getProcessInstanceId(), "MQ subscription cancelled gracefully");
            }
        } catch (TimeoutException timeout) {
            final String problem = "MQ connection or communication timed out: " + timeout.getMessage();
            this.logger.error(problem, timeout);
            throw new BpmnError("timeout", problem);
        } catch (IOException ioFailure) {
            final String problem = "MQ connection or network communication failed: " + ioFailure.getMessage();
            this.logger.error(problem, ioFailure);
            throw new BpmnError("network", problem);
        } catch (JMSException jmsFailure) {
            final String problem = "JMS communication failed: " + jmsFailure.getMessage();
            this.logger.error(problem, jmsFailure);
            throw new BpmnError("mq", problem);
        }
    }

    /**
     * Builds the listener that forwards consumer lifecycle callbacks for the
     * given execution to the subscription service.
     *
     * @param execution The Activiti execution that owns the subscription.
     * @return A listener bound to {@code execution}.
     */
    private MqSubscriptionListener newSubscriptionTracker(final DelegateExecution execution) {
        return new MqSubscriptionListener() {
            @Override
            public void consuming(AutoCloseable consumerCloseable) {
                subscriptionService.autoconsuming(execution, consumerCloseable);
            }

            @Override
            public void consumed(AutoCloseable consumerCloseable) {
                subscriptionService.consumed(execution, consumerCloseable);
            }
        };
    }

    /**
     * Builds the handler that logs each delivered message and copies its
     * metadata and payload onto the given MQ execution wrapper.
     *
     * @param mqExecution The wrapper through which result variables are set.
     * @return A message handler bound to {@code mqExecution}.
     */
    private TransactionalMessageHandler<String> newVariableRecorder(final MqSubscribeDelegateExecution mqExecution) {
        return new TransactionalMessageHandler<String>() {
            @Override
            public void onMessage(DeliveredMessage<String> message) throws IOException {
                logger.debug("Received MQ message: {} => {} => {} => {}",
                        mqExecution.getConnectorIdFromModel(), mqExecution.getQueueNameFromModel(),
                        message.getCorrelationId(), message.getMessageId());

                // The message ID variable is only written when the message carries an ID.
                if (message.getMessageId() != null) {
                    mqExecution.setMqVariable(Constants.VARIABLE_MESSAGE_ID, message.getMessageId());
                }

                mqExecution.setCorrelationId(message.getCorrelationId());
                mqExecution.setDeliveryTime(message.getDeliveryTime());
                mqExecution.setPriority(message.getPriority());
                mqExecution.setPayload(message.getContent(), message.getContentType());
                mqExecution.setReplyToQueueName(message.getReplyToQueueName());
                mqExecution.setStatusQueueName(message.getStatusQueueName());
            }
        };
    }
}