From 5ee3cbee422d3bfe2c1800e5f655f170b86c9d72 Mon Sep 17 00:00:00 2001 From: "Brian M. Long" Date: Wed, 28 May 2025 14:48:19 -0400 Subject: [PATCH] add metrics; thread pool naming --- .../src/test/resources/log4j2-test.properties | 3 + .../async/AbstractJobAwareAsyncExecutor.java | 76 ++++++++++++++----- .../async/SpringJobAwareAsyncExecutor.java | 2 +- .../src/test/resources/log4j2-test.properties | 3 + 4 files changed, 65 insertions(+), 19 deletions(-) diff --git a/activiti-app-ext/src/test/resources/log4j2-test.properties b/activiti-app-ext/src/test/resources/log4j2-test.properties index 527ea1f..44f3fbc 100644 --- a/activiti-app-ext/src/test/resources/log4j2-test.properties +++ b/activiti-app-ext/src/test/resources/log4j2-test.properties @@ -16,3 +16,6 @@ logger.async-ext.level=trace logger.async-aps-ext.name=com.inteligr8.alfresco.activiti.async logger.async-aps-ext.level=trace + +logger.async-metrics.name=inteligr8.async.metrics +logger.async-metrics.level=debug diff --git a/activiti-ext/src/main/java/com/inteligr8/activiti/async/AbstractJobAwareAsyncExecutor.java b/activiti-ext/src/main/java/com/inteligr8/activiti/async/AbstractJobAwareAsyncExecutor.java index 0398fd2..b8c3a3b 100644 --- a/activiti-ext/src/main/java/com/inteligr8/activiti/async/AbstractJobAwareAsyncExecutor.java +++ b/activiti-ext/src/main/java/com/inteligr8/activiti/async/AbstractJobAwareAsyncExecutor.java @@ -17,6 +17,12 @@ package com.inteligr8.activiti.async; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.activiti.engine.ProcessEngine; import org.activiti.engine.impl.asyncexecutor.AsyncExecutor; @@ -25,23 +31,20 @@ import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl; import org.activiti.engine.repository.ProcessDefinition; import org.activiti.engine.runtime.Execution; import org.activiti.engine.runtime.Job; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger metricsLogger = LoggerFactory.getLogger("inteligr8.async.metrics"); - protected Map processDefExecutors = new HashMap<>(); - protected Map activityExecutors = new HashMap<>(); + protected Map processDefExecutors = new HashMap<>(); + protected Map activityExecutors = new HashMap<>(); private AsyncExecutor fallbackExecutor; - public AbstractJobAwareAsyncExecutor() { - this.logger.debug("Creating fallback async executor"); - this.fallbackExecutor = this.createExecutor(null, null, null, null, null, null); - } - - protected DefaultAsyncJobExecutor createExecutor( + protected DefaultAsyncJobExecutor createExecutor(String id, Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis, Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) { this.logger.trace("Creating async executor: {}/{}<{}", corePoolSize, maxPoolSize, queueSize); @@ -52,6 +55,16 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { executor.setKeepAliveTime(keepAliveTimeMillis == null ? 5000L : keepAliveTimeMillis); executor.setDefaultAsyncJobAcquireWaitTimeInMillis(defaultAsyncJobAcquireWaitTimeMillis == null ? 10000 : defaultAsyncJobAcquireWaitTimeMillis); executor.setDefaultTimerJobAcquireWaitTimeInMillis(defaultTimerJobAcquireWaitTimeMillis == null ? 10000 : defaultTimerJobAcquireWaitTimeMillis); + + if (id != null) { + BlockingQueue queue = new ArrayBlockingQueue<>(queueSize); + executor.setThreadPoolQueue(queue); + + ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("activiti-async-" + id + "-thread-%d").build(); + ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeMillis, TimeUnit.MILLISECONDS, queue, threadFactory); + executor.setExecutorService(threadPool); + } + return executor; } @@ -59,7 +72,7 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis, Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) { this.logger.debug("Creating process definition async executor: {}", processDefinitionKey); - AsyncExecutor executor = this.createExecutor(corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis, + DefaultAsyncJobExecutor executor = this.createExecutor(processDefinitionKey, corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis, defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis); this.processDefExecutors.put(processDefinitionKey, executor); } @@ -68,9 +81,10 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis, Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) { this.logger.debug("Creating activity async executor: {} [in {}]", activityId, processDefinitionKey); - AsyncExecutor executor = this.createExecutor(corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis, + String id = processDefinitionKey + ":" + activityId; + DefaultAsyncJobExecutor executor = this.createExecutor(id, corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis, defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis); - this.activityExecutors.put(processDefinitionKey + ":" + activityId, executor); + this.activityExecutors.put(id, executor); } protected abstract ProcessEngine getProcessEngineContext(); @@ -79,6 +93,7 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { public boolean executeAsyncJob(Job job) { if (this.activityExecutors.isEmpty() && this.processDefExecutors.isEmpty()) { this.logger.trace("[job:{}] No extra async executors defined; using fallback async executor: {}", job.getId(), this.fallbackExecutor); + this.captureMetrics(null, this.fallbackExecutor); return this.fallbackExecutor.executeAsyncJob(job); } @@ -91,28 +106,39 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { .singleResult(); this.logger.debug("[job:{}] Discovered job process definition: {}", job.getId(), processDefinition == null ? null : processDefinition.getName()); + String executorId = null; AsyncExecutor executor = null; Execution execution = null; if (!this.activityExecutors.isEmpty()) { + if (job.getExecutionId() != null) { + this.logger.warn("[job:{}] Job is corrupt; has no execution; using fallback async executor: {}", job.getId(), this.fallbackExecutor); + this.captureMetrics(null, this.fallbackExecutor); + return this.fallbackExecutor.executeAsyncJob(job); + } + this.logger.trace("[job:{}] Finding job execution: {}", job.getId(), job.getExecutionId()); execution = engine.getRuntimeService().createExecutionQuery() .executionId(job.getExecutionId()) .singleResult(); - this.logger.debug("[job:{}] Discovered job execution: {}", job.getId(), execution.getName()); + this.logger.debug("[job:{}] Discovered job execution at activity: {}", job.getId(), execution.getActivityId()); String activityId = processDefinition.getKey() + ":" + execution.getActivityId(); this.logger.trace("[job:{}] Finding activity async executor: {}", job.getId(), activityId); executor = this.activityExecutors.get(activityId); - if (executor != null) + if (executor != null) { this.logger.debug("[job:{}] Discovered and using activity async executor: {}: {}", job.getId(), activityId, executor); + executorId = activityId; + } } if (executor == null && processDefinition != null) { this.logger.trace("[job:{}] Finding process definition async executor: {}", job.getId(), processDefinition.getKey()); executor = this.processDefExecutors.get(processDefinition.getKey()); - if (executor != null) + if (executor != null) { this.logger.debug("[job:{}] Discovered and using process definition async executor: {}: {}", job.getId(), processDefinition.getKey(), executor); + executorId = processDefinition.getKey(); + } } if (executor == null) { @@ -120,9 +146,18 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { this.logger.trace("[job:{}] Using fallback async executor: {}", job.getId(), executor); } + this.captureMetrics(executorId, executor); return executor.executeAsyncJob(job); } + protected void captureMetrics(String executorId, AsyncExecutor executor) { + if (this.metricsLogger.isDebugEnabled() && executor instanceof DefaultAsyncJobExecutor) { + DefaultAsyncJobExecutor dajexecutor = (DefaultAsyncJobExecutor) executor; + ThreadPoolExecutor pool = (ThreadPoolExecutor) dajexecutor.getExecutorService(); + this.metricsLogger.debug("[executorId:{}] threads: {}->{}/{} | aggregate: {}/{}", executorId, pool.getQueue().size(), pool.getActiveCount(), pool.getMaximumPoolSize(), pool.getCompletedTaskCount(), pool.getTaskCount()); + } + } + public void setFallbackAsyncExecutor(AsyncExecutor executor) { this.fallbackExecutor = executor; } @@ -139,15 +174,20 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { @Override public void start() { + if (this.fallbackExecutor == null) { + this.logger.debug("Creating fallback async executor"); + this.fallbackExecutor = this.createExecutor(null, null, null, null, null, null, null); + } + this.logger.trace("Starting fallback async executor: {}", this.fallbackExecutor); this.fallbackExecutor.start(); - for (Entry executor : this.processDefExecutors.entrySet()) { + for (Entry executor : this.processDefExecutors.entrySet()) { this.logger.trace("Starting process definition async executor: {}: {}", executor.getKey(), executor.getValue()); executor.getValue().start(); } - for (Entry executor : this.activityExecutors.entrySet()) { + for (Entry executor : this.activityExecutors.entrySet()) { this.logger.trace("Starting activity async executor: {}: {}", executor.getKey(), executor.getValue()); executor.getValue().start(); } @@ -155,12 +195,12 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { @Override public void shutdown() { - for (Entry executor : this.activityExecutors.entrySet()) { + for (Entry executor : this.activityExecutors.entrySet()) { this.logger.trace("Stopping activity async executor: {}: {}", executor.getKey(), executor.getValue()); executor.getValue().shutdown(); } - for (Entry executor : this.processDefExecutors.entrySet()) { + for (Entry executor : this.processDefExecutors.entrySet()) { this.logger.trace("Stopping process definition async executor: {}: {}", executor.getKey(), executor.getValue()); executor.getValue().shutdown(); } diff --git a/activiti-ext/src/main/java/com/inteligr8/activiti/async/SpringJobAwareAsyncExecutor.java b/activiti-ext/src/main/java/com/inteligr8/activiti/async/SpringJobAwareAsyncExecutor.java index 4ba3a22..a3a7107 100644 --- a/activiti-ext/src/main/java/com/inteligr8/activiti/async/SpringJobAwareAsyncExecutor.java +++ b/activiti-ext/src/main/java/com/inteligr8/activiti/async/SpringJobAwareAsyncExecutor.java @@ -72,7 +72,7 @@ public class SpringJobAwareAsyncExecutor extends AbstractJobAwareAsyncExecutor i } private DefaultAsyncJobExecutor createExecutor(String id) { - return super.createExecutor( + return super.createExecutor(id, this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".corePoolSize", Integer.class, 1), this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".maxPoolSize", Integer.class, 4), this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".queueSize", Integer.class, 512), diff --git a/activiti-ext/src/test/resources/log4j2-test.properties b/activiti-ext/src/test/resources/log4j2-test.properties index 572ce9c..d2b14b5 100644 --- a/activiti-ext/src/test/resources/log4j2-test.properties +++ b/activiti-ext/src/test/resources/log4j2-test.properties @@ -13,3 +13,6 @@ logger.aspose-license.level=off logger.async-ext.name=com.inteligr8.activiti.async logger.async-ext.level=trace + +logger.async-metrics.name=inteligr8.async.metrics +logger.async-metrics.level=debug