Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
7a5065419a | |||
d3c79cac9c | |||
5aef2d4446 | |||
dfb24cbd1f | |||
7a67634cfb | |||
41b5271617 | |||
cc14a59959 |
2
pom.xml
2
pom.xml
@@ -11,7 +11,7 @@
|
|||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<artifactId>mq-activiti-ext</artifactId>
|
<artifactId>mq-activiti-ext</artifactId>
|
||||||
<version>1.0.4</version>
|
<version>1.0.6</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<name>MQ Activiti Extension</name>
|
<name>MQ Activiti Extension</name>
|
||||||
|
@@ -3087,8 +3087,7 @@
|
|||||||
"mq_prioritypackage",
|
"mq_prioritypackage",
|
||||||
"mq_payloadpackage",
|
"mq_payloadpackage",
|
||||||
"mq_replyQueueNamepackage",
|
"mq_replyQueueNamepackage",
|
||||||
"mq_statusQueueNamepackage",
|
"mq_statusQueueNamepackage"
|
||||||
"mq_metadataProcessScopepackage"
|
|
||||||
],
|
],
|
||||||
"hiddenPropertyPackages": [
|
"hiddenPropertyPackages": [
|
||||||
"multiinstance_typepackage",
|
"multiinstance_typepackage",
|
||||||
|
@@ -28,6 +28,7 @@ import org.activiti.engine.delegate.event.ActivitiEventType;
|
|||||||
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
import org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior;
|
||||||
import org.activiti.engine.impl.context.Context;
|
import org.activiti.engine.impl.context.Context;
|
||||||
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
|
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
|
||||||
|
import org.activiti.engine.impl.persistence.entity.JobEntity;
|
||||||
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
|
||||||
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
|
import org.activiti.engine.impl.util.ProcessDefinitionUtil;
|
||||||
import org.activiti.engine.repository.ProcessDefinition;
|
import org.activiti.engine.repository.ProcessDefinition;
|
||||||
@@ -234,6 +235,17 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
// we only want to deal with ProcessDefinition entities
|
// we only want to deal with ProcessDefinition entities
|
||||||
|
} else if (aaevent.getEntity() instanceof JobEntity) {
|
||||||
|
this.logger.trace("Triggered by job state change: {}", aaevent.getEntity());
|
||||||
|
|
||||||
|
switch (aaevent.getType()) {
|
||||||
|
case ENTITY_DELETED:
|
||||||
|
case ENTITY_SUSPENDED:
|
||||||
|
// we need to stop the listener
|
||||||
|
this.onJobRemoveEvent((JobEntity) aaevent.getEntity());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -282,6 +294,12 @@ public class MQProcessDefinitionMonitor implements ActivitiEventListener, Applic
|
|||||||
this.deloopMqSubscribeTask(entity.getId());
|
this.deloopMqSubscribeTask(entity.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void onJobRemoveEvent(JobEntity entity) {
|
||||||
|
this.logger.trace("Triggered by job removal: {}", entity);
|
||||||
|
this.subscriptionService.cancelled(entity.getExecutionId());
|
||||||
|
this.logger.debug("Subscription execution ended due to job removal: job: {}: exec: {}", entity.getId(), entity.getExecutionId());
|
||||||
|
}
|
||||||
|
|
||||||
protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
|
protected void unsubscribeOtherMqSubscribeTasks(ProcessDefinitionEntity procDef) {
|
||||||
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
|
this.unsubscribeOtherMqSubscribeTasks(procDef.getId());
|
||||||
}
|
}
|
||||||
|
@@ -54,12 +54,7 @@ public class MqDelegateExecution {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void setMqVariable(String varName, Object value) {
|
protected void setMqVariable(String varName, Object value) {
|
||||||
varName = this.formulateVariableName(varName);
|
this.execution.setVariable(varName, value);
|
||||||
if (this.task.doWriteToProcessScope()) {
|
|
||||||
this.execution.setVariable(varName, value);
|
|
||||||
} else {
|
|
||||||
this.execution.setVariableLocal(varName, value);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String formulateVariableName(String varName) {
|
public String formulateVariableName(String varName) {
|
||||||
@@ -67,11 +62,19 @@ public class MqDelegateExecution {
|
|||||||
varName += "_" + this.getMessageNameFromModel();
|
varName += "_" + this.getMessageNameFromModel();
|
||||||
return varName;
|
return varName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Always pull from the process scope.
|
||||||
|
* @param correlationId
|
||||||
|
*/
|
||||||
public String getCorrelationId() {
|
public String getCorrelationId() {
|
||||||
return this.execution.getVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), String.class);
|
return this.execution.getVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Always set at process scope.
|
||||||
|
* @param correlationId
|
||||||
|
*/
|
||||||
public void setCorrelationId(String correlationId) {
|
public void setCorrelationId(String correlationId) {
|
||||||
this.execution.setVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), correlationId);
|
this.execution.setVariable(this.formulateVariableName(Constants.VARIABLE_CORRELATION_ID), correlationId);
|
||||||
}
|
}
|
||||||
|
@@ -8,6 +8,9 @@ import java.util.Set;
|
|||||||
import org.activiti.engine.ProcessEngine;
|
import org.activiti.engine.ProcessEngine;
|
||||||
import org.activiti.engine.delegate.DelegateExecution;
|
import org.activiti.engine.delegate.DelegateExecution;
|
||||||
import org.activiti.engine.repository.ProcessDefinition;
|
import org.activiti.engine.repository.ProcessDefinition;
|
||||||
|
import org.activiti.engine.runtime.Execution;
|
||||||
|
import org.activiti.engine.runtime.Job;
|
||||||
|
import org.activiti.engine.runtime.ProcessInstance;
|
||||||
import org.apache.commons.collections4.MultiValuedMap;
|
import org.apache.commons.collections4.MultiValuedMap;
|
||||||
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
|
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
@@ -26,13 +29,13 @@ public class MqExecutionService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* The size of the keys is limited to the number of process definitions
|
* The size of the keys is limited to the number of process definitions
|
||||||
* series are defined. This excludes versions. It would actually only
|
* series defined. This excludes versions. It would actually only contain
|
||||||
* contain ones with an MQ subscribe task. It would never be a significant
|
* ones with an MQ subscribe task. It would never be a significant memory
|
||||||
* memory hog.
|
* hog.
|
||||||
*
|
*
|
||||||
* The size of the values is limited to the number of active process
|
* The size of the values is limited to the number of active process
|
||||||
* definition versions exist. So it would never be a significant memory
|
* definition versions that exist. So it would never be a significant
|
||||||
* hog.
|
* memory hog.
|
||||||
*
|
*
|
||||||
* The size of the keys/values have nothing to do with the number of
|
* The size of the keys/values have nothing to do with the number of
|
||||||
* process instances or executions.
|
* process instances or executions.
|
||||||
@@ -81,8 +84,10 @@ public class MqExecutionService {
|
|||||||
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
|
private MultiValuedMap<Pair<String, String>, String> activityExecutionMap = new HashSetValuedHashMap<>();
|
||||||
|
|
||||||
public synchronized void executing(DelegateExecution execution) {
|
public synchronized void executing(DelegateExecution execution) {
|
||||||
|
ProcessDefinition procDef = this.services.getRepositoryService().createProcessDefinitionQuery().processDefinitionId(execution.getProcessDefinitionId()).singleResult();
|
||||||
|
this.processDefinitionKeyMap.put(procDef.getKey(), execution.getProcessDefinitionId());
|
||||||
|
|
||||||
Pair<String, String> key = this.toKey(execution);
|
Pair<String, String> key = this.toKey(execution);
|
||||||
this.processDefinitionKeyMap.put(execution.getProcessDefinitionId(), key.getLeft());
|
|
||||||
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
|
this.processDefinitionActivityMap.put(key.getLeft(), key.getRight());
|
||||||
this.activityExecutionMap.put(key, execution.getId());
|
this.activityExecutionMap.put(key, execution.getId());
|
||||||
}
|
}
|
||||||
@@ -92,6 +97,39 @@ public class MqExecutionService {
|
|||||||
this.activityExecutionMap.removeMapping(key, execution.getId());
|
this.activityExecutionMap.removeMapping(key, execution.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized final boolean cancelled(String executionId) {
|
||||||
|
Execution execution = this.services.getRuntimeService().createExecutionQuery().executionId(executionId).singleResult();
|
||||||
|
return this.cancelled(execution);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean cancelled(Execution execution) {
|
||||||
|
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
|
||||||
|
if (execution.getActivityId() != null) {
|
||||||
|
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), execution.getActivityId());
|
||||||
|
return this.activityExecutionMap.removeMapping(key, execution.getId());
|
||||||
|
} else {
|
||||||
|
this.logger.trace("No activity discovered, so checking all activities in the process definition: {}: {}", pi.getProcessDefinitionId(), execution.getId());
|
||||||
|
Collection<String> activityIds = this.processDefinitionActivityMap.get(pi.getProcessDefinitionId());
|
||||||
|
boolean removed = false;
|
||||||
|
|
||||||
|
for (String activityId : activityIds) {
|
||||||
|
Pair<String, String> key = this.toKey(pi.getProcessDefinitionId(), activityId);
|
||||||
|
removed = this.activityExecutionMap.removeMapping(key, execution.getId()) || removed;
|
||||||
|
}
|
||||||
|
|
||||||
|
return removed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized final boolean cancelledJob(String jobId) {
|
||||||
|
Job job = this.services.getManagementService().createJobQuery().jobId(jobId).singleResult();
|
||||||
|
return this.cancelledJob(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized final boolean cancelledJob(Job job) {
|
||||||
|
return this.cancelled(job.getExecutionId());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
|
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
|
||||||
* @return A set of execution identifiers that were in the now cleared map.
|
* @return A set of execution identifiers that were in the now cleared map.
|
||||||
|
@@ -70,26 +70,62 @@ public class MqServiceTask {
|
|||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope, boolean forceExpressionProcessing) {
|
protected <T> T getFieldValueFromModel(String fieldName, Class<T> type, VariableScope varscope, boolean forceExpressionProcessing) {
|
||||||
|
Object value = this.getFieldValueFromModel(fieldName, varscope, forceExpressionProcessing);
|
||||||
|
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
} else if (String.class.isAssignableFrom(type)) {
|
||||||
|
if (value instanceof String) {
|
||||||
|
return (T) value;
|
||||||
|
} else {
|
||||||
|
return (T) value.toString();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
Method method = type.getMethod("valueOf", value.getClass());
|
||||||
|
return (T) method.invoke(null, value);
|
||||||
|
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
||||||
|
String strvalue;
|
||||||
|
if (value instanceof String) {
|
||||||
|
strvalue = (String) value;
|
||||||
|
} else {
|
||||||
|
strvalue = value.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Method method = type.getMethod("valueOf", String.class);
|
||||||
|
return (T) method.invoke(null, strvalue);
|
||||||
|
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e2) {
|
||||||
|
throw new IllegalArgumentException("The target type '" + type + "' has no 'valueOf' method for String or type: " + value.getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Object getFieldValueFromModel(String fieldName, VariableScope varscope, boolean forceExpressionProcessing) {
|
||||||
FieldExtension field = this.fieldMap.get(fieldName);
|
FieldExtension field = this.fieldMap.get(fieldName);
|
||||||
if (field == null) {
|
if (field == null) {
|
||||||
return null;
|
return null;
|
||||||
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
|
} else if (field.getExpression() != null && field.getExpression().length() > 0) {
|
||||||
|
this.logger.trace("Field value is recognized as an expression by the Activity framework: {}: {}", fieldName, field.getExpression());
|
||||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||||
Expression expr = exprman.createExpression(field.getExpression());
|
Expression expr = exprman.createExpression(field.getExpression());
|
||||||
return (T) expr.getValue(varscope);
|
return expr.getValue(varscope);
|
||||||
|
} else if (field.getStringValue() == null) {
|
||||||
|
this.logger.trace("Field value is null: {}", fieldName);
|
||||||
|
return null;
|
||||||
} else if (forceExpressionProcessing) {
|
} else if (forceExpressionProcessing) {
|
||||||
|
this.logger.trace("Field value will be processed as potentially having expression(s): {}", fieldName);
|
||||||
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||||
Expression expr = exprman.createExpression(field.getStringValue());
|
Expression expr = exprman.createExpression(field.getStringValue());
|
||||||
return (T) expr.getValue(varscope);
|
return expr.getValue(varscope);
|
||||||
} else if (String.class.isAssignableFrom(type)) {
|
} else if (field.getStringValue().startsWith("${") && field.getStringValue().endsWith("}")) {
|
||||||
return (T) field.getStringValue();
|
this.logger.trace("Field value is recognized as an expression by the MQ extension: {}: {}", fieldName, field.getStringValue());
|
||||||
|
ExpressionManager exprman = Context.getProcessEngineConfiguration().getExpressionManager();
|
||||||
|
Expression expr = exprman.createExpression(field.getStringValue());
|
||||||
|
return expr.getValue(varscope);
|
||||||
} else {
|
} else {
|
||||||
try {
|
return field.getStringValue();
|
||||||
Method method = type.getMethod("valueOf", String.class);
|
|
||||||
return (T) method.invoke(null, field.getStringValue());
|
|
||||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -8,6 +8,15 @@ public class MqSubscribeDelegateExecution extends MqDelegateExecution {
|
|||||||
public MqSubscribeDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
|
public MqSubscribeDelegateExecution(ProcessEngine services, MqServiceTaskService msts, DelegateExecution execution) {
|
||||||
super(services, msts, execution);
|
super(services, msts, execution);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setMqVariable(String varName, Object value) {
|
||||||
|
varName = this.formulateVariableName(varName);
|
||||||
|
if (this.task.doWriteToProcessScope()) {
|
||||||
|
this.execution.setVariable(varName, value);
|
||||||
|
} else {
|
||||||
|
this.execution.setVariableLocal(varName, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Integer getConcurrencyFromModel() {
|
public Integer getConcurrencyFromModel() {
|
||||||
return this.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
return this.getFieldValueFromModel(Constants.FIELD_CONCURRENCY, Integer.class);
|
||||||
|
@@ -7,6 +7,7 @@ import java.util.Set;
|
|||||||
import org.activiti.engine.ProcessEngine;
|
import org.activiti.engine.ProcessEngine;
|
||||||
import org.activiti.engine.delegate.DelegateExecution;
|
import org.activiti.engine.delegate.DelegateExecution;
|
||||||
import org.activiti.engine.repository.ProcessDefinition;
|
import org.activiti.engine.repository.ProcessDefinition;
|
||||||
|
import org.activiti.engine.runtime.Execution;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -45,6 +46,23 @@ public class MqSubscriptionService extends MqExecutionService {
|
|||||||
this.executed(execution);
|
this.executed(execution);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean cancelled(Execution execution) {
|
||||||
|
AutoCloseable cachedConsumerCloseable = this.executionSubscriptionMap.get(execution.getId());
|
||||||
|
if (cachedConsumerCloseable == null) {
|
||||||
|
this.logger.trace("An execution was cancelled, but had no registered subscription to close: {}", execution.getId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this will eventually lead to a call to "consumed() above"
|
||||||
|
try {
|
||||||
|
cachedConsumerCloseable.close();
|
||||||
|
return super.cancelled(execution);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
|
* @param latestProcessDefinitionId A process definition identifier to NOT clear. All other versions will be cleared.
|
||||||
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
|
* @return A set of execution identifiers subscribed to MQ that were in the now cleared map; all subscriptions now ended.
|
||||||
|
Reference in New Issue
Block a user