From 046116ddf0498facd672260fe28e3fc4f19c30e8 Mon Sep 17 00:00:00 2001 From: Nana Insaidoo Date: Fri, 9 Apr 2021 13:34:05 +0100 Subject: [PATCH] Bugfix/repo 5610 events are not actually sent to activemq (#360) * Add events tests * Polished put test: connects to JMS via TCP and validate that the event sent is also received back * Now the tests provides a simple main() that listens on the topic, useful for quick debug sessions * Now the user name is collected in the calling thread, so that the sendEvent does not silently fails * Apply changes following review * Now using queue system to guarantee events order * Add license * Updated logs and corrected comments * Remove empty methods * Now catering for spurious events at startup when database is bootstrapped * Now preserving the txn-id in all events * Moved up definitions in events2.xml after PR feedback Co-authored-by: Bruno Bossola --- .../alfresco/repo/event2/EventGenerator.java | 81 ++--- .../repo/event2/EventGeneratorQueue.java | 179 +++++++++++ .../resources/alfresco/events2-context.xml | 45 ++- .../resources/alfresco/repository.properties | 9 + .../event2/AbstractContextAwareRepoEvent.java | 64 +++- .../event2/EventGeneratorQueueUnitTest.java | 290 ++++++++++++++++++ .../repo/event2/EventGeneratorTest.java | 249 +++++++++++++++ .../repo/event2/RepoEvent2ITSuite.java | 3 +- .../repo/event2/RepoEvent2UnitSuite.java | 3 +- 9 files changed, 879 insertions(+), 44 deletions(-) create mode 100644 repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java create mode 100644 repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java create mode 100644 repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java b/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java index 06fea65ba9..a4c01021f1 100644 --- a/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java +++ b/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java @@ -54,7 +54,6 @@ import org.alfresco.repo.policy.JavaBehaviour; import org.alfresco.repo.policy.PolicyComponent; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.transaction.AlfrescoTransactionSupport; -import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.dictionary.DictionaryService; import org.alfresco.service.cmr.repository.AssociationRef; import org.alfresco.service.cmr.repository.ChildAssociationRef; @@ -90,11 +89,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin protected DictionaryService dictionaryService; private DescriptorService descriptorService; private EventFilterRegistry eventFilterRegistry; - private Event2MessageProducer event2MessageProducer; private TransactionService transactionService; private PersonService personService; protected NodeResourceHelper nodeResourceHelper; + private EventGeneratorQueue eventGeneratorQueue; private NodeTypeFilter nodeTypeFilter; private ChildAssociationTypeFilter childAssociationTypeFilter; private EventUserFilter userFilter; @@ -109,10 +108,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin PropertyCheck.mandatory(this, "dictionaryService", dictionaryService); PropertyCheck.mandatory(this, "descriptorService", descriptorService); PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry); - PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); PropertyCheck.mandatory(this, "transactionService", transactionService); PropertyCheck.mandatory(this, "personService", personService); PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); + PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue); this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter(); @@ -177,12 +176,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin this.eventFilterRegistry = eventFilterRegistry; } - @SuppressWarnings("unused") - public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) - { - this.event2MessageProducer = event2MessageProducer; - } - public void setTransactionService(TransactionService transactionService) { this.transactionService = transactionService; @@ -198,6 +191,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin this.nodeResourceHelper = nodeResourceHelper; } + public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue) + { + this.eventGeneratorQueue = eventGeneratorQueue; + } + @Override public void onCreateNode(ChildAssociationRef childAssocRef) { @@ -428,20 +426,26 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator) { + EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); + eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo)); + } + + private RepoEvent createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo) + { + String user = eventInfo.getPrincipal(); + if (consolidator.isTemporaryNode()) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Ignoring temporary node: " + nodeRef); } - return; + return null; } - final String user = AuthenticationUtil.getFullyAuthenticatedUser(); // Get the repo event before the filtering, // so we can take the latest node info into account - final RepoEvent event = consolidator.getRepoEvent(getEventInfo(user)); - + final RepoEvent event = consolidator.getRepoEvent(eventInfo); final QName nodeType = consolidator.getNodeType(); if (isFiltered(nodeType, user)) @@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString()) + "' created by: " + user); } - return; + return null; } if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull()) @@ -461,27 +465,34 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin { LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef); } - return; + return null; } - logAndSendEvent(event, consolidator.getEventTypes()); + logEvent(event, consolidator.getEventTypes()); + return event; } protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) { + EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); + eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator)); + } + + private RepoEvent createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) + { + String user = eventInfo.getPrincipal(); if (consolidator.isTemporaryChildAssociation()) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Ignoring temporary child association: " + childAssociationRef); } - return; + return null; } - final String user = AuthenticationUtil.getFullyAuthenticatedUser(); // Get the repo event before the filtering, // so we can take the latest association info into account - final RepoEvent event = consolidator.getRepoEvent(getEventInfo(user)); + final RepoEvent event = consolidator.getRepoEvent(eventInfo); final QName childAssocType = consolidator.getChildAssocType(); if (isFilteredChildAssociation(childAssocType, user)) @@ -492,7 +503,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) + "' created by: " + user); } - return; + return null; } else if (childAssociationRef.isPrimary()) { if (LOGGER.isTraceEnabled()) @@ -501,13 +512,20 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) + "' created by: " + user); } - return; + return null; } - logAndSendEvent(event, consolidator.getEventTypes()); + logEvent(event, consolidator.getEventTypes()); + return event; } protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) + { + EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); + eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator)); + } + + private RepoEvent createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) { if (consolidator.isTemporaryPeerAssociation()) { @@ -515,30 +533,21 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin { LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef); } - return; + return null; } - final String user = AuthenticationUtil.getFullyAuthenticatedUser(); - // Get the repo event before the filtering, - // so we can take the latest association info into account - final RepoEvent event = consolidator.getRepoEvent(getEventInfo(user)); - - logAndSendEvent(event, consolidator.getEventTypes()); + RepoEvent event = consolidator.getRepoEvent(eventInfo); + logEvent(event, consolidator.getEventTypes()); + return event; } - protected void logAndSendEvent(RepoEvent event, Deque listOfEvents) + private void logEvent(RepoEvent event, Deque listOfEvents) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("List of Events:" + listOfEvents); LOGGER.trace("Sending event:" + event); } - // Need to execute this in another read txn because Camel expects it - transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback) () -> { - event2MessageProducer.send(event); - - return null; - }, true, false); } } diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java b/repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java new file mode 100644 index 0000000000..2805b16b1d --- /dev/null +++ b/repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java @@ -0,0 +1,179 @@ +/* + * #%L + * Alfresco Repository + * %% + * Copyright (C) 2005 - 2021 Alfresco Software Limited + * %% + * This file is part of the Alfresco software. + * If the software was purchased under a paid Alfresco license, the terms of + * the paid license agreement will prevail. Otherwise, the software is + * provided under the following open source license terms: + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + * #L% + */ +package org.alfresco.repo.event2; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.alfresco.repo.event.v1.model.RepoEvent; +import org.alfresco.util.PropertyCheck; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.InitializingBean; + +/* + * This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but + * at the same time it preserves the order of the events + */ +public class EventGeneratorQueue implements InitializingBean +{ + protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class); + + protected Executor enqueueThreadPoolExecutor; + protected Executor dequeueThreadPoolExecutor; + protected Event2MessageProducer event2MessageProducer; + protected BlockingQueue queue = new LinkedBlockingQueue<>(); + protected Runnable listener = createListener(); + + @Override + public void afterPropertiesSet() throws Exception + { + PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor); + PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor); + PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); + } + + public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) + { + this.event2MessageProducer = event2MessageProducer; + } + + public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor) + { + this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor; + } + + public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor) + { + this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor; + dequeueThreadPoolExecutor.execute(listener); + } + + /** + * Procedure to enqueue the callback functions that creates an event. + * @param maker Callback function that creates an event. + */ + public void accept(Callable> maker) + { + EventInMaking eventInMaking = new EventInMaking(maker); + queue.offer(eventInMaking); + enqueueThreadPoolExecutor.execute(() -> { + try + { + eventInMaking.make(); + } + catch (Exception e) + { + LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e); + } + }); + } + + /** + * Create listener task in charge of dequeuing and sending events ready to be sent. + * @return The task in charge of dequeuing and sending events ready to be sent. + */ + private Runnable createListener() + { + return new Runnable() + { + @Override + public void run() + { + try + { + while (!Thread.interrupted()) + { + try + { + EventInMaking eventInMaking = queue.take(); + RepoEvent event = eventInMaking.getEventWhenReady(); + if (event != null) + { + event2MessageProducer.send(event); + } + } + catch (Exception e) + { + LOGGER.error("Unexpected error while dequeuing and sending repository event" + e); + } + } + } + finally + { + LOGGER.warn("Unexpected: rescheduling the listener thread."); + dequeueThreadPoolExecutor.execute(listener); + } + } + }; + + } + + /* + * Simple class that makes events and allows to retrieve them when ready + */ + private static class EventInMaking + { + private Callable> maker; + private volatile RepoEvent event; + private CountDownLatch latch; + + public EventInMaking(Callable> maker) + { + this.maker = maker; + this.latch = new CountDownLatch(1); + } + + public void make() throws Exception + { + try + { + event = maker.call(); + } + finally + { + latch.countDown(); + } + } + + public RepoEvent getEventWhenReady() throws InterruptedException + { + latch.await(30, TimeUnit.SECONDS); + return event; + } + + @Override + public String toString() + { + return maker.toString(); + } + } + +} diff --git a/repository/src/main/resources/alfresco/events2-context.xml b/repository/src/main/resources/alfresco/events2-context.xml index c5d899023f..028421f7ee 100644 --- a/repository/src/main/resources/alfresco/events2-context.xml +++ b/repository/src/main/resources/alfresco/events2-context.xml @@ -38,9 +38,10 @@ - + + @@ -54,7 +55,45 @@ - - + + + + + + + + + + + + + + + eventAsyncEnqueueThreadPool + + + ${repo.event2.queue.enqueueThreadPool.coreSize} + + + ${repo.event2.queue.enqueueThreadPool.maximumSize} + + + ${repo.event2.queue.enqueueThreadPool.priority} + + + + + + eventAsyncDequeueThreadPool + + + ${repo.event2.queue.dequeueThreadPool.coreSize} + + + ${repo.event2.queue.dequeueThreadPool.maximumSize} + + + ${repo.event2.queue.dequeueThreadPool.priority} + diff --git a/repository/src/main/resources/alfresco/repository.properties b/repository/src/main/resources/alfresco/repository.properties index d017205478..b7f27589da 100644 --- a/repository/src/main/resources/alfresco/repository.properties +++ b/repository/src/main/resources/alfresco/repository.properties @@ -1211,6 +1211,15 @@ repo.event2.filter.childAssocTypes=rn:rendition repo.event2.filter.users= # Topic name repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 +# Thread pool for async enqueue of repo events +repo.event2.queue.enqueueThreadPool.priority=1 +repo.event2.queue.enqueueThreadPool.coreSize=8 +repo.event2.queue.enqueueThreadPool.maximumSize=10 +# Thread pool for async dequeue and delivery of repo events +repo.event2.queue.dequeueThreadPool.priority=1 +repo.event2.queue.dequeueThreadPool.coreSize=1 +repo.event2.queue.dequeueThreadPool.maximumSize=1 + # MNT-21083 # --DELETE_NOT_EXISTS - default settings diff --git a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java index 458336401f..c5f3f9248f 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java +++ b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java @@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.jms.ConnectionFactory; @@ -77,17 +78,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest { + protected static final boolean DEBUG = false; + protected static final String TEST_NAMESPACE = "http://www.alfresco.org/test/ContextAwareRepoEvent"; + protected static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer(); private static final String BROKER_URL = "tcp://localhost:61616"; private static final String TOPIC_NAME = "alfresco.repo.event2"; private static final String CAMEL_ROUTE = "jms:topic:" + TOPIC_NAME; - private static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer(); private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext(); private static boolean isCamelConfigured; private static DataFormat dataFormat; - + @Autowired protected RetryingTransactionHelper retryingTransactionHelper; @@ -104,6 +107,13 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest @Autowired protected ObjectMapper event2ObjectMapper; + @Autowired @Qualifier("eventGeneratorV2") + protected EventGenerator eventGenerator; + + @Autowired + @Qualifier("eventGeneratorQueue") + protected EventGeneratorQueue eventQueue; + protected NodeRef rootNodeRef; @BeforeClass @@ -141,8 +151,35 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest } return nodeService.getRootNode(storeRef); }); + + flushSpuriousEvents(); } + /* + * When running with an empty database some events related to the creation may + * creep up here making the test fails. After attempting several other + * strategies, a smart sleep seems to do the work. + */ + protected void flushSpuriousEvents() throws InterruptedException + { + int maxloops = 5; + + int count = maxloops; + do + { + Thread.sleep(165l); + if (EVENT_CONTAINER.isEmpty()) + { + count--; + } else + { + EVENT_CONTAINER.reset(); + count = maxloops; + } + + } while (count > 0); + } + @After public void tearDown() { @@ -179,6 +216,16 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest propertyMap).getChildRef()); } + protected NodeRef updateNodeName(NodeRef nodeRef, String newName) + { + PropertyMap propertyMap = new PropertyMap(); + propertyMap.put(ContentModel.PROP_NAME, newName); + return retryingTransactionHelper.doInTransaction(() -> { + nodeService.addProperties(nodeRef, propertyMap); + return null; + }); + } + protected void deleteNode(NodeRef nodeRef) { retryingTransactionHelper.doInTransaction(() -> { @@ -376,13 +423,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest public static class RepoEventContainer implements Processor { - private final List> events = new ArrayList<>(); + private final List> events = Collections.synchronizedList(new ArrayList<>()); @Override public void process(Exchange exchange) { Object object = exchange.getIn().getBody(); events.add((RepoEvent) object); + + if (DEBUG) + { + System.err.println("XX: "+object); + } } public List> getEvents() @@ -404,6 +456,12 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest { events.clear(); } + + public boolean isEmpty() + { + return events.isEmpty(); + } + } @SuppressWarnings("unchecked") diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java new file mode 100644 index 0000000000..241464ca42 --- /dev/null +++ b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java @@ -0,0 +1,290 @@ +/* + * #%L + * Alfresco Repository + * %% + * Copyright (C) 2005 - 2021 Alfresco Software Limited + * %% + * This file is part of the Alfresco software. + * If the software was purchased under a paid Alfresco license, the terms of + * the paid license agreement will prevail. Otherwise, the software is + * provided under the following open source license terms: + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + * #L% + */ +package org.alfresco.repo.event2; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.alfresco.repo.event.v1.model.RepoEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class EventGeneratorQueueUnitTest +{ + private EventGeneratorQueue queue; + + private Event2MessageProducer bus; + private ExecutorService enqueuePool; + private ExecutorService dequeuePool; + private List> recordedEvents; + private Map> events; + + @Before + public void setup() + { + queue = new EventGeneratorQueue(); + + enqueuePool = newThreadPool(); + queue.setEnqueueThreadPoolExecutor(enqueuePool); + dequeuePool = newThreadPool(); + queue.setDequeueThreadPoolExecutor(dequeuePool); + + bus = mock(Event2MessageProducer.class); + queue.setEvent2MessageProducer(bus); + + events = new HashMap<>(); + + setupEventsRecorder(); + } + + @After + public void teardown() + { + enqueuePool.shutdown(); + } + + private void setupEventsRecorder() + { + recordedEvents = new CopyOnWriteArrayList<>(); + + Mockito.doAnswer(new Answer() + { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable + { + RepoEvent event = invocation.getArgument(0, RepoEvent.class); + recordedEvents.add(event); + return null; + } + }).when(bus).send(any()); + } + + @Test + public void shouldReceiveSingleQuickMessage() throws Exception + { + queue.accept(messageWithDelay("A", 55l)); + + sleep(150l); + + assertEquals(1, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + } + + @Test + public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception + { + queue.accept(() -> { return null; }); + + sleep(150l); + + assertEquals(0, recordedEvents.size()); + } + + @Test + public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception { + queue.accept(messageWithDelay("A", 0l)); + queue.accept(messageWithDelay("B", 100l)); + queue.accept(messageWithDelay("C", 200l)); + + sleep(450l); + + assertEquals(3, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + assertEquals("B", recordedEvents.get(1).getId()); + assertEquals("C", recordedEvents.get(2).getId()); + } + + @Test + public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception + { + queue.accept(messageWithDelay("A", 300l)); + queue.accept(messageWithDelay("B", 150l)); + queue.accept(messageWithDelay("C", 0l)); + + sleep(950l); + + assertEquals(3, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + assertEquals("B", recordedEvents.get(1).getId()); + assertEquals("C", recordedEvents.get(2).getId()); + } + + @Test + public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception + { + queue.accept(messageWithDelay("A", 300l)); + queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");}); + queue.accept(messageWithDelay("B", 55l)); + queue.accept(messageWithDelay("C", 0l)); + + sleep(950l); + + assertEquals(3, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + assertEquals("B", recordedEvents.get(1).getId()); + assertEquals("C", recordedEvents.get(2).getId()); + } + + @Test + public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception + { + Callable> makerB = messageWithDelay("B", 55l); + RepoEvent messageB = makerB.call(); + doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB); + queue.accept(messageWithDelay("A", 300l)); + queue.accept(makerB); + queue.accept(messageWithDelay("C", 0l)); + + sleep(950l); + + assertEquals(2, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + assertEquals("C", recordedEvents.get(1).getId()); + } + + @Test + public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception + { + queue.accept(messageWithDelay("A", 300l)); + queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");}); + queue.accept(messageWithDelay("B", 55l)); + queue.accept(messageWithDelay("C", 0l)); + + sleep(950l); + + assertEquals(3, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + assertEquals("B", recordedEvents.get(1).getId()); + assertEquals("C", recordedEvents.get(2).getId()); + } + + @Test + public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception + { + Callable> makerB = messageWithDelay("B", 55l); + RepoEvent messageB = makerB.call(); + doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB); + queue.accept(messageWithDelay("A", 300l)); + queue.accept(makerB); + queue.accept(messageWithDelay("C", 0l)); + + sleep(950l); + + assertEquals(2, recordedEvents.size()); + assertEquals("A", recordedEvents.get(0).getId()); + assertEquals("C", recordedEvents.get(1).getId()); + } + + private Callable> messageWithDelay(String id, long delay) + { + Callable> res = new Callable>() { + + @Override + public RepoEvent call() throws Exception + { + if(delay != 0) + { + sleep(delay); + } + return newRepoEvent(id); + } + + @Override + public String toString() + { + return id; + } + }; + return res; + } + + private RepoEvent newRepoEvent(String id) + { + RepoEvent ev = events.get(id); + if (ev!=null) + return ev; + + ev = mock(RepoEvent.class); + when(ev.getId()).thenReturn(id); + when(ev.toString()).thenReturn(id); + events.put(id, ev); + + return ev; + } + + public static ExecutorService newThreadPool() + { + return new ThreadPoolExecutor(2, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } + + public static final Executor SYNC_EXECUTOR_SAME_THREAD = new Executor() + { + @Override + public void execute(Runnable command) + { + command.run(); + } + }; + + public static final Executor SYNC_EXECUTOR_NEW_THREAD = new Executor() + { + @Override + public void execute(Runnable command) + { + Thread t = new Thread(command); + t.start(); + try + { + t.join(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + }; +} diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java new file mode 100644 index 0000000000..d80e1ba7dc --- /dev/null +++ b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java @@ -0,0 +1,249 @@ +/* + * #%L + * Alfresco Repository + * %% + * Copyright (C) 2005 - 2020 Alfresco Software Limited + * %% + * This file is part of the Alfresco software. + * If the software was purchased under a paid Alfresco license, the terms of + * the paid license agreement will prevail. Otherwise, the software is + * provided under the following open source license terms: + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + * #L% + */ +package org.alfresco.repo.event2; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.alfresco.model.ContentModel; +import org.alfresco.repo.event.databind.ObjectMapperFactory; +import org.alfresco.repo.event.v1.model.RepoEvent; +import org.alfresco.service.cmr.repository.NodeRef; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class EventGeneratorTest extends AbstractContextAwareRepoEvent +{ + private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2"; + + private static final long DUMP_BROKER_TIMEOUT = 50000000l; + + @Autowired @Qualifier("event2ObjectMapper") + private ObjectMapper objectMapper; + + private ActiveMQConnection connection; + protected List> receivedEvents; + + @Before + public void startupTopicListener() throws Exception + { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(EVENT2_TOPIC_NAME); + MessageConsumer consumer = session.createConsumer(destination); + + receivedEvents = Collections.synchronizedList(new LinkedList<>()); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + String text = getText(message); + RepoEvent event = toRepoEvent(text); + + if (DEBUG) + { + System.err.println("RX: " + event); + } + + receivedEvents.add(event); + } + + private RepoEvent toRepoEvent(String json) + { + try + { + return objectMapper.readValue(json, RepoEvent.class); + } catch (Exception e) + { + e.printStackTrace(); + return null; + } + } + }); + + if (DEBUG) + { + System.err.println("Now actively listening on topic " + EVENT2_TOPIC_NAME); + } + } + + protected ObjectMapper createObjectMapper() + { + return ObjectMapperFactory.createInstance(); + } + + @After + public void shutdownTopicListener() throws Exception + { + connection.close(); + connection = null; + } + + @Test + public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception + { + createNode(ContentModel.TYPE_CONTENT); + + Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1); + + RepoEvent sent = getRepoEvent(1); + RepoEvent received = receivedEvents.get(0); + assertEventsEquals("Events are different!", sent, received); + } + + private void assertEventsEquals(String message, RepoEvent expected, RepoEvent current) + { + if (DEBUG) + { + System.err.println("XP: " + expected); + System.err.println("CU: " + current); + } + + assertEquals(message, expected, current); + } + + @Test + public void shouldReceiveEvent2EventsInOrder() throws Exception + { + NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT); + updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt"); + deleteNode(nodeRef); + + Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 3); + + RepoEvent sentCreation = getRepoEvent(1); + RepoEvent sentUpdate = getRepoEvent(2); + RepoEvent sentDeletion = getRepoEvent(3); + assertEquals("Expected create event!", sentCreation, (RepoEvent) receivedEvents.get(0)); + assertEquals("Expected update event!", sentUpdate, (RepoEvent) receivedEvents.get(1)); + assertEquals("Expected delete event!", sentDeletion, (RepoEvent) receivedEvents.get(2)); + } + + private static String getText(Message message) + { + try + { + ActiveMQTextMessage am = (ActiveMQTextMessage) message; + return am.getText(); + } catch (JMSException e) + { + return null; + } + } + + // a simple main to investigate the contents of the local broker + public static void main(String[] args) throws Exception + { + dumpBroker("tcp://localhost:61616", DUMP_BROKER_TIMEOUT); + System.exit(0); + } + + private static void dumpBroker(String url, long timeout) throws Exception + { + System.out.println("Broker at url: '" + url + "'"); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + try + { + connection.start(); + + DestinationSource ds = connection.getDestinationSource(); + + Set queues = ds.getQueues(); + System.out.println("\nFound " + queues.size() + " queues:"); + for (ActiveMQQueue queue : queues) + { + try + { + System.out.println("- " + queue.getQueueName()); + } catch (JMSException e) + { + e.printStackTrace(); + } + } + + Set topics = ds.getTopics(); + System.out.println("\nFound " + topics.size() + " topics:"); + for (ActiveMQTopic topic : topics) + { + try + { + System.out.println("- " + topic.getTopicName()); + } catch (JMSException e) + { + e.printStackTrace(); + } + } + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(EVENT2_TOPIC_NAME); + MessageConsumer consumer = session.createConsumer(destination); + + System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "..."); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + String text = getText(message); + System.out.println("Received message " + message + "\n" + text + "\n"); + } + }); + + Thread.sleep(timeout); + } finally + { + connection.close(); + } + } +} diff --git a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java index cdf78e963b..6937822f87 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java +++ b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java @@ -34,7 +34,8 @@ import org.junit.runners.Suite.SuiteClasses; UpdateRepoEventIT.class, DeleteRepoEventIT.class, ChildAssociationRepoEventIT.class, - PeerAssociationRepoEventIT.class + PeerAssociationRepoEventIT.class, + EventGeneratorTest.class }) public class RepoEvent2ITSuite { diff --git a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java index f22cae45d6..462f21c464 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java +++ b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java @@ -33,7 +33,8 @@ import org.junit.runners.Suite.SuiteClasses; @RunWith(Suite.class) @SuiteClasses({ EventFilterUnitTest.class, EventConsolidatorUnitTest.class, - EventJSONSchemaUnitTest.class + EventJSONSchemaUnitTest.class, + EventGeneratorQueueUnitTest.class }) public class RepoEvent2UnitSuite {