ACS-8832 Fix Event Outbox shutdown (#2959)

* ACS-8832 Add destroy method in EventSender

* ACS-8832 Rename

* ACS-8832 Reformat
This commit is contained in:
Damian Ujma
2024-10-02 16:32:53 +02:00
committed by GitHub
parent dde8dc90e6
commit aeebd3dcc6
2 changed files with 75 additions and 56 deletions

View File

@@ -38,6 +38,12 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
import org.alfresco.repo.domain.node.NodeDAO; import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.domain.node.TransactionEntity; import org.alfresco.repo.domain.node.TransactionEntity;
import org.alfresco.repo.event.v1.model.DataAttributes; import org.alfresco.repo.event.v1.model.DataAttributes;
@@ -81,11 +87,6 @@ import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.PropertyCheck; import org.alfresco.util.PropertyCheck;
import org.alfresco.util.TriPredicate; import org.alfresco.util.TriPredicate;
import org.alfresco.util.transaction.TransactionListenerAdapter; import org.alfresco.util.transaction.TransactionListenerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
/** /**
* Generates events and sends them to an event topic. * Generates events and sends them to an event topic.
@@ -93,8 +94,8 @@ import org.springframework.extensions.surf.util.AbstractLifecycleBean;
* @author Jamal Kaabi-Mofrad * @author Jamal Kaabi-Mofrad
*/ */
public class EventGenerator extends AbstractLifecycleBean implements InitializingBean, EventSupportedPolicies, public class EventGenerator extends AbstractLifecycleBean implements InitializingBean, EventSupportedPolicies,
ChildAssociationEventSupportedPolicies, ChildAssociationEventSupportedPolicies,
PeerAssociationEventSupportedPolicies PeerAssociationEventSupportedPolicies
{ {
private static final Log LOGGER = LogFactory.getLog(EventGenerator.class); private static final Log LOGGER = LogFactory.getLog(EventGenerator.class);
@@ -354,7 +355,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
protected ChildAssociationEventConsolidator createChildAssociationEventConsolidator( protected ChildAssociationEventConsolidator createChildAssociationEventConsolidator(
ChildAssociationRef childAssociationRef) ChildAssociationRef childAssociationRef)
{ {
return new ChildAssociationEventConsolidator(childAssociationRef, nodeResourceHelper); return new ChildAssociationEventConsolidator(childAssociationRef, nodeResourceHelper);
} }
@@ -417,8 +418,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
/** /**
* @return the {@link NodeEventConsolidator} for the supplied {@code nodeRef} from * @return the {@link NodeEventConsolidator} for the supplied {@code nodeRef} from the current transaction context.
* the current transaction context.
*/ */
protected NodeEventConsolidator getEventConsolidator(NodeRef nodeRef) protected NodeEventConsolidator getEventConsolidator(NodeRef nodeRef)
{ {
@@ -438,7 +438,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
return eventConsolidator; return eventConsolidator;
} }
protected Consolidators getTxnConsolidators(Object resourceKey) protected Consolidators getTxnConsolidators(Object resourceKey)
{ {
Consolidators consolidators = AlfrescoTransactionSupport.getResource(resourceKey); Consolidators consolidators = AlfrescoTransactionSupport.getResource(resourceKey);
@@ -451,8 +450,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
/** /**
* @return the {@link ChildAssociationEventConsolidator} for the supplied {@code childAssociationRef} from * @return the {@link ChildAssociationEventConsolidator} for the supplied {@code childAssociationRef} from the current transaction context.
* the current transaction context.
*/ */
private ChildAssociationEventConsolidator getEventConsolidator(ChildAssociationRef childAssociationRef) private ChildAssociationEventConsolidator getEventConsolidator(ChildAssociationRef childAssociationRef)
{ {
@@ -473,8 +471,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
/** /**
* @return the {@link PeerAssociationEventConsolidator} for the supplied {@code peerAssociationRef} from * @return the {@link PeerAssociationEventConsolidator} for the supplied {@code peerAssociationRef} from the current transaction context.
* the current transaction context.
*/ */
private PeerAssociationEventConsolidator getEventConsolidator(AssociationRef peerAssociationRef) private PeerAssociationEventConsolidator getEventConsolidator(AssociationRef peerAssociationRef)
{ {
@@ -507,10 +504,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
protected EventInfo getEventInfo(String user) protected EventInfo getEventInfo(String user)
{ {
return new EventInfo().setTimestamp(getCurrentTransactionTimestamp()) return new EventInfo().setTimestamp(getCurrentTransactionTimestamp())
.setId(UUID.randomUUID().toString()) .setId(UUID.randomUUID().toString())
.setTxnId(AlfrescoTransactionSupport.getTransactionId()) .setTxnId(AlfrescoTransactionSupport.getTransactionId())
.setPrincipal(user) .setPrincipal(user)
.setSource(URI.create("/" + descriptorService.getCurrentRepositoryDescriptor().getId())); .setSource(URI.create("/" + descriptorService.getCurrentRepositoryDescriptor().getId()));
} }
private ZonedDateTime getCurrentTransactionTimestamp() private ZonedDateTime getCurrentTransactionTimestamp()
@@ -523,13 +520,12 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
private static ChildAssociationRef childAssociationWithoutParentOf(ChildAssociationRef childAssociationRef) private static ChildAssociationRef childAssociationWithoutParentOf(ChildAssociationRef childAssociationRef)
{ {
return new ChildAssociationRef( return new ChildAssociationRef(
null, null,
null, null,
childAssociationRef.getQName(), childAssociationRef.getQName(),
childAssociationRef.getChildRef(), childAssociationRef.getChildRef(),
childAssociationRef.isPrimary(), childAssociationRef.isPrimary(),
childAssociationRef.getNthSibling() childAssociationRef.getNthSibling());
);
} }
@Override @Override
@@ -546,7 +542,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
@Override @Override
protected void onShutdown(ApplicationEvent applicationEvent) protected void onShutdown(ApplicationEvent applicationEvent)
{ {
//NOOP if (eventSender != null)
{
eventSender.destroy();
}
} }
protected class EventTransactionListener extends TransactionListenerAdapter protected class EventTransactionListener extends TransactionListenerAdapter
@@ -586,8 +585,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
/** /**
* @return true if a node transaction is not only active, but also committed with modifications. * @return true if a node transaction is not only active, but also committed with modifications. This means that a {@link TransactionEntity} object was created.
* This means that a {@link TransactionEntity} object was created.
*/ */
protected boolean isTransactionCommitted() protected boolean isTransactionCommitted()
{ {
@@ -601,7 +599,8 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
try try
{ {
sendEvents(); sendEvents();
} catch (Exception e) }
catch (Exception e)
{ {
// Must consume the exception to protect other TransactionListeners // Must consume the exception to protect other TransactionListeners
LOGGER.error("Unexpected error while sending repository events", e); LOGGER.error("Unexpected error while sending repository events", e);
@@ -650,14 +649,19 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
/** /**
* Handles all kinds of events and sends them within dedicated transaction. * Handles all kinds of events and sends them within dedicated transaction.
* *
* @param entityReference - reference to an entity (e.g. node, child association, peer association) * @param entityReference
* @param eventConsolidator - object encapsulating events occurred in a transaction * - reference to an entity (e.g. node, child association, peer association)
* @param entityToEventEligibilityVerifier - allows to verify if entity is eligible to generate an even. If null no verification is necessary * @param eventConsolidator
* @param <REF> - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef}) * - object encapsulating events occurred in a transaction
* @param <CON> - event consolidator type - extension of {@link EventConsolidator} * @param entityToEventEligibilityVerifier
* - allows to verify if entity is eligible to generate an even. If null no verification is necessary
* @param <REF>
* - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef})
* @param <CON>
* - event consolidator type - extension of {@link EventConsolidator}
*/ */
private <REF extends EntityRef, CON extends EventConsolidator<REF, ? extends Resource>> void sendEvent( private <REF extends EntityRef, CON extends EventConsolidator<REF, ? extends Resource>> void sendEvent(
final REF entityReference, final CON eventConsolidator, final TriPredicate<REF, CON, EventInfo> entityToEventEligibilityVerifier) final REF entityReference, final CON eventConsolidator, final TriPredicate<REF, CON, EventInfo> entityToEventEligibilityVerifier)
{ {
final EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); final EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
if (isSendingEventBeforeCommitRequired()) if (isSendingEventBeforeCommitRequired())
@@ -676,16 +680,22 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
/** /**
* Creates events from various kinds of entities. * Creates events from various kinds of entities.
* *
* @param entityReference - reference to an entity (e.g. node, child association, peer association) * @param entityReference
* @param eventConsolidator - object encapsulating events occurred in a transaction * - reference to an entity (e.g. node, child association, peer association)
* @param eventInfo - object holding the event information * @param eventConsolidator
* @param entityToEventEligibilityVerifier - allows to verify if entity is eligible to generate an even. If null no verification is necessary * - object encapsulating events occurred in a transaction
* @param <REF> - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef}) * @param eventInfo
* @param <CON> - event consolidator type - extension of {@link EventConsolidator} * - object holding the event information
* @param entityToEventEligibilityVerifier
* - allows to verify if entity is eligible to generate an even. If null no verification is necessary
* @param <REF>
* - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef})
* @param <CON>
* - event consolidator type - extension of {@link EventConsolidator}
*/ */
private <REF extends EntityRef, CON extends EventConsolidator<REF, ? extends Resource>> Optional<RepoEvent<?>> createEvent( private <REF extends EntityRef, CON extends EventConsolidator<REF, ? extends Resource>> Optional<RepoEvent<?>> createEvent(
final REF entityReference, final CON eventConsolidator, final EventInfo eventInfo, final REF entityReference, final CON eventConsolidator, final EventInfo eventInfo,
final TriPredicate<REF, CON, EventInfo> entityToEventEligibilityVerifier) final TriPredicate<REF, CON, EventInfo> entityToEventEligibilityVerifier)
{ {
if (eventConsolidator.isTemporaryEntity()) if (eventConsolidator.isTemporaryEntity())
{ {
@@ -719,8 +729,8 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
if (LOGGER.isTraceEnabled()) if (LOGGER.isTraceEnabled())
{ {
LOGGER.trace("EventFilter - Excluding node: '" + nodeReference + "' of type: '" LOGGER.trace("EventFilter - Excluding node: '" + nodeReference + "' of type: '"
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString()) + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
+ "' created by: " + user); + "' created by: " + user);
} }
return false; return false;
} }
@@ -747,8 +757,8 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
if (LOGGER.isTraceEnabled()) if (LOGGER.isTraceEnabled())
{ {
LOGGER.trace("EventFilter - Excluding child association: '" + childAssociationReference + "' of type: '" LOGGER.trace("EventFilter - Excluding child association: '" + childAssociationReference + "' of type: '"
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user); + "' created by: " + user);
} }
return false; return false;
} }
@@ -757,8 +767,8 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
if (LOGGER.isTraceEnabled()) if (LOGGER.isTraceEnabled())
{ {
LOGGER.trace("EventFilter - Excluding primary child association: '" + childAssociationReference + "' of type: '" LOGGER.trace("EventFilter - Excluding primary child association: '" + childAssociationReference + "' of type: '"
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user); + "' created by: " + user);
} }
return false; return false;
} }
@@ -804,7 +814,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
{ {
if (peerAssocs == null) if (peerAssocs == null)
{ {
peerAssocs = new LinkedHashMap<>(29); peerAssocs = new LinkedHashMap<>(29);
} }
return peerAssocs; return peerAssocs;
} }

View File

@@ -37,17 +37,26 @@ public interface EventSender
{ {
/** /**
* Accepts a callback function creating an event and sends this event to specified destination. * Accepts a callback function creating an event and sends this event to specified destination.
* @param eventProducer - callback function that creates an event *
* @param eventProducer
* - callback function that creates an event
*/ */
void accept(Callable<Optional<RepoEvent<?>>> eventProducer); void accept(Callable<Optional<RepoEvent<?>>> eventProducer);
/** /**
* It's called right after event sender instantiation (see {@link org.alfresco.repo.event2.EventSenderFactoryBean}). * It's called right after event sender instantiation (see {@link org.alfresco.repo.event2.EventSenderFactoryBean}). It might be used to initialize the sender implementation.
* It might be used to initialize the sender implementation.
*/ */
default void initialize() default void initialize()
{ {
//no initialization by default // no initialization by default
}
/**
* It's called when the application context is closing, allowing {@link org.alfresco.repo.event2.EventGenerator} to perform cleanup operations.
*/
default void destroy()
{
// no destruction by default
} }
default boolean shouldParticipateInTransaction() default boolean shouldParticipateInTransaction()