From 07a68c5fa6738457ab6b7672a05cf112210e1fde Mon Sep 17 00:00:00 2001 From: Derek Hulley Date: Wed, 10 Mar 2010 17:16:34 +0000 Subject: [PATCH] Merged BRANCHES/V3.2 to HEAD: 17409: ETHREEOH-1630 Webscript Kind=org.alfresco.repository.content.stream returns information about the Thumbnail webscript 17444: Update svn:mergeinfo 17940: Updated JBPM log level (GraphElement) to hide errors which should be handled by Alfresco retrying txn handler 17989: AVM - unreported NPE in OrphanRepear (clustered) 17997: WCM clustering - ensure Alfresco/JBPM job executor handles retries (from Alfresco retrying txn handler) git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@19197 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../org/alfresco/repo/avm/OrphanReaper.java | 18 ++- .../jbpm/AlfrescoJobExecutorThread.java | 152 ++++++++++++++---- 2 files changed, 134 insertions(+), 36 deletions(-) diff --git a/source/java/org/alfresco/repo/avm/OrphanReaper.java b/source/java/org/alfresco/repo/avm/OrphanReaper.java index 7199fa5e77..f0f046867b 100644 --- a/source/java/org/alfresco/repo/avm/OrphanReaper.java +++ b/source/java/org/alfresco/repo/avm/OrphanReaper.java @@ -271,6 +271,12 @@ public class OrphanReaper return null; } AVMNode node = AVMDAOs.Instance().fAVMNodeDAO.getByID(fPurgeQueue.removeFirst()); + if (node == null) + { + // eg. cluster, multiple reapers + continue; + } + // Save away the ancestor and merged from fields from this node. HistoryLink hlink = AVMDAOs.Instance().fHistoryLinkDAO.getByDescendent(node); AVMNode ancestor = null; @@ -286,7 +292,7 @@ public class OrphanReaper mergedFrom = mlink.getMfrom(); AVMDAOs.Instance().fMergeLinkDAO.delete(mlink); } - + // Get all the nodes that have this node as ancestor. List links = AVMDAOs.Instance().fHistoryLinkDAO.getByAncestor(node); for (HistoryLink link : links) @@ -306,13 +312,13 @@ public class OrphanReaper link.getMto().setMergedFrom(ancestor); AVMDAOs.Instance().fMergeLinkDAO.delete(link); } - + // Get rid of all properties belonging to this node. - AVMDAOs.Instance().fAVMNodeDAO.deleteProperties(node.getId()); - + AVMDAOs.Instance().fAVMNodeDAO.deleteProperties(node.getId()); + // Get rid of all aspects belonging to this node. - AVMDAOs.Instance().fAVMNodeDAO.deleteAspects(node.getId()); - + AVMDAOs.Instance().fAVMNodeDAO.deleteAspects(node.getId()); + // Get rid of ACL. DbAccessControlList acl = node.getAcl(); node.setAcl(null); diff --git a/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java b/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java index 0d31f78272..c3165adcac 100644 --- a/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java +++ b/source/java/org/alfresco/repo/workflow/jbpm/AlfrescoJobExecutorThread.java @@ -18,6 +18,8 @@ */ package org.alfresco.repo.workflow.jbpm; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -29,7 +31,10 @@ import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.QName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hibernate.HibernateException; import org.jbpm.JbpmConfiguration; +import org.jbpm.JbpmContext; +import org.jbpm.db.JobSession; import org.jbpm.job.Job; import org.jbpm.job.executor.JobExecutorThread; @@ -55,6 +60,8 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread private long jobLockTTL = 0; private String jobLockToken = null; + private JbpmConfiguration jbpmConfiguration; + @Override public void setActive(boolean isActive) { @@ -71,6 +78,8 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread this.jbpmMaxLockTime = maxLockTime; this.jobLockTTL = jbpmMaxLockTime+(1000 * 60 * 10); + + this.jbpmConfiguration = jbpmConfiguration; } @SuppressWarnings("unchecked") @@ -160,22 +169,124 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread * {@inheritDoc} */ @Override - protected void executeJob(Job job) + protected void executeJob(final Job jobIn) { if ((!isActive) || (alfrescoJobExecutor.getTransactionService().isReadOnly())) { return; } + // based on JBPM 3.3.1 (JobExecutorThread.executeJob) + // - wrap executeJob / deleteJob in Alfresco retries + // - add setRollbackOnly warnings + // - if Alfresco retries fail, attempt to set JBPM job exception/retries + try { - alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper().doInTransaction(new TransactionJob(job)); + RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper(); + tranHelper.doInTransaction(new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); + try + { + JobSession jobSession = jbpmContext.getJobSession(); + Job job = jobSession.loadJob(jobIn.getId()); + + if (logger.isTraceEnabled()) + { + logger.trace("executing " + job); + } + + if (job.execute(jbpmContext)) + { + jobSession.deleteJob(job); + + if (logger.isDebugEnabled()) + { + logger.debug("executed and deleted: " + job); + } + } + + // if this job is locked too long + long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime(); + if (totalLockTimeInMillis>jbpmMaxLockTime) + { + logger.warn("setRollbackOnly: exceeded maxLockTime ("+jbpmMaxLockTime+") " + job); + jbpmContext.setRollbackOnly(); + } + } + finally + { + jbpmContext.close(); + } + + return null; + } + }); } catch (LockAcquisitionException e) { // ignore jobLockToken = null; } + catch (Exception e) + { + if (logger.isErrorEnabled()) + { + logger.error("failed to execute " + jobIn, e); + } + + if (!isPersistenceException(e)) + { + try + { + final StringWriter memoryWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(memoryWriter)); + + RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper(); + tranHelper.doInTransaction(new RetryingTransactionCallback() + { + public Object execute() throws Throwable + { + JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); + try + { + JobSession jobSession = jbpmContext.getJobSession(); + final Job job = jobSession.loadJob(jobIn.getId()); + + if (logger.isDebugEnabled()) + { + logger.debug("attempting to update exception/retries: " + job); + } + + job.setException(memoryWriter.toString()); + job.setRetries(job.getRetries()-1); + + if (logger.isInfoEnabled()) + { + logger.info("updated job exception and set to "+job.getRetries()+ " retries: " + jobIn); + } + } + finally + { + jbpmContext.close(); + } + + return null; + } + }); + } + catch (Exception e2) + { + if (logger.isErrorEnabled()) + { + logger.error("failed to update job exception/retries " + jobIn, e2); + } + } + } + } } private String getExecutorLock() @@ -254,37 +365,18 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread } } - /** - * Helper class for holding Job reference - * - * @author davidc - */ - private class TransactionJob implements RetryingTransactionCallback + private boolean isPersistenceException(Throwable throwable) { - private Job job; - - /** - * Constructor - * - * @param job the job to execute - */ - public TransactionJob(Job job) + do { - this.job = job; - } - - public Object execute() throws Throwable - { - refreshExecutorLock(jobLockToken); - - AlfrescoJobExecutorThread.super.executeJob(job); - - if (logger.isDebugEnabled()) + if (throwable instanceof HibernateException) { - logger.debug("executed job: "+job); + return true; } - - return null; - } + throwable = throwable.getCause(); + } + while (throwable != null); + + return false; } }