added priority/concurrency

This commit is contained in:
2025-02-13 15:58:06 -05:00
parent 9762f74e89
commit 4fe113b8a7
15 changed files with 355 additions and 97 deletions

View File

@@ -1186,6 +1186,40 @@
}
]
},
{
"name": "mq_prioritypackage",
"properties": [
{
"id": "mq_priority",
"type": "String",
"title": "MQ Priority",
"value": "",
"description": "MQ message priority; depends on protocol specification",
"popular": false,
"custom": {
"includeInXML": true,
"xmlPropertyName": "mq_priority"
}
}
]
},
{
"name": "mq_concurrencypackage",
"properties": [
{
"id": "mq_concurrency",
"type": "String",
"title": "MQ Concurrency (positive number)",
"value": "",
"description": "Number of MQ subscription threads",
"popular": false,
"custom": {
"includeInXML": true,
"xmlPropertyName": "mq_concurrency"
}
}
]
},
{
"name": "mq_payloadpackage",
"properties": [
@@ -3050,10 +3084,11 @@
"mq_connectorIdpackage",
"mq_queueNamepackage",
"mq_messageNamepackage",
"mq_prioritypackage",
"mq_payloadpackage",
"mq_replyQueueNamepackage",
"mq_statusQueueNamepackage",
"mq_metadataProcessScopepackage",
"mq_payloadpackage"
"mq_metadataProcessScopepackage"
],
"hiddenPropertyPackages": [
"multiinstance_typepackage",
@@ -3153,6 +3188,7 @@
"mq_connectorIdpackage",
"mq_queueNamepackage",
"mq_messageNamepackage",
"mq_concurrencypackage",
"mq_metadataProcessScopepackage"
],
"hiddenPropertyPackages": [

View File

@@ -9,6 +9,8 @@ public class Constants {
public static final String FIELD_CONNECTOR_ID = "mq_connectorId";
public static final String FIELD_QUEUE_NAME = "mq_queueName";
public static final String FIELD_MESSAGE_NAME = "mq_messageName";
public static final String FIELD_CONCURRENCY = "mq_concurrency";
public static final String FIELD_PRIORITY = "mq_priority";
public static final String FIELD_REPLY_QUEUE_NAME = "mq_replyQueueName";
public static final String FIELD_STATUS_QUEUE_NAME = "mq_statusQueueName";
public static final String FIELD_PAYLOAD = "mq_payload";

View File

@@ -12,6 +12,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.SequenceFlow;
import org.activiti.bpmn.model.ServiceTask;
@@ -30,6 +31,9 @@ import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.repository.ProcessDefinitionQuery;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -74,30 +78,86 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
}
if (event instanceof ContextRefreshedEvent || event instanceof ContextStartedEvent) {
Tenant tenant = this.tenantFinderService.findTenant();
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery().active();
if (tenant == null) {
procDefQuery.processDefinitionWithoutTenantId();
} else {
String tenantId = this.tenantFinderService.transform(tenant.getId());
procDefQuery.processDefinitionTenantId(tenantId);
}
List<ProcessDefinition> procDefs = procDefQuery.list();
String tenantId = this.findTenantId();
List<ProcessDefinition> procDefs = this.findLatestActiveProcessDefinnitions(tenantId);
this.logger.debug("Found {} active process definitions", procDefs.size());
for (ProcessDefinition procDef : procDefs) {
this.logger.debug("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
this.logger.trace("Inspecting process definition for qualifying MQ subscriptions: {}", procDef.getId());
ServiceTask task = this.findMqStartSubscribeTask(procDef.getId());
if (task == null)
continue;
this.loopMqSubscribeTask(procDef.getId(), task);
int concurrency = this.determineConcurrency(task);
this.logger.debug("Process definition MQ subscription is configured for concurrency: {}: {}", procDef.getId(), concurrency);
List<Execution> execs = this.findExecutionsByServiceTask(tenantId, procDef.getId(), task);
this.logger.debug("Process appears to have {} executions waiting on the MQ subscription: {}", execs.size(), procDef.getId());
if (execs.size() < concurrency) {
this.logger.info("Process has {} too few executions waiting on the MQ subscription; starting them: {}", (concurrency - execs.size()), procDef.getId());
for (int thread = execs.size(); thread < concurrency; thread++) {
this.loopMqSubscribeTask(procDef.getId(), task);
}
}
}
}
}
protected String findTenantId() {
Tenant tenant = this.tenantFinderService.findTenant();
return this.tenantFinderService.transform(tenant);
}
protected List<ProcessDefinition> findLatestActiveProcessDefinnitions(String tenantId) {
ProcessDefinitionQuery procDefQuery = this.services.getRepositoryService().createProcessDefinitionQuery()
.latestVersion()
.active();
if (tenantId == null) {
procDefQuery.processDefinitionWithoutTenantId();
} else {
procDefQuery.processDefinitionTenantId(tenantId);
}
return procDefQuery.list();
}
protected List<Execution> findExecutionsByServiceTask(String tenantId, String processDefinitionId, ServiceTask task) {
ExecutionQuery execQuery = this.services.getRuntimeService().createExecutionQuery()
.processDefinitionId(processDefinitionId)
.activityId(task.getId());
if (tenantId == null) {
execQuery.executionWithoutTenantId();
} else {
execQuery.executionTenantId(tenantId);
}
return execQuery.list();
}
protected Integer findConcurrency(ServiceTask task) {
for (FieldExtension fieldext : task.getFieldExtensions()) {
if (fieldext.getFieldName().equals(Constants.FIELD_CONCURRENCY)) {
String concurrencyStr = StringUtils.trimToNull(fieldext.getStringValue());
return concurrencyStr == null ? null : Integer.valueOf(concurrencyStr);
}
}
return null;
}
protected int determineConcurrency(ServiceTask task) {
Integer concurrency = this.findConcurrency(task);
if (concurrency == null) {
return 1;
} else if (concurrency.intValue() < 1) {
this.logger.warn("The task defines an illegal concurrency of {}; using 1: {}", concurrency, task.getId());
return 1;
} else {
return concurrency.intValue();
}
}
/**
* This method will start monitoring for changes in deployment, app, and
* process definitions.
@@ -212,10 +272,9 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
this.logger.debug("Starting process instance on process '{}' to subscribe to an MQ queue", processDefId);
this.services.getRuntimeService().startProcessInstanceById(processDefId);
if (this.activeListeners.containsKey(processDefId)) {
this.logger.debug("The process definition already has a looping listener: {}", processDefId);
if (this.activeListeners.containsKey(processDefId))
// one listener, no matter how many instances are subscribed
return;
}
AbstractActivityListener listener = new AbstractActivityListener(processDefId, task) {
@Override
@@ -334,7 +393,7 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
return null;
}
this.logger.debug("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
this.logger.info("Process starts with an MQ subscription: {}: {}", processDefId, task.getId(), task.getName());
return task;
}

View File

@@ -2,92 +2,35 @@ package com.inteligr8.activiti.mq;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.Expression;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.el.ExpressionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqDelegateExecution {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final ProcessEngine services;
protected final DelegateExecution execution;
protected final ServiceTask task;
protected final Map<String, FieldExtension> fieldMap = new HashMap<>();
private boolean metadataToProcessScope = false;
protected final MqServiceTask task;
public MqDelegateExecution(ProcessEngine services, DelegateExecution execution) {
this.services = services;
public MqDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
this.execution = execution;
FlowElement flowElement = execution.getCurrentFlowElement();
if (!(flowElement instanceof ServiceTask))
throw new IllegalStateException("This should never happen");
this.task = (ServiceTask) flowElement;
this.logger.trace("Discovered task: {}: {}", this.task.getId(), this.task.getName());
this.logger.trace("Indexing {} fields", this.task.getFieldExtensions().size());
for (FieldExtension field : this.task.getFieldExtensions()) {
this.logger.trace("Discovering field: {}: {}: {}", field.getId(), field.getFieldName(), field.getStringValue());
switch (field.getFieldName()) {
case Constants.FIELD_METADATA_PROCESS_SCOPE:
this.metadataToProcessScope = Boolean.valueOf(field.getStringValue());
break;
default:
this.fieldMap.put(field.getFieldName(), field);
}
}
this.task = msts.get(execution.getProcessDefinitionId(), execution.getCurrentFlowElement());
}
@OverridingMethodsMustInvokeSuper
public void validate() {
if (this.fieldMap.get(Constants.FIELD_CONNECTOR_ID) == null)
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Connector ID'");
if (this.fieldMap.get(Constants.FIELD_QUEUE_NAME) == null)
throw new IllegalStateException("The '" + this.execution.getCurrentActivityId() + "' activity must define an 'MQ Queue Name'");
}
public String getTaskId() {
return this.task.getId();
}
public boolean doWriteToProcessScope() {
return this.metadataToProcessScope;
}
/**
* Unlike MqServiceTask, this allows for expression expansion based on the variables
*/
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type) {
return this.getFieldValueFromModel(fieldName, type, false);
return this.task.getFieldValueFromModel(fieldName, type, this.execution, false);
}
@SuppressWarnings("unchecked")
/**
* Unlike MqServiceTask, this allows for expression expansion based on the variables; even partial values
*/
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, boolean forceExpressionProcessing) {
FieldExtension field = this.fieldMap.get(fieldName);
if (field == null) {
return null;
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getExpression());
return (T) expr.getValue(this.execution);
} else if (forceExpressionProcessing) {
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getStringValue());
return (T) expr.getValue(this.execution);
} else {
return (T) field.getStringValue();
}
return this.task.getFieldValueFromModel(fieldName, type, this.execution, forceExpressionProcessing);
}
public String getConnectorIdFromModel() {
@@ -112,7 +55,7 @@ public class MqDelegateExecution {
protected void setMqVariable(String varName, Object value) {
varName = this.formulateVariableName(varName);
if (this.doWriteToProcessScope()) {
if (this.task.doWriteToProcessScope()) {
this.execution.setVariable(varName, value);
} else {
this.execution.setVariableLocal(varName, value);

View File

@@ -30,6 +30,9 @@ public class MqPublishDelegate extends AbstractMqDelegate {
@Autowired
private ProcessEngine services;
@Autowired
private MqServiceTaskService msts;
/**
* This method sends a message to an MQ queue.
*
@@ -46,6 +49,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
* @field mq_connectorId An Activiti App Tenant Endpoint ID or Java system property in the format `inteligr8.mq.connectors.{connectorId}.url`. Using system properties support `url`, `username`, and `password`.
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
* @field mq_priority [optional] A priority of the MQ message. May be an expression. Value depends on MQ protocol.
* @field mq_payload [optional] The body of the MQ message. May include expressions.
* @field mq_payloadMimeType [optional] The MIME type of the body of the MQ message.
* @field mq_replyQueueName [optional] The name of an MQ destination (queue or topic). This tells the processor of the message where to send a reply.
@@ -59,7 +63,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
*/
@Override
public void execute(DelegateExecution execution) {
MqPublishDelegateExecution mqExecution = new MqPublishDelegateExecution(this.services, execution);
MqPublishDelegateExecution mqExecution = new MqPublishDelegateExecution(this.services, this.msts, execution);
mqExecution.validate();
GenericDestination destination = new GenericDestination();
@@ -72,6 +76,7 @@ public class MqPublishDelegate extends AbstractMqDelegate {
if (mqExecution.getStatusQueueNameFromModel() != null)
message.setProperty("inteligr8.statusQueueName", mqExecution.getStatusQueueNameFromModel());
message.setReplyToQueueName(mqExecution.getReplyQueueNameFromModel());
message.setPriority(mqExecution.getPriorityFromModel());
String payload = mqExecution.getPayloadFromModel();
if (payload != null) {

View File

@@ -5,8 +5,12 @@ import org.activiti.engine.delegate.DelegateExecution;
public class MqPublishDelegateExecution extends MqDelegateExecution {
public MqPublishDelegateExecution(ProcessEngine services, DelegateExecution execution) {
super(services, execution);
public MqPublishDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
super(services, msts, execution);
}
public Integer getPriorityFromModel() {
return this.getFieldValueFromModel(Constants.FIELD_PRIORITY, Integer.class);
}
public String getReplyQueueNameFromModel() {

View File

@@ -0,0 +1,96 @@
package com.inteligr8.activiti.mq;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import org.activiti.bpmn.model.FieldExtension;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.Expression;
import org.activiti.engine.delegate.VariableScope;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.el.ExpressionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqServiceTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final ProcessEngine services;
protected final ServiceTask task;
protected final Map<String, FieldExtension> fieldMap = new HashMap<>();
private boolean metadataToProcessScope = false;
public MqServiceTask(ProcessEngine services, ServiceTask task) {
this.services = services;
this.task = task;
this.logger.trace("Indexing {} fields", this.task.getFieldExtensions().size());
for (FieldExtension field : this.task.getFieldExtensions()) {
this.logger.trace("Discovering field: {}: {}: {}", field.getId(), field.getFieldName(), field.getStringValue());
switch (field.getFieldName()) {
case Constants.FIELD_METADATA_PROCESS_SCOPE:
this.metadataToProcessScope = Boolean.valueOf(field.getStringValue());
break;
default:
this.fieldMap.put(field.getFieldName(), field);
}
}
}
@OverridingMethodsMustInvokeSuper
public void validate() {
if (this.fieldMap.get(Constants.FIELD_CONNECTOR_ID) == null)
throw new IllegalStateException("The '" + this.task.getId() + "' activity must define an 'MQ Connector ID'");
if (this.fieldMap.get(Constants.FIELD_QUEUE_NAME) == null)
throw new IllegalStateException("The '" + this.task.getId() + "' activity must define an 'MQ Queue Name'");
}
public String getTaskId() {
return this.task.getId();
}
public boolean doWriteToProcessScope() {
return this.metadataToProcessScope;
}
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type) {
return this.getFieldValueFromModel(fieldName, type, null, false);
}
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope) {
return this.getFieldValueFromModel(fieldName, type, varscope, false);
}
@SuppressWarnings("unchecked")
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope, boolean forceExpressionProcessing) {
FieldExtension field = this.fieldMap.get(fieldName);
if (field == null) {
return null;
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getExpression());
return (T) expr.getValue(varscope);
} else if (forceExpressionProcessing) {
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
Expression expr = exprman.createExpression(field.getStringValue());
return (T) expr.getValue(varscope);
} else if (String.class.isAssignableFrom(type)) {
return (T) field.getStringValue();
} else {
try {
Method method = type.getMethod("valueOf", String.class);
return (T) method.invoke(null, field.getStringValue());
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new IllegalArgumentException(e);
}
}
}
}

View File

@@ -0,0 +1,82 @@
package com.inteligr8.activiti.mq;
import java.util.HashMap;
import java.util.Map;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.engine.ProcessEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqServiceTaskService {
private final Logger logger = LoggerFactory.getLogger(MqServiceTaskService.class);
@Autowired
private ProcessEngine services;
/**
* This is an indefinite cache with no updates or removals.
* We should be fine here.
*/
private Map<String, MqServiceTask> taskCache = new HashMap<>();
public MqServiceTask get(String processDefinitionId, String taskId) {
String key = processDefinitionId + "|" + taskId;
if (this.taskCache.containsKey(key))
return this.taskCache.get(key);
BpmnModel model = this.services.getRepositoryService().getBpmnModel(processDefinitionId);
if (model == null)
throw new IllegalArgumentException("The process definition '" + processDefinitionId + "' could not be found");
FlowElement element = model.getFlowElement(taskId);
if (element == null)
throw new IllegalArgumentException("The service task '" + taskId + "' could not be found in the process definition '" + processDefinitionId + "' model");
if (!(element instanceof ServiceTask))
throw new IllegalArgumentException("The model element '" + taskId + "' is not a service task as expected");
ServiceTask task = (ServiceTask) element;
this.logger.trace("Discovered service task: {}: {}", task.getId(), task.getName());
MqServiceTask mqtask = new MqServiceTask(this.services, task);
mqtask.validate();
this.taskCache.put(key, mqtask);
return mqtask;
}
public MqServiceTask get(String processDefinitionId, FlowElement taskElement) {
String key = processDefinitionId + "|" + taskElement.getId();
if (this.taskCache.containsKey(key))
return this.taskCache.get(key);
if (!(taskElement instanceof ServiceTask))
throw new IllegalArgumentException("The model element '" + taskElement.getId() + "' is not a service task as expected");
ServiceTask task = (ServiceTask) taskElement;
this.logger.trace("Discovered service task: {}: {}", task.getId(), task.getName());
MqServiceTask mqtask = new MqServiceTask(this.services, task);
mqtask.validate();
this.taskCache.put(key, mqtask);
return mqtask;
}
public MqServiceTask get(String processDefinitionId, ServiceTask task) {
String key = processDefinitionId + "|" + task.getId();
if (this.taskCache.containsKey(key))
return this.taskCache.get(key);
MqServiceTask mqtask = new MqServiceTask(this.services, task);
mqtask.validate();
this.taskCache.put(key, mqtask);
return mqtask;
}
}

View File

@@ -29,6 +29,9 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
@Autowired
private ProcessEngine services;
@Autowired
private MqServiceTaskService msts;
/**
* This delegate listens for messages on an MQ queue.
@@ -51,6 +54,7 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
* @field mq_queueName The name of an MQ destination (queue or topic). This is the target of the message. If it doesn't exist, it will be created.
* @field mq_metadataProcessScope [optional] `true` to set all variables below in the process instance scope; otherwise the variables will be local to the task.
* @field mq_messageName [optional] A unique identifier to append to Activiti variables when more than one MQ message publication/subscription is supported by a process.
* @field mq_concurrency The number of process instances to simultaneously listen on the queue. Only positive numbers are accepted.
* @varOut mq_correlationId The correlating identifier of the message received. Use to correlate between sent/received messages; like a thread of communication.
* @varOut mq_messageId The unique message identifer of the message received.
* @varOut mq_deliveryTime The time the message was delivered; not received. It is of the `java.util.Date` type as supported by Activiti.
@@ -67,7 +71,7 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
*/
@Override
public void execute(DelegateExecution execution) {
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, execution);
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, this.msts, execution);
mqExecution.validate();
GenericDestination destination = new GenericDestination();
@@ -103,5 +107,10 @@ public class MqSubscribeDelegate extends AbstractMqDelegate {
throw new BpmnError("mq", "JMS communication failed: " + je.getMessage());
}
}
public Integer getConcurrency(DelegateExecution execution) {
MqServiceTask task = this.msts.get(execution.getProcessDefinitionId(), execution.getCurrentActivityId());
return task == null ? 1 : task.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
}
}

View File

@@ -5,8 +5,12 @@ import org.activiti.engine.delegate.DelegateExecution;
public class MqSubscribeDelegateExecution extends MqDelegateExecution {
public MqSubscribeDelegateExecution(ProcessEngine services, DelegateExecution execution) {
super(services, execution);
public MqSubscribeDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
super(services, msts, execution);
}
public Integer getConcurrencyFromModel() {
return this.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
}
}

View File

@@ -30,6 +30,9 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
@Autowired
private ProcessEngine services;
@Autowired
private MqServiceTaskService msts;
/**
* This method listens for a reply message on an MQ queue.
*
@@ -65,7 +68,7 @@ public class MqSubscribeReplyDelegate extends AbstractMqDelegate {
*/
@Override
public void execute(DelegateExecution execution) {
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, execution);
MqSubscribeDelegateExecution mqExecution = new MqSubscribeDelegateExecution(this.services, this.msts, execution);
mqExecution.validate();
GenericDestination destination = new GenericDestination();

View File

@@ -34,6 +34,14 @@ public interface PreparedMessage<BodyType> extends Message {
}
}
default Integer getPriority() {
return this.getProperty("priority");
}
default void setPriority(Integer priority) {
this.setProperty("priority", priority);
}
default String getReplyToQueueName() {
return this.getProperty("replyToQueueName");
}

View File

@@ -64,6 +64,10 @@ public class TenantFinderService implements ApplicationListener<ApplicationStart
return tenantId == null ? null : ("tenant_" + tenantId);
}
public String transform(Tenant tenant) {
return tenant == null ? null : this.transform(tenant.getId());
}
public Long transform(String tenantId) {
return tenantId == null ? null : Long.parseLong(tenantId.substring("tenant_".length()));
}

View File

@@ -45,7 +45,7 @@ public class AmqpPreparedMessage<BodyType> extends AbstractMessage implements Pr
"utf-8",
this.getProperties(),
DeliveryMode.PERSISTENT, // yes, this is a JMS class, but the values are the same; survive restarts
null, // default priority
this.getPriority(),
correlationId,
this.getReplyToQueueName() != null ? this.getReplyToQueueName() : null,
null, // expiration

View File

@@ -56,6 +56,9 @@ public class JmsPreparedMessage<BodyType> extends AbstractMessage implements Pre
if (this.getReplyToQueueName() != null)
message.setJMSReplyTo(session.createQueue(this.getReplyToQueueName()));
if (this.getPriority() != null)
message.setJMSPriority(this.getPriority());
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); // survive restarts
return message;
}