ACS-8323 Extend the possibility to configure the event sending strategy (#2765)

This commit is contained in:
Piotr Żurek
2024-07-11 12:56:45 +02:00
committed by GitHub
parent 0ab31fcc93
commit c6201fa2fa
12 changed files with 213 additions and 125 deletions

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -36,18 +36,13 @@ import org.springframework.beans.factory.InitializingBean;
/**
* Sends a message to a destination in the current thread.
*/
public class DirectEventSender implements EventSender, InitializingBean
public class DirectEventSender implements EventSender
{
protected Event2MessageProducer event2MessageProducer;
protected final Event2MessageProducer event2MessageProducer;
@Override
public void afterPropertiesSet()
public DirectEventSender(Event2MessageProducer event2MessageProducer)
{
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
}
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
{
this.event2MessageProducer = event2MessageProducer;
}

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -46,28 +46,18 @@ public class EnqueuingEventSender extends DirectEventSender
{
protected static final Log LOGGER = LogFactory.getLog(EnqueuingEventSender.class);
protected Executor enqueueThreadPoolExecutor;
protected Executor dequeueThreadPoolExecutor;
protected final Executor enqueueThreadPoolExecutor;
protected final Executor dequeueThreadPoolExecutor;
protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>();
protected Runnable listener = createListener();
@Override
public void afterPropertiesSet()
public EnqueuingEventSender(Event2MessageProducer event2MessageProducer, Executor enqueueThreadPoolExecutor, Executor dequeueThreadPoolExecutor)
{
super.afterPropertiesSet();
super(event2MessageProducer);
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
}
public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor)
{
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
}
public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor)
{
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
dequeueThreadPoolExecutor.execute(listener);
}
/**
@@ -91,6 +81,12 @@ public class EnqueuingEventSender extends DirectEventSender
});
}
@Override
public void initialize()
{
dequeueThreadPoolExecutor.execute(listener);
}
/**
* Create listener task in charge of dequeuing and sending events ready to be sent.
* @return The task in charge of dequeuing and sending events ready to be sent.

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -40,4 +40,13 @@ public interface EventSender
* @param eventProducer - callback function that creates an event
*/
void accept(Callable<Optional<RepoEvent<?>>> eventProducer);
/**
* It's called right after event sender instantiation (see {@link org.alfresco.repo.event2.EventSenderFactoryBean}).
* It might be used to initialize the sender implementation.
*/
default void initialize()
{
//no initialization by default
}
}

View File

@@ -0,0 +1,148 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import jakarta.annotation.Nonnull;
import org.alfresco.util.PropertyCheck;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.core.env.PropertyResolver;
import java.util.Optional;
import java.util.concurrent.Executor;
public class EventSenderFactoryBean extends AbstractFactoryBean<EventSender>
{
static final String LEGACY_SKIP_QUEUE_PROPERTY = "repo.event2.queue.skip";
static final String EVENT_SEND_STRATEGY_PROPERTY = "repo.event2.send.strategy";
private static final String DIRECT_EVENT_SENDER_NAME = "direct";
private static final String ASYNC_EVENT_SENDER_NAME = "async";
private final PropertyResolver propertyResolver;
private final Event2MessageProducer event2MessageProducer;
private final Executor enqueueThreadPoolExecutor;
private final Executor dequeueThreadPoolExecutor;
private String configuredSenderName;
private boolean legacySkipQueueConfig;
public EventSenderFactoryBean(@Autowired PropertyResolver propertyResolver, Event2MessageProducer event2MessageProducer,
Executor enqueueThreadPoolExecutor, Executor dequeueThreadPoolExecutor)
{
super();
PropertyCheck.mandatory(this, "propertyResolver", propertyResolver);
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
this.propertyResolver = propertyResolver;
this.event2MessageProducer = event2MessageProducer;
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
}
@Value("${" + LEGACY_SKIP_QUEUE_PROPERTY + ":#{false}}")
public void setLegacySkipQueueConfig(boolean legacySkipQueueConfig)
{
this.legacySkipQueueConfig = legacySkipQueueConfig;
}
@Value("${" + EVENT_SEND_STRATEGY_PROPERTY + ":#{null}}")
public void setConfiguredSenderName(String configuredSenderName)
{
this.configuredSenderName = configuredSenderName;
}
@Override
public Class<?> getObjectType()
{
return EventSender.class;
}
@Override
@Nonnull
protected EventSender createInstance() throws Exception
{
EventSender sender = instantiateConfiguredSender();
sender.initialize();
return sender;
}
private EventSender instantiateConfiguredSender()
{
if (isSenderNameConfigured())
{
return instantiateSender(getConfiguredSenderName());
}
return isLegacySkipQueueConfigured() ? instantiateDirectSender() : instantiateAsyncSender();
}
protected EventSender instantiateSender(String senderName)
{
if (DIRECT_EVENT_SENDER_NAME.equalsIgnoreCase(senderName))
{
return instantiateDirectSender();
}
if (ASYNC_EVENT_SENDER_NAME.equalsIgnoreCase(senderName))
{
return instantiateAsyncSender();
}
throw new IllegalStateException("Failed to instantiate sender: " + senderName);
}
private DirectEventSender instantiateDirectSender()
{
return new DirectEventSender(event2MessageProducer);
}
private EnqueuingEventSender instantiateAsyncSender()
{
return new EnqueuingEventSender(event2MessageProducer, enqueueThreadPoolExecutor, dequeueThreadPoolExecutor);
}
private boolean isSenderNameConfigured()
{
return !Optional.ofNullable(getConfiguredSenderName())
.map(String::isBlank)
.orElse(true);
}
private boolean isLegacySkipQueueConfigured()
{
return Optional.ofNullable(propertyResolver.getProperty(LEGACY_SKIP_QUEUE_PROPERTY, Boolean.class))
.orElse(legacySkipQueueConfig);
}
private String getConfiguredSenderName()
{
return Optional.ofNullable(propertyResolver.getProperty(EVENT_SEND_STRATEGY_PROPERTY, String.class))
.orElse(configuredSenderName);
}
}

View File

@@ -41,7 +41,7 @@
<property name="transactionService" ref="transactionService"/>
<property name="personService" ref="personService"/>
<property name="nodeResourceHelper" ref="nodeResourceHelper"/>
<property name="eventSender" ref="#{ ${repo.event2.queue.skip} == true ? 'directEventSender' : 'enqueuingEventSender' }"/>
<property name="eventSender" ref="eventSender"/>
<property name="nodeDAO" ref="nodeDAO"/>
<property name="enabled" value="${repo.event2.enabled}"/>
</bean>
@@ -59,13 +59,10 @@
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/>
<bean id="directEventSender" class="org.alfresco.repo.event2.DirectEventSender">
<property name="event2MessageProducer" ref="event2MessageProducer"/>
</bean>
<bean id="enqueuingEventSender" class="org.alfresco.repo.event2.EnqueuingEventSender" parent="directEventSender" lazy-init="true">
<property name="enqueueThreadPoolExecutor" ref="eventAsyncEnqueueThreadPool"/>
<property name="dequeueThreadPoolExecutor" ref="eventAsyncDequeueThreadPool"/>
<bean id="eventSender" class="org.alfresco.repo.event2.EventSenderFactoryBean" autowire="constructor">
<constructor-arg ref="event2MessageProducer"/>
<constructor-arg ref="eventAsyncEnqueueThreadPool"/>
<constructor-arg ref="eventAsyncDequeueThreadPool"/>
</bean>
<bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">

View File

@@ -1229,7 +1229,10 @@ repo.event2.filter.childAssocTypes=rn:rendition
repo.event2.filter.users=
# Topic name
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
# Specifies the strategy for sending the events
repo.event2.send.strategy=
# Specifies if messages should be enqueued in in-memory queue or sent directly to the topic
# Deprecated. Please use repo.event2.send.strategy
repo.event2.queue.skip=false
#repo.event2.topic.endpoint=amqp:topic:VirtualTopic.alfresco.repo.event2
# Thread pool for async enqueue of repo events

View File

@@ -44,11 +44,6 @@ import org.junit.runners.Suite;
@RunWith(Categories.class)
@Categories.ExcludeCategory({DBTests.class, NonBuildTests.class})
@Suite.SuiteClasses({
// Requires a running ActiveMQ
org.alfresco.repo.rawevents.EventBehaviourTest.class,
org.alfresco.repo.rawevents.TransactionAwareEventProducerTest.class,
org.alfresco.repo.event2.RepoEvent2ITSuite.class,
// Requires running transformers
org.alfresco.transform.registry.LocalTransformServiceRegistryConfigTest.class,
org.alfresco.repo.rendition2.RenditionService2IntegrationTest.class,
@@ -71,7 +66,12 @@ import org.junit.runners.Suite;
org.alfresco.repo.blog.BlogServiceImplTest.class,
org.alfresco.repo.action.scheduled.ScheduledPersistedActionServiceTest.class,
org.alfresco.repo.rendition2.RenditionDefinitionTest.class
org.alfresco.repo.rendition2.RenditionDefinitionTest.class,
// Requires a running ActiveMQ
org.alfresco.repo.rawevents.EventBehaviourTest.class,
org.alfresco.repo.rawevents.TransactionAwareEventProducerTest.class,
org.alfresco.repo.event2.RepoEvent2ITSuite.class,
})
public class AppContext06TestSuite
{

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -71,14 +71,14 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.test.context.TestPropertySource;
/**
* @author Iulian Aftene
*/
@TestPropertySource(properties = {"repo.event2.queue.skip=false"})
public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
{
protected static final boolean DEBUG = false;
@@ -123,9 +123,6 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
@Autowired
private NamespaceDAO namespaceDAO;
@Value("${repo.event2.queue.skip}")
protected boolean skipEventQueue;
protected NodeRef rootNodeRef;
@BeforeClass

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -25,78 +25,32 @@
*/
package org.alfresco.repo.event2;
import java.util.HashSet;
import java.util.Set;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.ContextHierarchy;
import org.springframework.test.context.TestPropertySource;
@ContextHierarchy({
// Context hierarchy inherits context config from parent classes and extends it with TestConfig from this class
@ContextConfiguration(classes = DirectEventGeneratorTest.TestConfig.class)
})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
import java.util.Collection;
@TestPropertySource(properties = {"repo.event2.queue.skip=true"})
public class DirectEventGeneratorTest extends EventGeneratorTest
{
@Autowired
private InstantiatedBeansRegistry instantiatedBeansRegistry;
private EventSender eventSender;
@Autowired
private EventSender directEventSender;
@BeforeClass
public static void beforeClass()
{
System.setProperty("repo.event2.queue.skip", "true");
}
private Collection<EventSender> allEventSenderBeans;
@Test
public void testIfEnqueuingEventSenderIsNotInstantiated()
public void testIfOnlyRequiredEventSenderIsInstantiated()
{
final Set<String> instantiatedBeans = this.instantiatedBeansRegistry.getBeans();
assertTrue(skipEventQueue);
assertFalse(instantiatedBeans.contains("enqueuingEventSender"));
assertEquals(1, allEventSenderBeans.size());
assertTrue(allEventSenderBeans.contains(eventSender));
}
@Test
public void testIfDirectSenderIsSetInEventGenerator()
{
assertTrue(skipEventQueue);
assertEquals(directEventSender, eventGenerator.getEventSender());
}
@Configuration
public static class TestConfig
{
@Bean
public BeanPostProcessor instantiatedBeansRegistry()
{
return new InstantiatedBeansRegistry();
}
}
protected static class InstantiatedBeansRegistry implements BeanPostProcessor
{
private final Set<String> registeredBeans = new HashSet<>();
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException
{
registeredBeans.add(beanName);
return bean;
}
public Set<String> getBeans() {
return registeredBeans;
}
assertEquals(DirectEventSender.class, eventSender.getClass());
assertEquals(eventSender, eventGenerator.getEventSender());
}
}

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -27,9 +27,9 @@ package org.alfresco.repo.event2;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
@TestPropertySource(properties = {"repo.event2.queue.skip=false"})
public class EnqueuingEventGeneratorTest extends EventGeneratorTest
{
@Autowired
@@ -38,7 +38,7 @@ public class EnqueuingEventGeneratorTest extends EventGeneratorTest
@Test
public void testIfEnqueuingSenderIsSetInEventGenerator()
{
assertFalse(skipEventQueue);
assertEquals(EnqueuingEventSender.class, enqueuingEventSender.getClass());
assertEquals(enqueuingEventSender, eventGenerator.getEventSender());
}
}

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -65,15 +65,11 @@ public class EnqueuingEventSenderUnitTest
@Before
public void setup()
{
eventSender = new EnqueuingEventSender();
enqueuePool = newThreadPool();
eventSender.setEnqueueThreadPoolExecutor(enqueuePool);
dequeuePool = newThreadPool();
eventSender.setDequeueThreadPoolExecutor(dequeuePool);
bus = mock(Event2MessageProducer.class);
eventSender.setEvent2MessageProducer(bus);
enqueuePool = newThreadPool();
dequeuePool = newThreadPool();
eventSender = new EnqueuingEventSender(bus, enqueuePool, dequeuePool);
eventSender.initialize();
events = new HashMap<>();

View File

@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* Copyright (C) 2005 - 2024 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -49,7 +49,6 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class EventGeneratorTest extends AbstractContextAwareRepoEvent
@@ -60,12 +59,6 @@ public abstract class EventGeneratorTest extends AbstractContextAwareRepoEvent
private ActiveMQConnection connection;
protected List<RepoEvent<?>> receivedEvents;
@BeforeClass
public static void beforeClass()
{
System.setProperty("repo.event2.queue.skip", "false");
}
@Before
public void startupTopicListener() throws Exception
{