mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-07-31 17:39:05 +00:00
Bugfix/repo 5610 events are not actually sent to activemq (#360)
* Add events tests * Polished put test: connects to JMS via TCP and validate that the event sent is also received back * Now the tests provides a simple main() that listens on the topic, useful for quick debug sessions * Now the user name is collected in the calling thread, so that the sendEvent does not silently fails * Apply changes following review * Now using queue system to guarantee events order * Add license * Updated logs and corrected comments * Remove empty methods * Now catering for spurious events at startup when database is bootstrapped * Now preserving the txn-id in all events * Moved up definitions in events2.xml after PR feedback Co-authored-by: Bruno Bossola <bruno@meterian.com>
This commit is contained in:
@@ -54,7 +54,6 @@ import org.alfresco.repo.policy.JavaBehaviour;
|
||||
import org.alfresco.repo.policy.PolicyComponent;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
||||
import org.alfresco.service.cmr.dictionary.DictionaryService;
|
||||
import org.alfresco.service.cmr.repository.AssociationRef;
|
||||
import org.alfresco.service.cmr.repository.ChildAssociationRef;
|
||||
@@ -90,11 +89,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
protected DictionaryService dictionaryService;
|
||||
private DescriptorService descriptorService;
|
||||
private EventFilterRegistry eventFilterRegistry;
|
||||
private Event2MessageProducer event2MessageProducer;
|
||||
private TransactionService transactionService;
|
||||
private PersonService personService;
|
||||
protected NodeResourceHelper nodeResourceHelper;
|
||||
|
||||
private EventGeneratorQueue eventGeneratorQueue;
|
||||
private NodeTypeFilter nodeTypeFilter;
|
||||
private ChildAssociationTypeFilter childAssociationTypeFilter;
|
||||
private EventUserFilter userFilter;
|
||||
@@ -109,10 +108,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
PropertyCheck.mandatory(this, "dictionaryService", dictionaryService);
|
||||
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
|
||||
PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry);
|
||||
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
|
||||
PropertyCheck.mandatory(this, "transactionService", transactionService);
|
||||
PropertyCheck.mandatory(this, "personService", personService);
|
||||
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
|
||||
PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue);
|
||||
|
||||
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
|
||||
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
|
||||
@@ -177,12 +176,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
this.eventFilterRegistry = eventFilterRegistry;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
|
||||
{
|
||||
this.event2MessageProducer = event2MessageProducer;
|
||||
}
|
||||
|
||||
public void setTransactionService(TransactionService transactionService)
|
||||
{
|
||||
this.transactionService = transactionService;
|
||||
@@ -198,6 +191,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
this.nodeResourceHelper = nodeResourceHelper;
|
||||
}
|
||||
|
||||
public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue)
|
||||
{
|
||||
this.eventGeneratorQueue = eventGeneratorQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCreateNode(ChildAssociationRef childAssocRef)
|
||||
{
|
||||
@@ -428,20 +426,26 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
|
||||
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
|
||||
{
|
||||
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
|
||||
eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo));
|
||||
}
|
||||
|
||||
private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo)
|
||||
{
|
||||
String user = eventInfo.getPrincipal();
|
||||
|
||||
if (consolidator.isTemporaryNode())
|
||||
{
|
||||
if (LOGGER.isTraceEnabled())
|
||||
{
|
||||
LOGGER.trace("Ignoring temporary node: " + nodeRef);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
|
||||
// Get the repo event before the filtering,
|
||||
// so we can take the latest node info into account
|
||||
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
|
||||
|
||||
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
|
||||
|
||||
final QName nodeType = consolidator.getNodeType();
|
||||
if (isFiltered(nodeType, user))
|
||||
@@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
|
||||
+ "' created by: " + user);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull())
|
||||
@@ -461,27 +465,34 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
{
|
||||
LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
logAndSendEvent(event, consolidator.getEventTypes());
|
||||
logEvent(event, consolidator.getEventTypes());
|
||||
return event;
|
||||
}
|
||||
|
||||
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
||||
{
|
||||
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
|
||||
eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator));
|
||||
}
|
||||
|
||||
private RepoEvent<?> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
|
||||
{
|
||||
String user = eventInfo.getPrincipal();
|
||||
if (consolidator.isTemporaryChildAssociation())
|
||||
{
|
||||
if (LOGGER.isTraceEnabled())
|
||||
{
|
||||
LOGGER.trace("Ignoring temporary child association: " + childAssociationRef);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
|
||||
// Get the repo event before the filtering,
|
||||
// so we can take the latest association info into account
|
||||
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
|
||||
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
|
||||
|
||||
final QName childAssocType = consolidator.getChildAssocType();
|
||||
if (isFilteredChildAssociation(childAssocType, user))
|
||||
@@ -492,7 +503,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
|
||||
+ "' created by: " + user);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
} else if (childAssociationRef.isPrimary())
|
||||
{
|
||||
if (LOGGER.isTraceEnabled())
|
||||
@@ -501,13 +512,20 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
|
||||
+ "' created by: " + user);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
logAndSendEvent(event, consolidator.getEventTypes());
|
||||
logEvent(event, consolidator.getEventTypes());
|
||||
return event;
|
||||
}
|
||||
|
||||
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
||||
{
|
||||
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
|
||||
eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator));
|
||||
}
|
||||
|
||||
private RepoEvent<?> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
|
||||
{
|
||||
if (consolidator.isTemporaryPeerAssociation())
|
||||
{
|
||||
@@ -515,30 +533,21 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
|
||||
{
|
||||
LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
|
||||
// Get the repo event before the filtering,
|
||||
// so we can take the latest association info into account
|
||||
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
|
||||
|
||||
logAndSendEvent(event, consolidator.getEventTypes());
|
||||
RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
|
||||
logEvent(event, consolidator.getEventTypes());
|
||||
return event;
|
||||
}
|
||||
|
||||
protected void logAndSendEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
|
||||
private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
|
||||
{
|
||||
if (LOGGER.isTraceEnabled())
|
||||
{
|
||||
LOGGER.trace("List of Events:" + listOfEvents);
|
||||
LOGGER.trace("Sending event:" + event);
|
||||
}
|
||||
// Need to execute this in another read txn because Camel expects it
|
||||
transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> {
|
||||
event2MessageProducer.send(event);
|
||||
|
||||
return null;
|
||||
}, true, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -0,0 +1,179 @@
|
||||
/*
|
||||
* #%L
|
||||
* Alfresco Repository
|
||||
* %%
|
||||
* Copyright (C) 2005 - 2021 Alfresco Software Limited
|
||||
* %%
|
||||
* This file is part of the Alfresco software.
|
||||
* If the software was purchased under a paid Alfresco license, the terms of
|
||||
* the paid license agreement will prevail. Otherwise, the software is
|
||||
* provided under the following open source license terms:
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
* #L%
|
||||
*/
|
||||
package org.alfresco.repo.event2;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||
import org.alfresco.util.PropertyCheck;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
||||
/*
|
||||
* This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but
|
||||
* at the same time it preserves the order of the events
|
||||
*/
|
||||
public class EventGeneratorQueue implements InitializingBean
|
||||
{
|
||||
protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class);
|
||||
|
||||
protected Executor enqueueThreadPoolExecutor;
|
||||
protected Executor dequeueThreadPoolExecutor;
|
||||
protected Event2MessageProducer event2MessageProducer;
|
||||
protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>();
|
||||
protected Runnable listener = createListener();
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception
|
||||
{
|
||||
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
|
||||
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
|
||||
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
|
||||
}
|
||||
|
||||
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
|
||||
{
|
||||
this.event2MessageProducer = event2MessageProducer;
|
||||
}
|
||||
|
||||
public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor)
|
||||
{
|
||||
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
|
||||
}
|
||||
|
||||
public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor)
|
||||
{
|
||||
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
|
||||
dequeueThreadPoolExecutor.execute(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Procedure to enqueue the callback functions that creates an event.
|
||||
* @param maker Callback function that creates an event.
|
||||
*/
|
||||
public void accept(Callable<RepoEvent<?>> maker)
|
||||
{
|
||||
EventInMaking eventInMaking = new EventInMaking(maker);
|
||||
queue.offer(eventInMaking);
|
||||
enqueueThreadPoolExecutor.execute(() -> {
|
||||
try
|
||||
{
|
||||
eventInMaking.make();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create listener task in charge of dequeuing and sending events ready to be sent.
|
||||
* @return The task in charge of dequeuing and sending events ready to be sent.
|
||||
*/
|
||||
private Runnable createListener()
|
||||
{
|
||||
return new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!Thread.interrupted())
|
||||
{
|
||||
try
|
||||
{
|
||||
EventInMaking eventInMaking = queue.take();
|
||||
RepoEvent<?> event = eventInMaking.getEventWhenReady();
|
||||
if (event != null)
|
||||
{
|
||||
event2MessageProducer.send(event);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOGGER.error("Unexpected error while dequeuing and sending repository event" + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
LOGGER.warn("Unexpected: rescheduling the listener thread.");
|
||||
dequeueThreadPoolExecutor.execute(listener);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Simple class that makes events and allows to retrieve them when ready
|
||||
*/
|
||||
private static class EventInMaking
|
||||
{
|
||||
private Callable<RepoEvent<?>> maker;
|
||||
private volatile RepoEvent<?> event;
|
||||
private CountDownLatch latch;
|
||||
|
||||
public EventInMaking(Callable<RepoEvent<?>> maker)
|
||||
{
|
||||
this.maker = maker;
|
||||
this.latch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public void make() throws Exception
|
||||
{
|
||||
try
|
||||
{
|
||||
event = maker.call();
|
||||
}
|
||||
finally
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public RepoEvent<?> getEventWhenReady() throws InterruptedException
|
||||
{
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
return event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return maker.toString();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -38,9 +38,10 @@
|
||||
<property name="dictionaryService" ref="dictionaryService"/>
|
||||
<property name="descriptorService" ref="descriptorComponent"/>
|
||||
<property name="eventFilterRegistry" ref="event2FilterRegistry"/>
|
||||
<property name="event2MessageProducer" ref="event2MessageProducer"/>
|
||||
<property name="transactionService" ref="transactionService"/>
|
||||
<property name="personService" ref="personService"/>
|
||||
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
|
||||
<property name="eventGeneratorQueue" ref="eventGeneratorQueue"/>
|
||||
</bean>
|
||||
|
||||
<bean id="baseNodeResourceHelper" abstract="true">
|
||||
@@ -54,7 +55,45 @@
|
||||
|
||||
<bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/>
|
||||
|
||||
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2">
|
||||
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
|
||||
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/>
|
||||
|
||||
<bean id="eventGeneratorQueue" class="org.alfresco.repo.event2.EventGeneratorQueue" >
|
||||
<property name="enqueueThreadPoolExecutor">
|
||||
<ref bean="eventAsyncEnqueueThreadPool" />
|
||||
</property>
|
||||
<property name="dequeueThreadPoolExecutor">
|
||||
<ref bean="eventAsyncDequeueThreadPool" />
|
||||
</property>
|
||||
<property name="event2MessageProducer" ref="event2MessageProducer"/>
|
||||
</bean>
|
||||
|
||||
<bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
|
||||
<property name="poolName">
|
||||
<value>eventAsyncEnqueueThreadPool</value>
|
||||
</property>
|
||||
<property name="corePoolSize">
|
||||
<value>${repo.event2.queue.enqueueThreadPool.coreSize}</value>
|
||||
</property>
|
||||
<property name="maximumPoolSize">
|
||||
<value>${repo.event2.queue.enqueueThreadPool.maximumSize}</value>
|
||||
</property>
|
||||
<property name="threadPriority">
|
||||
<value>${repo.event2.queue.enqueueThreadPool.priority}</value>
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
<bean id="eventAsyncDequeueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
|
||||
<property name="poolName">
|
||||
<value>eventAsyncDequeueThreadPool</value>
|
||||
</property>
|
||||
<property name="corePoolSize">
|
||||
<value>${repo.event2.queue.dequeueThreadPool.coreSize}</value>
|
||||
</property>
|
||||
<property name="maximumPoolSize">
|
||||
<value>${repo.event2.queue.dequeueThreadPool.maximumSize}</value>
|
||||
</property>
|
||||
<property name="threadPriority">
|
||||
<value>${repo.event2.queue.dequeueThreadPool.priority}</value>
|
||||
</property>
|
||||
</bean>
|
||||
</beans>
|
||||
|
@@ -1211,6 +1211,15 @@ repo.event2.filter.childAssocTypes=rn:rendition
|
||||
repo.event2.filter.users=
|
||||
# Topic name
|
||||
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
|
||||
# Thread pool for async enqueue of repo events
|
||||
repo.event2.queue.enqueueThreadPool.priority=1
|
||||
repo.event2.queue.enqueueThreadPool.coreSize=8
|
||||
repo.event2.queue.enqueueThreadPool.maximumSize=10
|
||||
# Thread pool for async dequeue and delivery of repo events
|
||||
repo.event2.queue.dequeueThreadPool.priority=1
|
||||
repo.event2.queue.dequeueThreadPool.coreSize=1
|
||||
repo.event2.queue.dequeueThreadPool.maximumSize=1
|
||||
|
||||
|
||||
# MNT-21083
|
||||
# --DELETE_NOT_EXISTS - default settings
|
||||
|
@@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
@@ -77,12 +78,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||
{
|
||||
protected static final boolean DEBUG = false;
|
||||
|
||||
protected static final String TEST_NAMESPACE = "http://www.alfresco.org/test/ContextAwareRepoEvent";
|
||||
protected static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer();
|
||||
|
||||
private static final String BROKER_URL = "tcp://localhost:61616";
|
||||
private static final String TOPIC_NAME = "alfresco.repo.event2";
|
||||
private static final String CAMEL_ROUTE = "jms:topic:" + TOPIC_NAME;
|
||||
private static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer();
|
||||
private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
|
||||
|
||||
private static boolean isCamelConfigured;
|
||||
@@ -104,6 +107,13 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||
@Autowired
|
||||
protected ObjectMapper event2ObjectMapper;
|
||||
|
||||
@Autowired @Qualifier("eventGeneratorV2")
|
||||
protected EventGenerator eventGenerator;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("eventGeneratorQueue")
|
||||
protected EventGeneratorQueue eventQueue;
|
||||
|
||||
protected NodeRef rootNodeRef;
|
||||
|
||||
@BeforeClass
|
||||
@@ -141,6 +151,33 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||
}
|
||||
return nodeService.getRootNode(storeRef);
|
||||
});
|
||||
|
||||
flushSpuriousEvents();
|
||||
}
|
||||
|
||||
/*
|
||||
* When running with an empty database some events related to the creation may
|
||||
* creep up here making the test fails. After attempting several other
|
||||
* strategies, a smart sleep seems to do the work.
|
||||
*/
|
||||
protected void flushSpuriousEvents() throws InterruptedException
|
||||
{
|
||||
int maxloops = 5;
|
||||
|
||||
int count = maxloops;
|
||||
do
|
||||
{
|
||||
Thread.sleep(165l);
|
||||
if (EVENT_CONTAINER.isEmpty())
|
||||
{
|
||||
count--;
|
||||
} else
|
||||
{
|
||||
EVENT_CONTAINER.reset();
|
||||
count = maxloops;
|
||||
}
|
||||
|
||||
} while (count > 0);
|
||||
}
|
||||
|
||||
@After
|
||||
@@ -179,6 +216,16 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||
propertyMap).getChildRef());
|
||||
}
|
||||
|
||||
protected NodeRef updateNodeName(NodeRef nodeRef, String newName)
|
||||
{
|
||||
PropertyMap propertyMap = new PropertyMap();
|
||||
propertyMap.put(ContentModel.PROP_NAME, newName);
|
||||
return retryingTransactionHelper.doInTransaction(() -> {
|
||||
nodeService.addProperties(nodeRef, propertyMap);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
protected void deleteNode(NodeRef nodeRef)
|
||||
{
|
||||
retryingTransactionHelper.doInTransaction(() -> {
|
||||
@@ -376,13 +423,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||
|
||||
public static class RepoEventContainer implements Processor
|
||||
{
|
||||
private final List<RepoEvent<?>> events = new ArrayList<>();
|
||||
private final List<RepoEvent<?>> events = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
@Override
|
||||
public void process(Exchange exchange)
|
||||
{
|
||||
Object object = exchange.getIn().getBody();
|
||||
events.add((RepoEvent<?>) object);
|
||||
|
||||
if (DEBUG)
|
||||
{
|
||||
System.err.println("XX: "+object);
|
||||
}
|
||||
}
|
||||
|
||||
public List<RepoEvent<?>> getEvents()
|
||||
@@ -404,6 +456,12 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
|
||||
{
|
||||
events.clear();
|
||||
}
|
||||
|
||||
public boolean isEmpty()
|
||||
{
|
||||
return events.isEmpty();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@@ -0,0 +1,290 @@
|
||||
/*
|
||||
* #%L
|
||||
* Alfresco Repository
|
||||
* %%
|
||||
* Copyright (C) 2005 - 2021 Alfresco Software Limited
|
||||
* %%
|
||||
* This file is part of the Alfresco software.
|
||||
* If the software was purchased under a paid Alfresco license, the terms of
|
||||
* the paid license agreement will prevail. Otherwise, the software is
|
||||
* provided under the following open source license terms:
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
* #L%
|
||||
*/
|
||||
package org.alfresco.repo.event2;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class EventGeneratorQueueUnitTest
|
||||
{
|
||||
private EventGeneratorQueue queue;
|
||||
|
||||
private Event2MessageProducer bus;
|
||||
private ExecutorService enqueuePool;
|
||||
private ExecutorService dequeuePool;
|
||||
private List<RepoEvent<?>> recordedEvents;
|
||||
private Map<String, RepoEvent<?>> events;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
queue = new EventGeneratorQueue();
|
||||
|
||||
enqueuePool = newThreadPool();
|
||||
queue.setEnqueueThreadPoolExecutor(enqueuePool);
|
||||
dequeuePool = newThreadPool();
|
||||
queue.setDequeueThreadPoolExecutor(dequeuePool);
|
||||
|
||||
bus = mock(Event2MessageProducer.class);
|
||||
queue.setEvent2MessageProducer(bus);
|
||||
|
||||
events = new HashMap<>();
|
||||
|
||||
setupEventsRecorder();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown()
|
||||
{
|
||||
enqueuePool.shutdown();
|
||||
}
|
||||
|
||||
private void setupEventsRecorder()
|
||||
{
|
||||
recordedEvents = new CopyOnWriteArrayList<>();
|
||||
|
||||
Mockito.doAnswer(new Answer<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable
|
||||
{
|
||||
RepoEvent<?> event = invocation.getArgument(0, RepoEvent.class);
|
||||
recordedEvents.add(event);
|
||||
return null;
|
||||
}
|
||||
}).when(bus).send(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveSingleQuickMessage() throws Exception
|
||||
{
|
||||
queue.accept(messageWithDelay("A", 55l));
|
||||
|
||||
sleep(150l);
|
||||
|
||||
assertEquals(1, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception
|
||||
{
|
||||
queue.accept(() -> { return null; });
|
||||
|
||||
sleep(150l);
|
||||
|
||||
assertEquals(0, recordedEvents.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception {
|
||||
queue.accept(messageWithDelay("A", 0l));
|
||||
queue.accept(messageWithDelay("B", 100l));
|
||||
queue.accept(messageWithDelay("C", 200l));
|
||||
|
||||
sleep(450l);
|
||||
|
||||
assertEquals(3, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
assertEquals("B", recordedEvents.get(1).getId());
|
||||
assertEquals("C", recordedEvents.get(2).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception
|
||||
{
|
||||
queue.accept(messageWithDelay("A", 300l));
|
||||
queue.accept(messageWithDelay("B", 150l));
|
||||
queue.accept(messageWithDelay("C", 0l));
|
||||
|
||||
sleep(950l);
|
||||
|
||||
assertEquals(3, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
assertEquals("B", recordedEvents.get(1).getId());
|
||||
assertEquals("C", recordedEvents.get(2).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception
|
||||
{
|
||||
queue.accept(messageWithDelay("A", 300l));
|
||||
queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");});
|
||||
queue.accept(messageWithDelay("B", 55l));
|
||||
queue.accept(messageWithDelay("C", 0l));
|
||||
|
||||
sleep(950l);
|
||||
|
||||
assertEquals(3, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
assertEquals("B", recordedEvents.get(1).getId());
|
||||
assertEquals("C", recordedEvents.get(2).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception
|
||||
{
|
||||
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l);
|
||||
RepoEvent<?> messageB = makerB.call();
|
||||
doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
|
||||
queue.accept(messageWithDelay("A", 300l));
|
||||
queue.accept(makerB);
|
||||
queue.accept(messageWithDelay("C", 0l));
|
||||
|
||||
sleep(950l);
|
||||
|
||||
assertEquals(2, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
assertEquals("C", recordedEvents.get(1).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception
|
||||
{
|
||||
queue.accept(messageWithDelay("A", 300l));
|
||||
queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");});
|
||||
queue.accept(messageWithDelay("B", 55l));
|
||||
queue.accept(messageWithDelay("C", 0l));
|
||||
|
||||
sleep(950l);
|
||||
|
||||
assertEquals(3, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
assertEquals("B", recordedEvents.get(1).getId());
|
||||
assertEquals("C", recordedEvents.get(2).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception
|
||||
{
|
||||
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l);
|
||||
RepoEvent<?> messageB = makerB.call();
|
||||
doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
|
||||
queue.accept(messageWithDelay("A", 300l));
|
||||
queue.accept(makerB);
|
||||
queue.accept(messageWithDelay("C", 0l));
|
||||
|
||||
sleep(950l);
|
||||
|
||||
assertEquals(2, recordedEvents.size());
|
||||
assertEquals("A", recordedEvents.get(0).getId());
|
||||
assertEquals("C", recordedEvents.get(1).getId());
|
||||
}
|
||||
|
||||
private Callable<RepoEvent<?>> messageWithDelay(String id, long delay)
|
||||
{
|
||||
Callable<RepoEvent<?>> res = new Callable<RepoEvent<?>>() {
|
||||
|
||||
@Override
|
||||
public RepoEvent<?> call() throws Exception
|
||||
{
|
||||
if(delay != 0)
|
||||
{
|
||||
sleep(delay);
|
||||
}
|
||||
return newRepoEvent(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return id;
|
||||
}
|
||||
};
|
||||
return res;
|
||||
}
|
||||
|
||||
private RepoEvent<?> newRepoEvent(String id)
|
||||
{
|
||||
RepoEvent<?> ev = events.get(id);
|
||||
if (ev!=null)
|
||||
return ev;
|
||||
|
||||
ev = mock(RepoEvent.class);
|
||||
when(ev.getId()).thenReturn(id);
|
||||
when(ev.toString()).thenReturn(id);
|
||||
events.put(id, ev);
|
||||
|
||||
return ev;
|
||||
}
|
||||
|
||||
public static ExecutorService newThreadPool()
|
||||
{
|
||||
return new ThreadPoolExecutor(2, Integer.MAX_VALUE,
|
||||
60L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<Runnable>());
|
||||
}
|
||||
|
||||
public static final Executor SYNC_EXECUTOR_SAME_THREAD = new Executor()
|
||||
{
|
||||
@Override
|
||||
public void execute(Runnable command)
|
||||
{
|
||||
command.run();
|
||||
}
|
||||
};
|
||||
|
||||
public static final Executor SYNC_EXECUTOR_NEW_THREAD = new Executor()
|
||||
{
|
||||
@Override
|
||||
public void execute(Runnable command)
|
||||
{
|
||||
Thread t = new Thread(command);
|
||||
t.start();
|
||||
try
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
@@ -0,0 +1,249 @@
|
||||
/*
|
||||
* #%L
|
||||
* Alfresco Repository
|
||||
* %%
|
||||
* Copyright (C) 2005 - 2020 Alfresco Software Limited
|
||||
* %%
|
||||
* This file is part of the Alfresco software.
|
||||
* If the software was purchased under a paid Alfresco license, the terms of
|
||||
* the paid license agreement will prevail. Otherwise, the software is
|
||||
* provided under the following open source license terms:
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
* #L%
|
||||
*/
|
||||
package org.alfresco.repo.event2;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.alfresco.model.ContentModel;
|
||||
import org.alfresco.repo.event.databind.ObjectMapperFactory;
|
||||
import org.alfresco.repo.event.v1.model.RepoEvent;
|
||||
import org.alfresco.service.cmr.repository.NodeRef;
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.advisory.DestinationSource;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class EventGeneratorTest extends AbstractContextAwareRepoEvent
|
||||
{
|
||||
private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
|
||||
|
||||
private static final long DUMP_BROKER_TIMEOUT = 50000000l;
|
||||
|
||||
@Autowired @Qualifier("event2ObjectMapper")
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private ActiveMQConnection connection;
|
||||
protected List<RepoEvent<?>> receivedEvents;
|
||||
|
||||
@Before
|
||||
public void startupTopicListener() throws Exception
|
||||
{
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
receivedEvents = Collections.synchronizedList(new LinkedList<>());
|
||||
consumer.setMessageListener(new MessageListener()
|
||||
{
|
||||
@Override
|
||||
public void onMessage(Message message)
|
||||
{
|
||||
String text = getText(message);
|
||||
RepoEvent<?> event = toRepoEvent(text);
|
||||
|
||||
if (DEBUG)
|
||||
{
|
||||
System.err.println("RX: " + event);
|
||||
}
|
||||
|
||||
receivedEvents.add(event);
|
||||
}
|
||||
|
||||
private RepoEvent<?> toRepoEvent(String json)
|
||||
{
|
||||
try
|
||||
{
|
||||
return objectMapper.readValue(json, RepoEvent.class);
|
||||
} catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (DEBUG)
|
||||
{
|
||||
System.err.println("Now actively listening on topic " + EVENT2_TOPIC_NAME);
|
||||
}
|
||||
}
|
||||
|
||||
protected ObjectMapper createObjectMapper()
|
||||
{
|
||||
return ObjectMapperFactory.createInstance();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdownTopicListener() throws Exception
|
||||
{
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception
|
||||
{
|
||||
createNode(ContentModel.TYPE_CONTENT);
|
||||
|
||||
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
|
||||
|
||||
RepoEvent<?> sent = getRepoEvent(1);
|
||||
RepoEvent<?> received = receivedEvents.get(0);
|
||||
assertEventsEquals("Events are different!", sent, received);
|
||||
}
|
||||
|
||||
private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current)
|
||||
{
|
||||
if (DEBUG)
|
||||
{
|
||||
System.err.println("XP: " + expected);
|
||||
System.err.println("CU: " + current);
|
||||
}
|
||||
|
||||
assertEquals(message, expected, current);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveEvent2EventsInOrder() throws Exception
|
||||
{
|
||||
NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT);
|
||||
updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt");
|
||||
deleteNode(nodeRef);
|
||||
|
||||
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 3);
|
||||
|
||||
RepoEvent<?> sentCreation = getRepoEvent(1);
|
||||
RepoEvent<?> sentUpdate = getRepoEvent(2);
|
||||
RepoEvent<?> sentDeletion = getRepoEvent(3);
|
||||
assertEquals("Expected create event!", sentCreation, (RepoEvent<?>) receivedEvents.get(0));
|
||||
assertEquals("Expected update event!", sentUpdate, (RepoEvent<?>) receivedEvents.get(1));
|
||||
assertEquals("Expected delete event!", sentDeletion, (RepoEvent<?>) receivedEvents.get(2));
|
||||
}
|
||||
|
||||
private static String getText(Message message)
|
||||
{
|
||||
try
|
||||
{
|
||||
ActiveMQTextMessage am = (ActiveMQTextMessage) message;
|
||||
return am.getText();
|
||||
} catch (JMSException e)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// a simple main to investigate the contents of the local broker
|
||||
public static void main(String[] args) throws Exception
|
||||
{
|
||||
dumpBroker("tcp://localhost:61616", DUMP_BROKER_TIMEOUT);
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
private static void dumpBroker(String url, long timeout) throws Exception
|
||||
{
|
||||
System.out.println("Broker at url: '" + url + "'");
|
||||
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
|
||||
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||
try
|
||||
{
|
||||
connection.start();
|
||||
|
||||
DestinationSource ds = connection.getDestinationSource();
|
||||
|
||||
Set<ActiveMQQueue> queues = ds.getQueues();
|
||||
System.out.println("\nFound " + queues.size() + " queues:");
|
||||
for (ActiveMQQueue queue : queues)
|
||||
{
|
||||
try
|
||||
{
|
||||
System.out.println("- " + queue.getQueueName());
|
||||
} catch (JMSException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
Set<ActiveMQTopic> topics = ds.getTopics();
|
||||
System.out.println("\nFound " + topics.size() + " topics:");
|
||||
for (ActiveMQTopic topic : topics)
|
||||
{
|
||||
try
|
||||
{
|
||||
System.out.println("- " + topic.getTopicName());
|
||||
} catch (JMSException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "...");
|
||||
consumer.setMessageListener(new MessageListener()
|
||||
{
|
||||
@Override
|
||||
public void onMessage(Message message)
|
||||
{
|
||||
String text = getText(message);
|
||||
System.out.println("Received message " + message + "\n" + text + "\n");
|
||||
}
|
||||
});
|
||||
|
||||
Thread.sleep(timeout);
|
||||
} finally
|
||||
{
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
@@ -34,7 +34,8 @@ import org.junit.runners.Suite.SuiteClasses;
|
||||
UpdateRepoEventIT.class,
|
||||
DeleteRepoEventIT.class,
|
||||
ChildAssociationRepoEventIT.class,
|
||||
PeerAssociationRepoEventIT.class
|
||||
PeerAssociationRepoEventIT.class,
|
||||
EventGeneratorTest.class
|
||||
})
|
||||
public class RepoEvent2ITSuite
|
||||
{
|
||||
|
@@ -33,7 +33,8 @@ import org.junit.runners.Suite.SuiteClasses;
|
||||
@RunWith(Suite.class)
|
||||
@SuiteClasses({ EventFilterUnitTest.class,
|
||||
EventConsolidatorUnitTest.class,
|
||||
EventJSONSchemaUnitTest.class
|
||||
EventJSONSchemaUnitTest.class,
|
||||
EventGeneratorQueueUnitTest.class
|
||||
})
|
||||
public class RepoEvent2UnitSuite
|
||||
{
|
||||
|
Reference in New Issue
Block a user