mirror of
				https://github.com/Alfresco/alfresco-community-repo.git
				synced 2025-10-22 15:12:38 +00:00 
			
		
		
		
	Compare commits
	
		
			7 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | dc5e7405cc | ||
|  | 3c8bb7f154 | ||
|  | bb8d42d23c | ||
|  | 9c1aa53819 | ||
|  | 885f4a49a5 | ||
|  | 9989ec3260 | ||
|  | 78ad14b696 | 
| @@ -7,7 +7,7 @@ | |||||||
|    <parent> |    <parent> | ||||||
|       <groupId>org.alfresco</groupId> |       <groupId>org.alfresco</groupId> | ||||||
|       <artifactId>alfresco-community-repo</artifactId> |       <artifactId>alfresco-community-repo</artifactId> | ||||||
|       <version>8.424-SNAPSHOT</version> |       <version>9.7</version> | ||||||
|    </parent> |    </parent> | ||||||
|  |  | ||||||
|    <dependencies> |    <dependencies> | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo</artifactId> |         <artifactId>alfresco-community-repo</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <properties> |     <properties> | ||||||
|   | |||||||
| @@ -9,6 +9,6 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> |         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
| </project> | </project> | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> |         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <properties> |     <properties> | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo</artifactId> |         <artifactId>alfresco-community-repo</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <profiles> |     <profiles> | ||||||
|   | |||||||
| @@ -6,7 +6,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> |         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <modules> |     <modules> | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-tests</artifactId> |         <artifactId>alfresco-community-repo-tests</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <developers> |     <developers> | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-tests</artifactId> |         <artifactId>alfresco-community-repo-tests</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <developers> |     <developers> | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-tests</artifactId> |         <artifactId>alfresco-community-repo-tests</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <developers> |     <developers> | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-tests</artifactId> |         <artifactId>alfresco-community-repo-tests</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <developers> |     <developers> | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-tests</artifactId> |         <artifactId>alfresco-community-repo-tests</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <developers> |     <developers> | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo-packaging</artifactId> |         <artifactId>alfresco-community-repo-packaging</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <properties> |     <properties> | ||||||
|   | |||||||
							
								
								
									
										6
									
								
								pom.xml
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								pom.xml
									
									
									
									
									
								
							| @@ -2,7 +2,7 @@ | |||||||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||||
|     <modelVersion>4.0.0</modelVersion> |     <modelVersion>4.0.0</modelVersion> | ||||||
|     <artifactId>alfresco-community-repo</artifactId> |     <artifactId>alfresco-community-repo</artifactId> | ||||||
|     <version>8.424-SNAPSHOT</version> |     <version>9.7</version> | ||||||
|     <packaging>pom</packaging> |     <packaging>pom</packaging> | ||||||
|     <name>Alfresco Community Repo Parent</name> |     <name>Alfresco Community Repo Parent</name> | ||||||
|  |  | ||||||
| @@ -23,7 +23,7 @@ | |||||||
|     <properties> |     <properties> | ||||||
|         <acs.version.major>7</acs.version.major> |         <acs.version.major>7</acs.version.major> | ||||||
|         <acs.version.minor>0</acs.version.minor> |         <acs.version.minor>0</acs.version.minor> | ||||||
|         <acs.version.revision>0</acs.version.revision> |         <acs.version.revision>1</acs.version.revision> | ||||||
|         <acs.version.label /> |         <acs.version.label /> | ||||||
|  |  | ||||||
|         <version.edition>Community</version.edition> |         <version.edition>Community</version.edition> | ||||||
| @@ -116,7 +116,7 @@ | |||||||
|         <connection>scm:git:https://github.com/Alfresco/alfresco-community-repo.git</connection> |         <connection>scm:git:https://github.com/Alfresco/alfresco-community-repo.git</connection> | ||||||
|         <developerConnection>scm:git:https://github.com/Alfresco/alfresco-community-repo.git</developerConnection> |         <developerConnection>scm:git:https://github.com/Alfresco/alfresco-community-repo.git</developerConnection> | ||||||
|         <url>https://github.com/Alfresco/alfresco-community-repo</url> |         <url>https://github.com/Alfresco/alfresco-community-repo</url> | ||||||
|         <tag>HEAD</tag> |         <tag>9.7</tag> | ||||||
|     </scm> |     </scm> | ||||||
|  |  | ||||||
|     <distributionManagement> |     <distributionManagement> | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo</artifactId> |         <artifactId>alfresco-community-repo</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <dependencies> |     <dependencies> | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ | |||||||
|     <parent> |     <parent> | ||||||
|         <groupId>org.alfresco</groupId> |         <groupId>org.alfresco</groupId> | ||||||
|         <artifactId>alfresco-community-repo</artifactId> |         <artifactId>alfresco-community-repo</artifactId> | ||||||
|         <version>8.424-SNAPSHOT</version> |         <version>9.7</version> | ||||||
|     </parent> |     </parent> | ||||||
|  |  | ||||||
|     <dependencies> |     <dependencies> | ||||||
|   | |||||||
| @@ -54,7 +54,6 @@ import org.alfresco.repo.policy.JavaBehaviour; | |||||||
| import org.alfresco.repo.policy.PolicyComponent; | import org.alfresco.repo.policy.PolicyComponent; | ||||||
| import org.alfresco.repo.security.authentication.AuthenticationUtil; | import org.alfresco.repo.security.authentication.AuthenticationUtil; | ||||||
| import org.alfresco.repo.transaction.AlfrescoTransactionSupport; | import org.alfresco.repo.transaction.AlfrescoTransactionSupport; | ||||||
| import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; |  | ||||||
| import org.alfresco.service.cmr.dictionary.DictionaryService; | import org.alfresco.service.cmr.dictionary.DictionaryService; | ||||||
| import org.alfresco.service.cmr.repository.AssociationRef; | import org.alfresco.service.cmr.repository.AssociationRef; | ||||||
| import org.alfresco.service.cmr.repository.ChildAssociationRef; | import org.alfresco.service.cmr.repository.ChildAssociationRef; | ||||||
| @@ -90,11 +89,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|     protected DictionaryService dictionaryService; |     protected DictionaryService dictionaryService; | ||||||
|     private DescriptorService descriptorService; |     private DescriptorService descriptorService; | ||||||
|     private EventFilterRegistry eventFilterRegistry; |     private EventFilterRegistry eventFilterRegistry; | ||||||
|     private Event2MessageProducer event2MessageProducer; |  | ||||||
|     private TransactionService transactionService; |     private TransactionService transactionService; | ||||||
|     private PersonService personService; |     private PersonService personService; | ||||||
|     protected NodeResourceHelper nodeResourceHelper; |     protected NodeResourceHelper nodeResourceHelper; | ||||||
|  |  | ||||||
|  |     private EventGeneratorQueue eventGeneratorQueue; | ||||||
|     private NodeTypeFilter nodeTypeFilter; |     private NodeTypeFilter nodeTypeFilter; | ||||||
|     private ChildAssociationTypeFilter childAssociationTypeFilter; |     private ChildAssociationTypeFilter childAssociationTypeFilter; | ||||||
|     private EventUserFilter userFilter; |     private EventUserFilter userFilter; | ||||||
| @@ -109,10 +108,10 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|         PropertyCheck.mandatory(this, "dictionaryService", dictionaryService); |         PropertyCheck.mandatory(this, "dictionaryService", dictionaryService); | ||||||
|         PropertyCheck.mandatory(this, "descriptorService", descriptorService); |         PropertyCheck.mandatory(this, "descriptorService", descriptorService); | ||||||
|         PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry); |         PropertyCheck.mandatory(this, "eventFilterRegistry", eventFilterRegistry); | ||||||
|         PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); |  | ||||||
|         PropertyCheck.mandatory(this, "transactionService", transactionService); |         PropertyCheck.mandatory(this, "transactionService", transactionService); | ||||||
|         PropertyCheck.mandatory(this, "personService", personService); |         PropertyCheck.mandatory(this, "personService", personService); | ||||||
|         PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); |         PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); | ||||||
|  |         PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue); | ||||||
|  |  | ||||||
|         this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); |         this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); | ||||||
|         this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter(); |         this.childAssociationTypeFilter = eventFilterRegistry.getChildAssociationTypeFilter(); | ||||||
| @@ -177,12 +176,6 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|         this.eventFilterRegistry = eventFilterRegistry; |         this.eventFilterRegistry = eventFilterRegistry; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     @SuppressWarnings("unused") |  | ||||||
|     public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) |  | ||||||
|     { |  | ||||||
|         this.event2MessageProducer = event2MessageProducer; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     public void setTransactionService(TransactionService transactionService) |     public void setTransactionService(TransactionService transactionService) | ||||||
|     { |     { | ||||||
|         this.transactionService = transactionService; |         this.transactionService = transactionService; | ||||||
| @@ -198,6 +191,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|         this.nodeResourceHelper = nodeResourceHelper; |         this.nodeResourceHelper = nodeResourceHelper; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     public void setEventGeneratorQueue(EventGeneratorQueue eventGeneratorQueue) | ||||||
|  |     { | ||||||
|  |         this.eventGeneratorQueue = eventGeneratorQueue; | ||||||
|  |     } | ||||||
|  |  | ||||||
|     @Override |     @Override | ||||||
|     public void onCreateNode(ChildAssociationRef childAssocRef) |     public void onCreateNode(ChildAssociationRef childAssocRef) | ||||||
|     { |     { | ||||||
| @@ -428,20 +426,26 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|  |  | ||||||
|         protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator) |         protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator) | ||||||
|         { |         { | ||||||
|  |             EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); | ||||||
|  |             eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo) | ||||||
|  |         { | ||||||
|  |             String user = eventInfo.getPrincipal(); | ||||||
|  |  | ||||||
|             if (consolidator.isTemporaryNode()) |             if (consolidator.isTemporaryNode()) | ||||||
|             { |             { | ||||||
|                 if (LOGGER.isTraceEnabled()) |                 if (LOGGER.isTraceEnabled()) | ||||||
|                 { |                 { | ||||||
|                     LOGGER.trace("Ignoring temporary node: " + nodeRef); |                     LOGGER.trace("Ignoring temporary node: " + nodeRef); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             final String user = AuthenticationUtil.getFullyAuthenticatedUser(); |  | ||||||
|             // Get the repo event before the filtering, |             // Get the repo event before the filtering, | ||||||
|             // so we can take the latest node info into account |             // so we can take the latest node info into account | ||||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user)); |             final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); | ||||||
|  |  | ||||||
|  |  | ||||||
|             final QName nodeType = consolidator.getNodeType(); |             final QName nodeType = consolidator.getNodeType(); | ||||||
|             if (isFiltered(nodeType, user)) |             if (isFiltered(nodeType, user)) | ||||||
| @@ -452,7 +456,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|                             + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString()) |                             + ((nodeType == null) ? "Unknown' " : nodeType.toPrefixString()) | ||||||
|                             + "' created by: " + user); |                             + "' created by: " + user); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull()) |             if (event.getType().equals(EventType.NODE_UPDATED.getType()) && consolidator.isResourceBeforeAllFieldsNull()) | ||||||
| @@ -461,27 +465,34 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|                 { |                 { | ||||||
|                     LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef); |                     LOGGER.trace("Ignoring node updated event as no fields have been updated: " + nodeRef); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             logAndSendEvent(event, consolidator.getEventTypes()); |             logEvent(event, consolidator.getEventTypes()); | ||||||
|  |             return event; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) |         protected void sendEvent(ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) | ||||||
|         { |         { | ||||||
|  |             EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); | ||||||
|  |             eventGeneratorQueue.accept(()-> createEvent(eventInfo, childAssociationRef, consolidator)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         private RepoEvent<?> createEvent(EventInfo eventInfo, ChildAssociationRef childAssociationRef, ChildAssociationEventConsolidator consolidator) | ||||||
|  |         { | ||||||
|  |             String user = eventInfo.getPrincipal(); | ||||||
|             if (consolidator.isTemporaryChildAssociation()) |             if (consolidator.isTemporaryChildAssociation()) | ||||||
|             { |             { | ||||||
|                 if (LOGGER.isTraceEnabled()) |                 if (LOGGER.isTraceEnabled()) | ||||||
|                 { |                 { | ||||||
|                     LOGGER.trace("Ignoring temporary child association: " + childAssociationRef); |                     LOGGER.trace("Ignoring temporary child association: " + childAssociationRef); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             final String user = AuthenticationUtil.getFullyAuthenticatedUser(); |  | ||||||
|             // Get the repo event before the filtering, |             // Get the repo event before the filtering, | ||||||
|             // so we can take the latest association info into account |             // so we can take the latest association info into account | ||||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user)); |             final RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); | ||||||
|  |  | ||||||
|             final QName childAssocType = consolidator.getChildAssocType(); |             final QName childAssocType = consolidator.getChildAssocType(); | ||||||
|             if (isFilteredChildAssociation(childAssocType, user)) |             if (isFilteredChildAssociation(childAssocType, user)) | ||||||
| @@ -492,7 +503,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|                             + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) |                             + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) | ||||||
|                             + "' created by: " + user); |                             + "' created by: " + user); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } else if (childAssociationRef.isPrimary()) |             } else if (childAssociationRef.isPrimary()) | ||||||
|             { |             { | ||||||
|                 if (LOGGER.isTraceEnabled()) |                 if (LOGGER.isTraceEnabled()) | ||||||
| @@ -501,13 +512,20 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|                             + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) |                             + ((childAssocType == null) ? "Unknown' " : childAssocType.toPrefixString()) | ||||||
|                             + "' created by: " + user); |                             + "' created by: " + user); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             logAndSendEvent(event, consolidator.getEventTypes()); |             logEvent(event, consolidator.getEventTypes()); | ||||||
|  |             return event; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) |         protected void sendEvent(AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) | ||||||
|  |         { | ||||||
|  |             EventInfo eventInfo = getEventInfo(AuthenticationUtil.getFullyAuthenticatedUser()); | ||||||
|  |             eventGeneratorQueue.accept(()-> createEvent(eventInfo, peerAssociationRef, consolidator)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         private RepoEvent<?> createEvent(EventInfo eventInfo, AssociationRef peerAssociationRef, PeerAssociationEventConsolidator consolidator) | ||||||
|         { |         { | ||||||
|             if (consolidator.isTemporaryPeerAssociation()) |             if (consolidator.isTemporaryPeerAssociation()) | ||||||
|             { |             { | ||||||
| @@ -515,30 +533,21 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin | |||||||
|                 { |                 { | ||||||
|                     LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef); |                     LOGGER.trace("Ignoring temporary peer association: " + peerAssociationRef); | ||||||
|                 } |                 } | ||||||
|                 return; |                 return null; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             final String user = AuthenticationUtil.getFullyAuthenticatedUser(); |             RepoEvent<?> event = consolidator.getRepoEvent(eventInfo); | ||||||
|             // Get the repo event before the filtering, |             logEvent(event, consolidator.getEventTypes()); | ||||||
|             // so we can take the latest association info into account |             return event; | ||||||
|             final RepoEvent<?> event = consolidator.getRepoEvent(getEventInfo(user)); |  | ||||||
|  |  | ||||||
|             logAndSendEvent(event, consolidator.getEventTypes()); |  | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         protected void logAndSendEvent(RepoEvent<?> event, Deque<EventType> listOfEvents) |         private void logEvent(RepoEvent<?> event, Deque<EventType> listOfEvents) | ||||||
|         { |         { | ||||||
|             if (LOGGER.isTraceEnabled()) |             if (LOGGER.isTraceEnabled()) | ||||||
|             { |             { | ||||||
|                 LOGGER.trace("List of Events:" + listOfEvents); |                 LOGGER.trace("List of Events:" + listOfEvents); | ||||||
|                 LOGGER.trace("Sending event:" + event); |                 LOGGER.trace("Sending event:" + event); | ||||||
|             } |             } | ||||||
|             // Need to execute this in another read txn because Camel expects it |  | ||||||
|             transactionService.getRetryingTransactionHelper().doInTransaction((RetryingTransactionCallback<Void>) () -> { |  | ||||||
|                 event2MessageProducer.send(event); |  | ||||||
|  |  | ||||||
|                 return null; |  | ||||||
|             }, true, false); |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -0,0 +1,179 @@ | |||||||
|  | /* | ||||||
|  |  * #%L | ||||||
|  |  * Alfresco Repository | ||||||
|  |  * %% | ||||||
|  |  * Copyright (C) 2005 - 2021 Alfresco Software Limited | ||||||
|  |  * %% | ||||||
|  |  * This file is part of the Alfresco software. | ||||||
|  |  * If the software was purchased under a paid Alfresco license, the terms of | ||||||
|  |  * the paid license agreement will prevail.  Otherwise, the software is | ||||||
|  |  * provided under the following open source license terms: | ||||||
|  |  * | ||||||
|  |  * Alfresco is free software: you can redistribute it and/or modify | ||||||
|  |  * it under the terms of the GNU Lesser General Public License as published by | ||||||
|  |  * the Free Software Foundation, either version 3 of the License, or | ||||||
|  |  * (at your option) any later version. | ||||||
|  |  * | ||||||
|  |  * Alfresco is distributed in the hope that it will be useful, | ||||||
|  |  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  |  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  |  * GNU Lesser General Public License for more details. | ||||||
|  |  * | ||||||
|  |  * You should have received a copy of the GNU Lesser General Public License | ||||||
|  |  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. | ||||||
|  |  * #L% | ||||||
|  |  */ | ||||||
|  | package org.alfresco.repo.event2; | ||||||
|  |  | ||||||
|  | import java.util.concurrent.BlockingQueue; | ||||||
|  | import java.util.concurrent.Callable; | ||||||
|  | import java.util.concurrent.CountDownLatch; | ||||||
|  | import java.util.concurrent.Executor; | ||||||
|  | import java.util.concurrent.LinkedBlockingQueue; | ||||||
|  | import java.util.concurrent.TimeUnit; | ||||||
|  |  | ||||||
|  | import org.alfresco.repo.event.v1.model.RepoEvent; | ||||||
|  | import org.alfresco.util.PropertyCheck; | ||||||
|  | import org.apache.commons.logging.Log; | ||||||
|  | import org.apache.commons.logging.LogFactory; | ||||||
|  | import org.springframework.beans.factory.InitializingBean; | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * This queue allows to create asynchronously the RepoEvent offloading the work to a ThreadPool but | ||||||
|  |  * at the same time it preserves the order of the events | ||||||
|  |  */ | ||||||
|  | public class EventGeneratorQueue implements InitializingBean | ||||||
|  | { | ||||||
|  | 	protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class); | ||||||
|  |      | ||||||
|  |     protected Executor enqueueThreadPoolExecutor; | ||||||
|  |     protected Executor dequeueThreadPoolExecutor; | ||||||
|  |     protected Event2MessageProducer event2MessageProducer; | ||||||
|  |     protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue<>(); | ||||||
|  |     protected Runnable listener = createListener(); | ||||||
|  |  | ||||||
|  |     @Override | ||||||
|  |     public void afterPropertiesSet() throws Exception | ||||||
|  |     { | ||||||
|  |         PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", enqueueThreadPoolExecutor); | ||||||
|  |         PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", dequeueThreadPoolExecutor); | ||||||
|  |         PropertyCheck.mandatory(this, "event2MessageProducer", event2MessageProducer); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) | ||||||
|  |     { | ||||||
|  |         this.event2MessageProducer = event2MessageProducer; | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     public void setEnqueueThreadPoolExecutor(Executor enqueueThreadPoolExecutor) | ||||||
|  |     { | ||||||
|  |         this.enqueueThreadPoolExecutor = enqueueThreadPoolExecutor; | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     public void setDequeueThreadPoolExecutor(Executor dequeueThreadPoolExecutor) | ||||||
|  |     { | ||||||
|  |         this.dequeueThreadPoolExecutor = dequeueThreadPoolExecutor; | ||||||
|  |         dequeueThreadPoolExecutor.execute(listener); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Procedure to enqueue the callback functions that creates an event. | ||||||
|  |      * @param maker Callback function that creates an event. | ||||||
|  |      */ | ||||||
|  |     public void accept(Callable<RepoEvent<?>> maker) | ||||||
|  |     { | ||||||
|  |         EventInMaking eventInMaking = new EventInMaking(maker); | ||||||
|  |         queue.offer(eventInMaking); | ||||||
|  |         enqueueThreadPoolExecutor.execute(() -> { | ||||||
|  |             try | ||||||
|  |             { | ||||||
|  |                 eventInMaking.make(); | ||||||
|  |             } | ||||||
|  |             catch (Exception e) | ||||||
|  |             { | ||||||
|  |                 LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e); | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Create listener task in charge of dequeuing and sending events ready to be sent. | ||||||
|  |      * @return The task in charge of dequeuing and sending events ready to be sent. | ||||||
|  |      */ | ||||||
|  |     private Runnable createListener() | ||||||
|  |     { | ||||||
|  |         return new Runnable() | ||||||
|  |         { | ||||||
|  |             @Override | ||||||
|  |             public void run() | ||||||
|  |             { | ||||||
|  |                 try  | ||||||
|  |                 { | ||||||
|  |                     while (!Thread.interrupted()) | ||||||
|  |                     { | ||||||
|  |                         try | ||||||
|  |                         { | ||||||
|  |                             EventInMaking eventInMaking = queue.take(); | ||||||
|  |                             RepoEvent<?> event = eventInMaking.getEventWhenReady(); | ||||||
|  |                             if (event != null) | ||||||
|  |                             { | ||||||
|  |                                 event2MessageProducer.send(event); | ||||||
|  |                             } | ||||||
|  |                         } | ||||||
|  |                         catch (Exception e) | ||||||
|  |                         { | ||||||
|  |                             LOGGER.error("Unexpected error while dequeuing and sending repository event" + e); | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 finally | ||||||
|  |                 { | ||||||
|  |                     LOGGER.warn("Unexpected: rescheduling the listener thread."); | ||||||
|  |                     dequeueThreadPoolExecutor.execute(listener); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /* | ||||||
|  |      * Simple class that makes events and allows to retrieve them when ready | ||||||
|  |      */ | ||||||
|  |     private static class EventInMaking | ||||||
|  |     { | ||||||
|  |         private Callable<RepoEvent<?>> maker; | ||||||
|  |         private volatile RepoEvent<?> event; | ||||||
|  |         private CountDownLatch latch; | ||||||
|  |          | ||||||
|  |         public EventInMaking(Callable<RepoEvent<?>> maker) | ||||||
|  |         { | ||||||
|  |             this.maker = maker; | ||||||
|  |             this.latch = new CountDownLatch(1); | ||||||
|  |         } | ||||||
|  |          | ||||||
|  |         public void make() throws Exception | ||||||
|  |         { | ||||||
|  |             try | ||||||
|  |             { | ||||||
|  |                 event = maker.call(); | ||||||
|  |             } | ||||||
|  |             finally  | ||||||
|  |             { | ||||||
|  |                 latch.countDown(); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |          | ||||||
|  |         public RepoEvent<?> getEventWhenReady() throws InterruptedException | ||||||
|  |         { | ||||||
|  |             latch.await(30, TimeUnit.SECONDS); | ||||||
|  |             return event; | ||||||
|  |         } | ||||||
|  |          | ||||||
|  |         @Override | ||||||
|  |         public String toString() | ||||||
|  |         { | ||||||
|  |             return maker.toString(); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  | } | ||||||
| @@ -38,9 +38,10 @@ | |||||||
|         <property name="dictionaryService" ref="dictionaryService"/> |         <property name="dictionaryService" ref="dictionaryService"/> | ||||||
|         <property name="descriptorService" ref="descriptorComponent"/> |         <property name="descriptorService" ref="descriptorComponent"/> | ||||||
|         <property name="eventFilterRegistry" ref="event2FilterRegistry"/> |         <property name="eventFilterRegistry" ref="event2FilterRegistry"/> | ||||||
|         <property name="event2MessageProducer" ref="event2MessageProducer"/> |  | ||||||
|         <property name="transactionService" ref="transactionService"/> |         <property name="transactionService" ref="transactionService"/> | ||||||
|         <property name="personService" ref="personService"/> |         <property name="personService" ref="personService"/> | ||||||
|  |         <property name="nodeResourceHelper" ref="nodeResourceHelper"/> | ||||||
|  |         <property name="eventGeneratorQueue" ref="eventGeneratorQueue"/> | ||||||
|     </bean> |     </bean> | ||||||
|  |  | ||||||
|     <bean id="baseNodeResourceHelper" abstract="true"> |     <bean id="baseNodeResourceHelper" abstract="true"> | ||||||
| @@ -54,7 +55,45 @@ | |||||||
|  |  | ||||||
|     <bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/> |     <bean id="nodeResourceHelper" class="org.alfresco.repo.event2.NodeResourceHelper" parent="baseNodeResourceHelper"/> | ||||||
|  |  | ||||||
|     <bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"> |     <bean id="eventGeneratorV2" class="org.alfresco.repo.event2.EventGenerator" parent="baseEventGeneratorV2"/> | ||||||
|         <property name="nodeResourceHelper" ref="nodeResourceHelper"/> |  | ||||||
|  |     <bean id="eventGeneratorQueue" class="org.alfresco.repo.event2.EventGeneratorQueue" > | ||||||
|  |         <property name="enqueueThreadPoolExecutor"> | ||||||
|  |             <ref bean="eventAsyncEnqueueThreadPool" /> | ||||||
|  |         </property> | ||||||
|  |         <property name="dequeueThreadPoolExecutor"> | ||||||
|  |             <ref bean="eventAsyncDequeueThreadPool" /> | ||||||
|  |         </property> | ||||||
|  |        <property name="event2MessageProducer" ref="event2MessageProducer"/> | ||||||
|  |     </bean> | ||||||
|  |  | ||||||
|  |     <bean id="eventAsyncEnqueueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean"> | ||||||
|  |         <property name="poolName"> | ||||||
|  |             <value>eventAsyncEnqueueThreadPool</value> | ||||||
|  |         </property> | ||||||
|  |         <property name="corePoolSize"> | ||||||
|  |             <value>${repo.event2.queue.enqueueThreadPool.coreSize}</value> | ||||||
|  |         </property> | ||||||
|  |         <property name="maximumPoolSize"> | ||||||
|  |             <value>${repo.event2.queue.enqueueThreadPool.maximumSize}</value> | ||||||
|  |         </property> | ||||||
|  |         <property name="threadPriority"> | ||||||
|  |             <value>${repo.event2.queue.enqueueThreadPool.priority}</value> | ||||||
|  |         </property> | ||||||
|  |     </bean> | ||||||
|  |  | ||||||
|  |     <bean id="eventAsyncDequeueThreadPool" class="org.alfresco.util.ThreadPoolExecutorFactoryBean"> | ||||||
|  |         <property name="poolName"> | ||||||
|  |             <value>eventAsyncDequeueThreadPool</value> | ||||||
|  |         </property> | ||||||
|  |         <property name="corePoolSize"> | ||||||
|  |             <value>${repo.event2.queue.dequeueThreadPool.coreSize}</value> | ||||||
|  |         </property> | ||||||
|  |         <property name="maximumPoolSize"> | ||||||
|  |             <value>${repo.event2.queue.dequeueThreadPool.maximumSize}</value> | ||||||
|  |         </property> | ||||||
|  |         <property name="threadPriority"> | ||||||
|  |             <value>${repo.event2.queue.dequeueThreadPool.priority}</value> | ||||||
|  |         </property> | ||||||
|     </bean> |     </bean> | ||||||
| </beans> | </beans> | ||||||
|   | |||||||
| @@ -3,7 +3,7 @@ | |||||||
| repository.name=Main Repository | repository.name=Main Repository | ||||||
|  |  | ||||||
| # Schema number | # Schema number | ||||||
| version.schema=14002 | version.schema=14100 | ||||||
|  |  | ||||||
| # Directory configuration | # Directory configuration | ||||||
|  |  | ||||||
| @@ -1207,6 +1207,15 @@ repo.event2.filter.childAssocTypes=rn:rendition | |||||||
| repo.event2.filter.users=System, null | repo.event2.filter.users=System, null | ||||||
| # Topic name | # Topic name | ||||||
| repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 | repo.event2.topic.endpoint=amqp:topic:alfresco.repo.event2 | ||||||
|  | # Thread pool for async enqueue of repo events | ||||||
|  | repo.event2.queue.enqueueThreadPool.priority=1 | ||||||
|  | repo.event2.queue.enqueueThreadPool.coreSize=8 | ||||||
|  | repo.event2.queue.enqueueThreadPool.maximumSize=10 | ||||||
|  | # Thread pool for async dequeue and delivery of repo events | ||||||
|  | repo.event2.queue.dequeueThreadPool.priority=1 | ||||||
|  | repo.event2.queue.dequeueThreadPool.coreSize=1 | ||||||
|  | repo.event2.queue.dequeueThreadPool.maximumSize=1 | ||||||
|  |  | ||||||
|  |  | ||||||
| # MNT-21083 | # MNT-21083 | ||||||
| # --DELETE_NOT_EXISTS - default settings | # --DELETE_NOT_EXISTS - default settings | ||||||
|   | |||||||
| @@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; | |||||||
| import static org.awaitility.Awaitility.await; | import static org.awaitility.Awaitility.await; | ||||||
|  |  | ||||||
| import java.util.ArrayList; | import java.util.ArrayList; | ||||||
|  | import java.util.Collections; | ||||||
| import java.util.List; | import java.util.List; | ||||||
|  |  | ||||||
| import javax.jms.ConnectionFactory; | import javax.jms.ConnectionFactory; | ||||||
| @@ -77,12 +78,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; | |||||||
|  |  | ||||||
| public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | ||||||
| { | { | ||||||
|  |     protected static final boolean            DEBUG = false; | ||||||
|  |  | ||||||
|     protected static final String             TEST_NAMESPACE  = "http://www.alfresco.org/test/ContextAwareRepoEvent"; |     protected static final String             TEST_NAMESPACE  = "http://www.alfresco.org/test/ContextAwareRepoEvent"; | ||||||
|  |     protected static final RepoEventContainer EVENT_CONTAINER = new RepoEventContainer(); | ||||||
|  |  | ||||||
|     private static final   String             BROKER_URL      = "tcp://localhost:61616"; |     private static final   String             BROKER_URL      = "tcp://localhost:61616"; | ||||||
|     private static final   String             TOPIC_NAME      = "alfresco.repo.event2"; |     private static final   String             TOPIC_NAME      = "alfresco.repo.event2"; | ||||||
|     private static final   String             CAMEL_ROUTE     = "jms:topic:" + TOPIC_NAME; |     private static final   String             CAMEL_ROUTE     = "jms:topic:" + TOPIC_NAME; | ||||||
|     private static final   RepoEventContainer EVENT_CONTAINER = new RepoEventContainer(); |  | ||||||
|     private static final   CamelContext       CAMEL_CONTEXT   = new DefaultCamelContext(); |     private static final   CamelContext       CAMEL_CONTEXT   = new DefaultCamelContext(); | ||||||
|  |  | ||||||
|     private static boolean isCamelConfigured; |     private static boolean isCamelConfigured; | ||||||
| @@ -104,6 +107,13 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | |||||||
|     @Autowired |     @Autowired | ||||||
|     protected ObjectMapper event2ObjectMapper; |     protected ObjectMapper event2ObjectMapper; | ||||||
|  |  | ||||||
|  |     @Autowired @Qualifier("eventGeneratorV2") | ||||||
|  |     protected EventGenerator eventGenerator; | ||||||
|  |  | ||||||
|  |     @Autowired | ||||||
|  |     @Qualifier("eventGeneratorQueue") | ||||||
|  |     protected EventGeneratorQueue eventQueue; | ||||||
|  |  | ||||||
|     protected NodeRef rootNodeRef; |     protected NodeRef rootNodeRef; | ||||||
|  |  | ||||||
|     @BeforeClass |     @BeforeClass | ||||||
| @@ -141,6 +151,33 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | |||||||
|             } |             } | ||||||
|             return nodeService.getRootNode(storeRef); |             return nodeService.getRootNode(storeRef); | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|  |         flushSpuriousEvents(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /* | ||||||
|  |      * When running with an empty database some events related to the creation may | ||||||
|  |      * creep up here making the test fails. After attempting several other | ||||||
|  |      * strategies, a smart sleep seems to do the work. | ||||||
|  |      */ | ||||||
|  |     protected void flushSpuriousEvents() throws InterruptedException | ||||||
|  |     { | ||||||
|  |         int maxloops = 5; | ||||||
|  |          | ||||||
|  |         int count = maxloops; | ||||||
|  |         do | ||||||
|  |         { | ||||||
|  |             Thread.sleep(165l); | ||||||
|  |             if (EVENT_CONTAINER.isEmpty()) | ||||||
|  |             { | ||||||
|  |                 count--; | ||||||
|  |             } else  | ||||||
|  |             { | ||||||
|  |                 EVENT_CONTAINER.reset(); | ||||||
|  |                 count = maxloops; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |         } while (count > 0); | ||||||
|     } |     } | ||||||
|      |      | ||||||
|     @After |     @After | ||||||
| @@ -179,6 +216,16 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | |||||||
|             propertyMap).getChildRef()); |             propertyMap).getChildRef()); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     protected NodeRef updateNodeName(NodeRef nodeRef, String newName) | ||||||
|  |     { | ||||||
|  |         PropertyMap propertyMap = new PropertyMap(); | ||||||
|  |         propertyMap.put(ContentModel.PROP_NAME, newName); | ||||||
|  |         return retryingTransactionHelper.doInTransaction(() -> { | ||||||
|  |             nodeService.addProperties(nodeRef, propertyMap); | ||||||
|  |             return null; | ||||||
|  |         }); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     protected void deleteNode(NodeRef nodeRef) |     protected void deleteNode(NodeRef nodeRef) | ||||||
|     { |     { | ||||||
|         retryingTransactionHelper.doInTransaction(() -> { |         retryingTransactionHelper.doInTransaction(() -> { | ||||||
| @@ -376,13 +423,18 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | |||||||
|  |  | ||||||
|     public static class RepoEventContainer implements Processor |     public static class RepoEventContainer implements Processor | ||||||
|     { |     { | ||||||
|         private final List<RepoEvent<?>> events = new ArrayList<>(); |         private final List<RepoEvent<?>> events = Collections.synchronizedList(new ArrayList<>()); | ||||||
|  |  | ||||||
|         @Override |         @Override | ||||||
|         public void process(Exchange exchange) |         public void process(Exchange exchange) | ||||||
|         { |         { | ||||||
|             Object object = exchange.getIn().getBody(); |             Object object = exchange.getIn().getBody(); | ||||||
|             events.add((RepoEvent<?>) object); |             events.add((RepoEvent<?>) object); | ||||||
|  |  | ||||||
|  |             if (DEBUG) | ||||||
|  |             { | ||||||
|  |                 System.err.println("XX: "+object); | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         public List<RepoEvent<?>> getEvents() |         public List<RepoEvent<?>> getEvents() | ||||||
| @@ -404,6 +456,12 @@ public abstract class AbstractContextAwareRepoEvent extends BaseSpringTest | |||||||
|         { |         { | ||||||
|             events.clear(); |             events.clear(); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         public boolean isEmpty() | ||||||
|  |         { | ||||||
|  |             return events.isEmpty(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     @SuppressWarnings("unchecked") |     @SuppressWarnings("unchecked") | ||||||
|   | |||||||
| @@ -0,0 +1,290 @@ | |||||||
|  | /* | ||||||
|  |  * #%L | ||||||
|  |  * Alfresco Repository | ||||||
|  |  * %% | ||||||
|  |  * Copyright (C) 2005 - 2021 Alfresco Software Limited | ||||||
|  |  * %% | ||||||
|  |  * This file is part of the Alfresco software. | ||||||
|  |  * If the software was purchased under a paid Alfresco license, the terms of | ||||||
|  |  * the paid license agreement will prevail.  Otherwise, the software is | ||||||
|  |  * provided under the following open source license terms: | ||||||
|  |  * | ||||||
|  |  * Alfresco is free software: you can redistribute it and/or modify | ||||||
|  |  * it under the terms of the GNU Lesser General Public License as published by | ||||||
|  |  * the Free Software Foundation, either version 3 of the License, or | ||||||
|  |  * (at your option) any later version. | ||||||
|  |  * | ||||||
|  |  * Alfresco is distributed in the hope that it will be useful, | ||||||
|  |  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  |  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  |  * GNU Lesser General Public License for more details. | ||||||
|  |  * | ||||||
|  |  * You should have received a copy of the GNU Lesser General Public License | ||||||
|  |  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. | ||||||
|  |  * #L% | ||||||
|  |  */ | ||||||
|  | package org.alfresco.repo.event2; | ||||||
|  |  | ||||||
|  | import static java.lang.Thread.sleep; | ||||||
|  | import static org.junit.Assert.assertEquals; | ||||||
|  | import static org.mockito.ArgumentMatchers.any; | ||||||
|  | import static org.mockito.Mockito.doThrow; | ||||||
|  | import static org.mockito.Mockito.mock; | ||||||
|  | import static org.mockito.Mockito.when; | ||||||
|  |  | ||||||
|  | import java.util.HashMap; | ||||||
|  | import java.util.List; | ||||||
|  | import java.util.Map; | ||||||
|  | import java.util.concurrent.Callable; | ||||||
|  | import java.util.concurrent.CopyOnWriteArrayList; | ||||||
|  | import java.util.concurrent.Executor; | ||||||
|  | import java.util.concurrent.ExecutorService; | ||||||
|  | import java.util.concurrent.SynchronousQueue; | ||||||
|  | import java.util.concurrent.ThreadPoolExecutor; | ||||||
|  | import java.util.concurrent.TimeUnit; | ||||||
|  |  | ||||||
|  | import org.alfresco.repo.event.v1.model.RepoEvent; | ||||||
|  | import org.junit.After; | ||||||
|  | import org.junit.Before; | ||||||
|  | import org.junit.Test; | ||||||
|  | import org.mockito.Mockito; | ||||||
|  | import org.mockito.invocation.InvocationOnMock; | ||||||
|  | import org.mockito.stubbing.Answer; | ||||||
|  |  | ||||||
|  | public class EventGeneratorQueueUnitTest | ||||||
|  | { | ||||||
|  |     private EventGeneratorQueue queue; | ||||||
|  |  | ||||||
|  |     private Event2MessageProducer bus; | ||||||
|  |     private ExecutorService enqueuePool; | ||||||
|  |     private ExecutorService dequeuePool; | ||||||
|  |     private List<RepoEvent<?>> recordedEvents; | ||||||
|  |     private Map<String, RepoEvent<?>> events; | ||||||
|  |  | ||||||
|  |     @Before | ||||||
|  |     public void setup()  | ||||||
|  |     { | ||||||
|  |         queue = new EventGeneratorQueue(); | ||||||
|  |  | ||||||
|  |         enqueuePool = newThreadPool(); | ||||||
|  |         queue.setEnqueueThreadPoolExecutor(enqueuePool); | ||||||
|  |         dequeuePool = newThreadPool(); | ||||||
|  |         queue.setDequeueThreadPoolExecutor(dequeuePool); | ||||||
|  |  | ||||||
|  |         bus = mock(Event2MessageProducer.class); | ||||||
|  |         queue.setEvent2MessageProducer(bus); | ||||||
|  |  | ||||||
|  |         events = new HashMap<>(); | ||||||
|  |  | ||||||
|  |         setupEventsRecorder(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @After | ||||||
|  |     public void teardown()  | ||||||
|  |     { | ||||||
|  |         enqueuePool.shutdown(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     private void setupEventsRecorder() | ||||||
|  |     { | ||||||
|  |         recordedEvents = new CopyOnWriteArrayList<>(); | ||||||
|  |  | ||||||
|  |         Mockito.doAnswer(new Answer<Void>() | ||||||
|  |         { | ||||||
|  |             @Override | ||||||
|  |             public Void answer(InvocationOnMock invocation) throws Throwable | ||||||
|  |             { | ||||||
|  |                 RepoEvent<?> event = invocation.getArgument(0, RepoEvent.class); | ||||||
|  |                 recordedEvents.add(event); | ||||||
|  |                 return null; | ||||||
|  |             } | ||||||
|  |         }).when(bus).send(any()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveSingleQuickMessage() throws Exception  | ||||||
|  |     { | ||||||
|  |         queue.accept(messageWithDelay("A", 55l)); | ||||||
|  |  | ||||||
|  |         sleep(150l); | ||||||
|  |  | ||||||
|  |         assertEquals(1, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldNotReceiveEventsWhenMessageIsNull() throws Exception  | ||||||
|  |     { | ||||||
|  |         queue.accept(() -> { return null; }); | ||||||
|  |  | ||||||
|  |         sleep(150l); | ||||||
|  |  | ||||||
|  |         assertEquals(0, recordedEvents.size()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveMultipleMessagesPreservingOrderScenarioOne() throws Exception { | ||||||
|  |         queue.accept(messageWithDelay("A", 0l)); | ||||||
|  |         queue.accept(messageWithDelay("B", 100l)); | ||||||
|  |         queue.accept(messageWithDelay("C", 200l)); | ||||||
|  |  | ||||||
|  |         sleep(450l); | ||||||
|  |  | ||||||
|  |         assertEquals(3, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |         assertEquals("B", recordedEvents.get(1).getId()); | ||||||
|  |         assertEquals("C", recordedEvents.get(2).getId()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveMultipleMessagesPreservingOrderScenarioTwo() throws Exception | ||||||
|  |     { | ||||||
|  |         queue.accept(messageWithDelay("A", 300l)); | ||||||
|  |         queue.accept(messageWithDelay("B", 150l)); | ||||||
|  |         queue.accept(messageWithDelay("C", 0l)); | ||||||
|  |  | ||||||
|  |         sleep(950l); | ||||||
|  |  | ||||||
|  |         assertEquals(3, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |         assertEquals("B", recordedEvents.get(1).getId()); | ||||||
|  |         assertEquals("C", recordedEvents.get(2).getId()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisoned() throws Exception | ||||||
|  |     { | ||||||
|  |         queue.accept(messageWithDelay("A", 300l)); | ||||||
|  |         queue.accept(() -> {throw new RuntimeException("Boom! (not to worry, this is a test)");}); | ||||||
|  |         queue.accept(messageWithDelay("B", 55l)); | ||||||
|  |         queue.accept(messageWithDelay("C", 0l)); | ||||||
|  |  | ||||||
|  |         sleep(950l); | ||||||
|  |  | ||||||
|  |         assertEquals(3, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |         assertEquals("B", recordedEvents.get(1).getId()); | ||||||
|  |         assertEquals("C", recordedEvents.get(2).getId()); | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisoned() throws Exception | ||||||
|  |     { | ||||||
|  |         Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l); | ||||||
|  |         RepoEvent<?> messageB = makerB.call(); | ||||||
|  |         doThrow(new RuntimeException("Boom! (not to worry, this is a test)")).when(bus).send(messageB); | ||||||
|  |         queue.accept(messageWithDelay("A", 300l)); | ||||||
|  |         queue.accept(makerB); | ||||||
|  |         queue.accept(messageWithDelay("C", 0l)); | ||||||
|  |  | ||||||
|  |         sleep(950l); | ||||||
|  |  | ||||||
|  |         assertEquals(2, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |         assertEquals("C", recordedEvents.get(1).getId()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenMakerPoisonedWithError() throws Exception | ||||||
|  |     { | ||||||
|  |         queue.accept(messageWithDelay("A", 300l)); | ||||||
|  |         queue.accept(() -> {throw new OutOfMemoryError("Boom! (not to worry, this is a test)");}); | ||||||
|  |         queue.accept(messageWithDelay("B", 55l)); | ||||||
|  |         queue.accept(messageWithDelay("C", 0l)); | ||||||
|  |  | ||||||
|  |         sleep(950l); | ||||||
|  |  | ||||||
|  |         assertEquals(3, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |         assertEquals("B", recordedEvents.get(1).getId()); | ||||||
|  |         assertEquals("C", recordedEvents.get(2).getId()); | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveMultipleMessagesPreservingOrderEvenWhenSenderPoisonedWithError() throws Exception | ||||||
|  |     { | ||||||
|  |         Callable<RepoEvent<?>> makerB = messageWithDelay("B", 55l); | ||||||
|  |         RepoEvent<?> messageB = makerB.call(); | ||||||
|  |         doThrow(new OutOfMemoryError("Boom! (not to worry, this is a test)")).when(bus).send(messageB); | ||||||
|  |         queue.accept(messageWithDelay("A", 300l)); | ||||||
|  |         queue.accept(makerB); | ||||||
|  |         queue.accept(messageWithDelay("C", 0l)); | ||||||
|  |  | ||||||
|  |         sleep(950l); | ||||||
|  |  | ||||||
|  |         assertEquals(2, recordedEvents.size()); | ||||||
|  |         assertEquals("A", recordedEvents.get(0).getId()); | ||||||
|  |         assertEquals("C", recordedEvents.get(1).getId()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     private Callable<RepoEvent<?>> messageWithDelay(String id, long delay) | ||||||
|  |     { | ||||||
|  |         Callable<RepoEvent<?>> res = new Callable<RepoEvent<?>>() { | ||||||
|  |  | ||||||
|  |             @Override | ||||||
|  |             public RepoEvent<?> call() throws Exception | ||||||
|  |             { | ||||||
|  |                 if(delay != 0) | ||||||
|  |                 { | ||||||
|  |                     sleep(delay);  | ||||||
|  |                 } | ||||||
|  |                 return newRepoEvent(id); | ||||||
|  |             }  | ||||||
|  |              | ||||||
|  |             @Override | ||||||
|  |             public String toString() | ||||||
|  |             { | ||||||
|  |                 return id; | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |         return res; | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     private RepoEvent<?> newRepoEvent(String id) | ||||||
|  |     { | ||||||
|  |         RepoEvent<?> ev = events.get(id); | ||||||
|  |         if (ev!=null) | ||||||
|  |             return ev; | ||||||
|  |          | ||||||
|  |         ev = mock(RepoEvent.class); | ||||||
|  |         when(ev.getId()).thenReturn(id); | ||||||
|  |         when(ev.toString()).thenReturn(id); | ||||||
|  |         events.put(id, ev); | ||||||
|  |  | ||||||
|  |         return ev; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public static ExecutorService newThreadPool()  | ||||||
|  |     { | ||||||
|  |         return new ThreadPoolExecutor(2, Integer.MAX_VALUE, | ||||||
|  |                                       60L, TimeUnit.SECONDS, | ||||||
|  |                                       new SynchronousQueue<Runnable>()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public static final Executor SYNC_EXECUTOR_SAME_THREAD = new Executor() | ||||||
|  |     { | ||||||
|  |         @Override | ||||||
|  |         public void execute(Runnable command) | ||||||
|  |         { | ||||||
|  |             command.run(); | ||||||
|  |         } | ||||||
|  |     }; | ||||||
|  |  | ||||||
|  |     public static final Executor SYNC_EXECUTOR_NEW_THREAD = new Executor() | ||||||
|  |     { | ||||||
|  |         @Override | ||||||
|  |         public void execute(Runnable command) | ||||||
|  |         { | ||||||
|  |             Thread t = new Thread(command); | ||||||
|  |             t.start(); | ||||||
|  |             try | ||||||
|  |             { | ||||||
|  |                 t.join(); | ||||||
|  |             } | ||||||
|  |             catch (InterruptedException e) | ||||||
|  |             { | ||||||
|  |                 Thread.currentThread().interrupt(); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     }; | ||||||
|  | } | ||||||
| @@ -0,0 +1,249 @@ | |||||||
|  | /* | ||||||
|  |  * #%L | ||||||
|  |  * Alfresco Repository | ||||||
|  |  * %% | ||||||
|  |  * Copyright (C) 2005 - 2020 Alfresco Software Limited | ||||||
|  |  * %% | ||||||
|  |  * This file is part of the Alfresco software. | ||||||
|  |  * If the software was purchased under a paid Alfresco license, the terms of | ||||||
|  |  * the paid license agreement will prevail.  Otherwise, the software is | ||||||
|  |  * provided under the following open source license terms: | ||||||
|  |  * | ||||||
|  |  * Alfresco is free software: you can redistribute it and/or modify | ||||||
|  |  * it under the terms of the GNU Lesser General Public License as published by | ||||||
|  |  * the Free Software Foundation, either version 3 of the License, or | ||||||
|  |  * (at your option) any later version. | ||||||
|  |  * | ||||||
|  |  * Alfresco is distributed in the hope that it will be useful, | ||||||
|  |  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  |  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  |  * GNU Lesser General Public License for more details. | ||||||
|  |  * | ||||||
|  |  * You should have received a copy of the GNU Lesser General Public License | ||||||
|  |  * along with Alfresco. If not, see <http://www.gnu.org/licenses/>. | ||||||
|  |  * #L% | ||||||
|  |  */ | ||||||
|  | package org.alfresco.repo.event2; | ||||||
|  |  | ||||||
|  | import java.util.Collections; | ||||||
|  | import java.util.LinkedList; | ||||||
|  | import java.util.List; | ||||||
|  | import java.util.Set; | ||||||
|  | import java.util.concurrent.TimeUnit; | ||||||
|  |  | ||||||
|  | import javax.jms.Destination; | ||||||
|  | import javax.jms.JMSException; | ||||||
|  | import javax.jms.Message; | ||||||
|  | import javax.jms.MessageConsumer; | ||||||
|  | import javax.jms.MessageListener; | ||||||
|  | import javax.jms.Session; | ||||||
|  |  | ||||||
|  | import org.alfresco.model.ContentModel; | ||||||
|  | import org.alfresco.repo.event.databind.ObjectMapperFactory; | ||||||
|  | import org.alfresco.repo.event.v1.model.RepoEvent; | ||||||
|  | import org.alfresco.service.cmr.repository.NodeRef; | ||||||
|  | import org.apache.activemq.ActiveMQConnection; | ||||||
|  | import org.apache.activemq.ActiveMQConnectionFactory; | ||||||
|  | import org.apache.activemq.advisory.DestinationSource; | ||||||
|  | import org.apache.activemq.command.ActiveMQQueue; | ||||||
|  | import org.apache.activemq.command.ActiveMQTextMessage; | ||||||
|  | import org.apache.activemq.command.ActiveMQTopic; | ||||||
|  | import org.awaitility.Awaitility; | ||||||
|  | import org.junit.After; | ||||||
|  | import org.junit.Before; | ||||||
|  | import org.junit.Test; | ||||||
|  | import org.springframework.beans.factory.annotation.Autowired; | ||||||
|  | import org.springframework.beans.factory.annotation.Qualifier; | ||||||
|  |  | ||||||
|  | import com.fasterxml.jackson.databind.ObjectMapper; | ||||||
|  |  | ||||||
|  | public class EventGeneratorTest extends AbstractContextAwareRepoEvent | ||||||
|  | { | ||||||
|  |     private static final String EVENT2_TOPIC_NAME = "alfresco.repo.event2"; | ||||||
|  |  | ||||||
|  |     private static final long DUMP_BROKER_TIMEOUT = 50000000l; | ||||||
|  |  | ||||||
|  |     @Autowired @Qualifier("event2ObjectMapper") | ||||||
|  |     private ObjectMapper objectMapper; | ||||||
|  |  | ||||||
|  |     private ActiveMQConnection connection; | ||||||
|  |     protected List<RepoEvent<?>> receivedEvents; | ||||||
|  |  | ||||||
|  |     @Before | ||||||
|  |     public void startupTopicListener() throws Exception | ||||||
|  |     { | ||||||
|  |         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); | ||||||
|  |         connection = (ActiveMQConnection) connectionFactory.createConnection(); | ||||||
|  |         connection.start(); | ||||||
|  |  | ||||||
|  |         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||||||
|  |         Destination destination = session.createTopic(EVENT2_TOPIC_NAME); | ||||||
|  |         MessageConsumer consumer = session.createConsumer(destination); | ||||||
|  |  | ||||||
|  |         receivedEvents = Collections.synchronizedList(new LinkedList<>()); | ||||||
|  |         consumer.setMessageListener(new MessageListener() | ||||||
|  |         { | ||||||
|  |             @Override | ||||||
|  |             public void onMessage(Message message) | ||||||
|  |             { | ||||||
|  |                 String text = getText(message); | ||||||
|  |                 RepoEvent<?> event = toRepoEvent(text); | ||||||
|  |  | ||||||
|  |                 if (DEBUG) | ||||||
|  |                 { | ||||||
|  |                     System.err.println("RX: " + event); | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 receivedEvents.add(event); | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             private RepoEvent<?> toRepoEvent(String json) | ||||||
|  |             { | ||||||
|  |                 try | ||||||
|  |                 { | ||||||
|  |                     return objectMapper.readValue(json, RepoEvent.class); | ||||||
|  |                 } catch (Exception e) | ||||||
|  |                 { | ||||||
|  |                     e.printStackTrace(); | ||||||
|  |                     return null; | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         if (DEBUG) | ||||||
|  |         { | ||||||
|  |             System.err.println("Now actively listening on topic " + EVENT2_TOPIC_NAME); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     protected ObjectMapper createObjectMapper() | ||||||
|  |     { | ||||||
|  |         return ObjectMapperFactory.createInstance(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @After | ||||||
|  |     public void shutdownTopicListener() throws Exception | ||||||
|  |     { | ||||||
|  |         connection.close(); | ||||||
|  |         connection = null; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveEvent2EventsOnNodeCreation() throws Exception | ||||||
|  |     { | ||||||
|  |         createNode(ContentModel.TYPE_CONTENT); | ||||||
|  |  | ||||||
|  |         Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 1); | ||||||
|  |  | ||||||
|  |         RepoEvent<?> sent = getRepoEvent(1); | ||||||
|  |         RepoEvent<?> received = receivedEvents.get(0); | ||||||
|  |         assertEventsEquals("Events are different!", sent, received); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     private void assertEventsEquals(String message, RepoEvent<?> expected, RepoEvent<?> current) | ||||||
|  |     { | ||||||
|  |         if (DEBUG) | ||||||
|  |         { | ||||||
|  |             System.err.println("XP: " + expected); | ||||||
|  |             System.err.println("CU: " + current); | ||||||
|  |         } | ||||||
|  |          | ||||||
|  |         assertEquals(message, expected, current); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     @Test | ||||||
|  |     public void shouldReceiveEvent2EventsInOrder() throws Exception | ||||||
|  |     { | ||||||
|  |         NodeRef nodeRef = createNode(ContentModel.TYPE_CONTENT); | ||||||
|  |         updateNodeName(nodeRef, "TestFile-" + System.currentTimeMillis() + ".txt"); | ||||||
|  |         deleteNode(nodeRef); | ||||||
|  |  | ||||||
|  |         Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> receivedEvents.size() == 3); | ||||||
|  |  | ||||||
|  |         RepoEvent<?> sentCreation = getRepoEvent(1); | ||||||
|  |         RepoEvent<?> sentUpdate = getRepoEvent(2); | ||||||
|  |         RepoEvent<?> sentDeletion = getRepoEvent(3); | ||||||
|  |         assertEquals("Expected create event!", sentCreation, (RepoEvent<?>) receivedEvents.get(0)); | ||||||
|  |         assertEquals("Expected update event!", sentUpdate, (RepoEvent<?>) receivedEvents.get(1)); | ||||||
|  |         assertEquals("Expected delete event!", sentDeletion, (RepoEvent<?>) receivedEvents.get(2)); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     private static String getText(Message message) | ||||||
|  |     { | ||||||
|  |         try | ||||||
|  |         { | ||||||
|  |             ActiveMQTextMessage am = (ActiveMQTextMessage) message; | ||||||
|  |             return am.getText(); | ||||||
|  |         } catch (JMSException e) | ||||||
|  |         { | ||||||
|  |             return null; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // a simple main to investigate the contents of the local broker | ||||||
|  |     public static void main(String[] args) throws Exception | ||||||
|  |     { | ||||||
|  |         dumpBroker("tcp://localhost:61616", DUMP_BROKER_TIMEOUT); | ||||||
|  |         System.exit(0); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     private static void dumpBroker(String url, long timeout) throws Exception | ||||||
|  |     { | ||||||
|  |         System.out.println("Broker at url: '" + url + "'"); | ||||||
|  |  | ||||||
|  |         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); | ||||||
|  |         ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); | ||||||
|  |         try | ||||||
|  |         { | ||||||
|  |             connection.start(); | ||||||
|  |  | ||||||
|  |             DestinationSource ds = connection.getDestinationSource(); | ||||||
|  |  | ||||||
|  |             Set<ActiveMQQueue> queues = ds.getQueues(); | ||||||
|  |             System.out.println("\nFound " + queues.size() + " queues:"); | ||||||
|  |             for (ActiveMQQueue queue : queues) | ||||||
|  |             { | ||||||
|  |                 try | ||||||
|  |                 { | ||||||
|  |                     System.out.println("- " + queue.getQueueName()); | ||||||
|  |                 } catch (JMSException e) | ||||||
|  |                 { | ||||||
|  |                     e.printStackTrace(); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             Set<ActiveMQTopic> topics = ds.getTopics(); | ||||||
|  |             System.out.println("\nFound " + topics.size() + " topics:"); | ||||||
|  |             for (ActiveMQTopic topic : topics) | ||||||
|  |             { | ||||||
|  |                 try | ||||||
|  |                 { | ||||||
|  |                     System.out.println("- " + topic.getTopicName()); | ||||||
|  |                 } catch (JMSException e) | ||||||
|  |                 { | ||||||
|  |                     e.printStackTrace(); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||||||
|  |             Destination destination = session.createTopic(EVENT2_TOPIC_NAME); | ||||||
|  |             MessageConsumer consumer = session.createConsumer(destination); | ||||||
|  |  | ||||||
|  |             System.out.println("\nListening to topic " + EVENT2_TOPIC_NAME + "..."); | ||||||
|  |             consumer.setMessageListener(new MessageListener() | ||||||
|  |             { | ||||||
|  |                 @Override | ||||||
|  |                 public void onMessage(Message message) | ||||||
|  |                 { | ||||||
|  |                     String text = getText(message); | ||||||
|  |                     System.out.println("Received message " + message + "\n" + text + "\n"); | ||||||
|  |                 } | ||||||
|  |             }); | ||||||
|  |  | ||||||
|  |             Thread.sleep(timeout); | ||||||
|  |         } finally | ||||||
|  |         { | ||||||
|  |             connection.close(); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -34,7 +34,8 @@ import org.junit.runners.Suite.SuiteClasses; | |||||||
|                 UpdateRepoEventIT.class, |                 UpdateRepoEventIT.class, | ||||||
|                 DeleteRepoEventIT.class, |                 DeleteRepoEventIT.class, | ||||||
|                 ChildAssociationRepoEventIT.class, |                 ChildAssociationRepoEventIT.class, | ||||||
|                 PeerAssociationRepoEventIT.class |                 PeerAssociationRepoEventIT.class, | ||||||
|  |                 EventGeneratorTest.class | ||||||
| }) | }) | ||||||
| public class RepoEvent2ITSuite | public class RepoEvent2ITSuite | ||||||
| { | { | ||||||
|   | |||||||
| @@ -33,7 +33,8 @@ import org.junit.runners.Suite.SuiteClasses; | |||||||
| @RunWith(Suite.class) | @RunWith(Suite.class) | ||||||
| @SuiteClasses({ EventFilterUnitTest.class, | @SuiteClasses({ EventFilterUnitTest.class, | ||||||
|                 EventConsolidatorUnitTest.class, |                 EventConsolidatorUnitTest.class, | ||||||
|                 EventJSONSchemaUnitTest.class |                 EventJSONSchemaUnitTest.class, | ||||||
|  |                 EventGeneratorQueueUnitTest.class | ||||||
| }) | }) | ||||||
| public class RepoEvent2UnitSuite | public class RepoEvent2UnitSuite | ||||||
| { | { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user