added threading/cluster sync; more tests

This commit is contained in:
2023-05-09 10:35:40 -04:00
parent b116adeaa9
commit afcfbbc61a
31 changed files with 1146 additions and 94 deletions

4
rad.sh
View File

@@ -1,7 +1,7 @@
#!/bin/sh #!/bin/bash
discoverArtifactId() { discoverArtifactId() {
ARTIFACT_ID=`mvn -q -Dexpression=project.artifactId -DforceStdout help:evaluate` ARTIFACT_ID=`mvn -q -Dexpression=project.artifactId -DforceStdout help:evaluate | sed 's/\x1B\[[0-9;]\{1,\}[A-Za-z]//g'`
} }
rebuild() { rebuild() {

View File

@@ -8,5 +8,7 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD) @Target(ElementType.METHOD)
public @interface Asynchronous { public @interface Asynchronous {
boolean durable() default true;
} }

View File

@@ -0,0 +1,20 @@
package com.inteligr8.alfresco.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ClusterSynchronized {
String value() default "";
long acquireWaitBetweenRetriesInMillis() default 100L;
int acquireMaxRetries() default 300;
long lockTimeoutInMillis() default 5000L;
}

View File

@@ -0,0 +1,15 @@
package com.inteligr8.alfresco.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({
ElementType.METHOD,
ElementType.PARAMETER
})
public @interface IfNotNull {
}

View File

@@ -0,0 +1,17 @@
package com.inteligr8.alfresco.annotations;
public interface Threadable {
default Integer getThreads() {
return null;
}
default Integer getConcurrency() {
return null;
}
default Integer getThreadPriority() {
return null;
}
}

View File

@@ -0,0 +1,24 @@
package com.inteligr8.alfresco.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Threaded {
String name() default "";
int threads() default 1;
int concurrency() default 0;
int priority() default Thread.NORM_PRIORITY;
boolean join() default false;
long joinWaitMillis() default 0L;
}

View File

@@ -0,0 +1,53 @@
package com.inteligr8.alfresco.annotations.aspect;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
public class AbstractMethodAspect<A extends Annotation> extends AbstractWarnOnceService {
protected A getAnnotation(ProceedingJoinPoint joinPoint, Class<A> annotationClass, boolean warnReturn, boolean warnThrows) {
Method method = this.getMethod(joinPoint, annotationClass, warnReturn, warnThrows);
return method.getAnnotation(annotationClass);
}
protected Method getMethod(ProceedingJoinPoint joinPoint, Object messagePrefix, boolean warnReturn, boolean warnThrows) {
if (!(joinPoint.getSignature() instanceof MethodSignature))
throw new IllegalStateException(this.createMessagePrefix(messagePrefix) + " must be on methods");
MethodSignature methodSig = (MethodSignature) joinPoint.getSignature();
if (warnReturn && methodSig.getReturnType() != null && !this.isReturnTypeVoid(methodSig)) {
this.warn(joinPoint.toLongString(),
"{} has a return value or throws clause; 'null' is always returned and subthread exceptions don't propagate: {}: {}",
this.createMessagePrefix(messagePrefix), joinPoint, methodSig.getReturnType());
}
if (warnThrows && methodSig.getExceptionTypes() != null && methodSig.getExceptionTypes().length > 0) {
this.warn(joinPoint.toLongString(),
"{} has a return value or throws clause; 'null' is always returned and subthread exceptions don't propagate: {}: {}",
this.createMessagePrefix(messagePrefix), joinPoint, methodSig.getExceptionTypes());
}
return methodSig.getMethod();
}
protected boolean isReturnTypeVoid(MethodSignature methodSig) {
return Void.class.equals(methodSig.getReturnType()) || void.class.equals(methodSig.getReturnType());
}
private String createMessagePrefix(Object obj) {
if (obj instanceof Class<?>) {
Class<?> clazz = (Class<?>) obj;
if (clazz.isAnnotation()) {
return "The @" + ((Class<?>) obj).getSimpleName() + " annotated method";
} else {
return "The " + ((Class<?>) obj).getSimpleName() + " class method";
}
} else {
return obj.toString();
}
}
}

View File

@@ -8,7 +8,7 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public abstract class MethodOrParameterAspect<T extends Annotation> { public abstract class AbstractMethodOrParameterAspect<T extends Annotation> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -0,0 +1,20 @@
package com.inteligr8.alfresco.annotations.aspect;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbstractWarnOnceService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private Set<String> warned = new HashSet<>();
protected void warn(String key, String message, Object... arguments) {
if (this.warned.add(key))
this.logger.warn(message, arguments);
}
}

View File

@@ -3,21 +3,32 @@ package com.inteligr8.alfresco.annotations.aspect;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature; import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import com.inteligr8.alfresco.annotations.Asynchronous;
import com.inteligr8.alfresco.annotations.service.AsyncService; import com.inteligr8.alfresco.annotations.service.AsyncService;
@Aspect @Aspect
public class AsyncAspect { @DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AsyncAspect, *")
//@Order(Ordered.HIGHEST_PRECEDENCE + Byte.MAX_VALUE)
//@Component
public class AsyncAspect extends AbstractMethodAspect<Asynchronous> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired @Autowired
private AsyncService asyncService; @Qualifier("async.mq")
private AsyncService durableAsyncService;
@Autowired
@Qualifier("async.thread")
private AsyncService volatileAsyncService;
@Pointcut("@annotation(com.inteligr8.alfresco.annotations.Asynchronous) && execution(* *(..))") @Pointcut("@annotation(com.inteligr8.alfresco.annotations.Asynchronous) && execution(* *(..))")
public void isAsyncAnnotated() { public void isAsyncAnnotated() {
@@ -25,16 +36,28 @@ public class AsyncAspect {
@Around("isAsyncAnnotated()") @Around("isAsyncAnnotated()")
public Object async(ProceedingJoinPoint joinPoint) throws Throwable { public Object async(ProceedingJoinPoint joinPoint) throws Throwable {
if (this.asyncService.isCurrentThreadAsynchronous()) { this.logger.trace("async({})", joinPoint);
this.logger.trace("Intercepted an @Async method call while already asynchronous; executing synchronously");
Asynchronous async = this.getAnnotation(joinPoint, Asynchronous.class, true, true);
AsyncService asyncService = async.durable() ? this.durableAsyncService : this.volatileAsyncService;
if (!asyncService.isEnabled()) {
this.warn(joinPoint.toLongString(),
"Intercepted an @Asynchronous method call while the appropriate asynchronous service is not enabled; continuing synchronously");
return joinPoint.proceed();
} else if (asyncService.isCurrentThreadAsynchronous()) {
this.logger.debug("Intercepted an @Asynchronous method call while already asynchronous; continuing synchronously");
return joinPoint.proceed(); return joinPoint.proceed();
} else { } else {
this.logger.trace("Intercepted an @Async method call; redirecting to Async service"); this.logger.trace("Intercepted an @Asynchronous method call; redirecting to the appropriate asynchronous service");
this.asyncService.push(joinPoint); asyncService.push(joinPoint);
MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); MethodSignature methodSig = (MethodSignature) joinPoint.getSignature();
if (!Void.class.equals(methodSig.getReturnType())) if (!this.isReturnTypeVoid(methodSig)) {
this.logger.warn("An @Asynchronous method returns a value, which is not expected/allowed: {}", methodSig.getMethod()); this.warn(joinPoint.toLongString(),
"An @Asynchronous method returns a value, which is not expected/allowed: {}",
methodSig.getMethod());
}
return null; return null;
} }
} }

View File

@@ -8,19 +8,20 @@ import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature; import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import com.inteligr8.alfresco.annotations.Authorizable; import com.inteligr8.alfresco.annotations.Authorizable;
import com.inteligr8.alfresco.annotations.Authorized; import com.inteligr8.alfresco.annotations.Authorized;
import com.inteligr8.alfresco.annotations.AuthorizedAsSystem; import com.inteligr8.alfresco.annotations.AuthorizedAsSystem;
@Aspect @Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + Short.MAX_VALUE) @DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AuthorizedAspect, com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect")
//@Order(Ordered.HIGHEST_PRECEDENCE + Short.MAX_VALUE)
//@Component
public class AuthorizedAspect { public class AuthorizedAspect {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -9,14 +9,12 @@ import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import com.inteligr8.alfresco.annotations.IfChildAssociationIsPrimary; import com.inteligr8.alfresco.annotations.IfChildAssociationIsPrimary;
@Aspect @Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + Byte.MAX_VALUE) // ordering before transaction/authorized //@Component
public class ChildIsPrimaryAspect extends MethodOrParameterAspect<IfChildAssociationIsPrimary> { public class ChildIsPrimaryAspect extends AbstractMethodOrParameterAspect<IfChildAssociationIsPrimary> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -0,0 +1,67 @@
package com.inteligr8.alfresco.annotations.aspect;
import java.lang.reflect.Method;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.service.namespace.QName;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.inteligr8.alfresco.annotations.ClusterSynchronized;
@Aspect
@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.ClusterSynchronizedAspect")
//@Component
//@Order(Ordered.LOWEST_PRECEDENCE - Byte.MAX_VALUE)
public class ClusterSynchronizedAspect extends AbstractMethodAspect<ClusterSynchronized> {
private static final String NS = "http://inteligr8.com/alfresco/model";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private JobLockService jobLockService;
@Pointcut("@annotation(com.inteligr8.alfresco.annotations.ClusterSynchronized) && execution(* *(..))")
public void isClusterSyncAnnotated() {
}
@Around("isClusterSyncAnnotated()")
public Object clusterSync(ProceedingJoinPoint joinPoint) throws Throwable {
this.logger.trace("clusterSync({})", joinPoint);
Method method = this.getMethod(joinPoint, ClusterSynchronized.class, false, false);
ClusterSynchronized clusterSync = method.getAnnotation(ClusterSynchronized.class);
QName lockQName = this.getLockQName(clusterSync, method);
this.logger.debug("Acquiring cluster lock: {}", lockQName);
String lockToken = this.jobLockService.getLock(lockQName, clusterSync.lockTimeoutInMillis(),
clusterSync.acquireWaitBetweenRetriesInMillis(), clusterSync.acquireMaxRetries());
try {
this.logger.trace("Acquired cluster lock: {}", lockQName);
return joinPoint.proceed();
} finally {
this.logger.debug("Releasing cluster lock: {}", lockQName);
this.jobLockService.releaseLock(lockToken, lockQName);
}
}
private QName getLockQName(ClusterSynchronized clusterSync, Method method) {
String lockName = StringUtils.trimToNull(clusterSync.value());
if (lockName != null) {
return QName.createQNameWithValidLocalName(NS, lockName);
} else {
String methodId = method.getDeclaringClass().getSimpleName() + "_" + method.getName();
return QName.createQNameWithValidLocalName(NS, methodId);
}
}
}

View File

@@ -18,6 +18,7 @@ import org.alfresco.service.namespace.QNamePattern;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -28,6 +29,8 @@ import com.inteligr8.alfresco.annotations.NodeAspectConstrainable;
import com.inteligr8.alfresco.annotations.IfNodeHasAspect; import com.inteligr8.alfresco.annotations.IfNodeHasAspect;
@Aspect @Aspect
@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.NodeAspectAspect")
//@Component
public class NodeAspectAspect extends QNameBasedAspect<IfNodeHasAspect> { public class NodeAspectAspect extends QNameBasedAspect<IfNodeHasAspect> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -18,6 +18,7 @@ import org.alfresco.service.namespace.QNamePattern;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -28,6 +29,8 @@ import com.inteligr8.alfresco.annotations.IfNodeOfType;
import com.inteligr8.alfresco.annotations.NodeTypeConstrainable; import com.inteligr8.alfresco.annotations.NodeTypeConstrainable;
@Aspect @Aspect
@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.NodeTypeAspect")
//@Component
public class NodeTypeAspect extends QNameBasedAspect<IfNodeOfType> { public class NodeTypeAspect extends QNameBasedAspect<IfNodeOfType> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -2,8 +2,6 @@ package com.inteligr8.alfresco.annotations.aspect;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import javax.annotation.Nonnull;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
@@ -11,27 +9,28 @@ import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature; import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order; import com.inteligr8.alfresco.annotations.IfNotNull;
@Aspect @Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 64) //@Order(Ordered.HIGHEST_PRECEDENCE + 64)
//@Component
public class NotNullAspect { public class NotNullAspect {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Pointcut("execution(* *(@javax.annotation.Nonnull (*), ..))") @Pointcut("execution(* *(@com.inteligr8.alfresco.annotations.IfNotNull (*), ..))")
public void isNonnullAnnotated() { public void isNotNullAnnotated() {
} }
@Around("isNonnullAnnotated()") @Around("isNotNullAnnotated()")
public Object isNotNull(ProceedingJoinPoint joinPoint) throws Throwable { public Object isNotNull(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); MethodSignature methodSig = (MethodSignature) joinPoint.getSignature();
Method method = methodSig.getMethod(); Method method = methodSig.getMethod();
for (int p = 0; p < method.getParameterCount(); p++) { for (int p = 0; p < method.getParameterCount(); p++) {
if (joinPoint.getArgs()[p] == null && method.getParameters()[p].isAnnotationPresent(Nonnull.class)) { if (joinPoint.getArgs()[p] == null && method.getParameters()[p].isAnnotationPresent(IfNotNull.class)) {
this.logger.debug("A @Nonnull parameter is `null`; skipping method: {}", method); this.logger.debug("A @IfNotNull parameter is `null`; skipping method: {}", method);
return null; return null;
} }
} }

View File

@@ -14,6 +14,7 @@ import org.alfresco.service.cmr.repository.NodeService;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -22,7 +23,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.inteligr8.alfresco.annotations.IfNodeExists; import com.inteligr8.alfresco.annotations.IfNodeExists;
@Aspect @Aspect
public class OperableNodeAspect extends MethodOrParameterAspect<IfNodeExists> { @DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.OperableNodeAspect")
//@Component
public class OperableNodeAspect extends AbstractMethodOrParameterAspect<IfNodeExists> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -17,7 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
public abstract class QNameBasedAspect<T extends Annotation> extends MethodOrParameterAspect<T> { public abstract class QNameBasedAspect<T extends Annotation> extends AbstractMethodOrParameterAspect<T> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());

View File

@@ -1,38 +1,34 @@
package com.inteligr8.alfresco.annotations.aspect; package com.inteligr8.alfresco.annotations.aspect;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState;
import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.transaction.TransactionService; import org.alfresco.service.transaction.TransactionService;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature; import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import com.inteligr8.alfresco.annotations.TransactionalRetryable; import com.inteligr8.alfresco.annotations.TransactionalRetryable;
@Aspect @Aspect
@Order(Ordered.LOWEST_PRECEDENCE - Short.MAX_VALUE) @DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AuthorizedAspect, com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect")
//@Component //@Component
//@Order(Ordered.LOWEST_PRECEDENCE - Short.MAX_VALUE)
public class RetryingTransactionAspect { public class RetryingTransactionAspect {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Set<String> warned = new HashSet<>();
@Autowired @Autowired
private TransactionService txService; private TransactionService txService;
@@ -40,33 +36,29 @@ public class RetryingTransactionAspect {
public void isTransactionalAnnotated() { public void isTransactionalAnnotated() {
} }
@Pointcut("@annotation(org.springframework.transaction.annotation.TransactionalRetryable) && execution(* *(..))") @Pointcut("@annotation(com.inteligr8.alfresco.annotations.TransactionalRetryable) && execution(* *(..))")
public void isTransactionalRetryableAnnotated() { public void isTransactionalRetryableAnnotated() {
} }
@Around("isTransactionalAnnotated()") @Around("isTransactionalAnnotated() || isTransactionalRetryableAnnotated()")
public Object retryingTransactional(ProceedingJoinPoint joinPoint) throws Throwable { public Object retryingTransactional(ProceedingJoinPoint joinPoint) throws Throwable {
this.logger.trace("retryingTransactional({})", joinPoint); this.logger.trace("retryingTransactional({})", joinPoint);
Method method = this.getMethod(joinPoint); Method method = this.getMethod(joinPoint);
Transactional txl = method.getAnnotation(Transactional.class); Transactional txl = method.getAnnotation(Transactional.class);
TransactionalRetryable txtry = method.getAnnotation(TransactionalRetryable.class);
if (this.doCreateNewTxContext(txl) || this.isReadStateChange(txl)) { if (this.doCreateNewTxContext(txl) || this.isReadStateChange(txl)) {
TransactionalRetryable txtry = method.getAnnotation(TransactionalRetryable.class);
this.logger.debug("Changing TX context: {} => [ro: {}, new: {}]", AlfrescoTransactionSupport.getTransactionReadState(), txl.readOnly(), txl.propagation()); this.logger.debug("Changing TX context: {} => [ro: {}, new: {}]", AlfrescoTransactionSupport.getTransactionReadState(), txl.readOnly(), txl.propagation());
return this.execute(joinPoint, txl, txtry); return this.execute(joinPoint, txl, txtry);
} else if (this.doCreateNewTxRetryContext(txtry)) {
this.logger.debug("Changing TX context: retries: {}", txtry.maxRetries());
return this.execute(joinPoint, null, txtry);
} else { } else {
return joinPoint.proceed(); return joinPoint.proceed();
} }
} }
@Before("isTransactionalRetryableAnnotated() && !isTransactionalAnnotated()")
public void warn(ProceedingJoinPoint joinPoint) throws Throwable {
if (this.warned.add(joinPoint.toLongString()))
this.logger.warn("A @TransactionalRetryable annotation was found without a @Transactional annotation; it will be ignored: {}", joinPoint);
}
private Method getMethod(ProceedingJoinPoint joinPoint) { private Method getMethod(ProceedingJoinPoint joinPoint) {
if (!(joinPoint.getSignature() instanceof MethodSignature)) if (!(joinPoint.getSignature() instanceof MethodSignature))
throw new IllegalStateException("The @Transactional or @TransactionalRetryable annotations must be on methods"); throw new IllegalStateException("The @Transactional or @TransactionalRetryable annotations must be on methods");
@@ -76,6 +68,9 @@ public class RetryingTransactionAspect {
} }
private boolean isReadStateChange(Transactional txl) { private boolean isReadStateChange(Transactional txl) {
if (txl == null)
return false;
switch (txl.propagation()) { switch (txl.propagation()) {
case NEVER: case NEVER:
case NOT_SUPPORTED: case NOT_SUPPORTED:
@@ -97,8 +92,14 @@ public class RetryingTransactionAspect {
} }
} }
private boolean doCreateNewTxRetryContext(TransactionalRetryable txtry) {
return txtry != null;
}
private boolean doCreateNewTxContext(Transactional txl) { private boolean doCreateNewTxContext(Transactional txl) {
switch (txl.propagation()) { if (txl == null) {
return false;
} else switch (txl.propagation()) {
case NEVER: case NEVER:
switch (AlfrescoTransactionSupport.getTransactionReadState()) { switch (AlfrescoTransactionSupport.getTransactionReadState()) {
case TXN_NONE: case TXN_NONE:
@@ -164,12 +165,13 @@ public class RetryingTransactionAspect {
if (txtry.incRetryWaitInMillis() > 0) if (txtry.incRetryWaitInMillis() > 0)
rthelper.setRetryWaitIncrementMs(txtry.incRetryWaitInMillis()); rthelper.setRetryWaitIncrementMs(txtry.incRetryWaitInMillis());
} }
if (txl.timeout() > 0) if (txl != null && txl.timeout() > 0)
rthelper.setMaxExecutionMs(txl.timeout() * 1000L); rthelper.setMaxExecutionMs(txl.timeout() * 1000L);
try { try {
this.logger.trace("source tx: {}", AlfrescoTransactionSupport.getTransactionId()); this.logger.trace("source tx: {}", AlfrescoTransactionSupport.getTransactionId());
return rthelper.doInTransaction(rtcallback, txl.readOnly(), true); boolean readonly = txl != null && txl.readOnly() || txl == null && AlfrescoTransactionSupport.getTransactionReadState() == TxnReadState.TXN_READ_ONLY;
return rthelper.doInTransaction(rtcallback, readonly, txl != null);
} catch (RuntimeException re) { } catch (RuntimeException re) {
// attempt to unwrap the exception // attempt to unwrap the exception
if (re.getMessage() == null) { if (re.getMessage() == null) {

View File

@@ -0,0 +1,178 @@
package com.inteligr8.alfresco.annotations.aspect;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.inteligr8.alfresco.annotations.Threadable;
import com.inteligr8.alfresco.annotations.Threaded;
@Aspect
@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AsyncAspect, com.inteligr8.alfresco.annotations.aspect.ThreadedAspect, *")
//@Order(Ordered.HIGHEST_PRECEDENCE + Byte.MAX_VALUE)
//@Component
public class ThreadedAspect extends AbstractMethodAspect<Threaded> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private ThreadLocal<Set<String>> nested = ThreadLocal.withInitial(new Supplier<Set<String>>() {
public Set<String> get() {
return new HashSet<>();
}
});
@Pointcut("@annotation(com.inteligr8.alfresco.annotations.Threaded) && execution(* *(..))")
public void isThreadedAnnotated() {
}
@Around("isThreadedAnnotated()")
public Object threaded(ProceedingJoinPoint joinPoint) throws Throwable {
// AspectJ does not recursively match annotations if the same thread is calling joinPoint.proceed()
// but when a different thread calls it, it doesn't know it is already processing the annotation
// so we are going to use a ThreadLocal to prevent recursion across threads
if (this.nested.get().contains(joinPoint.getSignature().toLongString()))
return joinPoint.proceed();
this.logger.trace("threaded({})", joinPoint);
Threaded threaded = this.getAnnotation(joinPoint, Threaded.class, true, true);
MergedThreadConfiguration threadConfig = new MergedThreadConfiguration(joinPoint, threaded);
ThreadFactoryBuilder tfbuilder = new ThreadFactoryBuilder()
.setPriority(threadConfig.getThreadPriority())
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("The thread '" + t.getName() + "' had an exception", e);
}
});
if (threaded.name().length() > 0)
tfbuilder.setNameFormat(threaded.name() + "-%d");
Integer concurrency = threadConfig.getConcurrency();
if (concurrency == null)
concurrency = threadConfig.getThreads();
BlockingQueue<Runnable> threadQueue = new ArrayBlockingQueue<>(threadConfig.getThreads().intValue());
ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor(concurrency.intValue(), concurrency.intValue(),
1L, TimeUnit.SECONDS,
threadQueue,
tfbuilder.build());
Callable<Object> callable = this.createCallable(joinPoint);
this.logger.debug("Starting {} threads", threadConfig.getThreads());
for (int t = 0; t < threadConfig.getThreads().intValue(); t++)
threadExecutor.submit(callable);
threadExecutor.shutdown();
if (threaded.join()) {
long waitMillis = threaded.joinWaitMillis() == 0L ? 300000L : threaded.joinWaitMillis();
do {
this.logger.debug("Blocking this thread until subthreads finish: {}", Thread.currentThread().getId());
if (threadExecutor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
this.logger.trace("Subthreads finished; unblocking this thread: {}", Thread.currentThread().getId());
break;
}
} while (threaded.joinWaitMillis() == 0L);
this.logger.debug("Subthreads running: {}; unblocking this thread: {}", threadExecutor.getActiveCount(), Thread.currentThread().getId());
}
return null;
}
private Callable<Object> createCallable(final ProceedingJoinPoint joinPoint) throws Throwable {
return new Callable<Object>() {
@Override
public Object call() throws Exception {
logger.debug("entering thread: {}", Thread.currentThread().getId());
// AspectJ does not recursively match annotations if the same thread is calling joinPoint.proceed()
// but when a different thread calls it, it doesn't know it is already processing the annotation
// so we are going to use a ThreadLocal to prevent recursion across threads
nested.get().add(joinPoint.getSignature().toLongString());
// we cannot use joinPoint.proceed() as it will only execute in 1 thread
// we need to recreate the method call, fresh, using reflection
MethodSignature methodSig = (MethodSignature) joinPoint.getSignature();
Method method = methodSig.getMethod();
try {
method.setAccessible(true);
return method.invoke(joinPoint.getThis(), joinPoint.getArgs());
} catch (Exception | Error e) {
logger.error("An unexpected issue occurred in thread #" + Thread.currentThread().getId(), e);
throw e;
} catch (Throwable t) {
logger.error("An unexpected issue occurred in thread #" + Thread.currentThread().getId(), t);
throw new RuntimeException("This should never happen", t);
} finally {
logger.trace("leaving thread: {}", Thread.currentThread().getId());
}
}
};
}
private class MergedThreadConfiguration implements Threadable {
private final Threaded threaded;
private final Threadable threadable;
public MergedThreadConfiguration(ProceedingJoinPoint joinPoint, Threaded threaded) {
this.threaded = threaded;
this.threadable = (joinPoint.getThis() instanceof Threadable) ? (Threadable) joinPoint.getThis() : null;
}
@Override
public Integer getThreads() {
if (this.threadable != null && this.threadable.getThreads() != null) {
return this.threadable.getThreads();
} else {
return this.threaded.threads();
}
}
@Override
public Integer getConcurrency() {
if (this.threadable != null && this.threadable.getConcurrency() != null) {
return this.threadable.getConcurrency();
} else if (this.threaded.concurrency() <= 0) {
return null;
} else {
return this.threaded.concurrency();
}
}
@Override
public Integer getThreadPriority() {
if (this.threadable != null && this.threadable.getThreadPriority() != null) {
return this.threadable.getThreadPriority();
} else {
return this.threaded.priority();
}
}
}
}

View File

@@ -0,0 +1,95 @@
package com.inteligr8.alfresco.annotations.service.impl;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.UserTransaction;
import javax.transaction.xa.XAResource;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.TransactionListener;
public class AlfrescoTransaction implements UserTransaction, Transaction {
private UserTransaction userTx;
public AlfrescoTransaction(UserTransaction userTx) {
this.userTx = userTx;
}
@Override
public void begin() throws NotSupportedException, SystemException {
this.userTx.begin();
}
@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException,
SecurityException, IllegalStateException, SystemException {
this.userTx.commit();
}
@Override
public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
return false;
}
@Override
public boolean enlistResource(XAResource xaRes) throws RollbackException, IllegalStateException, SystemException {
return false;
}
@Override
public int getStatus() throws SystemException {
return this.userTx.getStatus();
}
@Override
public void registerSynchronization(Synchronization sync)
throws RollbackException, IllegalStateException, SystemException {
AlfrescoTransactionSupport.bindListener(new TransactionListener() {
@Override
public void flush() {
}
@Override
public void beforeCompletion() {
sync.beforeCompletion();
}
@Override
public void beforeCommit(boolean readOnly) {
}
@Override
public void afterRollback() {
sync.afterCompletion(Status.STATUS_ROLLEDBACK);
}
@Override
public void afterCommit() {
sync.afterCompletion(Status.STATUS_COMMITTED);
}
});
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
this.userTx.rollback();
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
this.userTx.setRollbackOnly();
}
@Override
public void setTransactionTimeout(int seconds) throws SystemException {
this.userTx.setTransactionTimeout(seconds);
}
}

View File

@@ -0,0 +1,109 @@
package com.inteligr8.alfresco.annotations.service.impl;
import java.util.function.Supplier;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import org.alfresco.service.transaction.TransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class AlfrescoTransactionManager implements TransactionManager {
@Autowired
private TransactionService txService;
private ThreadLocal<AlfrescoTransaction> tx = ThreadLocal.withInitial(new Supplier<AlfrescoTransaction>() {
@Override
public AlfrescoTransaction get() {
return null;
}
});
@Override
public void begin() throws NotSupportedException, SystemException {
UserTransaction userTx = this.txService.getNonPropagatingUserTransaction();
AlfrescoTransaction tx = new AlfrescoTransaction(userTx);
tx.begin();
this.tx.set(tx);
}
@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException,
SecurityException, IllegalStateException, SystemException {
AlfrescoTransaction tx = this.tx.get();
if (tx == null)
throw new IllegalStateException();
tx.commit();
}
@Override
public int getStatus() throws SystemException {
AlfrescoTransaction tx = this.tx.get();
if (tx == null)
return Status.STATUS_NO_TRANSACTION;
return tx.getStatus();
}
@Override
public Transaction getTransaction() throws SystemException {
return this.tx.get();
}
@Override
public void resume(Transaction tx) throws InvalidTransactionException, IllegalStateException, SystemException {
if (!(tx instanceof AlfrescoTransaction))
throw new InvalidTransactionException("An AlfrescoTransaction is expected; received: " + tx.getClass());
if (this.tx.get() != null)
throw new IllegalStateException();
this.tx.set((AlfrescoTransaction) tx);
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
AlfrescoTransaction tx = this.tx.get();
if (tx == null)
throw new IllegalStateException();
tx.rollback();
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
AlfrescoTransaction tx = this.tx.get();
if (tx == null)
throw new IllegalStateException();
tx.setRollbackOnly();
}
@Override
public void setTransactionTimeout(int seconds) throws SystemException {
AlfrescoTransaction tx = this.tx.get();
if (tx != null)
tx.setTransactionTimeout(seconds);
}
@Override
public Transaction suspend() throws SystemException {
try {
return this.tx.get();
} finally {
this.tx.set(null);
}
}
}

View File

@@ -7,6 +7,8 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Parameter; import java.lang.reflect.Parameter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@@ -33,7 +35,6 @@ import javax.transaction.TransactionManager;
import org.alfresco.model.ContentModel; import org.alfresco.model.ContentModel;
import org.alfresco.repo.cache.SimpleCache; import org.alfresco.repo.cache.SimpleCache;
import org.alfresco.repo.dictionary.M2Model; import org.alfresco.repo.dictionary.M2Model;
import org.alfresco.repo.search.transaction.SimpleTransactionManager;
import org.alfresco.repo.version.common.VersionImpl; import org.alfresco.repo.version.common.VersionImpl;
import org.alfresco.service.cmr.action.Action; import org.alfresco.service.cmr.action.Action;
import org.alfresco.service.cmr.action.ActionService; import org.alfresco.service.cmr.action.ActionService;
@@ -54,6 +55,8 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.quartz.JobKey; import org.quartz.JobKey;
import org.quartz.Scheduler; import org.quartz.Scheduler;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.JobDetailImpl; import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -68,6 +71,8 @@ import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.inteligr8.alfresco.annotations.AuthorizedAsSystem; import com.inteligr8.alfresco.annotations.AuthorizedAsSystem;
import com.inteligr8.alfresco.annotations.Threadable;
import com.inteligr8.alfresco.annotations.Threaded;
import com.inteligr8.alfresco.annotations.TransactionalRetryable; import com.inteligr8.alfresco.annotations.TransactionalRetryable;
import com.inteligr8.alfresco.annotations.job.AsyncJob; import com.inteligr8.alfresco.annotations.job.AsyncJob;
import com.inteligr8.alfresco.annotations.service.AsyncProcessException; import com.inteligr8.alfresco.annotations.service.AsyncProcessException;
@@ -78,26 +83,28 @@ import com.inteligr8.alfresco.annotations.service.AsyncService;
* *
* @author brian@inteligr8.com * @author brian@inteligr8.com
*/ */
@Component @Component("async.mq")
public class MqAsyncService extends AbstractLifecycleBean implements AsyncService { public class MqAsyncService extends AbstractLifecycleBean implements AsyncService, Threadable {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final JobKey jobKey = new JobKey("mq-async", "inteligr8-annotations");
private final Pattern typePattern = Pattern.compile("v([0-9]+):([^:#]+)#(.+)"); private final Pattern typePattern = Pattern.compile("v([0-9]+):([^:#]+)#(.+)");
private final ObjectMapper om = new ObjectMapper(); private final ObjectMapper om = new ObjectMapper();
@Value("${inteligr8.async.mq.enabled}") @Value("${inteligr8.async.mq.enabled}")
protected boolean enabled; protected boolean enabled;
@Value("${inteligr8.async.mq.worker}") @Value("${inteligr8.async.mq.workerThreads}")
protected boolean workerEnabled; protected int workerThreads;
@Value("${inteligr8.async.mq.url:#{null}}") @Value("${inteligr8.async.mq.url}")
//@Value("${messaging.broker.url}")
protected String url; protected String url;
@Value("${inteligr8.async.mq.username:#{null}}") @Value("${inteligr8.async.mq.username}")
protected String username; protected String username;
@Value("${inteligr8.async.mq.password:#{null}}") @Value("${inteligr8.async.mq.password}")
protected String password; protected String password;
@Value("${inteligr8.async.mq.queue}") @Value("${inteligr8.async.mq.queue}")
@@ -130,6 +137,11 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
@Autowired @Autowired
protected TransactionService txService; protected TransactionService txService;
@Autowired
protected TransactionManager txManager;
private String hostname;
private XaPooledConnectionFactory factory; private XaPooledConnectionFactory factory;
private SimpleCache<Pair<Class<?>, String>, Method> methodCache; private SimpleCache<Pair<Class<?>, String>, Method> methodCache;
@@ -146,38 +158,52 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
if (!this.enabled) if (!this.enabled)
return; return;
try {
this.hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException uhe) {
this.hostname = "unknown";
}
ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(this.url); ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(this.url);
XaPooledConnectionFactory pool = new XaPooledConnectionFactory(); XaPooledConnectionFactory pool = new XaPooledConnectionFactory();
pool.setConnectionFactory(factory); pool.setConnectionFactory(factory);
pool.setMaxConnections(this.maxConnections); pool.setMaxConnections(this.maxConnections);
pool.setTransactionManager(SimpleTransactionManager.getInstance()); pool.setTransactionManager(this.txManager);
pool.start(); pool.start();
this.factory = pool; this.factory = pool;
if (!this.workerEnabled) if (this.workerThreads <= 0)
return; return;
JobKey jobKey = new JobKey("behaviour-async", "inteligr8-annotations");
JobDetailImpl jobDetail = new JobDetailImpl(); JobDetailImpl jobDetail = new JobDetailImpl();
jobDetail.setKey(jobKey); jobDetail.setKey(this.jobKey);
jobDetail.setRequestsRecovery(true); jobDetail.setRequestsRecovery(true);
jobDetail.setJobClass(AsyncJob.class); jobDetail.setJobClass(AsyncJob.class);
jobDetail.getJobDataMap().put("asyncService", this); jobDetail.getJobDataMap().put("asyncService", this);
Trigger trigger = TriggerBuilder.newTrigger()
.startNow()
.build();
try { try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); StdSchedulerFactory.getDefaultScheduler()
scheduler.addJob(jobDetail, false); .scheduleJob(jobDetail, trigger);
scheduler.triggerJob(jobKey);
} catch (SchedulerException se) { } catch (SchedulerException se) {
this.logger.error("The behavior policy async service failed to start; no asynchronous policies will be processed!", se); this.logger.error("The MQ async service job failed to start; no asynchronous executions will be processed!", se);
} }
} }
@Override @Override
protected void onShutdown(ApplicationEvent event) { protected void onShutdown(ApplicationEvent event) {
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.deleteJob(this.jobKey);
} catch (SchedulerException se) {
this.logger.warn("The MQ async service job failed to stop", se);
}
if (this.factory != null) if (this.factory != null)
this.factory.stop(); this.factory.stop();
} }
@@ -187,12 +213,18 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
return enabled; return enabled;
} }
@Override
public Integer getThreads() {
return this.workerThreads;
}
@Override @Override
public boolean isCurrentThreadAsynchronous() { public boolean isCurrentThreadAsynchronous() {
return this.isAsync.get(); return this.isAsync.get();
} }
@Override @Override
@Transactional
public void poll() throws AsyncProcessException { public void poll() throws AsyncProcessException {
this.logger.trace("poll()"); this.logger.trace("poll()");
this.isAsync.set(true); this.isAsync.set(true);
@@ -200,7 +232,7 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
try { try {
Connection mqcon = this.factory.createConnection(this.username, this.password); Connection mqcon = this.factory.createConnection(this.username, this.password);
try { try {
mqcon.setClientID(this.clientId); mqcon.setClientID(this.clientId + "-service-" + this.hostname);
Session mqsession = mqcon.createSession(true, Session.CLIENT_ACKNOWLEDGE); Session mqsession = mqcon.createSession(true, Session.CLIENT_ACKNOWLEDGE);
try { try {
@@ -246,11 +278,15 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
this.logger.debug("Polling ongoing messages ..."); this.logger.debug("Polling ongoing messages ...");
Queue mqqueue = mqsession.createQueue(this.queueName); Queue mqqueue = mqsession.createQueue(this.queueName);
this.pollMainThreaded(mqsession, mqqueue);
}
@Threaded(name = "mq-poll", join = true)
private void pollMainThreaded(Session mqsession, Queue mqqueue) throws JMSException {
MessageConsumer consumer = mqsession.createConsumer(mqqueue); MessageConsumer consumer = mqsession.createConsumer(mqqueue);
try { try {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
this.pollTx(mqsession, consumer, null); pollTx(mqsession, consumer, null);
} }
} finally { } finally {
consumer.close(); consumer.close();
@@ -320,7 +356,7 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
method.invoke(bean, args); method.invoke(bean, args);
} catch (ClassNotFoundException cnfe) { } catch (ClassNotFoundException cnfe) {
this.logger.error("A policy bean could not be found; will try on next restart"); this.logger.error("A bean could not be found; will try on next restart");
this.logger.error("The bean '{}' could not be found: {}", matcher.group(2), cnfe.getMessage()); this.logger.error("The bean '{}' could not be found: {}", matcher.group(2), cnfe.getMessage());
if (isErrorQueue) if (isErrorQueue)
return false; return false;
@@ -330,19 +366,19 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
// return to queue and retry indefinitely // return to queue and retry indefinitely
return false; return false;
} catch (NoSuchMethodException nsme) { } catch (NoSuchMethodException nsme) {
this.logger.error("A policy enumeration argument could not be constructed; will try on next restart"); this.logger.error("A bean enum argument could not be constructed; will try on next restart");
this.logger.error("An argument could not be The bean '{}' could not be found: {}", matcher.group(2), nsme.getMessage()); this.logger.error("An argument could not be The bean '{}' could not be found: {}", matcher.group(2), nsme.getMessage());
if (isErrorQueue) if (isErrorQueue)
return false; return false;
this.moveToErrorQueue(mqsession, mqmsg); this.moveToErrorQueue(mqsession, mqmsg);
} catch (IllegalAccessException iae) { } catch (IllegalAccessException iae) {
this.logger.error("A policy method was not accessible (public); will try on next restart"); this.logger.error("A bean method was not accessible (public); will try on next restart");
this.logger.warn("The bean '{}' method '{}' is not accessible: {}", matcher.group(2), matcher.group(3), iae.getMessage()); this.logger.warn("The bean '{}' method '{}' is not accessible: {}", matcher.group(2), matcher.group(3), iae.getMessage());
if (isErrorQueue) if (isErrorQueue)
return false; return false;
this.moveToErrorQueue(mqsession, mqmsg); this.moveToErrorQueue(mqsession, mqmsg);
} catch (InstantiationException | InvocationTargetException ie) { } catch (InstantiationException | InvocationTargetException ie) {
this.logger.error("A policy method execution failed; will try on next restart"); this.logger.error("A bean method execution failed; will try on next restart");
this.logger.warn("The bean '{}' method '{}' execution failed: {}", matcher.group(2), matcher.group(3), ie.getMessage()); this.logger.warn("The bean '{}' method '{}' execution failed: {}", matcher.group(2), matcher.group(3), ie.getMessage());
if (isErrorQueue) if (isErrorQueue)
return false; return false;
@@ -390,26 +426,26 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
if (!(joinPoint.getSignature() instanceof MethodSignature)) if (!(joinPoint.getSignature() instanceof MethodSignature))
throw new IllegalStateException("The join point must be on methods and methods have signatures"); throw new IllegalStateException("The join point must be on methods and methods have signatures");
Class<?> beanType = joinPoint.getThis().getClass(); Object bean = joinPoint.getThis();
this.logger.debug("Queuing for bean: {}", beanType); this.logger.debug("Queuing for bean: {}", bean.getClass());
MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); MethodSignature methodSig = (MethodSignature) joinPoint.getSignature();
Method method = methodSig.getMethod(); Method method = methodSig.getMethod();
this.logger.debug("Queuing for method: {}", method); this.logger.debug("Queuing for method: {}", method);
this.push(beanType, method.getName(), Arrays.asList(joinPoint.getArgs())); this.push(bean, method.getName(), Arrays.asList(joinPoint.getArgs()));
} }
@Transactional @Transactional
public void push(Object callbackBean, String callbackMethod, List<Object> args) throws AsyncProcessException { public void push(Object callbackBean, String callbackMethod, List<Object> args) throws AsyncProcessException {
this.logger.trace("push({}, {}, {})", callbackBean, callbackMethod, args); this.logger.trace("push({}, {}, {})", callbackBean.getClass(), callbackMethod, args);
UUID msgId = UUID.randomUUID(); UUID msgId = UUID.randomUUID();
try { try {
Connection mqcon = this.factory.createConnection(this.username, this.password); Connection mqcon = this.factory.createConnection(this.username, this.password);
try { try {
mqcon.setClientID(this.clientId); mqcon.setClientID(this.clientId + "-client-" + this.hostname);
Session mqsession = mqcon.createSession(true, Session.AUTO_ACKNOWLEDGE); Session mqsession = mqcon.createSession(true, Session.AUTO_ACKNOWLEDGE);
try { try {
@@ -433,7 +469,6 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic
} }
this.logger.debug("Sent node as message: {} => {}", callbackMethod, msgId); this.logger.debug("Sent node as message: {} => {}", callbackMethod, msgId);
mqsession.commit();
} finally { } finally {
mqsession.close(); mqsession.close();
} }

View File

@@ -0,0 +1,175 @@
package com.inteligr8.alfresco.annotations.service.impl;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.inteligr8.alfresco.annotations.job.AsyncJob;
import com.inteligr8.alfresco.annotations.service.AsyncProcessException;
import com.inteligr8.alfresco.annotations.service.AsyncService;
/**
* This class provides a non-persistent alternative to MQ for asynchronous execution.
*
* @author brian@inteligr8.com
*/
@Component("async.thread")
public class ThreadPoolAsyncService extends AbstractLifecycleBean implements AsyncService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final JobKey jobKey = new JobKey("thread-async", "inteligr8-annotations");
@Value("${inteligr8.async.workableThreads}")
protected int queueSize;
@Value("${inteligr8.async.workerThreads}")
protected byte poolSize;
private LinkedBlockingQueue<Runnable> queue;
private ThreadPoolExecutor pool;
private ThreadLocal<Boolean> isAsync = ThreadLocal.withInitial(new Supplier<Boolean>() {
@Override
public Boolean get() {
return false;
}
});
@Override
protected void onBootstrap(ApplicationEvent event) {
this.queue = new LinkedBlockingQueue<>(this.queueSize);
JobDetailImpl jobDetail = new JobDetailImpl();
jobDetail.setKey(this.jobKey);
jobDetail.setRequestsRecovery(true);
jobDetail.setJobClass(AsyncJob.class);
jobDetail.getJobDataMap().put("asyncService", this);
Trigger trigger = TriggerBuilder.newTrigger()
.startNow()
.build();
try {
StdSchedulerFactory.getDefaultScheduler()
.scheduleJob(jobDetail, trigger);
} catch (SchedulerException se) {
this.logger.error("The async service job failed to start; no asynchronous executions will be processed!", se);
}
}
@Override
protected void onShutdown(ApplicationEvent event) {
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.deleteJob(this.jobKey);
} catch (SchedulerException se) {
this.logger.warn("The MQ async service job failed to stop", se);
}
}
@Override
public boolean isEnabled() {
return true;
}
@Override
public boolean isCurrentThreadAsynchronous() {
return this.isAsync.get();
}
@Override
public void poll() throws AsyncProcessException {
this.logger.trace("poll()");
this.isAsync.set(true);
this.pool = new ThreadPoolExecutor(1, this.poolSize, 5L, TimeUnit.SECONDS, this.queue);
try {
while (!this.pool.isTerminated()) {
this.logger.debug("Perpetually waiting for thread pool to terminate ...");
this.pool.awaitTermination(300L, TimeUnit.SECONDS);
}
this.logger.info("The Async service thread pool terminated");
} catch (InterruptedException ie) {
this.logger.info("The Async service thread pool was interrupted");
}
}
@Transactional
protected void executeWork(Object callbackBean, Method callbackMethod, List<Object> args) {
try {
callbackMethod.invoke(callbackBean, args.toArray());
} catch (IllegalAccessException iae) {
this.logger.error("A bean method was not accessible (public)");
this.logger.warn("The bean '{}' method '{}' is not accessible: {}", callbackBean.getClass(), callbackMethod, iae.getMessage());
} catch (InvocationTargetException ite) {
this.logger.error("A bean method execution failed");
this.logger.warn("The bean '{}' method '{}' execution failed: {}", callbackBean.getClass(), callbackMethod, ite.getMessage());
}
}
public void push(ProceedingJoinPoint joinPoint) throws AsyncProcessException {
this.logger.trace("push({})", joinPoint);
if (!(joinPoint.getSignature() instanceof MethodSignature))
throw new IllegalStateException("The join point must be on methods and methods have signatures");
Object bean = joinPoint.getThis();
this.logger.debug("Queuing for bean: {}", bean.getClass());
MethodSignature methodSig = (MethodSignature) joinPoint.getSignature();
Method method = methodSig.getMethod();
this.logger.debug("Queuing for method: {}", method);
this.push(bean, method, Arrays.asList(joinPoint.getArgs()));
}
public void push(Object callbackBean, Method callbackMethod, List<Object> args) throws AsyncProcessException {
this.logger.trace("push({}, {}, {})", callbackBean, callbackMethod, args);
RunAsWork<Void> work = new RunAsWork<Void>() {
@Override
public Void doWork() {
executeWork(callbackBean, callbackMethod, args);
return null;
}
};
final String runAs = AuthenticationUtil.getRunAsUser();
Runnable runnable = new Runnable() {
@Override
public void run() {
// run the thread with the same authentication context as the initiating user
AuthenticationUtil.runAs(work, runAs);
}
};
this.queue.offer(runnable);
}
}

View File

@@ -1,14 +1,18 @@
<aspectj> <aspectj>
<aspects> <aspects>
<aspect name="com.inteligr8.alfresco.annotations.aspect.NotNullAspect" /> <!-- These must be in precedence order; highest to lowest -->
<aspect name="com.inteligr8.alfresco.annotations.aspect.NotNullAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.ThreadedAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.AsyncAspect" /> <aspect name="com.inteligr8.alfresco.annotations.aspect.AsyncAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.AuthorizedAspect" /> <aspect name="com.inteligr8.alfresco.annotations.aspect.AuthorizedAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.ClusterSynchronizedAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.OperableNodeAspect" /> <aspect name="com.inteligr8.alfresco.annotations.aspect.OperableNodeAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.NodeTypeAspect" /> <aspect name="com.inteligr8.alfresco.annotations.aspect.NodeTypeAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.NodeAspectAspect" /> <aspect name="com.inteligr8.alfresco.annotations.aspect.NodeAspectAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.ChildIsPrimaryAspect" /> <aspect name="com.inteligr8.alfresco.annotations.aspect.ChildIsPrimaryAspect" />
<aspect name="com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect" />
</aspects> </aspects>
</aspectj> </aspectj>

View File

@@ -1,14 +1,24 @@
inteligr8.annotations.aspectj.scanPackages=com.inteligr8.alfresco.annotations inteligr8.annotations.aspectj.scanPackages=com.inteligr8.alfresco.annotations
inteligr8.async.mq.enabled=false # should @Asynchronous be joined with MQ (or local thread pool)
inteligr8.async.mq.worker=true inteligr8.async.mq.enabled=true
# threads to execute @Asynchronous methods (when MQ disabled above or by method)
inteligr8.async.workableThreads=100
inteligr8.async.workerThreads=5
# threads to execute @Asynchronous methods (when MQ enabled above; 0 disables execution)
inteligr8.async.mq.workerThreads=1
# MQ settings
inteligr8.async.mq.url=${messaging.broker.url} inteligr8.async.mq.url=${messaging.broker.url}
inteligr8.async.mq.username=${messaging.broker.username} inteligr8.async.mq.username=${messaging.broker.username}
inteligr8.async.mq.password=${messaging.broker.password} inteligr8.async.mq.password=${messaging.broker.password}
inteligr8.async.mq.queuePrefix=inteligr8.acs. inteligr8.async.mq.queue=inteligr8.acs.async
inteligr8.async.mq.clientId=acs inteligr8.async.mq.errorQueue=inteligr8.acs.asyncError
inteligr8.async.mq.pool.max=5 inteligr8.async.mq.clientId=inteligr8-async
inteligr8.async.mq.pool.max=2
inteligr8.cache.nodeTypeConstrainable.maxBeans=16 inteligr8.cache.nodeTypeConstrainable.maxBeans=16
inteligr8.cache.nodeAspectConstrainable.maxBeans=16 inteligr8.cache.nodeAspectConstrainable.maxBeans=16

View File

@@ -0,0 +1,76 @@
package com.inteligr8.alfresco.annotations;
import org.alfresco.error.AlfrescoRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
@Component
public class AsynchronousTest extends AbstractLifecycleBean {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private long mainThreadId;
private Object obj;
@Override
protected void onBootstrap(ApplicationEvent event) {
this.logger.info("Running test: " + this.getClass());
this.mainThreadId = Thread.currentThread().getId();
this.logger.info("Running in thread: {}", this.mainThreadId);
this.tryThreadPool();
this.tryMq();
try {
Thread.sleep(500L);
} catch (InterruptedException ie) {
throw new AlfrescoRuntimeException("This should never happen", ie);
}
this.obj = new Object();
}
@Override
protected void onShutdown(ApplicationEvent event) {
}
@Asynchronous(durable = false)
private void tryThreadPool() {
this.logger.info("Running in another thread: {}", Thread.currentThread().getId());
Assert.isTrue(Thread.currentThread().getId() != this.mainThreadId, "This method is not executing in a separate thread");
Assert.isNull(this.obj, "The random value is expected to be 'null'");
try {
Thread.sleep(1000L);
} catch (InterruptedException ie) {
throw new AlfrescoRuntimeException("This should never happen", ie);
}
Assert.isTrue(this.obj != null, "The random value is not expected to be 'null'");
}
@Asynchronous
private void tryMq() {
this.logger.info("Running in another thread: {}", Thread.currentThread().getId());
Assert.isTrue(Thread.currentThread().getId() != this.mainThreadId, "This method is not executing in a separate thread");
Assert.isNull(this.obj, "The random value is expected to be 'null'");
try {
Thread.sleep(1000L);
} catch (InterruptedException ie) {
throw new AlfrescoRuntimeException("This should never happen", ie);
}
Assert.isTrue(this.obj != null, "The random value is not expected to be 'null'");
}
}

View File

@@ -0,0 +1,48 @@
package com.inteligr8.alfresco.annotations;
import org.apache.commons.lang3.mutable.MutableInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
@Component
public class ClusterSynchronizedTest extends AbstractLifecycleBean {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
protected void onBootstrap(ApplicationEvent event) {
MutableInt threadsRun = new MutableInt();
this.threadThenLock(threadsRun);
}
@Override
protected void onShutdown(ApplicationEvent event) {
}
@Threaded(name = "cluster-sync", threads = 5, join = true)
@Transactional
@ClusterSynchronized
private void threadThenLock(MutableInt threadsRun) {
this.lock(threadsRun);
}
private void lock(MutableInt threadsRun) {
int t = threadsRun.intValue();
this.logger.debug("After start of a mutually exclusive execution block: {}", t);
try {
Thread.sleep(200L);
} catch (InterruptedException ie) {
}
Assert.isTrue(t == threadsRun.intValue(), "The threads run unexpectedly changed: " + t + " != " + threadsRun);
threadsRun.increment();
this.logger.debug("Before end of a mutually exclusive execution block: {}", t);
}
}

View File

@@ -1,7 +1,5 @@
package com.inteligr8.alfresco.annotations; package com.inteligr8.alfresco.annotations;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableBoolean;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -32,11 +30,11 @@ public class IfNotNullTest extends AbstractLifecycleBean {
protected void onShutdown(ApplicationEvent event) { protected void onShutdown(ApplicationEvent event) {
} }
private void tryNotNull(@Nonnull String str) { private void tryNotNull(@IfNotNull String str) {
executed.setTrue(); this.executed.setTrue();
} }
private void tryNull(@Nonnull String str) { private void tryNull(@IfNotNull String str) {
throw new IllegalStateException(); throw new IllegalStateException();
} }

View File

@@ -0,0 +1,74 @@
package com.inteligr8.alfresco.annotations;
import java.util.Random;
import org.apache.commons.lang3.mutable.MutableInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
@Component
public class ThreadedTest extends AbstractLifecycleBean {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Random random = new Random();
@Override
protected void onBootstrap(ApplicationEvent event) {
this.logger.info("Running test: {}", this.getClass());
Assert.isTrue(Thread.currentThread().getId() > 0L, "An unexpected non-positive thread ID: " + Thread.currentThread().getId());
this.logger.info("Main thread: {}", Thread.currentThread().getId());
MutableInt threadCount = new MutableInt();
this.try5sync(Thread.currentThread(), threadCount);
Assert.isTrue(threadCount.intValue() == 5, "An unexpected thread count record: 5 != " + threadCount);
threadCount.setValue(0);
this.try5(Thread.currentThread(), threadCount);
Assert.isTrue(threadCount.intValue() < 5, "An unexpected thread count record: 5 >= " + threadCount);
try {
Thread.sleep(500L);
} catch (InterruptedException ie) {
// suppress
}
Assert.isTrue(threadCount.intValue() == 5, "An unexpected thread count record: 5 != " + threadCount);
}
@Override
protected void onShutdown(ApplicationEvent event) {
}
@Threaded(threads = 5)
private void try5(Thread parentThread, MutableInt threadCount) {
this.logger.info("Running inside thread: {}", Thread.currentThread().getId());
Assert.isTrue(parentThread.getId() != Thread.currentThread().getId(), "An unexpected thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(this.random.nextInt(100) + 100L);
threadCount.increment();
} catch (InterruptedException ie) {
// suppress
}
}
@Threaded(threads = 5, join = true)
private void try5sync(Thread parentThread, MutableInt threadCount) {
this.logger.info("Running inside thread: {}", Thread.currentThread().getId());
Assert.isTrue(parentThread.getId() != Thread.currentThread().getId(), "An unexpected thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(this.random.nextInt(100) + 100L);
threadCount.increment();
} catch (InterruptedException ie) {
// suppress
}
}
}

View File

@@ -215,7 +215,7 @@ public class TransactionalTest extends AbstractLifecycleBean {
@TransactionalRetryable @TransactionalRetryable
private void tryRetryOnlyTransactional(String originTxId) { private void tryRetryOnlyTransactional(String originTxId) {
if (originTxId == null) { if (originTxId == null) {
Assert.isNull(AlfrescoTransactionSupport.getTransactionId(), "An unexpected transaction"); Assert.isTrue(AlfrescoTransactionSupport.getTransactionId() != null, "Expected a new transaction");
} else { } else {
Assert.isTrue(AlfrescoTransactionSupport.getTransactionId().equals(originTxId), "Expected the same transaction: " + AlfrescoTransactionSupport.getTransactionId() + " != " + originTxId); Assert.isTrue(AlfrescoTransactionSupport.getTransactionId().equals(originTxId), "Expected the same transaction: " + AlfrescoTransactionSupport.getTransactionId() + " != " + originTxId);
} }