diff --git a/src/main/java/com/inteligr8/activiti/timer/AbstractTimerRetryListener.java b/src/main/java/com/inteligr8/activiti/timer/AbstractTimerRetryListener.java index 47eb2a6..475024b 100644 --- a/src/main/java/com/inteligr8/activiti/timer/AbstractTimerRetryListener.java +++ b/src/main/java/com/inteligr8/activiti/timer/AbstractTimerRetryListener.java @@ -1,3 +1,17 @@ +/* + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ package com.inteligr8.activiti.timer; import java.util.Collection; @@ -15,16 +29,18 @@ import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.event.ActivitiEntityEvent; import org.activiti.engine.delegate.event.ActivitiEvent; import org.activiti.engine.delegate.event.ActivitiEventListener; -import org.activiti.engine.impl.persistence.entity.TimerJobEntity; +import org.activiti.engine.impl.persistence.entity.AbstractJobEntity; +import org.activiti.engine.runtime.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -public abstract class AbstractTimerRetryListener implements ActivitiEventListener { +public abstract class AbstractTimerRetryListener implements ActivitiEventListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Pattern isoRepeatPattern = Pattern.compile("^R(-?[0-9]*)/"); @@ -33,84 +49,109 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene private ObjectMapper om; @Autowired - private ProcessEngine services; + protected ProcessEngine services; + + @Value("${inteligr8.timerExtension.alwaysRecordAttempts:false}") + private boolean alwaysRecordAttempts; @Override public boolean isFailOnException() { return true; } - protected boolean isEnabled(ActivitiEvent aevent, String timerId, int repeats) { + protected boolean isEnabled(ActivitiEvent aevent, TimerEventDefinition timerDef, int repeats) { + if (repeats < 0 && !this.alwaysRecordAttempts) { + this.logger.debug("[pi:{}] The timer is not repeat limited, so not recording the number of tries: {}.{}", + aevent.getProcessInstanceId(), aevent.getProcessDefinitionId(), timerDef.getId()); + return false; + } + return true; } @Override public void onEvent(ActivitiEvent event) { - this.logger.trace("onEvent({}, {})", event.getExecutionId(), event.getClass()); + this.logger.trace("onEvent({}, {}, {}, {})", event.getType(), event.getExecutionId(), event.getProcessInstanceId(), event.getClass()); try { - TimerJobEntity job = this.getTimerJobEntity(event); - String timerId = this.getTimerId(job); - this.logger.trace("Handling event for timer: {}.{}", event.getProcessDefinitionId(), timerId); + T job = this.getJobEntity(event); + String activityId = this.getJobActivityId(job); + this.logger.trace("Handling event for entity: {}.{}", event.getProcessDefinitionId(), activityId); - Event modelEvent = this.getModelEvent(event.getProcessDefinitionId(), timerId); + Event modelEvent = this.getModelEvent(event.getProcessDefinitionId(), activityId); if (modelEvent == null || modelEvent instanceof StartEvent) { - this.logger.trace("Start timer detected; no special handling needed: {}", timerId); + this.logger.trace("Start event detected; no special handling needed: {}", activityId); return; } - this.onTimerEvent(event, job, timerId, modelEvent); - + this.onActivityEvent(event, job, modelEvent); } catch (IllegalArgumentException iae) { + this.logger.warn(iae.getMessage()); } catch (IllegalStateException ise) { + this.logger.warn(ise.getMessage()); } catch (JsonProcessingException jpe) { - + this.logger.warn(jpe.getMessage()); } } - public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent) { + public void onActivityEvent(ActivitiEvent event, T job, Event modelEvent) { + // we are listening to timer events, so the event must have a timer definition TimerEventDefinition timerDef = this.getTimerDefinition(modelEvent); + if (timerDef == null) + throw new IllegalStateException("The event must have a timer definition, but none were found"); + this.onTimerEvent(event, job, modelEvent, timerDef); + } + + public void onTimerEvent(ActivitiEvent event, T job, Event modelEvent, TimerEventDefinition timerDef) { int maxRepeats = this.determineRepeats(timerDef); if (maxRepeats < 0) { - this.logger.debug("Timer is configured to repeat indefinitely"); + this.logger.debug("The '{}' timer is configured to repeat indefinitely", timerDef.getId()); } else { - this.logger.debug("Timer is configured to repeat {} times for {} total attempts", maxRepeats, maxRepeats+1); + this.logger.debug("The '{}' timer is configured to repeat {} times for {} total attempts", timerDef.getId(), maxRepeats, maxRepeats+1); } - if (!this.isEnabled(event, timerId, maxRepeats)) + if (!this.isEnabled(event, timerDef, maxRepeats)) return; - this.onTimerEvent(event, job, timerId, modelEvent, maxRepeats); + this.onTimerEvent(event, job, modelEvent, timerDef, maxRepeats); } - public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats) { - this.logger.trace("Attempts stored in variable: {}", this.getAttemptsVariableName(timerId)); + public void onTimerEvent(ActivitiEvent event, T job, Event modelEvent, TimerEventDefinition timerDef, int maxRepeats) { + this.logger.trace("Attempts stored in variable: {}", this.getAttemptsVariableName(timerDef.getId())); - int attempts = this.getAttempts(job, timerId); + int attempts = this.getAttempts(job, timerDef.getId()); this.logger.debug("Timer executed {} of {} times in process instance: {}", attempts, maxRepeats+1, event.getProcessInstanceId()); - this.onTimerEvent(event, job, timerId, modelEvent, maxRepeats, attempts); + if (maxRepeats >= 0 && attempts > maxRepeats) { + this.onTimerExhaustion(event, modelEvent, job); + } else { + this.onTimerAttempt(event, job, timerDef, attempts); + } } - public abstract void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats, int attempts); + public void onTimerExhaustion(ActivitiEvent event, Event modelEvent, T job) { + } + + public abstract void onTimerAttempt(ActivitiEvent event, T job, TimerEventDefinition timerDef, int attempts); - protected TimerJobEntity getTimerJobEntity(ActivitiEvent event) { + @SuppressWarnings("unchecked") + protected T getJobEntity(ActivitiEvent event) { if (!(event instanceof ActivitiEntityEvent)) - throw new IllegalArgumentException("An entity event was expected"); + throw new IllegalArgumentException("An entity event was expected; got: " + event.getClass()); Object entity = ((ActivitiEntityEvent) event).getEntity(); if (entity == null) { - throw new IllegalStateException("A timer entity was expected; got: null"); - } else if (!(entity instanceof TimerJobEntity)) { - throw new IllegalStateException("A timer entity was expected; got: " + entity.getClass()); + throw new IllegalStateException("An entity was expected; got: null"); + } else if (!(entity instanceof AbstractJobEntity)) { + throw new IllegalStateException("An job entity was expected; got: " + entity.getClass()); } else { - return (TimerJobEntity) entity; + return (T) entity; } } - protected String getTimerId(TimerJobEntity job) throws JsonProcessingException { + protected String getJobActivityId(Job job) throws JsonProcessingException { JsonNode configJson = this.om.readTree(job.getJobHandlerConfiguration()); JsonNode activityIdJson = configJson.get("activityId"); if (activityIdJson == null || activityIdJson.isNull()) @@ -141,24 +182,28 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene protected TimerEventDefinition getTimerDefinition(Event modelEvent) { Collection eventDefs = modelEvent.getEventDefinitions(); if (eventDefs == null || eventDefs.isEmpty()) - throw new IllegalStateException("A timer element must have an event definition, but none were found"); + return null; for (EventDefinition eventDef : eventDefs) { if (eventDef instanceof TimerEventDefinition) return (TimerEventDefinition) eventDef; } - throw new IllegalStateException("A timer element must have a timer event definition, but none were found"); + return null; } + /** + * @param timerDef + * @return -1 for indefinition; 0 to never repeat; 1+ for number of repeats + */ protected int determineRepeats(TimerEventDefinition timerDef) { String cycle = timerDef.getTimeCycle(); if (cycle == null) - return 0; + return -1; Matcher matcher = this.isoRepeatPattern.matcher(cycle); if (!matcher.find()) - return 0; + return -1; String repeatValue = matcher.group(1); if (repeatValue.length() == 0) { @@ -172,7 +217,7 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene return timerId + ":attempts"; } - protected int getAttempts(TimerJobEntity job, String timerId) { + protected int getAttempts(Job job, String timerId) { Integer attempts = this.services.getRuntimeService().getVariable( job.getExecutionId(), this.getAttemptsVariableName(timerId), @@ -182,7 +227,7 @@ public abstract class AbstractTimerRetryListener implements ActivitiEventListene return attempts.intValue(); } - protected void setAttempts(TimerJobEntity job, String timerId, int attempts) { + protected void setAttempts(Job job, String timerId, int attempts) { this.services.getRuntimeService().setVariable(job.getExecutionId(), this.getAttemptsVariableName(timerId), attempts); diff --git a/src/main/java/com/inteligr8/activiti/timer/FiredTimerRetryListener.java b/src/main/java/com/inteligr8/activiti/timer/FiredTimerRetryListener.java index d10ccf8..51950e8 100644 --- a/src/main/java/com/inteligr8/activiti/timer/FiredTimerRetryListener.java +++ b/src/main/java/com/inteligr8/activiti/timer/FiredTimerRetryListener.java @@ -1,49 +1,51 @@ +/* + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ package com.inteligr8.activiti.timer; -import org.activiti.bpmn.model.Event; +import org.activiti.bpmn.model.TimerEventDefinition; import org.activiti.engine.delegate.event.ActivitiEvent; -import org.activiti.engine.impl.persistence.entity.TimerJobEntity; +import org.activiti.engine.impl.persistence.entity.JobEntity; +import org.activiti.engine.runtime.ProcessInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component -public class FiredTimerRetryListener extends AbstractTimerRetryListener { +public class FiredTimerRetryListener extends AbstractTimerRetryListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - @Value("${timerExt.alwaysRecordTries:false}") - private boolean alwaysRecordTries; - @Override - protected boolean isEnabled(ActivitiEvent event, String timerId, int repeats) { - if (this.alwaysRecordTries) { - return true; - } else if (repeats < 0) { - this.logger.debug("Timer repeats indefinitely, so not recording the number of attempts: {}.{}", event.getProcessDefinitionId(), timerId); - return false; - } else if (repeats == 0) { - this.logger.debug("Timer does not repeat, so not recording the number of attempts: {}.{}", event.getProcessDefinitionId(), timerId); - return false; - } else { - return true; - } - } - - @Override - public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats, int attempts) { - this.logger.trace("onTimerEvent({}, {}, {}, {}, {}, {})", event.getExecutionId(), job.getId(), timerId, modelEvent.getId(), maxRepeats, attempts); + public void onTimerAttempt(ActivitiEvent event, JobEntity job, TimerEventDefinition timerDef, int attempts) { + this.logger.trace("onTimerAttempt({}, {}, {}, {})", event.getExecutionId(), job.getId(), timerDef.getId(), attempts); - attempts++; - - if (maxRepeats < 0) { - this.logger.trace("Timer '{}' fired for attempt {}: {}", timerId, attempts, event.getProcessInstanceId()); - } else if (attempts > maxRepeats) { - this.logger.warn("Timer '{}' repeats exhausted; should not have been scheduled: {}", timerId, event.getProcessInstanceId()); + /** + * If the timer is fired in the same transaction where the prcoess instance ends, SQL issue occur. + * This is undoubtedly an Activiti bug, but we will do a check here just to avoid it. + */ + ProcessInstance pi = this.services.getRuntimeService().createProcessInstanceQuery() + .processInstanceId(job.getProcessInstanceId()) + .singleResult(); + if (pi == null) { + this.logger.debug("The process instance could not be found; unable to increment the timer variable: {}", timerDef.getId()); + } else if (pi.isEnded()) { + this.logger.debug("The process instance has ended; unable to increment the timer variable: {}", timerDef.getId()); } else { - this.logger.debug("Timer '{}' fired for attempt {} of {}: {}", timerId, attempts, maxRepeats+1, event.getProcessInstanceId()); - this.setAttempts(job, timerId, attempts); + attempts++; + this.setAttempts(job, timerDef.getId(), attempts); + this.logger.debug("Fired timer incremented attempts: {}: {}: {}", event.getProcessInstanceId(), timerDef.getId(), attempts); } } diff --git a/src/main/java/com/inteligr8/activiti/timer/ScheduledTimerRetryListener.java b/src/main/java/com/inteligr8/activiti/timer/ScheduledTimerRetryListener.java index 672eb85..7639216 100644 --- a/src/main/java/com/inteligr8/activiti/timer/ScheduledTimerRetryListener.java +++ b/src/main/java/com/inteligr8/activiti/timer/ScheduledTimerRetryListener.java @@ -1,40 +1,58 @@ +/* + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ package com.inteligr8.activiti.timer; import org.activiti.bpmn.model.BoundaryEvent; import org.activiti.bpmn.model.Event; -import org.activiti.engine.ProcessEngine; +import org.activiti.bpmn.model.TimerEventDefinition; import org.activiti.engine.delegate.event.ActivitiEvent; import org.activiti.engine.impl.persistence.entity.TimerJobEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component -public class ScheduledTimerRetryListener extends AbstractTimerRetryListener { +public class ScheduledTimerRetryListener extends AbstractTimerRetryListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private ProcessEngine services; + + @Value("${inteligr8.timerExtension.recordAttemptOnScheduling:true}") + private boolean recordAttemptOnScheduling; @Override - public void onTimerEvent(ActivitiEvent event, TimerJobEntity job, String timerId, Event modelEvent, int maxRepeats, int attempts) { - this.logger.trace("onTimerEvent({}, {}, {}, {}, {}, {})", event.getExecutionId(), job.getId(), timerId, modelEvent.getId(), maxRepeats, attempts); + public void onTimerExhaustion(ActivitiEvent event, Event modelEvent, TimerJobEntity job) { + this.logger.trace("onTimerExhaustion({}, {}, {})", event.getExecutionId(), modelEvent.getId(), job.getId()); - if (maxRepeats < 0) { - this.logger.trace("Timer '{}' scheduled for attempt {}: {}", timerId, attempts+1, event.getProcessInstanceId()); - } else if (attempts > maxRepeats) { - this.logger.debug("Timer '{}' repeats exhausted: {}", timerId, event.getProcessInstanceId()); - if (modelEvent instanceof BoundaryEvent) { - this.logger.debug("Cancelling timer '{}' schedule; will not retry any more: {}", timerId, event.getProcessInstanceId()); - this.services.getManagementService().deleteTimerJob(job.getId()); - } else { - this.logger.debug("Moving timer '{}' schedule job to deadletter; will not automatically retry any more: {}", timerId, event.getProcessInstanceId()); - this.services.getManagementService().moveJobToDeadLetterJob(job.getId()); - } + if (event instanceof BoundaryEvent) { + this.logger.info("Scheduled timer cancelled; will not retry any more: {}", event.getProcessInstanceId()); + this.services.getManagementService().deleteTimerJob(job.getId()); } else { - this.logger.trace("Timer '{}' scheduled for attempt {} of {}: {}", timerId, attempts+1, maxRepeats+1, event.getProcessInstanceId()); + this.logger.info("Scheduled timer cancelled and moved to deadletter; will not automatically retry any more: {}", event.getProcessInstanceId()); + this.services.getManagementService().moveJobToDeadLetterJob(job.getId()); + } + } + + @Override + public void onTimerAttempt(ActivitiEvent event, TimerJobEntity job, TimerEventDefinition timerDef, int attempts) { + this.logger.trace("onTimerAttempt({}, {}, {}, {})", event.getExecutionId(), job.getId(), timerDef.getId(), attempts); + + if (this.recordAttemptOnScheduling) { + attempts++; + this.setAttempts(job, timerDef.getId(), attempts); + this.logger.debug("Scheduled timer incremented attempts: {}: {}: {}", event.getProcessInstanceId(), timerDef.getId(), attempts); } } diff --git a/src/main/java/com/inteligr8/activiti/timer/TimerExtension.java b/src/main/java/com/inteligr8/activiti/timer/TimerExtension.java index 741327f..840482d 100644 --- a/src/main/java/com/inteligr8/activiti/timer/TimerExtension.java +++ b/src/main/java/com/inteligr8/activiti/timer/TimerExtension.java @@ -1,3 +1,17 @@ +/* + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ package com.inteligr8.activiti.timer; import javax.annotation.PostConstruct; @@ -5,6 +19,7 @@ import javax.annotation.PostConstruct; import org.activiti.engine.ProcessEngine; import org.activiti.engine.delegate.event.ActivitiEventType; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component @@ -19,14 +34,33 @@ public class TimerExtension { @Autowired private FiredTimerRetryListener firedListener; + /** + * By default, the attempt is incremented when the timer expires/fires. + * That listener doesn't get called until the end of the regular execution. + * If that execution ends the process instance, then setting a variable + * results in wild SQL exceptions. That is an Activiti bug. + * + * The bug results in a rollback to the timer, eventually reaching the + * deadletter state. Setting this to true causes the increment to happen + * when the timer is scheduled, not expired/fired. This means the attempt + * is recorded early, but we don't run into the bug. + * + * Setting it to `false` causes the increment to happen after the attempt, + * which may also be undesirable; and also must skirt the Activiti bug. + */ + @Value("${inteligr8.timerExtension.recordAttemptOnScheduling:true}") + private boolean recordAttemptOnScheduling; + @PostConstruct public void registerListeners() { this.services.getRuntimeService().addEventListener( this.scheduledListener, ActivitiEventType.TIMER_SCHEDULED); - this.services.getRuntimeService().addEventListener( - this.firedListener, - ActivitiEventType.TIMER_FIRED); + if (!this.recordAttemptOnScheduling) { + this.services.getRuntimeService().addEventListener( + this.firedListener, + ActivitiEventType.TIMER_FIRED); + } } }