mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-07-31 17:39:05 +00:00
[ACS-1291] Asynchronous mechanism to send events (#351)
* Performance optimisation spike * Event2 is now sending event asynchronously * Now forcing synchronous calls in tests for events2 * Now qualifying the event service used in tests Co-authored-by: Nana Insaidoo <insaidoo.nana@yahoo.it>
This commit is contained in:
@@ -32,6 +32,7 @@ import java.util.Deque;
|
|||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import org.alfresco.repo.event.v1.model.EventType;
|
import org.alfresco.repo.event.v1.model.EventType;
|
||||||
import org.alfresco.repo.event.v1.model.RepoEvent;
|
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||||
@@ -95,6 +96,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
private PersonService personService;
|
private PersonService personService;
|
||||||
protected NodeResourceHelper nodeResourceHelper;
|
protected NodeResourceHelper nodeResourceHelper;
|
||||||
|
|
||||||
|
private Executor threadPoolExecutor;
|
||||||
private NodeTypeFilter nodeTypeFilter;
|
private NodeTypeFilter nodeTypeFilter;
|
||||||
private ChildAssociationTypeFilter childAssociationTypeFilter;
|
private ChildAssociationTypeFilter childAssociationTypeFilter;
|
||||||
private EventUserFilter userFilter;
|
private EventUserFilter userFilter;
|
||||||
@@ -113,6 +115,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
PropertyCheck.mandatory(this, "transactionService", transactionService);
|
PropertyCheck.mandatory(this, "transactionService", transactionService);
|
||||||
PropertyCheck.mandatory(this, "personService", personService);
|
PropertyCheck.mandatory(this, "personService", personService);
|
||||||
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
|
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
|
||||||
|
PropertyCheck.mandatory(this, "threadPoolExecutor", threadPoolExecutor);
|
||||||
|
|
||||||
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
|
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
|
||||||
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
|
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
|
||||||
@@ -198,6 +201,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
this.nodeResourceHelper = nodeResourceHelper;
|
this.nodeResourceHelper = nodeResourceHelper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setThreadPoolExecutor(Executor threadPoolExecutor)
|
||||||
|
{
|
||||||
|
this.threadPoolExecutor = threadPoolExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onCreateNode(ChildAssociationRef childAssocRef)
|
public void onCreateNode(ChildAssociationRef childAssocRef)
|
||||||
{
|
{
|
||||||
@@ -427,6 +435,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
|
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
|
||||||
|
{
|
||||||
|
threadPoolExecutor.execute(()-> sendEventNow(nodeRef, consolidator));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendEventNow(NodeRef nodeRef, EventConsolidator consolidator)
|
||||||
{
|
{
|
||||||
if (consolidator.isTemporaryNode())
|
if (consolidator.isTemporaryNode())
|
||||||
{
|
{
|
||||||
@@ -468,6 +481,12 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
||||||
|
{
|
||||||
|
threadPoolExecutor.execute(()-> sendEventNow(childAssociationRef, consolidator));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendEventNow(ChildAssociationRef childAssociationRef,
|
||||||
|
ChildAssociationEventConsolidator consolidator)
|
||||||
{
|
{
|
||||||
if (consolidator.isTemporaryChildAssociation())
|
if (consolidator.isTemporaryChildAssociation())
|
||||||
{
|
{
|
||||||
@@ -508,6 +527,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
||||||
|
{
|
||||||
|
threadPoolExecutor.execute(()-> sendEventNow(peerAssociationRef, consolidator));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendEventNow(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
||||||
{
|
{
|
||||||
if (consolidator.isTemporaryPeerAssociation())
|
if (consolidator.isTemporaryPeerAssociation())
|
||||||
{
|
{
|
||||||
@@ -533,6 +557,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
LOGGER.trace("List of Events:" + listOfEvents);
|
LOGGER.trace("List of Events:" + listOfEvents);
|
||||||
LOGGER.trace("Sending event:" + event);
|
LOGGER.trace("Sending event:" + event);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Need to execute this in another read txn because Camel expects it
|
// Need to execute this in another read txn because Camel expects it
|
||||||
transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> {
|
transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> {
|
||||||
event2MessageProducer.send(event);
|
event2MessageProducer.send(event);
|
||||||
|
@@ -56,5 +56,23 @@
|
|||||||
|
|
||||||
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2">
|
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2">
|
||||||
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
|
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
|
||||||
|
<property name="threadPoolExecutor">
|
||||||
|
<ref bean="eventAsyncThreadPool"/>
|
||||||
|
</property>
|
||||||
|
</bean>
|
||||||
|
|
||||||
|
<bean id="eventAsyncThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
|
||||||
|
<property name="poolName">
|
||||||
|
<value>eventAsyncThreadPool</value>
|
||||||
|
</property>
|
||||||
|
<property name="corePoolSize">
|
||||||
|
<value>${repo.event2.threadPool.coreSize}</value>
|
||||||
|
</property>
|
||||||
|
<property name="maximumPoolSize">
|
||||||
|
<value>${repo.event2.threadPool.maximumSize}</value>
|
||||||
|
</property>
|
||||||
|
<property name="threadPriority">
|
||||||
|
<value>${repo.event2.threadPool.priority}</value>
|
||||||
|
</property>
|
||||||
</bean>
|
</bean>
|
||||||
</beans>
|
</beans>
|
||||||
|
@@ -1209,6 +1209,10 @@ repo.event2.filter.childAssocTypes=rn:rendition
|
|||||||
repo.event2.filter.users=System, null
|
repo.event2.filter.users=System, null
|
||||||
# Topic name
|
# Topic name
|
||||||
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
|
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
|
||||||
|
# Thread pool for async delivery
|
||||||
|
repo.event2.threadPool.priority=1
|
||||||
|
repo.event2.threadPool.coreSize=8
|
||||||
|
repo.event2.threadPool.maximumSize=10
|
||||||
|
|
||||||
# MNT-21083
|
# MNT-21083
|
||||||
# --DELETE_NOT_EXISTS - default settings
|
# --DELETE_NOT_EXISTS - default settings
|
||||||
|
@@ -31,6 +31,7 @@ import static org.awaitility.Awaitility.await;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
|
||||||
@@ -104,6 +105,10 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
@Autowired
|
@Autowired
|
||||||
protected ObjectMapper event2ObjectMapper;
|
protected ObjectMapper event2ObjectMapper;
|
||||||
|
|
||||||
|
@Autowired @Qualifier("eventGeneratorV2")
|
||||||
|
protected EventGenerator eventGenerator;
|
||||||
|
|
||||||
|
|
||||||
protected NodeRef rootNodeRef;
|
protected NodeRef rootNodeRef;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
@@ -143,6 +148,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void forceEventGeneratorToBeSynchronous() {
|
||||||
|
eventGenerator.setThreadPoolExecutor(new Executor()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute(Runnable command)
|
||||||
|
{
|
||||||
|
command.run();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown()
|
public void tearDown()
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user