diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java b/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java index 06fea65ba9..34f8a5b3ab 100644 --- a/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java +++ b/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java @@ -32,6 +32,7 @@ import java.util.Deque; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executor; import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.RepoEvent; @@ -95,6 +96,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin private PersonService personService; protected NodeResourceHelper nodeResourceHelper; + private Executor threadPoolExecutor; private NodeTypeFilter nodeTypeFilter; private ChildAssociationTypeFilter childAssociationTypeFilter; private EventUserFilter userFilter; @@ -113,6 +115,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin PropertyCheck.mandatory(this, "transactionService", transactionService); PropertyCheck.mandatory(this, "personService", personService); PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); + PropertyCheck.mandatory(this, "threadPoolExecutor", threadPoolExecutor); this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter(); @@ -198,6 +201,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin this.nodeResourceHelper = nodeResourceHelper; } + public void setThreadPoolExecutor(Executor threadPoolExecutor) + { + this.threadPoolExecutor = threadPoolExecutor; + } + @Override public void onCreateNode(ChildAssociationRef childAssocRef) { @@ -427,6 +435,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin } protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator) + { + threadPoolExecutor.execute(()-> sendEventNow(nodeRef, consolidator)); + } + + private void sendEventNow(NodeRef nodeRef, EventConsolidator consolidator) { if (consolidator.isTemporaryNode()) { @@ -468,6 +481,12 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin } protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) + { + threadPoolExecutor.execute(()-> sendEventNow(childAssociationRef, consolidator)); + } + + private void sendEventNow(ChildAssociationRef childAssociationRef, + ChildAssociationEventConsolidator consolidator) { if (consolidator.isTemporaryChildAssociation()) { @@ -508,6 +527,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin } protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) + { + threadPoolExecutor.execute(()-> sendEventNow(peerAssociationRef, consolidator)); + } + + private void sendEventNow(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) { if (consolidator.isTemporaryPeerAssociation()) { @@ -533,6 +557,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin LOGGER.trace("List of Events:" + listOfEvents); LOGGER.trace("Sending event:" + event); } + // Need to execute this in another read txn because Camel expects it transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback) () -> { event2MessageProducer.send(event); diff --git a/repository/src/main/resources/alfresco/events2-context.xml b/repository/src/main/resources/alfresco/events2-context.xml index c5d899023f..e1fc358a65 100644 --- a/repository/src/main/resources/alfresco/events2-context.xml +++ b/repository/src/main/resources/alfresco/events2-context.xml @@ -56,5 +56,23 @@ + + + + + + + + eventAsyncThreadPool + + + ${repo.event2.threadPool.coreSize} + + + ${repo.event2.threadPool.maximumSize} + + + ${repo.event2.threadPool.priority} + diff --git a/repository/src/main/resources/alfresco/repository.properties b/repository/src/main/resources/alfresco/repository.properties index 7f5b92d9c6..1830e2f831 100644 --- a/repository/src/main/resources/alfresco/repository.properties +++ b/repository/src/main/resources/alfresco/repository.properties @@ -1209,6 +1209,10 @@ repo.event2.filter.childAssocTypes=rn:rendition repo.event2.filter.users=System, null # Topic name repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 +# Thread pool for async delivery +repo.event2.threadPool.priority=1 +repo.event2.threadPool.coreSize=8 +repo.event2.threadPool.maximumSize=10 # MNT-21083 # --DELETE_NOT_EXISTS - default settings diff --git a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java index 458336401f..64e365d013 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java +++ b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java @@ -31,6 +31,7 @@ import static org.awaitility.Awaitility.await; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import javax.jms.ConnectionFactory; @@ -104,6 +105,10 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest @Autowired protected ObjectMapper event2ObjectMapper; + @Autowired @Qualifier("eventGeneratorV2") + protected EventGenerator eventGenerator; + + protected NodeRef rootNodeRef; @BeforeClass @@ -143,6 +148,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest }); } + @Before + public void forceEventGeneratorToBeSynchronous() { + eventGenerator.setThreadPoolExecutor(new Executor() + { + @Override + public void execute(Runnable command) + { + command.run(); + } + }); + } + @After public void tearDown() {