diff --git a/alfresco-docker-alfresco-pdf-renderer/src/main/resources/application.properties b/alfresco-docker-alfresco-pdf-renderer/src/main/resources/application.properties new file mode 100644 index 00000000..5dde9b28 --- /dev/null +++ b/alfresco-docker-alfresco-pdf-renderer/src/main/resources/application.properties @@ -0,0 +1 @@ +queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.alfresco-pdf-renderer.acs} \ No newline at end of file diff --git a/alfresco-docker-imagemagick/src/main/resources/application.properties b/alfresco-docker-imagemagick/src/main/resources/application.properties new file mode 100644 index 00000000..858dc70f --- /dev/null +++ b/alfresco-docker-imagemagick/src/main/resources/application.properties @@ -0,0 +1 @@ +queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.imagemagick.acs} \ No newline at end of file diff --git a/alfresco-docker-libreoffice/src/main/resources/application.properties b/alfresco-docker-libreoffice/src/main/resources/application.properties new file mode 100644 index 00000000..321c56ed --- /dev/null +++ b/alfresco-docker-libreoffice/src/main/resources/application.properties @@ -0,0 +1 @@ +queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.libreoffice.acs} \ No newline at end of file diff --git a/alfresco-docker-tika/src/main/resources/application.properties b/alfresco-docker-tika/src/main/resources/application.properties new file mode 100644 index 00000000..82f9c44e --- /dev/null +++ b/alfresco-docker-tika/src/main/resources/application.properties @@ -0,0 +1 @@ +queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.tika.acs} \ No newline at end of file diff --git a/alfresco-transformer-base/pom.xml b/alfresco-transformer-base/pom.xml index e927e55c..9a9abc5c 100644 --- a/alfresco-transformer-base/pom.xml +++ b/alfresco-transformer-base/pom.xml @@ -42,6 +42,22 @@ org.alfresco alfresco-transform-data-model + + org.springframework.boot + spring-boot-starter-activemq + + + org.apache.activemq + activemq-client + + + com.fasterxml.jackson.core + jackson-annotations + + + org.messaginghub + pooled-jms + diff --git a/alfresco-transformer-base/src/main/java/org/alfresco/transformer/AbstractTransformerController.java b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/AbstractTransformerController.java index 14de0678..e93b7476 100644 --- a/alfresco-transformer-base/src/main/java/org/alfresco/transformer/AbstractTransformerController.java +++ b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/AbstractTransformerController.java @@ -117,6 +117,7 @@ public abstract class AbstractTransformerController implements TransformControll logger.info("Received {}, timeout {} ms", request, timeout); final TransformReply reply = new TransformReply(); + reply.setInternalContext(request.getInternalContext()); reply.setRequestId(request.getRequestId()); reply.setSourceReference(request.getSourceReference()); reply.setSchema(request.getSchema()); diff --git a/alfresco-transformer-base/src/main/java/org/alfresco/transformer/QueueTransformService.java b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/QueueTransformService.java new file mode 100644 index 00000000..cc4d546a --- /dev/null +++ b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/QueueTransformService.java @@ -0,0 +1,142 @@ +package org.alfresco.transformer; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.alfresco.transform.client.model.TransformReply; +import org.alfresco.transform.client.model.TransformRequest; +import org.alfresco.transformer.messaging.TransformMessageConverter; +import org.alfresco.transformer.messaging.TransformReplySender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.jms.support.converter.MessageConversionException; +import org.springframework.stereotype.Component; + +/** + * Queue Transformer service. + * This service reads all the requests for the particular engine, forwards them to the worker + * component (at this time the injected controller - to be refactored) and sends back the reply + * to the {@link Message#getJMSReplyTo()} value. If this value is missing we've got to a dead end. + * + * @author Lucian Tuca + * created on 18/12/2018 + */ +@Component +public class QueueTransformService +{ + private static final Logger logger = LoggerFactory.getLogger(QueueTransformService.class); + + + // TODO: I know this is not smart but all the the transformation logic is in the Controller. + // The controller also manages the probes. There's tons of refactoring needed there, hence this. Sorry. + @Autowired + private TransformController transformController; + + @Autowired + private TransformMessageConverter transformMessageConverter; + + @Autowired + private TransformReplySender transformReplySender; + + @JmsListener(destination = "${queue.engineRequestQueue}", concurrency = "${jms-listener.concurrency}") + public void receive(final Message msg) + { + final String correlationId = tryRetrieveCorrelationId(msg); + Destination replyToDestinationQueue; + + try + { + replyToDestinationQueue = msg.getJMSReplyTo(); + } + catch (JMSException e) + { + logger.error("Cannot find 'replyTo' destination queue for message with correlationID {}. Stopping. ", correlationId); + return; + } + + logger.info("New T-Request from queue with correlationId: {0}"); + + TransformReply reply = transformController.transform(convert(msg, correlationId, replyToDestinationQueue), null).getBody(); + + transformReplySender.send(replyToDestinationQueue, reply); + } + + /** + * Tries to convert the JMS {@link Message} to a {@link TransformRequest} + * If any errors occur standard error {@link TransformReply} are sent back + * + * @param msg Message to be deserialized + * @param correlationId CorrelationId of the message + * @param destination Needed in case deserialization fails. Passed here so we don't retrieve it again. + * @return The converted {@link TransformRequest} instance + */ + private TransformRequest convert(final Message msg, final String correlationId, Destination destination) + { + try + { + return (TransformRequest) transformMessageConverter.fromMessage(msg); + } + catch (MessageConversionException e) + { + String message = "Message conversion exception during T-Request deserialization: "; + logger.error(message + e.getMessage(), e); + replyWithBadRequest(destination, message + e.getMessage(), correlationId); + } + catch (JMSException e) + { + String message = "JMS exception during T-Request deserialization: "; + logger.error(message + e.getMessage(), e); + replyWithInternalSvErr(destination, message + e.getMessage(), correlationId); + } + catch (Exception e) + { + String message = "Exception during T-Request deserialization: "; + logger.error(message + e.getMessage(), e); + replyWithInternalSvErr(destination, message + e.getMessage(), correlationId); + } + catch (Throwable t) + { + logger.error("Error during T-Request deserialization" + t.getMessage(), t); + throw t; + } + return null; + } + + + private void replyWithBadRequest(final Destination destination, final String msg, final String correlationId) + { + replyWithError(destination, HttpStatus.BAD_REQUEST, msg, correlationId); + } + + private void replyWithInternalSvErr(final Destination destination, final String msg, final String correlationId) + { + replyWithError(destination, HttpStatus.INTERNAL_SERVER_ERROR, msg, correlationId); + } + + private void replyWithError(final Destination destination, final HttpStatus status, final String msg, + final String correlationId) + { + final TransformReply reply = TransformReply.builder() + .withStatus(status.value()) + .withErrorDetails(msg) + .build(); + + transformReplySender.send(destination, reply, correlationId); + } + + private static String tryRetrieveCorrelationId(final Message msg) + { + try + { + return msg.getJMSCorrelationID(); + } + catch (Exception ignore) + { + return null; + } + } +} diff --git a/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/MessagingConfig.java b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/MessagingConfig.java new file mode 100644 index 00000000..f9a533fb --- /dev/null +++ b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/MessagingConfig.java @@ -0,0 +1,66 @@ +package org.alfresco.transformer.messaging; + +import javax.jms.ConnectionFactory; + +import org.alfresco.transform.client.model.TransformRequestValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.annotation.JmsListenerConfigurer; +import org.springframework.jms.config.DefaultJmsListenerContainerFactory; +import org.springframework.jms.config.JmsListenerEndpointRegistrar; +import org.springframework.jms.connection.JmsTransactionManager; +import org.springframework.lang.NonNull; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * JMS and messaging configuration for the T-Engines. Contains the basic config in order to have the + * T-Engine able to read from queues and send a reply back. + * + * @author Lucian Tuca + * created on 18/12/2018 + */ +@Configuration +public class MessagingConfig implements JmsListenerConfigurer +{ + private static final Logger logger = LoggerFactory.getLogger(MessagingConfig.class); + + @Override + public void configureJmsListeners(@NonNull JmsListenerEndpointRegistrar registrar) + { + registrar.setMessageHandlerMethodFactory(methodFactory()); + } + + @Bean + public DefaultMessageHandlerMethodFactory methodFactory() + { + DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); + factory.setValidator(new TransformRequestValidator()); + return factory; + } + + @Bean + public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( + final ConnectionFactory connectionFactory, + final TransformMessageConverter transformMessageConverter) + { + final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setMessageConverter(transformMessageConverter); + factory.setErrorHandler(t -> logger.error("JMS error: " + t.getMessage(), t)); + factory.setTransactionManager(transactionManager(connectionFactory)); + return factory; + } + + @Bean + public PlatformTransactionManager transactionManager(final ConnectionFactory connectionFactory) + { + final JmsTransactionManager transactionManager = new JmsTransactionManager(); + transactionManager.setConnectionFactory(connectionFactory); + return transactionManager; + } +} + + diff --git a/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/TransformMessageConverter.java b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/TransformMessageConverter.java new file mode 100644 index 00000000..1885e519 --- /dev/null +++ b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/TransformMessageConverter.java @@ -0,0 +1,83 @@ +/* + * Copyright 2015-2018 Alfresco Software, Ltd. All rights reserved. + * + * License rights for this program may be obtained from Alfresco Software, Ltd. + * pursuant to a written agreement and any use of this program without such an + * agreement is prohibited. + */ + +package org.alfresco.transformer.messaging; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +import org.alfresco.transform.client.model.TransformReply; +import org.alfresco.transform.client.model.TransformRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.support.converter.MappingJackson2MessageConverter; +import org.springframework.jms.support.converter.MessageConversionException; +import org.springframework.jms.support.converter.MessageConverter; +import org.springframework.jms.support.converter.MessageType; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.google.common.collect.ImmutableMap; + +/** + * TODO: Duplicated from the Router + * Custom wrapper over MappingJackson2MessageConverter for T-Request/T-Reply objects. + * + * @author Cezar Leahu + */ +@Service +public class TransformMessageConverter implements MessageConverter +{ + private static final Logger logger = LoggerFactory.getLogger(TransformMessageConverter.class); + + private static final MappingJackson2MessageConverter converter; + private static final JavaType TRANSFORM_REQUEST_TYPE = + TypeFactory.defaultInstance().constructType(TransformRequest.class); + + static + { + converter = new MappingJackson2MessageConverter() + { + @Override + @NonNull + protected JavaType getJavaTypeForMessage(final Message message) throws JMSException + { + if (message.getStringProperty("_type") == null) + { + return TRANSFORM_REQUEST_TYPE; + } + return super.getJavaTypeForMessage(message); + } + }; + converter.setTargetType(MessageType.BYTES); + converter.setTypeIdPropertyName("_type"); + converter.setTypeIdMappings(ImmutableMap.of( + TransformRequest.class.getName(), TransformRequest.class, + TransformReply.class.getName(), TransformReply.class) + ); + } + + @Override + @NonNull + public Message toMessage( + @NonNull final Object object, + @NonNull final Session session) throws JMSException, MessageConversionException + { + return converter.toMessage(object, session); + } + + @Override + @NonNull + public Object fromMessage(@NonNull final Message message) throws JMSException + { + return converter.fromMessage(message); + } +} diff --git a/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/TransformReplySender.java b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/TransformReplySender.java new file mode 100644 index 00000000..aeb768ea --- /dev/null +++ b/alfresco-transformer-base/src/main/java/org/alfresco/transformer/messaging/TransformReplySender.java @@ -0,0 +1,57 @@ +/* + * Copyright 2015-2018 Alfresco Software, Ltd. All rights reserved. + * + * License rights for this program may be obtained from Alfresco Software, Ltd. + * pursuant to a written agreement and any use of this program without such an + * agreement is prohibited. + */ +package org.alfresco.transformer.messaging; + +import javax.jms.Destination; + +import org.alfresco.transform.client.model.TransformReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.stereotype.Component; + +/** + * TODO: Duplicated from the Router + * TransformReplySender Bean + *

+ * JMS message sender/publisher + * + * @author Cezar Leahu + */ +@Component +public class TransformReplySender +{ + private static final Logger logger = LoggerFactory.getLogger(TransformReplySender.class); + + @Autowired + private JmsTemplate jmsTemplate; + + public void send(final Destination destination, final TransformReply reply) + { + send(destination, reply, reply.getRequestId()); + } + + public void send(final Destination destination, final TransformReply reply, final String correlationId) + { + try + { + //jmsTemplate.setSessionTransacted(true); // do we need this? + jmsTemplate.convertAndSend(destination, reply, m -> { + m.setJMSCorrelationID(correlationId); + return m; + }); + logger.info("Sent: {} - with correlation ID {}", reply, correlationId); + } + catch (Exception e) + { + logger.error( + "Failed to send T-Reply " + reply + " - for correlation ID " + correlationId, e); + } + } +} diff --git a/alfresco-transformer-base/src/main/resources/application.yaml b/alfresco-transformer-base/src/main/resources/application.yaml index b02538a2..d91b0f1b 100644 --- a/alfresco-transformer-base/src/main/resources/application.yaml +++ b/alfresco-transformer-base/src/main/resources/application.yaml @@ -3,6 +3,13 @@ spring: multipart: max-file-size: 8192MB max-request-size: 8192MB + activemq: + broker-url: ${ACTIVEMQ_URL:nio://localhost:61616}?jms.watchTopicAdvisories=false + user: ${ACTIVEMQ_USER:admin} + password: ${ACTIVEMQ_PASSWORD:admin} + pool: + enabled: true + max-connections: 10 server: port: 8090 @@ -18,6 +25,9 @@ logging: fileStoreUrl: ${FILE_STORE_URL:http://localhost:8099/alfresco/api/-default-/private/sfs/versions/1/file} +jms-listener: + concurrency: ${JMS_LISTENER_CONCURRENCY:1-10} + management: endpoints: web: diff --git a/pom.xml b/pom.xml index d12b5dbc..b9d3e86d 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,9 @@ 8.8 3.0.1.1 ${project.version} - 0.0.7 + 0.5.2 + 5.15.8 + 2.9.6 @@ -80,6 +82,16 @@ cxf-rt-transports-http 3.2.6 + + com.fasterxml.jackson.core + jackson-annotations + ${dependency.jackson.version} + + + org.apache.activemq + activemq-client + ${dependency.activemq.version} +