From afcfbbc61a2b632803b066fc3890c759ad9c29df Mon Sep 17 00:00:00 2001 From: "Brian M. Long" Date: Tue, 9 May 2023 10:35:40 -0400 Subject: [PATCH] added threading/cluster sync; more tests --- rad.sh | 4 +- .../alfresco/annotations/Asynchronous.java | 2 + .../annotations/ClusterSynchronized.java | 20 ++ .../alfresco/annotations/IfNotNull.java | 15 ++ .../alfresco/annotations/Threadable.java | 17 ++ .../alfresco/annotations/Threaded.java | 24 +++ .../aspect/AbstractMethodAspect.java | 53 ++++++ ...a => AbstractMethodOrParameterAspect.java} | 2 +- .../aspect/AbstractWarnOnceService.java | 20 ++ .../annotations/aspect/AsyncAspect.java | 39 +++- .../annotations/aspect/AuthorizedAspect.java | 7 +- .../aspect/ChildIsPrimaryAspect.java | 6 +- .../aspect/ClusterSynchronizedAspect.java | 67 +++++++ .../annotations/aspect/NodeAspectAspect.java | 3 + .../annotations/aspect/NodeTypeAspect.java | 3 + .../annotations/aspect/NotNullAspect.java | 19 +- .../aspect/OperableNodeAspect.java | 5 +- .../annotations/aspect/QNameBasedAspect.java | 2 +- .../aspect/RetryingTransactionAspect.java | 44 ++--- .../annotations/aspect/ThreadedAspect.java | 178 ++++++++++++++++++ .../service/impl/AlfrescoTransaction.java | 95 ++++++++++ .../impl/AlfrescoTransactionManager.java | 109 +++++++++++ .../service/impl/MqAsyncService.java | 95 +++++++--- .../service/impl/ThreadPoolAsyncService.java | 175 +++++++++++++++++ src/main/resources/META-INF/aop.xml | 8 +- .../alfresco-global.properties | 20 +- .../annotations/AsynchronousTest.java | 76 ++++++++ .../annotations/ClusterSynchronizedTest.java | 48 +++++ .../alfresco/annotations/IfNotNullTest.java | 8 +- .../alfresco/annotations/ThreadedTest.java | 74 ++++++++ .../annotations/TransactionalTest.java | 2 +- 31 files changed, 1146 insertions(+), 94 deletions(-) create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/ClusterSynchronized.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/IfNotNull.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/Threadable.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/Threaded.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodAspect.java rename src/main/java/com/inteligr8/alfresco/annotations/aspect/{MethodOrParameterAspect.java => AbstractMethodOrParameterAspect.java} (94%) create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractWarnOnceService.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/aspect/ClusterSynchronizedAspect.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/aspect/ThreadedAspect.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java create mode 100644 src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java create mode 100644 src/test/java/com/inteligr8/alfresco/annotations/AsynchronousTest.java create mode 100644 src/test/java/com/inteligr8/alfresco/annotations/ClusterSynchronizedTest.java create mode 100644 src/test/java/com/inteligr8/alfresco/annotations/ThreadedTest.java diff --git a/rad.sh b/rad.sh index 8c1e390..aa08375 100644 --- a/rad.sh +++ b/rad.sh @@ -1,7 +1,7 @@ -#!/bin/sh +#!/bin/bash discoverArtifactId() { - ARTIFACT_ID=`mvn -q -Dexpression=project.artifactId -DforceStdout help:evaluate` + ARTIFACT_ID=`mvn -q -Dexpression=project.artifactId -DforceStdout help:evaluate | sed 's/\x1B\[[0-9;]\{1,\}[A-Za-z]//g'` } rebuild() { diff --git a/src/main/java/com/inteligr8/alfresco/annotations/Asynchronous.java b/src/main/java/com/inteligr8/alfresco/annotations/Asynchronous.java index a8b5531..cd7b3c2 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/Asynchronous.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/Asynchronous.java @@ -8,5 +8,7 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Asynchronous { + + boolean durable() default true; } diff --git a/src/main/java/com/inteligr8/alfresco/annotations/ClusterSynchronized.java b/src/main/java/com/inteligr8/alfresco/annotations/ClusterSynchronized.java new file mode 100644 index 0000000..775f864 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/ClusterSynchronized.java @@ -0,0 +1,20 @@ +package com.inteligr8.alfresco.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface ClusterSynchronized { + + String value() default ""; + + long acquireWaitBetweenRetriesInMillis() default 100L; + + int acquireMaxRetries() default 300; + + long lockTimeoutInMillis() default 5000L; + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/IfNotNull.java b/src/main/java/com/inteligr8/alfresco/annotations/IfNotNull.java new file mode 100644 index 0000000..e03e5ca --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/IfNotNull.java @@ -0,0 +1,15 @@ +package com.inteligr8.alfresco.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ + ElementType.METHOD, + ElementType.PARAMETER +}) +public @interface IfNotNull { + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/Threadable.java b/src/main/java/com/inteligr8/alfresco/annotations/Threadable.java new file mode 100644 index 0000000..8d4e26a --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/Threadable.java @@ -0,0 +1,17 @@ +package com.inteligr8.alfresco.annotations; + +public interface Threadable { + + default Integer getThreads() { + return null; + } + + default Integer getConcurrency() { + return null; + } + + default Integer getThreadPriority() { + return null; + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/Threaded.java b/src/main/java/com/inteligr8/alfresco/annotations/Threaded.java new file mode 100644 index 0000000..438ab56 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/Threaded.java @@ -0,0 +1,24 @@ +package com.inteligr8.alfresco.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Threaded { + + String name() default ""; + + int threads() default 1; + + int concurrency() default 0; + + int priority() default Thread.NORM_PRIORITY; + + boolean join() default false; + + long joinWaitMillis() default 0L; + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodAspect.java new file mode 100644 index 0000000..60fcfbb --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodAspect.java @@ -0,0 +1,53 @@ +package com.inteligr8.alfresco.annotations.aspect; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.reflect.MethodSignature; + +public class AbstractMethodAspect extends AbstractWarnOnceService { + + protected A getAnnotation(ProceedingJoinPoint joinPoint, Class annotationClass, boolean warnReturn, boolean warnThrows) { + Method method = this.getMethod(joinPoint, annotationClass, warnReturn, warnThrows); + return method.getAnnotation(annotationClass); + } + + protected Method getMethod(ProceedingJoinPoint joinPoint, Object messagePrefix, boolean warnReturn, boolean warnThrows) { + if (!(joinPoint.getSignature() instanceof MethodSignature)) + throw new IllegalStateException(this.createMessagePrefix(messagePrefix) + " must be on methods"); + + MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); + if (warnReturn && methodSig.getReturnType() != null && !this.isReturnTypeVoid(methodSig)) { + this.warn(joinPoint.toLongString(), + "{} has a return value or throws clause; 'null' is always returned and subthread exceptions don't propagate: {}: {}", + this.createMessagePrefix(messagePrefix), joinPoint, methodSig.getReturnType()); + } + + if (warnThrows && methodSig.getExceptionTypes() != null && methodSig.getExceptionTypes().length > 0) { + this.warn(joinPoint.toLongString(), + "{} has a return value or throws clause; 'null' is always returned and subthread exceptions don't propagate: {}: {}", + this.createMessagePrefix(messagePrefix), joinPoint, methodSig.getExceptionTypes()); + } + + return methodSig.getMethod(); + } + + protected boolean isReturnTypeVoid(MethodSignature methodSig) { + return Void.class.equals(methodSig.getReturnType()) || void.class.equals(methodSig.getReturnType()); + } + + private String createMessagePrefix(Object obj) { + if (obj instanceof Class) { + Class clazz = (Class) obj; + if (clazz.isAnnotation()) { + return "The @" + ((Class) obj).getSimpleName() + " annotated method"; + } else { + return "The " + ((Class) obj).getSimpleName() + " class method"; + } + } else { + return obj.toString(); + } + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/MethodOrParameterAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodOrParameterAspect.java similarity index 94% rename from src/main/java/com/inteligr8/alfresco/annotations/aspect/MethodOrParameterAspect.java rename to src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodOrParameterAspect.java index 2124abe..47f18d2 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/MethodOrParameterAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractMethodOrParameterAspect.java @@ -8,7 +8,7 @@ import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class MethodOrParameterAspect { +public abstract class AbstractMethodOrParameterAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractWarnOnceService.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractWarnOnceService.java new file mode 100644 index 0000000..d14e6f0 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AbstractWarnOnceService.java @@ -0,0 +1,20 @@ +package com.inteligr8.alfresco.annotations.aspect; + +import java.util.HashSet; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AbstractWarnOnceService { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private Set warned = new HashSet<>(); + + protected void warn(String key, String message, Object... arguments) { + if (this.warned.add(key)) + this.logger.warn(message, arguments); + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/AsyncAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AsyncAspect.java index 221f887..cd653c3 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/AsyncAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AsyncAspect.java @@ -3,21 +3,32 @@ package com.inteligr8.alfresco.annotations.aspect; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import com.inteligr8.alfresco.annotations.Asynchronous; import com.inteligr8.alfresco.annotations.service.AsyncService; @Aspect -public class AsyncAspect { +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AsyncAspect, *") +//@Order(Ordered.HIGHEST_PRECEDENCE + Byte.MAX_VALUE) +//@Component +public class AsyncAspect extends AbstractMethodAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired - private AsyncService asyncService; + @Qualifier("async.mq") + private AsyncService durableAsyncService; + + @Autowired + @Qualifier("async.thread") + private AsyncService volatileAsyncService; @Pointcut("@annotation(com.inteligr8.alfresco.annotations.Asynchronous) && execution(* *(..))") public void isAsyncAnnotated() { @@ -25,16 +36,28 @@ public class AsyncAspect { @Around("isAsyncAnnotated()") public Object async(ProceedingJoinPoint joinPoint) throws Throwable { - if (this.asyncService.isCurrentThreadAsynchronous()) { - this.logger.trace("Intercepted an @Async method call while already asynchronous; executing synchronously"); + this.logger.trace("async({})", joinPoint); + + Asynchronous async = this.getAnnotation(joinPoint, Asynchronous.class, true, true); + + AsyncService asyncService = async.durable() ? this.durableAsyncService : this.volatileAsyncService; + if (!asyncService.isEnabled()) { + this.warn(joinPoint.toLongString(), + "Intercepted an @Asynchronous method call while the appropriate asynchronous service is not enabled; continuing synchronously"); + return joinPoint.proceed(); + } else if (asyncService.isCurrentThreadAsynchronous()) { + this.logger.debug("Intercepted an @Asynchronous method call while already asynchronous; continuing synchronously"); return joinPoint.proceed(); } else { - this.logger.trace("Intercepted an @Async method call; redirecting to Async service"); - this.asyncService.push(joinPoint); + this.logger.trace("Intercepted an @Asynchronous method call; redirecting to the appropriate asynchronous service"); + asyncService.push(joinPoint); MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); - if (!Void.class.equals(methodSig.getReturnType())) - this.logger.warn("An @Asynchronous method returns a value, which is not expected/allowed: {}", methodSig.getMethod()); + if (!this.isReturnTypeVoid(methodSig)) { + this.warn(joinPoint.toLongString(), + "An @Asynchronous method returns a value, which is not expected/allowed: {}", + methodSig.getMethod()); + } return null; } } diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/AuthorizedAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AuthorizedAspect.java index 7e0f4ce..4d2ba02 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/AuthorizedAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/AuthorizedAspect.java @@ -8,19 +8,20 @@ import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; import com.inteligr8.alfresco.annotations.Authorizable; import com.inteligr8.alfresco.annotations.Authorized; import com.inteligr8.alfresco.annotations.AuthorizedAsSystem; @Aspect -@Order(Ordered.HIGHEST_PRECEDENCE + Short.MAX_VALUE) +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AuthorizedAspect, com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect") +//@Order(Ordered.HIGHEST_PRECEDENCE + Short.MAX_VALUE) +//@Component public class AuthorizedAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/ChildIsPrimaryAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/ChildIsPrimaryAspect.java index c46fea0..3a54f32 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/ChildIsPrimaryAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/ChildIsPrimaryAspect.java @@ -9,14 +9,12 @@ import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; import com.inteligr8.alfresco.annotations.IfChildAssociationIsPrimary; @Aspect -@Order(Ordered.HIGHEST_PRECEDENCE + Byte.MAX_VALUE) // ordering before transaction/authorized -public class ChildIsPrimaryAspect extends MethodOrParameterAspect { +//@Component +public class ChildIsPrimaryAspect extends AbstractMethodOrParameterAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/ClusterSynchronizedAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/ClusterSynchronizedAspect.java new file mode 100644 index 0000000..2d48225 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/ClusterSynchronizedAspect.java @@ -0,0 +1,67 @@ +package com.inteligr8.alfresco.annotations.aspect; + +import java.lang.reflect.Method; + +import org.alfresco.repo.lock.JobLockService; +import org.alfresco.service.namespace.QName; +import org.apache.commons.lang3.StringUtils; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; +import org.aspectj.lang.annotation.Pointcut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.inteligr8.alfresco.annotations.ClusterSynchronized; + +@Aspect +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.ClusterSynchronizedAspect") +//@Component +//@Order(Ordered.LOWEST_PRECEDENCE - Byte.MAX_VALUE) +public class ClusterSynchronizedAspect extends AbstractMethodAspect { + + private static final String NS = "http://inteligr8.com/alfresco/model"; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private JobLockService jobLockService; + + @Pointcut("@annotation(com.inteligr8.alfresco.annotations.ClusterSynchronized) && execution(* *(..))") + public void isClusterSyncAnnotated() { + } + + @Around("isClusterSyncAnnotated()") + public Object clusterSync(ProceedingJoinPoint joinPoint) throws Throwable { + this.logger.trace("clusterSync({})", joinPoint); + + Method method = this.getMethod(joinPoint, ClusterSynchronized.class, false, false); + ClusterSynchronized clusterSync = method.getAnnotation(ClusterSynchronized.class); + + QName lockQName = this.getLockQName(clusterSync, method); + + this.logger.debug("Acquiring cluster lock: {}", lockQName); + String lockToken = this.jobLockService.getLock(lockQName, clusterSync.lockTimeoutInMillis(), + clusterSync.acquireWaitBetweenRetriesInMillis(), clusterSync.acquireMaxRetries()); + try { + this.logger.trace("Acquired cluster lock: {}", lockQName); + + return joinPoint.proceed(); + } finally { + this.logger.debug("Releasing cluster lock: {}", lockQName); + this.jobLockService.releaseLock(lockToken, lockQName); + } + } + + private QName getLockQName(ClusterSynchronized clusterSync, Method method) { + String lockName = StringUtils.trimToNull(clusterSync.value()); + if (lockName != null) { + return QName.createQNameWithValidLocalName(NS, lockName); + } else { + String methodId = method.getDeclaringClass().getSimpleName() + "_" + method.getName(); + return QName.createQNameWithValidLocalName(NS, methodId); + } + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeAspectAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeAspectAspect.java index ea16e82..95edcb6 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeAspectAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeAspectAspect.java @@ -18,6 +18,7 @@ import org.alfresco.service.namespace.QNamePattern; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,8 @@ import com.inteligr8.alfresco.annotations.NodeAspectConstrainable; import com.inteligr8.alfresco.annotations.IfNodeHasAspect; @Aspect +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.NodeAspectAspect") +//@Component public class NodeAspectAspect extends QNameBasedAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeTypeAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeTypeAspect.java index 7bd4f3b..a89eb04 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeTypeAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/NodeTypeAspect.java @@ -18,6 +18,7 @@ import org.alfresco.service.namespace.QNamePattern; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,8 @@ import com.inteligr8.alfresco.annotations.IfNodeOfType; import com.inteligr8.alfresco.annotations.NodeTypeConstrainable; @Aspect +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.NodeTypeAspect") +//@Component public class NodeTypeAspect extends QNameBasedAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/NotNullAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/NotNullAspect.java index f7aa81b..3a13747 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/NotNullAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/NotNullAspect.java @@ -2,8 +2,6 @@ package com.inteligr8.alfresco.annotations.aspect; import java.lang.reflect.Method; -import javax.annotation.Nonnull; - import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; @@ -11,27 +9,28 @@ import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; + +import com.inteligr8.alfresco.annotations.IfNotNull; @Aspect -@Order(Ordered.HIGHEST_PRECEDENCE + 64) +//@Order(Ordered.HIGHEST_PRECEDENCE + 64) +//@Component public class NotNullAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - @Pointcut("execution(* *(@javax.annotation.Nonnull (*), ..))") - public void isNonnullAnnotated() { + @Pointcut("execution(* *(@com.inteligr8.alfresco.annotations.IfNotNull (*), ..))") + public void isNotNullAnnotated() { } - @Around("isNonnullAnnotated()") + @Around("isNotNullAnnotated()") public Object isNotNull(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); Method method = methodSig.getMethod(); for (int p = 0; p < method.getParameterCount(); p++) { - if (joinPoint.getArgs()[p] == null && method.getParameters()[p].isAnnotationPresent(Nonnull.class)) { - this.logger.debug("A @Nonnull parameter is `null`; skipping method: {}", method); + if (joinPoint.getArgs()[p] == null && method.getParameters()[p].isAnnotationPresent(IfNotNull.class)) { + this.logger.debug("A @IfNotNull parameter is `null`; skipping method: {}", method); return null; } } diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/OperableNodeAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/OperableNodeAspect.java index ad60d2e..c477621 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/OperableNodeAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/OperableNodeAspect.java @@ -14,6 +14,7 @@ import org.alfresco.service.cmr.repository.NodeService; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +23,9 @@ import org.springframework.beans.factory.annotation.Autowired; import com.inteligr8.alfresco.annotations.IfNodeExists; @Aspect -public class OperableNodeAspect extends MethodOrParameterAspect { +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect, com.inteligr8.alfresco.annotations.aspect.OperableNodeAspect") +//@Component +public class OperableNodeAspect extends AbstractMethodOrParameterAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java index ef437d7..2a944e7 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java @@ -17,7 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -public abstract class QNameBasedAspect extends MethodOrParameterAspect { +public abstract class QNameBasedAspect extends AbstractMethodOrParameterAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java index b8bc15b..d5a6233 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java @@ -1,38 +1,34 @@ package com.inteligr8.alfresco.annotations.aspect; import java.lang.reflect.Method; -import java.util.HashSet; -import java.util.Set; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; +import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState; import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.transaction.TransactionService; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; -import org.aspectj.lang.annotation.Before; +import org.aspectj.lang.annotation.DeclarePrecedence; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.annotation.Transactional; import com.inteligr8.alfresco.annotations.TransactionalRetryable; @Aspect -@Order(Ordered.LOWEST_PRECEDENCE - Short.MAX_VALUE) +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AuthorizedAspect, com.inteligr8.alfresco.annotations.aspect.RetryingTransactionAspect") //@Component +//@Order(Ordered.LOWEST_PRECEDENCE - Short.MAX_VALUE) public class RetryingTransactionAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Set warned = new HashSet<>(); - @Autowired private TransactionService txService; @@ -40,33 +36,29 @@ public class RetryingTransactionAspect { public void isTransactionalAnnotated() { } - @Pointcut("@annotation(org.springframework.transaction.annotation.TransactionalRetryable) && execution(* *(..))") + @Pointcut("@annotation(com.inteligr8.alfresco.annotations.TransactionalRetryable) && execution(* *(..))") public void isTransactionalRetryableAnnotated() { } - @Around("isTransactionalAnnotated()") + @Around("isTransactionalAnnotated() || isTransactionalRetryableAnnotated()") public Object retryingTransactional(ProceedingJoinPoint joinPoint) throws Throwable { this.logger.trace("retryingTransactional({})", joinPoint); Method method = this.getMethod(joinPoint); Transactional txl = method.getAnnotation(Transactional.class); + TransactionalRetryable txtry = method.getAnnotation(TransactionalRetryable.class); if (this.doCreateNewTxContext(txl) || this.isReadStateChange(txl)) { - TransactionalRetryable txtry = method.getAnnotation(TransactionalRetryable.class); - this.logger.debug("Changing TX context: {} => [ro: {}, new: {}]", AlfrescoTransactionSupport.getTransactionReadState(), txl.readOnly(), txl.propagation()); return this.execute(joinPoint, txl, txtry); + } else if (this.doCreateNewTxRetryContext(txtry)) { + this.logger.debug("Changing TX context: retries: {}", txtry.maxRetries()); + return this.execute(joinPoint, null, txtry); } else { return joinPoint.proceed(); } } - @Before("isTransactionalRetryableAnnotated() && !isTransactionalAnnotated()") - public void warn(ProceedingJoinPoint joinPoint) throws Throwable { - if (this.warned.add(joinPoint.toLongString())) - this.logger.warn("A @TransactionalRetryable annotation was found without a @Transactional annotation; it will be ignored: {}", joinPoint); - } - private Method getMethod(ProceedingJoinPoint joinPoint) { if (!(joinPoint.getSignature() instanceof MethodSignature)) throw new IllegalStateException("The @Transactional or @TransactionalRetryable annotations must be on methods"); @@ -76,6 +68,9 @@ public class RetryingTransactionAspect { } private boolean isReadStateChange(Transactional txl) { + if (txl == null) + return false; + switch (txl.propagation()) { case NEVER: case NOT_SUPPORTED: @@ -97,8 +92,14 @@ public class RetryingTransactionAspect { } } + private boolean doCreateNewTxRetryContext(TransactionalRetryable txtry) { + return txtry != null; + } + private boolean doCreateNewTxContext(Transactional txl) { - switch (txl.propagation()) { + if (txl == null) { + return false; + } else switch (txl.propagation()) { case NEVER: switch (AlfrescoTransactionSupport.getTransactionReadState()) { case TXN_NONE: @@ -164,12 +165,13 @@ public class RetryingTransactionAspect { if (txtry.incRetryWaitInMillis() > 0) rthelper.setRetryWaitIncrementMs(txtry.incRetryWaitInMillis()); } - if (txl.timeout() > 0) + if (txl != null && txl.timeout() > 0) rthelper.setMaxExecutionMs(txl.timeout() * 1000L); try { this.logger.trace("source tx: {}", AlfrescoTransactionSupport.getTransactionId()); - return rthelper.doInTransaction(rtcallback, txl.readOnly(), true); + boolean readonly = txl != null && txl.readOnly() || txl == null && AlfrescoTransactionSupport.getTransactionReadState() == TxnReadState.TXN_READ_ONLY; + return rthelper.doInTransaction(rtcallback, readonly, txl != null); } catch (RuntimeException re) { // attempt to unwrap the exception if (re.getMessage() == null) { diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/ThreadedAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/ThreadedAspect.java new file mode 100644 index 0000000..2521f02 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/ThreadedAspect.java @@ -0,0 +1,178 @@ +package com.inteligr8.alfresco.annotations.aspect; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.DeclarePrecedence; +import org.aspectj.lang.annotation.Pointcut; +import org.aspectj.lang.reflect.MethodSignature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.inteligr8.alfresco.annotations.Threadable; +import com.inteligr8.alfresco.annotations.Threaded; + +@Aspect +@DeclarePrecedence("com.inteligr8.alfresco.annotations.aspect.AsyncAspect, com.inteligr8.alfresco.annotations.aspect.ThreadedAspect, *") +//@Order(Ordered.HIGHEST_PRECEDENCE + Byte.MAX_VALUE) +//@Component +public class ThreadedAspect extends AbstractMethodAspect { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private ThreadLocal> nested = ThreadLocal.withInitial(new Supplier>() { + public Set get() { + return new HashSet<>(); + } + }); + + @Pointcut("@annotation(com.inteligr8.alfresco.annotations.Threaded) && execution(* *(..))") + public void isThreadedAnnotated() { + } + + @Around("isThreadedAnnotated()") + public Object threaded(ProceedingJoinPoint joinPoint) throws Throwable { + // AspectJ does not recursively match annotations if the same thread is calling joinPoint.proceed() + // but when a different thread calls it, it doesn't know it is already processing the annotation + // so we are going to use a ThreadLocal to prevent recursion across threads + if (this.nested.get().contains(joinPoint.getSignature().toLongString())) + return joinPoint.proceed(); + + this.logger.trace("threaded({})", joinPoint); + + Threaded threaded = this.getAnnotation(joinPoint, Threaded.class, true, true); + MergedThreadConfiguration threadConfig = new MergedThreadConfiguration(joinPoint, threaded); + + ThreadFactoryBuilder tfbuilder = new ThreadFactoryBuilder() + .setPriority(threadConfig.getThreadPriority()) + .setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + logger.error("The thread '" + t.getName() + "' had an exception", e); + } + }); + if (threaded.name().length() > 0) + tfbuilder.setNameFormat(threaded.name() + "-%d"); + + Integer concurrency = threadConfig.getConcurrency(); + if (concurrency == null) + concurrency = threadConfig.getThreads(); + BlockingQueue threadQueue = new ArrayBlockingQueue<>(threadConfig.getThreads().intValue()); + + ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor(concurrency.intValue(), concurrency.intValue(), + 1L, TimeUnit.SECONDS, + threadQueue, + tfbuilder.build()); + + Callable callable = this.createCallable(joinPoint); + + this.logger.debug("Starting {} threads", threadConfig.getThreads()); + for (int t = 0; t < threadConfig.getThreads().intValue(); t++) + threadExecutor.submit(callable); + + threadExecutor.shutdown(); + + if (threaded.join()) { + long waitMillis = threaded.joinWaitMillis() == 0L ? 300000L : threaded.joinWaitMillis(); + do { + this.logger.debug("Blocking this thread until subthreads finish: {}", Thread.currentThread().getId()); + if (threadExecutor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) { + this.logger.trace("Subthreads finished; unblocking this thread: {}", Thread.currentThread().getId()); + break; + } + } while (threaded.joinWaitMillis() == 0L); + + this.logger.debug("Subthreads running: {}; unblocking this thread: {}", threadExecutor.getActiveCount(), Thread.currentThread().getId()); + } + + return null; + } + + private Callable createCallable(final ProceedingJoinPoint joinPoint) throws Throwable { + return new Callable() { + @Override + public Object call() throws Exception { + logger.debug("entering thread: {}", Thread.currentThread().getId()); + + // AspectJ does not recursively match annotations if the same thread is calling joinPoint.proceed() + // but when a different thread calls it, it doesn't know it is already processing the annotation + // so we are going to use a ThreadLocal to prevent recursion across threads + nested.get().add(joinPoint.getSignature().toLongString()); + + // we cannot use joinPoint.proceed() as it will only execute in 1 thread + // we need to recreate the method call, fresh, using reflection + MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); + Method method = methodSig.getMethod(); + + try { + method.setAccessible(true); + return method.invoke(joinPoint.getThis(), joinPoint.getArgs()); + } catch (Exception | Error e) { + logger.error("An unexpected issue occurred in thread #" + Thread.currentThread().getId(), e); + throw e; + } catch (Throwable t) { + logger.error("An unexpected issue occurred in thread #" + Thread.currentThread().getId(), t); + throw new RuntimeException("This should never happen", t); + } finally { + logger.trace("leaving thread: {}", Thread.currentThread().getId()); + } + } + }; + } + + + + private class MergedThreadConfiguration implements Threadable { + + private final Threaded threaded; + private final Threadable threadable; + + public MergedThreadConfiguration(ProceedingJoinPoint joinPoint, Threaded threaded) { + this.threaded = threaded; + this.threadable = (joinPoint.getThis() instanceof Threadable) ? (Threadable) joinPoint.getThis() : null; + } + + @Override + public Integer getThreads() { + if (this.threadable != null && this.threadable.getThreads() != null) { + return this.threadable.getThreads(); + } else { + return this.threaded.threads(); + } + } + + @Override + public Integer getConcurrency() { + if (this.threadable != null && this.threadable.getConcurrency() != null) { + return this.threadable.getConcurrency(); + } else if (this.threaded.concurrency() <= 0) { + return null; + } else { + return this.threaded.concurrency(); + } + } + + @Override + public Integer getThreadPriority() { + if (this.threadable != null && this.threadable.getThreadPriority() != null) { + return this.threadable.getThreadPriority(); + } else { + return this.threaded.priority(); + } + } + + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java new file mode 100644 index 0000000..4e9f9c2 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java @@ -0,0 +1,95 @@ +package com.inteligr8.alfresco.annotations.service.impl; + +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.UserTransaction; +import javax.transaction.xa.XAResource; + +import org.alfresco.repo.transaction.AlfrescoTransactionSupport; +import org.alfresco.repo.transaction.TransactionListener; + +public class AlfrescoTransaction implements UserTransaction, Transaction { + + private UserTransaction userTx; + + public AlfrescoTransaction(UserTransaction userTx) { + this.userTx = userTx; + } + + @Override + public void begin() throws NotSupportedException, SystemException { + this.userTx.begin(); + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, + SecurityException, IllegalStateException, SystemException { + this.userTx.commit(); + } + + @Override + public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException { + return false; + } + + @Override + public boolean enlistResource(XAResource xaRes) throws RollbackException, IllegalStateException, SystemException { + return false; + } + + @Override + public int getStatus() throws SystemException { + return this.userTx.getStatus(); + } + + @Override + public void registerSynchronization(Synchronization sync) + throws RollbackException, IllegalStateException, SystemException { + AlfrescoTransactionSupport.bindListener(new TransactionListener() { + @Override + public void flush() { + } + + @Override + public void beforeCompletion() { + sync.beforeCompletion(); + } + + @Override + public void beforeCommit(boolean readOnly) { + } + + @Override + public void afterRollback() { + sync.afterCompletion(Status.STATUS_ROLLEDBACK); + } + + @Override + public void afterCommit() { + sync.afterCompletion(Status.STATUS_COMMITTED); + } + }); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + this.userTx.rollback(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + this.userTx.setRollbackOnly(); + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + this.userTx.setTransactionTimeout(seconds); + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java new file mode 100644 index 0000000..f0f777f --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java @@ -0,0 +1,109 @@ +package com.inteligr8.alfresco.annotations.service.impl; + +import java.util.function.Supplier; + +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.InvalidTransactionException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.UserTransaction; + +import org.alfresco.service.transaction.TransactionService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class AlfrescoTransactionManager implements TransactionManager { + + @Autowired + private TransactionService txService; + + private ThreadLocal tx = ThreadLocal.withInitial(new Supplier() { + @Override + public AlfrescoTransaction get() { + return null; + } + }); + + @Override + public void begin() throws NotSupportedException, SystemException { + UserTransaction userTx = this.txService.getNonPropagatingUserTransaction(); + AlfrescoTransaction tx = new AlfrescoTransaction(userTx); + tx.begin(); + this.tx.set(tx); + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, + SecurityException, IllegalStateException, SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + throw new IllegalStateException(); + + tx.commit(); + } + + @Override + public int getStatus() throws SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + return Status.STATUS_NO_TRANSACTION; + + return tx.getStatus(); + } + + @Override + public Transaction getTransaction() throws SystemException { + return this.tx.get(); + } + + @Override + public void resume(Transaction tx) throws InvalidTransactionException, IllegalStateException, SystemException { + if (!(tx instanceof AlfrescoTransaction)) + throw new InvalidTransactionException("An AlfrescoTransaction is expected; received: " + tx.getClass()); + if (this.tx.get() != null) + throw new IllegalStateException(); + + this.tx.set((AlfrescoTransaction) tx); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + throw new IllegalStateException(); + + tx.rollback(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + throw new IllegalStateException(); + + tx.setRollbackOnly(); + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx != null) + tx.setTransactionTimeout(seconds); + } + + @Override + public Transaction suspend() throws SystemException { + try { + return this.tx.get(); + } finally { + this.tx.set(null); + } + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java index d576374..9bf5c21 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java @@ -7,6 +7,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Parameter; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -33,7 +35,6 @@ import javax.transaction.TransactionManager; import org.alfresco.model.ContentModel; import org.alfresco.repo.cache.SimpleCache; import org.alfresco.repo.dictionary.M2Model; -import org.alfresco.repo.search.transaction.SimpleTransactionManager; import org.alfresco.repo.version.common.VersionImpl; import org.alfresco.service.cmr.action.Action; import org.alfresco.service.cmr.action.ActionService; @@ -54,6 +55,8 @@ import org.aspectj.lang.reflect.MethodSignature; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; import org.quartz.impl.JobDetailImpl; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; @@ -68,6 +71,8 @@ import org.springframework.transaction.annotation.Transactional; import com.fasterxml.jackson.databind.ObjectMapper; import com.inteligr8.alfresco.annotations.AuthorizedAsSystem; +import com.inteligr8.alfresco.annotations.Threadable; +import com.inteligr8.alfresco.annotations.Threaded; import com.inteligr8.alfresco.annotations.TransactionalRetryable; import com.inteligr8.alfresco.annotations.job.AsyncJob; import com.inteligr8.alfresco.annotations.service.AsyncProcessException; @@ -78,26 +83,28 @@ import com.inteligr8.alfresco.annotations.service.AsyncService; * * @author brian@inteligr8.com */ -@Component -public class MqAsyncService extends AbstractLifecycleBean implements AsyncService { +@Component("async.mq") +public class MqAsyncService extends AbstractLifecycleBean implements AsyncService, Threadable { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final JobKey jobKey = new JobKey("mq-async", "inteligr8-annotations"); private final Pattern typePattern = Pattern.compile("v([0-9]+):([^:#]+)#(.+)"); private final ObjectMapper om = new ObjectMapper(); @Value("${inteligr8.async.mq.enabled}") protected boolean enabled; - @Value("${inteligr8.async.mq.worker}") - protected boolean workerEnabled; + @Value("${inteligr8.async.mq.workerThreads}") + protected int workerThreads; - @Value("${inteligr8.async.mq.url:#{null}}") + @Value("${inteligr8.async.mq.url}") + //@Value("${messaging.broker.url}") protected String url; - @Value("${inteligr8.async.mq.username:#{null}}") + @Value("${inteligr8.async.mq.username}") protected String username; - @Value("${inteligr8.async.mq.password:#{null}}") + @Value("${inteligr8.async.mq.password}") protected String password; @Value("${inteligr8.async.mq.queue}") @@ -130,6 +137,11 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic @Autowired protected TransactionService txService; + @Autowired + protected TransactionManager txManager; + + private String hostname; + private XaPooledConnectionFactory factory; private SimpleCache, String>, Method> methodCache; @@ -146,38 +158,52 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic if (!this.enabled) return; + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException uhe) { + this.hostname = "unknown"; + } + ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(this.url); XaPooledConnectionFactory pool = new XaPooledConnectionFactory(); pool.setConnectionFactory(factory); pool.setMaxConnections(this.maxConnections); - pool.setTransactionManager(SimpleTransactionManager.getInstance()); + pool.setTransactionManager(this.txManager); pool.start(); this.factory = pool; - if (!this.workerEnabled) + if (this.workerThreads <= 0) return; - JobKey jobKey = new JobKey("behaviour-async", "inteligr8-annotations"); - JobDetailImpl jobDetail = new JobDetailImpl(); - jobDetail.setKey(jobKey); + jobDetail.setKey(this.jobKey); jobDetail.setRequestsRecovery(true); jobDetail.setJobClass(AsyncJob.class); jobDetail.getJobDataMap().put("asyncService", this); + Trigger trigger = TriggerBuilder.newTrigger() + .startNow() + .build(); + try { - Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); - scheduler.addJob(jobDetail, false); - scheduler.triggerJob(jobKey); + StdSchedulerFactory.getDefaultScheduler() + .scheduleJob(jobDetail, trigger); } catch (SchedulerException se) { - this.logger.error("The behavior policy async service failed to start; no asynchronous policies will be processed!", se); + this.logger.error("The MQ async service job failed to start; no asynchronous executions will be processed!", se); } } @Override protected void onShutdown(ApplicationEvent event) { + try { + Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.deleteJob(this.jobKey); + } catch (SchedulerException se) { + this.logger.warn("The MQ async service job failed to stop", se); + } + if (this.factory != null) this.factory.stop(); } @@ -187,12 +213,18 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic return enabled; } + @Override + public Integer getThreads() { + return this.workerThreads; + } + @Override public boolean isCurrentThreadAsynchronous() { return this.isAsync.get(); } @Override + @Transactional public void poll() throws AsyncProcessException { this.logger.trace("poll()"); this.isAsync.set(true); @@ -200,7 +232,7 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic try { Connection mqcon = this.factory.createConnection(this.username, this.password); try { - mqcon.setClientID(this.clientId); + mqcon.setClientID(this.clientId + "-service-" + this.hostname); Session mqsession = mqcon.createSession(true, Session.CLIENT_ACKNOWLEDGE); try { @@ -246,11 +278,15 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic this.logger.debug("Polling ongoing messages ..."); Queue mqqueue = mqsession.createQueue(this.queueName); - + this.pollMainThreaded(mqsession, mqqueue); + } + + @Threaded(name = "mq-poll", join = true) + private void pollMainThreaded(Session mqsession, Queue mqqueue) throws JMSException { MessageConsumer consumer = mqsession.createConsumer(mqqueue); try { while (!Thread.currentThread().isInterrupted()) { - this.pollTx(mqsession, consumer, null); + pollTx(mqsession, consumer, null); } } finally { consumer.close(); @@ -320,7 +356,7 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic method.invoke(bean, args); } catch (ClassNotFoundException cnfe) { - this.logger.error("A policy bean could not be found; will try on next restart"); + this.logger.error("A bean could not be found; will try on next restart"); this.logger.error("The bean '{}' could not be found: {}", matcher.group(2), cnfe.getMessage()); if (isErrorQueue) return false; @@ -330,19 +366,19 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic // return to queue and retry indefinitely return false; } catch (NoSuchMethodException nsme) { - this.logger.error("A policy enumeration argument could not be constructed; will try on next restart"); + this.logger.error("A bean enum argument could not be constructed; will try on next restart"); this.logger.error("An argument could not be The bean '{}' could not be found: {}", matcher.group(2), nsme.getMessage()); if (isErrorQueue) return false; this.moveToErrorQueue(mqsession, mqmsg); } catch (IllegalAccessException iae) { - this.logger.error("A policy method was not accessible (public); will try on next restart"); + this.logger.error("A bean method was not accessible (public); will try on next restart"); this.logger.warn("The bean '{}' method '{}' is not accessible: {}", matcher.group(2), matcher.group(3), iae.getMessage()); if (isErrorQueue) return false; this.moveToErrorQueue(mqsession, mqmsg); } catch (InstantiationException | InvocationTargetException ie) { - this.logger.error("A policy method execution failed; will try on next restart"); + this.logger.error("A bean method execution failed; will try on next restart"); this.logger.warn("The bean '{}' method '{}' execution failed: {}", matcher.group(2), matcher.group(3), ie.getMessage()); if (isErrorQueue) return false; @@ -390,26 +426,26 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic if (!(joinPoint.getSignature() instanceof MethodSignature)) throw new IllegalStateException("The join point must be on methods and methods have signatures"); - Class beanType = joinPoint.getThis().getClass(); - this.logger.debug("Queuing for bean: {}", beanType); + Object bean = joinPoint.getThis(); + this.logger.debug("Queuing for bean: {}", bean.getClass()); MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); Method method = methodSig.getMethod(); this.logger.debug("Queuing for method: {}", method); - this.push(beanType, method.getName(), Arrays.asList(joinPoint.getArgs())); + this.push(bean, method.getName(), Arrays.asList(joinPoint.getArgs())); } @Transactional public void push(Object callbackBean, String callbackMethod, List args) throws AsyncProcessException { - this.logger.trace("push({}, {}, {})", callbackBean, callbackMethod, args); + this.logger.trace("push({}, {}, {})", callbackBean.getClass(), callbackMethod, args); UUID msgId = UUID.randomUUID(); try { Connection mqcon = this.factory.createConnection(this.username, this.password); try { - mqcon.setClientID(this.clientId); + mqcon.setClientID(this.clientId + "-client-" + this.hostname); Session mqsession = mqcon.createSession(true, Session.AUTO_ACKNOWLEDGE); try { @@ -433,7 +469,6 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic } this.logger.debug("Sent node as message: {} => {}", callbackMethod, msgId); - mqsession.commit(); } finally { mqsession.close(); } diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java new file mode 100644 index 0000000..10941a7 --- /dev/null +++ b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java @@ -0,0 +1,175 @@ +package com.inteligr8.alfresco.annotations.service.impl; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.alfresco.repo.security.authentication.AuthenticationUtil; +import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.reflect.MethodSignature; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.JobDetailImpl; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationEvent; +import org.springframework.extensions.surf.util.AbstractLifecycleBean; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.inteligr8.alfresco.annotations.job.AsyncJob; +import com.inteligr8.alfresco.annotations.service.AsyncProcessException; +import com.inteligr8.alfresco.annotations.service.AsyncService; + +/** + * This class provides a non-persistent alternative to MQ for asynchronous execution. + * + * @author brian@inteligr8.com + */ +@Component("async.thread") +public class ThreadPoolAsyncService extends AbstractLifecycleBean implements AsyncService { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final JobKey jobKey = new JobKey("thread-async", "inteligr8-annotations"); + + @Value("${inteligr8.async.workableThreads}") + protected int queueSize; + + @Value("${inteligr8.async.workerThreads}") + protected byte poolSize; + + private LinkedBlockingQueue queue; + private ThreadPoolExecutor pool; + + private ThreadLocal isAsync = ThreadLocal.withInitial(new Supplier() { + @Override + public Boolean get() { + return false; + } + }); + + @Override + protected void onBootstrap(ApplicationEvent event) { + this.queue = new LinkedBlockingQueue<>(this.queueSize); + + JobDetailImpl jobDetail = new JobDetailImpl(); + jobDetail.setKey(this.jobKey); + jobDetail.setRequestsRecovery(true); + jobDetail.setJobClass(AsyncJob.class); + jobDetail.getJobDataMap().put("asyncService", this); + + Trigger trigger = TriggerBuilder.newTrigger() + .startNow() + .build(); + + try { + StdSchedulerFactory.getDefaultScheduler() + .scheduleJob(jobDetail, trigger); + } catch (SchedulerException se) { + this.logger.error("The async service job failed to start; no asynchronous executions will be processed!", se); + } + } + + @Override + protected void onShutdown(ApplicationEvent event) { + try { + Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.deleteJob(this.jobKey); + } catch (SchedulerException se) { + this.logger.warn("The MQ async service job failed to stop", se); + } + } + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public boolean isCurrentThreadAsynchronous() { + return this.isAsync.get(); + } + + @Override + public void poll() throws AsyncProcessException { + this.logger.trace("poll()"); + this.isAsync.set(true); + + this.pool = new ThreadPoolExecutor(1, this.poolSize, 5L, TimeUnit.SECONDS, this.queue); + + try { + while (!this.pool.isTerminated()) { + this.logger.debug("Perpetually waiting for thread pool to terminate ..."); + this.pool.awaitTermination(300L, TimeUnit.SECONDS); + } + + this.logger.info("The Async service thread pool terminated"); + } catch (InterruptedException ie) { + this.logger.info("The Async service thread pool was interrupted"); + } + } + + @Transactional + protected void executeWork(Object callbackBean, Method callbackMethod, List args) { + try { + callbackMethod.invoke(callbackBean, args.toArray()); + } catch (IllegalAccessException iae) { + this.logger.error("A bean method was not accessible (public)"); + this.logger.warn("The bean '{}' method '{}' is not accessible: {}", callbackBean.getClass(), callbackMethod, iae.getMessage()); + } catch (InvocationTargetException ite) { + this.logger.error("A bean method execution failed"); + this.logger.warn("The bean '{}' method '{}' execution failed: {}", callbackBean.getClass(), callbackMethod, ite.getMessage()); + } + } + + public void push(ProceedingJoinPoint joinPoint) throws AsyncProcessException { + this.logger.trace("push({})", joinPoint); + + if (!(joinPoint.getSignature() instanceof MethodSignature)) + throw new IllegalStateException("The join point must be on methods and methods have signatures"); + + Object bean = joinPoint.getThis(); + this.logger.debug("Queuing for bean: {}", bean.getClass()); + + MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); + Method method = methodSig.getMethod(); + this.logger.debug("Queuing for method: {}", method); + + this.push(bean, method, Arrays.asList(joinPoint.getArgs())); + } + + public void push(Object callbackBean, Method callbackMethod, List args) throws AsyncProcessException { + this.logger.trace("push({}, {}, {})", callbackBean, callbackMethod, args); + + RunAsWork work = new RunAsWork() { + @Override + public Void doWork() { + executeWork(callbackBean, callbackMethod, args); + return null; + } + }; + + final String runAs = AuthenticationUtil.getRunAsUser(); + Runnable runnable = new Runnable() { + @Override + public void run() { + // run the thread with the same authentication context as the initiating user + AuthenticationUtil.runAs(work, runAs); + } + }; + + this.queue.offer(runnable); + } + +} diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index f0654b8..4b13f64 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -1,14 +1,18 @@ - + + + - + + + \ No newline at end of file diff --git a/src/main/resources/alfresco/module/com.inteligr8.alfresco.annotations-platform-module/alfresco-global.properties b/src/main/resources/alfresco/module/com.inteligr8.alfresco.annotations-platform-module/alfresco-global.properties index 2dcd89c..7bb3cb2 100644 --- a/src/main/resources/alfresco/module/com.inteligr8.alfresco.annotations-platform-module/alfresco-global.properties +++ b/src/main/resources/alfresco/module/com.inteligr8.alfresco.annotations-platform-module/alfresco-global.properties @@ -1,14 +1,24 @@ inteligr8.annotations.aspectj.scanPackages=com.inteligr8.alfresco.annotations -inteligr8.async.mq.enabled=false -inteligr8.async.mq.worker=true +# should @Asynchronous be joined with MQ (or local thread pool) +inteligr8.async.mq.enabled=true + +# threads to execute @Asynchronous methods (when MQ disabled above or by method) +inteligr8.async.workableThreads=100 +inteligr8.async.workerThreads=5 + +# threads to execute @Asynchronous methods (when MQ enabled above; 0 disables execution) +inteligr8.async.mq.workerThreads=1 + +# MQ settings inteligr8.async.mq.url=${messaging.broker.url} inteligr8.async.mq.username=${messaging.broker.username} inteligr8.async.mq.password=${messaging.broker.password} -inteligr8.async.mq.queuePrefix=inteligr8.acs. -inteligr8.async.mq.clientId=acs -inteligr8.async.mq.pool.max=5 +inteligr8.async.mq.queue=inteligr8.acs.async +inteligr8.async.mq.errorQueue=inteligr8.acs.asyncError +inteligr8.async.mq.clientId=inteligr8-async +inteligr8.async.mq.pool.max=2 inteligr8.cache.nodeTypeConstrainable.maxBeans=16 inteligr8.cache.nodeAspectConstrainable.maxBeans=16 diff --git a/src/test/java/com/inteligr8/alfresco/annotations/AsynchronousTest.java b/src/test/java/com/inteligr8/alfresco/annotations/AsynchronousTest.java new file mode 100644 index 0000000..d2a3f05 --- /dev/null +++ b/src/test/java/com/inteligr8/alfresco/annotations/AsynchronousTest.java @@ -0,0 +1,76 @@ +package com.inteligr8.alfresco.annotations; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.extensions.surf.util.AbstractLifecycleBean; +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; + +@Component +public class AsynchronousTest extends AbstractLifecycleBean { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private long mainThreadId; + private Object obj; + + @Override + protected void onBootstrap(ApplicationEvent event) { + this.logger.info("Running test: " + this.getClass()); + + this.mainThreadId = Thread.currentThread().getId(); + this.logger.info("Running in thread: {}", this.mainThreadId); + + this.tryThreadPool(); + this.tryMq(); + + try { + Thread.sleep(500L); + } catch (InterruptedException ie) { + throw new AlfrescoRuntimeException("This should never happen", ie); + } + + this.obj = new Object(); + } + + @Override + protected void onShutdown(ApplicationEvent event) { + } + + @Asynchronous(durable = false) + private void tryThreadPool() { + this.logger.info("Running in another thread: {}", Thread.currentThread().getId()); + + Assert.isTrue(Thread.currentThread().getId() != this.mainThreadId, "This method is not executing in a separate thread"); + + Assert.isNull(this.obj, "The random value is expected to be 'null'"); + + try { + Thread.sleep(1000L); + } catch (InterruptedException ie) { + throw new AlfrescoRuntimeException("This should never happen", ie); + } + + Assert.isTrue(this.obj != null, "The random value is not expected to be 'null'"); + } + + @Asynchronous + private void tryMq() { + this.logger.info("Running in another thread: {}", Thread.currentThread().getId()); + + Assert.isTrue(Thread.currentThread().getId() != this.mainThreadId, "This method is not executing in a separate thread"); + + Assert.isNull(this.obj, "The random value is expected to be 'null'"); + + try { + Thread.sleep(1000L); + } catch (InterruptedException ie) { + throw new AlfrescoRuntimeException("This should never happen", ie); + } + + Assert.isTrue(this.obj != null, "The random value is not expected to be 'null'"); + } + +} diff --git a/src/test/java/com/inteligr8/alfresco/annotations/ClusterSynchronizedTest.java b/src/test/java/com/inteligr8/alfresco/annotations/ClusterSynchronizedTest.java new file mode 100644 index 0000000..471bc19 --- /dev/null +++ b/src/test/java/com/inteligr8/alfresco/annotations/ClusterSynchronizedTest.java @@ -0,0 +1,48 @@ +package com.inteligr8.alfresco.annotations; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.extensions.surf.util.AbstractLifecycleBean; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.Assert; + +@Component +public class ClusterSynchronizedTest extends AbstractLifecycleBean { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Override + protected void onBootstrap(ApplicationEvent event) { + MutableInt threadsRun = new MutableInt(); + this.threadThenLock(threadsRun); + } + + @Override + protected void onShutdown(ApplicationEvent event) { + } + + @Threaded(name = "cluster-sync", threads = 5, join = true) + @Transactional + @ClusterSynchronized + private void threadThenLock(MutableInt threadsRun) { + this.lock(threadsRun); + } + + private void lock(MutableInt threadsRun) { + int t = threadsRun.intValue(); + this.logger.debug("After start of a mutually exclusive execution block: {}", t); + + try { + Thread.sleep(200L); + } catch (InterruptedException ie) { + } + + Assert.isTrue(t == threadsRun.intValue(), "The threads run unexpectedly changed: " + t + " != " + threadsRun); + threadsRun.increment(); + this.logger.debug("Before end of a mutually exclusive execution block: {}", t); + } + +} diff --git a/src/test/java/com/inteligr8/alfresco/annotations/IfNotNullTest.java b/src/test/java/com/inteligr8/alfresco/annotations/IfNotNullTest.java index 9a1c160..867a477 100644 --- a/src/test/java/com/inteligr8/alfresco/annotations/IfNotNullTest.java +++ b/src/test/java/com/inteligr8/alfresco/annotations/IfNotNullTest.java @@ -1,7 +1,5 @@ package com.inteligr8.alfresco.annotations; -import javax.annotation.Nonnull; - import org.apache.commons.lang3.mutable.MutableBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +30,11 @@ public class IfNotNullTest extends AbstractLifecycleBean { protected void onShutdown(ApplicationEvent event) { } - private void tryNotNull(@Nonnull String str) { - executed.setTrue(); + private void tryNotNull(@IfNotNull String str) { + this.executed.setTrue(); } - private void tryNull(@Nonnull String str) { + private void tryNull(@IfNotNull String str) { throw new IllegalStateException(); } diff --git a/src/test/java/com/inteligr8/alfresco/annotations/ThreadedTest.java b/src/test/java/com/inteligr8/alfresco/annotations/ThreadedTest.java new file mode 100644 index 0000000..03ac442 --- /dev/null +++ b/src/test/java/com/inteligr8/alfresco/annotations/ThreadedTest.java @@ -0,0 +1,74 @@ +package com.inteligr8.alfresco.annotations; + +import java.util.Random; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.extensions.surf.util.AbstractLifecycleBean; +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; + +@Component +public class ThreadedTest extends AbstractLifecycleBean { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final Random random = new Random(); + + @Override + protected void onBootstrap(ApplicationEvent event) { + this.logger.info("Running test: {}", this.getClass()); + Assert.isTrue(Thread.currentThread().getId() > 0L, "An unexpected non-positive thread ID: " + Thread.currentThread().getId()); + + this.logger.info("Main thread: {}", Thread.currentThread().getId()); + + MutableInt threadCount = new MutableInt(); + this.try5sync(Thread.currentThread(), threadCount); + Assert.isTrue(threadCount.intValue() == 5, "An unexpected thread count record: 5 != " + threadCount); + + threadCount.setValue(0); + this.try5(Thread.currentThread(), threadCount); + Assert.isTrue(threadCount.intValue() < 5, "An unexpected thread count record: 5 >= " + threadCount); + + try { + Thread.sleep(500L); + } catch (InterruptedException ie) { + // suppress + } + + Assert.isTrue(threadCount.intValue() == 5, "An unexpected thread count record: 5 != " + threadCount); + } + + @Override + protected void onShutdown(ApplicationEvent event) { + } + + @Threaded(threads = 5) + private void try5(Thread parentThread, MutableInt threadCount) { + this.logger.info("Running inside thread: {}", Thread.currentThread().getId()); + Assert.isTrue(parentThread.getId() != Thread.currentThread().getId(), "An unexpected thread ID: " + Thread.currentThread().getId()); + + try { + Thread.sleep(this.random.nextInt(100) + 100L); + threadCount.increment(); + } catch (InterruptedException ie) { + // suppress + } + } + + @Threaded(threads = 5, join = true) + private void try5sync(Thread parentThread, MutableInt threadCount) { + this.logger.info("Running inside thread: {}", Thread.currentThread().getId()); + Assert.isTrue(parentThread.getId() != Thread.currentThread().getId(), "An unexpected thread ID: " + Thread.currentThread().getId()); + + try { + Thread.sleep(this.random.nextInt(100) + 100L); + threadCount.increment(); + } catch (InterruptedException ie) { + // suppress + } + } + +} diff --git a/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java b/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java index 8b6d0f6..11a3331 100644 --- a/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java +++ b/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java @@ -215,7 +215,7 @@ public class TransactionalTest extends AbstractLifecycleBean { @TransactionalRetryable private void tryRetryOnlyTransactional(String originTxId) { if (originTxId == null) { - Assert.isNull(AlfrescoTransactionSupport.getTransactionId(), "An unexpected transaction"); + Assert.isTrue(AlfrescoTransactionSupport.getTransactionId() != null, "Expected a new transaction"); } else { Assert.isTrue(AlfrescoTransactionSupport.getTransactionId().equals(originTxId), "Expected the same transaction: " + AlfrescoTransactionSupport.getTransactionId() + " != " + originTxId); }