Bugfix/repo 5610 events are not actually sent to activemq (#360)

* Add events tests

* Polished put test: connects to JMS via TCP and validate that the event sent is also received back

* Now the tests provides a simple main() that listens on the topic, useful for quick debug sessions

* Now the user name is collected in the calling thread, so that the sendEvent does not silently fails

* Apply changes following review

* Now using queue system to guarantee events order

* Add license

* Updated logs and corrected comments

* Remove empty methods

* Now catering for spurious events at startup when database is bootstrapped

* Now preserving the txn-id in all events

* Moved up definitions in events2.xml after PR feedback

Co-authored-by: Bruno Bossola <bruno@meterian.com>
This commit is contained in:
Nana Insaidoo
2021-04-09 13:34:05 +01:00
committed by GitHub
parent 0ca611dcfd
commit 046116ddf0
9 changed files with 879 additions and 44 deletions

View File

@@ -54,7 +54,6 @@ import org.alfresco.repo.policy.JavaBehaviour;
import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.dictionary.DictionaryService;
import org.alfresco.service.cmr.repository.AssociationRef;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
@@ -90,11 +89,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
protected DictionaryService dictionaryService;
private DescriptorService descriptorService;
private EventFilterRegistry eventFilterRegistry;
private Event2MessageProducer event2MessageProducer;
private TransactionService transactionService;
private PersonService personService;
protected NodeResourceHelper nodeResourceHelper;
private EventGeneratorQueue eventGeneratorQueue;
private NodeTypeFilter nodeTypeFilter;
private ChildAssociationTypeFilter childAssociationTypeFilter;
private EventUserFilter userFilter;
@@ -109,10 +108,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
PropertyCheck.mandatory(this, "dictionaryService", dictionaryService);
PropertyCheck.mandatory(this, "descriptorService", descriptorService);
PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry);
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
PropertyCheck.mandatory(this, "transactionService", transactionService);
PropertyCheck.mandatory(this, "personService", personService);
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue);
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
@@ -177,12 +176,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
this.eventFilterRegistry = eventFilterRegistry;
}
@SuppressWarnings("unused")
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
{
this.event2MessageProducer = event2MessageProducer;
}
public void setTransactionService(TransactionService transactionService)
{
this.transactionService = transactionService;
@@ -198,6 +191,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
this.nodeResourceHelper = nodeResourceHelper;
}
public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue)
{
this.eventGeneratorQueue = eventGeneratorQueue;
}
@Override
public void onCreateNode(ChildAssociationRef childAssocRef)
{
@@ -428,20 +426,26 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
{
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo));
}
private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo)
{
String user = eventInfo.getPrincipal();
if (consolidator.isTemporaryNode())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Ignoring temporary node: " + nodeRef);
}
return;
return null;
}
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
// Get the repo event before the filtering,
// so we can take the latest node info into account
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
final QName nodeType = consolidator.getNodeType();
if (isFiltered(nodeType, user))
@@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
+ "' created by: " + user);
}
return;
return null;
}
if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull())
@@ -461,27 +465,34 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
{
LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef);
}
return;
return null;
}
logAndSendEvent(event, consolidator.getEventTypes());
logEvent(event, consolidator.getEventTypes());
return event;
}
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
{
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator));
}
private RepoEvent<?> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
{
String user = eventInfo.getPrincipal();
if (consolidator.isTemporaryChildAssociation())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Ignoring temporary child association: " + childAssociationRef);
}
return;
return null;
}
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
// Get the repo event before the filtering,
// so we can take the latest association info into account
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
final QName childAssocType = consolidator.getChildAssocType();
if (isFilteredChildAssociation(childAssocType, user))
@@ -492,7 +503,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user);
}
return;
return null;
} else if (childAssociationRef.isPrimary())
{
if (LOGGER.isTraceEnabled())
@@ -501,13 +512,20 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user);
}
return;
return null;
}
logAndSendEvent(event, consolidator.getEventTypes());
logEvent(event, consolidator.getEventTypes());
return event;
}
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
{
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator));
}
private RepoEvent<?> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
{
if (consolidator.isTemporaryPeerAssociation())
{
@@ -515,30 +533,21 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
{
LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef);
}
return;
return null;
}
final String user = AuthenticationUtil.getFullyAuthenticatedUser();
// Get the repo event before the filtering,
// so we can take the latest association info into account
final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user));
logAndSendEvent(event, consolidator.getEventTypes());
RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
logEvent(event, consolidator.getEventTypes());
return event;
}
protected void logAndSendEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("List of Events:" + listOfEvents);
LOGGER.trace("Sending event:" + event);
}
// Need to execute this in another read txn because Camel expects it
transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> {
event2MessageProducer.send(event);
return null;
}, true, false);
}
}

View File

@@ -0,0 +1,179 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2021 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
/*
* This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but
* at the same time it preserves the order of the events
*/
public class EventGeneratorQueue implements InitializingBean
{
protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class);
protected Executor enqueueThreadPoolExecutor;
protected Executor dequeueThreadPoolExecutor;
protected Event2MessageProducer event2MessageProducer;
protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>();
protected Runnable listener = createListener();
@Override
public void afterPropertiesSet() throws Exception
{
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
}
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
{
this.event2MessageProducer = event2MessageProducer;
}
public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor)
{
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
}
public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor)
{
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
dequeueThreadPoolExecutor.execute(listener);
}
/**
* Procedure to enqueue the callback functions that creates an event.
* @param maker Callback function that creates an event.
*/
public void accept(Callable<RepoEvent<?>> maker)
{
EventInMaking eventInMaking = new EventInMaking(maker);
queue.offer(eventInMaking);
enqueueThreadPoolExecutor.execute(() -> {
try
{
eventInMaking.make();
}
catch (Exception e)
{
LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e);
}
});
}
/**
* Create listener task in charge of dequeuing and sending events ready to be sent.
* @return The task in charge of dequeuing and sending events ready to be sent.
*/
private Runnable createListener()
{
return new Runnable()
{
@Override
public void run()
{
try
{
while (!Thread.interrupted())
{
try
{
EventInMaking eventInMaking = queue.take();
RepoEvent<?> event = eventInMaking.getEventWhenReady();
if (event != null)
{
event2MessageProducer.send(event);
}
}
catch (Exception e)
{
LOGGER.error("Unexpected error while dequeuing and sending repository event" + e);
}
}
}
finally
{
LOGGER.warn("Unexpected: rescheduling the listener thread.");
dequeueThreadPoolExecutor.execute(listener);
}
}
};
}
/*
* Simple class that makes events and allows to retrieve them when ready
*/
private static class EventInMaking
{
private Callable<RepoEvent<?>> maker;
private volatile RepoEvent<?> event;
private CountDownLatch latch;
public EventInMaking(Callable<RepoEvent<?>> maker)
{
this.maker = maker;
this.latch = new CountDownLatch(1);
}
public void make() throws Exception
{
try
{
event = maker.call();
}
finally
{
latch.countDown();
}
}
public RepoEvent<?> getEventWhenReady() throws InterruptedException
{
latch.await(30, TimeUnit.SECONDS);
return event;
}
@Override
public String toString()
{
return maker.toString();
}
}
}

View File

@@ -38,9 +38,10 @@
<property name="dictionaryService" ref="dictionaryService"/>
<property name="descriptorService" ref="descriptorComponent"/>
<property name="eventFilterRegistry" ref="event2FilterRegistry"/>
<property name="event2MessageProducer" ref="event2MessageProducer"/>
<property name="transactionService" ref="transactionService"/>
<property name="personService" ref="personService"/>
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
<property name="eventGeneratorQueue" ref="eventGeneratorQueue"/>
</bean>
<bean id="baseNodeResourceHelper" abstract="true">
@@ -54,7 +55,45 @@
<bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/>
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2">
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/>
<bean id="eventGeneratorQueue" class="org.alfresco.repo.event2.EventGeneratorQueue" >
<property name="enqueueThreadPoolExecutor">
<ref bean="eventAsyncEnqueueThreadPool" />
</property>
<property name="dequeueThreadPoolExecutor">
<ref bean="eventAsyncDequeueThreadPool" />
</property>
<property name="event2MessageProducer" ref="event2MessageProducer"/>
</bean>
<bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
<property name="poolName">
<value>eventAsyncEnqueueThreadPool</value>
</property>
<property name="corePoolSize">
<value>${repo.event2.queue.enqueueThreadPool.coreSize}</value>
</property>
<property name="maximumPoolSize">
<value>${repo.event2.queue.enqueueThreadPool.maximumSize}</value>
</property>
<property name="threadPriority">
<value>${repo.event2.queue.enqueueThreadPool.priority}</value>
</property>
</bean>
<bean id="eventAsyncDequeueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">
<property name="poolName">
<value>eventAsyncDequeueThreadPool</value>
</property>
<property name="corePoolSize">
<value>${repo.event2.queue.dequeueThreadPool.coreSize}</value>
</property>
<property name="maximumPoolSize">
<value>${repo.event2.queue.dequeueThreadPool.maximumSize}</value>
</property>
<property name="threadPriority">
<value>${repo.event2.queue.dequeueThreadPool.priority}</value>
</property>
</bean>
</beans>

View File

@@ -1211,6 +1211,15 @@ repo.event2.filter.childAssocTypes=rn:rendition
repo.event2.filter.users=
# Topic name
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
# Thread pool for async enqueue of repo events
repo.event2.queue.enqueueThreadPool.priority=1
repo.event2.queue.enqueueThreadPool.coreSize=8
repo.event2.queue.enqueueThreadPool.maximumSize=10
# Thread pool for async dequeue and delivery of repo events
repo.event2.queue.dequeueThreadPool.priority=1
repo.event2.queue.dequeueThreadPool.coreSize=1
repo.event2.queue.dequeueThreadPool.maximumSize=1
# MNT-21083
# --DELETE_NOT_EXISTS - default settings

View File

@@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.jms.ConnectionFactory;
@@ -77,12 +78,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
{
protected static final boolean DEBUG = false;
protected static final String TEST_NAMESPACE = "http://www.alfresco.org/test/ContextAwareRepoEvent";
protected static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer();
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "alfresco.repo.event2";
private static final String CAMEL_ROUTE = "jms:topic:" + TOPIC_NAME;
private static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer();
private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
private static boolean isCamelConfigured;
@@ -104,6 +107,13 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
@Autowired
protected ObjectMapper event2ObjectMapper;
@Autowired @Qualifier("eventGeneratorV2")
protected EventGenerator eventGenerator;
@Autowired
@Qualifier("eventGeneratorQueue")
protected EventGeneratorQueue eventQueue;
protected NodeRef rootNodeRef;
@BeforeClass
@@ -141,6 +151,33 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
}
return nodeService.getRootNode(storeRef);
});
flushSpuriousEvents();
}
/*
* When running with an empty database some events related to the creation may
* creep up here making the test fails. After attempting several other
* strategies, a smart sleep seems to do the work.
*/
protected void flushSpuriousEvents() throws InterruptedException
{
int maxloops = 5;
int count = maxloops;
do
{
Thread.sleep(165l);
if (EVENT_CONTAINER.isEmpty())
{
count--;
} else
{
EVENT_CONTAINER.reset();
count = maxloops;
}
} while (count > 0);
}
@After
@@ -179,6 +216,16 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
propertyMap).getChildRef());
}
protected NodeRef updateNodeName(NodeRef nodeRef, String newName)
{
PropertyMap propertyMap = new PropertyMap();
propertyMap.put(ContentModel.PROP_NAME, newName);
return retryingTransactionHelper.doInTransaction(() -> {
nodeService.addProperties(nodeRef, propertyMap);
return null;
});
}
protected void deleteNode(NodeRef nodeRef)
{
retryingTransactionHelper.doInTransaction(() -> {
@@ -376,13 +423,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
public static class RepoEventContainer implements Processor
{
private final List<RepoEvent<?>> events = new ArrayList<>();
private final List<RepoEvent<?>> events = Collections.synchronizedList(new ArrayList<>());
@Override
public void process(Exchange exchange)
{
Object object = exchange.getIn().getBody();
events.add((RepoEvent<?>) object);
if (DEBUG)
{
System.err.println("XX: "+object);
}
}
public List<RepoEvent<?>> getEvents()
@@ -404,6 +456,12 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
{
events.clear();
}
public boolean isEmpty()
{
return events.isEmpty();
}
}
@SuppressWarnings("unchecked")

View File

@@ -0,0 +1,290 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2021 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class EventGeneratorQueueUnitTest
{
private EventGeneratorQueue queue;
private Event2MessageProducer bus;
private ExecutorService enqueuePool;
private ExecutorService dequeuePool;
private List<RepoEvent<?>> recordedEvents;
private Map<String, RepoEvent<?>> events;
@Before
public void setup()
{
queue = new EventGeneratorQueue();
enqueuePool = newThreadPool();
queue.setEnqueueThreadPoolExecutor(enqueuePool);
dequeuePool = newThreadPool();
queue.setDequeueThreadPoolExecutor(dequeuePool);
bus = mock(Event2MessageProducer.class);
queue.setEvent2MessageProducer(bus);
events = new HashMap<>();
setupEventsRecorder();
}
@After
public void teardown()
{
enqueuePool.shutdown();
}
private void setupEventsRecorder()
{
recordedEvents = new CopyOnWriteArrayList<>();
Mockito.doAnswer(new Answer<Void>()
{
@Override
public Void answer(InvocationOnMock invocation) throws Throwable
{
RepoEvent<?> event = invocation.getArgument(0, RepoEvent.class);
recordedEvents.add(event);
return null;
}
}).when(bus).send(any());
}
@Test
public void shouldReceiveSingleQuickMessage() throws Exception
{
queue.accept(messageWithDelay("A", 55l));
sleep(150l);
assertEquals(1, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
}
@Test
public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception
{
queue.accept(() -> { return null; });
sleep(150l);
assertEquals(0, recordedEvents.size());
}
@Test
public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception {
queue.accept(messageWithDelay("A", 0l));
queue.accept(messageWithDelay("B", 100l));
queue.accept(messageWithDelay("C", 200l));
sleep(450l);
assertEquals(3, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
assertEquals("B", recordedEvents.get(1).getId());
assertEquals("C", recordedEvents.get(2).getId());
}
@Test
public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception
{
queue.accept(messageWithDelay("A", 300l));
queue.accept(messageWithDelay("B", 150l));
queue.accept(messageWithDelay("C", 0l));
sleep(950l);
assertEquals(3, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
assertEquals("B", recordedEvents.get(1).getId());
assertEquals("C", recordedEvents.get(2).getId());
}
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception
{
queue.accept(messageWithDelay("A", 300l));
queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");});
queue.accept(messageWithDelay("B", 55l));
queue.accept(messageWithDelay("C", 0l));
sleep(950l);
assertEquals(3, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
assertEquals("B", recordedEvents.get(1).getId());
assertEquals("C", recordedEvents.get(2).getId());
}
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception
{
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l);
RepoEvent<?> messageB = makerB.call();
doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
queue.accept(messageWithDelay("A", 300l));
queue.accept(makerB);
queue.accept(messageWithDelay("C", 0l));
sleep(950l);
assertEquals(2, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
assertEquals("C", recordedEvents.get(1).getId());
}
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception
{
queue.accept(messageWithDelay("A", 300l));
queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");});
queue.accept(messageWithDelay("B", 55l));
queue.accept(messageWithDelay("C", 0l));
sleep(950l);
assertEquals(3, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
assertEquals("B", recordedEvents.get(1).getId());
assertEquals("C", recordedEvents.get(2).getId());
}
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception
{
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l);
RepoEvent<?> messageB = makerB.call();
doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
queue.accept(messageWithDelay("A", 300l));
queue.accept(makerB);
queue.accept(messageWithDelay("C", 0l));
sleep(950l);
assertEquals(2, recordedEvents.size());
assertEquals("A", recordedEvents.get(0).getId());
assertEquals("C", recordedEvents.get(1).getId());
}
private Callable<RepoEvent<?>> messageWithDelay(String id, long delay)
{
Callable<RepoEvent<?>> res = new Callable<RepoEvent<?>>() {
@Override
public RepoEvent<?> call() throws Exception
{
if(delay != 0)
{
sleep(delay);
}
return newRepoEvent(id);
}
@Override
public String toString()
{
return id;
}
};
return res;
}
private RepoEvent<?> newRepoEvent(String id)
{
RepoEvent<?> ev = events.get(id);
if (ev!=null)
return ev;
ev = mock(RepoEvent.class);
when(ev.getId()).thenReturn(id);
when(ev.toString()).thenReturn(id);
events.put(id, ev);
return ev;
}
public static ExecutorService newThreadPool()
{
return new ThreadPoolExecutor(2, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static final Executor SYNC_EXECUTOR_SAME_THREAD = new Executor()
{
@Override
public void execute(Runnable command)
{
command.run();
}
};
public static final Executor SYNC_EXECUTOR_NEW_THREAD = new Executor()
{
@Override
public void execute(Runnable command)
{
Thread t = new Thread(command);
t.start();
try
{
t.join();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
};
}

View File

@@ -0,0 +1,249 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.event.databind.ObjectMapperFactory;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.NodeRef;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import com.fasterxml.jackson.databind.ObjectMapper;
public class EventGeneratorTest extends AbstractContextAwareRepoEvent
{
private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
private static final long DUMP_BROKER_TIMEOUT = 50000000l;
@Autowired @Qualifier("event2ObjectMapper")
private ObjectMapper objectMapper;
private ActiveMQConnection connection;
protected List<RepoEvent<?>> receivedEvents;
@Before
public void startupTopicListener() throws Exception
{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(destination);
receivedEvents = Collections.synchronizedList(new LinkedList<>());
consumer.setMessageListener(new MessageListener()
{
@Override
public void onMessage(Message message)
{
String text = getText(message);
RepoEvent<?> event = toRepoEvent(text);
if (DEBUG)
{
System.err.println("RX: " + event);
}
receivedEvents.add(event);
}
private RepoEvent<?> toRepoEvent(String json)
{
try
{
return objectMapper.readValue(json, RepoEvent.class);
} catch (Exception e)
{
e.printStackTrace();
return null;
}
}
});
if (DEBUG)
{
System.err.println("Now actively listening on topic " + EVENT2_TOPIC_NAME);
}
}
protected ObjectMapper createObjectMapper()
{
return ObjectMapperFactory.createInstance();
}
@After
public void shutdownTopicListener() throws Exception
{
connection.close();
connection = null;
}
@Test
public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception
{
createNode(ContentModel.TYPE_CONTENT);
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
RepoEvent<?> sent = getRepoEvent(1);
RepoEvent<?> received = receivedEvents.get(0);
assertEventsEquals("Events are different!", sent, received);
}
private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current)
{
if (DEBUG)
{
System.err.println("XP: " + expected);
System.err.println("CU: " + current);
}
assertEquals(message, expected, current);
}
@Test
public void shouldReceiveEvent2EventsInOrder() throws Exception
{
NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT);
updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt");
deleteNode(nodeRef);
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 3);
RepoEvent<?> sentCreation = getRepoEvent(1);
RepoEvent<?> sentUpdate = getRepoEvent(2);
RepoEvent<?> sentDeletion = getRepoEvent(3);
assertEquals("Expected create event!", sentCreation, (RepoEvent<?>) receivedEvents.get(0));
assertEquals("Expected update event!", sentUpdate, (RepoEvent<?>) receivedEvents.get(1));
assertEquals("Expected delete event!", sentDeletion, (RepoEvent<?>) receivedEvents.get(2));
}
private static String getText(Message message)
{
try
{
ActiveMQTextMessage am = (ActiveMQTextMessage) message;
return am.getText();
} catch (JMSException e)
{
return null;
}
}
// a simple main to investigate the contents of the local broker
public static void main(String[] args) throws Exception
{
dumpBroker("tcp://localhost:61616", DUMP_BROKER_TIMEOUT);
System.exit(0);
}
private static void dumpBroker(String url, long timeout) throws Exception
{
System.out.println("Broker at url: '" + url + "'");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
try
{
connection.start();
DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
System.out.println("\nFound " + queues.size() + " queues:");
for (ActiveMQQueue queue : queues)
{
try
{
System.out.println("- " + queue.getQueueName());
} catch (JMSException e)
{
e.printStackTrace();
}
}
Set<ActiveMQTopic> topics = ds.getTopics();
System.out.println("\nFound " + topics.size() + " topics:");
for (ActiveMQTopic topic : topics)
{
try
{
System.out.println("- " + topic.getTopicName());
} catch (JMSException e)
{
e.printStackTrace();
}
}
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(destination);
System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "...");
consumer.setMessageListener(new MessageListener()
{
@Override
public void onMessage(Message message)
{
String text = getText(message);
System.out.println("Received message " + message + "\n" + text + "\n");
}
});
Thread.sleep(timeout);
} finally
{
connection.close();
}
}
}

View File

@@ -34,7 +34,8 @@ import org.junit.runners.Suite.SuiteClasses;
UpdateRepoEventIT.class,
DeleteRepoEventIT.class,
ChildAssociationRepoEventIT.class,
PeerAssociationRepoEventIT.class
PeerAssociationRepoEventIT.class,
EventGeneratorTest.class
})
public class RepoEvent2ITSuite
{

View File

@@ -33,7 +33,8 @@ import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
@SuiteClasses({ EventFilterUnitTest.class,
EventConsolidatorUnitTest.class,
EventJSONSchemaUnitTest.class
EventJSONSchemaUnitTest.class,
EventGeneratorQueueUnitTest.class
})
public class RepoEvent2UnitSuite
{