diff --git a/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java b/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java index f2af8666ea..3f3805be09 100644 --- a/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java +++ b/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -36,18 +36,13 @@ import org.springframework.beans.factory.InitializingBean; /** * Sends a message to a destination in the current thread. */ -public class DirectEventSender implements EventSender, InitializingBean +public class DirectEventSender implements EventSender { - protected Event2MessageProducer event2MessageProducer; + protected final Event2MessageProducer event2MessageProducer; - @Override - public void afterPropertiesSet() + public DirectEventSender(Event2MessageProducer event2MessageProducer) { PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); - } - - public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) - { this.event2MessageProducer = event2MessageProducer; } diff --git a/repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java b/repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java index 08a425f218..7615cba81f 100644 --- a/repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java +++ b/repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -46,28 +46,18 @@ public class EnqueuingEventSender extends DirectEventSender { protected static final Log LOGGER = LogFactory.getLog(EnqueuingEventSender.class); - protected Executor enqueueThreadPoolExecutor; - protected Executor dequeueThreadPoolExecutor; + protected final Executor enqueueThreadPoolExecutor; + protected final Executor dequeueThreadPoolExecutor; protected BlockingQueue queue = new LinkedBlockingQueue<>(); protected Runnable listener = createListener(); - @Override - public void afterPropertiesSet() + public EnqueuingEventSender(Event2MessageProducer event2MessageProducer, Executor enqueueThreadPoolExecutor, Executor dequeueThreadPoolExecutor) { - super.afterPropertiesSet(); + super(event2MessageProducer); PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor); PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor); - } - - public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor) - { this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor; - } - - public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor) - { this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor; - dequeueThreadPoolExecutor.execute(listener); } /** @@ -91,6 +81,12 @@ public class EnqueuingEventSender extends DirectEventSender }); } + @Override + public void initialize() + { + dequeueThreadPoolExecutor.execute(listener); + } + /** * Create listener task in charge of dequeuing and sending events ready to be sent. * @return The task in charge of dequeuing and sending events ready to be sent. diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventSender.java b/repository/src/main/java/org/alfresco/repo/event2/EventSender.java index 798c66f111..e23aa1692d 100644 --- a/repository/src/main/java/org/alfresco/repo/event2/EventSender.java +++ b/repository/src/main/java/org/alfresco/repo/event2/EventSender.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -40,4 +40,13 @@ public interface EventSender * @param eventProducer - callback function that creates an event */ void accept(Callable>> eventProducer); + + /** + * It's called right after event sender instantiation (see {@link org.alfresco.repo.event2.EventSenderFactoryBean}). + * It might be used to initialize the sender implementation. + */ + default void initialize() + { + //no initialization by default + } } diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventSenderFactoryBean.java b/repository/src/main/java/org/alfresco/repo/event2/EventSenderFactoryBean.java new file mode 100644 index 0000000000..48fc4db3c8 --- /dev/null +++ b/repository/src/main/java/org/alfresco/repo/event2/EventSenderFactoryBean.java @@ -0,0 +1,148 @@ +/* + * #%L + * Alfresco Repository + * %% + * Copyright (C) 2005 - 2024 Alfresco Software Limited + * %% + * This file is part of the Alfresco software. + * If the software was purchased under a paid Alfresco license, the terms of + * the paid license agreement will prevail. Otherwise, the software is + * provided under the following open source license terms: + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + * #L% + */ +package org.alfresco.repo.event2; + +import jakarta.annotation.Nonnull; +import org.alfresco.util.PropertyCheck; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.AbstractFactoryBean; +import org.springframework.core.env.PropertyResolver; + +import java.util.Optional; +import java.util.concurrent.Executor; + +public class EventSenderFactoryBean extends AbstractFactoryBean +{ + static final String LEGACY_SKIP_QUEUE_PROPERTY = "repo.event2.queue.skip"; + static final String EVENT_SEND_STRATEGY_PROPERTY = "repo.event2.send.strategy"; + private static final String DIRECT_EVENT_SENDER_NAME = "direct"; + private static final String ASYNC_EVENT_SENDER_NAME = "async"; + + private final PropertyResolver propertyResolver; + private final Event2MessageProducer event2MessageProducer; + private final Executor enqueueThreadPoolExecutor; + private final Executor dequeueThreadPoolExecutor; + + private String configuredSenderName; + private boolean legacySkipQueueConfig; + + public EventSenderFactoryBean(@Autowired PropertyResolver propertyResolver, Event2MessageProducer event2MessageProducer, + Executor enqueueThreadPoolExecutor, Executor dequeueThreadPoolExecutor) + { + super(); + PropertyCheck.mandatory(this, "propertyResolver", propertyResolver); + PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); + PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor); + PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor); + this.propertyResolver = propertyResolver; + this.event2MessageProducer = event2MessageProducer; + this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor; + this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor; + } + + @Value("${" + LEGACY_SKIP_QUEUE_PROPERTY + ":#{false}}") + public void setLegacySkipQueueConfig(boolean legacySkipQueueConfig) + { + this.legacySkipQueueConfig = legacySkipQueueConfig; + } + + @Value("${" + EVENT_SEND_STRATEGY_PROPERTY + ":#{null}}") + public void setConfiguredSenderName(String configuredSenderName) + { + this.configuredSenderName = configuredSenderName; + } + + @Override + public Class getObjectType() + { + return EventSender.class; + } + + @Override + @Nonnull + protected EventSender createInstance() throws Exception + { + EventSender sender = instantiateConfiguredSender(); + + sender.initialize(); + + return sender; + } + + private EventSender instantiateConfiguredSender() + { + if (isSenderNameConfigured()) + { + return instantiateSender(getConfiguredSenderName()); + } + return isLegacySkipQueueConfigured() ? instantiateDirectSender() : instantiateAsyncSender(); + } + + protected EventSender instantiateSender(String senderName) + { + if (DIRECT_EVENT_SENDER_NAME.equalsIgnoreCase(senderName)) + { + return instantiateDirectSender(); + } + + if (ASYNC_EVENT_SENDER_NAME.equalsIgnoreCase(senderName)) + { + return instantiateAsyncSender(); + } + + throw new IllegalStateException("Failed to instantiate sender: " + senderName); + } + + private DirectEventSender instantiateDirectSender() + { + return new DirectEventSender(event2MessageProducer); + } + + private EnqueuingEventSender instantiateAsyncSender() + { + return new EnqueuingEventSender(event2MessageProducer, enqueueThreadPoolExecutor, dequeueThreadPoolExecutor); + } + + private boolean isSenderNameConfigured() + { + return !Optional.ofNullable(getConfiguredSenderName()) + .map(String::isBlank) + .orElse(true); + } + + private boolean isLegacySkipQueueConfigured() + { + return Optional.ofNullable(propertyResolver.getProperty(LEGACY_SKIP_QUEUE_PROPERTY, Boolean.class)) + .orElse(legacySkipQueueConfig); + } + + private String getConfiguredSenderName() + { + return Optional.ofNullable(propertyResolver.getProperty(EVENT_SEND_STRATEGY_PROPERTY, String.class)) + .orElse(configuredSenderName); + } +} diff --git a/repository/src/main/resources/alfresco/events2-context.xml b/repository/src/main/resources/alfresco/events2-context.xml index 0fb6f3ef43..3e6a2bc8f4 100644 --- a/repository/src/main/resources/alfresco/events2-context.xml +++ b/repository/src/main/resources/alfresco/events2-context.xml @@ -41,7 +41,7 @@ - + @@ -59,13 +59,10 @@ - - - - - - - + + + + diff --git a/repository/src/main/resources/alfresco/repository.properties b/repository/src/main/resources/alfresco/repository.properties index 91f678dee9..02347cc32d 100644 --- a/repository/src/main/resources/alfresco/repository.properties +++ b/repository/src/main/resources/alfresco/repository.properties @@ -1229,7 +1229,10 @@ repo.event2.filter.childAssocTypes=rn:rendition repo.event2.filter.users= # Topic name repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 +# Specifies the strategy for sending the events +repo.event2.send.strategy= # Specifies if messages should be enqueued in in-memory queue or sent directly to the topic +# Deprecated. Please use repo.event2.send.strategy repo.event2.queue.skip=false #repo.event2.topic.endpoint=amqp:topic:VirtualTopic.alfresco.repo.event2 # Thread pool for async enqueue of repo events diff --git a/repository/src/test/java/org/alfresco/AppContext06TestSuite.java b/repository/src/test/java/org/alfresco/AppContext06TestSuite.java index 49bede2566..1c30218387 100644 --- a/repository/src/test/java/org/alfresco/AppContext06TestSuite.java +++ b/repository/src/test/java/org/alfresco/AppContext06TestSuite.java @@ -44,11 +44,6 @@ import org.junit.runners.Suite; @RunWith(Categories.class) @Categories.ExcludeCategory({DBTests.class, NonBuildTests.class}) @Suite.SuiteClasses({ - // Requires a running ActiveMQ - org.alfresco.repo.rawevents.EventBehaviourTest.class, - org.alfresco.repo.rawevents.TransactionAwareEventProducerTest.class, - org.alfresco.repo.event2.RepoEvent2ITSuite.class, - // Requires running transformers org.alfresco.transform.registry.LocalTransformServiceRegistryConfigTest.class, org.alfresco.repo.rendition2.RenditionService2IntegrationTest.class, @@ -71,7 +66,12 @@ import org.junit.runners.Suite; org.alfresco.repo.blog.BlogServiceImplTest.class, org.alfresco.repo.action.scheduled.ScheduledPersistedActionServiceTest.class, - org.alfresco.repo.rendition2.RenditionDefinitionTest.class + org.alfresco.repo.rendition2.RenditionDefinitionTest.class, + + // Requires a running ActiveMQ + org.alfresco.repo.rawevents.EventBehaviourTest.class, + org.alfresco.repo.rawevents.TransactionAwareEventProducerTest.class, + org.alfresco.repo.event2.RepoEvent2ITSuite.class, }) public class AppContext06TestSuite { diff --git a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java index d7af0a25e7..6edf471720 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java +++ b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -71,14 +71,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.test.context.TestPropertySource; /** * @author Iulian Aftene */ - +@TestPropertySource(properties = {"repo.event2.queue.skip=false"}) public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest { protected static final boolean DEBUG = false; @@ -123,9 +123,6 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest @Autowired private NamespaceDAO namespaceDAO; - @Value("${repo.event2.queue.skip}") - protected boolean skipEventQueue; - protected NodeRef rootNodeRef; @BeforeClass diff --git a/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java index 1298ded98d..94349cfbbc 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java +++ b/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -25,78 +25,32 @@ */ package org.alfresco.repo.event2; -import java.util.HashSet; -import java.util.Set; - -import org.junit.BeforeClass; import org.junit.Test; -import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.ContextHierarchy; +import org.springframework.test.context.TestPropertySource; -@ContextHierarchy({ - // Context hierarchy inherits context config from parent classes and extends it with TestConfig from this class - @ContextConfiguration(classes = DirectEventGeneratorTest.TestConfig.class) -}) -@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +import java.util.Collection; + +@TestPropertySource(properties = {"repo.event2.queue.skip=true"}) public class DirectEventGeneratorTest extends EventGeneratorTest { @Autowired - private InstantiatedBeansRegistry instantiatedBeansRegistry; - + private EventSender eventSender; @Autowired - private EventSender directEventSender; - - @BeforeClass - public static void beforeClass() - { - System.setProperty("repo.event2.queue.skip", "true"); - } + private Collection allEventSenderBeans; @Test - public void testIfEnqueuingEventSenderIsNotInstantiated() + public void testIfOnlyRequiredEventSenderIsInstantiated() { - final Set instantiatedBeans = this.instantiatedBeansRegistry.getBeans(); - - assertTrue(skipEventQueue); - assertFalse(instantiatedBeans.contains("enqueuingEventSender")); + assertEquals(1, allEventSenderBeans.size()); + assertTrue(allEventSenderBeans.contains(eventSender)); } + @Test public void testIfDirectSenderIsSetInEventGenerator() { - assertTrue(skipEventQueue); - assertEquals(directEventSender, eventGenerator.getEventSender()); - } - - @Configuration - public static class TestConfig - { - @Bean - public BeanPostProcessor instantiatedBeansRegistry() - { - return new InstantiatedBeansRegistry(); - } - } - - protected static class InstantiatedBeansRegistry implements BeanPostProcessor - { - private final Set registeredBeans = new HashSet<>(); - - @Override - public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException - { - registeredBeans.add(beanName); - return bean; - } - - public Set getBeans() { - return registeredBeans; - } + assertEquals(DirectEventSender.class, eventSender.getClass()); + assertEquals(eventSender, eventGenerator.getEventSender()); } } diff --git a/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java index 06e0643ec6..244f89ed23 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java +++ b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -27,9 +27,9 @@ package org.alfresco.repo.event2; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; -@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@TestPropertySource(properties = {"repo.event2.queue.skip=false"}) public class EnqueuingEventGeneratorTest extends EventGeneratorTest { @Autowired @@ -38,7 +38,7 @@ public class EnqueuingEventGeneratorTest extends EventGeneratorTest @Test public void testIfEnqueuingSenderIsSetInEventGenerator() { - assertFalse(skipEventQueue); + assertEquals(EnqueuingEventSender.class, enqueuingEventSender.getClass()); assertEquals(enqueuingEventSender, eventGenerator.getEventSender()); } } diff --git a/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java index ce8604f326..162c99b7dd 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java +++ b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -65,15 +65,11 @@ public class EnqueuingEventSenderUnitTest @Before public void setup() { - eventSender = new EnqueuingEventSender(); - - enqueuePool = newThreadPool(); - eventSender.setEnqueueThreadPoolExecutor(enqueuePool); - dequeuePool = newThreadPool(); - eventSender.setDequeueThreadPoolExecutor(dequeuePool); - bus = mock(Event2MessageProducer.class); - eventSender.setEvent2MessageProducer(bus); + enqueuePool = newThreadPool(); + dequeuePool = newThreadPool(); + eventSender = new EnqueuingEventSender(bus, enqueuePool, dequeuePool); + eventSender.initialize(); events = new HashMap<>(); diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java index 561cb77ae1..25c6f96cc2 100644 --- a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java +++ b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java @@ -2,7 +2,7 @@ * #%L * Alfresco Repository * %% - * Copyright (C) 2005 - 2023 Alfresco Software Limited + * Copyright (C) 2005 - 2024 Alfresco Software Limited * %% * This file is part of the Alfresco software. * If the software was purchased under a paid Alfresco license, the terms of @@ -49,7 +49,6 @@ import org.apache.activemq.command.ActiveMQTopic; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; public abstract class EventGeneratorTest extends AbstractContextAwareRepoEvent @@ -60,12 +59,6 @@ public abstract class EventGeneratorTest extends AbstractContextAwareRepoEvent private ActiveMQConnection connection; protected List> receivedEvents; - @BeforeClass - public static void beforeClass() - { - System.setProperty("repo.event2.queue.skip", "false"); - } - @Before public void startupTopicListener() throws Exception {