ALF-10947 Fixed issue where a repeating JBPM timer was causing an infinite loop if an exception was thrown within the timer event.

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@32063 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
N Smith
2011-11-17 15:38:46 +00:00
parent 7f7224c2f5
commit 645913c9bf
3 changed files with 339 additions and 144 deletions

View File

@@ -27,6 +27,7 @@ import java.util.Date;
import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
import org.alfresco.repo.transaction.DoNotRetryException;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.namespace.NamespaceService;
@@ -39,9 +40,10 @@ import org.jbpm.JbpmContext;
import org.jbpm.db.JobSession;
import org.jbpm.job.Job;
import org.jbpm.job.executor.JobExecutorThread;
import org.jbpm.persistence.JbpmPersistenceException;
import org.jbpm.svc.Services;
import org.jbpm.taskmgmt.exe.TaskInstance;
/**
* Alfresco Job Executor Thread
*
@@ -49,39 +51,43 @@ import org.jbpm.taskmgmt.exe.TaskInstance;
*/
public class AlfrescoJobExecutorThread extends JobExecutorThread
{
/** The name of the lock used to ensure that job executor does not run on more than one node at the same time. */
private static final QName LOCK_QNAME = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI,
"AlfrescoJbpmJobExecutor");
private static Log logger = LogFactory.getLog(AlfrescoJobExecutorThread.class);
/**
* The name of the lock used to ensure that job executor does not run on
* more than one node at the same time.
*/
private static final QName LOCK_QNAME = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI,
"AlfrescoJbpmJobExecutor");
private static Log logger = LogFactory.getLog(AlfrescoJobExecutorThread.class);
private AlfrescoJobExecutor alfrescoJobExecutor;
private boolean isActive = true;
private long jbpmMaxLockTime;
private long jobLockTTL = 0;
private String jobLockToken = null;
private JbpmConfiguration jbpmConfiguration;
private boolean isActive = true;
private long jbpmMaxLockTime;
private long jobLockTTL = 0;
private String jobLockToken = null;
private JbpmConfiguration jbpmConfiguration;
/**
 * Sets the flag that this thread checks before acquiring or executing jobs.
 * <p>
 * Both {@code acquireJobs()} and {@code executeJobImpl(Job)} test this flag
 * and do no work while it is {@code false}. Only this class's own
 * {@code isActive} field is updated; the superclass setter is not invoked.
 *
 * @param isActive {@code true} to allow job acquisition/execution,
 *            {@code false} to suspend it
 */
@Override
public void setActive(boolean isActive)
{
this.isActive = isActive;
}
/**
* Constructor
*/
public AlfrescoJobExecutorThread(String name, AlfrescoJobExecutor jobExecutor, JbpmConfiguration jbpmConfiguration, int idleInterval, int maxIdleInterval, long maxLockTime, int maxHistory)
public AlfrescoJobExecutorThread(String name, AlfrescoJobExecutor jobExecutor, JbpmConfiguration jbpmConfiguration,
int idleInterval, int maxIdleInterval, long maxLockTime, int maxHistory)
{
super(name, jobExecutor, jbpmConfiguration, idleInterval, maxIdleInterval, maxLockTime, maxHistory);
this.alfrescoJobExecutor = jobExecutor;
this.jbpmMaxLockTime = maxLockTime;
this.jobLockTTL = jbpmMaxLockTime+(1000 * 60 * 10);
this.jobLockTTL = jbpmMaxLockTime + (1000 * 60 * 10);
this.jbpmConfiguration = jbpmConfiguration;
}
@@ -90,50 +96,52 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
protected Collection acquireJobs()
{
Collection jobs = Collections.EMPTY_LIST;
if ((isActive) && (! alfrescoJobExecutor.getTransactionService().isReadOnly()))
if ((isActive) && (!alfrescoJobExecutor.getTransactionService().isReadOnly()))
{
try
{
jobs = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper().doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback<Collection>() {
public Collection execute() throws Throwable
jobs = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper()
.doInTransaction(new RetryingTransactionHelper.RetryingTransactionCallback<Collection>()
{
if (jobLockToken != null)
public Collection execute() throws Throwable
{
refreshExecutorLock(jobLockToken);
if (jobLockToken != null)
{
refreshExecutorLock(jobLockToken);
}
else
{
jobLockToken = getExecutorLock();
}
try
{
return AlfrescoJobExecutorThread.super.acquireJobs();
}
catch (Throwable t)
{
logger.error("Failed to acquire jobs");
releaseExecutorLock(jobLockToken);
jobLockToken = null;
throw t;
}
}
else
{
jobLockToken = getExecutorLock();
}
try
{
return AlfrescoJobExecutorThread.super.acquireJobs();
}
catch (Throwable t)
{
logger.error("Failed to acquire jobs");
releaseExecutorLock(jobLockToken);
jobLockToken = null;
throw t;
}
}
});
});
if (jobs != null)
{
if (logger.isDebugEnabled() && (! logger.isTraceEnabled()) && (! jobs.isEmpty()))
if (logger.isDebugEnabled() && (!logger.isTraceEnabled()) && (!jobs.isEmpty()))
{
logger.debug("acquired "+jobs.size()+" job"+((jobs.size() != 1) ? "s" : ""));
logger.debug("acquired " + jobs.size() + " job" + ((jobs.size() != 1) ? "s" : ""));
}
if (logger.isTraceEnabled())
{
logger.trace("acquired "+jobs.size()+" job"+((jobs.size() != 1) ? "s" : "")+((jobs.size() > 0) ? ": "+jobs.toString() : ""));
logger.trace("acquired " + jobs.size() + " job" + ((jobs.size() != 1) ? "s" : "")
+ ((jobs.size() > 0) ? ": " + jobs.toString() : ""));
}
if (jobs.size() == 0)
{
releaseExecutorLock(jobLockToken);
@@ -147,7 +155,7 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
jobLockToken = null;
}
}
return jobs;
}
@@ -158,16 +166,17 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
{
return null;
}
return alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper().doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback<Date>() {
public Date execute() throws Throwable
return alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper()
.doInTransaction(new RetryingTransactionHelper.RetryingTransactionCallback<Date>()
{
return AlfrescoJobExecutorThread.super.getNextDueDate();
}
}, true);
public Date execute() throws Throwable
{
return AlfrescoJobExecutorThread.super.getNextDueDate();
}
}, true);
}
/**
* {@inheritDoc}
*/
@@ -184,15 +193,15 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
return null;
}
}, getActorId(jobIn));
// clear authentication context for this thread
AuthenticationUtil.clearCurrentSecurityContext();
}
private String getActorId(final Job jobIn)
{
TaskInstance taskInstance = jobIn.getTaskInstance();
if (taskInstance != null)
{
String actorId = taskInstance.getActorId();
@@ -203,19 +212,19 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
}
return AuthenticationUtil.getSystemUserName();
}
private void executeJobImpl(final Job jobIn)
{
if ((!isActive) || (alfrescoJobExecutor.getTransactionService().isReadOnly()))
{
return;
}
// based on JBPM 3.3.1 (JobExecutorThread.executeJob)
// - wrap executeJob / deleteJob in Alfresco retries
// - add setRollbackOnly warnings
// - if Alfresco retries fail, attempt to set JBPM job exception/retries
try
{
RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper();
@@ -228,35 +237,45 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
{
JobSession jobSession = jbpmContext.getJobSession();
Job job = jobSession.loadJob(jobIn.getId());
if (logger.isTraceEnabled())
{
logger.trace("executing " + job);
}
if (job.execute(jbpmContext))
{
jobSession.deleteJob(job);
if (logger.isDebugEnabled())
{
{
logger.debug("executed and deleted: " + job);
}
}
// if this job is locked too long
long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime();
if (totalLockTimeInMillis>jbpmMaxLockTime)
if (totalLockTimeInMillis > jbpmMaxLockTime)
{
logger.warn("setRollbackOnly: exceeded maxLockTime ("+jbpmMaxLockTime+") " + job);
logger.warn("setRollbackOnly: exceeded maxLockTime (" + jbpmMaxLockTime + ") " + job);
jbpmContext.setRollbackOnly();
}
}
}
catch(Exception e)
{
if(isPersistenceException(e))
{
throw new AlfrescoJbpmPersistenceException(e);
}
else
{
throw e;
}
}
finally
{
jbpmContext.close();
}
return null;
}
});
@@ -266,77 +285,93 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
// ignore
jobLockToken = null;
}
catch (Exception e)
catch(JbpmPersistenceException pe)
{
if(Services.isCausedByStaleState(pe))
{
if (logger.isDebugEnabled())
{
logger.debug("optimistic locking failed, couldn't complete job "+ jobIn, pe);
}
}
else handleException(jobIn, pe);
}
catch (final Exception e)
{
handleException(jobIn, e);
}
}
private void handleException(final Job jobIn, final Exception e)
{
if (logger.isErrorEnabled())
{
logger.error("failed to execute " + jobIn, e);
}
try
{
RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper();
tranHelper.doInTransaction(new RetryingTransactionCallback<Object>()
{
public Object execute() throws Throwable
{
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try
{
JobSession jobSession = jbpmContext.getJobSession();
Job job = jobSession.loadJob(jobIn.getId());
StringWriter memoryWriter = new StringWriter();
e.printStackTrace(new PrintWriter(memoryWriter));
job.setException(memoryWriter.toString());
if (logger.isDebugEnabled())
{
logger.debug("attempting to update exception/retries: " + job);
}
int retries = 0;
if (isPersistenceException(e) ==false)
{
retries = job.getRetries() -1;
}
job.setRetries(retries);
if (logger.isInfoEnabled())
{
String msg = "updated job exception and set to " + job.getRetries() + " retries: " + jobIn;
logger.info(msg);
}
}
finally
{
jbpmContext.close();
}
return null;
}
});
}
catch (Exception e2)
{
if (logger.isErrorEnabled())
{
logger.error("failed to execute " + jobIn, e);
}
if (!isPersistenceException(e))
{
try
{
final StringWriter memoryWriter = new StringWriter();
e.printStackTrace(new PrintWriter(memoryWriter));
RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper();
tranHelper.doInTransaction(new RetryingTransactionCallback<Object>()
{
public Object execute() throws Throwable
{
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try
{
JobSession jobSession = jbpmContext.getJobSession();
final Job job = jobSession.loadJob(jobIn.getId());
if (logger.isDebugEnabled())
{
logger.debug("attempting to update exception/retries: " + job);
}
job.setException(memoryWriter.toString());
job.setRetries(job.getRetries()-1);
if (logger.isInfoEnabled())
{
logger.info("updated job exception and set to "+job.getRetries()+ " retries: " + jobIn);
}
}
finally
{
jbpmContext.close();
}
return null;
}
});
}
catch (Exception e2)
{
if (logger.isErrorEnabled())
{
logger.error("failed to update job exception/retries " + jobIn, e2);
}
}
logger.error("failed to update job exception/retries " + jobIn, e2);
}
}
}
private String getExecutorLock()
{
String lockToken = null;
if (alfrescoJobExecutor.getJobExecutorLockEnabled())
{
try
{
lockToken = alfrescoJobExecutor.getJobLockService().getLock(LOCK_QNAME, jobLockTTL, 3000, 10);
if (logger.isTraceEnabled())
{
logger.trace(Thread.currentThread().getName()+" got lock token: "+lockToken);
logger.trace(Thread.currentThread().getName() + " got lock token: " + lockToken);
}
}
catch (LockAcquisitionException e)
@@ -348,10 +383,10 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
throw e;
}
}
return lockToken;
}
private void refreshExecutorLock(String lockToken)
{
if (lockToken != null)
@@ -359,23 +394,24 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
try
{
alfrescoJobExecutor.getJobLockService().refreshLock(lockToken, LOCK_QNAME, jobLockTTL);
if (logger.isTraceEnabled())
{
logger.trace(Thread.currentThread().getName()+" refreshed lock token: "+lockToken);
logger.trace(Thread.currentThread().getName() + " refreshed lock token: " + lockToken);
}
}
catch (LockAcquisitionException e)
{
if (logger.isTraceEnabled())
{
logger.trace("Failed to refresh Alfresco Job Executor lock - may no longer exist ("+lockToken+")");
logger.trace("Failed to refresh Alfresco Job Executor lock - may no longer exist (" + lockToken
+ ")");
}
throw e;
}
}
}
private void releaseExecutorLock(String lockToken)
{
if (lockToken != null)
@@ -383,24 +419,25 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
try
{
alfrescoJobExecutor.getJobLockService().releaseLock(lockToken, LOCK_QNAME);
if (logger.isTraceEnabled())
{
logger.trace(Thread.currentThread().getName()+" released lock token: "+lockToken);
logger.trace(Thread.currentThread().getName() + " released lock token: " + lockToken);
}
}
catch (LockAcquisitionException e)
{
if (logger.isTraceEnabled())
{
logger.trace("Failed to release Alfresco Job Executor lock - may no longer exist ("+lockToken+")");
logger.trace("Failed to release Alfresco Job Executor lock - may no longer exist (" + lockToken
+ ")");
}
throw e;
}
}
}
private boolean isPersistenceException(Throwable throwable)
private boolean isPersistenceException(Throwable throwable)
{
do
{
@@ -409,9 +446,19 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
return true;
}
throwable = throwable.getCause();
}
}
while (throwable != null);
return false;
return false;
}
/**
 * Checked wrapper for a persistence-related failure raised while a job is
 * being executed. The job-execution transaction callback throws this in
 * place of the original exception whenever {@code isPersistenceException}
 * reports {@code true} for it.
 * <p>
 * NOTE(review): the {@link DoNotRetryException} marker presumably stops the
 * retrying transaction helper from re-running the callback for this failure
 * -- confirm against RetryingTransactionHelper.
 */
public static class AlfrescoJbpmPersistenceException extends Exception implements DoNotRetryException
{
private static final long serialVersionUID = -2233119713831272158L;
/**
 * @param cause the underlying exception being wrapped
 */
public AlfrescoJbpmPersistenceException(Throwable cause) {
super(cause);
}
}
}

View File

@@ -0,0 +1,113 @@
/*
* Copyright (C) 2005-2010 Alfresco Software Limited.
*
* This file is part of Alfresco
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.repo.workflow.jbpm;
import java.io.InputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.alfresco.repo.content.MimetypeMap;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.workflow.WorkflowAdminServiceImpl;
import org.alfresco.repo.workflow.WorkflowModel;
import org.alfresco.repo.workflow.WorkflowTestHelper;
import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.workflow.WorkflowDeployment;
import org.alfresco.service.cmr.workflow.WorkflowPath;
import org.alfresco.service.cmr.workflow.WorkflowService;
import org.alfresco.service.cmr.workflow.WorkflowTask;
import org.alfresco.service.namespace.QName;
import org.alfresco.util.ApplicationContextHelper;
import org.hibernate.HibernateException;
import org.springframework.context.ApplicationContext;
import junit.framework.TestCase;
/**
 * Integration test that deploys the jBPM process definition found at
 * {@code jbpmresources/test_timerException.xml}, starts an instance of it,
 * ends its start task and then waits for a fixed period.
 * <p>
 * NOTE(review): the test contains no assertions; it passes as long as none of
 * the workflow calls throw. {@link #throwException()} is presumably invoked
 * from the process definition's timer -- confirm against the XML resource.
 *
 * @author Nick Smith
 * @since 4.0
 */
public class JbpmTimerExceptionTest extends TestCase
{
    /** Classpath location of the process definition under test. */
    private static final String defLocation = "jbpmresources/test_timerException.xml";

    /** Name used to look the definition up when it is already deployed. */
    private static final String defName = "jbpm$wf:testTimerException";

    private WorkflowService workflowService;
    private WorkflowTestHelper testHelper;

    /** Id of the workflow definition resolved (or deployed) by {@link #setUp()}. */
    private String defId;

    /**
     * Starts the workflow with a fresh package and the admin user as assignee,
     * ends the start task and then sleeps for 30 seconds.
     *
     * @throws Exception if any workflow call fails or the sleep is interrupted
     */
    public void testTimerException() throws Exception
    {
        NodeRef pckg = workflowService.createPackage(null);

        Map<QName, Serializable> params = new HashMap<QName, Serializable>();
        params.put(WorkflowModel.ASSOC_PACKAGE, pckg);
        params.put(WorkflowModel.ASSOC_ASSIGNEE, AuthenticationUtil.getAdminUserName());

        WorkflowPath path = workflowService.startWorkflow(defId, params);
        String instanceId = path.getInstance().getId();

        WorkflowTask start = workflowService.getStartTask(instanceId);
        workflowService.endTask(start.getId(), null);

        // NOTE(review): presumably gives the job executor time to fire the
        // workflow's timer; nothing is asserted afterwards -- confirm intent.
        Thread.sleep(30000);
        System.out.println("Done!");
    }

    /**
     * Looks up the workflow services, authenticates as admin and makes sure the
     * test definition is deployed, recording its id in {@link #defId}.
     *
     * @throws Exception if the definition resource is missing or deployment fails
     */
    @Override
    protected void setUp() throws Exception
    {
        ApplicationContext ctx = ApplicationContextHelper.getApplicationContext();
        ServiceRegistry services = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY);
        workflowService = services.getWorkflowService();
        WorkflowAdminServiceImpl adminService = (WorkflowAdminServiceImpl) ctx.getBean(WorkflowAdminServiceImpl.NAME);

        AuthenticationUtil.setAdminUserAsFullyAuthenticatedUser();

        testHelper = new WorkflowTestHelper(adminService, JBPMEngine.ENGINE_ID, false);
        testHelper.setVisible(true);

        // The deployment check consumes the stream, so a second stream is
        // opened for the actual deployment. Each stream is closed once used.
        boolean alreadyDeployed;
        InputStream input = openDefinition();
        try
        {
            alreadyDeployed = workflowService.isDefinitionDeployed(JBPMEngine.ENGINE_ID, input, MimetypeMap.MIMETYPE_XML);
        }
        finally
        {
            input.close();
        }

        if (alreadyDeployed == false)
        {
            input = openDefinition();
            try
            {
                WorkflowDeployment deployment = workflowService.deployDefinition(JBPMEngine.ENGINE_ID, input, MimetypeMap.MIMETYPE_XML);
                defId = deployment.getDefinition().getId();
            }
            finally
            {
                input.close();
            }
        }
        else
        {
            defId = workflowService.getDefinitionByName(defName).getId();
        }
    }

    /**
     * Undeploys the test definition, tears down the test helper and clears the
     * security context. Each step runs even if an earlier one throws, so a
     * failed undeploy cannot leak state into later tests.
     *
     * @throws Exception if any clean-up step fails
     */
    @Override
    protected void tearDown() throws Exception
    {
        try
        {
            workflowService.undeployDefinition(defId);
        }
        finally
        {
            try
            {
                testHelper.tearDown();
            }
            finally
            {
                AuthenticationUtil.clearCurrentSecurityContext();
            }
        }
    }

    /**
     * Always throws a {@link HibernateException} with a fixed message.
     *
     * @throws HibernateException always
     */
    public static void throwException() throws HibernateException
    {
        throw new HibernateException("My Timer Exception");
    }

    /**
     * Opens the process definition from the context class loader, failing the
     * test with a clear message when the resource is not on the classpath.
     *
     * @return a freshly opened stream that the caller must close
     */
    private static InputStream openDefinition()
    {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        InputStream input = classLoader.getResourceAsStream(defLocation);
        assertNotNull("Workflow definition not found on classpath: " + defLocation, input);
        return input;
    }
}