diff --git a/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java b/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java index 5913ae8b9f..8fb1ec9047 100644 --- a/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java +++ b/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java @@ -27,6 +27,7 @@ import java.util.Date; import org.alfresco.repo.lock.LockAcquisitionException; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork; +import org.alfresco.repo.transaction.DoNotRetryException; import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.namespace.NamespaceService; @@ -39,9 +40,10 @@ import org.jbpm.JbpmContext; import org.jbpm.db.JobSession; import org.jbpm.job.Job; import org.jbpm.job.executor.JobExecutorThread; +import org.jbpm.persistence.JbpmPersistenceException; +import org.jbpm.svc.Services; import org.jbpm.taskmgmt.exe.TaskInstance; - /** * Alfresco Job Executor Thread * @@ -49,39 +51,43 @@ import org.jbpm.taskmgmt.exe.TaskInstance; */ public class AlfrescoJobExecutorThread extends JobExecutorThread { - /** The name of the lock used to ensure that job executor does not run on more than one node at the same time. */ - private static final QName LOCK_QNAME = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, - "AlfrescoJbpmJobExecutor"); - - private static Log logger = LogFactory.getLog(AlfrescoJobExecutorThread.class); - + /** + * The name of the lock used to ensure that job executor does not run on + * more than one node at the same time. + */ + private static final QName LOCK_QNAME = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, + "AlfrescoJbpmJobExecutor"); + + private static Log logger = LogFactory.getLog(AlfrescoJobExecutorThread.class); + private AlfrescoJobExecutor alfrescoJobExecutor; - private boolean isActive = true; - - private long jbpmMaxLockTime; - - private long jobLockTTL = 0; - private String jobLockToken = null; - - private JbpmConfiguration jbpmConfiguration; - + private boolean isActive = true; + + private long jbpmMaxLockTime; + + private long jobLockTTL = 0; + private String jobLockToken = null; + + private JbpmConfiguration jbpmConfiguration; + @Override public void setActive(boolean isActive) { this.isActive = isActive; } - + /** * Constructor */ - public AlfrescoJobExecutorThread(String name, AlfrescoJobExecutor jobExecutor, JbpmConfiguration jbpmConfiguration, int idleInterval, int maxIdleInterval, long maxLockTime, int maxHistory) + public AlfrescoJobExecutorThread(String name, AlfrescoJobExecutor jobExecutor, JbpmConfiguration jbpmConfiguration, + int idleInterval, int maxIdleInterval, long maxLockTime, int maxHistory) { super(name, jobExecutor, jbpmConfiguration, idleInterval, maxIdleInterval, maxLockTime, maxHistory); this.alfrescoJobExecutor = jobExecutor; this.jbpmMaxLockTime = maxLockTime; - - this.jobLockTTL = jbpmMaxLockTime+(1000 * 60 * 10); - + + this.jobLockTTL = jbpmMaxLockTime + (1000 * 60 * 10); + this.jbpmConfiguration = jbpmConfiguration; } @@ -90,50 +96,52 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread protected Collection acquireJobs() { Collection jobs = Collections.EMPTY_LIST; - - if ((isActive) && (! alfrescoJobExecutor.getTransactionService().isReadOnly())) + + if ((isActive) && (!alfrescoJobExecutor.getTransactionService().isReadOnly())) { try { - jobs = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper().doInTransaction( - new RetryingTransactionHelper.RetryingTransactionCallback() { - public Collection execute() throws Throwable + jobs = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper() + .doInTransaction(new RetryingTransactionHelper.RetryingTransactionCallback() { - if (jobLockToken != null) + public Collection execute() throws Throwable { - refreshExecutorLock(jobLockToken); + if (jobLockToken != null) + { + refreshExecutorLock(jobLockToken); + } + else + { + jobLockToken = getExecutorLock(); + } + + try + { + return AlfrescoJobExecutorThread.super.acquireJobs(); + } + catch (Throwable t) + { + logger.error("Failed to acquire jobs"); + releaseExecutorLock(jobLockToken); + jobLockToken = null; + throw t; + } } - else - { - jobLockToken = getExecutorLock(); - } - - try - { - return AlfrescoJobExecutorThread.super.acquireJobs(); - } - catch (Throwable t) - { - logger.error("Failed to acquire jobs"); - releaseExecutorLock(jobLockToken); - jobLockToken = null; - throw t; - } - } - }); - + }); + if (jobs != null) { - if (logger.isDebugEnabled() && (! logger.isTraceEnabled()) && (! jobs.isEmpty())) + if (logger.isDebugEnabled() && (!logger.isTraceEnabled()) && (!jobs.isEmpty())) { - logger.debug("acquired "+jobs.size()+" job"+((jobs.size() != 1) ? "s" : "")); + logger.debug("acquired " + jobs.size() + " job" + ((jobs.size() != 1) ? "s" : "")); } - + if (logger.isTraceEnabled()) { - logger.trace("acquired "+jobs.size()+" job"+((jobs.size() != 1) ? "s" : "")+((jobs.size() > 0) ? ": "+jobs.toString() : "")); + logger.trace("acquired " + jobs.size() + " job" + ((jobs.size() != 1) ? "s" : "") + + ((jobs.size() > 0) ? ": " + jobs.toString() : "")); } - + if (jobs.size() == 0) { releaseExecutorLock(jobLockToken); @@ -147,7 +155,7 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread jobLockToken = null; } } - + return jobs; } @@ -158,16 +166,17 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread { return null; } - - return alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper().doInTransaction( - new RetryingTransactionHelper.RetryingTransactionCallback() { - public Date execute() throws Throwable + + return alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper() + .doInTransaction(new RetryingTransactionHelper.RetryingTransactionCallback() { - return AlfrescoJobExecutorThread.super.getNextDueDate(); - } - }, true); + public Date execute() throws Throwable + { + return AlfrescoJobExecutorThread.super.getNextDueDate(); + } + }, true); } - + /** * {@inheritDoc} */ @@ -184,15 +193,15 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread return null; } }, getActorId(jobIn)); - + // clear authentication context for this thread AuthenticationUtil.clearCurrentSecurityContext(); } - + private String getActorId(final Job jobIn) { TaskInstance taskInstance = jobIn.getTaskInstance(); - + if (taskInstance != null) { String actorId = taskInstance.getActorId(); @@ -203,19 +212,19 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread } return AuthenticationUtil.getSystemUserName(); } - + private void executeJobImpl(final Job jobIn) { if ((!isActive) || (alfrescoJobExecutor.getTransactionService().isReadOnly())) { return; } - + // based on JBPM 3.3.1 (JobExecutorThread.executeJob) // - wrap executeJob / deleteJob in Alfresco retries // - add setRollbackOnly warnings // - if Alfresco retries fail, attempt to set JBPM job exception/retries - + try { RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper(); @@ -228,35 +237,45 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread { JobSession jobSession = jbpmContext.getJobSession(); Job job = jobSession.loadJob(jobIn.getId()); - + if (logger.isTraceEnabled()) { logger.trace("executing " + job); } - + if (job.execute(jbpmContext)) { jobSession.deleteJob(job); - + if (logger.isDebugEnabled()) - { + { logger.debug("executed and deleted: " + job); } } - + // if this job is locked too long long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime(); - if (totalLockTimeInMillis>jbpmMaxLockTime) + if (totalLockTimeInMillis > jbpmMaxLockTime) { - logger.warn("setRollbackOnly: exceeded maxLockTime ("+jbpmMaxLockTime+") " + job); + logger.warn("setRollbackOnly: exceeded maxLockTime (" + jbpmMaxLockTime + ") " + job); jbpmContext.setRollbackOnly(); } - } + } + catch(Exception e) + { + if(isPersistenceException(e)) + { + throw new AlfrescoJbpmPersistenceException(e); + } + else + { + throw e; + } + } finally { jbpmContext.close(); } - return null; } }); @@ -266,77 +285,93 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread // ignore jobLockToken = null; } - catch (Exception e) + catch(JbpmPersistenceException pe) + { + if(Services.isCausedByStaleState(pe)) + { + if (logger.isDebugEnabled()) + { + logger.debug("optimistic locking failed, couldn't complete job "+ jobIn, pe); + } + } + else handleException(jobIn, pe); + } + catch (final Exception e) + { + handleException(jobIn, e); + } + } + + private void handleException(final Job jobIn, final Exception e) + { + + if (logger.isErrorEnabled()) + { + logger.error("failed to execute " + jobIn, e); + } + try + { + RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper(); + tranHelper.doInTransaction(new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); + try + { + JobSession jobSession = jbpmContext.getJobSession(); + Job job = jobSession.loadJob(jobIn.getId()); + + StringWriter memoryWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(memoryWriter)); + job.setException(memoryWriter.toString()); + if (logger.isDebugEnabled()) + { + logger.debug("attempting to update exception/retries: " + job); + } + int retries = 0; + if (isPersistenceException(e) ==false) + { + retries = job.getRetries() -1; + } + job.setRetries(retries); + if (logger.isInfoEnabled()) + { + String msg = "updated job exception and set to " + job.getRetries() + " retries: " + jobIn; + logger.info(msg); + } + } + finally + { + jbpmContext.close(); + } + + return null; + } + }); + } + catch (Exception e2) { if (logger.isErrorEnabled()) { - logger.error("failed to execute " + jobIn, e); - } - - if (!isPersistenceException(e)) - { - try - { - final StringWriter memoryWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(memoryWriter)); - - RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper(); - tranHelper.doInTransaction(new RetryingTransactionCallback() - { - public Object execute() throws Throwable - { - JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); - try - { - JobSession jobSession = jbpmContext.getJobSession(); - final Job job = jobSession.loadJob(jobIn.getId()); - - if (logger.isDebugEnabled()) - { - logger.debug("attempting to update exception/retries: " + job); - } - - job.setException(memoryWriter.toString()); - job.setRetries(job.getRetries()-1); - - if (logger.isInfoEnabled()) - { - logger.info("updated job exception and set to "+job.getRetries()+ " retries: " + jobIn); - } - } - finally - { - jbpmContext.close(); - } - - return null; - } - }); - } - catch (Exception e2) - { - if (logger.isErrorEnabled()) - { - logger.error("failed to update job exception/retries " + jobIn, e2); - } - } + logger.error("failed to update job exception/retries " + jobIn, e2); } } } - + private String getExecutorLock() { String lockToken = null; - + if (alfrescoJobExecutor.getJobExecutorLockEnabled()) { try { lockToken = alfrescoJobExecutor.getJobLockService().getLock(LOCK_QNAME, jobLockTTL, 3000, 10); - + if (logger.isTraceEnabled()) { - logger.trace(Thread.currentThread().getName()+" got lock token: "+lockToken); + logger.trace(Thread.currentThread().getName() + " got lock token: " + lockToken); } } catch (LockAcquisitionException e) @@ -348,10 +383,10 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread throw e; } } - + return lockToken; } - + private void refreshExecutorLock(String lockToken) { if (lockToken != null) @@ -359,23 +394,24 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread try { alfrescoJobExecutor.getJobLockService().refreshLock(lockToken, LOCK_QNAME, jobLockTTL); - + if (logger.isTraceEnabled()) { - logger.trace(Thread.currentThread().getName()+" refreshed lock token: "+lockToken); + logger.trace(Thread.currentThread().getName() + " refreshed lock token: " + lockToken); } } catch (LockAcquisitionException e) { if (logger.isTraceEnabled()) { - logger.trace("Failed to refresh Alfresco Job Executor lock - may no longer exist ("+lockToken+")"); + logger.trace("Failed to refresh Alfresco Job Executor lock - may no longer exist (" + lockToken + + ")"); } throw e; } } } - + private void releaseExecutorLock(String lockToken) { if (lockToken != null) @@ -383,24 +419,25 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread try { alfrescoJobExecutor.getJobLockService().releaseLock(lockToken, LOCK_QNAME); - + if (logger.isTraceEnabled()) { - logger.trace(Thread.currentThread().getName()+" released lock token: "+lockToken); + logger.trace(Thread.currentThread().getName() + " released lock token: " + lockToken); } } catch (LockAcquisitionException e) { if (logger.isTraceEnabled()) { - logger.trace("Failed to release Alfresco Job Executor lock - may no longer exist ("+lockToken+")"); + logger.trace("Failed to release Alfresco Job Executor lock - may no longer exist (" + lockToken + + ")"); } throw e; } } } - - private boolean isPersistenceException(Throwable throwable) + + private boolean isPersistenceException(Throwable throwable) { do { @@ -409,9 +446,19 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread return true; } throwable = throwable.getCause(); - } + } while (throwable != null); - return false; + return false; } + + public static class AlfrescoJbpmPersistenceException extends Exception implements DoNotRetryException + { + private static final long serialVersionUID = -2233119713831272158L; + + public AlfrescoJbpmPersistenceException(Throwable cause) { + super(cause); + } + } + } diff --git a/source/java/org/alfresco/repo/workflow/jbpm/JbpmTimerExceptionTest.java b/source/java/org/alfresco/repo/workflow/jbpm/JbpmTimerExceptionTest.java new file mode 100644 index 0000000000..0e6553f385 --- /dev/null +++ b/source/java/org/alfresco/repo/workflow/jbpm/JbpmTimerExceptionTest.java @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2005-2010 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.workflow.jbpm; + +import java.io.InputStream; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.alfresco.repo.content.MimetypeMap; +import org.alfresco.repo.security.authentication.AuthenticationUtil; +import org.alfresco.repo.workflow.WorkflowAdminServiceImpl; +import org.alfresco.repo.workflow.WorkflowModel; +import org.alfresco.repo.workflow.WorkflowTestHelper; +import org.alfresco.service.ServiceRegistry; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.workflow.WorkflowDeployment; +import org.alfresco.service.cmr.workflow.WorkflowPath; +import org.alfresco.service.cmr.workflow.WorkflowService; +import org.alfresco.service.cmr.workflow.WorkflowTask; +import org.alfresco.service.namespace.QName; +import org.alfresco.util.ApplicationContextHelper; +import org.hibernate.HibernateException; +import org.springframework.context.ApplicationContext; + +import junit.framework.TestCase; + +/** + * @author Nick Smith + * @since 4.0 + * + */ +public class JbpmTimerExceptionTest extends TestCase +{ + private static final String defLocation = "jbpmresources/test_timerException.xml"; + private static final String defName = "jbpm$wf:testTimerException"; + + private WorkflowService workflowService; + private WorkflowTestHelper testHelper; + private String defId; + + public void testTimerException() throws Exception + { + NodeRef pckg = workflowService.createPackage(null); + Map params = new HashMap(); + params.put(WorkflowModel.ASSOC_PACKAGE, pckg); + params.put(WorkflowModel.ASSOC_ASSIGNEE, AuthenticationUtil.getAdminUserName()); + + WorkflowPath path = workflowService.startWorkflow(defId, params); + String instanceId = path.getInstance().getId(); + WorkflowTask start = workflowService.getStartTask(instanceId); + workflowService.endTask(start.getId(), null); + Thread.sleep(30000); + System.out.println("Done!"); + } + + @Override + protected void setUp() throws Exception + { + ApplicationContext ctx = ApplicationContextHelper.getApplicationContext(); + ServiceRegistry services = (ServiceRegistry) ctx.getBean(ServiceRegistry.SERVICE_REGISTRY); + workflowService = services.getWorkflowService(); + WorkflowAdminServiceImpl adminService = (WorkflowAdminServiceImpl) ctx.getBean(WorkflowAdminServiceImpl.NAME); + + AuthenticationUtil.setAdminUserAsFullyAuthenticatedUser(); + testHelper = new WorkflowTestHelper(adminService, JBPMEngine.ENGINE_ID, false); + testHelper.setVisible(true); + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + InputStream input = classLoader.getResourceAsStream(defLocation); + if(workflowService.isDefinitionDeployed(JBPMEngine.ENGINE_ID, input, MimetypeMap.MIMETYPE_XML) == false) + { + input = classLoader.getResourceAsStream(defLocation); + WorkflowDeployment deployment= workflowService.deployDefinition(JBPMEngine.ENGINE_ID, input, MimetypeMap.MIMETYPE_XML); + defId = deployment.getDefinition().getId(); + + } + else + { + defId = workflowService.getDefinitionByName(defName).getId(); + } + } + + @Override + protected void tearDown() throws Exception + { + workflowService.undeployDefinition(defId); + testHelper.tearDown(); + AuthenticationUtil.clearCurrentSecurityContext(); + } + + public static void throwException() throws HibernateException + { + throw new HibernateException("My Timer Exception"); + } +} diff --git a/source/test-resources/jbpmresources/test_timerException.xml b/source/test-resources/jbpmresources/test_timerException.xml new file mode 100644 index 0000000000..6b5a79ce1a --- /dev/null +++ b/source/test-resources/jbpmresources/test_timerException.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + #{bpm_assignee} + + + + + + + + + + + + + + + + + + + + \ No newline at end of file