Compare commits

...

11 Commits

Author SHA1 Message Date
Manish Kumar
c8591f9bde Merge branch 'master' of https://github.com/Alfresco/alfresco-community-repo into feature/APPS-2486-IntegrateSqsSns 2024-01-04 10:21:04 +05:30
Manish Kumar
9bf7ecfa87 [APPS-2486] changed topic name 2023-12-29 12:47:09 +05:30
Manish Kumar
83fc44559f [APPS-2486] added changes 2023-12-29 12:22:19 +05:30
Manish Kumar
a4b0949f2d [APPS-2486] corrected conditon in renditon and transform 2023-12-26 16:35:06 +05:30
Manish Kumar
27a03dd009 [APPS-2486] added condition for repository queues to switch between different endpoint 2023-12-26 13:53:06 +05:30
Manish Kumar
c72b8759db [APPS-2486] added condition to switch between different endpoint 2023-12-20 17:06:32 +05:30
Manish Kumar
b8639dbfc6 [APPS-2486] added method getEndpointUrl 2023-12-20 14:31:50 +05:30
Manish Kumar
b3f9d13571 [APPS-2486] pushed initial configurations 2023-12-19 20:45:49 +05:30
Manish Kumar
bc05fa831a added camelTOSqs class 2023-12-13 17:38:29 +05:30
Manish Kumar
d2ac56ad98 added some attributes 2023-12-06 15:55:09 +05:30
Manish Kumar
0b796edcb1 added some initial code 2023-12-06 14:56:04 +05:30
7 changed files with 81 additions and 4 deletions

View File

@@ -11,6 +11,18 @@
</parent>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-aws2-sqs -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-sqs</artifactId>
<version>4.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-aws2-sns -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-sns</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.alfresco</groupId>
<artifactId>alfresco-data-model</artifactId>

View File

@@ -25,6 +25,7 @@
*/
package org.alfresco.messaging.camel.routes;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -43,6 +44,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -56,6 +58,9 @@ public class OnContentUpdateRenditionRoute extends RouteBuilder
{
private static Log logger = LogFactory.getLog(OnContentUpdateRenditionRoute.class);
@Autowired
@Qualifier("global-properties")
Properties globalProperties;
@Value("${acs.repo.rendition.events.endpoint}")
public String sourceQueue;
@@ -81,7 +86,7 @@ public class OnContentUpdateRenditionRoute extends RouteBuilder
Behaviour.NotificationFrequency.EVERY_EVENT);
policyComponent.bindClassBehaviour(ContentServicePolicies.OnContentUpdatePolicy.QNAME, RenditionModel.ASPECT_RENDITIONED, eventBehaviour);
from(sourceQueue).threads().executorService(executorService).process("renditionEventProcessor").end();
from(isBrokerEnabled() ? sourceQueue : getEndpointUrl()).threads().executorService(executorService).process("renditionEventProcessor").end();
}
@SuppressWarnings("unused")
@@ -102,4 +107,15 @@ public class OnContentUpdateRenditionRoute extends RouteBuilder
event.setNewContent(newContent);
return event;
}
public Boolean isBrokerEnabled()
{
return Boolean.valueOf(this.globalProperties.getProperty("messaging.broker.enabled"));
}
public String getEndpointUrl()
{
return this.globalProperties.getProperty("acs.repo.rendition.events.sqs.endpoint")
+"?accessKey=RAW("+this.globalProperties.getProperty("connector.sns.accessKey")
+")&secretKey=RAW("+this.globalProperties.getProperty("connector.sns.secretKey")
+")&region="+this.globalProperties.getProperty("connector.sns.region");
}
}

View File

@@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,6 +48,10 @@ public class TransformRequestConsumer extends RouteBuilder
{
private static Log logger = LogFactory.getLog(TransformRequestConsumer.class);
@Autowired
@Qualifier("global-properties")
private Properties globalProperties;
@Value("${acs.repo.transform.request.endpoint}")
public String sourceQueue;
@@ -80,6 +85,17 @@ public class TransformRequestConsumer extends RouteBuilder
logger.debug("SourceQueue is " + sourceQueue);
}
from(sourceQueue).threads().executorService(executorService).process(processor).end();
from(isBrokerEnabled() ? sourceQueue : getEndpointUrl()).threads().executorService(executorService).process(processor).end();
}
public Boolean isBrokerEnabled()
{
return Boolean.valueOf(this.globalProperties.getProperty("messaging.broker.enabled"));
}
public String getEndpointUrl()
{
return this.globalProperties.getProperty("acs.repo.transform.request.sqs.endpoint")
+"?accessKey=RAW("+this.globalProperties.getProperty("connector.sns.accessKey")
+")&secretKey=RAW("+this.globalProperties.getProperty("connector.sns.secretKey")
+")&region="+this.globalProperties.getProperty("connector.sns.region");
}
}

View File

@@ -48,6 +48,7 @@ public class Event2MessageProducer extends AbstractEventProducer implements Init
PropertyCheck.mandatory(this, "producer", this.producer);
PropertyCheck.mandatory(this, "endpoint", this.endpoint);
PropertyCheck.mandatory(this, "objectMapper", this.objectMapper);
PropertyCheck.mandatory(this, "globalProperty", this.globalProperties);
if (StringUtils.isEmpty(this.endpoint))
{
@@ -57,7 +58,7 @@ public class Event2MessageProducer extends AbstractEventProducer implements Init
public void send(Object event)
{
send(this.endpoint, null, event, null);
send(isBrokerEnabled() ? this.endpoint : getEndpointUrl(), null, event, null);
}
@Override
@@ -73,7 +74,6 @@ public class Event2MessageProducer extends AbstractEventProducer implements Init
{
exchangePattern = ExchangePattern.InOnly;
}
this.producer.sendBodyAndHeaders(endpointUri, exchangePattern, event, this.addHeaders(headers));
}
catch (Exception e)
@@ -81,4 +81,16 @@ public class Event2MessageProducer extends AbstractEventProducer implements Init
throw new AlfrescoRuntimeException(ERROR_SENDING, e);
}
}
public String getEndpointUrl()
{
return this.globalProperties.getProperty("repo.event2.sns.endpoint")
+"?accessKey=RAW("+this.globalProperties.getProperty("connector.sns.accessKey")
+")&secretKey=RAW("+this.globalProperties.getProperty("connector.sns.secretKey")
+")&region="+this.globalProperties.getProperty("connector.sns.region");
}
public Boolean isBrokerEnabled()
{
return Boolean.valueOf(this.globalProperties.getProperty("messaging.broker.enabled"));
}
}

View File

@@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Abstract helper to send events to an endpoint. The
@@ -60,6 +61,7 @@ public abstract class AbstractEventProducer
protected ProducerTemplate producer;
protected String endpoint;
protected ObjectMapper objectMapper;
protected Properties globalProperties;
public void setProducer(ProducerTemplate producer)
{
@@ -75,6 +77,10 @@ public abstract class AbstractEventProducer
{
this.objectMapper = objectMapper;
}
public void setGlobalProperties(Properties globalProperties)
{
this.globalProperties = globalProperties;
}
protected Map<String, Object> addHeaders(Map<String, Object> origHeaders)
{

View File

@@ -57,6 +57,7 @@
<bean id="event2MessageProducer" class="org.alfresco.repo.event2.Event2MessageProducer">
<property name="producer" ref="camelProducerTemplate"/>
<property name="endpoint" value="${repo.event2.topic.endpoint}"/>
<property name="globalProperties" ref="global-properties"/>
<property name="objectMapper" ref="event2ObjectMapper"/>
</bean>
</beans>

View File

@@ -1241,6 +1241,11 @@ repo.event2.queue.dequeueThreadPool.priority=1
repo.event2.queue.dequeueThreadPool.coreSize=1
repo.event2.queue.dequeueThreadPool.maximumSize=1
#if enabled then it will use ActiveMQ as a message broker
messaging.broker.enabled=true
connector.sns.accessKey=changeMe
connector.sns.secretKey=changeMe
connector.sns.region=changeMe
# MNT-21083
# --DELETE_NOT_EXISTS - default settings
@@ -1375,3 +1380,12 @@ scripts.execution.maxMemoryUsedInBytes=-1
# Number of instructions that will trigger the observer
scripts.execution.observerInstructionCount=5000
# -------AWS Specific Queues and Topics----------
#Topics
repo.event2.sns.endpoint=aws2-sns://alfresco-repo-event2
#Queues
acs.repo.rendition.events.sqs.endpoint=aws2-sqs://acs-repo-rendition-events
acs.repo.transform.request.sqs.endpoint=aws2-sqs://acs-repo-transform-request