mirror of
				https://github.com/Alfresco/alfresco-community-repo.git
				synced 2025-10-22 15:12:38 +00:00 
			
		
		
		
	Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 9c1aa53819 | ||
|  | 885f4a49a5 | ||
|  | 9989ec3260 | ||
|  | 78ad14b696 | 
| @@ -7,7 +7,7 @@ | ||||
|    <parent> | ||||
|       <groupId>org.alfresco</groupId> | ||||
|       <artifactId>alfresco-community-repo</artifactId> | ||||
|       <version>8.424-SNAPSHOT</version> | ||||
|       <version>8.425</version> | ||||
|    </parent> | ||||
|  | ||||
|    <dependencies> | ||||
|   | ||||
| @@ -7,7 +7,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <properties> | ||||
|   | ||||
| @@ -9,6 +9,6 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
| </project> | ||||
|   | ||||
| @@ -7,7 +7,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <properties> | ||||
|   | ||||
| @@ -7,7 +7,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <profiles> | ||||
|   | ||||
| @@ -6,7 +6,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <modules> | ||||
|   | ||||
| @@ -9,7 +9,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-tests</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <developers> | ||||
|   | ||||
| @@ -9,7 +9,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-tests</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <developers> | ||||
|   | ||||
| @@ -9,7 +9,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-tests</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <developers> | ||||
|   | ||||
| @@ -9,7 +9,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-tests</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <developers> | ||||
|   | ||||
| @@ -9,7 +9,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-tests</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <developers> | ||||
|   | ||||
| @@ -7,7 +7,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <properties> | ||||
|   | ||||
							
								
								
									
										4
									
								
								pom.xml
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								pom.xml
									
									
									
									
									
								
							| @@ -2,7 +2,7 @@ | ||||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||
|     <modelVersion>4.0.0</modelVersion> | ||||
|     <artifactId>alfresco-community-repo</artifactId> | ||||
|     <version>8.424-SNAPSHOT</version> | ||||
|     <version>8.425</version> | ||||
|     <packaging>pom</packaging> | ||||
|     <name>Alfresco Community Repo Parent</name> | ||||
|  | ||||
| @@ -116,7 +116,7 @@ | ||||
|         <connection>scm:git:https://github.com/Alfresco/alfresco-community-repo.git</connection> | ||||
|         <developerConnection>scm:git:https://github.com/Alfresco/alfresco-community-repo.git</developerConnection> | ||||
|         <url>https://github.com/Alfresco/alfresco-community-repo</url> | ||||
|         <tag>HEAD</tag> | ||||
|         <tag>8.425</tag> | ||||
|     </scm> | ||||
|  | ||||
|     <distributionManagement> | ||||
|   | ||||
| @@ -7,7 +7,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <dependencies> | ||||
|   | ||||
| @@ -7,7 +7,7 @@ | ||||
|     <parent> | ||||
|         <groupId>org.alfresco</groupId> | ||||
|         <artifactId>alfresco-community-repo</artifactId> | ||||
|         <version>8.424-SNAPSHOT</version> | ||||
|         <version>8.425</version> | ||||
|     </parent> | ||||
|  | ||||
|     <dependencies> | ||||
|   | ||||
| @@ -54,7 +54,6 @@ import org.alfresco.repo.policy.JavaBehaviour; | ||||
| import org.alfresco.repo.policy.PolicyComponent; | ||||
| import org.alfresco.repo.security.authentication.AuthenticationUtil; | ||||
| import org.alfresco.repo.transaction.AlfrescoTransactionSupport; | ||||
| import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; | ||||
| import org.alfresco.service.cmr.dictionary.DictionaryService; | ||||
| import org.alfresco.service.cmr.repository.AssociationRef; | ||||
| import org.alfresco.service.cmr.repository.ChildAssociationRef; | ||||
| @@ -90,11 +89,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|     protected DictionaryService dictionaryService; | ||||
|     private DescriptorService descriptorService; | ||||
|     private EventFilterRegistry eventFilterRegistry; | ||||
|     private Event2MessageProducer event2MessageProducer; | ||||
|     private TransactionService transactionService; | ||||
|     private PersonService personService; | ||||
|     protected NodeResourceHelper nodeResourceHelper; | ||||
|  | ||||
|     private EventGeneratorQueue eventGeneratorQueue; | ||||
|     private NodeTypeFilter nodeTypeFilter; | ||||
|     private ChildAssociationTypeFilter childAssociationTypeFilter; | ||||
|     private EventUserFilter userFilter; | ||||
| @@ -109,10 +108,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|         PropertyCheck.mandatory(this, "dictionaryService", dictionaryService); | ||||
|         PropertyCheck.mandatory(this, "descriptorService", descriptorService); | ||||
|         PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry); | ||||
|         PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); | ||||
|         PropertyCheck.mandatory(this, "transactionService", transactionService); | ||||
|         PropertyCheck.mandatory(this, "personService", personService); | ||||
|         PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); | ||||
|         PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue); | ||||
|  | ||||
|         this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); | ||||
|         this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter(); | ||||
| @@ -177,12 +176,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|         this.eventFilterRegistry = eventFilterRegistry; | ||||
|     } | ||||
|  | ||||
|     @SuppressWarnings("unused") | ||||
|     public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) | ||||
|     { | ||||
|         this.event2MessageProducer = event2MessageProducer; | ||||
|     } | ||||
|  | ||||
|     public void setTransactionService(TransactionService transactionService) | ||||
|     { | ||||
|         this.transactionService = transactionService; | ||||
| @@ -198,6 +191,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|         this.nodeResourceHelper = nodeResourceHelper; | ||||
|     } | ||||
|  | ||||
|     public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue) | ||||
|     { | ||||
|         this.eventGeneratorQueue = eventGeneratorQueue; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void onCreateNode(ChildAssociationRef childAssocRef) | ||||
|     { | ||||
| @@ -428,20 +426,26 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|  | ||||
|         protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator) | ||||
|         { | ||||
|             EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); | ||||
|             eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo)); | ||||
|         } | ||||
|  | ||||
|         private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo) | ||||
|         { | ||||
|             String user = eventInfo.getPrincipal(); | ||||
|  | ||||
|             if (consolidator.isTemporaryNode()) | ||||
|             { | ||||
|                 if (LOGGER.isTraceEnabled()) | ||||
|                 { | ||||
|                     LOGGER.trace("Ignoring temporary node: " + nodeRef); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } | ||||
|  | ||||
|             final String user = AuthenticationUtil.getFullyAuthenticatedUser(); | ||||
|             // Get the repo event before the filtering, | ||||
|             // so we can take the latest node info into account | ||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user)); | ||||
|  | ||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); | ||||
|  | ||||
|             final QName nodeType = consolidator.getNodeType(); | ||||
|             if (isFiltered(nodeType, user)) | ||||
| @@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|                             + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString()) | ||||
|                             + "' created by: " + user); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } | ||||
|  | ||||
|             if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull()) | ||||
| @@ -461,27 +465,34 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|                 { | ||||
|                     LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } | ||||
|  | ||||
|             logAndSendEvent(event, consolidator.getEventTypes()); | ||||
|             logEvent(event, consolidator.getEventTypes()); | ||||
|             return event; | ||||
|         } | ||||
|  | ||||
|         protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) | ||||
|         { | ||||
|             EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); | ||||
|             eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator)); | ||||
|         } | ||||
|  | ||||
|         private RepoEvent<?> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) | ||||
|         { | ||||
|             String user = eventInfo.getPrincipal(); | ||||
|             if (consolidator.isTemporaryChildAssociation()) | ||||
|             { | ||||
|                 if (LOGGER.isTraceEnabled()) | ||||
|                 { | ||||
|                     LOGGER.trace("Ignoring temporary child association: " + childAssociationRef); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } | ||||
|  | ||||
|             final String user = AuthenticationUtil.getFullyAuthenticatedUser(); | ||||
|             // Get the repo event before the filtering, | ||||
|             // so we can take the latest association info into account | ||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user)); | ||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); | ||||
|  | ||||
|             final QName childAssocType = consolidator.getChildAssocType(); | ||||
|             if (isFilteredChildAssociation(childAssocType, user)) | ||||
| @@ -492,7 +503,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|                             + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) | ||||
|                             + "' created by: " + user); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } else if (childAssociationRef.isPrimary()) | ||||
|             { | ||||
|                 if (LOGGER.isTraceEnabled()) | ||||
| @@ -501,13 +512,20 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|                             + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) | ||||
|                             + "' created by: " + user); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } | ||||
|  | ||||
|             logAndSendEvent(event, consolidator.getEventTypes()); | ||||
|             logEvent(event, consolidator.getEventTypes()); | ||||
|             return event; | ||||
|         } | ||||
|  | ||||
|         protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) | ||||
|         { | ||||
|             EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); | ||||
|             eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator)); | ||||
|         } | ||||
|  | ||||
|         private RepoEvent<?> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) | ||||
|         { | ||||
|             if (consolidator.isTemporaryPeerAssociation()) | ||||
|             { | ||||
| @@ -515,30 +533,21 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | ||||
|                 { | ||||
|                     LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef); | ||||
|                 } | ||||
|                 return; | ||||
|                 return null; | ||||
|             } | ||||
|  | ||||
|             final String user = AuthenticationUtil.getFullyAuthenticatedUser(); | ||||
|             // Get the repo event before the filtering, | ||||
|             // so we can take the latest association info into account | ||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user)); | ||||
|  | ||||
|             logAndSendEvent(event, consolidator.getEventTypes()); | ||||
|             RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); | ||||
|             logEvent(event, consolidator.getEventTypes()); | ||||
|             return event; | ||||
|         } | ||||
|  | ||||
|         protected void logAndSendEvent(RepoEvent<?> event, Deque<EventType> listOfEvents) | ||||
|         private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents) | ||||
|         { | ||||
|             if (LOGGER.isTraceEnabled()) | ||||
|             { | ||||
|                 LOGGER.trace("List of Events:" + listOfEvents); | ||||
|                 LOGGER.trace("Sending event:" + event); | ||||
|             } | ||||
|             // Need to execute this in another read txn because Camel expects it | ||||
|             transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> { | ||||
|                 event2MessageProducer.send(event); | ||||
|  | ||||
|                 return null; | ||||
|             }, true, false); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -0,0 +1,179 @@ | ||||
| /* | ||||
|  * #%L | ||||
|  * Alfresco Repository | ||||
|  * %% | ||||
|  * Copyright (C) 2005 - 2021 Alfresco Software Limited | ||||
|  * %% | ||||
|  * This file is part of the Alfresco software. | ||||
|  * If the software was purchased under a paid Alfresco license, the terms of | ||||
|  * the paid license agreement will prevail.  Otherwise, the software is | ||||
|  * provided under the following open source license terms: | ||||
|  * | ||||
|  * Alfresco is free software: you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU Lesser General Public License as published by | ||||
|  * the Free Software Foundation, either version 3 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * Alfresco is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU Lesser General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU Lesser General Public License | ||||
|  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. | ||||
|  * #L% | ||||
|  */ | ||||
| package org.alfresco.repo.event2; | ||||
|  | ||||
| import java.util.concurrent.BlockingQueue; | ||||
| import java.util.concurrent.Callable; | ||||
| import java.util.concurrent.CountDownLatch; | ||||
| import java.util.concurrent.Executor; | ||||
| import java.util.concurrent.LinkedBlockingQueue; | ||||
| import java.util.concurrent.TimeUnit; | ||||
|  | ||||
| import org.alfresco.repo.event.v1.model.RepoEvent; | ||||
| import org.alfresco.util.PropertyCheck; | ||||
| import org.apache.commons.logging.Log; | ||||
| import org.apache.commons.logging.LogFactory; | ||||
| import org.springframework.beans.factory.InitializingBean; | ||||
|  | ||||
| /* | ||||
|  * This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but | ||||
|  * at the same time it preserves the order of the events | ||||
|  */ | ||||
| public class EventGeneratorQueue implements InitializingBean | ||||
| { | ||||
| 	protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class); | ||||
|      | ||||
|     protected Executor enqueueThreadPoolExecutor; | ||||
|     protected Executor dequeueThreadPoolExecutor; | ||||
|     protected Event2MessageProducer event2MessageProducer; | ||||
|     protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>(); | ||||
|     protected Runnable listener = createListener(); | ||||
|  | ||||
|     @Override | ||||
|     public void afterPropertiesSet() throws Exception | ||||
|     { | ||||
|         PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor); | ||||
|         PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor); | ||||
|         PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); | ||||
|     } | ||||
|  | ||||
|     public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) | ||||
|     { | ||||
|         this.event2MessageProducer = event2MessageProducer; | ||||
|     } | ||||
|      | ||||
|     public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor) | ||||
|     { | ||||
|         this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor; | ||||
|     } | ||||
|      | ||||
|     public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor) | ||||
|     { | ||||
|         this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor; | ||||
|         dequeueThreadPoolExecutor.execute(listener); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Procedure to enqueue the callback functions that creates an event. | ||||
|      * @param maker Callback function that creates an event. | ||||
|      */ | ||||
|     public void accept(Callable<RepoEvent<?>> maker) | ||||
|     { | ||||
|         EventInMaking eventInMaking = new EventInMaking(maker); | ||||
|         queue.offer(eventInMaking); | ||||
|         enqueueThreadPoolExecutor.execute(() -> { | ||||
|             try | ||||
|             { | ||||
|                 eventInMaking.make(); | ||||
|             } | ||||
|             catch (Exception e) | ||||
|             { | ||||
|                 LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e); | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Create listener task in charge of dequeuing and sending events ready to be sent. | ||||
|      * @return The task in charge of dequeuing and sending events ready to be sent. | ||||
|      */ | ||||
|     private Runnable createListener() | ||||
|     { | ||||
|         return new Runnable() | ||||
|         { | ||||
|             @Override | ||||
|             public void run() | ||||
|             { | ||||
|                 try  | ||||
|                 { | ||||
|                     while (!Thread.interrupted()) | ||||
|                     { | ||||
|                         try | ||||
|                         { | ||||
|                             EventInMaking eventInMaking = queue.take(); | ||||
|                             RepoEvent<?> event = eventInMaking.getEventWhenReady(); | ||||
|                             if (event != null) | ||||
|                             { | ||||
|                                 event2MessageProducer.send(event); | ||||
|                             } | ||||
|                         } | ||||
|                         catch (Exception e) | ||||
|                         { | ||||
|                             LOGGER.error("Unexpected error while dequeuing and sending repository event" + e); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 finally | ||||
|                 { | ||||
|                     LOGGER.warn("Unexpected: rescheduling the listener thread."); | ||||
|                     dequeueThreadPoolExecutor.execute(listener); | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|     } | ||||
|  | ||||
|     /* | ||||
|      * Simple class that makes events and allows to retrieve them when ready | ||||
|      */ | ||||
|     private static class EventInMaking | ||||
|     { | ||||
|         private Callable<RepoEvent<?>> maker; | ||||
|         private volatile RepoEvent<?> event; | ||||
|         private CountDownLatch latch; | ||||
|          | ||||
|         public EventInMaking(Callable<RepoEvent<?>> maker) | ||||
|         { | ||||
|             this.maker = maker; | ||||
|             this.latch = new CountDownLatch(1); | ||||
|         } | ||||
|          | ||||
|         public void make() throws Exception | ||||
|         { | ||||
|             try | ||||
|             { | ||||
|                 event = maker.call(); | ||||
|             } | ||||
|             finally  | ||||
|             { | ||||
|                 latch.countDown(); | ||||
|             } | ||||
|         } | ||||
|          | ||||
|         public RepoEvent<?> getEventWhenReady() throws InterruptedException | ||||
|         { | ||||
|             latch.await(30, TimeUnit.SECONDS); | ||||
|             return event; | ||||
|         } | ||||
|          | ||||
|         @Override | ||||
|         public String toString() | ||||
|         { | ||||
|             return maker.toString(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| } | ||||
| @@ -38,9 +38,10 @@ | ||||
|         <property name="dictionaryService" ref="dictionaryService"/> | ||||
|         <property name="descriptorService" ref="descriptorComponent"/> | ||||
|         <property name="eventFilterRegistry" ref="event2FilterRegistry"/> | ||||
|         <property name="event2MessageProducer" ref="event2MessageProducer"/> | ||||
|         <property name="transactionService" ref="transactionService"/> | ||||
|         <property name="personService" ref="personService"/> | ||||
|         <property name="nodeResourceHelper" ref="nodeResourceHelper"/> | ||||
|         <property name="eventGeneratorQueue" ref="eventGeneratorQueue"/> | ||||
|     </bean> | ||||
|  | ||||
|     <bean id="baseNodeResourceHelper" abstract="true"> | ||||
| @@ -54,7 +55,45 @@ | ||||
|  | ||||
|     <bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/> | ||||
|  | ||||
|     <bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"> | ||||
|         <property name="nodeResourceHelper" ref="nodeResourceHelper"/> | ||||
|     <bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/> | ||||
|  | ||||
|     <bean id="eventGeneratorQueue" class="org.alfresco.repo.event2.EventGeneratorQueue" > | ||||
|         <property name="enqueueThreadPoolExecutor"> | ||||
|             <ref bean="eventAsyncEnqueueThreadPool" /> | ||||
|         </property> | ||||
|         <property name="dequeueThreadPoolExecutor"> | ||||
|             <ref bean="eventAsyncDequeueThreadPool" /> | ||||
|         </property> | ||||
|        <property name="event2MessageProducer" ref="event2MessageProducer"/> | ||||
|     </bean> | ||||
|  | ||||
|     <bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean"> | ||||
|         <property name="poolName"> | ||||
|             <value>eventAsyncEnqueueThreadPool</value> | ||||
|         </property> | ||||
|         <property name="corePoolSize"> | ||||
|             <value>${repo.event2.queue.enqueueThreadPool.coreSize}</value> | ||||
|         </property> | ||||
|         <property name="maximumPoolSize"> | ||||
|             <value>${repo.event2.queue.enqueueThreadPool.maximumSize}</value> | ||||
|         </property> | ||||
|         <property name="threadPriority"> | ||||
|             <value>${repo.event2.queue.enqueueThreadPool.priority}</value> | ||||
|         </property> | ||||
|     </bean> | ||||
|  | ||||
|     <bean id="eventAsyncDequeueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean"> | ||||
|         <property name="poolName"> | ||||
|             <value>eventAsyncDequeueThreadPool</value> | ||||
|         </property> | ||||
|         <property name="corePoolSize"> | ||||
|             <value>${repo.event2.queue.dequeueThreadPool.coreSize}</value> | ||||
|         </property> | ||||
|         <property name="maximumPoolSize"> | ||||
|             <value>${repo.event2.queue.dequeueThreadPool.maximumSize}</value> | ||||
|         </property> | ||||
|         <property name="threadPriority"> | ||||
|             <value>${repo.event2.queue.dequeueThreadPool.priority}</value> | ||||
|         </property> | ||||
|     </bean> | ||||
| </beans> | ||||
|   | ||||
| @@ -1207,6 +1207,15 @@ repo.event2.filter.childAssocTypes=rn:rendition | ||||
| repo.event2.filter.users=System, null | ||||
| # Topic name | ||||
| repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 | ||||
| # Thread pool for async enqueue of repo events | ||||
| repo.event2.queue.enqueueThreadPool.priority=1 | ||||
| repo.event2.queue.enqueueThreadPool.coreSize=8 | ||||
| repo.event2.queue.enqueueThreadPool.maximumSize=10 | ||||
| # Thread pool for async dequeue and delivery of repo events | ||||
| repo.event2.queue.dequeueThreadPool.priority=1 | ||||
| repo.event2.queue.dequeueThreadPool.coreSize=1 | ||||
| repo.event2.queue.dequeueThreadPool.maximumSize=1 | ||||
|  | ||||
|  | ||||
| # MNT-21083 | ||||
| # --DELETE_NOT_EXISTS - default settings | ||||
|   | ||||
| @@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; | ||||
| import static org.awaitility.Awaitility.await; | ||||
|  | ||||
| import java.util.ArrayList; | ||||
| import java.util.Collections; | ||||
| import java.util.List; | ||||
|  | ||||
| import javax.jms.ConnectionFactory; | ||||
| @@ -77,12 +78,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; | ||||
|  | ||||
| public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||
| { | ||||
|     protected static final boolean            DEBUG = false; | ||||
|  | ||||
|     protected static final String             TEST_NAMESPACE  = "http://www.alfresco.org/test/ContextAwareRepoEvent"; | ||||
|     protected static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer(); | ||||
|  | ||||
|     private static final   String             BROKER_URL      = "tcp://localhost:61616"; | ||||
|     private static final   String             TOPIC_NAME      = "alfresco.repo.event2"; | ||||
|     private static final   String             CAMEL_ROUTE     = "jms:topic:" + TOPIC_NAME; | ||||
|     private static final   RepoEventContainer EVENT_CONTAINER = new RepoEventContainer(); | ||||
|     private static final   CamelContext       CAMEL_CONTEXT   = new DefaultCamelContext(); | ||||
|  | ||||
|     private static boolean isCamelConfigured; | ||||
| @@ -104,6 +107,13 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||
|     @Autowired | ||||
|     protected ObjectMapper event2ObjectMapper; | ||||
|  | ||||
|     @Autowired @Qualifier("eventGeneratorV2") | ||||
|     protected EventGenerator eventGenerator; | ||||
|  | ||||
|     @Autowired | ||||
|     @Qualifier("eventGeneratorQueue") | ||||
|     protected EventGeneratorQueue eventQueue; | ||||
|  | ||||
|     protected NodeRef rootNodeRef; | ||||
|  | ||||
|     @BeforeClass | ||||
| @@ -141,6 +151,33 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||
|             } | ||||
|             return nodeService.getRootNode(storeRef); | ||||
|         }); | ||||
|  | ||||
|         flushSpuriousEvents(); | ||||
|     } | ||||
|  | ||||
|     /* | ||||
|      * When running with an empty database some events related to the creation may | ||||
|      * creep up here making the test fails. After attempting several other | ||||
|      * strategies, a smart sleep seems to do the work. | ||||
|      */ | ||||
|     protected void flushSpuriousEvents() throws InterruptedException | ||||
|     { | ||||
|         int maxloops = 5; | ||||
|          | ||||
|         int count = maxloops; | ||||
|         do | ||||
|         { | ||||
|             Thread.sleep(165l); | ||||
|             if (EVENT_CONTAINER.isEmpty()) | ||||
|             { | ||||
|                 count--; | ||||
|             } else  | ||||
|             { | ||||
|                 EVENT_CONTAINER.reset(); | ||||
|                 count = maxloops; | ||||
|             } | ||||
|  | ||||
|         } while (count > 0); | ||||
|     } | ||||
|      | ||||
|     @After | ||||
| @@ -179,6 +216,16 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||
|             propertyMap).getChildRef()); | ||||
|     } | ||||
|  | ||||
|     protected NodeRef updateNodeName(NodeRef nodeRef, String newName) | ||||
|     { | ||||
|         PropertyMap propertyMap = new PropertyMap(); | ||||
|         propertyMap.put(ContentModel.PROP_NAME, newName); | ||||
|         return retryingTransactionHelper.doInTransaction(() -> { | ||||
|             nodeService.addProperties(nodeRef, propertyMap); | ||||
|             return null; | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     protected void deleteNode(NodeRef nodeRef) | ||||
|     { | ||||
|         retryingTransactionHelper.doInTransaction(() -> { | ||||
| @@ -376,13 +423,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||
|  | ||||
|     public static class RepoEventContainer implements Processor | ||||
|     { | ||||
|         private final List<RepoEvent<?>> events = new ArrayList<>(); | ||||
|         private final List<RepoEvent<?>> events = Collections.synchronizedList(new ArrayList<>()); | ||||
|  | ||||
|         @Override | ||||
|         public void process(Exchange exchange) | ||||
|         { | ||||
|             Object object = exchange.getIn().getBody(); | ||||
|             events.add((RepoEvent<?>) object); | ||||
|  | ||||
|             if (DEBUG) | ||||
|             { | ||||
|                 System.err.println("XX: "+object); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         public List<RepoEvent<?>> getEvents() | ||||
| @@ -404,6 +456,12 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||
|         { | ||||
|             events.clear(); | ||||
|         } | ||||
|  | ||||
|         public boolean isEmpty() | ||||
|         { | ||||
|             return events.isEmpty(); | ||||
|         } | ||||
|  | ||||
|     } | ||||
|  | ||||
|     @SuppressWarnings("unchecked") | ||||
|   | ||||
| @@ -0,0 +1,290 @@ | ||||
| /* | ||||
|  * #%L | ||||
|  * Alfresco Repository | ||||
|  * %% | ||||
|  * Copyright (C) 2005 - 2021 Alfresco Software Limited | ||||
|  * %% | ||||
|  * This file is part of the Alfresco software. | ||||
|  * If the software was purchased under a paid Alfresco license, the terms of | ||||
|  * the paid license agreement will prevail.  Otherwise, the software is | ||||
|  * provided under the following open source license terms: | ||||
|  * | ||||
|  * Alfresco is free software: you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU Lesser General Public License as published by | ||||
|  * the Free Software Foundation, either version 3 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * Alfresco is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU Lesser General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU Lesser General Public License | ||||
|  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. | ||||
|  * #L% | ||||
|  */ | ||||
| package org.alfresco.repo.event2; | ||||
|  | ||||
| import static java.lang.Thread.sleep; | ||||
| import static org.junit.Assert.assertEquals; | ||||
| import static org.mockito.ArgumentMatchers.any; | ||||
| import static org.mockito.Mockito.doThrow; | ||||
| import static org.mockito.Mockito.mock; | ||||
| import static org.mockito.Mockito.when; | ||||
|  | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.concurrent.Callable; | ||||
| import java.util.concurrent.CopyOnWriteArrayList; | ||||
| import java.util.concurrent.Executor; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.SynchronousQueue; | ||||
| import java.util.concurrent.ThreadPoolExecutor; | ||||
| import java.util.concurrent.TimeUnit; | ||||
|  | ||||
| import org.alfresco.repo.event.v1.model.RepoEvent; | ||||
| import org.junit.After; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.mockito.Mockito; | ||||
| import org.mockito.invocation.InvocationOnMock; | ||||
| import org.mockito.stubbing.Answer; | ||||
|  | ||||
| public class EventGeneratorQueueUnitTest | ||||
| { | ||||
|     private EventGeneratorQueue queue; | ||||
|  | ||||
|     private Event2MessageProducer bus; | ||||
|     private ExecutorService enqueuePool; | ||||
|     private ExecutorService dequeuePool; | ||||
|     private List<RepoEvent<?>> recordedEvents; | ||||
|     private Map<String, RepoEvent<?>> events; | ||||
|  | ||||
|     @Before | ||||
|     public void setup()  | ||||
|     { | ||||
|         queue = new EventGeneratorQueue(); | ||||
|  | ||||
|         enqueuePool = newThreadPool(); | ||||
|         queue.setEnqueueThreadPoolExecutor(enqueuePool); | ||||
|         dequeuePool = newThreadPool(); | ||||
|         queue.setDequeueThreadPoolExecutor(dequeuePool); | ||||
|  | ||||
|         bus = mock(Event2MessageProducer.class); | ||||
|         queue.setEvent2MessageProducer(bus); | ||||
|  | ||||
|         events = new HashMap<>(); | ||||
|  | ||||
|         setupEventsRecorder(); | ||||
|     } | ||||
|  | ||||
|     @After | ||||
|     public void teardown()  | ||||
|     { | ||||
|         enqueuePool.shutdown(); | ||||
|     } | ||||
|  | ||||
|     private void setupEventsRecorder() | ||||
|     { | ||||
|         recordedEvents = new CopyOnWriteArrayList<>(); | ||||
|  | ||||
|         Mockito.doAnswer(new Answer<Void>() | ||||
|         { | ||||
|             @Override | ||||
|             public Void answer(InvocationOnMock invocation) throws Throwable | ||||
|             { | ||||
|                 RepoEvent<?> event = invocation.getArgument(0, RepoEvent.class); | ||||
|                 recordedEvents.add(event); | ||||
|                 return null; | ||||
|             } | ||||
|         }).when(bus).send(any()); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveSingleQuickMessage() throws Exception  | ||||
|     { | ||||
|         queue.accept(messageWithDelay("A", 55l)); | ||||
|  | ||||
|         sleep(150l); | ||||
|  | ||||
|         assertEquals(1, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception  | ||||
|     { | ||||
|         queue.accept(() -> { return null; }); | ||||
|  | ||||
|         sleep(150l); | ||||
|  | ||||
|         assertEquals(0, recordedEvents.size()); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception { | ||||
|         queue.accept(messageWithDelay("A", 0l)); | ||||
|         queue.accept(messageWithDelay("B", 100l)); | ||||
|         queue.accept(messageWithDelay("C", 200l)); | ||||
|  | ||||
|         sleep(450l); | ||||
|  | ||||
|         assertEquals(3, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|         assertEquals("B", recordedEvents.get(1).getId()); | ||||
|         assertEquals("C", recordedEvents.get(2).getId()); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception | ||||
|     { | ||||
|         queue.accept(messageWithDelay("A", 300l)); | ||||
|         queue.accept(messageWithDelay("B", 150l)); | ||||
|         queue.accept(messageWithDelay("C", 0l)); | ||||
|  | ||||
|         sleep(950l); | ||||
|  | ||||
|         assertEquals(3, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|         assertEquals("B", recordedEvents.get(1).getId()); | ||||
|         assertEquals("C", recordedEvents.get(2).getId()); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception | ||||
|     { | ||||
|         queue.accept(messageWithDelay("A", 300l)); | ||||
|         queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");}); | ||||
|         queue.accept(messageWithDelay("B", 55l)); | ||||
|         queue.accept(messageWithDelay("C", 0l)); | ||||
|  | ||||
|         sleep(950l); | ||||
|  | ||||
|         assertEquals(3, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|         assertEquals("B", recordedEvents.get(1).getId()); | ||||
|         assertEquals("C", recordedEvents.get(2).getId()); | ||||
|     } | ||||
|      | ||||
|     @Test | ||||
|     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception | ||||
|     { | ||||
|         Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l); | ||||
|         RepoEvent<?> messageB = makerB.call(); | ||||
|         doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB); | ||||
|         queue.accept(messageWithDelay("A", 300l)); | ||||
|         queue.accept(makerB); | ||||
|         queue.accept(messageWithDelay("C", 0l)); | ||||
|  | ||||
|         sleep(950l); | ||||
|  | ||||
|         assertEquals(2, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|         assertEquals("C", recordedEvents.get(1).getId()); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception | ||||
|     { | ||||
|         queue.accept(messageWithDelay("A", 300l)); | ||||
|         queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");}); | ||||
|         queue.accept(messageWithDelay("B", 55l)); | ||||
|         queue.accept(messageWithDelay("C", 0l)); | ||||
|  | ||||
|         sleep(950l); | ||||
|  | ||||
|         assertEquals(3, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|         assertEquals("B", recordedEvents.get(1).getId()); | ||||
|         assertEquals("C", recordedEvents.get(2).getId()); | ||||
|     } | ||||
|      | ||||
|     @Test | ||||
|     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception | ||||
|     { | ||||
|         Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l); | ||||
|         RepoEvent<?> messageB = makerB.call(); | ||||
|         doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB); | ||||
|         queue.accept(messageWithDelay("A", 300l)); | ||||
|         queue.accept(makerB); | ||||
|         queue.accept(messageWithDelay("C", 0l)); | ||||
|  | ||||
|         sleep(950l); | ||||
|  | ||||
|         assertEquals(2, recordedEvents.size()); | ||||
|         assertEquals("A", recordedEvents.get(0).getId()); | ||||
|         assertEquals("C", recordedEvents.get(1).getId()); | ||||
|     } | ||||
|  | ||||
|     private Callable<RepoEvent<?>> messageWithDelay(String id, long delay) | ||||
|     { | ||||
|         Callable<RepoEvent<?>> res = new Callable<RepoEvent<?>>() { | ||||
|  | ||||
|             @Override | ||||
|             public RepoEvent<?> call() throws Exception | ||||
|             { | ||||
|                 if(delay != 0) | ||||
|                 { | ||||
|                     sleep(delay);  | ||||
|                 } | ||||
|                 return newRepoEvent(id); | ||||
|             }  | ||||
|              | ||||
|             @Override | ||||
|             public String toString() | ||||
|             { | ||||
|                 return id; | ||||
|             } | ||||
|         }; | ||||
|         return res; | ||||
|     } | ||||
|      | ||||
|     private RepoEvent<?> newRepoEvent(String id) | ||||
|     { | ||||
|         RepoEvent<?> ev = events.get(id); | ||||
|         if (ev!=null) | ||||
|             return ev; | ||||
|          | ||||
|         ev = mock(RepoEvent.class); | ||||
|         when(ev.getId()).thenReturn(id); | ||||
|         when(ev.toString()).thenReturn(id); | ||||
|         events.put(id, ev); | ||||
|  | ||||
|         return ev; | ||||
|     } | ||||
|  | ||||
|     public static ExecutorService newThreadPool()  | ||||
|     { | ||||
|         return new ThreadPoolExecutor(2, Integer.MAX_VALUE, | ||||
|                                       60L, TimeUnit.SECONDS, | ||||
|                                       new SynchronousQueue<Runnable>()); | ||||
|     } | ||||
|  | ||||
|     public static final Executor SYNC_EXECUTOR_SAME_THREAD = new Executor() | ||||
|     { | ||||
|         @Override | ||||
|         public void execute(Runnable command) | ||||
|         { | ||||
|             command.run(); | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     public static final Executor SYNC_EXECUTOR_NEW_THREAD = new Executor() | ||||
|     { | ||||
|         @Override | ||||
|         public void execute(Runnable command) | ||||
|         { | ||||
|             Thread t = new Thread(command); | ||||
|             t.start(); | ||||
|             try | ||||
|             { | ||||
|                 t.join(); | ||||
|             } | ||||
|             catch (InterruptedException e) | ||||
|             { | ||||
|                 Thread.currentThread().interrupt(); | ||||
|             } | ||||
|         } | ||||
|     }; | ||||
| } | ||||
| @@ -0,0 +1,249 @@ | ||||
| /* | ||||
|  * #%L | ||||
|  * Alfresco Repository | ||||
|  * %% | ||||
|  * Copyright (C) 2005 - 2020 Alfresco Software Limited | ||||
|  * %% | ||||
|  * This file is part of the Alfresco software. | ||||
|  * If the software was purchased under a paid Alfresco license, the terms of | ||||
|  * the paid license agreement will prevail.  Otherwise, the software is | ||||
|  * provided under the following open source license terms: | ||||
|  * | ||||
|  * Alfresco is free software: you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU Lesser General Public License as published by | ||||
|  * the Free Software Foundation, either version 3 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * Alfresco is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU Lesser General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU Lesser General Public License | ||||
|  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. | ||||
|  * #L% | ||||
|  */ | ||||
| package org.alfresco.repo.event2; | ||||
|  | ||||
| import java.util.Collections; | ||||
| import java.util.LinkedList; | ||||
| import java.util.List; | ||||
| import java.util.Set; | ||||
| import java.util.concurrent.TimeUnit; | ||||
|  | ||||
| import javax.jms.Destination; | ||||
| import javax.jms.JMSException; | ||||
| import javax.jms.Message; | ||||
| import javax.jms.MessageConsumer; | ||||
| import javax.jms.MessageListener; | ||||
| import javax.jms.Session; | ||||
|  | ||||
| import org.alfresco.model.ContentModel; | ||||
| import org.alfresco.repo.event.databind.ObjectMapperFactory; | ||||
| import org.alfresco.repo.event.v1.model.RepoEvent; | ||||
| import org.alfresco.service.cmr.repository.NodeRef; | ||||
| import org.apache.activemq.ActiveMQConnection; | ||||
| import org.apache.activemq.ActiveMQConnectionFactory; | ||||
| import org.apache.activemq.advisory.DestinationSource; | ||||
| import org.apache.activemq.command.ActiveMQQueue; | ||||
| import org.apache.activemq.command.ActiveMQTextMessage; | ||||
| import org.apache.activemq.command.ActiveMQTopic; | ||||
| import org.awaitility.Awaitility; | ||||
| import org.junit.After; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||
| import org.springframework.beans.factory.annotation.Qualifier; | ||||
|  | ||||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||||
|  | ||||
| public class EventGeneratorTest extends AbstractContextAwareRepoEvent | ||||
| { | ||||
|     private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2"; | ||||
|  | ||||
|     private static final long DUMP_BROKER_TIMEOUT = 50000000l; | ||||
|  | ||||
|     @Autowired @Qualifier("event2ObjectMapper") | ||||
|     private ObjectMapper objectMapper; | ||||
|  | ||||
|     private ActiveMQConnection connection; | ||||
|     protected List<RepoEvent<?>> receivedEvents; | ||||
|  | ||||
|     @Before | ||||
|     public void startupTopicListener() throws Exception | ||||
|     { | ||||
|         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); | ||||
|         connection = (ActiveMQConnection) connectionFactory.createConnection(); | ||||
|         connection.start(); | ||||
|  | ||||
|         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||||
|         Destination destination = session.createTopic(EVENT2_TOPIC_NAME); | ||||
|         MessageConsumer consumer = session.createConsumer(destination); | ||||
|  | ||||
|         receivedEvents = Collections.synchronizedList(new LinkedList<>()); | ||||
|         consumer.setMessageListener(new MessageListener() | ||||
|         { | ||||
|             @Override | ||||
|             public void onMessage(Message message) | ||||
|             { | ||||
|                 String text = getText(message); | ||||
|                 RepoEvent<?> event = toRepoEvent(text); | ||||
|  | ||||
|                 if (DEBUG) | ||||
|                 { | ||||
|                     System.err.println("RX: " + event); | ||||
|                 } | ||||
|  | ||||
|                 receivedEvents.add(event); | ||||
|             } | ||||
|  | ||||
|             private RepoEvent<?> toRepoEvent(String json) | ||||
|             { | ||||
|                 try | ||||
|                 { | ||||
|                     return objectMapper.readValue(json, RepoEvent.class); | ||||
|                 } catch (Exception e) | ||||
|                 { | ||||
|                     e.printStackTrace(); | ||||
|                     return null; | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|         if (DEBUG) | ||||
|         { | ||||
|             System.err.println("Now actively listening on topic " + EVENT2_TOPIC_NAME); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected ObjectMapper createObjectMapper() | ||||
|     { | ||||
|         return ObjectMapperFactory.createInstance(); | ||||
|     } | ||||
|  | ||||
|     @After | ||||
|     public void shutdownTopicListener() throws Exception | ||||
|     { | ||||
|         connection.close(); | ||||
|         connection = null; | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception | ||||
|     { | ||||
|         createNode(ContentModel.TYPE_CONTENT); | ||||
|  | ||||
|         Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1); | ||||
|  | ||||
|         RepoEvent<?> sent = getRepoEvent(1); | ||||
|         RepoEvent<?> received = receivedEvents.get(0); | ||||
|         assertEventsEquals("Events are different!", sent, received); | ||||
|     } | ||||
|  | ||||
|     private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current) | ||||
|     { | ||||
|         if (DEBUG) | ||||
|         { | ||||
|             System.err.println("XP: " + expected); | ||||
|             System.err.println("CU: " + current); | ||||
|         } | ||||
|          | ||||
|         assertEquals(message, expected, current); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void shouldReceiveEvent2EventsInOrder() throws Exception | ||||
|     { | ||||
|         NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT); | ||||
|         updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt"); | ||||
|         deleteNode(nodeRef); | ||||
|  | ||||
|         Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 3); | ||||
|  | ||||
|         RepoEvent<?> sentCreation = getRepoEvent(1); | ||||
|         RepoEvent<?> sentUpdate = getRepoEvent(2); | ||||
|         RepoEvent<?> sentDeletion = getRepoEvent(3); | ||||
|         assertEquals("Expected create event!", sentCreation, (RepoEvent<?>) receivedEvents.get(0)); | ||||
|         assertEquals("Expected update event!", sentUpdate, (RepoEvent<?>) receivedEvents.get(1)); | ||||
|         assertEquals("Expected delete event!", sentDeletion, (RepoEvent<?>) receivedEvents.get(2)); | ||||
|     } | ||||
|  | ||||
|     private static String getText(Message message) | ||||
|     { | ||||
|         try | ||||
|         { | ||||
|             ActiveMQTextMessage am = (ActiveMQTextMessage) message; | ||||
|             return am.getText(); | ||||
|         } catch (JMSException e) | ||||
|         { | ||||
|             return null; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // a simple main to investigate the contents of the local broker | ||||
|     public static void main(String[] args) throws Exception | ||||
|     { | ||||
|         dumpBroker("tcp://localhost:61616", DUMP_BROKER_TIMEOUT); | ||||
|         System.exit(0); | ||||
|     } | ||||
|  | ||||
|     private static void dumpBroker(String url, long timeout) throws Exception | ||||
|     { | ||||
|         System.out.println("Broker at url: '" + url + "'"); | ||||
|  | ||||
|         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); | ||||
|         ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); | ||||
|         try | ||||
|         { | ||||
|             connection.start(); | ||||
|  | ||||
|             DestinationSource ds = connection.getDestinationSource(); | ||||
|  | ||||
|             Set<ActiveMQQueue> queues = ds.getQueues(); | ||||
|             System.out.println("\nFound " + queues.size() + " queues:"); | ||||
|             for (ActiveMQQueue queue : queues) | ||||
|             { | ||||
|                 try | ||||
|                 { | ||||
|                     System.out.println("- " + queue.getQueueName()); | ||||
|                 } catch (JMSException e) | ||||
|                 { | ||||
|                     e.printStackTrace(); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             Set<ActiveMQTopic> topics = ds.getTopics(); | ||||
|             System.out.println("\nFound " + topics.size() + " topics:"); | ||||
|             for (ActiveMQTopic topic : topics) | ||||
|             { | ||||
|                 try | ||||
|                 { | ||||
|                     System.out.println("- " + topic.getTopicName()); | ||||
|                 } catch (JMSException e) | ||||
|                 { | ||||
|                     e.printStackTrace(); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||||
|             Destination destination = session.createTopic(EVENT2_TOPIC_NAME); | ||||
|             MessageConsumer consumer = session.createConsumer(destination); | ||||
|  | ||||
|             System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "..."); | ||||
|             consumer.setMessageListener(new MessageListener() | ||||
|             { | ||||
|                 @Override | ||||
|                 public void onMessage(Message message) | ||||
|                 { | ||||
|                     String text = getText(message); | ||||
|                     System.out.println("Received message " + message + "\n" + text + "\n"); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|             Thread.sleep(timeout); | ||||
|         } finally | ||||
|         { | ||||
|             connection.close(); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -34,7 +34,8 @@ import org.junit.runners.Suite.SuiteClasses; | ||||
|                 UpdateRepoEventIT.class, | ||||
|                 DeleteRepoEventIT.class, | ||||
|                 ChildAssociationRepoEventIT.class, | ||||
|                 PeerAssociationRepoEventIT.class | ||||
|                 PeerAssociationRepoEventIT.class, | ||||
|                 EventGeneratorTest.class | ||||
| }) | ||||
| public class RepoEvent2ITSuite | ||||
| { | ||||
|   | ||||
| @@ -33,7 +33,8 @@ import org.junit.runners.Suite.SuiteClasses; | ||||
| @RunWith(Suite.class) | ||||
| @SuiteClasses({ EventFilterUnitTest.class, | ||||
|                 EventConsolidatorUnitTest.class, | ||||
|                 EventJSONSchemaUnitTest.class | ||||
|                 EventJSONSchemaUnitTest.class, | ||||
|                 EventGeneratorQueueUnitTest.class | ||||
| }) | ||||
| public class RepoEvent2UnitSuite | ||||
| { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user