diff --git a/core/src/main/java/org/alfresco/util/transaction/TransactionListener.java b/core/src/main/java/org/alfresco/util/transaction/TransactionListener.java
index e97222ad5a..a597f5c05c 100644
--- a/core/src/main/java/org/alfresco/util/transaction/TransactionListener.java
+++ b/core/src/main/java/org/alfresco/util/transaction/TransactionListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2005-2010 Alfresco Software Limited.
+ * Copyright (C) 2005-2023 Alfresco Software Limited.
*
* This file is part of Alfresco
*
@@ -53,7 +53,7 @@ public interface TransactionListener
* on the state of the transaction.
*
* Although all transaction resources are still available, this method should
- * be used only for cleaning up resources after a commit has occured.
+ * be used only for cleaning up resources after a commit has occurred.
*/
void afterCommit();
@@ -64,7 +64,7 @@ public interface TransactionListener
* on the state of the transaction.
*
* Although all transaction resources are still available, this method should
- * be used only for cleaning up resources after a rollback has occured.
+ * be used only for cleaning up resources after a rollback has occurred.
*/
void afterRollback();
}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/ChildAssociationEventConsolidator.java b/repository/src/main/java/org/alfresco/repo/event2/ChildAssociationEventConsolidator.java
index d09833b74e..85163138a8 100644
--- a/repository/src/main/java/org/alfresco/repo/event2/ChildAssociationEventConsolidator.java
+++ b/repository/src/main/java/org/alfresco/repo/event2/ChildAssociationEventConsolidator.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2020 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -25,13 +25,8 @@
*/
package org.alfresco.repo.event2;
-import java.util.ArrayDeque;
-import java.util.Deque;
import org.alfresco.repo.event.v1.model.ChildAssociationResource;
-import org.alfresco.repo.event.v1.model.DataAttributes;
-import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType;
-import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.namespace.QName;
@@ -41,51 +36,12 @@ import org.alfresco.service.namespace.QName;
* @author Chris Shields
* @author Sara Aspery
*/
-public class ChildAssociationEventConsolidator implements ChildAssociationEventSupportedPolicies
+public class ChildAssociationEventConsolidator extends EventConsolidator implements ChildAssociationEventSupportedPolicies
{
- private final Deque eventTypes;
-
- protected final ChildAssociationRef childAssociationRef;
-
- private ChildAssociationResource resource;
- private final NodeResourceHelper helper;
public ChildAssociationEventConsolidator(ChildAssociationRef childAssociationRef, NodeResourceHelper helper)
{
- this.eventTypes = new ArrayDeque<>();
- this.childAssociationRef = childAssociationRef;
- this.helper = helper;
- this.resource = buildChildAssociationResource(this.childAssociationRef);
- }
-
- /**
- * Builds and returns the {@link RepoEvent} instance.
- *
- * @param eventInfo the object holding the event information
- * @return the {@link RepoEvent} instance
- */
- public RepoEvent> getRepoEvent(EventInfo eventInfo)
- {
- EventType eventType = getDerivedEvent();
-
- DataAttributes eventData = buildEventData(eventInfo, resource);
-
- return RepoEvent.>builder()
- .setId(eventInfo.getId())
- .setSource(eventInfo.getSource())
- .setTime(eventInfo.getTimestamp())
- .setType(eventType.getType())
- .setData(eventData)
- .setDataschema(EventJSONSchema.getSchemaV1(eventType))
- .build();
- }
-
- protected DataAttributes buildEventData(EventInfo eventInfo, ChildAssociationResource resource)
- {
- return EventData.builder()
- .setEventGroupId(eventInfo.getTxnId())
- .setResource(resource)
- .build();
+ super(childAssociationRef, helper);
}
/**
@@ -121,12 +77,10 @@ public class ChildAssociationEventConsolidator implements ChildAssociationEventS
return new ChildAssociationResource(parentId, childId, assocType, assocQName);
}
- /**
- * @return a derived event for a transaction.
- */
- private EventType getDerivedEvent()
+ @Override
+ protected EventType getDerivedEvent()
{
- if (isTemporaryChildAssociation())
+ if (isTemporaryEntity())
{
// This event will be filtered out, but we set the correct
// event type anyway for debugging purposes
@@ -147,33 +101,15 @@ public class ChildAssociationEventConsolidator implements ChildAssociationEventS
}
}
- /**
- * Whether or not the child association has been created and then deleted, i.e. a temporary child association.
- *
- * @return {@code true} if the child association has been created and then deleted, otherwise false
- */
- public boolean isTemporaryChildAssociation()
+ @Override
+ public boolean isTemporaryEntity()
{
return eventTypes.contains(EventType.CHILD_ASSOC_CREATED) && eventTypes.getLast() == EventType.CHILD_ASSOC_DELETED;
}
- /**
- * Get child association type.
- *
- * @return QName the child association type
- */
- public QName getChildAssocType()
+ @Override
+ public QName getEntityType()
{
- return childAssociationRef.getTypeQName();
- }
-
- /**
- * Get event types.
- *
- * @return Deque queue of event types
- */
- public Deque getEventTypes()
- {
- return eventTypes;
+ return entityReference.getTypeQName();
}
}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java b/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java
new file mode 100644
index 0000000000..f2af8666ea
--- /dev/null
+++ b/repository/src/main/java/org/alfresco/repo/event2/DirectEventSender.java
@@ -0,0 +1,70 @@
+/*
+ * #%L
+ * Alfresco Repository
+ * %%
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
+ * %%
+ * This file is part of the Alfresco software.
+ * If the software was purchased under a paid Alfresco license, the terms of
+ * the paid license agreement will prevail. Otherwise, the software is
+ * provided under the following open source license terms:
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ * #L%
+ */
+package org.alfresco.repo.event2;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import org.alfresco.error.AlfrescoRuntimeException;
+import org.alfresco.repo.event.v1.model.RepoEvent;
+import org.alfresco.util.PropertyCheck;
+import org.springframework.beans.factory.InitializingBean;
+
+/**
+ * Sends a message to a destination in the current thread.
+ */
+public class DirectEventSender implements EventSender, InitializingBean
+{
+ protected Event2MessageProducer event2MessageProducer;
+
+ @Override
+ public void afterPropertiesSet()
+ {
+ PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
+ }
+
+ public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
+ {
+ this.event2MessageProducer = event2MessageProducer;
+ }
+
+ @Override
+ public void accept(Callable>> eventProducer)
+ {
+ try
+ {
+ eventProducer.call().ifPresent(event -> event2MessageProducer.send(event));
+ }
+ catch (RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new AlfrescoRuntimeException("Unexpected error while executing maker function for repository event", e);
+ }
+ }
+}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java b/repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java
similarity index 61%
rename from repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java
rename to repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java
index 2805b16b1d..08a425f218 100644
--- a/repository/src/main/java/org/alfresco/repo/event2/EventGeneratorQueue.java
+++ b/repository/src/main/java/org/alfresco/repo/event2/EnqueuingEventSender.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2021 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -25,6 +25,7 @@
*/
package org.alfresco.repo.event2;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -36,40 +37,33 @@ import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.InitializingBean;
-/*
- * This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but
- * at the same time it preserves the order of the events
+/**
+ * Enqueuing event sender allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but
+ * at the same time it preserves the order of the events.
*/
-public class EventGeneratorQueue implements InitializingBean
+public class EnqueuingEventSender extends DirectEventSender
{
- protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class);
-
+ protected static final Log LOGGER = LogFactory.getLog(EnqueuingEventSender.class);
+
protected Executor enqueueThreadPoolExecutor;
protected Executor dequeueThreadPoolExecutor;
- protected Event2MessageProducer event2MessageProducer;
protected BlockingQueue queue = new LinkedBlockingQueue<>();
protected Runnable listener = createListener();
@Override
- public void afterPropertiesSet() throws Exception
+ public void afterPropertiesSet()
{
+ super.afterPropertiesSet();
PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor);
PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor);
- PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer);
}
- public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer)
- {
- this.event2MessageProducer = event2MessageProducer;
- }
-
public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor)
{
this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor;
}
-
+
public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor)
{
this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor;
@@ -78,11 +72,12 @@ public class EventGeneratorQueue implements InitializingBean
/**
* Procedure to enqueue the callback functions that creates an event.
- * @param maker Callback function that creates an event.
+ * @param eventProducer Callback function that creates an event.
*/
- public void accept(Callable> maker)
+ @Override
+ public void accept(Callable>> eventProducer)
{
- EventInMaking eventInMaking = new EventInMaking(maker);
+ EventInMaking eventInMaking = new EventInMaking(eventProducer);
queue.offer(eventInMaking);
enqueueThreadPoolExecutor.execute(() -> {
try
@@ -102,78 +97,67 @@ public class EventGeneratorQueue implements InitializingBean
*/
private Runnable createListener()
{
- return new Runnable()
- {
- @Override
- public void run()
+ return () -> {
+ try
{
- try
+ while (!Thread.interrupted())
{
- while (!Thread.interrupted())
+ try
{
- try
- {
- EventInMaking eventInMaking = queue.take();
- RepoEvent> event = eventInMaking.getEventWhenReady();
- if (event != null)
- {
- event2MessageProducer.send(event);
- }
- }
- catch (Exception e)
- {
- LOGGER.error("Unexpected error while dequeuing and sending repository event" + e);
- }
+ queue.take().getEventWhenReady().ifPresent(event -> event2MessageProducer.send(event));
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Unexpected error while dequeuing and sending repository event " + e);
}
}
- finally
- {
- LOGGER.warn("Unexpected: rescheduling the listener thread.");
- dequeueThreadPoolExecutor.execute(listener);
- }
+ }
+ finally
+ {
+ LOGGER.warn("Unexpected: rescheduling the listener thread.");
+ dequeueThreadPoolExecutor.execute(listener);
}
};
}
- /*
+ /**
* Simple class that makes events and allows to retrieve them when ready
*/
private static class EventInMaking
{
- private Callable> maker;
+ private final Callable>> maker;
private volatile RepoEvent> event;
- private CountDownLatch latch;
-
- public EventInMaking(Callable> maker)
+ private final CountDownLatch latch;
+
+ public EventInMaking(Callable>> maker)
{
this.maker = maker;
this.latch = new CountDownLatch(1);
}
-
+
public void make() throws Exception
{
try
{
- event = maker.call();
+ event = maker.call().orElse(null);
}
- finally
+ finally
{
latch.countDown();
}
}
-
- public RepoEvent> getEventWhenReady() throws InterruptedException
+
+ public Optional> getEventWhenReady() throws InterruptedException
{
latch.await(30, TimeUnit.SECONDS);
- return event;
+ return Optional.ofNullable(event);
}
-
+
@Override
public String toString()
{
return maker.toString();
}
}
-
}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventConsolidator.java b/repository/src/main/java/org/alfresco/repo/event2/EventConsolidator.java
index dafa4abdc1..faa4a05214 100644
--- a/repository/src/main/java/org/alfresco/repo/event2/EventConsolidator.java
+++ b/repository/src/main/java/org/alfresco/repo/event2/EventConsolidator.java
@@ -25,60 +25,66 @@
*/
package org.alfresco.repo.event2;
-import java.io.Serializable;
-import java.time.ZonedDateTime;
import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.alfresco.model.ContentModel;
-import org.alfresco.repo.event.v1.model.ContentInfo;
import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType;
-import org.alfresco.repo.event.v1.model.NodeResource;
-import org.alfresco.repo.event.v1.model.NodeResource.Builder;
import org.alfresco.repo.event.v1.model.RepoEvent;
-import org.alfresco.repo.event.v1.model.UserInfo;
-import org.alfresco.service.cmr.repository.ChildAssociationRef;
-import org.alfresco.service.cmr.repository.NodeRef;
+import org.alfresco.repo.event.v1.model.Resource;
+import org.alfresco.service.cmr.repository.EntityRef;
import org.alfresco.service.namespace.QName;
/**
* Encapsulates events occurred in a single transaction.
*
- * @author Jamal Kaabi-Mofrad
+ * @param [ entity (e.g. node, child association, peer association) reference type
+ * @param entity resource type
*/
-public class EventConsolidator implements EventSupportedPolicies
+public abstract class EventConsolidator][
{
- private final NodeResourceHelper helper;
protected final Deque eventTypes;
- private final List aspectsAdded;
- private final List aspectsRemoved;
+ protected final NodeResourceHelper helper;
+ protected REF entityReference;
+ protected RES resource;
- protected NodeRef nodeRef;
-
- private NodeResource.Builder resourceBuilder;
- private Map propertiesBefore;
- private Map propertiesAfter;
- private QName nodeType;
- private QName nodeTypeBefore;
- private List primaryHierarchyBefore;
- private boolean resourceBeforeAllFieldsNull = true;
-
- public EventConsolidator(NodeResourceHelper nodeResourceHelper)
+ public EventConsolidator(final REF entityReference, final NodeResourceHelper nodeResourceHelper)
{
- this.helper = nodeResourceHelper;
this.eventTypes = new ArrayDeque<>();
- this.aspectsAdded = new ArrayList<>();
- this.aspectsRemoved = new ArrayList<>();
+ this.entityReference = entityReference;
+ this.helper = nodeResourceHelper;
+ }
+
+ /**
+ * Get entity (e.g. node, peer association, child association) type.
+ *
+ * @return QName the peer association type
+ */
+ public abstract QName getEntityType();
+
+ /**
+ * Whether the entity has been created and then deleted, e.g. a temporary node.
+ *
+ * @return {@code true} if the node has been created and then deleted, otherwise false
+ */
+ public abstract boolean isTemporaryEntity();
+
+ /**
+ * Get a derived event for a transaction.
+ *
+ * @return a derived event type
+ */
+ protected abstract EventType getDerivedEvent();
+
+ /**
+ * Get event types.
+ *
+ * @return Deque queue of event types
+ */
+ public Deque getEventTypes()
+ {
+ return eventTypes;
}
/**
@@ -87,424 +93,30 @@ public class EventConsolidator implements EventSupportedPolicies
* @param eventInfo the object holding the event information
* @return the {@link RepoEvent} instance
*/
- public RepoEvent> getRepoEvent(EventInfo eventInfo)
+ public RepoEvent> getRepoEvent(EventInfo eventInfo)
{
- NodeResource resource = buildNodeResource();
EventType eventType = getDerivedEvent();
- DataAttributes eventData = buildEventData(eventInfo, resource, eventType);
+ DataAttributes eventData = buildEventData(eventInfo, resource, eventType);
- return RepoEvent.>builder()
- .setId(eventInfo.getId())
- .setSource(eventInfo.getSource())
- .setTime(eventInfo.getTimestamp())
- .setType(eventType.getType())
- .setData(eventData)
- .setDataschema(EventJSONSchema.getSchemaV1(eventType))
- .build();
- }
-
- protected DataAttributes buildEventData(EventInfo eventInfo, NodeResource resource, EventType eventType)
- {
- EventData.Builder eventDataBuilder = EventData.builder()
- .setEventGroupId(eventInfo.getTxnId())
- .setResource(resource);
-
- if (eventType == EventType.NODE_UPDATED)
- {
- eventDataBuilder.setResourceBefore(buildNodeResourceBeforeDelta(resource));
- }
-
- return eventDataBuilder.build();
+ return RepoEvent.>builder()
+ .setId(eventInfo.getId())
+ .setSource(eventInfo.getSource())
+ .setTime(eventInfo.getTimestamp())
+ .setType(eventType.getType())
+ .setData(eventData)
+ .setDataschema(EventJSONSchema.getSchemaV1(eventType))
+ .build();
}
/**
- * Creates a builder instance if absent or {@code forceUpdate} is requested.
- * It also, sets the required fields.
- *
- * @param nodeRef the nodeRef in the txn
- * @param forceUpdate if {@code true}, will get the latest node info and ignores
- * the existing builder object.
+ * Provides primary event data.
*/
- protected void createBuilderIfAbsent(NodeRef nodeRef, boolean forceUpdate)
+ protected DataAttributes buildEventData(EventInfo eventInfo, RES resource, EventType eventType)
{
- if (resourceBuilder == null || forceUpdate)
- {
- this.resourceBuilder = helper.createNodeResourceBuilder(nodeRef);
- this.nodeRef = nodeRef;
- this.nodeType = helper.getNodeType(nodeRef);
- }
+ return EventData.builder()
+ .setEventGroupId(eventInfo.getTxnId())
+ .setResource(resource)
+ .build();
}
-
- /**
- * Creates a builder instance if absent, and sets the required fields.
- *
- * @param nodeRef the nodeRef in the txn
- */
- protected void createBuilderIfAbsent(NodeRef nodeRef)
- {
- createBuilderIfAbsent(nodeRef, false);
- }
-
- @Override
- public void onCreateNode(ChildAssociationRef childAssocRef)
- {
- eventTypes.add(EventType.NODE_CREATED);
-
- NodeRef nodeRef = childAssocRef.getChildRef();
- createBuilderIfAbsent(nodeRef);
-
- // Sometimes onCreateNode policy is out of order
- this.propertiesBefore = null;
- setBeforeProperties(Collections.emptyMap());
- setAfterProperties(helper.getProperties(nodeRef));
- }
-
- @Override
- public void onMoveNode(ChildAssociationRef oldChildAssocRef, ChildAssociationRef newChildAssocRef)
- {
- eventTypes.add(EventType.NODE_UPDATED);
-
- createBuilderIfAbsent(newChildAssocRef.getChildRef());
- setBeforePrimaryHierarchy(helper.getPrimaryHierarchy(oldChildAssocRef.getParentRef(), true));
- }
-
- @Override
- public void onSetNodeType(NodeRef nodeRef, QName before, QName after)
- {
- eventTypes.add(EventType.NODE_UPDATED);
- nodeTypeBefore = before;
- createBuilderIfAbsent(nodeRef);
- }
-
- @Override
- public void onUpdateProperties(NodeRef nodeRef, Map before, Map after)
- {
- eventTypes.add(EventType.NODE_UPDATED);
-
- // Sometime we don't get the 'before', so just use the latest
- if (before.isEmpty() && this.propertiesAfter != null)
- {
- before = this.propertiesAfter;
- }
- createBuilderIfAbsent(nodeRef);
- setBeforeProperties(before);
- setAfterProperties(after);
- }
-
- @Override
- public void beforeDeleteNode(NodeRef nodeRef)
- {
- eventTypes.add(EventType.NODE_DELETED);
- createBuilderIfAbsent(nodeRef, false);
- }
-
- @Override
- public void onAddAspect(NodeRef nodeRef, QName aspectTypeQName)
- {
- eventTypes.add(EventType.NODE_UPDATED);
- addAspect(aspectTypeQName);
- createBuilderIfAbsent(nodeRef);
- }
-
- void addAspect(QName aspectTypeQName)
- {
- if (aspectsRemoved.contains(aspectTypeQName))
- {
- aspectsRemoved.remove(aspectTypeQName);
- }
- else
- {
- aspectsAdded.add(aspectTypeQName);
- }
- }
-
- @Override
- public void onRemoveAspect(NodeRef nodeRef, QName aspectTypeQName)
- {
- eventTypes.add(EventType.NODE_UPDATED);
- removeAspect(aspectTypeQName);
- createBuilderIfAbsent(nodeRef);
- }
-
- void removeAspect(QName aspectTypeQName)
- {
- if (aspectsAdded.contains(aspectTypeQName))
- {
- aspectsAdded.remove(aspectTypeQName);
- }
- else
- {
- aspectsRemoved.add(aspectTypeQName);
- }
- }
-
- private void setAfterProperties(Map after)
- {
- propertiesAfter = after;
- }
-
- private void setBeforeProperties(Map before)
- {
- // Don't overwrite the original value if there are multiple calls.
- if (propertiesBefore == null)
- {
- propertiesBefore = before;
- }
- }
-
- private void setBeforePrimaryHierarchy(List before)
- {
- // Don't overwrite the original value if there are multiple calls.
- if (primaryHierarchyBefore == null)
- {
- primaryHierarchyBefore = before;
- }
- }
-
- private NodeResource buildNodeResource()
- {
- if (resourceBuilder == null)
- {
- return null;
- }
-
- if (eventTypes.getLast() != EventType.NODE_DELETED)
- {
- // Check the node still exists.
- // This could happen in tests where a node is deleted before the afterCommit code is
- // executed (For example, see ThumbnailServiceImplTest#testIfNodesExistsAfterCreateThumbnail).
- if (helper.nodeExists(nodeRef))
- {
- // We are setting the details at the end of the Txn by getting the latest info
- createBuilderIfAbsent(nodeRef, true);
- }
- }
- // Now create an instance of NodeResource
- return resourceBuilder.build();
- }
-
- protected NodeResource buildNodeResourceBeforeDelta(NodeResource after)
- {
- if (after == null)
- {
- return null;
- }
-
- Builder builder = NodeResource.builder();
-
- ZonedDateTime modifiedAt = null;
- Map changedPropsBefore = getBeforeMapChanges(propertiesBefore, propertiesAfter);
- if (!changedPropsBefore.isEmpty())
- {
- // Set only the changed properties
- Map mappedProps = helper.mapToNodeProperties(changedPropsBefore);
- if (!mappedProps.isEmpty())
- {
- builder.setProperties(mappedProps);
- resourceBeforeAllFieldsNull = false;
- }
-
- Map> localizedProps =helper.getLocalizedPropertiesBefore(changedPropsBefore, after);
- if (!localizedProps.isEmpty())
- {
- builder.setLocalizedProperties(localizedProps);
- resourceBeforeAllFieldsNull = false;
- }
-
- String name = (String) changedPropsBefore.get(ContentModel.PROP_NAME);
- if (name != null)
- {
- builder.setName(name);
- resourceBeforeAllFieldsNull = false;
- }
- ContentInfo contentInfo = helper.getContentInfo(changedPropsBefore);
- if (contentInfo != null)
- {
- builder.setContent(contentInfo);
- resourceBeforeAllFieldsNull = false;
- }
-
- UserInfo modifier = helper.getUserInfo((String) changedPropsBefore.get(ContentModel.PROP_MODIFIER));
- if (modifier != null)
- {
- builder.setModifiedByUser(modifier);
- resourceBeforeAllFieldsNull = false;
- }
- modifiedAt =
- helper.getZonedDateTime((Date) changedPropsBefore.get(ContentModel.PROP_MODIFIED));
- }
-
- // Handle case where the content does not exist on the propertiesBefore
- if (propertiesBefore != null && !propertiesBefore.containsKey(ContentModel.PROP_CONTENT) &&
- propertiesAfter != null && propertiesAfter.containsKey(ContentModel.PROP_CONTENT))
- {
- builder.setContent(new ContentInfo());
- resourceBeforeAllFieldsNull = false;
- }
-
- Set aspectsBefore = getMappedAspectsBefore(after.getAspectNames());
- if (!aspectsBefore.isEmpty())
- {
- builder.setAspectNames(aspectsBefore);
- resourceBeforeAllFieldsNull = false;
- }
-
- if (primaryHierarchyBefore != null && !primaryHierarchyBefore.isEmpty())
- {
- builder.setPrimaryHierarchy(primaryHierarchyBefore);
- resourceBeforeAllFieldsNull = false;
- }
-
- if (nodeTypeBefore != null)
- {
- builder.setNodeType(helper.getQNamePrefixString(nodeTypeBefore));
- resourceBeforeAllFieldsNull = false;
- }
-
- // Only set modifiedAt if one of the other fields is also not null
- if (modifiedAt != null && !resourceBeforeAllFieldsNull)
- {
- builder.setModifiedAt(modifiedAt);
- }
-
- return builder.build();
- }
-
- Set getMappedAspectsBefore(Set currentAspects)
- {
- if (currentAspects == null)
- {
- currentAspects = Collections.emptySet();
- }
- if (hasChangedAspect())
- {
- Set removed = helper.mapToNodeAspects(aspectsRemoved);
- Set added = helper.mapToNodeAspects(aspectsAdded);
-
- Set before = new HashSet<>();
- if (!removed.isEmpty() || !added.isEmpty())
- {
- before = new HashSet<>(currentAspects);
- if (!removed.isEmpty())
- {
- // Add all the removed aspects from the current list
- before.addAll(removed);
- }
- if (!added.isEmpty())
- {
- // Remove all the added aspects from the current list
- before.removeAll(added);
- }
- }
- return before;
- }
- return Collections.emptySet();
- }
-
- private boolean hasChangedAspect()
- {
- if ((aspectsRemoved.isEmpty() && aspectsAdded.isEmpty()) ||
- org.apache.commons.collections.CollectionUtils.isEqualCollection(aspectsAdded, aspectsRemoved))
- {
- return false;
- }
- return true;
- }
-
- private Map getBeforeMapChanges(Map before, Map after)
- {
- if (before == null)
- {
- return Collections.emptyMap();
- }
- if (after == null)
- {
- after = Collections.emptyMap();
- }
- // Get before values that changed
- Map beforeDelta = new HashMap<>(before);
- Map afterDelta = new HashMap<>(after);
-
- beforeDelta.entrySet().removeAll(after.entrySet());
-
- // Add nulls for before properties
- Set beforeKeys = before.keySet();
- Set newKeys = afterDelta.keySet();
- newKeys.removeAll(beforeKeys);
-
- for (K key : newKeys)
- {
- beforeDelta.put(key, null);
- }
-
- return beforeDelta;
- }
-
- /**
- * @return a derived event for a transaction.
- */
- private EventType getDerivedEvent()
- {
- if (isTemporaryNode())
- {
- // This event will be filtered out, but we set the correct
- // event type anyway for debugging purposes
- return EventType.NODE_DELETED;
- }
- else if (eventTypes.contains(EventType.NODE_CREATED))
- {
- return EventType.NODE_CREATED;
- }
- else if (eventTypes.getLast() == EventType.NODE_DELETED)
- {
- return EventType.NODE_DELETED;
- }
- else
- {
- // Default to first event
- return eventTypes.getFirst();
- }
- }
-
- /**
- * Whether or not the node has been created and then deleted, i.e. a temporary node.
- *
- * @return {@code true} if the node has been created and then deleted, otherwise false
- */
- public boolean isTemporaryNode()
- {
- return eventTypes.contains(EventType.NODE_CREATED) && eventTypes.getLast() == EventType.NODE_DELETED;
- }
-
- public QName getNodeType()
- {
- return nodeType;
- }
-
- public Deque getEventTypes()
- {
- return eventTypes;
- }
-
-
- public List getAspectsAdded()
- {
- return aspectsAdded;
- }
-
- public List getAspectsRemoved()
- {
- return aspectsRemoved;
- }
-
- public boolean isResourceBeforeAllFieldsNull()
- {
- return resourceBeforeAllFieldsNull;
- }
-
- protected void setResourceBeforeAllFieldsNull(boolean resourceBeforeAllFieldsNull){
- this.resourceBeforeAllFieldsNull = resourceBeforeAllFieldsNull;
- }
-
}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java b/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java
index 0981104a14..6e59a5f498 100644
--- a/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java
+++ b/repository/src/main/java/org/alfresco/repo/event2/EventGenerator.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2020 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -34,13 +34,16 @@ import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.domain.node.TransactionEntity;
+import org.alfresco.repo.event.v1.model.DataAttributes;
import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.RepoEvent;
+import org.alfresco.repo.event.v1.model.Resource;
import org.alfresco.repo.event2.filter.ChildAssociationTypeFilter;
import org.alfresco.repo.event2.filter.EventFilterRegistry;
import org.alfresco.repo.event2.filter.EventUserFilter;
@@ -63,9 +66,11 @@ import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.policy.ServiceBehaviourBinding;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
+import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.dictionary.DictionaryService;
import org.alfresco.service.cmr.repository.AssociationRef;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
+import org.alfresco.service.cmr.repository.EntityRef;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.security.PersonService;
@@ -74,6 +79,7 @@ import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.PropertyCheck;
+import org.alfresco.util.TriPredicate;
import org.alfresco.util.transaction.TransactionListenerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -102,8 +108,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
private PersonService personService;
protected NodeResourceHelper nodeResourceHelper;
protected NodeDAO nodeDAO;
-
- private EventGeneratorQueue eventGeneratorQueue;
+ private EventSender eventSender;
private NodeTypeFilter nodeTypeFilter;
private ChildAssociationTypeFilter childAssociationTypeFilter;
private EventUserFilter userFilter;
@@ -139,7 +144,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
PropertyCheck.mandatory(this, "personService", personService);
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
PropertyCheck.mandatory(this, "nodeDAO", nodeDAO);
- PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue);
+ PropertyCheck.mandatory(this, "eventSender", eventSender);
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter();
@@ -188,7 +193,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
if (behaviours == null)
{
- behaviours = new HashSet();
+ behaviours = new HashSet<>();
afterPropertiesSet();
bindBehaviours();
@@ -230,8 +235,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
this.descriptorService = descriptorService;
}
- // To make IntelliJ stop complaining about unused method!
- @SuppressWarnings("unused")
public void setEventFilterRegistry(EventFilterRegistry eventFilterRegistry)
{
this.eventFilterRegistry = eventFilterRegistry;
@@ -252,9 +255,14 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
this.nodeResourceHelper = nodeResourceHelper;
}
- public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue)
+ public void setEventSender(EventSender eventSender)
{
- this.eventGeneratorQueue = eventGeneratorQueue;
+ this.eventSender = eventSender;
+ }
+
+ public EventSender getEventSender()
+ {
+ return eventSender;
}
@Override
@@ -323,9 +331,9 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
getEventConsolidator(associationRef).beforeDeleteAssociation(associationRef);
}
- protected EventConsolidator createEventConsolidator()
+ protected NodeEventConsolidator createEventConsolidator()
{
- return new EventConsolidator(nodeResourceHelper);
+ return new NodeEventConsolidator(nodeResourceHelper);
}
protected ChildAssociationEventConsolidator createChildAssociationEventConsolidator(
@@ -370,13 +378,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
disableBehaviours(behaviours);
}
- protected void disableBehaviours(Set bindedBehaviours)
+ protected void disableBehaviours(Set boundBehaviours)
{
- if (bindedBehaviours != null)
+ if (boundBehaviours != null)
{
- bindedBehaviours.forEach(behaviour -> {
- behaviour.disable();
- });
+ boundBehaviours.forEach(Behaviour::disable);
}
}
@@ -385,30 +391,28 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
enableBehaviours(behaviours);
}
- protected void enableBehaviours(Set bindedBehaviours)
+ protected void enableBehaviours(Set boundBehaviours)
{
- if (bindedBehaviours != null)
+ if (boundBehaviours != null)
{
- bindedBehaviours.forEach(behaviour -> {
- behaviour.enable();
- });
+ boundBehaviours.forEach(Behaviour::enable);
}
}
/**
- * @return the {@link EventConsolidator} for the supplied {@code nodeRef} from
+ * @return the {@link NodeEventConsolidator} for the supplied {@code nodeRef} from
* the current transaction context.
*/
- protected EventConsolidator getEventConsolidator(NodeRef nodeRef)
+ protected NodeEventConsolidator getEventConsolidator(NodeRef nodeRef)
{
Consolidators consolidators = getTxnConsolidators(transactionListener);
- Map nodeEvents = consolidators.getNodes();
+ Map nodeEvents = consolidators.getNodes();
if (nodeEvents.isEmpty())
{
AlfrescoTransactionSupport.bindListener(transactionListener);
}
- EventConsolidator eventConsolidator = nodeEvents.get(nodeRef);
+ NodeEventConsolidator eventConsolidator = nodeEvents.get(nodeRef);
if (eventConsolidator == null)
{
eventConsolidator = createEventConsolidator();
@@ -430,7 +434,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
}
/**
- * @return the {@link EventConsolidator} for the supplied {@code childAssociationRef} from
+ * @return the {@link ChildAssociationEventConsolidator} for the supplied {@code childAssociationRef} from
* the current transaction context.
*/
private ChildAssociationEventConsolidator getEventConsolidator(ChildAssociationRef childAssociationRef)
@@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
}
/**
- * @return the {@link EventConsolidator} for the supplied {@code peerAssociationRef} from
+ * @return the {@link PeerAssociationEventConsolidator} for the supplied {@code peerAssociationRef} from
* the current transaction context.
*/
private PeerAssociationEventConsolidator getEventConsolidator(AssociationRef peerAssociationRef)
@@ -506,7 +510,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
{
return;
}
- behaviours = new HashSet();
+ behaviours = new HashSet<>();
bindBehaviours();
}
@@ -521,32 +525,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
@Override
public void afterCommit()
{
- if(isTransactionCommitted())
+ if (isTransactionCommitted())
{
try
{
- final Consolidators consolidators = getTxnConsolidators(this);
-
- // Node events
- for (Map.Entry entry : consolidators.getNodes().entrySet())
- {
- EventConsolidator eventConsolidator = entry.getValue();
- sendEvent(entry.getKey(), eventConsolidator);
- }
-
- // Child assoc events
- for (Map.Entry entry : consolidators.getChildAssocs().entrySet())
- {
- ChildAssociationEventConsolidator eventConsolidator = entry.getValue();
- sendEvent(entry.getKey(), eventConsolidator);
- }
-
- // Peer assoc events
- for (Map.Entry entry : consolidators.getPeerAssocs().entrySet())
- {
- PeerAssociationEventConsolidator eventConsolidator = entry.getValue();
- sendEvent(entry.getKey(), eventConsolidator);
- }
+ sendEvents();
}
catch (Exception e)
{
@@ -556,12 +539,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
}
}
- protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
- {
- EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
- eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo));
- }
-
/**
* @return true if a node transaction is not only active, but also committed with modifications.
* This means that a {@link TransactionEntity} object was created.
@@ -571,115 +548,154 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
return nodeDAO.getCurrentTransactionCommitTime() != null;
}
- private RepoEvent> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo)
+ private void sendEvents()
{
- String user = eventInfo.getPrincipal();
+ final Consolidators consolidators = getTxnConsolidators(this);
- if (consolidator.isTemporaryNode())
+ // Node events
+ for (Map.Entry entry : consolidators.getNodes().entrySet())
{
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("Ignoring temporary node: " + nodeRef);
- }
- return null;
+ sendEvent(entry.getKey(), entry.getValue());
}
- // Get the repo event before the filtering,
- // so we can take the latest node info into account
- final RepoEvent> event = consolidator.getRepoEvent(eventInfo);
-
- final QName nodeType = consolidator.getNodeType();
- if (isFiltered(nodeType, user))
+ // Child assoc events
+ for (Map.Entry entry : consolidators.getChildAssocs().entrySet())
{
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("EventFilter - Excluding node: '" + nodeRef + "' of type: '"
- + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
- + "' created by: " + user);
- }
- return null;
+ sendEvent(entry.getKey(), entry.getValue());
}
- if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull())
+ // Peer assoc events
+ for (Map.Entry entry : consolidators.getPeerAssocs().entrySet())
{
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef);
- }
- return null;
+ sendEvent(entry.getKey(), entry.getValue());
}
+ }
- logEvent(event, consolidator.getEventTypes());
- return event;
+ protected void sendEvent(NodeRef nodeRef, NodeEventConsolidator consolidator)
+ {
+ sendEvent(nodeRef, consolidator, nodeToEventEligibilityVerifier());
}
protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
{
- EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
- eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator));
- }
-
- private RepoEvent> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator)
- {
- String user = eventInfo.getPrincipal();
- if (consolidator.isTemporaryChildAssociation())
- {
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("Ignoring temporary child association: " + childAssociationRef);
- }
- return null;
- }
-
- // Get the repo event before the filtering,
- // so we can take the latest association info into account
- final RepoEvent> event = consolidator.getRepoEvent(eventInfo);
-
- final QName childAssocType = consolidator.getChildAssocType();
- if (isFilteredChildAssociation(childAssocType, user))
- {
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("EventFilter - Excluding child association: '" + childAssociationRef + "' of type: '"
- + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
- + "' created by: " + user);
- }
- return null;
- } else if (childAssociationRef.isPrimary())
- {
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("EventFilter - Excluding primary child association: '" + childAssociationRef + "' of type: '"
- + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
- + "' created by: " + user);
- }
- return null;
- }
-
- logEvent(event, consolidator.getEventTypes());
- return event;
+ sendEvent(childAssociationRef, consolidator, childAssociationToEventEligibilityVerifier());
}
protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
{
- EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
- eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator));
+ sendEvent(peerAssociationRef, consolidator, null);
}
- private RepoEvent> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator)
+ /**
+ * Handles all kinds of events and sends them within dedicated transaction.
+ *
+ * @param entityReference - reference to an entity (e.g. node, child association, peer association)
+ * @param eventConsolidator - object encapsulating events occurred in a transaction
+ * @param entityToEventEligibilityVerifier - allows to verify if entity is eligible to generate an even. If null no verification is necessary
+ * @param ][ - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef})
+ * @param - event consolidator type - extension of {@link EventConsolidator}
+ */
+ private ][> void sendEvent(
+ final REF entityReference, final CON eventConsolidator, final TriPredicate][ entityToEventEligibilityVerifier)
{
- if (consolidator.isTemporaryPeerAssociation())
+ final EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser());
+ transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback) () -> {
+ eventSender.accept(() -> createEvent(entityReference, eventConsolidator, eventInfo, entityToEventEligibilityVerifier));
+ return null;
+ }, true, true);
+ }
+
+ /**
+ * Creates events from various kinds of entities.
+ *
+ * @param entityReference - reference to an entity (e.g. node, child association, peer association)
+ * @param eventConsolidator - object encapsulating events occurred in a transaction
+ * @param eventInfo - object holding the event information
+ * @param entityToEventEligibilityVerifier - allows to verify if entity is eligible to generate an even. If null no verification is necessary
+ * @param ][ - entity reference type (e.g. {@link NodeRef}, {@link AssociationRef}, {@link ChildAssociationRef})
+ * @param - event consolidator type - extension of {@link EventConsolidator}
+ */
+ private ][> Optional> createEvent(
+ final REF entityReference, final CON eventConsolidator, final EventInfo eventInfo,
+ final TriPredicate][ entityToEventEligibilityVerifier)
+ {
+ if (eventConsolidator.isTemporaryEntity())
{
if (LOGGER.isTraceEnabled())
{
- LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef);
+ LOGGER.trace("Ignoring temporary entity: " + entityReference);
}
- return null;
+ return Optional.empty();
}
- RepoEvent> event = consolidator.getRepoEvent(eventInfo);
- logEvent(event, consolidator.getEventTypes());
- return event;
+ // get the repo event before verifying entity eligibility to generate event, so we can take the latest node info into account
+ final RepoEvent extends DataAttributes extends Resource>> event = eventConsolidator.getRepoEvent(eventInfo);
+
+ // verify if entity is eligible to generate an event
+ if (entityToEventEligibilityVerifier != null && !entityToEventEligibilityVerifier.test(entityReference, eventConsolidator, eventInfo))
+ {
+ return Optional.empty();
+ }
+
+ logEvent(event, eventConsolidator.getEventTypes());
+ return Optional.of(event);
+ }
+
+ private TriPredicate nodeToEventEligibilityVerifier()
+ {
+ return (nodeReference, eventConsolidator, eventInfo) -> {
+ final String user = eventInfo.getPrincipal();
+ final QName nodeType = eventConsolidator.getEntityType();
+ if (isFiltered(nodeType, user))
+ {
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("EventFilter - Excluding node: '" + nodeReference + "' of type: '"
+ + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString())
+ + "' created by: " + user);
+ }
+ return false;
+ }
+
+ if (eventConsolidator.isEventTypeEqualTo(EventType.NODE_UPDATED) && eventConsolidator.isResourceBeforeAllFieldsNull())
+ {
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeReference);
+ }
+ return false;
+ }
+ return true;
+ };
+ }
+
+ private TriPredicate childAssociationToEventEligibilityVerifier()
+ {
+ return (childAssociationReference, eventConsolidator, eventInfo) -> {
+ final String user = eventInfo.getPrincipal();
+ final QName childAssocType = eventConsolidator.getEntityType();
+ if (isFilteredChildAssociation(childAssocType, user))
+ {
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("EventFilter - Excluding child association: '" + childAssociationReference + "' of type: '"
+ + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ + "' created by: " + user);
+ }
+ return false;
+ }
+ else if (childAssociationReference.isPrimary())
+ {
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("EventFilter - Excluding primary child association: '" + childAssociationReference + "' of type: '"
+ + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString())
+ + "' created by: " + user);
+ }
+ return false;
+ }
+ return true;
+ };
}
private void logEvent(RepoEvent> event, Deque listOfEvents)
@@ -692,14 +708,13 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
}
}
-
protected static class Consolidators
{
- private Map nodes;
+ private Map nodes;
private Map childAssocs;
private Map peerAssocs;
- public Map getNodes()
+ public Map getNodes()
{
if (nodes == null)
{
diff --git a/repository/src/main/java/org/alfresco/repo/event2/EventSender.java b/repository/src/main/java/org/alfresco/repo/event2/EventSender.java
new file mode 100644
index 0000000000..798c66f111
--- /dev/null
+++ b/repository/src/main/java/org/alfresco/repo/event2/EventSender.java
@@ -0,0 +1,43 @@
+/*
+ * #%L
+ * Alfresco Repository
+ * %%
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
+ * %%
+ * This file is part of the Alfresco software.
+ * If the software was purchased under a paid Alfresco license, the terms of
+ * the paid license agreement will prevail. Otherwise, the software is
+ * provided under the following open source license terms:
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ * #L%
+ */
+package org.alfresco.repo.event2;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import org.alfresco.repo.event.v1.model.RepoEvent;
+
+/**
+ * Interface representing an asynchronous event sender.
+ */
+public interface EventSender
+{
+ /**
+ * Accepts a callback function creating an event and sends this event to specified destination.
+ * @param eventProducer - callback function that creates an event
+ */
+ void accept(Callable>> eventProducer);
+}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/NodeEventConsolidator.java b/repository/src/main/java/org/alfresco/repo/event2/NodeEventConsolidator.java
new file mode 100644
index 0000000000..ea38b64e26
--- /dev/null
+++ b/repository/src/main/java/org/alfresco/repo/event2/NodeEventConsolidator.java
@@ -0,0 +1,481 @@
+/*
+ * #%L
+ * Alfresco Repository
+ * %%
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
+ * %%
+ * This file is part of the Alfresco software.
+ * If the software was purchased under a paid Alfresco license, the terms of
+ * the paid license agreement will prevail. Otherwise, the software is
+ * provided under the following open source license terms:
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ * #L%
+ */
+package org.alfresco.repo.event2;
+
+import java.io.Serializable;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.alfresco.model.ContentModel;
+import org.alfresco.repo.event.v1.model.ContentInfo;
+import org.alfresco.repo.event.v1.model.DataAttributes;
+import org.alfresco.repo.event.v1.model.EventData;
+import org.alfresco.repo.event.v1.model.EventType;
+import org.alfresco.repo.event.v1.model.NodeResource;
+import org.alfresco.repo.event.v1.model.NodeResource.Builder;
+import org.alfresco.repo.event.v1.model.RepoEvent;
+import org.alfresco.repo.event.v1.model.UserInfo;
+import org.alfresco.service.cmr.repository.ChildAssociationRef;
+import org.alfresco.service.cmr.repository.NodeRef;
+import org.alfresco.service.namespace.QName;
+
+/**
+ * Encapsulates node events occurred in a single transaction.
+ *
+ * @author Jamal Kaabi-Mofrad
+ */
+public class NodeEventConsolidator extends EventConsolidator implements EventSupportedPolicies
+{
+ private final List aspectsAdded;
+ private final List aspectsRemoved;
+
+ private NodeResource.Builder resourceBuilder;
+ private Map propertiesBefore;
+ private Map propertiesAfter;
+ private QName nodeType;
+ private QName nodeTypeBefore;
+ private List primaryHierarchyBefore;
+ private boolean resourceBeforeAllFieldsNull = true;
+
+ public NodeEventConsolidator(NodeResourceHelper nodeResourceHelper)
+ {
+ super(null, nodeResourceHelper);
+ this.aspectsAdded = new ArrayList<>();
+ this.aspectsRemoved = new ArrayList<>();
+ }
+
+ @Override
+ public RepoEvent> getRepoEvent(EventInfo eventInfo)
+ {
+ resource = buildNodeResource();
+ return super.getRepoEvent(eventInfo);
+ }
+
+ @Override
+ protected DataAttributes buildEventData(EventInfo eventInfo, NodeResource resource, EventType eventType)
+ {
+ EventData.Builder eventDataBuilder = EventData.builder()
+ .setEventGroupId(eventInfo.getTxnId())
+ .setResource(resource);
+
+ if (eventType == EventType.NODE_UPDATED)
+ {
+ eventDataBuilder.setResourceBefore(buildNodeResourceBeforeDelta(resource));
+ }
+
+ return eventDataBuilder.build();
+ }
+
+ /**
+ * Creates a builder instance if absent or {@code forceUpdate} is requested.
+ * It also, sets the required fields.
+ *
+ * @param nodeRef the nodeRef in the txn
+ * @param forceUpdate if {@code true}, will get the latest node info and ignores
+ * the existing builder object.
+ */
+ protected void createBuilderIfAbsent(NodeRef nodeRef, boolean forceUpdate)
+ {
+ if (resourceBuilder == null || forceUpdate)
+ {
+ this.resourceBuilder = helper.createNodeResourceBuilder(nodeRef);
+ this.entityReference = nodeRef;
+ this.nodeType = helper.getNodeType(nodeRef);
+ }
+ }
+
+ /**
+ * Creates a builder instance if absent, and sets the required fields.
+ *
+ * @param nodeRef the nodeRef in the txn
+ */
+ protected void createBuilderIfAbsent(NodeRef nodeRef)
+ {
+ createBuilderIfAbsent(nodeRef, false);
+ }
+
+ @Override
+ public void onCreateNode(ChildAssociationRef childAssocRef)
+ {
+ eventTypes.add(EventType.NODE_CREATED);
+
+ NodeRef nodeRef = childAssocRef.getChildRef();
+ createBuilderIfAbsent(nodeRef);
+
+ // Sometimes onCreateNode policy is out of order
+ this.propertiesBefore = null;
+ setBeforeProperties(Collections.emptyMap());
+ setAfterProperties(helper.getProperties(nodeRef));
+ }
+
+ @Override
+ public void onMoveNode(ChildAssociationRef oldChildAssocRef, ChildAssociationRef newChildAssocRef)
+ {
+ eventTypes.add(EventType.NODE_UPDATED);
+
+ createBuilderIfAbsent(newChildAssocRef.getChildRef());
+ setBeforePrimaryHierarchy(helper.getPrimaryHierarchy(oldChildAssocRef.getParentRef(), true));
+ }
+
+ @Override
+ public void onSetNodeType(NodeRef nodeRef, QName before, QName after)
+ {
+ eventTypes.add(EventType.NODE_UPDATED);
+ nodeTypeBefore = before;
+ createBuilderIfAbsent(nodeRef);
+ }
+
+ @Override
+ public void onUpdateProperties(NodeRef nodeRef, Map before, Map after)
+ {
+ eventTypes.add(EventType.NODE_UPDATED);
+
+ // Sometimes we don't get the 'before', so just use the latest
+ if (before.isEmpty() && this.propertiesAfter != null)
+ {
+ before = this.propertiesAfter;
+ }
+ createBuilderIfAbsent(nodeRef);
+ setBeforeProperties(before);
+ setAfterProperties(after);
+ }
+
+ @Override
+ public void beforeDeleteNode(NodeRef nodeRef)
+ {
+ eventTypes.add(EventType.NODE_DELETED);
+ createBuilderIfAbsent(nodeRef, false);
+ }
+
+ @Override
+ public void onAddAspect(NodeRef nodeRef, QName aspectTypeQName)
+ {
+ eventTypes.add(EventType.NODE_UPDATED);
+ addAspect(aspectTypeQName);
+ createBuilderIfAbsent(nodeRef);
+ }
+
+ void addAspect(QName aspectTypeQName)
+ {
+ if (aspectsRemoved.contains(aspectTypeQName))
+ {
+ aspectsRemoved.remove(aspectTypeQName);
+ }
+ else
+ {
+ aspectsAdded.add(aspectTypeQName);
+ }
+ }
+
+ @Override
+ public void onRemoveAspect(NodeRef nodeRef, QName aspectTypeQName)
+ {
+ eventTypes.add(EventType.NODE_UPDATED);
+ removeAspect(aspectTypeQName);
+ createBuilderIfAbsent(nodeRef);
+ }
+
+ void removeAspect(QName aspectTypeQName)
+ {
+ if (aspectsAdded.contains(aspectTypeQName))
+ {
+ aspectsAdded.remove(aspectTypeQName);
+ }
+ else
+ {
+ aspectsRemoved.add(aspectTypeQName);
+ }
+ }
+
+ private void setAfterProperties(Map after)
+ {
+ propertiesAfter = after;
+ }
+
+ private void setBeforeProperties(Map before)
+ {
+ // Don't overwrite the original value if there are multiple calls.
+ if (propertiesBefore == null)
+ {
+ propertiesBefore = before;
+ }
+ }
+
+ private void setBeforePrimaryHierarchy(List before)
+ {
+ // Don't overwrite the original value if there are multiple calls.
+ if (primaryHierarchyBefore == null)
+ {
+ primaryHierarchyBefore = before;
+ }
+ }
+
+ private NodeResource buildNodeResource()
+ {
+ if (resourceBuilder == null)
+ {
+ return null;
+ }
+
+ if (eventTypes.getLast() != EventType.NODE_DELETED)
+ {
+ // Check the node still exists.
+ // This could happen in tests where a node is deleted before the afterCommit code is
+ // executed (For example, see ThumbnailServiceImplTest#testIfNodesExistsAfterCreateThumbnail).
+ if (helper.nodeExists(entityReference))
+ {
+ // We are setting the details at the end of the Txn by getting the latest info
+ createBuilderIfAbsent(entityReference, true);
+ }
+ }
+ // Now create an instance of NodeResource
+ return resourceBuilder.build();
+ }
+
+ protected NodeResource buildNodeResourceBeforeDelta(NodeResource after)
+ {
+ if (after == null)
+ {
+ return null;
+ }
+
+ Builder builder = NodeResource.builder();
+
+ ZonedDateTime modifiedAt = null;
+ Map changedPropsBefore = getBeforeMapChanges(propertiesBefore, propertiesAfter);
+ if (!changedPropsBefore.isEmpty())
+ {
+ // Set only the changed properties
+ Map mappedProps = helper.mapToNodeProperties(changedPropsBefore);
+ if (!mappedProps.isEmpty())
+ {
+ builder.setProperties(mappedProps);
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ Map> localizedProps =helper.getLocalizedPropertiesBefore(changedPropsBefore, after);
+ if (!localizedProps.isEmpty())
+ {
+ builder.setLocalizedProperties(localizedProps);
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ String name = (String) changedPropsBefore.get(ContentModel.PROP_NAME);
+ if (name != null)
+ {
+ builder.setName(name);
+ resourceBeforeAllFieldsNull = false;
+ }
+ ContentInfo contentInfo = helper.getContentInfo(changedPropsBefore);
+ if (contentInfo != null)
+ {
+ builder.setContent(contentInfo);
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ UserInfo modifier = helper.getUserInfo((String) changedPropsBefore.get(ContentModel.PROP_MODIFIER));
+ if (modifier != null)
+ {
+ builder.setModifiedByUser(modifier);
+ resourceBeforeAllFieldsNull = false;
+ }
+ modifiedAt =
+ helper.getZonedDateTime((Date) changedPropsBefore.get(ContentModel.PROP_MODIFIED));
+ }
+
+ // Handle case where the content does not exist on the propertiesBefore
+ if (propertiesBefore != null && !propertiesBefore.containsKey(ContentModel.PROP_CONTENT) &&
+ propertiesAfter != null && propertiesAfter.containsKey(ContentModel.PROP_CONTENT))
+ {
+ builder.setContent(new ContentInfo());
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ Set aspectsBefore = getMappedAspectsBefore(after.getAspectNames());
+ if (!aspectsBefore.isEmpty())
+ {
+ builder.setAspectNames(aspectsBefore);
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ if (primaryHierarchyBefore != null && !primaryHierarchyBefore.isEmpty())
+ {
+ builder.setPrimaryHierarchy(primaryHierarchyBefore);
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ if (nodeTypeBefore != null)
+ {
+ builder.setNodeType(helper.getQNamePrefixString(nodeTypeBefore));
+ resourceBeforeAllFieldsNull = false;
+ }
+
+ // Only set modifiedAt if one of the other fields is also not null
+ if (modifiedAt != null && !resourceBeforeAllFieldsNull)
+ {
+ builder.setModifiedAt(modifiedAt);
+ }
+
+ return builder.build();
+ }
+
+ Set getMappedAspectsBefore(Set currentAspects)
+ {
+ if (currentAspects == null)
+ {
+ currentAspects = Collections.emptySet();
+ }
+ if (hasChangedAspect())
+ {
+ Set removed = helper.mapToNodeAspects(aspectsRemoved);
+ Set added = helper.mapToNodeAspects(aspectsAdded);
+
+ Set before = new HashSet<>();
+ if (!removed.isEmpty() || !added.isEmpty())
+ {
+ before = new HashSet<>(currentAspects);
+ if (!removed.isEmpty())
+ {
+ // Add all the removed aspects from the current list
+ before.addAll(removed);
+ }
+ if (!added.isEmpty())
+ {
+ // Remove all the added aspects from the current list
+ before.removeAll(added);
+ }
+ }
+ return before;
+ }
+ return Collections.emptySet();
+ }
+
+ private boolean hasChangedAspect()
+ {
+ if ((aspectsRemoved.isEmpty() && aspectsAdded.isEmpty()) ||
+ org.apache.commons.collections.CollectionUtils.isEqualCollection(aspectsAdded, aspectsRemoved))
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private Map getBeforeMapChanges(Map before, Map after)
+ {
+ if (before == null)
+ {
+ return Collections.emptyMap();
+ }
+ if (after == null)
+ {
+ after = Collections.emptyMap();
+ }
+ // Get before values that changed
+ Map beforeDelta = new HashMap<>(before);
+ Map afterDelta = new HashMap<>(after);
+
+ beforeDelta.entrySet().removeAll(after.entrySet());
+
+ // Add nulls for before properties
+ Set beforeKeys = before.keySet();
+ Set newKeys = afterDelta.keySet();
+ newKeys.removeAll(beforeKeys);
+
+ for (K key : newKeys)
+ {
+ beforeDelta.put(key, null);
+ }
+
+ return beforeDelta;
+ }
+
+ @Override
+ protected EventType getDerivedEvent()
+ {
+ if (isTemporaryEntity())
+ {
+ // This event will be filtered out, but we set the correct
+ // event type anyway for debugging purposes
+ return EventType.NODE_DELETED;
+ }
+ else if (eventTypes.contains(EventType.NODE_CREATED))
+ {
+ return EventType.NODE_CREATED;
+ }
+ else if (eventTypes.getLast() == EventType.NODE_DELETED)
+ {
+ return EventType.NODE_DELETED;
+ }
+ else
+ {
+ // Default to first event
+ return eventTypes.getFirst();
+ }
+ }
+
+ @Override
+ public boolean isTemporaryEntity()
+ {
+ return eventTypes.contains(EventType.NODE_CREATED) && eventTypes.getLast() == EventType.NODE_DELETED;
+ }
+
+ @Override
+ public QName getEntityType()
+ {
+ return nodeType;
+ }
+
+ public List getAspectsAdded()
+ {
+ return aspectsAdded;
+ }
+
+ public List getAspectsRemoved()
+ {
+ return aspectsRemoved;
+ }
+
+ public boolean isResourceBeforeAllFieldsNull()
+ {
+ return resourceBeforeAllFieldsNull;
+ }
+
+ public boolean isEventTypeEqualTo(EventType eventType)
+ {
+ return this.getDerivedEvent().getType().equals(eventType.getType());
+ }
+
+ protected void setResourceBeforeAllFieldsNull(boolean resourceBeforeAllFieldsNull){
+ this.resourceBeforeAllFieldsNull = resourceBeforeAllFieldsNull;
+ }
+}
diff --git a/repository/src/main/java/org/alfresco/repo/event2/PeerAssociationEventConsolidator.java b/repository/src/main/java/org/alfresco/repo/event2/PeerAssociationEventConsolidator.java
index 589c9c3061..d3be5b4657 100644
--- a/repository/src/main/java/org/alfresco/repo/event2/PeerAssociationEventConsolidator.java
+++ b/repository/src/main/java/org/alfresco/repo/event2/PeerAssociationEventConsolidator.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2020 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -25,14 +25,8 @@
*/
package org.alfresco.repo.event2;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import org.alfresco.repo.event.v1.model.DataAttributes;
-import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.PeerAssociationResource;
-import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.AssociationRef;
import org.alfresco.service.namespace.QName;
@@ -41,50 +35,12 @@ import org.alfresco.service.namespace.QName;
*
* @author Sara Aspery
*/
-public class PeerAssociationEventConsolidator implements PeerAssociationEventSupportedPolicies
+public class PeerAssociationEventConsolidator extends EventConsolidator implements PeerAssociationEventSupportedPolicies
{
- private final Deque eventTypes;
-
- protected final AssociationRef associationRef;
-
- private PeerAssociationResource resource;
- private final NodeResourceHelper helper;
public PeerAssociationEventConsolidator(AssociationRef associationRef, NodeResourceHelper helper)
{
- this.eventTypes = new ArrayDeque<>();
- this.associationRef = associationRef;
- this.helper = helper;
- }
-
- /**
- * Builds and returns the {@link RepoEvent} instance.
- *
- * @param eventInfo the object holding the event information
- * @return the {@link RepoEvent} instance
- */
- public RepoEvent> getRepoEvent(EventInfo eventInfo)
- {
- EventType eventType = getDerivedEvent();
-
- DataAttributes eventData = buildEventData(eventInfo, resource);
-
- return RepoEvent.>builder()
- .setId(eventInfo.getId())
- .setSource(eventInfo.getSource())
- .setTime(eventInfo.getTimestamp())
- .setType(eventType.getType())
- .setData(eventData)
- .setDataschema(EventJSONSchema.getSchemaV1(eventType))
- .build();
- }
-
- protected DataAttributes buildEventData(EventInfo eventInfo, PeerAssociationResource resource)
- {
- return EventData.builder()
- .setEventGroupId(eventInfo.getTxnId())
- .setResource(resource)
- .build();
+ super(associationRef, helper);
}
/**
@@ -120,12 +76,10 @@ public class PeerAssociationEventConsolidator implements PeerAssociationEventSup
return new PeerAssociationResource(sourceId, targetId, assocType);
}
- /**
- * @return a derived event for a transaction.
- */
- private EventType getDerivedEvent()
+ @Override
+ protected EventType getDerivedEvent()
{
- if (isTemporaryPeerAssociation())
+ if (isTemporaryEntity())
{
// This event will be filtered out, but we set the correct
// event type anyway for debugging purposes
@@ -146,34 +100,16 @@ public class PeerAssociationEventConsolidator implements PeerAssociationEventSup
}
}
- /**
- * Whether or not the association has been created and then deleted, i.e. a temporary association.
- *
- * @return {@code true} if the association has been created and then deleted, otherwise false
- */
- public boolean isTemporaryPeerAssociation()
+ @Override
+ public boolean isTemporaryEntity()
{
return eventTypes.contains(EventType.PEER_ASSOC_CREATED) && eventTypes.getLast() == EventType.PEER_ASSOC_DELETED;
}
- /**
- * Get peer association type.
- *
- * @return QName the peer association type
- */
- public QName getAssocType()
+ @Override
+ public QName getEntityType()
{
- return associationRef.getTypeQName();
- }
-
- /**
- * Get event types.
- *
- * @return Deque queue of event types
- */
- public Deque getEventTypes()
- {
- return eventTypes;
+ return entityReference.getTypeQName();
}
}
diff --git a/repository/src/main/java/org/alfresco/repo/events/AbstractEventsService.java b/repository/src/main/java/org/alfresco/repo/events/AbstractEventsService.java
index af5d9b3777..d71f24f919 100644
--- a/repository/src/main/java/org/alfresco/repo/events/AbstractEventsService.java
+++ b/repository/src/main/java/org/alfresco/repo/events/AbstractEventsService.java
@@ -767,7 +767,7 @@ public abstract class AbstractEventsService extends TransactionListenerAdapter
void addEvents(List events)
{
- events.addAll(events);
+ this.events.addAll(events);
}
void clear()
diff --git a/repository/src/main/java/org/alfresco/util/TriPredicate.java b/repository/src/main/java/org/alfresco/util/TriPredicate.java
new file mode 100644
index 0000000000..ee99fffa81
--- /dev/null
+++ b/repository/src/main/java/org/alfresco/util/TriPredicate.java
@@ -0,0 +1,83 @@
+/*
+ * #%L
+ * Alfresco Repository
+ * %%
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
+ * %%
+ * This file is part of the Alfresco software.
+ * If the software was purchased under a paid Alfresco license, the terms of
+ * the paid license agreement will prevail. Otherwise, the software is
+ * provided under the following open source license terms:
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ * #L%
+ */
+package org.alfresco.util;
+
+import java.util.Objects;
+
+/**
+ * Represents a predicate (boolean-valued function) of three arguments. This is the three-arity specialization of Predicate.
+ * This is a functional interface whose functional method is test(Object, Object, Object).
+ *
+ * @param - type of the first argument to the predicate
+ * @param - type of the second argument the predicate
+ * @param - type of the third argument the predicate
+ */
+@FunctionalInterface
+public interface TriPredicate
+{
+
+ /**
+ * Evaluates this predicate on the given arguments.
+ *
+ * @param t - first input argument
+ * @param u - second input argument
+ * @param v - third input argument
+ * @return true if the input arguments match the predicate, otherwise false
+ */
+ boolean test(T t, U u, V v);
+
+ /**
+ * Creates a composed predicate that represents a logical AND of this predicate and another.
+ *
+ * @param other - predicate that will be logically-ANDed with this predicate
+ * @return composed predicate
+ */
+ default TriPredicate and(TriPredicate super T, ? super U, ? super V> other)
+ {
+ Objects.requireNonNull(other);
+ return (T t, U u, V v) -> test(t, u, v) && other.test(t, u, v);
+ }
+
+ /**
+ * @return a predicate that represents the logical negation of this predicate
+ */
+ default TriPredicate negate()
+ {
+ return (T t, U u, V v) -> !test(t, u, v);
+ }
+
+ /**
+ * Creates a composed predicate that represents a logical OR of this predicate and another.
+ *
+ * @param other - predicate that will be logically-ORed with this predicate
+ * @return composed predicate
+ */
+ default TriPredicate or(TriPredicate super T, ? super U, ? super V> other)
+ {
+ Objects.requireNonNull(other);
+ return (T t, U u, V v) -> test(t, u, v) || other.test(t, u, v);
+ }
+}
diff --git a/repository/src/main/resources/alfresco/events2-context.xml b/repository/src/main/resources/alfresco/events2-context.xml
index 9589b01d75..0fb6f3ef43 100644
--- a/repository/src/main/resources/alfresco/events2-context.xml
+++ b/repository/src/main/resources/alfresco/events2-context.xml
@@ -41,11 +41,9 @@
-
+
-
- ${repo.event2.enabled}
-
+
@@ -61,14 +59,13 @@
-
-
- ]
-
-
-
-
-
+
+
+
+
+
+
+
diff --git a/repository/src/main/resources/alfresco/repository.properties b/repository/src/main/resources/alfresco/repository.properties
index ef4dfa3927..78d1558dbc 100644
--- a/repository/src/main/resources/alfresco/repository.properties
+++ b/repository/src/main/resources/alfresco/repository.properties
@@ -1229,6 +1229,8 @@ repo.event2.filter.childAssocTypes=rn:rendition
repo.event2.filter.users=
# Topic name
repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2
+# Specifies if messages should be enqueued in in-memory queue or sent directly to the topic
+repo.event2.queue.skip=false
# Thread pool for async enqueue of repo events
repo.event2.queue.enqueueThreadPool.priority=1
repo.event2.queue.enqueueThreadPool.coreSize=8
diff --git a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java
index 6d9db0f7ef..d7af0a25e7 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/AbstractContextAwareRepoEvent.java
@@ -23,7 +23,6 @@
* along with Alfresco. If not, see .
* #L%
*/
-
package org.alfresco.repo.event2;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -72,6 +71,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -108,23 +108,24 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
@Autowired
protected CustomModelService customModelService;
- @Qualifier("descriptorComponent")
@Autowired
+ @Qualifier("descriptorComponent")
protected DescriptorService descriptorService;
@Autowired
- protected ObjectMapper event2ObjectMapper;
+ @Qualifier("event2ObjectMapper")
+ protected ObjectMapper objectMapper;
- @Autowired @Qualifier("eventGeneratorV2")
+ @Autowired
+ @Qualifier("eventGeneratorV2")
protected EventGenerator eventGenerator;
- @Autowired
- @Qualifier("eventGeneratorQueue")
- protected EventGeneratorQueue eventQueue;
-
@Autowired
private NamespaceDAO namespaceDAO;
+ @Value("${repo.event2.queue.skip}")
+ protected boolean skipEventQueue;
+
protected NodeRef rootNodeRef;
@BeforeClass
@@ -134,7 +135,7 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
}
@AfterClass
- public static void afterAll() throws Exception
+ public static void afterAll()
{
CAMEL_CONTEXT.stop();
}
@@ -144,7 +145,7 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest
{
if (!isCamelConfigured)
{
- dataFormat = new JacksonDataFormat(event2ObjectMapper, RepoEvent.class);
+ dataFormat = new JacksonDataFormat(objectMapper, RepoEvent.class);
configRoute();
isCamelConfigured = true;
}
diff --git a/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java
new file mode 100644
index 0000000000..cb0f640213
--- /dev/null
+++ b/repository/src/test/java/org/alfresco/repo/event2/DirectEventGeneratorTest.java
@@ -0,0 +1,100 @@
+/*
+ * #%L
+ * Alfresco Repository
+ * %%
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
+ * %%
+ * This file is part of the Alfresco software.
+ * If the software was purchased under a paid Alfresco license, the terms of
+ * the paid license agreement will prevail. Otherwise, the software is
+ * provided under the following open source license terms:
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ * #L%
+ */
+package org.alfresco.repo.event2;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.ContextHierarchy;
+
+@ContextHierarchy({
+ // Context hierarchy inherits context config from parent classes and extends it with TestConfig from this class
+ @ContextConfiguration(classes = DirectEventGeneratorTest.TestConfig.class)
+})
+public class DirectEventGeneratorTest extends EventGeneratorTest
+{
+ @Autowired
+ private InstantiatedBeansRegistry instantiatedBeansRegistry;
+
+ @Autowired
+ private EventSender directEventSender;
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ System.setProperty("repo.event2.queue.skip", "true");
+ }
+
+ @Test
+ public void testIfEnqueuingEventSenderIsNotInstantiated()
+ {
+ final Set instantiatedBeans = this.instantiatedBeansRegistry.getBeans();
+
+ assertTrue(skipEventQueue);
+ assertFalse(instantiatedBeans.contains("enqueuingEventSender"));
+ }
+
+ @Test
+ public void testIfDirectSenderIsSetInEventGenerator()
+ {
+ assertTrue(skipEventQueue);
+ assertEquals(directEventSender, eventGenerator.getEventSender());
+ }
+
+ @Configuration
+ public static class TestConfig
+ {
+ @Bean
+ public BeanPostProcessor instantiatedBeansRegistry()
+ {
+ return new InstantiatedBeansRegistry();
+ }
+ }
+
+ protected static class InstantiatedBeansRegistry implements BeanPostProcessor
+ {
+ private final Set registeredBeans = new HashSet<>();
+
+ @Override
+ public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException
+ {
+ registeredBeans.add(beanName);
+ return bean;
+ }
+
+ public Set getBeans() {
+ return registeredBeans;
+ }
+ }
+}
diff --git a/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java
new file mode 100644
index 0000000000..7e59600671
--- /dev/null
+++ b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventGeneratorTest.java
@@ -0,0 +1,42 @@
+/*
+ * #%L
+ * Alfresco Repository
+ * %%
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
+ * %%
+ * This file is part of the Alfresco software.
+ * If the software was purchased under a paid Alfresco license, the terms of
+ * the paid license agreement will prevail. Otherwise, the software is
+ * provided under the following open source license terms:
+ *
+ * Alfresco is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Alfresco is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with Alfresco. If not, see .
+ * #L%
+ */
+package org.alfresco.repo.event2;
+
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class EnqueuingEventGeneratorTest extends EventGeneratorTest
+{
+ @Autowired
+ private EventSender enqueuingEventSender;
+
+ @Test
+ public void testIfEnqueuingSenderIsSetInEventGenerator()
+ {
+ assertFalse(skipEventQueue);
+ assertEquals(enqueuingEventSender, eventGenerator.getEventSender());
+ }
+}
diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java
similarity index 74%
rename from repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java
rename to repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java
index 241464ca42..ce8604f326 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorQueueUnitTest.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/EnqueuingEventSenderUnitTest.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2021 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -51,9 +52,9 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-public class EventGeneratorQueueUnitTest
+public class EnqueuingEventSenderUnitTest
{
- private EventGeneratorQueue queue;
+ private EnqueuingEventSender eventSender;
private Event2MessageProducer bus;
private ExecutorService enqueuePool;
@@ -64,15 +65,15 @@ public class EventGeneratorQueueUnitTest
@Before
public void setup()
{
- queue = new EventGeneratorQueue();
+ eventSender = new EnqueuingEventSender();
enqueuePool = newThreadPool();
- queue.setEnqueueThreadPoolExecutor(enqueuePool);
+ eventSender.setEnqueueThreadPoolExecutor(enqueuePool);
dequeuePool = newThreadPool();
- queue.setDequeueThreadPoolExecutor(dequeuePool);
+ eventSender.setDequeueThreadPoolExecutor(dequeuePool);
bus = mock(Event2MessageProducer.class);
- queue.setEvent2MessageProducer(bus);
+ eventSender.setEvent2MessageProducer(bus);
events = new HashMap<>();
@@ -83,6 +84,7 @@ public class EventGeneratorQueueUnitTest
public void teardown()
{
enqueuePool.shutdown();
+ dequeuePool.shutdown();
}
private void setupEventsRecorder()
@@ -104,7 +106,7 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveSingleQuickMessage() throws Exception
{
- queue.accept(messageWithDelay("A", 55l));
+ eventSender.accept(messageWithDelay("A", 55l));
sleep(150l);
@@ -115,7 +117,7 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception
{
- queue.accept(() -> { return null; });
+ eventSender.accept(() -> { return null; });
sleep(150l);
@@ -124,9 +126,9 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception {
- queue.accept(messageWithDelay("A", 0l));
- queue.accept(messageWithDelay("B", 100l));
- queue.accept(messageWithDelay("C", 200l));
+ eventSender.accept(messageWithDelay("A", 0l));
+ eventSender.accept(messageWithDelay("B", 100l));
+ eventSender.accept(messageWithDelay("C", 200l));
sleep(450l);
@@ -139,9 +141,9 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception
{
- queue.accept(messageWithDelay("A", 300l));
- queue.accept(messageWithDelay("B", 150l));
- queue.accept(messageWithDelay("C", 0l));
+ eventSender.accept(messageWithDelay("A", 300l));
+ eventSender.accept(messageWithDelay("B", 150l));
+ eventSender.accept(messageWithDelay("C", 0l));
sleep(950l);
@@ -154,10 +156,10 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception
{
- queue.accept(messageWithDelay("A", 300l));
- queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");});
- queue.accept(messageWithDelay("B", 55l));
- queue.accept(messageWithDelay("C", 0l));
+ eventSender.accept(messageWithDelay("A", 300l));
+ eventSender.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");});
+ eventSender.accept(messageWithDelay("B", 55l));
+ eventSender.accept(messageWithDelay("C", 0l));
sleep(950l);
@@ -170,12 +172,12 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception
{
- Callable> makerB = messageWithDelay("B", 55l);
- RepoEvent> messageB = makerB.call();
+ Callable>> makerB = messageWithDelay("B", 55l);
+ RepoEvent> messageB = makerB.call().get();
doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
- queue.accept(messageWithDelay("A", 300l));
- queue.accept(makerB);
- queue.accept(messageWithDelay("C", 0l));
+ eventSender.accept(messageWithDelay("A", 300l));
+ eventSender.accept(makerB);
+ eventSender.accept(messageWithDelay("C", 0l));
sleep(950l);
@@ -187,10 +189,10 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception
{
- queue.accept(messageWithDelay("A", 300l));
- queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");});
- queue.accept(messageWithDelay("B", 55l));
- queue.accept(messageWithDelay("C", 0l));
+ eventSender.accept(messageWithDelay("A", 300l));
+ eventSender.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");});
+ eventSender.accept(messageWithDelay("B", 55l));
+ eventSender.accept(messageWithDelay("C", 0l));
sleep(950l);
@@ -203,12 +205,12 @@ public class EventGeneratorQueueUnitTest
@Test
public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception
{
- Callable> makerB = messageWithDelay("B", 55l);
- RepoEvent> messageB = makerB.call();
+ Callable>> makerB = messageWithDelay("B", 55l);
+ RepoEvent> messageB = makerB.call().get();
doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB);
- queue.accept(messageWithDelay("A", 300l));
- queue.accept(makerB);
- queue.accept(messageWithDelay("C", 0l));
+ eventSender.accept(messageWithDelay("A", 300l));
+ eventSender.accept(makerB);
+ eventSender.accept(messageWithDelay("C", 0l));
sleep(950l);
@@ -217,33 +219,32 @@ public class EventGeneratorQueueUnitTest
assertEquals("C", recordedEvents.get(1).getId());
}
- private Callable> messageWithDelay(String id, long delay)
+ private Callable>> messageWithDelay(String id, long delay)
{
- Callable> res = new Callable>() {
-
+ return new Callable>>()
+ {
@Override
- public RepoEvent> call() throws Exception
+ public Optional> call() throws Exception
{
- if(delay != 0)
+ if (delay != 0)
{
- sleep(delay);
+ sleep(delay);
}
- return newRepoEvent(id);
- }
-
+ return Optional.of(newRepoEvent(id));
+ }
+
@Override
public String toString()
{
return id;
}
};
- return res;
}
private RepoEvent> newRepoEvent(String id)
{
RepoEvent> ev = events.get(id);
- if (ev!=null)
+ if (ev != null)
return ev;
ev = mock(RepoEvent.class);
diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventConsolidatorUnitTest.java b/repository/src/test/java/org/alfresco/repo/event2/EventConsolidatorUnitTest.java
index 25fb458837..21300eb054 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/EventConsolidatorUnitTest.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/EventConsolidatorUnitTest.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2020 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -43,7 +43,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBeforeRemovedAndAddedEmpty()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
Set currentAspects = new HashSet<>();
currentAspects.add("cm:geographic");
@@ -57,7 +57,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_AspectRemoved()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
Set currentAspects = new HashSet<>();
@@ -79,7 +79,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_AspectAdded()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
Set currentAspects = new HashSet<>();
@@ -102,7 +102,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_AspectAddedAndRemoved()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
Set currentAspects = new HashSet<>();
@@ -125,7 +125,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_AspectRemovedAndAdded()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
@@ -150,7 +150,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_AspectAddedTwiceRemovedOnce()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -178,7 +178,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_AspectRemovedTwiceAddedOnce()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -206,7 +206,7 @@ public class EventConsolidatorUnitTest
@Test
public void testGetMappedAspectsBefore_FilteredAspectAdded()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASPECT_COPIEDFROM);
Set currentAspects = new HashSet<>();
@@ -227,7 +227,7 @@ public class EventConsolidatorUnitTest
@Test
public void testAddAspect()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
assertEquals(1, eventConsolidator.getAspectsAdded().size());
@@ -238,7 +238,7 @@ public class EventConsolidatorUnitTest
@Test
public void testRemoveAspect()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
assertEquals(0, eventConsolidator.getAspectsAdded().size());
@@ -249,7 +249,7 @@ public class EventConsolidatorUnitTest
@Test
public void testAddAspectRemoveAspect()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
@@ -260,7 +260,7 @@ public class EventConsolidatorUnitTest
@Test
public void testRemoveAspectAddAspect()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -271,7 +271,7 @@ public class EventConsolidatorUnitTest
@Test
public void testAddAspectTwiceRemoveAspectOnce()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
@@ -284,7 +284,7 @@ public class EventConsolidatorUnitTest
@Test
public void testAddAspectOnceRemoveAspectTwice()
{
- EventConsolidator eventConsolidator = new EventConsolidator(nodeResourceHelper);
+ NodeEventConsolidator eventConsolidator = new NodeEventConsolidator(nodeResourceHelper);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.addAspect(ContentModel.ASSOC_CONTAINS);
eventConsolidator.removeAspect(ContentModel.ASSOC_CONTAINS);
diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorDisabledTest.java b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorDisabledTest.java
index def763d33d..332facdc18 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorDisabledTest.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorDisabledTest.java
@@ -25,94 +25,16 @@
*/
package org.alfresco.repo.event2;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
-import jakarta.jms.Destination;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.Session;
-
import org.alfresco.model.ContentModel;
-import org.alfresco.repo.event.v1.model.RepoEvent;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQTextMessage;
import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class EventGeneratorDisabledTest extends AbstractContextAwareRepoEvent
+public class EventGeneratorDisabledTest extends EventGeneratorTest
{
- private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
- private static final String BROKER_URL = "tcp://localhost:61616";
-
- @Autowired @Qualifier("event2ObjectMapper")
- private ObjectMapper objectMapper;
-
- @Autowired
- protected ObjectMapper event2ObjectMapper;
-
- //private EventGenerator eventGenerator;
-
- private ActiveMQConnection connection;
- protected List> receivedEvents;
-
-
- @Before
- public void setup() throws Exception
- {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(EVENT2_TOPIC_NAME);
- MessageConsumer consumer = session.createConsumer(destination);
-
- receivedEvents = Collections.synchronizedList(new LinkedList<>());
- consumer.setMessageListener(new MessageListener()
- {
- @Override
- public void onMessage(Message message)
- {
- String text = getText(message);
- RepoEvent> event = toRepoEvent(text);
- receivedEvents.add(event);
- }
-
- private RepoEvent> toRepoEvent(String json)
- {
- try
- {
- return objectMapper.readValue(json, RepoEvent.class);
- } catch (Exception e)
- {
- e.printStackTrace();
- return null;
- }
- }
- });
- }
-
- @After
- public void shutdownTopicListener() throws Exception
- {
- connection.close();
- connection = null;
- }
-
@Test
- public void shouldNotReceiveEvent2EventsOnNodeCreation() throws Exception
+ public void shouldNotReceiveEvent2EventsOnNodeCreation()
{
if (eventGenerator.isEnabled())
{
@@ -127,44 +49,29 @@ public class EventGeneratorDisabledTest extends AbstractContextAwareRepoEvent
assertTrue(receivedEvents.size() == 0);
eventGenerator.enable();
-
}
@Test
- public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception
+ @Override
+ public void shouldReceiveEvent2EventsOnNodeCreation()
{
if (!eventGenerator.isEnabled())
{
eventGenerator.enable();
}
-
- createNode(ContentModel.TYPE_CONTENT);
-
- Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
-
- assertTrue(EVENT_CONTAINER.getEvents().size() == 1);
- assertTrue(receivedEvents.size() == 1);
- RepoEvent> sent = getRepoEvent(1);
- RepoEvent> received = receivedEvents.get(0);
- assertEventsEquals("Events are different!", sent, received);
- }
-
- private void assertEventsEquals(String message, RepoEvent> expected, RepoEvent> current)
- {
- assertEquals(message, expected, current);
+ super.shouldReceiveEvent2EventsOnNodeCreation();
}
- private static String getText(Message message)
+ @Test
+ @Override
+ public void shouldReceiveEvent2EventsInOrder()
{
- try
+ if (!eventGenerator.isEnabled())
{
- ActiveMQTextMessage am = (ActiveMQTextMessage) message;
- return am.getText();
- } catch (JMSException e)
- {
- return null;
+ eventGenerator.enable();
}
- }
+ super.shouldReceiveEvent2EventsInOrder();
+ }
}
diff --git a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java
index 6cd3edc67e..4f8610b6d6 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/EventGeneratorTest.java
@@ -39,7 +39,6 @@ import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import org.alfresco.model.ContentModel;
-import org.alfresco.repo.event.databind.ObjectMapperFactory;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.service.cmr.repository.NodeRef;
import org.apache.activemq.ActiveMQConnection;
@@ -51,24 +50,25 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.test.annotation.DirtiesContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class EventGeneratorTest extends AbstractContextAwareRepoEvent
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+public abstract class EventGeneratorTest extends AbstractContextAwareRepoEvent
{
private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2";
-
- private static final long DUMP_BROKER_TIMEOUT = 50000000l;
-
- @Autowired @Qualifier("event2ObjectMapper")
- private ObjectMapper objectMapper;
+ private static final long DUMP_BROKER_TIMEOUT = 50000000L;
private ActiveMQConnection connection;
protected List> receivedEvents;
+ @BeforeClass
+ public static void beforeClass()
+ {
+ System.setProperty("repo.event2.queue.skip", "false");
+ }
+
@Before
public void startupTopicListener() throws Exception
{
@@ -116,11 +116,6 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
}
}
- protected ObjectMapper createObjectMapper()
- {
- return ObjectMapperFactory.createInstance();
- }
-
@After
public void shutdownTopicListener() throws Exception
{
@@ -129,30 +124,21 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
}
@Test
- public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception
+ public void shouldReceiveEvent2EventsOnNodeCreation()
{
createNode(ContentModel.TYPE_CONTENT);
Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1);
+ assertEquals(1, EVENT_CONTAINER.getEvents().size());
+ assertEquals(1, receivedEvents.size());
RepoEvent> sent = getRepoEvent(1);
RepoEvent> received = receivedEvents.get(0);
assertEventsEquals("Events are different!", sent, received);
}
- private void assertEventsEquals(String message, RepoEvent> expected, RepoEvent> current)
- {
- if (DEBUG)
- {
- System.err.println("XP: " + expected);
- System.err.println("CU: " + current);
- }
-
- assertEquals(message, expected, current);
- }
-
@Test
- public void shouldReceiveEvent2EventsInOrder() throws Exception
+ public void shouldReceiveEvent2EventsInOrder()
{
NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT);
updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt");
@@ -163,9 +149,20 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
RepoEvent> sentCreation = getRepoEvent(1);
RepoEvent> sentUpdate = getRepoEvent(2);
RepoEvent> sentDeletion = getRepoEvent(3);
- assertEquals("Expected create event!", sentCreation, (RepoEvent>) receivedEvents.get(0));
- assertEquals("Expected update event!", sentUpdate, (RepoEvent>) receivedEvents.get(1));
- assertEquals("Expected delete event!", sentDeletion, (RepoEvent>) receivedEvents.get(2));
+ assertEquals("Expected create event!", sentCreation, receivedEvents.get(0));
+ assertEquals("Expected update event!", sentUpdate, receivedEvents.get(1));
+ assertEquals("Expected delete event!", sentDeletion, receivedEvents.get(2));
+ }
+
+ private void assertEventsEquals(String message, RepoEvent> expected, RepoEvent> current)
+ {
+ if (DEBUG)
+ {
+ System.err.println("XP: " + expected);
+ System.err.println("CU: " + current);
+ }
+
+ assertEquals(message, expected, current);
}
private static String getText(Message message)
@@ -174,7 +171,8 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
{
ActiveMQTextMessage am = (ActiveMQTextMessage) message;
return am.getText();
- } catch (JMSException e)
+ }
+ catch (JMSException e)
{
return null;
}
@@ -206,7 +204,8 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
try
{
System.out.println("- " + queue.getQueueName());
- } catch (JMSException e)
+ }
+ catch (JMSException e)
{
e.printStackTrace();
}
@@ -219,7 +218,8 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
try
{
System.out.println("- " + topic.getTopicName());
- } catch (JMSException e)
+ }
+ catch (JMSException e)
{
e.printStackTrace();
}
@@ -230,18 +230,14 @@ public class EventGeneratorTest extends AbstractContextAwareRepoEvent
MessageConsumer consumer = session.createConsumer(destination);
System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "...");
- consumer.setMessageListener(new MessageListener()
- {
- @Override
- public void onMessage(Message message)
- {
- String text = getText(message);
- System.out.println("Received message " + message + "\n" + text + "\n");
- }
+ consumer.setMessageListener(message -> {
+ String text = getText(message);
+ System.out.println("Received message " + message + "\n" + text + "\n");
});
Thread.sleep(timeout);
- } finally
+ }
+ finally
{
connection.close();
}
diff --git a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java
index 82b0716c03..17292b3a22 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2ITSuite.java
@@ -2,7 +2,7 @@
* #%L
* Alfresco Repository
* %%
- * Copyright (C) 2005 - 2020 Alfresco Software Limited
+ * Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
@@ -35,7 +35,8 @@ import org.junit.runners.Suite.SuiteClasses;
DeleteRepoEventIT.class,
ChildAssociationRepoEventIT.class,
PeerAssociationRepoEventIT.class,
- EventGeneratorTest.class,
+ EnqueuingEventGeneratorTest.class,
+ DirectEventGeneratorTest.class,
EventGeneratorDisabledTest.class
})
public class RepoEvent2ITSuite
diff --git a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java
index 9b73a48539..b012df9ff4 100644
--- a/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java
+++ b/repository/src/test/java/org/alfresco/repo/event2/RepoEvent2UnitSuite.java
@@ -23,7 +23,6 @@
* along with Alfresco. If not, see .
* #L%
*/
-
package org.alfresco.repo.event2;
import org.junit.runner.RunWith;
@@ -34,7 +33,7 @@ import org.junit.runners.Suite.SuiteClasses;
@SuiteClasses({ EventFilterUnitTest.class,
EventConsolidatorUnitTest.class,
EventJSONSchemaUnitTest.class,
- EventGeneratorQueueUnitTest.class,
+ EnqueuingEventSenderUnitTest.class,
NodeResourceHelperUnitTest.class
})
public class RepoEvent2UnitSuite