3 Commits

Author SHA1 Message Date
5ee3cbee42 add metrics; thread pool naming 2025-05-28 14:48:19 -04:00
518219d501 set aps-ext-rad tile version range 2025-05-28 09:57:36 -04:00
e6f73a5795 fix README link 2025-05-27 20:53:49 -04:00
6 changed files with 67 additions and 21 deletions

View File

@@ -12,7 +12,7 @@ To use this extension, it depends on your Activiti implementation.
### Without APS
You need to write some code to inject the `SpringJobAwareAsyncExecutor` or `PojoJobAwareAsyncExecutor` into your `ProcessEngineConfiguration`. This is very dependent on how your implementation is setup. Then you need to include this JAR in the classpath of your Activiti application. Once installed, remember to configure the extension as explained in the [Configuration](#Configuration) section below.
You need to write some code to inject the `SpringJobAwareAsyncExecutor` or `PojoJobAwareAsyncExecutor` into your `ProcessEngineConfiguration`. This is very dependent on how your implementation is setup. Then you need to include this JAR in the classpath of your Activiti application. Once installed, remember to configure the extension as explained in the [Configuration](#spring-configuration) section below.
### With APS

View File

@@ -16,3 +16,6 @@ logger.async-ext.level=trace
logger.async-aps-ext.name=com.inteligr8.alfresco.activiti.async
logger.async-aps-ext.level=trace
logger.async-metrics.name=inteligr8.async.metrics
logger.async-metrics.level=debug

View File

@@ -17,6 +17,12 @@ package com.inteligr8.activiti.async;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.impl.asyncexecutor.AsyncExecutor;
@@ -25,23 +31,20 @@ import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.Job;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Logger metricsLogger = LoggerFactory.getLogger("inteligr8.async.metrics");
protected Map<String, AsyncExecutor> processDefExecutors = new HashMap<>();
protected Map<String, AsyncExecutor> activityExecutors = new HashMap<>();
protected Map<String, DefaultAsyncJobExecutor> processDefExecutors = new HashMap<>();
protected Map<String, DefaultAsyncJobExecutor> activityExecutors = new HashMap<>();
private AsyncExecutor fallbackExecutor;
public AbstractJobAwareAsyncExecutor() {
this.logger.debug("Creating fallback async executor");
this.fallbackExecutor = this.createExecutor(null, null, null, null, null, null);
}
protected DefaultAsyncJobExecutor createExecutor(
protected DefaultAsyncJobExecutor createExecutor(String id,
Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis,
Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) {
this.logger.trace("Creating async executor: {}/{}<{}", corePoolSize, maxPoolSize, queueSize);
@@ -52,6 +55,16 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
executor.setKeepAliveTime(keepAliveTimeMillis == null ? 5000L : keepAliveTimeMillis);
executor.setDefaultAsyncJobAcquireWaitTimeInMillis(defaultAsyncJobAcquireWaitTimeMillis == null ? 10000 : defaultAsyncJobAcquireWaitTimeMillis);
executor.setDefaultTimerJobAcquireWaitTimeInMillis(defaultTimerJobAcquireWaitTimeMillis == null ? 10000 : defaultTimerJobAcquireWaitTimeMillis);
if (id != null) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueSize);
executor.setThreadPoolQueue(queue);
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("activiti-async-" + id + "-thread-%d").build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeMillis, TimeUnit.MILLISECONDS, queue, threadFactory);
executor.setExecutorService(threadPool);
}
return executor;
}
@@ -59,7 +72,7 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis,
Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) {
this.logger.debug("Creating process definition async executor: {}", processDefinitionKey);
AsyncExecutor executor = this.createExecutor(corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis,
DefaultAsyncJobExecutor executor = this.createExecutor(processDefinitionKey, corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis,
defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis);
this.processDefExecutors.put(processDefinitionKey, executor);
}
@@ -68,9 +81,10 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis,
Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) {
this.logger.debug("Creating activity async executor: {} [in {}]", activityId, processDefinitionKey);
AsyncExecutor executor = this.createExecutor(corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis,
String id = processDefinitionKey + ":" + activityId;
DefaultAsyncJobExecutor executor = this.createExecutor(id, corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis,
defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis);
this.activityExecutors.put(processDefinitionKey + ":" + activityId, executor);
this.activityExecutors.put(id, executor);
}
protected abstract ProcessEngine getProcessEngineContext();
@@ -79,6 +93,7 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
public boolean executeAsyncJob(Job job) {
if (this.activityExecutors.isEmpty() && this.processDefExecutors.isEmpty()) {
this.logger.trace("[job:{}] No extra async executors defined; using fallback async executor: {}", job.getId(), this.fallbackExecutor);
this.captureMetrics(null, this.fallbackExecutor);
return this.fallbackExecutor.executeAsyncJob(job);
}
@@ -91,28 +106,39 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
.singleResult();
this.logger.debug("[job:{}] Discovered job process definition: {}", job.getId(), processDefinition == null ? null : processDefinition.getName());
String executorId = null;
AsyncExecutor executor = null;
Execution execution = null;
if (!this.activityExecutors.isEmpty()) {
if (job.getExecutionId() != null) {
this.logger.warn("[job:{}] Job is corrupt; has no execution; using fallback async executor: {}", job.getId(), this.fallbackExecutor);
this.captureMetrics(null, this.fallbackExecutor);
return this.fallbackExecutor.executeAsyncJob(job);
}
this.logger.trace("[job:{}] Finding job execution: {}", job.getId(), job.getExecutionId());
execution = engine.getRuntimeService().createExecutionQuery()
.executionId(job.getExecutionId())
.singleResult();
this.logger.debug("[job:{}] Discovered job execution: {}", job.getId(), execution.getName());
this.logger.debug("[job:{}] Discovered job execution at activity: {}", job.getId(), execution.getActivityId());
String activityId = processDefinition.getKey() + ":" + execution.getActivityId();
this.logger.trace("[job:{}] Finding activity async executor: {}", job.getId(), activityId);
executor = this.activityExecutors.get(activityId);
if (executor != null)
if (executor != null) {
this.logger.debug("[job:{}] Discovered and using activity async executor: {}: {}", job.getId(), activityId, executor);
executorId = activityId;
}
}
if (executor == null && processDefinition != null) {
this.logger.trace("[job:{}] Finding process definition async executor: {}", job.getId(), processDefinition.getKey());
executor = this.processDefExecutors.get(processDefinition.getKey());
if (executor != null)
if (executor != null) {
this.logger.debug("[job:{}] Discovered and using process definition async executor: {}: {}", job.getId(), processDefinition.getKey(), executor);
executorId = processDefinition.getKey();
}
}
if (executor == null) {
@@ -120,9 +146,18 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
this.logger.trace("[job:{}] Using fallback async executor: {}", job.getId(), executor);
}
this.captureMetrics(executorId, executor);
return executor.executeAsyncJob(job);
}
protected void captureMetrics(String executorId, AsyncExecutor executor) {
if (this.metricsLogger.isDebugEnabled() && executor instanceof DefaultAsyncJobExecutor) {
DefaultAsyncJobExecutor dajexecutor = (DefaultAsyncJobExecutor) executor;
ThreadPoolExecutor pool = (ThreadPoolExecutor) dajexecutor.getExecutorService();
this.metricsLogger.debug("[executorId:{}] threads: {}->{}/{} | aggregate: {}/{}", executorId, pool.getQueue().size(), pool.getActiveCount(), pool.getMaximumPoolSize(), pool.getCompletedTaskCount(), pool.getTaskCount());
}
}
public void setFallbackAsyncExecutor(AsyncExecutor executor) {
this.fallbackExecutor = executor;
}
@@ -139,15 +174,20 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
@Override
public void start() {
if (this.fallbackExecutor == null) {
this.logger.debug("Creating fallback async executor");
this.fallbackExecutor = this.createExecutor(null, null, null, null, null, null, null);
}
this.logger.trace("Starting fallback async executor: {}", this.fallbackExecutor);
this.fallbackExecutor.start();
for (Entry<String, AsyncExecutor> executor : this.processDefExecutors.entrySet()) {
for (Entry<String, ? extends AsyncExecutor> executor : this.processDefExecutors.entrySet()) {
this.logger.trace("Starting process definition async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().start();
}
for (Entry<String, AsyncExecutor> executor : this.activityExecutors.entrySet()) {
for (Entry<String, ? extends AsyncExecutor> executor : this.activityExecutors.entrySet()) {
this.logger.trace("Starting activity async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().start();
}
@@ -155,12 +195,12 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
@Override
public void shutdown() {
for (Entry<String, AsyncExecutor> executor : this.activityExecutors.entrySet()) {
for (Entry<String, ? extends AsyncExecutor> executor : this.activityExecutors.entrySet()) {
this.logger.trace("Stopping activity async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().shutdown();
}
for (Entry<String, AsyncExecutor> executor : this.processDefExecutors.entrySet()) {
for (Entry<String, ? extends AsyncExecutor> executor : this.processDefExecutors.entrySet()) {
this.logger.trace("Stopping process definition async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().shutdown();
}

View File

@@ -72,7 +72,7 @@ public class SpringJobAwareAsyncExecutor extends AbstractJobAwareAsyncExecutor i
}
private DefaultAsyncJobExecutor createExecutor(String id) {
return super.createExecutor(
return super.createExecutor(id,
this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".corePoolSize", Integer.class, 1),
this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".maxPoolSize", Integer.class, 4),
this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".queueSize", Integer.class, 512),

View File

@@ -13,3 +13,6 @@ logger.aspose-license.level=off
logger.async-ext.name=com.inteligr8.activiti.async
logger.async-ext.level=trace
logger.async-metrics.name=inteligr8.async.metrics
logger.async-metrics.level=debug

View File

@@ -85,7 +85,7 @@
<applyBefore>com.inteligr8.activiti:async-activiti-ext-parent</applyBefore>
<tiles>
<!-- Documentation: https://git.inteligr8.com/inteligr8/ootbee-beedk/src/branch/stable/beedk-aps-ext-rad-tile -->
<tile>com.inteligr8.ootbee:beedk-aps-ext-rad-tile:1.1-SNAPSHOT</tile>
<tile>com.inteligr8.ootbee:beedk-aps-ext-rad-tile:[1.1,2.0)</tile>
</tiles>
</configuration>
</plugin>