mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-07-31 17:39:05 +00:00
Bugfix/repo 5610 events are not actually sent to activemq (#360)
* Add events tests * Polished put test: connects to JMS via TCP and validate that the event sent is also received back * Now the tests provides a simple main() that listens on the topic, useful for quick debug sessions * Now the user name is collected in the calling thread, so that the sendEvent does not silently fails * Apply changes following review * Now using queue system to guarantee events order * Add license * Updated logs and corrected comments * Remove empty methods * Now catering for spurious events at startup when database is bootstrapped * Now preserving the txn-id in all events * Moved up definitions in events2.xml after PR feedback Co-authored-by: Bruno Bossola <bruno@meterian.com>
This commit is contained in:
@@ -54,7 +54,6 @@ import org.alfresco.repo.policy.JavaBehaviour;
|
|||||||
import org.alfresco.repo.policy.PolicyComponent;
|
import org.alfresco.repo.policy.PolicyComponent;
|
||||||
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
||||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||||
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
|
||||||
import org.alfresco.service.cmr.dictionary.DictionaryService;
|
import org.alfresco.service.cmr.dictionary.DictionaryService;
|
||||||
import org.alfresco.service.cmr.repository.AssociationRef;
|
import org.alfresco.service.cmr.repository.AssociationRef;
|
||||||
import org.alfresco.service.cmr.repository.ChildAssociationRef;
|
import org.alfresco.service.cmr.repository.ChildAssociationRef;
|
||||||
@@ -90,11 +89,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
protected DictionaryService dictionaryService;
|
protected DictionaryService dictionaryService;
|
||||||
private DescriptorService descriptorService;
|
private DescriptorService descriptorService;
|
||||||
private EventFilterRegistry eventFilterRegistry;
|
private EventFilterRegistry eventFilterRegistry;
|
||||||
private Event2MessageProducer event2MessageProducer;
|
|
||||||
private TransactionService transactionService;
|
private TransactionService transactionService;
|
||||||
private PersonService personService;
|
private PersonService personService;
|
||||||
protected NodeResourceHelper nodeResourceHelper;
|
protected NodeResourceHelper nodeResourceHelper;
|
||||||
|
|
||||||
|
private EventGeneratorQueue eventGeneratorQueue;
|
||||||
private NodeTypeFilter nodeTypeFilter;
|
private NodeTypeFilter nodeTypeFilter;
|
||||||
private ChildAssociationTypeFilter childAssociationTypeFilter;
|
private ChildAssociationTypeFilter childAssociationTypeFilter;
|
||||||
private EventUserFilter userFilter;
|
private EventUserFilter userFilter;
|
||||||
@@ -109,10 +108,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
PropertyCheck.mandatory(this, "dictionaryService", dictionaryService);
|
PropertyCheck.mandatory(this, "dictionaryService", dictionaryService);
|
||||||
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
|
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
|
||||||
PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry);
|
PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry);
|
||||||
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
|
|
||||||
PropertyCheck.mandatory(this, "transactionService", transactionService);
|
PropertyCheck.mandatory(this, "transactionService", transactionService);
|
||||||
PropertyCheck.mandatory(this, "personService", personService);
|
PropertyCheck.mandatory(this, "personService", personService);
|
||||||
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
|
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
|
||||||
|
PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue);
|
||||||
|
|
||||||
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
|
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
|
||||||
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
|
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
|
||||||
@@ -177,12 +176,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
this.eventFilterRegistry = eventFilterRegistry;
|
this.eventFilterRegistry = eventFilterRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
|
||||||
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
|
|
||||||
{
|
|
||||||
this.event2MessageProducer = event2MessageProducer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTransactionService(TransactionService transactionService)
|
public void setTransactionService(TransactionService transactionService)
|
||||||
{
|
{
|
||||||
this.transactionService = transactionService;
|
this.transactionService = transactionService;
|
||||||
@@ -198,6 +191,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
this.nodeResourceHelper = nodeResourceHelper;
|
this.nodeResourceHelper = nodeResourceHelper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue)
|
||||||
|
{
|
||||||
|
this.eventGeneratorQueue = eventGeneratorQueue;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onCreateNode(ChildAssociationRef childAssocRef)
|
public void onCreateNode(ChildAssociationRef childAssocRef)
|
||||||
{
|
{
|
||||||
@@ -428,20 +426,26 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
|
|
||||||
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
|
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
|
||||||
{
|
{
|
||||||
|
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
|
||||||
|
eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo)
|
||||||
|
{
|
||||||
|
String user = eventInfo.getPrincipal();
|
||||||
|
|
||||||
if (consolidator.isTemporaryNode())
|
if (consolidator.isTemporaryNode())
|
||||||
{
|
{
|
||||||
if (LOGGER.isTraceEnabled())
|
if (LOGGER.isTraceEnabled())
|
||||||
{
|
{
|
||||||
LOGGER.trace("Ignoring temporary node: " + nodeRef);
|
LOGGER.trace("Ignoring temporary node: " + nodeRef);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
|
|
||||||
// Get the repo event before the filtering,
|
// Get the repo event before the filtering,
|
||||||
// so we can take the latest node info into account
|
// so we can take the latest node info into account
|
||||||
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
|
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
|
||||||
|
|
||||||
|
|
||||||
final QName nodeType = consolidator.getNodeType();
|
final QName nodeType = consolidator.getNodeType();
|
||||||
if (isFiltered(nodeType, user))
|
if (isFiltered(nodeType, user))
|
||||||
@@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
|
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
|
||||||
+ "' created by: " + user);
|
+ "' created by: " + user);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull())
|
if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull())
|
||||||
@@ -461,27 +465,34 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
{
|
{
|
||||||
LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef);
|
LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
logAndSendEvent(event, consolidator.getEventTypes());
|
logEvent(event, consolidator.getEventTypes());
|
||||||
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
||||||
{
|
{
|
||||||
|
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
|
||||||
|
eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RepoEvent<?> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
||||||
|
{
|
||||||
|
String user = eventInfo.getPrincipal();
|
||||||
if (consolidator.isTemporaryChildAssociation())
|
if (consolidator.isTemporaryChildAssociation())
|
||||||
{
|
{
|
||||||
if (LOGGER.isTraceEnabled())
|
if (LOGGER.isTraceEnabled())
|
||||||
{
|
{
|
||||||
LOGGER.trace("Ignoring temporary child association: " + childAssociationRef);
|
LOGGER.trace("Ignoring temporary child association: " + childAssociationRef);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
|
|
||||||
// Get the repo event before the filtering,
|
// Get the repo event before the filtering,
|
||||||
// so we can take the latest association info into account
|
// so we can take the latest association info into account
|
||||||
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
|
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
|
||||||
|
|
||||||
final QName childAssocType = consolidator.getChildAssocType();
|
final QName childAssocType = consolidator.getChildAssocType();
|
||||||
if (isFilteredChildAssociation(childAssocType, user))
|
if (isFilteredChildAssociation(childAssocType, user))
|
||||||
@@ -492,7 +503,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
|
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
|
||||||
+ "' created by: " + user);
|
+ "' created by: " + user);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
} else if (childAssociationRef.isPrimary())
|
} else if (childAssociationRef.isPrimary())
|
||||||
{
|
{
|
||||||
if (LOGGER.isTraceEnabled())
|
if (LOGGER.isTraceEnabled())
|
||||||
@@ -501,13 +512,20 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
|
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
|
||||||
+ "' created by: " + user);
|
+ "' created by: " + user);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
logAndSendEvent(event, consolidator.getEventTypes());
|
logEvent(event, consolidator.getEventTypes());
|
||||||
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
||||||
|
{
|
||||||
|
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
|
||||||
|
eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RepoEvent<?> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
||||||
{
|
{
|
||||||
if (consolidator.isTemporaryPeerAssociation())
|
if (consolidator.isTemporaryPeerAssociation())
|
||||||
{
|
{
|
||||||
@@ -515,30 +533,21 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
|||||||
{
|
{
|
||||||
LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef);
|
LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
|
RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
|
||||||
// Get the repo event before the filtering,
|
logEvent(event, consolidator.getEventTypes());
|
||||||
// so we can take the latest association info into account
|
return event;
|
||||||
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
|
|
||||||
|
|
||||||
logAndSendEvent(event, consolidator.getEventTypes());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void logAndSendEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
|
private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
|
||||||
{
|
{
|
||||||
if (LOGGER.isTraceEnabled())
|
if (LOGGER.isTraceEnabled())
|
||||||
{
|
{
|
||||||
LOGGER.trace("List of Events:" + listOfEvents);
|
LOGGER.trace("List of Events:" + listOfEvents);
|
||||||
LOGGER.trace("Sending event:" + event);
|
LOGGER.trace("Sending event:" + event);
|
||||||
}
|
}
|
||||||
// Need to execute this in another read txn because Camel expects it
|
|
||||||
transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> {
|
|
||||||
event2MessageProducer.send(event);
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}, true, false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -0,0 +1,179 @@
|
|||||||
|
/*
|
||||||
|
* #%L
|
||||||
|
* Alfresco Repository
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2005 - 2021 Alfresco Software Limited
|
||||||
|
* %%
|
||||||
|
* This file is part of the Alfresco software.
|
||||||
|
* If the software was purchased under a paid Alfresco license, the terms of
|
||||||
|
* the paid license agreement will prevail. Otherwise, the software is
|
||||||
|
* provided under the following open source license terms:
|
||||||
|
*
|
||||||
|
* Alfresco is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Alfresco is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public License
|
||||||
|
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
package org.alfresco.repo.event2;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||||
|
import org.alfresco.util.PropertyCheck;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but
|
||||||
|
* at the same time it preserves the order of the events
|
||||||
|
*/
|
||||||
|
public class EventGeneratorQueue implements InitializingBean
|
||||||
|
{
|
||||||
|
protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class);
|
||||||
|
|
||||||
|
protected Executor enqueueThreadPoolExecutor;
|
||||||
|
protected Executor dequeueThreadPoolExecutor;
|
||||||
|
protected Event2MessageProducer event2MessageProducer;
|
||||||
|
protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>();
|
||||||
|
protected Runnable listener = createListener();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() throws Exception
|
||||||
|
{
|
||||||
|
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
|
||||||
|
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
|
||||||
|
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
|
||||||
|
{
|
||||||
|
this.event2MessageProducer = event2MessageProducer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor)
|
||||||
|
{
|
||||||
|
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor)
|
||||||
|
{
|
||||||
|
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
|
||||||
|
dequeueThreadPoolExecutor.execute(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Procedure to enqueue the callback functions that creates an event.
|
||||||
|
* @param maker Callback function that creates an event.
|
||||||
|
*/
|
||||||
|
public void accept(Callable<RepoEvent<?>> maker)
|
||||||
|
{
|
||||||
|
EventInMaking eventInMaking = new EventInMaking(maker);
|
||||||
|
queue.offer(eventInMaking);
|
||||||
|
enqueueThreadPoolExecutor.execute(() -> {
|
||||||
|
try
|
||||||
|
{
|
||||||
|
eventInMaking.make();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create listener task in charge of dequeuing and sending events ready to be sent.
|
||||||
|
* @return The task in charge of dequeuing and sending events ready to be sent.
|
||||||
|
*/
|
||||||
|
private Runnable createListener()
|
||||||
|
{
|
||||||
|
return new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
while (!Thread.interrupted())
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
EventInMaking eventInMaking = queue.take();
|
||||||
|
RepoEvent<?> event = eventInMaking.getEventWhenReady();
|
||||||
|
if (event != null)
|
||||||
|
{
|
||||||
|
event2MessageProducer.send(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
LOGGER.error("Unexpected error while dequeuing and sending repository event" + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
LOGGER.warn("Unexpected: rescheduling the listener thread.");
|
||||||
|
dequeueThreadPoolExecutor.execute(listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Simple class that makes events and allows to retrieve them when ready
|
||||||
|
*/
|
||||||
|
private static class EventInMaking
|
||||||
|
{
|
||||||
|
private Callable<RepoEvent<?>> maker;
|
||||||
|
private volatile RepoEvent<?> event;
|
||||||
|
private CountDownLatch latch;
|
||||||
|
|
||||||
|
public EventInMaking(Callable<RepoEvent<?>> maker)
|
||||||
|
{
|
||||||
|
this.maker = maker;
|
||||||
|
this.latch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void make() throws Exception
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
event = maker.call();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RepoEvent<?> getEventWhenReady() throws InterruptedException
|
||||||
|
{
|
||||||
|
latch.await(30, TimeUnit.SECONDS);
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return maker.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@@ -38,9 +38,10 @@
|
|||||||
<property name="dictionaryService" ref="dictionaryService"/>
|
<property name="dictionaryService" ref="dictionaryService"/>
|
||||||
<property name="descriptorService" ref="descriptorComponent"/>
|
<property name="descriptorService" ref="descriptorComponent"/>
|
||||||
<property name="eventFilterRegistry" ref="event2FilterRegistry"/>
|
<property name="eventFilterRegistry" ref="event2FilterRegistry"/>
|
||||||
<property name="event2MessageProducer" ref="event2MessageProducer"/>
|
|
||||||
<property name="transactionService" ref="transactionService"/>
|
<property name="transactionService" ref="transactionService"/>
|
||||||
<property name="personService" ref="personService"/>
|
<property name="personService" ref="personService"/>
|
||||||
|
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
|
||||||
|
<property name="eventGeneratorQueue" ref="eventGeneratorQueue"/>
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
<bean id="baseNodeResourceHelper" abstract="true">
|
<bean id="baseNodeResourceHelper" abstract="true">
|
||||||
@@ -54,7 +55,45 @@
|
|||||||
|
|
||||||
<bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/>
|
<bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/>
|
||||||
|
|
||||||
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2">
|
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/>
|
||||||
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
|
|
||||||
|
<bean id="eventGeneratorQueue" class="org.alfresco.repo.event2.EventGeneratorQueue" >
|
||||||
|
<property name="enqueueThreadPoolExecutor">
|
||||||
|
<ref bean="eventAsyncEnqueueThreadPool" />
|
||||||
|
</property>
|
||||||
|
<property name="dequeueThreadPoolExecutor">
|
||||||
|
<ref bean="eventAsyncDequeueThreadPool" />
|
||||||
|
</property>
|
||||||
|
<property name="event2MessageProducer" ref="event2MessageProducer"/>
|
||||||
|
</bean>
|
||||||
|
|
||||||
|
<bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
|
||||||
|
<property name="poolName">
|
||||||
|
<value>eventAsyncEnqueueThreadPool</value>
|
||||||
|
</property>
|
||||||
|
<property name="corePoolSize">
|
||||||
|
<value>${repo.event2.queue.enqueueThreadPool.coreSize}</value>
|
||||||
|
</property>
|
||||||
|
<property name="maximumPoolSize">
|
||||||
|
<value>${repo.event2.queue.enqueueThreadPool.maximumSize}</value>
|
||||||
|
</property>
|
||||||
|
<property name="threadPriority">
|
||||||
|
<value>${repo.event2.queue.enqueueThreadPool.priority}</value>
|
||||||
|
</property>
|
||||||
|
</bean>
|
||||||
|
|
||||||
|
<bean id="eventAsyncDequeueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
|
||||||
|
<property name="poolName">
|
||||||
|
<value>eventAsyncDequeueThreadPool</value>
|
||||||
|
</property>
|
||||||
|
<property name="corePoolSize">
|
||||||
|
<value>${repo.event2.queue.dequeueThreadPool.coreSize}</value>
|
||||||
|
</property>
|
||||||
|
<property name="maximumPoolSize">
|
||||||
|
<value>${repo.event2.queue.dequeueThreadPool.maximumSize}</value>
|
||||||
|
</property>
|
||||||
|
<property name="threadPriority">
|
||||||
|
<value>${repo.event2.queue.dequeueThreadPool.priority}</value>
|
||||||
|
</property>
|
||||||
</bean>
|
</bean>
|
||||||
</beans>
|
</beans>
|
||||||
|
@@ -1211,6 +1211,15 @@ repo.event2.filter.childAssocTypes=rn:rendition
|
|||||||
repo.event2.filter.users=
|
repo.event2.filter.users=
|
||||||
# Topic name
|
# Topic name
|
||||||
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
|
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
|
||||||
|
# Thread pool for async enqueue of repo events
|
||||||
|
repo.event2.queue.enqueueThreadPool.priority=1
|
||||||
|
repo.event2.queue.enqueueThreadPool.coreSize=8
|
||||||
|
repo.event2.queue.enqueueThreadPool.maximumSize=10
|
||||||
|
# Thread pool for async dequeue and delivery of repo events
|
||||||
|
repo.event2.queue.dequeueThreadPool.priority=1
|
||||||
|
repo.event2.queue.dequeueThreadPool.coreSize=1
|
||||||
|
repo.event2.queue.dequeueThreadPool.maximumSize=1
|
||||||
|
|
||||||
|
|
||||||
# MNT-21083
|
# MNT-21083
|
||||||
# --DELETE_NOT_EXISTS - default settings
|
# --DELETE_NOT_EXISTS - default settings
|
||||||
|
@@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
|||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
@@ -77,17 +78,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
|
|
||||||
public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||||
{
|
{
|
||||||
|
protected static final boolean DEBUG = false;
|
||||||
|
|
||||||
protected static final String TEST_NAMESPACE = "http://www.alfresco.org/test/ContextAwareRepoEvent";
|
protected static final String TEST_NAMESPACE = "http://www.alfresco.org/test/ContextAwareRepoEvent";
|
||||||
|
protected static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer();
|
||||||
|
|
||||||
private static final String BROKER_URL = "tcp://localhost:61616";
|
private static final String BROKER_URL = "tcp://localhost:61616";
|
||||||
private static final String TOPIC_NAME = "alfresco.repo.event2";
|
private static final String TOPIC_NAME = "alfresco.repo.event2";
|
||||||
private static final String CAMEL_ROUTE = "jms:topic:" + TOPIC_NAME;
|
private static final String CAMEL_ROUTE = "jms:topic:" + TOPIC_NAME;
|
||||||
private static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer();
|
|
||||||
private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
|
private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
|
||||||
|
|
||||||
private static boolean isCamelConfigured;
|
private static boolean isCamelConfigured;
|
||||||
private static DataFormat dataFormat;
|
private static DataFormat dataFormat;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
protected RetryingTransactionHelper retryingTransactionHelper;
|
protected RetryingTransactionHelper retryingTransactionHelper;
|
||||||
|
|
||||||
@@ -104,6 +107,13 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
@Autowired
|
@Autowired
|
||||||
protected ObjectMapper event2ObjectMapper;
|
protected ObjectMapper event2ObjectMapper;
|
||||||
|
|
||||||
|
@Autowired @Qualifier("eventGeneratorV2")
|
||||||
|
protected EventGenerator eventGenerator;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("eventGeneratorQueue")
|
||||||
|
protected EventGeneratorQueue eventQueue;
|
||||||
|
|
||||||
protected NodeRef rootNodeRef;
|
protected NodeRef rootNodeRef;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
@@ -141,8 +151,35 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
}
|
}
|
||||||
return nodeService.getRootNode(storeRef);
|
return nodeService.getRootNode(storeRef);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
flushSpuriousEvents();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When running with an empty database some events related to the creation may
|
||||||
|
* creep up here making the test fails. After attempting several other
|
||||||
|
* strategies, a smart sleep seems to do the work.
|
||||||
|
*/
|
||||||
|
protected void flushSpuriousEvents() throws InterruptedException
|
||||||
|
{
|
||||||
|
int maxloops = 5;
|
||||||
|
|
||||||
|
int count = maxloops;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
Thread.sleep(165l);
|
||||||
|
if (EVENT_CONTAINER.isEmpty())
|
||||||
|
{
|
||||||
|
count--;
|
||||||
|
} else
|
||||||
|
{
|
||||||
|
EVENT_CONTAINER.reset();
|
||||||
|
count = maxloops;
|
||||||
|
}
|
||||||
|
|
||||||
|
} while (count > 0);
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown()
|
public void tearDown()
|
||||||
{
|
{
|
||||||
@@ -179,6 +216,16 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
propertyMap).getChildRef());
|
propertyMap).getChildRef());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected NodeRef updateNodeName(NodeRef nodeRef, String newName)
|
||||||
|
{
|
||||||
|
PropertyMap propertyMap = new PropertyMap();
|
||||||
|
propertyMap.put(ContentModel.PROP_NAME, newName);
|
||||||
|
return retryingTransactionHelper.doInTransaction(() -> {
|
||||||
|
nodeService.addProperties(nodeRef, propertyMap);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
protected void deleteNode(NodeRef nodeRef)
|
protected void deleteNode(NodeRef nodeRef)
|
||||||
{
|
{
|
||||||
retryingTransactionHelper.doInTransaction(() -> {
|
retryingTransactionHelper.doInTransaction(() -> {
|
||||||
@@ -376,13 +423,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
|
|
||||||
public static class RepoEventContainer implements Processor
|
public static class RepoEventContainer implements Processor
|
||||||
{
|
{
|
||||||
private final List<RepoEvent<?>> events = new ArrayList<>();
|
private final List<RepoEvent<?>> events = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(Exchange exchange)
|
public void process(Exchange exchange)
|
||||||
{
|
{
|
||||||
Object object = exchange.getIn().getBody();
|
Object object = exchange.getIn().getBody();
|
||||||
events.add((RepoEvent<?>) object);
|
events.add((RepoEvent<?>) object);
|
||||||
|
|
||||||
|
if (DEBUG)
|
||||||
|
{
|
||||||
|
System.err.println("XX: "+object);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<RepoEvent<?>> getEvents()
|
public List<RepoEvent<?>> getEvents()
|
||||||
@@ -404,6 +456,12 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
|||||||
{
|
{
|
||||||
events.clear();
|
events.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty()
|
||||||
|
{
|
||||||
|
return events.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@@ -0,0 +1,290 @@
|
|||||||
|
/*
|
||||||
|
* #%L
|
||||||
|
* Alfresco Repository
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2005 - 2021 Alfresco Software Limited
|
||||||
|
* %%
|
||||||
|
* This file is part of the Alfresco software.
|
||||||
|
* If the software was purchased under a paid Alfresco license, the terms of
|
||||||
|
* the paid license agreement will prevail. Otherwise, the software is
|
||||||
|
* provided under the following open source license terms:
|
||||||
|
*
|
||||||
|
* Alfresco is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Alfresco is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public License
|
||||||
|
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
package org.alfresco.repo.event2;
|
||||||
|
|
||||||
|
import static java.lang.Thread.sleep;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
public class EventGeneratorQueueUnitTest
|
||||||
|
{
|
||||||
|
private EventGeneratorQueue queue;
|
||||||
|
|
||||||
|
private Event2MessageProducer bus;
|
||||||
|
private ExecutorService enqueuePool;
|
||||||
|
private ExecutorService dequeuePool;
|
||||||
|
private List<RepoEvent<?>> recordedEvents;
|
||||||
|
private Map<String, RepoEvent<?>> events;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup()
|
||||||
|
{
|
||||||
|
queue = new EventGeneratorQueue();
|
||||||
|
|
||||||
|
enqueuePool = newThreadPool();
|
||||||
|
queue.setEnqueueThreadPoolExecutor(enqueuePool);
|
||||||
|
dequeuePool = newThreadPool();
|
||||||
|
queue.setDequeueThreadPoolExecutor(dequeuePool);
|
||||||
|
|
||||||
|
bus = mock(Event2MessageProducer.class);
|
||||||
|
queue.setEvent2MessageProducer(bus);
|
||||||
|
|
||||||
|
events = new HashMap<>();
|
||||||
|
|
||||||
|
setupEventsRecorder();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown()
|
||||||
|
{
|
||||||
|
enqueuePool.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupEventsRecorder()
|
||||||
|
{
|
||||||
|
recordedEvents = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
Mockito.doAnswer(new Answer<Void>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable
|
||||||
|
{
|
||||||
|
RepoEvent<?> event = invocation.getArgument(0, RepoEvent.class);
|
||||||
|
recordedEvents.add(event);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(bus).send(any());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveSingleQuickMessage() throws Exception
|
||||||
|
{
|
||||||
|
queue.accept(messageWithDelay("A", 55l));
|
||||||
|
|
||||||
|
sleep(150l);
|
||||||
|
|
||||||
|
assertEquals(1, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception
|
||||||
|
{
|
||||||
|
queue.accept(() -> { return null; });
|
||||||
|
|
||||||
|
sleep(150l);
|
||||||
|
|
||||||
|
assertEquals(0, recordedEvents.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception {
|
||||||
|
queue.accept(messageWithDelay("A", 0l));
|
||||||
|
queue.accept(messageWithDelay("B", 100l));
|
||||||
|
queue.accept(messageWithDelay("C", 200l));
|
||||||
|
|
||||||
|
sleep(450l);
|
||||||
|
|
||||||
|
assertEquals(3, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
assertEquals("B", recordedEvents.get(1).getId());
|
||||||
|
assertEquals("C", recordedEvents.get(2).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception
|
||||||
|
{
|
||||||
|
queue.accept(messageWithDelay("A", 300l));
|
||||||
|
queue.accept(messageWithDelay("B", 150l));
|
||||||
|
queue.accept(messageWithDelay("C", 0l));
|
||||||
|
|
||||||
|
sleep(950l);
|
||||||
|
|
||||||
|
assertEquals(3, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
assertEquals("B", recordedEvents.get(1).getId());
|
||||||
|
assertEquals("C", recordedEvents.get(2).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception
|
||||||
|
{
|
||||||
|
queue.accept(messageWithDelay("A", 300l));
|
||||||
|
queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");});
|
||||||
|
queue.accept(messageWithDelay("B", 55l));
|
||||||
|
queue.accept(messageWithDelay("C", 0l));
|
||||||
|
|
||||||
|
sleep(950l);
|
||||||
|
|
||||||
|
assertEquals(3, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
assertEquals("B", recordedEvents.get(1).getId());
|
||||||
|
assertEquals("C", recordedEvents.get(2).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception
|
||||||
|
{
|
||||||
|
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l);
|
||||||
|
RepoEvent<?> messageB = makerB.call();
|
||||||
|
doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
|
||||||
|
queue.accept(messageWithDelay("A", 300l));
|
||||||
|
queue.accept(makerB);
|
||||||
|
queue.accept(messageWithDelay("C", 0l));
|
||||||
|
|
||||||
|
sleep(950l);
|
||||||
|
|
||||||
|
assertEquals(2, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
assertEquals("C", recordedEvents.get(1).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception
|
||||||
|
{
|
||||||
|
queue.accept(messageWithDelay("A", 300l));
|
||||||
|
queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");});
|
||||||
|
queue.accept(messageWithDelay("B", 55l));
|
||||||
|
queue.accept(messageWithDelay("C", 0l));
|
||||||
|
|
||||||
|
sleep(950l);
|
||||||
|
|
||||||
|
assertEquals(3, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
assertEquals("B", recordedEvents.get(1).getId());
|
||||||
|
assertEquals("C", recordedEvents.get(2).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception
|
||||||
|
{
|
||||||
|
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l);
|
||||||
|
RepoEvent<?> messageB = makerB.call();
|
||||||
|
doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
|
||||||
|
queue.accept(messageWithDelay("A", 300l));
|
||||||
|
queue.accept(makerB);
|
||||||
|
queue.accept(messageWithDelay("C", 0l));
|
||||||
|
|
||||||
|
sleep(950l);
|
||||||
|
|
||||||
|
assertEquals(2, recordedEvents.size());
|
||||||
|
assertEquals("A", recordedEvents.get(0).getId());
|
||||||
|
assertEquals("C", recordedEvents.get(1).getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Callable<RepoEvent<?>> messageWithDelay(String id, long delay)
|
||||||
|
{
|
||||||
|
Callable<RepoEvent<?>> res = new Callable<RepoEvent<?>>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RepoEvent<?> call() throws Exception
|
||||||
|
{
|
||||||
|
if(delay != 0)
|
||||||
|
{
|
||||||
|
sleep(delay);
|
||||||
|
}
|
||||||
|
return newRepoEvent(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RepoEvent<?> newRepoEvent(String id)
|
||||||
|
{
|
||||||
|
RepoEvent<?> ev = events.get(id);
|
||||||
|
if (ev!=null)
|
||||||
|
return ev;
|
||||||
|
|
||||||
|
ev = mock(RepoEvent.class);
|
||||||
|
when(ev.getId()).thenReturn(id);
|
||||||
|
when(ev.toString()).thenReturn(id);
|
||||||
|
events.put(id, ev);
|
||||||
|
|
||||||
|
return ev;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ExecutorService newThreadPool()
|
||||||
|
{
|
||||||
|
return new ThreadPoolExecutor(2, Integer.MAX_VALUE,
|
||||||
|
60L, TimeUnit.SECONDS,
|
||||||
|
new SynchronousQueue<Runnable>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final Executor SYNC_EXECUTOR_SAME_THREAD = new Executor()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute(Runnable command)
|
||||||
|
{
|
||||||
|
command.run();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public static final Executor SYNC_EXECUTOR_NEW_THREAD = new Executor()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute(Runnable command)
|
||||||
|
{
|
||||||
|
Thread t = new Thread(command);
|
||||||
|
t.start();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
@@ -0,0 +1,249 @@
|
|||||||
|
/*
|
||||||
|
* #%L
|
||||||
|
* Alfresco Repository
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2005 - 2020 Alfresco Software Limited
|
||||||
|
* %%
|
||||||
|
* This file is part of the Alfresco software.
|
||||||
|
* If the software was purchased under a paid Alfresco license, the terms of
|
||||||
|
* the paid license agreement will prevail. Otherwise, the software is
|
||||||
|
* provided under the following open source license terms:
|
||||||
|
*
|
||||||
|
* Alfresco is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Alfresco is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public License
|
||||||
|
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
package org.alfresco.repo.event2;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.alfresco.model.ContentModel;
|
||||||
|
import org.alfresco.repo.event.databind.ObjectMapperFactory;
|
||||||
|
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||||
|
import org.alfresco.service.cmr.repository.NodeRef;
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.advisory.DestinationSource;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
public class EventGeneratorTest extends AbstractContextAwareRepoEvent
|
||||||
|
{
|
||||||
|
private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
|
||||||
|
|
||||||
|
private static final long DUMP_BROKER_TIMEOUT = 50000000l;
|
||||||
|
|
||||||
|
@Autowired @Qualifier("event2ObjectMapper")
|
||||||
|
private ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
private ActiveMQConnection connection;
|
||||||
|
protected List<RepoEvent<?>> receivedEvents;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void startupTopicListener() throws Exception
|
||||||
|
{
|
||||||
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||||
|
connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
receivedEvents = Collections.synchronizedList(new LinkedList<>());
|
||||||
|
consumer.setMessageListener(new MessageListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message)
|
||||||
|
{
|
||||||
|
String text = getText(message);
|
||||||
|
RepoEvent<?> event = toRepoEvent(text);
|
||||||
|
|
||||||
|
if (DEBUG)
|
||||||
|
{
|
||||||
|
System.err.println("RX: " + event);
|
||||||
|
}
|
||||||
|
|
||||||
|
receivedEvents.add(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RepoEvent<?> toRepoEvent(String json)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return objectMapper.readValue(json, RepoEvent.class);
|
||||||
|
} catch (Exception e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (DEBUG)
|
||||||
|
{
|
||||||
|
System.err.println("Now actively listening on topic " + EVENT2_TOPIC_NAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ObjectMapper createObjectMapper()
|
||||||
|
{
|
||||||
|
return ObjectMapperFactory.createInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void shutdownTopicListener() throws Exception
|
||||||
|
{
|
||||||
|
connection.close();
|
||||||
|
connection = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception
|
||||||
|
{
|
||||||
|
createNode(ContentModel.TYPE_CONTENT);
|
||||||
|
|
||||||
|
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
|
||||||
|
|
||||||
|
RepoEvent<?> sent = getRepoEvent(1);
|
||||||
|
RepoEvent<?> received = receivedEvents.get(0);
|
||||||
|
assertEventsEquals("Events are different!", sent, received);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current)
|
||||||
|
{
|
||||||
|
if (DEBUG)
|
||||||
|
{
|
||||||
|
System.err.println("XP: " + expected);
|
||||||
|
System.err.println("CU: " + current);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(message, expected, current);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReceiveEvent2EventsInOrder() throws Exception
|
||||||
|
{
|
||||||
|
NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT);
|
||||||
|
updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt");
|
||||||
|
deleteNode(nodeRef);
|
||||||
|
|
||||||
|
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 3);
|
||||||
|
|
||||||
|
RepoEvent<?> sentCreation = getRepoEvent(1);
|
||||||
|
RepoEvent<?> sentUpdate = getRepoEvent(2);
|
||||||
|
RepoEvent<?> sentDeletion = getRepoEvent(3);
|
||||||
|
assertEquals("Expected create event!", sentCreation, (RepoEvent<?>) receivedEvents.get(0));
|
||||||
|
assertEquals("Expected update event!", sentUpdate, (RepoEvent<?>) receivedEvents.get(1));
|
||||||
|
assertEquals("Expected delete event!", sentDeletion, (RepoEvent<?>) receivedEvents.get(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getText(Message message)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ActiveMQTextMessage am = (ActiveMQTextMessage) message;
|
||||||
|
return am.getText();
|
||||||
|
} catch (JMSException e)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// a simple main to investigate the contents of the local broker
|
||||||
|
public static void main(String[] args) throws Exception
|
||||||
|
{
|
||||||
|
dumpBroker("tcp://localhost:61616", DUMP_BROKER_TIMEOUT);
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void dumpBroker(String url, long timeout) throws Exception
|
||||||
|
{
|
||||||
|
System.out.println("Broker at url: '" + url + "'");
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
|
||||||
|
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
DestinationSource ds = connection.getDestinationSource();
|
||||||
|
|
||||||
|
Set<ActiveMQQueue> queues = ds.getQueues();
|
||||||
|
System.out.println("\nFound " + queues.size() + " queues:");
|
||||||
|
for (ActiveMQQueue queue : queues)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
System.out.println("- " + queue.getQueueName());
|
||||||
|
} catch (JMSException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<ActiveMQTopic> topics = ds.getTopics();
|
||||||
|
System.out.println("\nFound " + topics.size() + " topics:");
|
||||||
|
for (ActiveMQTopic topic : topics)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
System.out.println("- " + topic.getTopicName());
|
||||||
|
} catch (JMSException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "...");
|
||||||
|
consumer.setMessageListener(new MessageListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message)
|
||||||
|
{
|
||||||
|
String text = getText(message);
|
||||||
|
System.out.println("Received message " + message + "\n" + text + "\n");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Thread.sleep(timeout);
|
||||||
|
} finally
|
||||||
|
{
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -34,7 +34,8 @@ import org.junit.runners.Suite.SuiteClasses;
|
|||||||
UpdateRepoEventIT.class,
|
UpdateRepoEventIT.class,
|
||||||
DeleteRepoEventIT.class,
|
DeleteRepoEventIT.class,
|
||||||
ChildAssociationRepoEventIT.class,
|
ChildAssociationRepoEventIT.class,
|
||||||
PeerAssociationRepoEventIT.class
|
PeerAssociationRepoEventIT.class,
|
||||||
|
EventGeneratorTest.class
|
||||||
})
|
})
|
||||||
public class RepoEvent2ITSuite
|
public class RepoEvent2ITSuite
|
||||||
{
|
{
|
||||||
|
@@ -33,7 +33,8 @@ import org.junit.runners.Suite.SuiteClasses;
|
|||||||
@RunWith(Suite.class)
|
@RunWith(Suite.class)
|
||||||
@SuiteClasses({ EventFilterUnitTest.class,
|
@SuiteClasses({ EventFilterUnitTest.class,
|
||||||
EventConsolidatorUnitTest.class,
|
EventConsolidatorUnitTest.class,
|
||||||
EventJSONSchemaUnitTest.class
|
EventJSONSchemaUnitTest.class,
|
||||||
|
EventGeneratorQueueUnitTest.class
|
||||||
})
|
})
|
||||||
public class RepoEvent2UnitSuite
|
public class RepoEvent2UnitSuite
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user