Merge branch 'ATS-227' into 'master'

ATS-227 : PoC: Improve scaling/performance of transforms via T-Engine queues ?

See merge request Repository/alfresco-docker-transformers!39
This commit is contained in:
Lucian Tuca 2019-01-11 08:49:10 +00:00
commit fec9ea5eda
12 changed files with 392 additions and 1 deletions

View File

@ -0,0 +1 @@
queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.alfresco-pdf-renderer.acs}

View File

@ -0,0 +1 @@
queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.imagemagick.acs}

View File

@ -0,0 +1 @@
queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.libreoffice.acs}

View File

@ -0,0 +1 @@
queue.engineRequestQueue=${TRANSFORM_ENGINE_REQUEST_QUEUE:org.alfresco.transform.engine.tika.acs}

View File

@ -42,6 +42,22 @@
<groupId>org.alfresco</groupId> <groupId>org.alfresco</groupId>
<artifactId>alfresco-transform-data-model</artifactId> <artifactId>alfresco-transform-data-model</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -117,6 +117,7 @@ public abstract class AbstractTransformerController implements TransformControll
logger.info("Received {}, timeout {} ms", request, timeout); logger.info("Received {}, timeout {} ms", request, timeout);
final TransformReply reply = new TransformReply(); final TransformReply reply = new TransformReply();
reply.setInternalContext(request.getInternalContext());
reply.setRequestId(request.getRequestId()); reply.setRequestId(request.getRequestId());
reply.setSourceReference(request.getSourceReference()); reply.setSourceReference(request.getSourceReference());
reply.setSchema(request.getSchema()); reply.setSchema(request.getSchema());

View File

@ -0,0 +1,142 @@
package org.alfresco.transformer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.alfresco.transform.client.model.TransformReply;
import org.alfresco.transform.client.model.TransformRequest;
import org.alfresco.transformer.messaging.TransformMessageConverter;
import org.alfresco.transformer.messaging.TransformReplySender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;
/**
* Queue Transformer service.
* This service reads all the requests for the particular engine, forwards them to the worker
* component (at this time the injected controller - to be refactored) and sends back the reply
* to the {@link Message#getJMSReplyTo()} value. If this value is missing we've got to a dead end.
*
* @author Lucian Tuca
* created on 18/12/2018
*/
@Component
public class QueueTransformService
{
private static final Logger logger = LoggerFactory.getLogger(QueueTransformService.class);
// TODO: I know this is not smart but all the the transformation logic is in the Controller.
// The controller also manages the probes. There's tons of refactoring needed there, hence this. Sorry.
@Autowired
private TransformController transformController;
@Autowired
private TransformMessageConverter transformMessageConverter;
@Autowired
private TransformReplySender transformReplySender;
@JmsListener(destination = "${queue.engineRequestQueue}", concurrency = "${jms-listener.concurrency}")
public void receive(final Message msg)
{
final String correlationId = tryRetrieveCorrelationId(msg);
Destination replyToDestinationQueue;
try
{
replyToDestinationQueue = msg.getJMSReplyTo();
}
catch (JMSException e)
{
logger.error("Cannot find 'replyTo' destination queue for message with correlationID {}. Stopping. ", correlationId);
return;
}
logger.info("New T-Request from queue with correlationId: {0}");
TransformReply reply = transformController.transform(convert(msg, correlationId, replyToDestinationQueue), null).getBody();
transformReplySender.send(replyToDestinationQueue, reply);
}
/**
* Tries to convert the JMS {@link Message} to a {@link TransformRequest}
* If any errors occur standard error {@link TransformReply} are sent back
*
* @param msg Message to be deserialized
* @param correlationId CorrelationId of the message
* @param destination Needed in case deserialization fails. Passed here so we don't retrieve it again.
* @return The converted {@link TransformRequest} instance
*/
private TransformRequest convert(final Message msg, final String correlationId, Destination destination)
{
try
{
return (TransformRequest) transformMessageConverter.fromMessage(msg);
}
catch (MessageConversionException e)
{
String message = "Message conversion exception during T-Request deserialization: ";
logger.error(message + e.getMessage(), e);
replyWithBadRequest(destination, message + e.getMessage(), correlationId);
}
catch (JMSException e)
{
String message = "JMS exception during T-Request deserialization: ";
logger.error(message + e.getMessage(), e);
replyWithInternalSvErr(destination, message + e.getMessage(), correlationId);
}
catch (Exception e)
{
String message = "Exception during T-Request deserialization: ";
logger.error(message + e.getMessage(), e);
replyWithInternalSvErr(destination, message + e.getMessage(), correlationId);
}
catch (Throwable t)
{
logger.error("Error during T-Request deserialization" + t.getMessage(), t);
throw t;
}
return null;
}
private void replyWithBadRequest(final Destination destination, final String msg, final String correlationId)
{
replyWithError(destination, HttpStatus.BAD_REQUEST, msg, correlationId);
}
private void replyWithInternalSvErr(final Destination destination, final String msg, final String correlationId)
{
replyWithError(destination, HttpStatus.INTERNAL_SERVER_ERROR, msg, correlationId);
}
private void replyWithError(final Destination destination, final HttpStatus status, final String msg,
final String correlationId)
{
final TransformReply reply = TransformReply.builder()
.withStatus(status.value())
.withErrorDetails(msg)
.build();
transformReplySender.send(destination, reply, correlationId);
}
private static String tryRetrieveCorrelationId(final Message msg)
{
try
{
return msg.getJMSCorrelationID();
}
catch (Exception ignore)
{
return null;
}
}
}

View File

@ -0,0 +1,66 @@
package org.alfresco.transformer.messaging;
import javax.jms.ConnectionFactory;
import org.alfresco.transform.client.model.TransformRequestValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.JmsListenerConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.lang.NonNull;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.transaction.PlatformTransactionManager;
/**
* JMS and messaging configuration for the T-Engines. Contains the basic config in order to have the
* T-Engine able to read from queues and send a reply back.
*
* @author Lucian Tuca
* created on 18/12/2018
*/
@Configuration
public class MessagingConfig implements JmsListenerConfigurer
{
private static final Logger logger = LoggerFactory.getLogger(MessagingConfig.class);
@Override
public void configureJmsListeners(@NonNull JmsListenerEndpointRegistrar registrar)
{
registrar.setMessageHandlerMethodFactory(methodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory methodFactory()
{
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(new TransformRequestValidator());
return factory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
final ConnectionFactory connectionFactory,
final TransformMessageConverter transformMessageConverter)
{
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(transformMessageConverter);
factory.setErrorHandler(t -> logger.error("JMS error: " + t.getMessage(), t));
factory.setTransactionManager(transactionManager(connectionFactory));
return factory;
}
@Bean
public PlatformTransactionManager transactionManager(final ConnectionFactory connectionFactory)
{
final JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(connectionFactory);
return transactionManager;
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 2015-2018 Alfresco Software, Ltd. All rights reserved.
*
* License rights for this program may be obtained from Alfresco Software, Ltd.
* pursuant to a written agreement and any use of this program without such an
* agreement is prohibited.
*/
package org.alfresco.transformer.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.alfresco.transform.client.model.TransformReply;
import org.alfresco.transform.client.model.TransformRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.collect.ImmutableMap;
/**
* TODO: Duplicated from the Router
* Custom wrapper over MappingJackson2MessageConverter for T-Request/T-Reply objects.
*
* @author Cezar Leahu
*/
@Service
public class TransformMessageConverter implements MessageConverter
{
private static final Logger logger = LoggerFactory.getLogger(TransformMessageConverter.class);
private static final MappingJackson2MessageConverter converter;
private static final JavaType TRANSFORM_REQUEST_TYPE =
TypeFactory.defaultInstance().constructType(TransformRequest.class);
static
{
converter = new MappingJackson2MessageConverter()
{
@Override
@NonNull
protected JavaType getJavaTypeForMessage(final Message message) throws JMSException
{
if (message.getStringProperty("_type") == null)
{
return TRANSFORM_REQUEST_TYPE;
}
return super.getJavaTypeForMessage(message);
}
};
converter.setTargetType(MessageType.BYTES);
converter.setTypeIdPropertyName("_type");
converter.setTypeIdMappings(ImmutableMap.of(
TransformRequest.class.getName(), TransformRequest.class,
TransformReply.class.getName(), TransformReply.class)
);
}
@Override
@NonNull
public Message toMessage(
@NonNull final Object object,
@NonNull final Session session) throws JMSException, MessageConversionException
{
return converter.toMessage(object, session);
}
@Override
@NonNull
public Object fromMessage(@NonNull final Message message) throws JMSException
{
return converter.fromMessage(message);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2015-2018 Alfresco Software, Ltd. All rights reserved.
*
* License rights for this program may be obtained from Alfresco Software, Ltd.
* pursuant to a written agreement and any use of this program without such an
* agreement is prohibited.
*/
package org.alfresco.transformer.messaging;
import javax.jms.Destination;
import org.alfresco.transform.client.model.TransformReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
* TODO: Duplicated from the Router
* TransformReplySender Bean
* <p/>
* JMS message sender/publisher
*
* @author Cezar Leahu
*/
@Component
public class TransformReplySender
{
private static final Logger logger = LoggerFactory.getLogger(TransformReplySender.class);
@Autowired
private JmsTemplate jmsTemplate;
public void send(final Destination destination, final TransformReply reply)
{
send(destination, reply, reply.getRequestId());
}
public void send(final Destination destination, final TransformReply reply, final String correlationId)
{
try
{
//jmsTemplate.setSessionTransacted(true); // do we need this?
jmsTemplate.convertAndSend(destination, reply, m -> {
m.setJMSCorrelationID(correlationId);
return m;
});
logger.info("Sent: {} - with correlation ID {}", reply, correlationId);
}
catch (Exception e)
{
logger.error(
"Failed to send T-Reply " + reply + " - for correlation ID " + correlationId, e);
}
}
}

View File

@ -3,6 +3,13 @@ spring:
multipart: multipart:
max-file-size: 8192MB max-file-size: 8192MB
max-request-size: 8192MB max-request-size: 8192MB
activemq:
broker-url: ${ACTIVEMQ_URL:nio://localhost:61616}?jms.watchTopicAdvisories=false
user: ${ACTIVEMQ_USER:admin}
password: ${ACTIVEMQ_PASSWORD:admin}
pool:
enabled: true
max-connections: 10
server: server:
port: 8090 port: 8090
@ -18,6 +25,9 @@ logging:
fileStoreUrl: ${FILE_STORE_URL:http://localhost:8099/alfresco/api/-default-/private/sfs/versions/1/file} fileStoreUrl: ${FILE_STORE_URL:http://localhost:8099/alfresco/api/-default-/private/sfs/versions/1/file}
jms-listener:
concurrency: ${JMS_LISTENER_CONCURRENCY:1-10}
management: management:
endpoints: endpoints:
web: web:

14
pom.xml
View File

@ -24,7 +24,9 @@
<dependency.alfresco-data-model.version>8.8</dependency.alfresco-data-model.version> <dependency.alfresco-data-model.version>8.8</dependency.alfresco-data-model.version>
<dependency.alfresco-jodconverter-core.version>3.0.1.1</dependency.alfresco-jodconverter-core.version> <dependency.alfresco-jodconverter-core.version>3.0.1.1</dependency.alfresco-jodconverter-core.version>
<env.project_version>${project.version}</env.project_version> <env.project_version>${project.version}</env.project_version>
<alfresco-transform-data-model.version>0.0.7</alfresco-transform-data-model.version> <alfresco-transform-data-model.version>0.5.2</alfresco-transform-data-model.version>
<dependency.activemq.version>5.15.8</dependency.activemq.version>
<dependency.jackson.version>2.9.6</dependency.jackson.version>
</properties> </properties>
<modules> <modules>
@ -80,6 +82,16 @@
<artifactId>cxf-rt-transports-http</artifactId> <artifactId>cxf-rt-transports-http</artifactId>
<version>3.2.6</version> <version>3.2.6</version>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${dependency.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${dependency.activemq.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>