final touches

This commit is contained in:
Brian Long 2023-11-06 14:26:49 -05:00
parent f0a61f5f57
commit fe11673255
4 changed files with 190 additions and 91 deletions

View File

@ -1,3 +1,17 @@
/*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.inteligr8.activiti.timer;
import java.util.Collection;
@ -15,16 +29,18 @@ import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.impl.persistence.entity.TimerJobEntity;
import org.activiti.engine.impl.persistence.entity.AbstractJobEntity;
import org.activiti.engine.runtime.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public abstract class AbstractTimerRetryListener implements ActivitiEventListener {
public abstract class AbstractTimerRetryListener<T extends AbstractJobEntity> implements ActivitiEventListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Pattern isoRepeatPattern = Pattern.compile("^R(-?[0-9]*)/");
@ -33,84 +49,109 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene
private ObjectMapper om;
@Autowired
private ProcessEngine services;
protected ProcessEngine services;
@Value("${inteligr8.timerExtension.alwaysRecordAttempts:false}")
private boolean alwaysRecordAttempts;
@Override
public boolean isFailOnException() {
return true;
}
protected boolean isEnabled(ActivitiEvent aevent, String timerId, int repeats) {
protected boolean isEnabled(ActivitiEvent aevent, TimerEventDefinition timerDef, int repeats) {
if (repeats < 0 && !this.alwaysRecordAttempts) {
this.logger.debug("[pi:{}] The timer is not repeat limited, so not recording the number of tries: {}.{}",
aevent.getProcessInstanceId(), aevent.getProcessDefinitionId(), timerDef.getId());
return false;
}
return true;
}
@Override
public void onEvent(ActivitiEvent event) {
this.logger.trace("onEvent({}, {})", event.getExecutionId(), event.getClass());
this.logger.trace("onEvent({}, {}, {}, {})", event.getType(), event.getExecutionId(), event.getProcessInstanceId(), event.getClass());
try {
TimerJobEntity job = this.getTimerJobEntity(event);
String timerId = this.getTimerId(job);
this.logger.trace("Handling event for timer: {}.{}", event.getProcessDefinitionId(), timerId);
T job = this.getJobEntity(event);
String activityId = this.getJobActivityId(job);
this.logger.trace("Handling event for entity: {}.{}", event.getProcessDefinitionId(), activityId);
Event modelEvent = this.getModelEvent(event.getProcessDefinitionId(), timerId);
Event modelEvent = this.getModelEvent(event.getProcessDefinitionId(), activityId);
if (modelEvent == null || modelEvent instanceof StartEvent) {
this.logger.trace("Start timer detected; no special handling needed: {}", timerId);
this.logger.trace("Start event detected; no special handling needed: {}", activityId);
return;
}
this.onTimerEvent(event, job, timerId, modelEvent);
this.onActivityEvent(event, job, modelEvent);
} catch (IllegalArgumentException iae) {
this.logger.warn(iae.getMessage());
} catch (IllegalStateException ise) {
this.logger.warn(ise.getMessage());
} catch (JsonProcessingException jpe) {
this.logger.warn(jpe.getMessage());
}
}
public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent) {
public void onActivityEvent(ActivitiEvent event, T job, Event modelEvent) {
// we are listening to timer events, so the event must have a timer definition
TimerEventDefinition timerDef = this.getTimerDefinition(modelEvent);
if (timerDef == null)
throw new IllegalStateException("The event must have a timer definition, but none were found");
this.onTimerEvent(event, job, modelEvent, timerDef);
}
public void onTimerEvent(ActivitiEvent event, T job, Event modelEvent, TimerEventDefinition timerDef) {
int maxRepeats = this.determineRepeats(timerDef);
if (maxRepeats < 0) {
this.logger.debug("Timer is configured to repeat indefinitely");
this.logger.debug("The '{}' timer is configured to repeat indefinitely", timerDef.getId());
} else {
this.logger.debug("Timer is configured to repeat {} times for {} total attempts", maxRepeats, maxRepeats+1);
this.logger.debug("The '{}' timer is configured to repeat {} times for {} total attempts", timerDef.getId(), maxRepeats, maxRepeats+1);
}
if (!this.isEnabled(event, timerId, maxRepeats))
if (!this.isEnabled(event, timerDef, maxRepeats))
return;
this.onTimerEvent(event, job, timerId, modelEvent, maxRepeats);
this.onTimerEvent(event, job, modelEvent, timerDef, maxRepeats);
}
public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats) {
this.logger.trace("Attempts stored in variable: {}", this.getAttemptsVariableName(timerId));
public void onTimerEvent(ActivitiEvent event, T job, Event modelEvent, TimerEventDefinition timerDef, int maxRepeats) {
this.logger.trace("Attempts stored in variable: {}", this.getAttemptsVariableName(timerDef.getId()));
int attempts = this.getAttempts(job, timerId);
int attempts = this.getAttempts(job, timerDef.getId());
this.logger.debug("Timer executed {} of {} times in process instance: {}", attempts, maxRepeats+1, event.getProcessInstanceId());
this.onTimerEvent(event, job, timerId, modelEvent, maxRepeats, attempts);
if (maxRepeats >= 0 && attempts > maxRepeats) {
this.onTimerExhaustion(event, modelEvent, job);
} else {
this.onTimerAttempt(event, job, timerDef, attempts);
}
}
public abstract void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats, int attempts);
public void onTimerExhaustion(ActivitiEvent event, Event modelEvent, T job) {
}
public abstract void onTimerAttempt(ActivitiEvent event, T job, TimerEventDefinition timerDef, int attempts);
protected TimerJobEntity getTimerJobEntity(ActivitiEvent event) {
@SuppressWarnings("unchecked")
protected T getJobEntity(ActivitiEvent event) {
if (!(event instanceof ActivitiEntityEvent))
throw new IllegalArgumentException("An entity event was expected");
throw new IllegalArgumentException("An entity event was expected; got: " + event.getClass());
Object entity = ((ActivitiEntityEvent) event).getEntity();
if (entity == null) {
throw new IllegalStateException("A timer entity was expected; got: null");
} else if (!(entity instanceof TimerJobEntity)) {
throw new IllegalStateException("A timer entity was expected; got: " + entity.getClass());
throw new IllegalStateException("An entity was expected; got: null");
} else if (!(entity instanceof AbstractJobEntity)) {
throw new IllegalStateException("An job entity was expected; got: " + entity.getClass());
} else {
return (TimerJobEntity) entity;
return (T) entity;
}
}
protected String getTimerId(TimerJobEntity job) throws JsonProcessingException {
protected String getJobActivityId(Job job) throws JsonProcessingException {
JsonNode configJson = this.om.readTree(job.getJobHandlerConfiguration());
JsonNode activityIdJson = configJson.get("activityId");
if (activityIdJson == null || activityIdJson.isNull())
@ -141,24 +182,28 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene
protected TimerEventDefinition getTimerDefinition(Event modelEvent) {
Collection<EventDefinition> eventDefs = modelEvent.getEventDefinitions();
if (eventDefs == null || eventDefs.isEmpty())
throw new IllegalStateException("A timer element must have an event definition, but none were found");
return null;
for (EventDefinition eventDef : eventDefs) {
if (eventDef instanceof TimerEventDefinition)
return (TimerEventDefinition) eventDef;
}
throw new IllegalStateException("A timer element must have a timer event definition, but none were found");
return null;
}
/**
* @param timerDef
* @return -1 for indefinition; 0 to never repeat; 1+ for number of repeats
*/
protected int determineRepeats(TimerEventDefinition timerDef) {
String cycle = timerDef.getTimeCycle();
if (cycle == null)
return 0;
return -1;
Matcher matcher = this.isoRepeatPattern.matcher(cycle);
if (!matcher.find())
return 0;
return -1;
String repeatValue = matcher.group(1);
if (repeatValue.length() == 0) {
@ -172,7 +217,7 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene
return timerId + ":attempts";
}
protected int getAttempts(TimerJobEntity job, String timerId) {
protected int getAttempts(Job job, String timerId) {
Integer attempts = this.services.getRuntimeService().getVariable(
job.getExecutionId(),
this.getAttemptsVariableName(timerId),
@ -182,7 +227,7 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene
return attempts.intValue();
}
protected void setAttempts(TimerJobEntity job, String timerId, int attempts) {
protected void setAttempts(Job job, String timerId, int attempts) {
this.services.getRuntimeService().setVariable(job.getExecutionId(),
this.getAttemptsVariableName(timerId),
attempts);

View File

@ -1,49 +1,51 @@
/*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.inteligr8.activiti.timer;
import org.activiti.bpmn.model.Event;
import org.activiti.bpmn.model.TimerEventDefinition;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.impl.persistence.entity.TimerJobEntity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.runtime.ProcessInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class FiredTimerRetryListener extends AbstractTimerRetryListener {
public class FiredTimerRetryListener extends AbstractTimerRetryListener<JobEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${timerExt.alwaysRecordTries:false}")
private boolean alwaysRecordTries;
@Override
protected boolean isEnabled(ActivitiEvent event, String timerId, int repeats) {
if (this.alwaysRecordTries) {
return true;
} else if (repeats < 0) {
this.logger.debug("Timer repeats indefinitely, so not recording the number of attempts: {}.{}", event.getProcessDefinitionId(), timerId);
return false;
} else if (repeats == 0) {
this.logger.debug("Timer does not repeat, so not recording the number of attempts: {}.{}", event.getProcessDefinitionId(), timerId);
return false;
} else {
return true;
}
}
@Override
public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats, int attempts) {
this.logger.trace("onTimerEvent({}, {}, {}, {}, {}, {})", event.getExecutionId(), job.getId(), timerId, modelEvent.getId(), maxRepeats, attempts);
public void onTimerAttempt(ActivitiEvent event, JobEntity job, TimerEventDefinition timerDef, int attempts) {
this.logger.trace("onTimerAttempt({}, {}, {}, {})", event.getExecutionId(), job.getId(), timerDef.getId(), attempts);
attempts++;
if (maxRepeats < 0) {
this.logger.trace("Timer '{}' fired for attempt {}: {}", timerId, attempts, event.getProcessInstanceId());
} else if (attempts > maxRepeats) {
this.logger.warn("Timer '{}' repeats exhausted; should not have been scheduled: {}", timerId, event.getProcessInstanceId());
/**
* If the timer is fired in the same transaction where the prcoess instance ends, SQL issue occur.
* This is undoubtedly an Activiti bug, but we will do a check here just to avoid it.
*/
ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery()
.processInstanceId(job.getProcessInstanceId())
.singleResult();
if (pi == null) {
this.logger.debug("The process instance could not be found; unable to increment the timer variable: {}", timerDef.getId());
} else if (pi.isEnded()) {
this.logger.debug("The process instance has ended; unable to increment the timer variable: {}", timerDef.getId());
} else {
this.logger.debug("Timer '{}' fired for attempt {} of {}: {}", timerId, attempts, maxRepeats+1, event.getProcessInstanceId());
this.setAttempts(job, timerId, attempts);
attempts++;
this.setAttempts(job, timerDef.getId(), attempts);
this.logger.debug("Fired timer incremented attempts: {}: {}: {}", event.getProcessInstanceId(), timerDef.getId(), attempts);
}
}

View File

@ -1,40 +1,58 @@
/*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.inteligr8.activiti.timer;
import org.activiti.bpmn.model.BoundaryEvent;
import org.activiti.bpmn.model.Event;
import org.activiti.engine.ProcessEngine;
import org.activiti.bpmn.model.TimerEventDefinition;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.impl.persistence.entity.TimerJobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class ScheduledTimerRetryListener extends AbstractTimerRetryListener {
public class ScheduledTimerRetryListener extends AbstractTimerRetryListener<TimerJobEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ProcessEngine services;
@Value("${inteligr8.timerExtension.recordAttemptOnScheduling:true}")
private boolean recordAttemptOnScheduling;
@Override
public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats, int attempts) {
this.logger.trace("onTimerEvent({}, {}, {}, {}, {}, {})", event.getExecutionId(), job.getId(), timerId, modelEvent.getId(), maxRepeats, attempts);
public void onTimerExhaustion(ActivitiEvent event, Event modelEvent, TimerJobEntity job) {
this.logger.trace("onTimerExhaustion({}, {}, {})", event.getExecutionId(), modelEvent.getId(), job.getId());
if (maxRepeats < 0) {
this.logger.trace("Timer '{}' scheduled for attempt {}: {}", timerId, attempts+1, event.getProcessInstanceId());
} else if (attempts > maxRepeats) {
this.logger.debug("Timer '{}' repeats exhausted: {}", timerId, event.getProcessInstanceId());
if (modelEvent instanceof BoundaryEvent) {
this.logger.debug("Cancelling timer '{}' schedule; will not retry any more: {}", timerId, event.getProcessInstanceId());
this.services.getManagementService().deleteTimerJob(job.getId());
} else {
this.logger.debug("Moving timer '{}' schedule job to deadletter; will not automatically retry any more: {}", timerId, event.getProcessInstanceId());
this.services.getManagementService().moveJobToDeadLetterJob(job.getId());
}
if (event instanceof BoundaryEvent) {
this.logger.info("Scheduled timer cancelled; will not retry any more: {}", event.getProcessInstanceId());
this.services.getManagementService().deleteTimerJob(job.getId());
} else {
this.logger.trace("Timer '{}' scheduled for attempt {} of {}: {}", timerId, attempts+1, maxRepeats+1, event.getProcessInstanceId());
this.logger.info("Scheduled timer cancelled and moved to deadletter; will not automatically retry any more: {}", event.getProcessInstanceId());
this.services.getManagementService().moveJobToDeadLetterJob(job.getId());
}
}
@Override
public void onTimerAttempt(ActivitiEvent event, TimerJobEntity job, TimerEventDefinition timerDef, int attempts) {
this.logger.trace("onTimerAttempt({}, {}, {}, {})", event.getExecutionId(), job.getId(), timerDef.getId(), attempts);
if (this.recordAttemptOnScheduling) {
attempts++;
this.setAttempts(job, timerDef.getId(), attempts);
this.logger.debug("Scheduled timer incremented attempts: {}: {}: {}", event.getProcessInstanceId(), timerDef.getId(), attempts);
}
}

View File

@ -1,3 +1,17 @@
/*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.inteligr8.activiti.timer;
import javax.annotation.PostConstruct;
@ -5,6 +19,7 @@ import javax.annotation.PostConstruct;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@ -19,14 +34,33 @@ public class TimerExtension {
@Autowired
private FiredTimerRetryListener firedListener;
/**
* By default, the attempt is incremented when the timer expires/fires.
* That listener doesn't get called until the end of the regular execution.
* If that execution ends the process instance, then setting a variable
* results in wild SQL exceptions. That is an Activiti bug.
*
* The bug results in a rollback to the timer, eventually reaching the
* deadletter state. Setting this to true causes the increment to happen
* when the timer is scheduled, not expired/fired. This means the attempt
* is recorded early, but we don't run into the bug.
*
* Setting it to `false` causes the increment to happen after the attempt,
* which may also be undesirable; and also must skirt the Activiti bug.
*/
@Value("${inteligr8.timerExtension.recordAttemptOnScheduling:true}")
private boolean recordAttemptOnScheduling;
@PostConstruct
public void registerListeners() {
this.services.getRuntimeService().addEventListener(
this.scheduledListener,
ActivitiEventType.TIMER_SCHEDULED);
this.services.getRuntimeService().addEventListener(
this.firedListener,
ActivitiEventType.TIMER_FIRED);
if (!this.recordAttemptOnScheduling) {
this.services.getRuntimeService().addEventListener(
this.firedListener,
ActivitiEventType.TIMER_FIRED);
}
}
}