mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Merged BRANCHES/V3.2 to HEAD:
17409: ETHREEOH-1630 Webscript Kind=org.alfresco.repository.content.stream returns information about the Thumbnail webscript 17444: Update svn:mergeinfo 17940: Updated JBPM log level (GraphElement) to hide errors which should be handled by Alfresco retrying txn handler 17989: AVM - unreported NPE in OrphanRepear (clustered) 17997: WCM clustering - ensure Alfresco/JBPM job executor handles retries (from Alfresco retrying txn handler) git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@19197 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -271,6 +271,12 @@ public class OrphanReaper
|
||||
return null;
|
||||
}
|
||||
AVMNode node = AVMDAOs.Instance().fAVMNodeDAO.getByID(fPurgeQueue.removeFirst());
|
||||
if (node == null)
|
||||
{
|
||||
// eg. cluster, multiple reapers
|
||||
continue;
|
||||
}
|
||||
|
||||
// Save away the ancestor and merged from fields from this node.
|
||||
HistoryLink hlink = AVMDAOs.Instance().fHistoryLinkDAO.getByDescendent(node);
|
||||
AVMNode ancestor = null;
|
||||
|
@@ -18,6 +18,8 @@
|
||||
*/
|
||||
package org.alfresco.repo.workflow.jbpm;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
@@ -29,7 +31,10 @@ import org.alfresco.service.namespace.NamespaceService;
|
||||
import org.alfresco.service.namespace.QName;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.hibernate.HibernateException;
|
||||
import org.jbpm.JbpmConfiguration;
|
||||
import org.jbpm.JbpmContext;
|
||||
import org.jbpm.db.JobSession;
|
||||
import org.jbpm.job.Job;
|
||||
import org.jbpm.job.executor.JobExecutorThread;
|
||||
|
||||
@@ -55,6 +60,8 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
|
||||
private long jobLockTTL = 0;
|
||||
private String jobLockToken = null;
|
||||
|
||||
private JbpmConfiguration jbpmConfiguration;
|
||||
|
||||
@Override
|
||||
public void setActive(boolean isActive)
|
||||
{
|
||||
@@ -71,6 +78,8 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
|
||||
this.jbpmMaxLockTime = maxLockTime;
|
||||
|
||||
this.jobLockTTL = jbpmMaxLockTime+(1000 * 60 * 10);
|
||||
|
||||
this.jbpmConfiguration = jbpmConfiguration;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -160,22 +169,124 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected void executeJob(Job job)
|
||||
protected void executeJob(final Job jobIn)
|
||||
{
|
||||
if ((!isActive) || (alfrescoJobExecutor.getTransactionService().isReadOnly()))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// based on JBPM 3.3.1 (JobExecutorThread.executeJob)
|
||||
// - wrap executeJob / deleteJob in Alfresco retries
|
||||
// - add setRollbackOnly warnings
|
||||
// - if Alfresco retries fail, attempt to set JBPM job exception/retries
|
||||
|
||||
try
|
||||
{
|
||||
alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper().doInTransaction(new TransactionJob(job));
|
||||
RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper();
|
||||
tranHelper.doInTransaction(new RetryingTransactionCallback<Object>()
|
||||
{
|
||||
public Object execute() throws Throwable
|
||||
{
|
||||
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
|
||||
try
|
||||
{
|
||||
JobSession jobSession = jbpmContext.getJobSession();
|
||||
Job job = jobSession.loadJob(jobIn.getId());
|
||||
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("executing " + job);
|
||||
}
|
||||
|
||||
if (job.execute(jbpmContext))
|
||||
{
|
||||
jobSession.deleteJob(job);
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("executed and deleted: " + job);
|
||||
}
|
||||
}
|
||||
|
||||
// if this job is locked too long
|
||||
long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime();
|
||||
if (totalLockTimeInMillis>jbpmMaxLockTime)
|
||||
{
|
||||
logger.warn("setRollbackOnly: exceeded maxLockTime ("+jbpmMaxLockTime+") " + job);
|
||||
jbpmContext.setRollbackOnly();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
jbpmContext.close();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (LockAcquisitionException e)
|
||||
{
|
||||
// ignore
|
||||
jobLockToken = null;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
if (logger.isErrorEnabled())
|
||||
{
|
||||
logger.error("failed to execute " + jobIn, e);
|
||||
}
|
||||
|
||||
if (!isPersistenceException(e))
|
||||
{
|
||||
try
|
||||
{
|
||||
final StringWriter memoryWriter = new StringWriter();
|
||||
e.printStackTrace(new PrintWriter(memoryWriter));
|
||||
|
||||
RetryingTransactionHelper tranHelper = alfrescoJobExecutor.getTransactionService().getRetryingTransactionHelper();
|
||||
tranHelper.doInTransaction(new RetryingTransactionCallback<Object>()
|
||||
{
|
||||
public Object execute() throws Throwable
|
||||
{
|
||||
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
|
||||
try
|
||||
{
|
||||
JobSession jobSession = jbpmContext.getJobSession();
|
||||
final Job job = jobSession.loadJob(jobIn.getId());
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("attempting to update exception/retries: " + job);
|
||||
}
|
||||
|
||||
job.setException(memoryWriter.toString());
|
||||
job.setRetries(job.getRetries()-1);
|
||||
|
||||
if (logger.isInfoEnabled())
|
||||
{
|
||||
logger.info("updated job exception and set to "+job.getRetries()+ " retries: " + jobIn);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
jbpmContext.close();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e2)
|
||||
{
|
||||
if (logger.isErrorEnabled())
|
||||
{
|
||||
logger.error("failed to update job exception/retries " + jobIn, e2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getExecutorLock()
|
||||
@@ -254,37 +365,18 @@ public class AlfrescoJobExecutorThread extends JobExecutorThread
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class for holding Job reference
|
||||
*
|
||||
* @author davidc
|
||||
*/
|
||||
private class TransactionJob implements RetryingTransactionCallback<Object>
|
||||
private boolean isPersistenceException(Throwable throwable)
|
||||
{
|
||||
private Job job;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param job the job to execute
|
||||
*/
|
||||
public TransactionJob(Job job)
|
||||
do
|
||||
{
|
||||
this.job = job;
|
||||
}
|
||||
|
||||
public Object execute() throws Throwable
|
||||
if (throwable instanceof HibernateException)
|
||||
{
|
||||
refreshExecutorLock(jobLockToken);
|
||||
return true;
|
||||
}
|
||||
throwable = throwable.getCause();
|
||||
}
|
||||
while (throwable != null);
|
||||
|
||||
AlfrescoJobExecutorThread.super.executeJob(job);
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("executed job: "+job);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user