ACS-5451: Toggle for direct Event sending (#2082)

* ACS-5451: Toggle for direct Event sending
- added new bean allowing direct event sending and a toggle (switch) property
- refactored EventGenerator logic related with creating and sending events
- refactored consolidators - renamed EventConsolidator -> NodeEventConsolidator, and moved common logic to new abstract EventConsolidator
- added integration tests
- added JavaDoc
- refactored events related tests
This commit is contained in:
Krystian Dabrowski
2023-08-22 08:30:05 +02:00
committed by GitHub
parent 3c242bc62b
commit e8a27dd68d
22 changed files with 1238 additions and 1032 deletions

View File

@@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2005-2010 Alfresco Software Limited. * Copyright (C) 2005-2023 Alfresco Software Limited.
* *
* This file is part of Alfresco * This file is part of Alfresco
* *
@@ -53,7 +53,7 @@ public interface TransactionListener
* on the state of the transaction. * on the state of the transaction.
* <p> * <p>
* Although all transaction resources are still available, this method should * Although all transaction resources are still available, this method should
* be used only for cleaning up resources after a commit has occured. * be used only for cleaning up resources after a commit has occurred.
*/ */
void afterCommit(); void afterCommit();
@@ -64,7 +64,7 @@ public interface TransactionListener
* on the state of the transaction. * on the state of the transaction.
* <p> * <p>
* Although all transaction resources are still available, this method should * Although all transaction resources are still available, this method should
* be used only for cleaning up resources after a rollback has occured. * be used only for cleaning up resources after a rollback has occurred.
*/ */
void afterRollback(); void afterRollback();
} }

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -25,13 +25,8 @@
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import java.util.ArrayDeque;
import java.util.Deque;
import org.alfresco.repo.event.v1.model.ChildAssociationResource; import org.alfresco.repo.event.v1.model.ChildAssociationResource;
import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.namespace.QName; import org.alfresco.service.namespace.QName;
@@ -41,51 +36,12 @@ import org.alfresco.service.namespace.QName;
* @author Chris Shields * @author Chris Shields
* @author Sara Aspery * @author Sara Aspery
*/ */
public class ChildAssociationEventConsolidator implements ChildAssociationEventSupportedPolicies public class ChildAssociationEventConsolidator extends EventConsolidator<ChildAssociationRef, ChildAssociationResource> implements ChildAssociationEventSupportedPolicies
{ {
private final Deque<EventType> eventTypes;
protected final ChildAssociationRef childAssociationRef;
private ChildAssociationResource resource;
private final NodeResourceHelper helper;
public ChildAssociationEventConsolidator(ChildAssociationRef childAssociationRef, NodeResourceHelper helper) public ChildAssociationEventConsolidator(ChildAssociationRef childAssociationRef, NodeResourceHelper helper)
{ {
this.eventTypes = new ArrayDeque<>(); super(childAssociationRef, helper);
this.childAssociationRef = childAssociationRef;
this.helper = helper;
this.resource = buildChildAssociationResource(this.childAssociationRef);
}
/**
* Builds and returns the {@link RepoEvent} instance.
*
* @param eventInfo the object holding the event information
* @return the {@link RepoEvent} instance
*/
public RepoEvent<DataAttributes<ChildAssociationResource>> getRepoEvent(EventInfo eventInfo)
{
EventType eventType = getDerivedEvent();
DataAttributes<ChildAssociationResource> eventData = buildEventData(eventInfo, resource);
return RepoEvent.<DataAttributes<ChildAssociationResource>>builder()
.setId(eventInfo.getId())
.setSource(eventInfo.getSource())
.setTime(eventInfo.getTimestamp())
.setType(eventType.getType())
.setData(eventData)
.setDataschema(EventJSONSchema.getSchemaV1(eventType))
.build();
}
protected DataAttributes<ChildAssociationResource> buildEventData(EventInfo eventInfo, ChildAssociationResource resource)
{
return EventData.<ChildAssociationResource>builder()
.setEventGroupId(eventInfo.getTxnId())
.setResource(resource)
.build();
} }
/** /**
@@ -121,12 +77,10 @@ public class ChildAssociationEventConsolidator implements ChildAssociationEventS
return new ChildAssociationResource(parentId, childId, assocType, assocQName); return new ChildAssociationResource(parentId, childId, assocType, assocQName);
} }
/** @Override
* @return a derived event for a transaction. protected EventType getDerivedEvent()
*/
private EventType getDerivedEvent()
{ {
if (isTemporaryChildAssociation()) if (isTemporaryEntity())
{ {
// This event will be filtered out, but we set the correct // This event will be filtered out, but we set the correct
// event type anyway for debugging purposes // event type anyway for debugging purposes
@@ -147,33 +101,15 @@ public class ChildAssociationEventConsolidator implements ChildAssociationEventS
} }
} }
/** @Override
* Whether or not the child association has been created and then deleted, i.e. a temporary child association. public boolean isTemporaryEntity()
*
* @return {@code true} if the child association has been created and then deleted, otherwise false
*/
public boolean isTemporaryChildAssociation()
{ {
return eventTypes.contains(EventType.CHILD_ASSOC_CREATED) && eventTypes.getLast() == EventType.CHILD_ASSOC_DELETED; return eventTypes.contains(EventType.CHILD_ASSOC_CREATED) && eventTypes.getLast() == EventType.CHILD_ASSOC_DELETED;
} }
/** @Override
* Get child association type. public QName getEntityType()
*
* @return QName the child association type
*/
public QName getChildAssocType()
{ {
return childAssociationRef.getTypeQName(); return entityReference.getTypeQName();
}
/**
* Get event types.
*
* @return Deque<EventType> queue of event types
*/
public Deque<EventType> getEventTypes()
{
return eventTypes;
} }
} }

View File

@@ -0,0 +1,70 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.util.PropertyCheck;
import org.springframework.beans.factory.InitializingBean;
/**
* Sends a message to a destination in the current thread.
*/
public class DirectEventSender implements EventSender, InitializingBean
{
protected Event2MessageProducer event2MessageProducer;
@Override
public void afterPropertiesSet()
{
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
}
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
{
this.event2MessageProducer = event2MessageProducer;
}
@Override
public void accept(Callable<Optional<RepoEvent<?>>> eventProducer)
{
try
{
eventProducer.call().ifPresent(event -> event2MessageProducer.send(event));
}
catch (RuntimeException e)
{
throw e;
}
catch (Exception e)
{
throw new AlfrescoRuntimeException("Unexpected error while executing maker function for repository event", e);
}
}
}

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2021 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -25,6 +25,7 @@
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@@ -36,40 +37,33 @@ import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.util.PropertyCheck; import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
/* /**
* This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but * Enqueuing event sender allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but
* at the same time it preserves the order of the events * at the same time it preserves the order of the events.
*/ */
public class EventGeneratorQueue implements InitializingBean public class EnqueuingEventSender extends DirectEventSender
{ {
protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class); protected static final Log LOGGER = LogFactory.getLog(EnqueuingEventSender.class);
protected Executor enqueueThreadPoolExecutor; protected Executor enqueueThreadPoolExecutor;
protected Executor dequeueThreadPoolExecutor; protected Executor dequeueThreadPoolExecutor;
protected Event2MessageProducer event2MessageProducer;
protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>(); protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>();
protected Runnable listener = createListener(); protected Runnable listener = createListener();
@Override @Override
public void afterPropertiesSet() throws Exception public void afterPropertiesSet()
{ {
super.afterPropertiesSet();
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor); PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor); PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
} }
public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
{
this.event2MessageProducer = event2MessageProducer;
}
public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor) public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor)
{ {
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor; this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
} }
public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor) public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor)
{ {
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor; this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
@@ -78,11 +72,12 @@ public class EventGeneratorQueue implements InitializingBean
/** /**
* Procedure to enqueue the callback functions that creates an event. * Procedure to enqueue the callback functions that creates an event.
* @param maker Callback function that creates an event. * @param eventProducer Callback function that creates an event.
*/ */
public void accept(Callable<RepoEvent<?>> maker) @Override
public void accept(Callable<Optional<RepoEvent<?>>> eventProducer)
{ {
EventInMaking eventInMaking = new EventInMaking(maker); EventInMaking eventInMaking = new EventInMaking(eventProducer);
queue.offer(eventInMaking); queue.offer(eventInMaking);
enqueueThreadPoolExecutor.execute(() -> { enqueueThreadPoolExecutor.execute(() -> {
try try
@@ -102,78 +97,67 @@ public class EventGeneratorQueue implements InitializingBean
*/ */
private Runnable createListener() private Runnable createListener()
{ {
return new Runnable() return () -> {
{ try
@Override
public void run()
{ {
try while (!Thread.interrupted())
{ {
while (!Thread.interrupted()) try
{ {
try queue.take().getEventWhenReady().ifPresent(event -> event2MessageProducer.send(event));
{ }
EventInMaking eventInMaking = queue.take(); catch (Exception e)
RepoEvent<?> event = eventInMaking.getEventWhenReady(); {
if (event != null) LOGGER.error("Unexpected error while dequeuing and sending repository event " + e);
{
event2MessageProducer.send(event);
}
}
catch (Exception e)
{
LOGGER.error("Unexpected error while dequeuing and sending repository event" + e);
}
} }
} }
finally }
{ finally
LOGGER.warn("Unexpected: rescheduling the listener thread."); {
dequeueThreadPoolExecutor.execute(listener); LOGGER.warn("Unexpected: rescheduling the listener thread.");
} dequeueThreadPoolExecutor.execute(listener);
} }
}; };
} }
/* /**
* Simple class that makes events and allows to retrieve them when ready * Simple class that makes events and allows to retrieve them when ready
*/ */
private static class EventInMaking private static class EventInMaking
{ {
private Callable<RepoEvent<?>> maker; private final Callable<Optional<RepoEvent<?>>> maker;
private volatile RepoEvent<?> event; private volatile RepoEvent<?> event;
private CountDownLatch latch; private final CountDownLatch latch;
public EventInMaking(Callable<RepoEvent<?>> maker) public EventInMaking(Callable<Optional<RepoEvent<?>>> maker)
{ {
this.maker = maker; this.maker = maker;
this.latch = new CountDownLatch(1); this.latch = new CountDownLatch(1);
} }
public void make() throws Exception public void make() throws Exception
{ {
try try
{ {
event = maker.call(); event = maker.call().orElse(null);
} }
finally finally
{ {
latch.countDown(); latch.countDown();
} }
} }
public RepoEvent<?> getEventWhenReady() throws InterruptedException public Optional<RepoEvent<?>> getEventWhenReady() throws InterruptedException
{ {
latch.await(30, TimeUnit.SECONDS); latch.await(30, TimeUnit.SECONDS);
return event; return Optional.ofNullable(event);
} }
@Override @Override
public String toString() public String toString()
{ {
return maker.toString(); return maker.toString();
} }
} }
} }

View File

@@ -25,60 +25,66 @@
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Deque; import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.event.v1.model.ContentInfo;
import org.alfresco.repo.event.v1.model.DataAttributes; import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventData; import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.NodeResource;
import org.alfresco.repo.event.v1.model.NodeResource.Builder;
import org.alfresco.repo.event.v1.model.RepoEvent; import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.repo.event.v1.model.UserInfo; import org.alfresco.repo.event.v1.model.Resource;
import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.EntityRef;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.namespace.QName; import org.alfresco.service.namespace.QName;
/** /**
* Encapsulates events occurred in a single transaction. * Encapsulates events occurred in a single transaction.
* *
* @author Jamal Kaabi-Mofrad * @param <REF> entity (e.g. node, child association, peer association) reference type
* @param <RES> entity resource type
*/ */
public class EventConsolidator implements EventSupportedPolicies public abstract class EventConsolidator<REF extends EntityRef, RES extends Resource>
{ {
private final NodeResourceHelper helper;
protected final Deque<EventType> eventTypes; protected final Deque<EventType> eventTypes;
private final List<QName> aspectsAdded; protected final NodeResourceHelper helper;
private final List<QName> aspectsRemoved; protected REF entityReference;
protected RES resource;
protected NodeRef nodeRef; public EventConsolidator(final REF entityReference, final NodeResourceHelper nodeResourceHelper)
private NodeResource.Builder resourceBuilder;
private Map<QName, Serializable> propertiesBefore;
private Map<QName, Serializable> propertiesAfter;
private QName nodeType;
private QName nodeTypeBefore;
private List<String> primaryHierarchyBefore;
private boolean resourceBeforeAllFieldsNull = true;
public EventConsolidator(NodeResourceHelper nodeResourceHelper)
{ {
this.helper = nodeResourceHelper;
this.eventTypes = new ArrayDeque<>(); this.eventTypes = new ArrayDeque<>();
this.aspectsAdded = new ArrayList<>(); this.entityReference = entityReference;
this.aspectsRemoved = new ArrayList<>(); this.helper = nodeResourceHelper;
}
/**
* Get entity (e.g. node, peer association, child association) type.
*
* @return QName the peer association type
*/
public abstract QName getEntityType();
/**
* Whether the entity has been created and then deleted, e.g. a temporary node.
*
* @return {@code true} if the node has been created and then deleted, otherwise false
*/
public abstract boolean isTemporaryEntity();
/**
* Get a derived event for a transaction.
*
* @return a derived event type
*/
protected abstract EventType getDerivedEvent();
/**
* Get event types.
*
* @return Deque<EventType> queue of event types
*/
public Deque<EventType> getEventTypes()
{
return eventTypes;
} }
/** /**
@@ -87,424 +93,30 @@ public class EventConsolidator implements EventSupportedPolicies
* @param eventInfo the object holding the event information * @param eventInfo the object holding the event information
* @return the {@link RepoEvent} instance * @return the {@link RepoEvent} instance
*/ */
public RepoEvent<DataAttributes<NodeResource>> getRepoEvent(EventInfo eventInfo) public RepoEvent<DataAttributes<RES>> getRepoEvent(EventInfo eventInfo)
{ {
NodeResource resource = buildNodeResource();
EventType eventType = getDerivedEvent(); EventType eventType = getDerivedEvent();
DataAttributes<NodeResource> eventData = buildEventData(eventInfo, resource, eventType); DataAttributes<RES> eventData = buildEventData(eventInfo, resource, eventType);
return RepoEvent.<DataAttributes<NodeResource>>builder() return RepoEvent.<DataAttributes<RES>>builder()
.setId(eventInfo.getId()) .setId(eventInfo.getId())
.setSource(eventInfo.getSource()) .setSource(eventInfo.getSource())
.setTime(eventInfo.getTimestamp()) .setTime(eventInfo.getTimestamp())
.setType(eventType.getType()) .setType(eventType.getType())
.setData(eventData) .setData(eventData)
.setDataschema(EventJSONSchema.getSchemaV1(eventType)) .setDataschema(EventJSONSchema.getSchemaV1(eventType))
.build(); .build();
}
protected DataAttributes<NodeResource> buildEventData(EventInfo eventInfo, NodeResource resource, EventType eventType)
{
EventData.Builder<NodeResource> eventDataBuilder = EventData.<NodeResource>builder()
.setEventGroupId(eventInfo.getTxnId())
.setResource(resource);
if (eventType == EventType.NODE_UPDATED)
{
eventDataBuilder.setResourceBefore(buildNodeResourceBeforeDelta(resource));
}
return eventDataBuilder.build();
} }
/** /**
* Creates a builder instance if absent or {@code forceUpdate} is requested. * Provides primary event data.
* It also, sets the required fields.
*
* @param nodeRef the nodeRef in the txn
* @param forceUpdate if {@code true}, will get the latest node info and ignores
* the existing builder object.
*/ */
protected void createBuilderIfAbsent(NodeRef nodeRef, boolean forceUpdate) protected DataAttributes<RES> buildEventData(EventInfo eventInfo, RES resource, EventType eventType)
{ {
if (resourceBuilder == null || forceUpdate) return EventData.<RES>builder()
{ .setEventGroupId(eventInfo.getTxnId())
this.resourceBuilder = helper.createNodeResourceBuilder(nodeRef); .setResource(resource)
this.nodeRef = nodeRef; .build();
this.nodeType = helper.getNodeType(nodeRef);
}
} }
/**
* Creates a builder instance if absent, and sets the required fields.
*
* @param nodeRef the nodeRef in the txn
*/
protected void createBuilderIfAbsent(NodeRef nodeRef)
{
createBuilderIfAbsent(nodeRef, false);
}
@Override
public void onCreateNode(ChildAssociationRef childAssocRef)
{
eventTypes.add(EventType.NODE_CREATED);
NodeRef nodeRef = childAssocRef.getChildRef();
createBuilderIfAbsent(nodeRef);
// Sometimes onCreateNode policy is out of order
this.propertiesBefore = null;
setBeforeProperties(Collections.emptyMap());
setAfterProperties(helper.getProperties(nodeRef));
}
@Override
public void onMoveNode(ChildAssociationRef oldChildAssocRef, ChildAssociationRef newChildAssocRef)
{
eventTypes.add(EventType.NODE_UPDATED);
createBuilderIfAbsent(newChildAssocRef.getChildRef());
setBeforePrimaryHierarchy(helper.getPrimaryHierarchy(oldChildAssocRef.getParentRef(), true));
}
@Override
public void onSetNodeType(NodeRef nodeRef, QName before, QName after)
{
eventTypes.add(EventType.NODE_UPDATED);
nodeTypeBefore = before;
createBuilderIfAbsent(nodeRef);
}
@Override
public void onUpdateProperties(NodeRef nodeRef, Map<QName, Serializable> before, Map<QName, Serializable> after)
{
eventTypes.add(EventType.NODE_UPDATED);
// Sometime we don't get the 'before', so just use the latest
if (before.isEmpty() && this.propertiesAfter != null)
{
before = this.propertiesAfter;
}
createBuilderIfAbsent(nodeRef);
setBeforeProperties(before);
setAfterProperties(after);
}
@Override
public void beforeDeleteNode(NodeRef nodeRef)
{
eventTypes.add(EventType.NODE_DELETED);
createBuilderIfAbsent(nodeRef, false);
}
@Override
public void onAddAspect(NodeRef nodeRef, QName aspectTypeQName)
{
eventTypes.add(EventType.NODE_UPDATED);
addAspect(aspectTypeQName);
createBuilderIfAbsent(nodeRef);
}
void addAspect(QName aspectTypeQName)
{
if (aspectsRemoved.contains(aspectTypeQName))
{
aspectsRemoved.remove(aspectTypeQName);
}
else
{
aspectsAdded.add(aspectTypeQName);
}
}
@Override
public void onRemoveAspect(NodeRef nodeRef, QName aspectTypeQName)
{
eventTypes.add(EventType.NODE_UPDATED);
removeAspect(aspectTypeQName);
createBuilderIfAbsent(nodeRef);
}
void removeAspect(QName aspectTypeQName)
{
if (aspectsAdded.contains(aspectTypeQName))
{
aspectsAdded.remove(aspectTypeQName);
}
else
{
aspectsRemoved.add(aspectTypeQName);
}
}
private void setAfterProperties(Map<QName, Serializable> after)
{
propertiesAfter = after;
}
private void setBeforeProperties(Map<QName, Serializable> before)
{
// Don't overwrite the original value if there are multiple calls.
if (propertiesBefore == null)
{
propertiesBefore = before;
}
}
private void setBeforePrimaryHierarchy(List<String> before)
{
// Don't overwrite the original value if there are multiple calls.
if (primaryHierarchyBefore == null)
{
primaryHierarchyBefore = before;
}
}
private NodeResource buildNodeResource()
{
if (resourceBuilder == null)
{
return null;
}
if (eventTypes.getLast() != EventType.NODE_DELETED)
{
// Check the node still exists.
// This could happen in tests where a node is deleted before the afterCommit code is
// executed (For example, see ThumbnailServiceImplTest#testIfNodesExistsAfterCreateThumbnail).
if (helper.nodeExists(nodeRef))
{
// We are setting the details at the end of the Txn by getting the latest info
createBuilderIfAbsent(nodeRef, true);
}
}
// Now create an instance of NodeResource
return resourceBuilder.build();
}
protected NodeResource buildNodeResourceBeforeDelta(NodeResource after)
{
if (after == null)
{
return null;
}
Builder builder = NodeResource.builder();
ZonedDateTime modifiedAt = null;
Map<QName, Serializable> changedPropsBefore = getBeforeMapChanges(propertiesBefore, propertiesAfter);
if (!changedPropsBefore.isEmpty())
{
// Set only the changed properties
Map<String, Serializable> mappedProps = helper.mapToNodeProperties(changedPropsBefore);
if (!mappedProps.isEmpty())
{
builder.setProperties(mappedProps);
resourceBeforeAllFieldsNull = false;
}
Map<String, Map<String, String>> localizedProps =helper.getLocalizedPropertiesBefore(changedPropsBefore, after);
if (!localizedProps.isEmpty())
{
builder.setLocalizedProperties(localizedProps);
resourceBeforeAllFieldsNull = false;
}
String name = (String) changedPropsBefore.get(ContentModel.PROP_NAME);
if (name != null)
{
builder.setName(name);
resourceBeforeAllFieldsNull = false;
}
ContentInfo contentInfo = helper.getContentInfo(changedPropsBefore);
if (contentInfo != null)
{
builder.setContent(contentInfo);
resourceBeforeAllFieldsNull = false;
}
UserInfo modifier = helper.getUserInfo((String) changedPropsBefore.get(ContentModel.PROP_MODIFIER));
if (modifier != null)
{
builder.setModifiedByUser(modifier);
resourceBeforeAllFieldsNull = false;
}
modifiedAt =
helper.getZonedDateTime((Date) changedPropsBefore.get(ContentModel.PROP_MODIFIED));
}
// Handle case where the content does not exist on the propertiesBefore
if (propertiesBefore != null && !propertiesBefore.containsKey(ContentModel.PROP_CONTENT) &&
propertiesAfter != null && propertiesAfter.containsKey(ContentModel.PROP_CONTENT))
{
builder.setContent(new ContentInfo());
resourceBeforeAllFieldsNull = false;
}
Set<String> aspectsBefore = getMappedAspectsBefore(after.getAspectNames());
if (!aspectsBefore.isEmpty())
{
builder.setAspectNames(aspectsBefore);
resourceBeforeAllFieldsNull = false;
}
if (primaryHierarchyBefore != null && !primaryHierarchyBefore.isEmpty())
{
builder.setPrimaryHierarchy(primaryHierarchyBefore);
resourceBeforeAllFieldsNull = false;
}
if (nodeTypeBefore != null)
{
builder.setNodeType(helper.getQNamePrefixString(nodeTypeBefore));
resourceBeforeAllFieldsNull = false;
}
// Only set modifiedAt if one of the other fields is also not null
if (modifiedAt != null && !resourceBeforeAllFieldsNull)
{
builder.setModifiedAt(modifiedAt);
}
return builder.build();
}
Set<String> getMappedAspectsBefore(Set<String> currentAspects)
{
if (currentAspects == null)
{
currentAspects = Collections.emptySet();
}
if (hasChangedAspect())
{
Set<String> removed = helper.mapToNodeAspects(aspectsRemoved);
Set<String> added = helper.mapToNodeAspects(aspectsAdded);
Set<String> before = new HashSet<>();
if (!removed.isEmpty() || !added.isEmpty())
{
before = new HashSet<>(currentAspects);
if (!removed.isEmpty())
{
// Add all the removed aspects from the current list
before.addAll(removed);
}
if (!added.isEmpty())
{
// Remove all the added aspects from the current list
before.removeAll(added);
}
}
return before;
}
return Collections.emptySet();
}
private boolean hasChangedAspect()
{
if ((aspectsRemoved.isEmpty() && aspectsAdded.isEmpty()) ||
org.apache.commons.collections.CollectionUtils.isEqualCollection(aspectsAdded, aspectsRemoved))
{
return false;
}
return true;
}
private <K, V> Map<K, V> getBeforeMapChanges(Map<K, V> before, Map<K, V> after)
{
if (before == null)
{
return Collections.emptyMap();
}
if (after == null)
{
after = Collections.emptyMap();
}
// Get before values that changed
Map<K, V> beforeDelta = new HashMap<>(before);
Map<K, V> afterDelta = new HashMap<>(after);
beforeDelta.entrySet().removeAll(after.entrySet());
// Add nulls for before properties
Set<K> beforeKeys = before.keySet();
Set<K> newKeys = afterDelta.keySet();
newKeys.removeAll(beforeKeys);
for (K key : newKeys)
{
beforeDelta.put(key, null);
}
return beforeDelta;
}
/**
* @return a derived event for a transaction.
*/
private EventType getDerivedEvent()
{
if (isTemporaryNode())
{
// This event will be filtered out, but we set the correct
// event type anyway for debugging purposes
return EventType.NODE_DELETED;
}
else if (eventTypes.contains(EventType.NODE_CREATED))
{
return EventType.NODE_CREATED;
}
else if (eventTypes.getLast() == EventType.NODE_DELETED)
{
return EventType.NODE_DELETED;
}
else
{
// Default to first event
return eventTypes.getFirst();
}
}
/**
* Whether or not the node has been created and then deleted, i.e. a temporary node.
*
* @return {@code true} if the node has been created and then deleted, otherwise false
*/
public boolean isTemporaryNode()
{
return eventTypes.contains(EventType.NODE_CREATED) && eventTypes.getLast() == EventType.NODE_DELETED;
}
public QName getNodeType()
{
return nodeType;
}
public Deque<EventType> getEventTypes()
{
return eventTypes;
}
public List<QName> getAspectsAdded()
{
return aspectsAdded;
}
public List<QName> getAspectsRemoved()
{
return aspectsRemoved;
}
public boolean isResourceBeforeAllFieldsNull()
{
return resourceBeforeAllFieldsNull;
}
protected void setResourceBeforeAllFieldsNull(boolean resourceBeforeAllFieldsNull){
this.resourceBeforeAllFieldsNull = resourceBeforeAllFieldsNull;
}
} }

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -34,13 +34,16 @@ import java.util.Deque;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import org.alfresco.repo.domain.node.NodeDAO; import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.domain.node.TransactionEntity; import org.alfresco.repo.domain.node.TransactionEntity;
import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.RepoEvent; import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.repo.event.v1.model.Resource;
import org.alfresco.repo.event2.filter.ChildAssociationTypeFilter; import org.alfresco.repo.event2.filter.ChildAssociationTypeFilter;
import org.alfresco.repo.event2.filter.EventFilterRegistry; import org.alfresco.repo.event2.filter.EventFilterRegistry;
import org.alfresco.repo.event2.filter.EventUserFilter; import org.alfresco.repo.event2.filter.EventUserFilter;
@@ -63,9 +66,11 @@ import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.policy.ServiceBehaviourBinding; import org.alfresco.repo.policy.ServiceBehaviourBinding;
import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.dictionary.DictionaryService; import org.alfresco.service.cmr.dictionary.DictionaryService;
import org.alfresco.service.cmr.repository.AssociationRef; import org.alfresco.service.cmr.repository.AssociationRef;
import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.EntityRef;
import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService; import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.security.PersonService; import org.alfresco.service.cmr.security.PersonService;
@@ -74,6 +79,7 @@ import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName; import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService; import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.PropertyCheck; import org.alfresco.util.PropertyCheck;
import org.alfresco.util.TriPredicate;
import org.alfresco.util.transaction.TransactionListenerAdapter; import org.alfresco.util.transaction.TransactionListenerAdapter;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@@ -102,8 +108,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
private PersonService personService; private PersonService personService;
protected NodeResourceHelper nodeResourceHelper; protected NodeResourceHelper nodeResourceHelper;
protected NodeDAO nodeDAO; protected NodeDAO nodeDAO;
private EventSender eventSender;
private EventGeneratorQueue eventGeneratorQueue;
private NodeTypeFilter nodeTypeFilter; private NodeTypeFilter nodeTypeFilter;
private ChildAssociationTypeFilter childAssociationTypeFilter; private ChildAssociationTypeFilter childAssociationTypeFilter;
private EventUserFilter userFilter; private EventUserFilter userFilter;
@@ -139,7 +144,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
PropertyCheck.mandatory(this, "personService", personService); PropertyCheck.mandatory(this, "personService", personService);
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
PropertyCheck.mandatory(this, "nodeDAO", nodeDAO); PropertyCheck.mandatory(this, "nodeDAO", nodeDAO);
PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue); PropertyCheck.mandatory(this, "eventSender", eventSender);
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter(); this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
@@ -188,7 +193,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
if (behaviours == null) if (behaviours == null)
{ {
behaviours = new HashSet<Behaviour>(); behaviours = new HashSet<>();
afterPropertiesSet(); afterPropertiesSet();
bindBehaviours(); bindBehaviours();
@@ -230,8 +235,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
this.descriptorService = descriptorService; this.descriptorService = descriptorService;
} }
// To make IntelliJ stop complaining about unused method!
@SuppressWarnings("unused")
public void setEventFilterRegistry(EventFilterRegistry eventFilterRegistry) public void setEventFilterRegistry(EventFilterRegistry eventFilterRegistry)
{ {
this.eventFilterRegistry = eventFilterRegistry; this.eventFilterRegistry = eventFilterRegistry;
@@ -252,9 +255,14 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
this.nodeResourceHelper = nodeResourceHelper; this.nodeResourceHelper = nodeResourceHelper;
} }
public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue) public void setEventSender(EventSender eventSender)
{ {
this.eventGeneratorQueue = eventGeneratorQueue; this.eventSender = eventSender;
}
public EventSender getEventSender()
{
return eventSender;
} }
@Override @Override
@@ -323,9 +331,9 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
getEventConsolidator(associationRef).beforeDeleteAssociation(associationRef); getEventConsolidator(associationRef).beforeDeleteAssociation(associationRef);
} }
protected EventConsolidator createEventConsolidator() protected NodeEventConsolidator createEventConsolidator()
{ {
return new EventConsolidator(nodeResourceHelper); return new NodeEventConsolidator(nodeResourceHelper);
} }
protected ChildAssociationEventConsolidator createChildAssociationEventConsolidator( protected ChildAssociationEventConsolidator createChildAssociationEventConsolidator(
@@ -370,13 +378,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
disableBehaviours(behaviours); disableBehaviours(behaviours);
} }
protected void disableBehaviours(Set<Behaviour> bindedBehaviours) protected void disableBehaviours(Set<Behaviour> boundBehaviours)
{ {
if (bindedBehaviours != null) if (boundBehaviours != null)
{ {
bindedBehaviours.forEach(behaviour -> { boundBehaviours.forEach(Behaviour::disable);
behaviour.disable();
});
} }
} }
@@ -385,30 +391,28 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
enableBehaviours(behaviours); enableBehaviours(behaviours);
} }
protected void enableBehaviours(Set<Behaviour> bindedBehaviours) protected void enableBehaviours(Set<Behaviour> boundBehaviours)
{ {
if (bindedBehaviours != null) if (boundBehaviours != null)
{ {
bindedBehaviours.forEach(behaviour -> { boundBehaviours.forEach(Behaviour::enable);
behaviour.enable();
});
} }
} }
/** /**
* @return the {@link EventConsolidator} for the supplied {@code nodeRef} from * @return the {@link NodeEventConsolidator} for the supplied {@code nodeRef} from
* the current transaction context. * the current transaction context.
*/ */
protected EventConsolidator getEventConsolidator(NodeRef nodeRef) protected NodeEventConsolidator getEventConsolidator(NodeRef nodeRef)
{ {
Consolidators consolidators = getTxnConsolidators(transactionListener); Consolidators consolidators = getTxnConsolidators(transactionListener);
Map<NodeRef, EventConsolidator> nodeEvents = consolidators.getNodes(); Map<NodeRef, NodeEventConsolidator> nodeEvents = consolidators.getNodes();
if (nodeEvents.isEmpty()) if (nodeEvents.isEmpty())
{ {
AlfrescoTransactionSupport.bindListener(transactionListener); AlfrescoTransactionSupport.bindListener(transactionListener);
} }
EventConsolidator eventConsolidator = nodeEvents.get(nodeRef); NodeEventConsolidator eventConsolidator = nodeEvents.get(nodeRef);
if (eventConsolidator == null) if (eventConsolidator == null)
{ {
eventConsolidator = createEventConsolidator(); eventConsolidator = createEventConsolidator();
@@ -430,7 +434,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
/** /**
* @return the {@link EventConsolidator} for the supplied {@code childAssociationRef} from * @return the {@link ChildAssociationEventConsolidator} for the supplied {@code childAssociationRef} from
* the current transaction context. * the current transaction context.
*/ */
private ChildAssociationEventConsolidator getEventConsolidator(ChildAssociationRef childAssociationRef) private ChildAssociationEventConsolidator getEventConsolidator(ChildAssociationRef childAssociationRef)
@@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
/** /**
* @return the {@link EventConsolidator} for the supplied {@code peerAssociationRef} from * @return the {@link PeerAssociationEventConsolidator} for the supplied {@code peerAssociationRef} from
* the current transaction context. * the current transaction context.
*/ */
private PeerAssociationEventConsolidator getEventConsolidator(AssociationRef peerAssociationRef) private PeerAssociationEventConsolidator getEventConsolidator(AssociationRef peerAssociationRef)
@@ -506,7 +510,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
{ {
return; return;
} }
behaviours = new HashSet<Behaviour>(); behaviours = new HashSet<>();
bindBehaviours(); bindBehaviours();
} }
@@ -521,32 +525,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
@Override @Override
public void afterCommit() public void afterCommit()
{ {
if(isTransactionCommitted()) if (isTransactionCommitted())
{ {
try try
{ {
final Consolidators consolidators = getTxnConsolidators(this); sendEvents();
// Node events
for (Map.Entry<NodeRef, EventConsolidator> entry : consolidators.getNodes().entrySet())
{
EventConsolidator eventConsolidator = entry.getValue();
sendEvent(entry.getKey(), eventConsolidator);
}
// Child assoc events
for (Map.Entry<ChildAssociationRef, ChildAssociationEventConsolidator> entry : consolidators.getChildAssocs().entrySet())
{
ChildAssociationEventConsolidator eventConsolidator = entry.getValue();
sendEvent(entry.getKey(), eventConsolidator);
}
// Peer assoc events
for (Map.Entry<AssociationRef, PeerAssociationEventConsolidator> entry : consolidators.getPeerAssocs().entrySet())
{
PeerAssociationEventConsolidator eventConsolidator = entry.getValue();
sendEvent(entry.getKey(), eventConsolidator);
}
} }
catch (Exception e) catch (Exception e)
{ {
@@ -556,12 +539,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
} }
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
{
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo));
}
/** /**
* @return true if a node transaction is not only active, but also committed with modifications. * @return true if a node transaction is not only active, but also committed with modifications.
* This means that a {@link TransactionEntity} object was created. * This means that a {@link TransactionEntity} object was created.
@@ -571,115 +548,154 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
return nodeDAO.getCurrentTransactionCommitTime() != null; return nodeDAO.getCurrentTransactionCommitTime() != null;
} }
private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo) private void sendEvents()
{ {
String user = eventInfo.getPrincipal(); final Consolidators consolidators = getTxnConsolidators(this);
if (consolidator.isTemporaryNode()) // Node events
for (Map.Entry<NodeRef, NodeEventConsolidator> entry : consolidators.getNodes().entrySet())
{ {
if (LOGGER.isTraceEnabled()) sendEvent(entry.getKey(), entry.getValue());
{
LOGGER.trace("Ignoring temporary node: " + nodeRef);
}
return null;
} }
// Get the repo event before the filtering, // Child assoc events
// so we can take the latest node info into account for (Map.Entry<ChildAssociationRef, ChildAssociationEventConsolidator> entry : consolidators.getChildAssocs().entrySet())
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
final QName nodeType = consolidator.getNodeType();
if (isFiltered(nodeType, user))
{ {
if (LOGGER.isTraceEnabled()) sendEvent(entry.getKey(), entry.getValue());
{
LOGGER.trace("EventFilter - Excluding node: '" + nodeRef + "' of type: '"
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
+ "' created by: " + user);
}
return null;
} }
if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull()) // Peer assoc events
for (Map.Entry<AssociationRef, PeerAssociationEventConsolidator> entry : consolidators.getPeerAssocs().entrySet())
{ {
if (LOGGER.isTraceEnabled()) sendEvent(entry.getKey(), entry.getValue());
{
LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef);
}
return null;
} }
}
logEvent(event, consolidator.getEventTypes()); protected void sendEvent(NodeRef nodeRef, NodeEventConsolidator consolidator)
return event; {
sendEvent(nodeRef, consolidator, nodeToEventEligibilityVerifier());
} }
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
{ {
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); sendEvent(childAssociationRef, consolidator, childAssociationToEventEligibilityVerifier());
eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator));
}
private RepoEvent<?> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
{
String user = eventInfo.getPrincipal();
if (consolidator.isTemporaryChildAssociation())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Ignoring temporary child association: " + childAssociationRef);
}
return null;
}
// Get the repo event before the filtering,
// so we can take the latest association info into account
final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo);
final QName childAssocType = consolidator.getChildAssocType();
if (isFilteredChildAssociation(childAssocType, user))
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("EventFilter - Excluding child association: '" + childAssociationRef + "' of type: '"
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user);
}
return null;
} else if (childAssociationRef.isPrimary())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("EventFilter - Excluding primary child association: '" + childAssociationRef + "' of type: '"
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user);
}
return null;
}
logEvent(event, consolidator.getEventTypes());
return event;
} }
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
{ {
EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); sendEvent(peerAssociationRef, consolidator, null);
eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator));
} }
private RepoEvent<?> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) /**
* Handles all kinds of events and sends them within dedicated transaction.
*
* @param entityReference - reference to an entity (e.g. node, child association, peer association)
* @param eventConsolidator - object encapsulating events occurred in a transaction
* @param entityToEventEligibilityVerifier - allows to verify if entity is eligible to generate an even. If null no verification is necessary
* @param <REF> - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef})
* @param <CON> - event consolidator type - extension of {@link EventConsolidator}
*/
private <REF extends EntityRef, CON extends EventConsolidator<REF, ? extends Resource>> void sendEvent(
final REF entityReference, final CON eventConsolidator, final TriPredicate<REF, CON, EventInfo> entityToEventEligibilityVerifier)
{ {
if (consolidator.isTemporaryPeerAssociation()) final EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> {
eventSender.accept(() -> createEvent(entityReference, eventConsolidator, eventInfo, entityToEventEligibilityVerifier));
return null;
}, true, true);
}
/**
* Creates events from various kinds of entities.
*
* @param entityReference - reference to an entity (e.g. node, child association, peer association)
* @param eventConsolidator - object encapsulating events occurred in a transaction
* @param eventInfo - object holding the event information
* @param entityToEventEligibilityVerifier - allows to verify if entity is eligible to generate an even. If null no verification is necessary
* @param <REF> - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef})
* @param <CON> - event consolidator type - extension of {@link EventConsolidator}
*/
private <REF extends EntityRef, CON extends EventConsolidator<REF, ? extends Resource>> Optional<RepoEvent<?>> createEvent(
final REF entityReference, final CON eventConsolidator, final EventInfo eventInfo,
final TriPredicate<REF, CON, EventInfo> entityToEventEligibilityVerifier)
{
if (eventConsolidator.isTemporaryEntity())
{ {
if (LOGGER.isTraceEnabled()) if (LOGGER.isTraceEnabled())
{ {
LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef); LOGGER.trace("Ignoring temporary entity: " + entityReference);
} }
return null; return Optional.empty();
} }
RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); // get the repo event before verifying entity eligibility to generate event, so we can take the latest node info into account
logEvent(event, consolidator.getEventTypes()); final RepoEvent<? extends DataAttributes<? extends Resource>> event = eventConsolidator.getRepoEvent(eventInfo);
return event;
// verify if entity is eligible to generate an event
if (entityToEventEligibilityVerifier != null && !entityToEventEligibilityVerifier.test(entityReference, eventConsolidator, eventInfo))
{
return Optional.empty();
}
logEvent(event, eventConsolidator.getEventTypes());
return Optional.of(event);
}
private TriPredicate<NodeRef, NodeEventConsolidator, EventInfo> nodeToEventEligibilityVerifier()
{
return (nodeReference, eventConsolidator, eventInfo) -> {
final String user = eventInfo.getPrincipal();
final QName nodeType = eventConsolidator.getEntityType();
if (isFiltered(nodeType, user))
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("EventFilter - Excluding node: '" + nodeReference + "' of type: '"
+ ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
+ "' created by: " + user);
}
return false;
}
if (eventConsolidator.isEventTypeEqualTo(EventType.NODE_UPDATED) && eventConsolidator.isResourceBeforeAllFieldsNull())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeReference);
}
return false;
}
return true;
};
}
private TriPredicate<ChildAssociationRef, ChildAssociationEventConsolidator, EventInfo> childAssociationToEventEligibilityVerifier()
{
return (childAssociationReference, eventConsolidator, eventInfo) -> {
final String user = eventInfo.getPrincipal();
final QName childAssocType = eventConsolidator.getEntityType();
if (isFilteredChildAssociation(childAssocType, user))
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("EventFilter - Excluding child association: '" + childAssociationReference + "' of type: '"
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user);
}
return false;
}
else if (childAssociationReference.isPrimary())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("EventFilter - Excluding primary child association: '" + childAssociationReference + "' of type: '"
+ ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ "' created by: " + user);
}
return false;
}
return true;
};
} }
private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents) private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents)
@@ -692,14 +708,13 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
} }
} }
protected static class Consolidators protected static class Consolidators
{ {
private Map<NodeRef, EventConsolidator> nodes; private Map<NodeRef, NodeEventConsolidator> nodes;
private Map<ChildAssociationRef, ChildAssociationEventConsolidator> childAssocs; private Map<ChildAssociationRef, ChildAssociationEventConsolidator> childAssocs;
private Map<AssociationRef, PeerAssociationEventConsolidator> peerAssocs; private Map<AssociationRef, PeerAssociationEventConsolidator> peerAssocs;
public Map<NodeRef, EventConsolidator> getNodes() public Map<NodeRef, NodeEventConsolidator> getNodes()
{ {
if (nodes == null) if (nodes == null)
{ {

View File

@@ -0,0 +1,43 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.alfresco.repo.event.v1.model.RepoEvent;
/**
* Interface representing an asynchronous event sender.
*/
public interface EventSender
{
/**
* Accepts a callback function creating an event and sends this event to specified destination.
* @param eventProducer - callback function that creates an event
*/
void accept(Callable<Optional<RepoEvent<?>>> eventProducer);
}

View File

@@ -0,0 +1,481 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.event.v1.model.ContentInfo;
import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.NodeResource;
import org.alfresco.repo.event.v1.model.NodeResource.Builder;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.repo.event.v1.model.UserInfo;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.namespace.QName;
/**
* Encapsulates node events occurred in a single transaction.
*
* @author Jamal Kaabi-Mofrad
*/
public class NodeEventConsolidator extends EventConsolidator<NodeRef, NodeResource> implements EventSupportedPolicies
{
private final List<QName> aspectsAdded;
private final List<QName> aspectsRemoved;
private NodeResource.Builder resourceBuilder;
private Map<QName, Serializable> propertiesBefore;
private Map<QName, Serializable> propertiesAfter;
private QName nodeType;
private QName nodeTypeBefore;
private List<String> primaryHierarchyBefore;
private boolean resourceBeforeAllFieldsNull = true;
public NodeEventConsolidator(NodeResourceHelper nodeResourceHelper)
{
super(null, nodeResourceHelper);
this.aspectsAdded = new ArrayList<>();
this.aspectsRemoved = new ArrayList<>();
}
@Override
public RepoEvent<DataAttributes<NodeResource>> getRepoEvent(EventInfo eventInfo)
{
resource = buildNodeResource();
return super.getRepoEvent(eventInfo);
}
@Override
protected DataAttributes<NodeResource> buildEventData(EventInfo eventInfo, NodeResource resource, EventType eventType)
{
EventData.Builder<NodeResource> eventDataBuilder = EventData.<NodeResource>builder()
.setEventGroupId(eventInfo.getTxnId())
.setResource(resource);
if (eventType == EventType.NODE_UPDATED)
{
eventDataBuilder.setResourceBefore(buildNodeResourceBeforeDelta(resource));
}
return eventDataBuilder.build();
}
/**
* Creates a builder instance if absent or {@code forceUpdate} is requested.
* It also, sets the required fields.
*
* @param nodeRef the nodeRef in the txn
* @param forceUpdate if {@code true}, will get the latest node info and ignores
* the existing builder object.
*/
protected void createBuilderIfAbsent(NodeRef nodeRef, boolean forceUpdate)
{
if (resourceBuilder == null || forceUpdate)
{
this.resourceBuilder = helper.createNodeResourceBuilder(nodeRef);
this.entityReference = nodeRef;
this.nodeType = helper.getNodeType(nodeRef);
}
}
/**
* Creates a builder instance if absent, and sets the required fields.
*
* @param nodeRef the nodeRef in the txn
*/
protected void createBuilderIfAbsent(NodeRef nodeRef)
{
createBuilderIfAbsent(nodeRef, false);
}
@Override
public void onCreateNode(ChildAssociationRef childAssocRef)
{
eventTypes.add(EventType.NODE_CREATED);
NodeRef nodeRef = childAssocRef.getChildRef();
createBuilderIfAbsent(nodeRef);
// Sometimes onCreateNode policy is out of order
this.propertiesBefore = null;
setBeforeProperties(Collections.emptyMap());
setAfterProperties(helper.getProperties(nodeRef));
}
@Override
public void onMoveNode(ChildAssociationRef oldChildAssocRef, ChildAssociationRef newChildAssocRef)
{
eventTypes.add(EventType.NODE_UPDATED);
createBuilderIfAbsent(newChildAssocRef.getChildRef());
setBeforePrimaryHierarchy(helper.getPrimaryHierarchy(oldChildAssocRef.getParentRef(), true));
}
@Override
public void onSetNodeType(NodeRef nodeRef, QName before, QName after)
{
eventTypes.add(EventType.NODE_UPDATED);
nodeTypeBefore = before;
createBuilderIfAbsent(nodeRef);
}
@Override
public void onUpdateProperties(NodeRef nodeRef, Map<QName, Serializable> before, Map<QName, Serializable> after)
{
eventTypes.add(EventType.NODE_UPDATED);
// Sometimes we don't get the 'before', so just use the latest
if (before.isEmpty() && this.propertiesAfter != null)
{
before = this.propertiesAfter;
}
createBuilderIfAbsent(nodeRef);
setBeforeProperties(before);
setAfterProperties(after);
}
@Override
public void beforeDeleteNode(NodeRef nodeRef)
{
eventTypes.add(EventType.NODE_DELETED);
createBuilderIfAbsent(nodeRef, false);
}
@Override
public void onAddAspect(NodeRef nodeRef, QName aspectTypeQName)
{
eventTypes.add(EventType.NODE_UPDATED);
addAspect(aspectTypeQName);
createBuilderIfAbsent(nodeRef);
}
void addAspect(QName aspectTypeQName)
{
if (aspectsRemoved.contains(aspectTypeQName))
{
aspectsRemoved.remove(aspectTypeQName);
}
else
{
aspectsAdded.add(aspectTypeQName);
}
}
@Override
public void onRemoveAspect(NodeRef nodeRef, QName aspectTypeQName)
{
eventTypes.add(EventType.NODE_UPDATED);
removeAspect(aspectTypeQName);
createBuilderIfAbsent(nodeRef);
}
void removeAspect(QName aspectTypeQName)
{
if (aspectsAdded.contains(aspectTypeQName))
{
aspectsAdded.remove(aspectTypeQName);
}
else
{
aspectsRemoved.add(aspectTypeQName);
}
}
private void setAfterProperties(Map<QName, Serializable> after)
{
propertiesAfter = after;
}
private void setBeforeProperties(Map<QName, Serializable> before)
{
// Don't overwrite the original value if there are multiple calls.
if (propertiesBefore == null)
{
propertiesBefore = before;
}
}
private void setBeforePrimaryHierarchy(List<String> before)
{
// Don't overwrite the original value if there are multiple calls.
if (primaryHierarchyBefore == null)
{
primaryHierarchyBefore = before;
}
}
private NodeResource buildNodeResource()
{
if (resourceBuilder == null)
{
return null;
}
if (eventTypes.getLast() != EventType.NODE_DELETED)
{
// Check the node still exists.
// This could happen in tests where a node is deleted before the afterCommit code is
// executed (For example, see ThumbnailServiceImplTest#testIfNodesExistsAfterCreateThumbnail).
if (helper.nodeExists(entityReference))
{
// We are setting the details at the end of the Txn by getting the latest info
createBuilderIfAbsent(entityReference, true);
}
}
// Now create an instance of NodeResource
return resourceBuilder.build();
}
protected NodeResource buildNodeResourceBeforeDelta(NodeResource after)
{
if (after == null)
{
return null;
}
Builder builder = NodeResource.builder();
ZonedDateTime modifiedAt = null;
Map<QName, Serializable> changedPropsBefore = getBeforeMapChanges(propertiesBefore, propertiesAfter);
if (!changedPropsBefore.isEmpty())
{
// Set only the changed properties
Map<String, Serializable> mappedProps = helper.mapToNodeProperties(changedPropsBefore);
if (!mappedProps.isEmpty())
{
builder.setProperties(mappedProps);
resourceBeforeAllFieldsNull = false;
}
Map<String, Map<String, String>> localizedProps =helper.getLocalizedPropertiesBefore(changedPropsBefore, after);
if (!localizedProps.isEmpty())
{
builder.setLocalizedProperties(localizedProps);
resourceBeforeAllFieldsNull = false;
}
String name = (String) changedPropsBefore.get(ContentModel.PROP_NAME);
if (name != null)
{
builder.setName(name);
resourceBeforeAllFieldsNull = false;
}
ContentInfo contentInfo = helper.getContentInfo(changedPropsBefore);
if (contentInfo != null)
{
builder.setContent(contentInfo);
resourceBeforeAllFieldsNull = false;
}
UserInfo modifier = helper.getUserInfo((String) changedPropsBefore.get(ContentModel.PROP_MODIFIER));
if (modifier != null)
{
builder.setModifiedByUser(modifier);
resourceBeforeAllFieldsNull = false;
}
modifiedAt =
helper.getZonedDateTime((Date) changedPropsBefore.get(ContentModel.PROP_MODIFIED));
}
// Handle case where the content does not exist on the propertiesBefore
if (propertiesBefore != null && !propertiesBefore.containsKey(ContentModel.PROP_CONTENT) &&
propertiesAfter != null && propertiesAfter.containsKey(ContentModel.PROP_CONTENT))
{
builder.setContent(new ContentInfo());
resourceBeforeAllFieldsNull = false;
}
Set<String> aspectsBefore = getMappedAspectsBefore(after.getAspectNames());
if (!aspectsBefore.isEmpty())
{
builder.setAspectNames(aspectsBefore);
resourceBeforeAllFieldsNull = false;
}
if (primaryHierarchyBefore != null && !primaryHierarchyBefore.isEmpty())
{
builder.setPrimaryHierarchy(primaryHierarchyBefore);
resourceBeforeAllFieldsNull = false;
}
if (nodeTypeBefore != null)
{
builder.setNodeType(helper.getQNamePrefixString(nodeTypeBefore));
resourceBeforeAllFieldsNull = false;
}
// Only set modifiedAt if one of the other fields is also not null
if (modifiedAt != null && !resourceBeforeAllFieldsNull)
{
builder.setModifiedAt(modifiedAt);
}
return builder.build();
}
Set<String> getMappedAspectsBefore(Set<String> currentAspects)
{
if (currentAspects == null)
{
currentAspects = Collections.emptySet();
}
if (hasChangedAspect())
{
Set<String> removed = helper.mapToNodeAspects(aspectsRemoved);
Set<String> added = helper.mapToNodeAspects(aspectsAdded);
Set<String> before = new HashSet<>();
if (!removed.isEmpty() || !added.isEmpty())
{
before = new HashSet<>(currentAspects);
if (!removed.isEmpty())
{
// Add all the removed aspects from the current list
before.addAll(removed);
}
if (!added.isEmpty())
{
// Remove all the added aspects from the current list
before.removeAll(added);
}
}
return before;
}
return Collections.emptySet();
}
private boolean hasChangedAspect()
{
if ((aspectsRemoved.isEmpty() && aspectsAdded.isEmpty()) ||
org.apache.commons.collections.CollectionUtils.isEqualCollection(aspectsAdded, aspectsRemoved))
{
return false;
}
return true;
}
private <K, V> Map<K, V> getBeforeMapChanges(Map<K, V> before, Map<K, V> after)
{
if (before == null)
{
return Collections.emptyMap();
}
if (after == null)
{
after = Collections.emptyMap();
}
// Get before values that changed
Map<K, V> beforeDelta = new HashMap<>(before);
Map<K, V> afterDelta = new HashMap<>(after);
beforeDelta.entrySet().removeAll(after.entrySet());
// Add nulls for before properties
Set<K> beforeKeys = before.keySet();
Set<K> newKeys = afterDelta.keySet();
newKeys.removeAll(beforeKeys);
for (K key : newKeys)
{
beforeDelta.put(key, null);
}
return beforeDelta;
}
@Override
protected EventType getDerivedEvent()
{
if (isTemporaryEntity())
{
// This event will be filtered out, but we set the correct
// event type anyway for debugging purposes
return EventType.NODE_DELETED;
}
else if (eventTypes.contains(EventType.NODE_CREATED))
{
return EventType.NODE_CREATED;
}
else if (eventTypes.getLast() == EventType.NODE_DELETED)
{
return EventType.NODE_DELETED;
}
else
{
// Default to first event
return eventTypes.getFirst();
}
}
@Override
public boolean isTemporaryEntity()
{
return eventTypes.contains(EventType.NODE_CREATED) && eventTypes.getLast() == EventType.NODE_DELETED;
}
@Override
public QName getEntityType()
{
return nodeType;
}
public List<QName> getAspectsAdded()
{
return aspectsAdded;
}
public List<QName> getAspectsRemoved()
{
return aspectsRemoved;
}
public boolean isResourceBeforeAllFieldsNull()
{
return resourceBeforeAllFieldsNull;
}
public boolean isEventTypeEqualTo(EventType eventType)
{
return this.getDerivedEvent().getType().equals(eventType.getType());
}
protected void setResourceBeforeAllFieldsNull(boolean resourceBeforeAllFieldsNull){
this.resourceBeforeAllFieldsNull = resourceBeforeAllFieldsNull;
}
}

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -25,14 +25,8 @@
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import java.util.ArrayDeque;
import java.util.Deque;
import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.PeerAssociationResource; import org.alfresco.repo.event.v1.model.PeerAssociationResource;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.AssociationRef; import org.alfresco.service.cmr.repository.AssociationRef;
import org.alfresco.service.namespace.QName; import org.alfresco.service.namespace.QName;
@@ -41,50 +35,12 @@ import org.alfresco.service.namespace.QName;
* *
* @author Sara Aspery * @author Sara Aspery
*/ */
public class PeerAssociationEventConsolidator implements PeerAssociationEventSupportedPolicies public class PeerAssociationEventConsolidator extends EventConsolidator<AssociationRef, PeerAssociationResource> implements PeerAssociationEventSupportedPolicies
{ {
private final Deque<EventType> eventTypes;
protected final AssociationRef associationRef;
private PeerAssociationResource resource;
private final NodeResourceHelper helper;
public PeerAssociationEventConsolidator(AssociationRef associationRef, NodeResourceHelper helper) public PeerAssociationEventConsolidator(AssociationRef associationRef, NodeResourceHelper helper)
{ {
this.eventTypes = new ArrayDeque<>(); super(associationRef, helper);
this.associationRef = associationRef;
this.helper = helper;
}
/**
* Builds and returns the {@link RepoEvent} instance.
*
* @param eventInfo the object holding the event information
* @return the {@link RepoEvent} instance
*/
public RepoEvent<DataAttributes<PeerAssociationResource>> getRepoEvent(EventInfo eventInfo)
{
EventType eventType = getDerivedEvent();
DataAttributes<PeerAssociationResource> eventData = buildEventData(eventInfo, resource);
return RepoEvent.<DataAttributes<PeerAssociationResource>>builder()
.setId(eventInfo.getId())
.setSource(eventInfo.getSource())
.setTime(eventInfo.getTimestamp())
.setType(eventType.getType())
.setData(eventData)
.setDataschema(EventJSONSchema.getSchemaV1(eventType))
.build();
}
protected DataAttributes<PeerAssociationResource> buildEventData(EventInfo eventInfo, PeerAssociationResource resource)
{
return EventData.<PeerAssociationResource>builder()
.setEventGroupId(eventInfo.getTxnId())
.setResource(resource)
.build();
} }
/** /**
@@ -120,12 +76,10 @@ public class PeerAssociationEventConsolidator implements PeerAssociationEventSup
return new PeerAssociationResource(sourceId, targetId, assocType); return new PeerAssociationResource(sourceId, targetId, assocType);
} }
/** @Override
* @return a derived event for a transaction. protected EventType getDerivedEvent()
*/
private EventType getDerivedEvent()
{ {
if (isTemporaryPeerAssociation()) if (isTemporaryEntity())
{ {
// This event will be filtered out, but we set the correct // This event will be filtered out, but we set the correct
// event type anyway for debugging purposes // event type anyway for debugging purposes
@@ -146,34 +100,16 @@ public class PeerAssociationEventConsolidator implements PeerAssociationEventSup
} }
} }
/** @Override
* Whether or not the association has been created and then deleted, i.e. a temporary association. public boolean isTemporaryEntity()
*
* @return {@code true} if the association has been created and then deleted, otherwise false
*/
public boolean isTemporaryPeerAssociation()
{ {
return eventTypes.contains(EventType.PEER_ASSOC_CREATED) && eventTypes.getLast() == EventType.PEER_ASSOC_DELETED; return eventTypes.contains(EventType.PEER_ASSOC_CREATED) && eventTypes.getLast() == EventType.PEER_ASSOC_DELETED;
} }
/** @Override
* Get peer association type. public QName getEntityType()
*
* @return QName the peer association type
*/
public QName getAssocType()
{ {
return associationRef.getTypeQName(); return entityReference.getTypeQName();
}
/**
* Get event types.
*
* @return Deque<EventType> queue of event types
*/
public Deque<EventType> getEventTypes()
{
return eventTypes;
} }
} }

View File

@@ -767,7 +767,7 @@ public abstract class AbstractEventsService extends TransactionListenerAdapter
void addEvents(List<Event> events) void addEvents(List<Event> events)
{ {
events.addAll(events); this.events.addAll(events);
} }
void clear() void clear()

View File

@@ -0,0 +1,83 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.util;
import java.util.Objects;
/**
* Represents a predicate (boolean-valued function) of three arguments. This is the three-arity specialization of Predicate.
* This is a functional interface whose functional method is test(Object, Object, Object).
*
* @param <T> - type of the first argument to the predicate
* @param <U> - type of the second argument the predicate
* @param <V> - type of the third argument the predicate
*/
@FunctionalInterface
public interface TriPredicate<T, U, V>
{
/**
* Evaluates this predicate on the given arguments.
*
* @param t - first input argument
* @param u - second input argument
* @param v - third input argument
* @return true if the input arguments match the predicate, otherwise false
*/
boolean test(T t, U u, V v);
/**
* Creates a composed predicate that represents a logical AND of this predicate and another.
*
* @param other - predicate that will be logically-ANDed with this predicate
* @return composed predicate
*/
default TriPredicate<T, U, V> and(TriPredicate<? super T, ? super U, ? super V> other)
{
Objects.requireNonNull(other);
return (T t, U u, V v) -> test(t, u, v) && other.test(t, u, v);
}
/**
* @return a predicate that represents the logical negation of this predicate
*/
default TriPredicate<T, U, V> negate()
{
return (T t, U u, V v) -> !test(t, u, v);
}
/**
* Creates a composed predicate that represents a logical OR of this predicate and another.
*
* @param other - predicate that will be logically-ORed with this predicate
* @return composed predicate
*/
default TriPredicate<T, U, V> or(TriPredicate<? super T, ? super U, ? super V> other)
{
Objects.requireNonNull(other);
return (T t, U u, V v) -> test(t, u, v) || other.test(t, u, v);
}
}

View File

@@ -41,11 +41,9 @@
<property name="transactionService" ref="transactionService"/> <property name="transactionService" ref="transactionService"/>
<property name="personService" ref="personService"/> <property name="personService" ref="personService"/>
<property name="nodeResourceHelper" ref="nodeResourceHelper"/> <property name="nodeResourceHelper" ref="nodeResourceHelper"/>
<property name="eventGeneratorQueue" ref="eventGeneratorQueue"/> <property name="eventSender" ref="#{ ${repo.event2.queue.skip} == true ? 'directEventSender' : 'enqueuingEventSender' }"/>
<property name="nodeDAO" ref="nodeDAO"/> <property name="nodeDAO" ref="nodeDAO"/>
<property name="enabled"> <property name="enabled" value="${repo.event2.enabled}"/>
<value>${repo.event2.enabled}</value>
</property>
</bean> </bean>
<bean id="baseNodeResourceHelper" abstract="true"> <bean id="baseNodeResourceHelper" abstract="true">
@@ -61,14 +59,13 @@
<bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/> <bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/>
<bean id="eventGeneratorQueue" class="org.alfresco.repo.event2.EventGeneratorQueue" > <bean id="directEventSender" class="org.alfresco.repo.event2.DirectEventSender">
<property name="enqueueThreadPoolExecutor"> <property name="event2MessageProducer" ref="event2MessageProducer"/>
<ref bean="eventAsyncEnqueueThreadPool" /> </bean>
</property>
<property name="dequeueThreadPoolExecutor"> <bean id="enqueuingEventSender" class="org.alfresco.repo.event2.EnqueuingEventSender" parent="directEventSender" lazy-init="true">
<ref bean="eventAsyncDequeueThreadPool" /> <property name="enqueueThreadPoolExecutor" ref="eventAsyncEnqueueThreadPool"/>
</property> <property name="dequeueThreadPoolExecutor" ref="eventAsyncDequeueThreadPool"/>
<property name="event2MessageProducer" ref="event2MessageProducer"/>
</bean> </bean>
<bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean"> <bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean">

View File

@@ -1229,6 +1229,8 @@ repo.event2.filter.childAssocTypes=rn:rendition
repo.event2.filter.users= repo.event2.filter.users=
# Topic name # Topic name
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
# Specifies if messages should be enqueued in in-memory queue or sent directly to the topic
repo.event2.queue.skip=false
# Thread pool for async enqueue of repo events # Thread pool for async enqueue of repo events
repo.event2.queue.enqueueThreadPool.priority=1 repo.event2.queue.enqueueThreadPool.priority=1
repo.event2.queue.enqueueThreadPool.coreSize=8 repo.event2.queue.enqueueThreadPool.coreSize=8

View File

@@ -23,7 +23,6 @@
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>. * along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L% * #L%
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
@@ -72,6 +71,7 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@@ -108,23 +108,24 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
@Autowired @Autowired
protected CustomModelService customModelService; protected CustomModelService customModelService;
@Qualifier("descriptorComponent")
@Autowired @Autowired
@Qualifier("descriptorComponent")
protected DescriptorService descriptorService; protected DescriptorService descriptorService;
@Autowired @Autowired
protected ObjectMapper event2ObjectMapper; @Qualifier("event2ObjectMapper")
protected ObjectMapper objectMapper;
@Autowired @Qualifier("eventGeneratorV2") @Autowired
@Qualifier("eventGeneratorV2")
protected EventGenerator eventGenerator; protected EventGenerator eventGenerator;
@Autowired
@Qualifier("eventGeneratorQueue")
protected EventGeneratorQueue eventQueue;
@Autowired @Autowired
private NamespaceDAO namespaceDAO; private NamespaceDAO namespaceDAO;
@Value("${repo.event2.queue.skip}")
protected boolean skipEventQueue;
protected NodeRef rootNodeRef; protected NodeRef rootNodeRef;
@BeforeClass @BeforeClass
@@ -134,7 +135,7 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
} }
@AfterClass @AfterClass
public static void afterAll() throws Exception public static void afterAll()
{ {
CAMEL_CONTEXT.stop(); CAMEL_CONTEXT.stop();
} }
@@ -144,7 +145,7 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
{ {
if (!isCamelConfigured) if (!isCamelConfigured)
{ {
dataFormat = new JacksonDataFormat(event2ObjectMapper, RepoEvent.class); dataFormat = new JacksonDataFormat(objectMapper, RepoEvent.class);
configRoute(); configRoute();
isCamelConfigured = true; isCamelConfigured = true;
} }

View File

@@ -0,0 +1,100 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import java.util.HashSet;
import java.util.Set;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.ContextHierarchy;
@ContextHierarchy({
// Context hierarchy inherits context config from parent classes and extends it with TestConfig from this class
@ContextConfiguration(classes = DirectEventGeneratorTest.TestConfig.class)
})
public class DirectEventGeneratorTest extends EventGeneratorTest
{
@Autowired
private InstantiatedBeansRegistry instantiatedBeansRegistry;
@Autowired
private EventSender directEventSender;
@BeforeClass
public static void beforeClass()
{
System.setProperty("repo.event2.queue.skip", "true");
}
@Test
public void testIfEnqueuingEventSenderIsNotInstantiated()
{
final Set<String> instantiatedBeans = this.instantiatedBeansRegistry.getBeans();
assertTrue(skipEventQueue);
assertFalse(instantiatedBeans.contains("enqueuingEventSender"));
}
@Test
public void testIfDirectSenderIsSetInEventGenerator()
{
assertTrue(skipEventQueue);
assertEquals(directEventSender, eventGenerator.getEventSender());
}
@Configuration
public static class TestConfig
{
@Bean
public BeanPostProcessor instantiatedBeansRegistry()
{
return new InstantiatedBeansRegistry();
}
}
protected static class InstantiatedBeansRegistry implements BeanPostProcessor
{
private final Set<String> registeredBeans = new HashSet<>();
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException
{
registeredBeans.add(beanName);
return bean;
}
public Set<String> getBeans() {
return registeredBeans;
}
}
}

View File

@@ -0,0 +1,42 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.event2;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
public class EnqueuingEventGeneratorTest extends EventGeneratorTest
{
@Autowired
private EventSender enqueuingEventSender;
@Test
public void testIfEnqueuingSenderIsSetInEventGenerator()
{
assertFalse(skipEventQueue);
assertEquals(enqueuingEventSender, eventGenerator.getEventSender());
}
}

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2021 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@@ -51,9 +52,9 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
public class EventGeneratorQueueUnitTest public class EnqueuingEventSenderUnitTest
{ {
private EventGeneratorQueue queue; private EnqueuingEventSender eventSender;
private Event2MessageProducer bus; private Event2MessageProducer bus;
private ExecutorService enqueuePool; private ExecutorService enqueuePool;
@@ -64,15 +65,15 @@ public class EventGeneratorQueueUnitTest
@Before @Before
public void setup() public void setup()
{ {
queue = new EventGeneratorQueue(); eventSender = new EnqueuingEventSender();
enqueuePool = newThreadPool(); enqueuePool = newThreadPool();
queue.setEnqueueThreadPoolExecutor(enqueuePool); eventSender.setEnqueueThreadPoolExecutor(enqueuePool);
dequeuePool = newThreadPool(); dequeuePool = newThreadPool();
queue.setDequeueThreadPoolExecutor(dequeuePool); eventSender.setDequeueThreadPoolExecutor(dequeuePool);
bus = mock(Event2MessageProducer.class); bus = mock(Event2MessageProducer.class);
queue.setEvent2MessageProducer(bus); eventSender.setEvent2MessageProducer(bus);
events = new HashMap<>(); events = new HashMap<>();
@@ -83,6 +84,7 @@ public class EventGeneratorQueueUnitTest
public void teardown() public void teardown()
{ {
enqueuePool.shutdown(); enqueuePool.shutdown();
dequeuePool.shutdown();
} }
private void setupEventsRecorder() private void setupEventsRecorder()
@@ -104,7 +106,7 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveSingleQuickMessage() throws Exception public void shouldReceiveSingleQuickMessage() throws Exception
{ {
queue.accept(messageWithDelay("A", 55l)); eventSender.accept(messageWithDelay("A", 55l));
sleep(150l); sleep(150l);
@@ -115,7 +117,7 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception
{ {
queue.accept(() -> { return null; }); eventSender.accept(() -> { return null; });
sleep(150l); sleep(150l);
@@ -124,9 +126,9 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception { public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception {
queue.accept(messageWithDelay("A", 0l)); eventSender.accept(messageWithDelay("A", 0l));
queue.accept(messageWithDelay("B", 100l)); eventSender.accept(messageWithDelay("B", 100l));
queue.accept(messageWithDelay("C", 200l)); eventSender.accept(messageWithDelay("C", 200l));
sleep(450l); sleep(450l);
@@ -139,9 +141,9 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception
{ {
queue.accept(messageWithDelay("A", 300l)); eventSender.accept(messageWithDelay("A", 300l));
queue.accept(messageWithDelay("B", 150l)); eventSender.accept(messageWithDelay("B", 150l));
queue.accept(messageWithDelay("C", 0l)); eventSender.accept(messageWithDelay("C", 0l));
sleep(950l); sleep(950l);
@@ -154,10 +156,10 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception
{ {
queue.accept(messageWithDelay("A", 300l)); eventSender.accept(messageWithDelay("A", 300l));
queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");}); eventSender.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");});
queue.accept(messageWithDelay("B", 55l)); eventSender.accept(messageWithDelay("B", 55l));
queue.accept(messageWithDelay("C", 0l)); eventSender.accept(messageWithDelay("C", 0l));
sleep(950l); sleep(950l);
@@ -170,12 +172,12 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception
{ {
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l); Callable<Optional<RepoEvent<?>>> makerB = messageWithDelay("B", 55l);
RepoEvent<?> messageB = makerB.call(); RepoEvent<?> messageB = makerB.call().get();
doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB); doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
queue.accept(messageWithDelay("A", 300l)); eventSender.accept(messageWithDelay("A", 300l));
queue.accept(makerB); eventSender.accept(makerB);
queue.accept(messageWithDelay("C", 0l)); eventSender.accept(messageWithDelay("C", 0l));
sleep(950l); sleep(950l);
@@ -187,10 +189,10 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception
{ {
queue.accept(messageWithDelay("A", 300l)); eventSender.accept(messageWithDelay("A", 300l));
queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");}); eventSender.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");});
queue.accept(messageWithDelay("B", 55l)); eventSender.accept(messageWithDelay("B", 55l));
queue.accept(messageWithDelay("C", 0l)); eventSender.accept(messageWithDelay("C", 0l));
sleep(950l); sleep(950l);
@@ -203,12 +205,12 @@ public class EventGeneratorQueueUnitTest
@Test @Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception
{ {
Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l); Callable<Optional<RepoEvent<?>>> makerB = messageWithDelay("B", 55l);
RepoEvent<?> messageB = makerB.call(); RepoEvent<?> messageB = makerB.call().get();
doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB); doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
queue.accept(messageWithDelay("A", 300l)); eventSender.accept(messageWithDelay("A", 300l));
queue.accept(makerB); eventSender.accept(makerB);
queue.accept(messageWithDelay("C", 0l)); eventSender.accept(messageWithDelay("C", 0l));
sleep(950l); sleep(950l);
@@ -217,33 +219,32 @@ public class EventGeneratorQueueUnitTest
assertEquals("C", recordedEvents.get(1).getId()); assertEquals("C", recordedEvents.get(1).getId());
} }
private Callable<RepoEvent<?>> messageWithDelay(String id, long delay) private Callable<Optional<RepoEvent<?>>> messageWithDelay(String id, long delay)
{ {
Callable<RepoEvent<?>> res = new Callable<RepoEvent<?>>() { return new Callable<Optional<RepoEvent<?>>>()
{
@Override @Override
public RepoEvent<?> call() throws Exception public Optional<RepoEvent<?>> call() throws Exception
{ {
if(delay != 0) if (delay != 0)
{ {
sleep(delay); sleep(delay);
} }
return newRepoEvent(id); return Optional.of(newRepoEvent(id));
} }
@Override @Override
public String toString() public String toString()
{ {
return id; return id;
} }
}; };
return res;
} }
private RepoEvent<?> newRepoEvent(String id) private RepoEvent<?> newRepoEvent(String id)
{ {
RepoEvent<?> ev = events.get(id); RepoEvent<?> ev = events.get(id);
if (ev!=null) if (ev != null)
return ev; return ev;
ev = mock(RepoEvent.class); ev = mock(RepoEvent.class);

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -43,7 +43,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBeforeRemovedAndAddedEmpty() public void testGetMappedAspectsBeforeRemovedAndAddedEmpty()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
Set<String> currentAspects = new HashSet<>(); Set<String> currentAspects = new HashSet<>();
currentAspects.add("cm:geographic"); currentAspects.add("cm:geographic");
@@ -57,7 +57,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_AspectRemoved() public void testGetMappedAspectsBefore_AspectRemoved()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
Set<String> currentAspects = new HashSet<>(); Set<String> currentAspects = new HashSet<>();
@@ -79,7 +79,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_AspectAdded() public void testGetMappedAspectsBefore_AspectAdded()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
Set<String> currentAspects = new HashSet<>(); Set<String> currentAspects = new HashSet<>();
@@ -102,7 +102,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_AspectAddedAndRemoved() public void testGetMappedAspectsBefore_AspectAddedAndRemoved()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
Set<String> currentAspects = new HashSet<>(); Set<String> currentAspects = new HashSet<>();
@@ -125,7 +125,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_AspectRemovedAndAdded() public void testGetMappedAspectsBefore_AspectRemovedAndAdded()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
@@ -150,7 +150,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_AspectAddedTwiceRemovedOnce() public void testGetMappedAspectsBefore_AspectAddedTwiceRemovedOnce()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -178,7 +178,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_AspectRemovedTwiceAddedOnce() public void testGetMappedAspectsBefore_AspectRemovedTwiceAddedOnce()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -206,7 +206,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testGetMappedAspectsBefore_FilteredAspectAdded() public void testGetMappedAspectsBefore_FilteredAspectAdded()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASPECT_COPIEDFROM); eventConsolidator.addAspect(ContentModel.ASPECT_COPIEDFROM);
Set<String> currentAspects = new HashSet<>(); Set<String> currentAspects = new HashSet<>();
@@ -227,7 +227,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testAddAspect() public void testAddAspect()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
assertEquals(1, eventConsolidator.getAspectsAdded().size()); assertEquals(1, eventConsolidator.getAspectsAdded().size());
@@ -238,7 +238,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testRemoveAspect() public void testRemoveAspect()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
assertEquals(0, eventConsolidator.getAspectsAdded().size()); assertEquals(0, eventConsolidator.getAspectsAdded().size());
@@ -249,7 +249,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testAddAspectRemoveAspect() public void testAddAspectRemoveAspect()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
@@ -260,7 +260,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testRemoveAspectAddAspect() public void testRemoveAspectAddAspect()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -271,7 +271,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testAddAspectTwiceRemoveAspectOnce() public void testAddAspectTwiceRemoveAspectOnce()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -284,7 +284,7 @@ public class EventConsolidatorUnitTest
@Test @Test
public void testAddAspectOnceRemoveAspectTwice() public void testAddAspectOnceRemoveAspectTwice()
{ {
EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper); NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS); eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);

View File

@@ -25,94 +25,16 @@
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import org.alfresco.model.ContentModel; import org.alfresco.model.ContentModel;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import com.fasterxml.jackson.databind.ObjectMapper; public class EventGeneratorDisabledTest extends EventGeneratorTest
public class EventGeneratorDisabledTest extends AbstractContextAwareRepoEvent
{ {
private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
private static final String BROKER_URL = "tcp://localhost:61616";
@Autowired @Qualifier("event2ObjectMapper")
private ObjectMapper objectMapper;
@Autowired
protected ObjectMapper event2ObjectMapper;
//private EventGenerator eventGenerator;
private ActiveMQConnection connection;
protected List<RepoEvent<?>> receivedEvents;
@Before
public void setup() throws Exception
{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(destination);
receivedEvents = Collections.synchronizedList(new LinkedList<>());
consumer.setMessageListener(new MessageListener()
{
@Override
public void onMessage(Message message)
{
String text = getText(message);
RepoEvent<?> event = toRepoEvent(text);
receivedEvents.add(event);
}
private RepoEvent<?> toRepoEvent(String json)
{
try
{
return objectMapper.readValue(json, RepoEvent.class);
} catch (Exception e)
{
e.printStackTrace();
return null;
}
}
});
}
@After
public void shutdownTopicListener() throws Exception
{
connection.close();
connection = null;
}
@Test @Test
public void shouldNotReceiveEvent2EventsOnNodeCreation() throws Exception public void shouldNotReceiveEvent2EventsOnNodeCreation()
{ {
if (eventGenerator.isEnabled()) if (eventGenerator.isEnabled())
{ {
@@ -127,44 +49,29 @@ public class EventGeneratorDisabledTest extends AbstractContextAwareRepoEvent
assertTrue(receivedEvents.size() == 0); assertTrue(receivedEvents.size() == 0);
eventGenerator.enable(); eventGenerator.enable();
} }
@Test @Test
public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception @Override
public void shouldReceiveEvent2EventsOnNodeCreation()
{ {
if (!eventGenerator.isEnabled()) if (!eventGenerator.isEnabled())
{ {
eventGenerator.enable(); eventGenerator.enable();
} }
createNode(ContentModel.TYPE_CONTENT);
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
assertTrue(EVENT_CONTAINER.getEvents().size() == 1);
assertTrue(receivedEvents.size() == 1);
RepoEvent<?> sent = getRepoEvent(1); super.shouldReceiveEvent2EventsOnNodeCreation();
RepoEvent<?> received = receivedEvents.get(0);
assertEventsEquals("Events are different!", sent, received);
}
private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current)
{
assertEquals(message, expected, current);
} }
private static String getText(Message message) @Test
@Override
public void shouldReceiveEvent2EventsInOrder()
{ {
try if (!eventGenerator.isEnabled())
{ {
ActiveMQTextMessage am = (ActiveMQTextMessage) message; eventGenerator.enable();
return am.getText();
} catch (JMSException e)
{
return null;
} }
}
super.shouldReceiveEvent2EventsInOrder();
}
} }

View File

@@ -39,7 +39,6 @@ import jakarta.jms.MessageListener;
import jakarta.jms.Session; import jakarta.jms.Session;
import org.alfresco.model.ContentModel; import org.alfresco.model.ContentModel;
import org.alfresco.repo.event.databind.ObjectMapperFactory;
import org.alfresco.repo.event.v1.model.RepoEvent; import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeRef;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
@@ -51,24 +50,25 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.annotation.DirtiesContext;
import org.springframework.beans.factory.annotation.Qualifier;
import com.fasterxml.jackson.databind.ObjectMapper; @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
public abstract class EventGeneratorTest extends AbstractContextAwareRepoEvent
public class EventGeneratorTest extends AbstractContextAwareRepoEvent
{ {
private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2"; private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
private static final long DUMP_BROKER_TIMEOUT = 50000000L;
private static final long DUMP_BROKER_TIMEOUT = 50000000l;
@Autowired @Qualifier("event2ObjectMapper")
private ObjectMapper objectMapper;
private ActiveMQConnection connection; private ActiveMQConnection connection;
protected List<RepoEvent<?>> receivedEvents; protected List<RepoEvent<?>> receivedEvents;
@BeforeClass
public static void beforeClass()
{
System.setProperty("repo.event2.queue.skip", "false");
}
@Before @Before
public void startupTopicListener() throws Exception public void startupTopicListener() throws Exception
{ {
@@ -116,11 +116,6 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
} }
} }
protected ObjectMapper createObjectMapper()
{
return ObjectMapperFactory.createInstance();
}
@After @After
public void shutdownTopicListener() throws Exception public void shutdownTopicListener() throws Exception
{ {
@@ -129,30 +124,21 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
} }
@Test @Test
public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception public void shouldReceiveEvent2EventsOnNodeCreation()
{ {
createNode(ContentModel.TYPE_CONTENT); createNode(ContentModel.TYPE_CONTENT);
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1); Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
assertEquals(1, EVENT_CONTAINER.getEvents().size());
assertEquals(1, receivedEvents.size());
RepoEvent<?> sent = getRepoEvent(1); RepoEvent<?> sent = getRepoEvent(1);
RepoEvent<?> received = receivedEvents.get(0); RepoEvent<?> received = receivedEvents.get(0);
assertEventsEquals("Events are different!", sent, received); assertEventsEquals("Events are different!", sent, received);
} }
private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current)
{
if (DEBUG)
{
System.err.println("XP: " + expected);
System.err.println("CU: " + current);
}
assertEquals(message, expected, current);
}
@Test @Test
public void shouldReceiveEvent2EventsInOrder() throws Exception public void shouldReceiveEvent2EventsInOrder()
{ {
NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT); NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT);
updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt"); updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt");
@@ -163,9 +149,20 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
RepoEvent<?> sentCreation = getRepoEvent(1); RepoEvent<?> sentCreation = getRepoEvent(1);
RepoEvent<?> sentUpdate = getRepoEvent(2); RepoEvent<?> sentUpdate = getRepoEvent(2);
RepoEvent<?> sentDeletion = getRepoEvent(3); RepoEvent<?> sentDeletion = getRepoEvent(3);
assertEquals("Expected create event!", sentCreation, (RepoEvent<?>) receivedEvents.get(0)); assertEquals("Expected create event!", sentCreation, receivedEvents.get(0));
assertEquals("Expected update event!", sentUpdate, (RepoEvent<?>) receivedEvents.get(1)); assertEquals("Expected update event!", sentUpdate, receivedEvents.get(1));
assertEquals("Expected delete event!", sentDeletion, (RepoEvent<?>) receivedEvents.get(2)); assertEquals("Expected delete event!", sentDeletion, receivedEvents.get(2));
}
private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current)
{
if (DEBUG)
{
System.err.println("XP: " + expected);
System.err.println("CU: " + current);
}
assertEquals(message, expected, current);
} }
private static String getText(Message message) private static String getText(Message message)
@@ -174,7 +171,8 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
{ {
ActiveMQTextMessage am = (ActiveMQTextMessage) message; ActiveMQTextMessage am = (ActiveMQTextMessage) message;
return am.getText(); return am.getText();
} catch (JMSException e) }
catch (JMSException e)
{ {
return null; return null;
} }
@@ -206,7 +204,8 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
try try
{ {
System.out.println("- " + queue.getQueueName()); System.out.println("- " + queue.getQueueName());
} catch (JMSException e) }
catch (JMSException e)
{ {
e.printStackTrace(); e.printStackTrace();
} }
@@ -219,7 +218,8 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
try try
{ {
System.out.println("- " + topic.getTopicName()); System.out.println("- " + topic.getTopicName());
} catch (JMSException e) }
catch (JMSException e)
{ {
e.printStackTrace(); e.printStackTrace();
} }
@@ -230,18 +230,14 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "..."); System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "...");
consumer.setMessageListener(new MessageListener() consumer.setMessageListener(message -> {
{ String text = getText(message);
@Override System.out.println("Received message " + message + "\n" + text + "\n");
public void onMessage(Message message)
{
String text = getText(message);
System.out.println("Received message " + message + "\n" + text + "\n");
}
}); });
Thread.sleep(timeout); Thread.sleep(timeout);
} finally }
finally
{ {
connection.close(); connection.close();
} }

View File

@@ -2,7 +2,7 @@
* #%L * #%L
* Alfresco Repository * Alfresco Repository
* %% * %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %% * %%
* This file is part of the Alfresco software. * This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of * If the software was purchased under a paid Alfresco license, the terms of
@@ -35,7 +35,8 @@ import org.junit.runners.Suite.SuiteClasses;
DeleteRepoEventIT.class, DeleteRepoEventIT.class,
ChildAssociationRepoEventIT.class, ChildAssociationRepoEventIT.class,
PeerAssociationRepoEventIT.class, PeerAssociationRepoEventIT.class,
EventGeneratorTest.class, EnqueuingEventGeneratorTest.class,
DirectEventGeneratorTest.class,
EventGeneratorDisabledTest.class EventGeneratorDisabledTest.class
}) })
public class RepoEvent2ITSuite public class RepoEvent2ITSuite

View File

@@ -23,7 +23,6 @@
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>. * along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L% * #L%
*/ */
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@@ -34,7 +33,7 @@ import org.junit.runners.Suite.SuiteClasses;
@SuiteClasses({ EventFilterUnitTest.class, @SuiteClasses({ EventFilterUnitTest.class,
EventConsolidatorUnitTest.class, EventConsolidatorUnitTest.class,
EventJSONSchemaUnitTest.class, EventJSONSchemaUnitTest.class,
EventGeneratorQueueUnitTest.class, EnqueuingEventSenderUnitTest.class,
NodeResourceHelperUnitTest.class NodeResourceHelperUnitTest.class
}) })
public class RepoEvent2UnitSuite public class RepoEvent2UnitSuite