add metrics; thread pool naming

This commit is contained in:
2025-05-28 14:48:19 -04:00
parent 518219d501
commit 5ee3cbee42
4 changed files with 65 additions and 19 deletions

View File

@@ -16,3 +16,6 @@ logger.async-ext.level=trace
logger.async-aps-ext.name=com.inteligr8.alfresco.activiti.async logger.async-aps-ext.name=com.inteligr8.alfresco.activiti.async
logger.async-aps-ext.level=trace logger.async-aps-ext.level=trace
logger.async-metrics.name=inteligr8.async.metrics
logger.async-metrics.level=debug

View File

@@ -17,6 +17,12 @@ package com.inteligr8.activiti.async;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.activiti.engine.ProcessEngine; import org.activiti.engine.ProcessEngine;
import org.activiti.engine.impl.asyncexecutor.AsyncExecutor; import org.activiti.engine.impl.asyncexecutor.AsyncExecutor;
@@ -25,23 +31,20 @@ import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.repository.ProcessDefinition; import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Execution; import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.Job; import org.activiti.engine.runtime.Job;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor { public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Logger metricsLogger = LoggerFactory.getLogger("inteligr8.async.metrics");
protected Map<String, AsyncExecutor> processDefExecutors = new HashMap<>(); protected Map<String, DefaultAsyncJobExecutor> processDefExecutors = new HashMap<>();
protected Map<String, AsyncExecutor> activityExecutors = new HashMap<>(); protected Map<String, DefaultAsyncJobExecutor> activityExecutors = new HashMap<>();
private AsyncExecutor fallbackExecutor; private AsyncExecutor fallbackExecutor;
public AbstractJobAwareAsyncExecutor() { protected DefaultAsyncJobExecutor createExecutor(String id,
this.logger.debug("Creating fallback async executor");
this.fallbackExecutor = this.createExecutor(null, null, null, null, null, null);
}
protected DefaultAsyncJobExecutor createExecutor(
Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis, Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis,
Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) { Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) {
this.logger.trace("Creating async executor: {}/{}<{}", corePoolSize, maxPoolSize, queueSize); this.logger.trace("Creating async executor: {}/{}<{}", corePoolSize, maxPoolSize, queueSize);
@@ -52,6 +55,16 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
executor.setKeepAliveTime(keepAliveTimeMillis == null ? 5000L : keepAliveTimeMillis); executor.setKeepAliveTime(keepAliveTimeMillis == null ? 5000L : keepAliveTimeMillis);
executor.setDefaultAsyncJobAcquireWaitTimeInMillis(defaultAsyncJobAcquireWaitTimeMillis == null ? 10000 : defaultAsyncJobAcquireWaitTimeMillis); executor.setDefaultAsyncJobAcquireWaitTimeInMillis(defaultAsyncJobAcquireWaitTimeMillis == null ? 10000 : defaultAsyncJobAcquireWaitTimeMillis);
executor.setDefaultTimerJobAcquireWaitTimeInMillis(defaultTimerJobAcquireWaitTimeMillis == null ? 10000 : defaultTimerJobAcquireWaitTimeMillis); executor.setDefaultTimerJobAcquireWaitTimeInMillis(defaultTimerJobAcquireWaitTimeMillis == null ? 10000 : defaultTimerJobAcquireWaitTimeMillis);
if (id != null) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueSize);
executor.setThreadPoolQueue(queue);
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("activiti-async-" + id + "-thread-%d").build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeMillis, TimeUnit.MILLISECONDS, queue, threadFactory);
executor.setExecutorService(threadPool);
}
return executor; return executor;
} }
@@ -59,7 +72,7 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis, Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis,
Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) { Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) {
this.logger.debug("Creating process definition async executor: {}", processDefinitionKey); this.logger.debug("Creating process definition async executor: {}", processDefinitionKey);
AsyncExecutor executor = this.createExecutor(corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis, DefaultAsyncJobExecutor executor = this.createExecutor(processDefinitionKey, corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis,
defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis); defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis);
this.processDefExecutors.put(processDefinitionKey, executor); this.processDefExecutors.put(processDefinitionKey, executor);
} }
@@ -68,9 +81,10 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis, Integer corePoolSize, Integer maxPoolSize, Integer queueSize, Long keepAliveTimeMillis,
Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) { Integer defaultAsyncJobAcquireWaitTimeMillis, Integer defaultTimerJobAcquireWaitTimeMillis) {
this.logger.debug("Creating activity async executor: {} [in {}]", activityId, processDefinitionKey); this.logger.debug("Creating activity async executor: {} [in {}]", activityId, processDefinitionKey);
AsyncExecutor executor = this.createExecutor(corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis, String id = processDefinitionKey + ":" + activityId;
DefaultAsyncJobExecutor executor = this.createExecutor(id, corePoolSize, maxPoolSize, queueSize, keepAliveTimeMillis,
defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis); defaultAsyncJobAcquireWaitTimeMillis, defaultTimerJobAcquireWaitTimeMillis);
this.activityExecutors.put(processDefinitionKey + ":" + activityId, executor); this.activityExecutors.put(id, executor);
} }
protected abstract ProcessEngine getProcessEngineContext(); protected abstract ProcessEngine getProcessEngineContext();
@@ -79,6 +93,7 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
public boolean executeAsyncJob(Job job) { public boolean executeAsyncJob(Job job) {
if (this.activityExecutors.isEmpty() && this.processDefExecutors.isEmpty()) { if (this.activityExecutors.isEmpty() && this.processDefExecutors.isEmpty()) {
this.logger.trace("[job:{}] No extra async executors defined; using fallback async executor: {}", job.getId(), this.fallbackExecutor); this.logger.trace("[job:{}] No extra async executors defined; using fallback async executor: {}", job.getId(), this.fallbackExecutor);
this.captureMetrics(null, this.fallbackExecutor);
return this.fallbackExecutor.executeAsyncJob(job); return this.fallbackExecutor.executeAsyncJob(job);
} }
@@ -91,28 +106,39 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
.singleResult(); .singleResult();
this.logger.debug("[job:{}] Discovered job process definition: {}", job.getId(), processDefinition == null ? null : processDefinition.getName()); this.logger.debug("[job:{}] Discovered job process definition: {}", job.getId(), processDefinition == null ? null : processDefinition.getName());
String executorId = null;
AsyncExecutor executor = null; AsyncExecutor executor = null;
Execution execution = null; Execution execution = null;
if (!this.activityExecutors.isEmpty()) { if (!this.activityExecutors.isEmpty()) {
if (job.getExecutionId() != null) {
this.logger.warn("[job:{}] Job is corrupt; has no execution; using fallback async executor: {}", job.getId(), this.fallbackExecutor);
this.captureMetrics(null, this.fallbackExecutor);
return this.fallbackExecutor.executeAsyncJob(job);
}
this.logger.trace("[job:{}] Finding job execution: {}", job.getId(), job.getExecutionId()); this.logger.trace("[job:{}] Finding job execution: {}", job.getId(), job.getExecutionId());
execution = engine.getRuntimeService().createExecutionQuery() execution = engine.getRuntimeService().createExecutionQuery()
.executionId(job.getExecutionId()) .executionId(job.getExecutionId())
.singleResult(); .singleResult();
this.logger.debug("[job:{}] Discovered job execution: {}", job.getId(), execution.getName()); this.logger.debug("[job:{}] Discovered job execution at activity: {}", job.getId(), execution.getActivityId());
String activityId = processDefinition.getKey() + ":" + execution.getActivityId(); String activityId = processDefinition.getKey() + ":" + execution.getActivityId();
this.logger.trace("[job:{}] Finding activity async executor: {}", job.getId(), activityId); this.logger.trace("[job:{}] Finding activity async executor: {}", job.getId(), activityId);
executor = this.activityExecutors.get(activityId); executor = this.activityExecutors.get(activityId);
if (executor != null) if (executor != null) {
this.logger.debug("[job:{}] Discovered and using activity async executor: {}: {}", job.getId(), activityId, executor); this.logger.debug("[job:{}] Discovered and using activity async executor: {}: {}", job.getId(), activityId, executor);
executorId = activityId;
}
} }
if (executor == null && processDefinition != null) { if (executor == null && processDefinition != null) {
this.logger.trace("[job:{}] Finding process definition async executor: {}", job.getId(), processDefinition.getKey()); this.logger.trace("[job:{}] Finding process definition async executor: {}", job.getId(), processDefinition.getKey());
executor = this.processDefExecutors.get(processDefinition.getKey()); executor = this.processDefExecutors.get(processDefinition.getKey());
if (executor != null) if (executor != null) {
this.logger.debug("[job:{}] Discovered and using process definition async executor: {}: {}", job.getId(), processDefinition.getKey(), executor); this.logger.debug("[job:{}] Discovered and using process definition async executor: {}: {}", job.getId(), processDefinition.getKey(), executor);
executorId = processDefinition.getKey();
}
} }
if (executor == null) { if (executor == null) {
@@ -120,9 +146,18 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
this.logger.trace("[job:{}] Using fallback async executor: {}", job.getId(), executor); this.logger.trace("[job:{}] Using fallback async executor: {}", job.getId(), executor);
} }
this.captureMetrics(executorId, executor);
return executor.executeAsyncJob(job); return executor.executeAsyncJob(job);
} }
protected void captureMetrics(String executorId, AsyncExecutor executor) {
if (this.metricsLogger.isDebugEnabled() && executor instanceof DefaultAsyncJobExecutor) {
DefaultAsyncJobExecutor dajexecutor = (DefaultAsyncJobExecutor) executor;
ThreadPoolExecutor pool = (ThreadPoolExecutor) dajexecutor.getExecutorService();
this.metricsLogger.debug("[executorId:{}] threads: {}->{}/{} | aggregate: {}/{}", executorId, pool.getQueue().size(), pool.getActiveCount(), pool.getMaximumPoolSize(), pool.getCompletedTaskCount(), pool.getTaskCount());
}
}
public void setFallbackAsyncExecutor(AsyncExecutor executor) { public void setFallbackAsyncExecutor(AsyncExecutor executor) {
this.fallbackExecutor = executor; this.fallbackExecutor = executor;
} }
@@ -139,15 +174,20 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
@Override @Override
public void start() { public void start() {
if (this.fallbackExecutor == null) {
this.logger.debug("Creating fallback async executor");
this.fallbackExecutor = this.createExecutor(null, null, null, null, null, null, null);
}
this.logger.trace("Starting fallback async executor: {}", this.fallbackExecutor); this.logger.trace("Starting fallback async executor: {}", this.fallbackExecutor);
this.fallbackExecutor.start(); this.fallbackExecutor.start();
for (Entry<String, AsyncExecutor> executor : this.processDefExecutors.entrySet()) { for (Entry<String, ? extends AsyncExecutor> executor : this.processDefExecutors.entrySet()) {
this.logger.trace("Starting process definition async executor: {}: {}", executor.getKey(), executor.getValue()); this.logger.trace("Starting process definition async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().start(); executor.getValue().start();
} }
for (Entry<String, AsyncExecutor> executor : this.activityExecutors.entrySet()) { for (Entry<String, ? extends AsyncExecutor> executor : this.activityExecutors.entrySet()) {
this.logger.trace("Starting activity async executor: {}: {}", executor.getKey(), executor.getValue()); this.logger.trace("Starting activity async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().start(); executor.getValue().start();
} }
@@ -155,12 +195,12 @@ public abstract class AbstractJobAwareAsyncExecutor implements AsyncExecutor {
@Override @Override
public void shutdown() { public void shutdown() {
for (Entry<String, AsyncExecutor> executor : this.activityExecutors.entrySet()) { for (Entry<String, ? extends AsyncExecutor> executor : this.activityExecutors.entrySet()) {
this.logger.trace("Stopping activity async executor: {}: {}", executor.getKey(), executor.getValue()); this.logger.trace("Stopping activity async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().shutdown(); executor.getValue().shutdown();
} }
for (Entry<String, AsyncExecutor> executor : this.processDefExecutors.entrySet()) { for (Entry<String, ? extends AsyncExecutor> executor : this.processDefExecutors.entrySet()) {
this.logger.trace("Stopping process definition async executor: {}: {}", executor.getKey(), executor.getValue()); this.logger.trace("Stopping process definition async executor: {}: {}", executor.getKey(), executor.getValue());
executor.getValue().shutdown(); executor.getValue().shutdown();
} }

View File

@@ -72,7 +72,7 @@ public class SpringJobAwareAsyncExecutor extends AbstractJobAwareAsyncExecutor i
} }
private DefaultAsyncJobExecutor createExecutor(String id) { private DefaultAsyncJobExecutor createExecutor(String id) {
return super.createExecutor( return super.createExecutor(id,
this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".corePoolSize", Integer.class, 1), this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".corePoolSize", Integer.class, 1),
this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".maxPoolSize", Integer.class, 4), this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".maxPoolSize", Integer.class, 4),
this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".queueSize", Integer.class, 512), this.appContext.getEnvironment().getProperty("inteligr8.async.executor." + id + ".queueSize", Integer.class, 512),

View File

@@ -13,3 +13,6 @@ logger.aspose-license.level=off
logger.async-ext.name=com.inteligr8.activiti.async logger.async-ext.name=com.inteligr8.activiti.async
logger.async-ext.level=trace logger.async-ext.level=trace
logger.async-metrics.name=inteligr8.async.metrics
logger.async-metrics.level=debug