diff --git a/pom.xml b/pom.xml index df92505..ec3a009 100644 --- a/pom.xml +++ b/pom.xml @@ -43,8 +43,6 @@ 8 4.8.0 - 7.4.2 - 22.22 1.9.19 -javaagent:/var/lib/tomcat/dev/lib/aspectjweaver-${aspectj.version}.jar @@ -75,18 +73,6 @@ 1.0.1 amp - - javax.transaction - javax.transaction-api - 1.3 - provided - - - jakarta.transaction - jakarta.transaction-api - 2.0.1 - provided - @@ -99,6 +85,28 @@ + + org.codehaus.mojo + build-helper-maven-plugin + 3.6.0 + + + add-source + add-source + + + src/main/${source.tx.path} + + + + + + + maven-jar-plugin + + ${classifier.tx.name} + + io.repaint.maven tiles-maven-plugin @@ -119,6 +127,49 @@ + + debug + + true + + + + javax.transaction + + true + + + javax-tx + javax_tx + 7.4.2 + 22.22 + + + + javax.transaction + javax.transaction-api + 1.3 + provided + + + + + jakarta.transaction + + jakarta-tx + jakarta_tx + 23.2.1 + 23.2.0.60 + + + + jakarta.transaction + jakarta.transaction-api + 2.0.1 + provided + + + ossrh-release diff --git a/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java new file mode 100644 index 0000000..79c5567 --- /dev/null +++ b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java @@ -0,0 +1,96 @@ +package com.inteligr8.alfresco.annotations.service.impl; + +import javax.transaction.xa.XAResource; + +import org.alfresco.repo.transaction.AlfrescoTransactionSupport; +import org.alfresco.repo.transaction.TransactionListener; + +import jakarta.transaction.HeuristicMixedException; +import jakarta.transaction.HeuristicRollbackException; +import jakarta.transaction.NotSupportedException; +import jakarta.transaction.RollbackException; +import jakarta.transaction.Status; +import jakarta.transaction.Synchronization; +import jakarta.transaction.SystemException; +import jakarta.transaction.Transaction; +import jakarta.transaction.UserTransaction; + +public class AlfrescoTransaction implements UserTransaction, Transaction { + + private UserTransaction userTx; + + public AlfrescoTransaction(UserTransaction userTx) { + this.userTx = userTx; + } + + @Override + public void begin() throws NotSupportedException, SystemException { + this.userTx.begin(); + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, + SecurityException, IllegalStateException, SystemException { + this.userTx.commit(); + } + + @Override + public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException { + return false; + } + + @Override + public boolean enlistResource(XAResource xaRes) throws RollbackException, IllegalStateException, SystemException { + return false; + } + + @Override + public int getStatus() throws SystemException { + return this.userTx.getStatus(); + } + + @Override + public void registerSynchronization(Synchronization sync) + throws RollbackException, IllegalStateException, SystemException { + AlfrescoTransactionSupport.bindListener(new TransactionListener() { + @Override + public void flush() { + } + + @Override + public void beforeCompletion() { + sync.beforeCompletion(); + } + + @Override + public void beforeCommit(boolean readOnly) { + } + + @Override + public void afterRollback() { + sync.afterCompletion(Status.STATUS_ROLLEDBACK); + } + + @Override + public void afterCommit() { + sync.afterCompletion(Status.STATUS_COMMITTED); + } + }); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + this.userTx.rollback(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + this.userTx.setRollbackOnly(); + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + this.userTx.setTransactionTimeout(seconds); + } + +} diff --git a/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java new file mode 100644 index 0000000..5300f29 --- /dev/null +++ b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java @@ -0,0 +1,112 @@ +package com.inteligr8.alfresco.annotations.service.impl; + +import java.util.function.Supplier; + +import org.alfresco.service.transaction.TransactionService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import jakarta.transaction.HeuristicMixedException; +import jakarta.transaction.HeuristicRollbackException; +import jakarta.transaction.InvalidTransactionException; +import jakarta.transaction.NotSupportedException; +import jakarta.transaction.RollbackException; +import jakarta.transaction.Status; +import jakarta.transaction.SystemException; +import jakarta.transaction.Transaction; +import jakarta.transaction.TransactionManager; +import jakarta.transaction.UserTransaction; + +/** + * This bean implements a standard TransactionManager for ACS. + */ +@Component +public class AlfrescoTransactionManager implements TransactionManager { + + @Autowired + private TransactionService txService; + + private ThreadLocal tx = ThreadLocal.withInitial(new Supplier() { + @Override + public AlfrescoTransaction get() { + return null; + } + }); + + @Override + public void begin() throws NotSupportedException, SystemException { + UserTransaction userTx = this.txService.getNonPropagatingUserTransaction(); + AlfrescoTransaction tx = new AlfrescoTransaction(userTx); + tx.begin(); + this.tx.set(tx); + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, + SecurityException, IllegalStateException, SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + throw new IllegalStateException(); + + tx.commit(); + } + + @Override + public int getStatus() throws SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + return Status.STATUS_NO_TRANSACTION; + + return tx.getStatus(); + } + + @Override + public Transaction getTransaction() throws SystemException { + return this.tx.get(); + } + + @Override + public void resume(Transaction tx) throws InvalidTransactionException, IllegalStateException, SystemException { + if (!(tx instanceof AlfrescoTransaction)) + throw new InvalidTransactionException("An AlfrescoTransaction is expected; received: " + tx.getClass()); + if (this.tx.get() != null) + throw new IllegalStateException(); + + this.tx.set((AlfrescoTransaction) tx); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + throw new IllegalStateException(); + + tx.rollback(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx == null) + throw new IllegalStateException(); + + tx.setRollbackOnly(); + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + AlfrescoTransaction tx = this.tx.get(); + if (tx != null) + tx.setTransactionTimeout(seconds); + } + + @Override + public Transaction suspend() throws SystemException { + try { + return this.tx.get(); + } finally { + this.tx.set(null); + } + } + +} diff --git a/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java new file mode 100644 index 0000000..4b14c6b --- /dev/null +++ b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java @@ -0,0 +1,639 @@ +package com.inteligr8.alfresco.annotations.service.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Parameter; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.Temporal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.model.ContentModel; +import org.alfresco.repo.cache.SimpleCache; +import org.alfresco.repo.dictionary.M2Model; +import org.alfresco.repo.version.common.VersionImpl; +import org.alfresco.service.cmr.action.Action; +import org.alfresco.service.cmr.action.ActionService; +import org.alfresco.service.cmr.dictionary.CustomModelService; +import org.alfresco.service.cmr.dictionary.DictionaryService; +import org.alfresco.service.cmr.repository.ContentReader; +import org.alfresco.service.cmr.repository.ContentService; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.version.Version; +import org.alfresco.service.namespace.NamespaceService; +import org.alfresco.service.namespace.QName; +import org.alfresco.service.transaction.TransactionService; +import org.alfresco.util.Pair; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.reflect.MethodSignature; +import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.JobDetailImpl; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationEvent; +import org.springframework.extensions.surf.util.AbstractLifecycleBean; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.inteligr8.alfresco.annotations.AuthorizedAsSystem; +import com.inteligr8.alfresco.annotations.Threadable; +import com.inteligr8.alfresco.annotations.Threaded; +import com.inteligr8.alfresco.annotations.TransactionalRetryable; +import com.inteligr8.alfresco.annotations.job.AsyncJob; +import com.inteligr8.alfresco.annotations.service.AsyncProcessException; +import com.inteligr8.alfresco.annotations.service.AsyncService; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.jms.Connection; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Queue; +import jakarta.jms.Session; + +/** + * This class provides integration with MQ for the asynchronous method executions. + * + * @author brian@inteligr8.com + */ +@Component("async.mq") +public class MqAsyncService extends AbstractLifecycleBean implements AsyncService, InitializingBean, DisposableBean, Threadable { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final JobKey jobKey = new JobKey("mq-async", "inteligr8-annotations"); + private final Pattern typePattern = Pattern.compile("v([0-9]+):([^:#]+)#(.+)"); + private final ObjectMapper om = new ObjectMapper(); + + @Value("${inteligr8.async.mq.enabled}") + protected boolean enabled; + + @Value("${inteligr8.async.mq.workerThreads}") + protected int workerThreads; + + @Value("${inteligr8.async.mq.url}") + protected String url; + + @Value("${inteligr8.async.mq.username}") + protected String username; + + @Value("${inteligr8.async.mq.password}") + protected String password; + + @Value("${inteligr8.async.mq.queue}") + protected String queueName; + + @Value("${inteligr8.async.mq.errorQueue}") + protected String errorQueueName; + + @Value("${inteligr8.async.mq.clientId}") + protected String clientId; + + @Value("${inteligr8.async.mq.pool.max}") + protected short maxConnections; + + @Autowired + protected ActionService actionService; + + @Autowired + protected ContentService contentService; + + @Autowired + protected CustomModelService modelService; + + @Autowired + protected DictionaryService dictionaryService; + + @Autowired + protected NamespaceService namespaceService; + + @Autowired + protected TransactionService txService; + + private String hostname; + + private JmsPoolConnectionFactory factory; + + private SimpleCache, String>, Method> methodCache; + + private ThreadLocal isAsync = ThreadLocal.withInitial(new Supplier() { + @Override + public Boolean get() { + return false; + } + }); + + @Override + public void afterPropertiesSet() throws Exception { + this.init(); + } + + @Override + public void destroy() throws Exception { + this.uninit(); + } + + /** + * @PostConstruct does not work in ACS + */ + @PostConstruct + protected void init() { + if (!this.enabled) + return; + + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException uhe) { + this.hostname = "unknown"; + } + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.url); + + JmsPoolConnectionFactory pool = new JmsPoolConnectionFactory(); + pool.setConnectionFactory(factory); + pool.setMaxConnections(this.maxConnections); + pool.start(); + + this.factory = pool; + + if (this.workerThreads <= 0) + throw new AlfrescoRuntimeException("The 'inteligr8.async.mq.workerThreads' property must be positive"); + } + + @PreDestroy + protected void uninit() { + if (this.factory != null) + this.factory.stop(); + } + + @Override + protected void onBootstrap(ApplicationEvent event) { + if (!this.enabled) + return; + + JobDetailImpl jobDetail = new JobDetailImpl(); + jobDetail.setKey(this.jobKey); + jobDetail.setRequestsRecovery(true); + jobDetail.setJobClass(AsyncJob.class); + jobDetail.getJobDataMap().put("asyncService", this); + + Trigger trigger = TriggerBuilder.newTrigger() + .startNow() + .build(); + + try { + StdSchedulerFactory.getDefaultScheduler() + .scheduleJob(jobDetail, trigger); + } catch (SchedulerException se) { + this.logger.error("The MQ async service job failed to start; no asynchronous executions will be processed!", se); + } + } + + @Override + protected void onShutdown(ApplicationEvent event) { + try { + Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.deleteJob(this.jobKey); + } catch (SchedulerException se) { + this.logger.warn("The MQ async service job failed to stop", se); + } + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public Integer getThreads() { + return this.workerThreads; + } + + @Override + public boolean isCurrentThreadAsynchronous() { + return this.isAsync.get(); + } + + @Override + @Transactional + public void poll() throws AsyncProcessException { + this.logger.trace("poll()"); + this.isAsync.set(true); + + try { + Connection mqcon = this.factory.createConnection(this.username, this.password); + try { + mqcon.setClientID(this.clientId + "-service-" + this.hostname); + + Session mqsession = mqcon.createSession(true, Session.CLIENT_ACKNOWLEDGE); + try { + this.logger.debug("Polling messages for asynchronous policy execution"); + this.pollErrors(mqsession); + this.pollMain(mqsession); + } finally { + mqsession.close(); + } + } finally { + mqcon.close(); + } + } catch (JMSException je) { + throw new AsyncProcessException("A JMS messaging issue occurred", je); + } + } + + private void pollErrors(Session mqsession) throws JMSException { + this.logger.debug("Polling previously errored messages"); + + Queue mqqueue = mqsession.createQueue(this.errorQueueName); + Set msgIds = new HashSet<>(); + int ackd = 0; + + MessageConsumer consumer = mqsession.createConsumer(mqqueue); + try { + while (!Thread.currentThread().isInterrupted()) { + Boolean processed = this.pollTx(mqsession, consumer, msgIds); + if (processed == null) { + break; + } else if (processed.booleanValue()) { + ackd++; + } + } + } finally { + consumer.close(); + } + + this.logger.info("Successfully processed {} of {} previously errored messages", ackd, msgIds.size()); + } + + private void pollMain(Session mqsession) throws JMSException { + this.logger.debug("Polling ongoing messages ..."); + + Queue mqqueue = mqsession.createQueue(this.queueName); + this.pollMainThreaded(mqsession, mqqueue); + } + + @Threaded(name = "mq-poll", join = true) + private void pollMainThreaded(Session mqsession, Queue mqqueue) throws JMSException { + MessageConsumer consumer = mqsession.createConsumer(mqqueue); + try { + while (!Thread.currentThread().isInterrupted()) { + pollTx(mqsession, consumer, null); + } + } finally { + consumer.close(); + } + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + @TransactionalRetryable(maxRetries = 3) + @AuthorizedAsSystem + private Boolean pollTx(Session mqsession, MessageConsumer consumer, Set msgIds) throws JMSException { + Message mqmsg = consumer.receive(); + + if (msgIds != null && !msgIds.add(mqmsg.getJMSMessageID())) { + this.logger.debug("Received a message again; assuming we have (re)tried all errored messages: {}", mqmsg.getJMSMessageID()); + return null; + } + + try { + if (this.processIncomingMessage(mqsession, mqmsg, msgIds != null)) { + mqmsg.acknowledge(); + return true; + } + } catch (RuntimeException | Error e) { + this.logger.error("An unexpected issue occurred", e); + } + + return false; + } + + private boolean processIncomingMessage(Session mqsession, Message mqmsg, boolean isErrorQueue) throws JMSException { + String msgId = mqmsg.getJMSMessageID(); + this.logger.debug("Received message: {}", msgId); + + String type = mqmsg.getJMSType(); + Matcher matcher = this.typePattern.matcher(type); + if (!matcher.find()) { + this.logger.warn("The queue has a message ('{}') with an unsupported JMS type: {}", msgId, type); + return false; + } + + try { + Class beanClass = Class.forName(matcher.group(2)); + this.logger.trace("Preparing to execute using bean type: {}", beanClass); + Object bean = this.getApplicationContext().getBean(beanClass); + this.logger.trace("Found qualifying bean: {}", bean); + + String methodName = matcher.group(3); + Method method = this.findMethod(beanClass, methodName); + this.logger.trace("Found qualifying method: {}", method); + Parameter[] params = method.getParameters(); + + Object[] args = new Object[params.length]; + + for (int a = 0; a < args.length; a++) { + Object arg = mqmsg.getObjectProperty("arg" + a); + if (arg == null) + continue; + + args[a] = this.unmarshal(params[a], arg); + } + + switch (method.getName()) { + case "onLoadDynamicModel": + args[1] = args[0]; + args[0] = this.loadModel((NodeRef) args[1]); + } + + method.invoke(bean, args); + } catch (ClassNotFoundException cnfe) { + this.logger.error("A bean could not be found; will try on next restart"); + this.logger.error("The bean '{}' could not be found: {}", matcher.group(2), cnfe.getMessage()); + if (isErrorQueue) + return false; + this.moveToErrorQueue(mqsession, mqmsg); + } catch (IOException ie) { + this.logger.warn("This should never happen: " + ie.getMessage()); + // return to queue and retry indefinitely + return false; + } catch (NoSuchMethodException nsme) { + this.logger.error("A bean enum argument could not be constructed; will try on next restart"); + this.logger.error("An argument could not be The bean '{}' could not be found: {}", matcher.group(2), nsme.getMessage()); + if (isErrorQueue) + return false; + this.moveToErrorQueue(mqsession, mqmsg); + } catch (IllegalAccessException iae) { + this.logger.error("A bean method was not accessible (public); will try on next restart"); + this.logger.warn("The bean '{}' method '{}' is not accessible: {}", matcher.group(2), matcher.group(3), iae.getMessage()); + if (isErrorQueue) + return false; + this.moveToErrorQueue(mqsession, mqmsg); + } catch (InstantiationException | InvocationTargetException ie) { + this.logger.error("A bean method execution failed; will try on next restart"); + this.logger.warn("The bean '{}' method '{}' execution failed: {}", matcher.group(2), matcher.group(3), ie.getMessage()); + if (isErrorQueue) + return false; + this.moveToErrorQueue(mqsession, mqmsg); + } + + return true; + } + + private void moveToErrorQueue(Session mqsession, Message mqmsg) throws JMSException { + Queue mqqueue = mqsession.createQueue(this.errorQueueName); + + MessageProducer producer = mqsession.createProducer(mqqueue); + try { + producer.send(mqmsg); + } finally { + producer.close(); + } + } + + private Method findMethod(Class clazz, String methodName) { + Pair, String> key = new Pair<>(clazz, methodName); + Method method = this.methodCache.get(key); + if (method != null) { + this.logger.trace("Found method in cache: {}", method); + return method; + } + + this.logger.trace("Looping through bean type methods to find: {}", methodName); + + for (Method amethod : clazz.getDeclaredMethods()) { + if (amethod.getName().equals(methodName)) { + this.logger.debug("Found and caching method: {} => {}", key, amethod); + this.methodCache.put(key, amethod); + return amethod; + } + } + + throw new IllegalStateException("The bean (" + clazz + ") does not implement the method: " + methodName); + } + + public void push(ProceedingJoinPoint joinPoint) throws AsyncProcessException { + this.logger.trace("push({})", joinPoint); + + if (!(joinPoint.getSignature() instanceof MethodSignature)) + throw new IllegalStateException("The join point must be on methods and methods have signatures"); + + Object bean = joinPoint.getThis(); + this.logger.debug("Queuing for bean: {}", bean.getClass()); + + MethodSignature methodSig = (MethodSignature) joinPoint.getSignature(); + Method method = methodSig.getMethod(); + this.logger.debug("Queuing for method: {}", method); + + this.push(bean, method.getName(), Arrays.asList(joinPoint.getArgs())); + } + + @Transactional + public void push(Object callbackBean, String callbackMethod, List args) throws AsyncProcessException { + this.logger.trace("push({}, {}, {})", callbackBean.getClass(), callbackMethod, args); + + UUID msgId = UUID.randomUUID(); + + try { + Connection mqcon = this.factory.createConnection(this.username, this.password); + try { + mqcon.setClientID(this.clientId + "-client-" + this.hostname); + + Session mqsession = mqcon.createSession(true, Session.AUTO_ACKNOWLEDGE); + try { + this.logger.trace("Sending policy as message: {} => {}", callbackMethod, msgId); + + Queue mqqueue = mqsession.createQueue(this.queueName); + + Message mqmsg = mqsession.createMessage(); + mqmsg.setJMSMessageID(msgId.toString()); + mqmsg.setJMSType("v1:" + callbackBean.getClass() + "#" + callbackMethod); + + int i = 0; + for (Object arg : args) + mqmsg.setObjectProperty("arg" + (i++), this.marshal(arg)); + + MessageProducer producer = mqsession.createProducer(mqqueue); + try { + producer.send(mqmsg); + } finally { + producer.close(); + } + + this.logger.debug("Sent node as message: {} => {}", callbackMethod, msgId); + } finally { + mqsession.close(); + } + } finally { + mqcon.close(); + } + } catch (JMSException je) { + throw new AsyncProcessException("A JMS messaging issue occurred", je); + } + } + + @SuppressWarnings({ "unchecked" }) + private Object unmarshal(Parameter param, Object arg) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + Class paramType = param.getType(); + this.logger.trace("Unmarshaling parameter of type: {}", paramType); + + if (arg instanceof String || arg instanceof Number || arg instanceof Boolean) { + this.logger.trace("Unmarshaling primitive: {}", arg); + return arg; + } else if (Temporal.class.isAssignableFrom(paramType)) { + if (OffsetDateTime.class.isAssignableFrom(paramType)) { + return OffsetDateTime.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(arg.toString())); + } else if (ZonedDateTime.class.isAssignableFrom(paramType)) { + return ZonedDateTime.from(DateTimeFormatter.ISO_ZONED_DATE_TIME.parse(arg.toString())); + } else if (LocalDate.class.isAssignableFrom(paramType)) { + return LocalDate.from(DateTimeFormatter.ISO_LOCAL_DATE.parse(arg.toString())); + } else if (LocalDateTime.class.isAssignableFrom(paramType)) { + return LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(arg.toString())); + } else if (Instant.class.isAssignableFrom(paramType)) { + return Instant.from(DateTimeFormatter.ISO_INSTANT.parse(arg.toString())); + } else if (LocalTime.class.isAssignableFrom(paramType)) { + return LocalTime.from(DateTimeFormatter.ISO_LOCAL_TIME.parse(arg.toString())); + } else if (OffsetTime.class.isAssignableFrom(paramType)) { + return OffsetTime.from(DateTimeFormatter.ISO_OFFSET_TIME.parse(arg.toString())); + } else { + throw new UnsupportedOperationException(); + } + } else if (Version.class.isAssignableFrom(paramType)) { + this.logger.trace("Unmarshaling as JSON object: {}", arg); + Map argMap = (Map) this.om.convertValue(arg, Map.class); + + Map versionPropertiesMap = (Map) argMap.get("properties"); + NodeRef nodeRef = new NodeRef((String) argMap.get("nodeRef")); + + Version version = new VersionImpl(versionPropertiesMap, nodeRef); + this.logger.trace("Unmarshaled version: {} = {}", param.getName(), version); + return version; + } else if (Action.class.isAssignableFrom(paramType)) { + this.logger.trace("Unmarshaling as JSON object: {}", arg); + Map argMap = (Map) this.om.convertValue(arg, Map.class); + + String actionId = (String) argMap.get("actionId"); + NodeRef nodeRef = new NodeRef((String) argMap.get("nodeRef")); + this.logger.trace("Unmarshaling action: {}, {}", actionId, nodeRef); + + Action action = this.actionService.getAction(nodeRef, actionId); + this.logger.trace("Unmarshaled action: {} = {}", param.getName(), action); + return action; + } else if (Collection.class.isAssignableFrom(paramType)) { + this.logger.trace("Unmarshaling as JSON array: {}", arg); + return this.om.convertValue(arg, Collection.class); + } else if (Map.class.isAssignableFrom(paramType)) { + this.logger.trace("Unmarshaling as JSON object: {}", arg); + return this.om.convertValue(arg, Map.class); + } else if (QName.class.isAssignableFrom(paramType)) { + this.logger.trace("Unmarshaling as QName: {}", arg); + return QName.createQName((String) arg); + } else if (Enum.class.isAssignableFrom(paramType)) { + this.logger.trace("Unmarshaling as Enum: {}", arg); + Method cons = paramType.getDeclaredMethod("valueOf", String.class); + return cons.invoke(null, arg.toString()); + } else { + this.logger.trace("Unmarshaling as POJO: {}", arg); + try { + Constructor cons = paramType.getConstructor(String.class); + return cons.newInstance(arg.toString()); + } catch (NoSuchMethodException nsme) { + Method method = paramType.getDeclaredMethod("valueOf", String.class); + return method.invoke(null, arg.toString()); + } + } + } + + private Object marshal(Object arg) { + if (arg instanceof String || arg instanceof Number || arg instanceof Boolean) { + return arg; + } else if (arg instanceof Temporal) { + return arg.toString(); + } else if (arg instanceof Version) { + Version version = (Version) arg; + Map map = new HashMap<>(); + map.put("nodeRef", version.getFrozenStateNodeRef()); + map.put("properties", version.getVersionProperties()); + + this.logger.trace("Marshaling Version as JSON object: {}", map); + return this.om.convertValue(map, String.class); + } else if (arg instanceof Action) { + Action action = (Action) arg; + Map map = new HashMap<>(); + map.put("nodeRef", action.getNodeRef()); + map.put("actionId", action.getId()); + + this.logger.trace("Marshaling Action as JSON object: {}", map); + return this.om.convertValue(map, String.class); + } else if (arg instanceof Collection) { + List list = new ArrayList<>(((Collection)arg).size()); + for (Object obj : (Collection) arg) + list.add(this.marshal(obj)); + + this.logger.trace("Marshaling Java Collection as JSON array: {}", list); + return this.om.convertValue(list, String.class); + } else if (arg instanceof Map) { + Map map = new HashMap<>(); + for (Entry entry : ((Map) arg).entrySet()) { + Object key = this.marshal(entry.getKey()); + Object value = this.marshal(entry.getValue()); + map.put(key, value); + } + + this.logger.trace("Marshaling Java Map as JSON object: {}", map); + return this.om.convertValue(map, String.class); + } else { + this.logger.trace("Marshaling Java object as JSON object: {}", arg); + return this.om.convertValue(arg, String.class); + } + } + + private M2Model loadModel(NodeRef nodeRef) throws IOException { + ContentReader creader = this.contentService.getReader(nodeRef, ContentModel.PROP_CONTENT); + InputStream istream = creader.getContentInputStream(); + try { + return M2Model.createModel(istream); + } finally { + istream.close(); + } + } + +} diff --git a/src/main/java/com/inteligr8/alfresco/annotations/util/JakartaTransactionalAnnotationAdapter.java b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java similarity index 76% rename from src/main/java/com/inteligr8/alfresco/annotations/util/JakartaTransactionalAnnotationAdapter.java rename to src/main/jakarta-tx/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java index c591091..e635dea 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/util/JakartaTransactionalAnnotationAdapter.java +++ b/src/main/jakarta-tx/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java @@ -5,11 +5,17 @@ import jakarta.transaction.Transactional; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Propagation; -public class JakartaTransactionalAnnotationAdapter implements TransactionalAnnotationAdapter { +public class JtaTransactionalAnnotationAdapter implements TransactionalAnnotationAdapter { + + public static final String JTA_INTERFACE_NAME = "jakarta.transaction.Transactional"; private final Transactional txl; - public JakartaTransactionalAnnotationAdapter(Transactional txl) { + public static JtaTransactionalAnnotationAdapter cast(Object obj) { + return new JtaTransactionalAnnotationAdapter((Transactional) obj); + } + + public JtaTransactionalAnnotationAdapter(Transactional txl) { this.txl = txl; } diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java index fb66d38..c5dabcc 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/QNameBasedAspect.java @@ -8,8 +8,6 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; -import javax.annotation.PostConstruct; - import org.alfresco.repo.cache.DefaultSimpleCache; import org.alfresco.repo.cache.SimpleCache; import org.alfresco.service.cmr.repository.AssociationRef; @@ -23,6 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import jakarta.annotation.PostConstruct; + public abstract class QNameBasedAspect extends AbstractMethodOrParameterAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java b/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java index 1be4758..6950d8d 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/aspect/RetryingTransactionAspect.java @@ -21,7 +21,6 @@ import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.annotation.Transactional; import com.inteligr8.alfresco.annotations.TransactionalRetryable; -import com.inteligr8.alfresco.annotations.util.JakartaTransactionalAnnotationAdapter; import com.inteligr8.alfresco.annotations.util.JtaTransactionalAnnotationAdapter; import com.inteligr8.alfresco.annotations.util.SpringTransactionalAnnotationAdapter; import com.inteligr8.alfresco.annotations.util.TransactionalAnnotationAdapter; @@ -55,19 +54,15 @@ public class RetryingTransactionAspect { public void isTransactionalAnnotated() { } - @Pointcut("@annotation(javax.transaction.Transactional) && execution(* *(..))") + @Pointcut("@annotation(" + JtaTransactionalAnnotationAdapter.JTA_INTERFACE_NAME + ") && execution(* *(..))") public void isJtaTransactionalAnnotated() { } - @Pointcut("@annotation(jakarta.transaction.Transactional) && execution(* *(..))") - public void isJakartaTransactionalAnnotated() { - } - @Pointcut("@annotation(com.inteligr8.alfresco.annotations.TransactionalRetryable) && execution(* *(..))") public void isTransactionalRetryableAnnotated() { } - @Around("isTransactionalAnnotated() || isJtaTransactionalAnnotated() || isJakartaTransactionalAnnotated() || isTransactionalRetryableAnnotated()") + @Around("isTransactionalAnnotated() || isJtaTransactionalAnnotated() || isTransactionalRetryableAnnotated()") public Object retryingTransactional(ProceedingJoinPoint joinPoint) throws Throwable { this.logger.trace("retryingTransactional({})", joinPoint); @@ -91,13 +86,9 @@ public class RetryingTransactionAspect { if (txl != null) return new SpringTransactionalAnnotationAdapter((Transactional) txl); - txl = this.getOptionalAnnotation(method, "javax.transaction.Transactional"); + txl = this.getOptionalAnnotation(method, JtaTransactionalAnnotationAdapter.JTA_INTERFACE_NAME); if (txl != null) - return new JtaTransactionalAnnotationAdapter((javax.transaction.Transactional) txl); - - txl = this.getOptionalAnnotation(method, "jakarta.transaction.Transactional"); - if (txl != null) - return new JakartaTransactionalAnnotationAdapter((jakarta.transaction.Transactional) txl); + return JtaTransactionalAnnotationAdapter.cast(txl); return null; } @@ -172,6 +163,7 @@ public class RetryingTransactionAspect { if (txl.isReadOnly()) throw new IllegalTransactionStateException("A read/write transaction exists where a read-only one is mandatory"); } + return false; case NOT_SUPPORTED: switch (AlfrescoTransactionSupport.getTransactionReadState()) { case TXN_NONE: diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java index 70421be..d6eb30a 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java +++ b/src/main/java/com/inteligr8/alfresco/annotations/service/impl/ThreadPoolAsyncService.java @@ -22,6 +22,7 @@ import org.quartz.impl.JobDetailImpl; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEvent; import org.springframework.extensions.surf.util.AbstractLifecycleBean; @@ -32,6 +33,8 @@ import com.inteligr8.alfresco.annotations.job.AsyncJob; import com.inteligr8.alfresco.annotations.service.AsyncProcessException; import com.inteligr8.alfresco.annotations.service.AsyncService; +import jakarta.annotation.PostConstruct; + /** * This class provides a non-persistent alternative to MQ for asynchronous method * execution. @@ -39,7 +42,7 @@ import com.inteligr8.alfresco.annotations.service.AsyncService; * @author brian@inteligr8.com */ @Component("async.thread") -public class ThreadPoolAsyncService extends AbstractLifecycleBean implements AsyncService { +public class ThreadPoolAsyncService extends AbstractLifecycleBean implements AsyncService, InitializingBean { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final JobKey jobKey = new JobKey("thread-async", "inteligr8-annotations"); @@ -61,9 +64,19 @@ public class ThreadPoolAsyncService extends AbstractLifecycleBean implements Asy }); @Override - protected void onBootstrap(ApplicationEvent event) { + public void afterPropertiesSet() throws Exception { + this.init(); + } + + /** + * @PostConstruct doesn't work in ACS for whatever reason + */ + @PostConstruct + protected void init() { this.queue = new LinkedBlockingQueue<>(this.queueSize); - + } + + protected void onBootstrap(ApplicationEvent event) { JobDetailImpl jobDetail = new JobDetailImpl(); jobDetail.setKey(this.jobKey); jobDetail.setRequestsRecovery(true); diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java b/src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java similarity index 100% rename from src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java rename to src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransaction.java diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java b/src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java similarity index 100% rename from src/main/java/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java rename to src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/AlfrescoTransactionManager.java diff --git a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java b/src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java similarity index 96% rename from src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java rename to src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java index b01ddba..339f8c1 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java +++ b/src/main/javax-tx/com/inteligr8/alfresco/annotations/service/impl/MqAsyncService.java @@ -40,6 +40,7 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.model.ContentModel; import org.alfresco.repo.cache.SimpleCache; import org.alfresco.repo.dictionary.M2Model; @@ -69,6 +70,8 @@ import org.quartz.impl.JobDetailImpl; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEvent; @@ -86,13 +89,16 @@ import com.inteligr8.alfresco.annotations.job.AsyncJob; import com.inteligr8.alfresco.annotations.service.AsyncProcessException; import com.inteligr8.alfresco.annotations.service.AsyncService; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + /** * This class provides integration with MQ for the asynchronous method executions. * * @author brian@inteligr8.com */ @Component("async.mq") -public class MqAsyncService extends AbstractLifecycleBean implements AsyncService, Threadable { +public class MqAsyncService extends AbstractLifecycleBean implements AsyncService, InitializingBean, DisposableBean, Threadable { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final JobKey jobKey = new JobKey("mq-async", "inteligr8-annotations"); @@ -158,7 +164,20 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic }); @Override - protected void onBootstrap(ApplicationEvent event) { + public void afterPropertiesSet() throws Exception { + this.init(); + } + + @Override + public void destroy() throws Exception { + this.uninit(); + } + + /** + * @PostConstruct does not work in ACS + */ + @PostConstruct + protected void init() { if (!this.enabled) return; @@ -178,7 +197,19 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic this.factory = pool; if (this.workerThreads <= 0) - return; + throw new AlfrescoRuntimeException("The 'inteligr8.async.mq.workerThreads' property must be positive"); + } + + @PreDestroy + protected void uninit() { + if (this.factory != null) + this.factory.stop(); + } + + @Override + protected void onBootstrap(ApplicationEvent event) { + if (!this.enabled) + return; JobDetailImpl jobDetail = new JobDetailImpl(); jobDetail.setKey(this.jobKey); @@ -206,9 +237,6 @@ public class MqAsyncService extends AbstractLifecycleBean implements AsyncServic } catch (SchedulerException se) { this.logger.warn("The MQ async service job failed to stop", se); } - - if (this.factory != null) - this.factory.stop(); } @Override diff --git a/src/main/java/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java b/src/main/javax-tx/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java similarity index 85% rename from src/main/java/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java rename to src/main/javax-tx/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java index 594fd0e..3665021 100644 --- a/src/main/java/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java +++ b/src/main/javax-tx/com/inteligr8/alfresco/annotations/util/JtaTransactionalAnnotationAdapter.java @@ -7,8 +7,14 @@ import org.springframework.transaction.annotation.Propagation; public class JtaTransactionalAnnotationAdapter implements TransactionalAnnotationAdapter { + public static final String JTA_INTERFACE_NAME = "javax.transaction.Transactional"; + private final Transactional txl; + public static JtaTransactionalAnnotationAdapter cast(Object obj) { + return new JtaTransactionalAnnotationAdapter((Transactional) obj); + } + public JtaTransactionalAnnotationAdapter(Transactional txl) { this.txl = txl; } diff --git a/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java b/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java index 11a3331..b17d98f 100644 --- a/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java +++ b/src/test/java/com/inteligr8/alfresco/annotations/TransactionalTest.java @@ -44,9 +44,8 @@ public class TransactionalTest extends AbstractLifecycleBean { try { this.tryNoSupportsTransactional(); - throw new IllegalStateException(); } catch (IllegalTransactionStateException uoe) { - // suppress + throw new IllegalStateException(); } try { @@ -200,7 +199,8 @@ public class TransactionalTest extends AbstractLifecycleBean { @Transactional(propagation = Propagation.NOT_SUPPORTED) private void tryNoSupportsTransactional() { - throw new UnsupportedOperationException(); + Assert.isNull(AlfrescoTransactionSupport.getTransactionId(), "Expected no transaction"); + Assert.isTrue(TxnReadState.TXN_NONE.equals(AlfrescoTransactionSupport.getTransactionReadState()), "Expected not transaction"); } @Transactional(propagation = Propagation.NEVER)