From e40bd92c21ffc42ccc693f57f66708ec2da0a557 Mon Sep 17 00:00:00 2001 From: Matt Ward Date: Tue, 14 Feb 2012 12:27:57 +0000 Subject: [PATCH] Messaging facade: finished changing jgroups heartbeating over to facade. git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@33886 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- config/alfresco/bootstrap-context.xml | 5 +++ config/alfresco/cache-context.xml | 3 +- .../repo/cluster/ClusterTestSuite.java | 40 +++++++++++++++++ .../repo/cluster/HazelcastMessenger.java | 12 ++++- .../cluster/HazelcastMessengerFactory.java | 19 +++++++- .../HazelcastMessengerFactoryTest.java | 29 ++++++++++++ .../repo/cluster/HazelcastMessengerTest.java | 2 +- .../alfresco/repo/cluster/HazelcastTest.java | 1 + .../repo/cluster/JGroupsMessenger.java | 19 +++++++- .../repo/cluster/JGroupsMessengerFactory.java | 6 +++ .../repo/cluster/JGroupsMessengerTest.java | 10 +++++ .../alfresco/repo/cluster/JGroupsTest.java | 12 +++++ .../org/alfresco/repo/cluster/Messenger.java | 2 + .../repo/cluster/MessengerFactory.java | 10 ++++- .../cluster/MessengerFactoryProvider.java | 44 +++++++++++++++++++ .../AlfrescoJGroupsChannelFactory.java | 11 ++--- 16 files changed, 211 insertions(+), 14 deletions(-) create mode 100644 source/java/org/alfresco/repo/cluster/ClusterTestSuite.java create mode 100644 source/java/org/alfresco/repo/cluster/MessengerFactoryProvider.java diff --git a/config/alfresco/bootstrap-context.xml b/config/alfresco/bootstrap-context.xml index 994d4335c4..dee06f97c3 100644 --- a/config/alfresco/bootstrap-context.xml +++ b/config/alfresco/bootstrap-context.xml @@ -182,6 +182,11 @@ + + + + + diff --git a/config/alfresco/cache-context.xml b/config/alfresco/cache-context.xml index 227ed1a573..36331fc2da 100644 --- a/config/alfresco/cache-context.xml +++ b/config/alfresco/cache-context.xml @@ -13,7 +13,8 @@ - + diff --git a/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java b/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java new file mode 100644 index 0000000000..22f92113d4 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.cluster; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.junit.runners.Suite.SuiteClasses; + +/** + * Test suite for the org.alfresco.repo.cluster package. + * + * @author Matt Ward + */ +@RunWith(Suite.class) +@SuiteClasses({ + org.alfresco.repo.cluster.HazelcastMessengerFactoryTest.class, + org.alfresco.repo.cluster.HazelcastMessengerTest.class, + org.alfresco.repo.cluster.HazelcastTest.class, + org.alfresco.repo.cluster.JGroupsMessengerTest.class, + org.alfresco.repo.cluster.JGroupsTest.class +}) +public class ClusterTestSuite +{ +} diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java index 718bc71222..bfedc4b576 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java @@ -36,14 +36,15 @@ public class HazelcastMessenger implements Messenger, { private ITopic topic; private MessageReceiver receiverDelegate; - + private String address; /** * @param topic */ - public HazelcastMessenger(ITopic topic) + public HazelcastMessenger(ITopic topic, String address) { this.topic = topic; + this.address = address; } @@ -79,4 +80,11 @@ public class HazelcastMessenger implements Messenger, { return topic; } + + + @Override + public String getAddress() + { + return address; + } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java index cf86143e55..91be5a9290 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java @@ -21,6 +21,10 @@ package org.alfresco.repo.cluster; import java.io.Serializable; +import org.springframework.util.StringUtils; + +import com.hazelcast.config.Config; +import com.hazelcast.config.GroupConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; @@ -39,7 +43,8 @@ public class HazelcastMessengerFactory implements MessengerFactory public Messenger createMessenger(String appRegion) { ITopic topic = hazelcast.getTopic(appRegion); - return new HazelcastMessenger(topic); + String address = hazelcast.getCluster().getLocalMember().getInetSocketAddress().toString(); + return new HazelcastMessenger(topic, address); } /** @@ -49,4 +54,16 @@ public class HazelcastMessengerFactory implements MessengerFactory { this.hazelcast = hazelcast; } + + @Override + public boolean isClusterActive() + { + Config config = hazelcast.getConfig(); + if (config == null || config.getGroupConfig() == null) + { + return false; + } + GroupConfig groupConfig = config.getGroupConfig(); + return StringUtils.hasText(groupConfig.getName()); + } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java index 8a9c18309d..037a7b9461 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java @@ -19,17 +19,24 @@ package org.alfresco.repo.cluster; +import static org.junit.Assert.*; import static org.junit.Assert.assertSame; import static org.mockito.Mockito.when; +import java.net.InetSocketAddress; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import com.hazelcast.config.Config; +import com.hazelcast.config.GroupConfig; +import com.hazelcast.core.Cluster; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; +import com.hazelcast.core.Member; /** @@ -41,23 +48,45 @@ import com.hazelcast.core.ITopic; public class HazelcastMessengerFactoryTest { private HazelcastMessengerFactory factory; + private GroupConfig groupConfig; private @Mock HazelcastInstance hazelcast; + private @Mock Member member; + private @Mock Cluster cluster; private @Mock ITopic topic; + private @Mock Config config; @Before public void setUp() { factory = new HazelcastMessengerFactory(); factory.setHazelcast(hazelcast); + groupConfig = new GroupConfig(); } @Test public void topicWrappedInMessenger() { when(hazelcast.getTopic("app-region")).thenReturn(topic); + when(hazelcast.getCluster()).thenReturn(cluster); + when(cluster.getLocalMember()).thenReturn(member); + when(member.getInetSocketAddress()).thenReturn(InetSocketAddress.createUnresolved("a-host-name", 1234)); Messenger messenger = factory.createMessenger("app-region"); assertSame(topic, ((HazelcastMessenger) messenger).getTopic()); + assertEquals("a-host-name:1234", messenger.getAddress()); + } + + @Test + public void canCheckClusterIsActive() + { + when(hazelcast.getConfig()).thenReturn(config); + when(config.getGroupConfig()).thenReturn(groupConfig); + + groupConfig.setName("my-cluster-name"); + assertEquals(true, factory.isClusterActive()); + + groupConfig.setName(""); + assertEquals(false, factory.isClusterActive()); } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java index 625009211e..5ef3fd1ca3 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java @@ -45,7 +45,7 @@ public class HazelcastMessengerTest @Before public void setUp() { - messenger = new HazelcastMessenger(topic); + messenger = new HazelcastMessenger(topic, "address"); receivedMsg = null; } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastTest.java b/source/java/org/alfresco/repo/cluster/HazelcastTest.java index 4815c7d0be..f8ed604887 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastTest.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastTest.java @@ -57,6 +57,7 @@ public class HazelcastTest implements MessageListener public static void tearDownClass() { ApplicationContextHelper.closeApplicationContext(); + Hazelcast.shutdownAll(); } @Before diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java b/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java index f96604ee39..1178902565 100644 --- a/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java +++ b/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java @@ -28,6 +28,8 @@ import java.io.ObjectOutput; import java.io.ObjectOutputStream; import java.io.Serializable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.jgroups.Channel; import org.jgroups.Message; import org.jgroups.ReceiverAdapter; @@ -41,7 +43,7 @@ public class JGroupsMessenger extends ReceiverAdapter im { private final Channel channel; private MessageReceiver receiverDelegate; - + private final static Log logger = LogFactory.getLog(JGroupsMessenger.class); /** * Construct a messenger that wraps a JGroups Channel. @@ -66,6 +68,10 @@ public class JGroupsMessenger extends ReceiverAdapter im out.writeObject(message); out.close(); bytes.close(); + if (logger.isTraceEnabled()) + { + logger.trace("Sending " + message); + } channel.send(null, null, bytes.toByteArray()); } catch (Throwable e) @@ -105,6 +111,10 @@ public class JGroupsMessenger extends ReceiverAdapter im T payload = (T) in.readObject(); in.close(); bytes.close(); + if (logger.isTraceEnabled()) + { + logger.trace("Received (will be delegated to receiver): " + payload); + } // Pass the deserialized payload on to the receiver delegate receiverDelegate.onReceive(payload); } @@ -124,4 +134,11 @@ public class JGroupsMessenger extends ReceiverAdapter im { return channel.isConnected(); } + + + @Override + public String getAddress() + { + return channel.getAddress().toString(); + } } diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java b/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java index 083c33810b..3e8d49e86d 100644 --- a/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java +++ b/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java @@ -38,5 +38,11 @@ public class JGroupsMessengerFactory implements MessengerFactory ParameterCheck.mandatory("appRegion", appRegion); Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); return new JGroupsMessenger(channel); + } + + @Override + public boolean isClusterActive() + { + return AlfrescoJGroupsChannelFactory.isClusterActive(); } } diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java b/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java index 5d0c4e324c..413c01e76a 100644 --- a/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java +++ b/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.ObjectOutput; import java.io.ObjectOutputStream; +import org.jgroups.Address; import org.jgroups.Channel; import org.jgroups.ChannelClosedException; import org.jgroups.ChannelNotConnectedException; @@ -102,6 +103,15 @@ public class JGroupsMessengerTest assertEquals(false, messenger.isConnected()); } + @Test + public void canDelegateGetAddress() + { + Address address = Mockito.mock(Address.class); + Mockito.when(address.toString()).thenReturn("an-address"); + Mockito.when(channel.getAddress()).thenReturn(address); + assertEquals("an-address", messenger.getAddress()); + } + private byte[] serialize(String text) throws IOException { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); diff --git a/source/java/org/alfresco/repo/cluster/JGroupsTest.java b/source/java/org/alfresco/repo/cluster/JGroupsTest.java index 9111a2f656..40c84f6229 100644 --- a/source/java/org/alfresco/repo/cluster/JGroupsTest.java +++ b/source/java/org/alfresco/repo/cluster/JGroupsTest.java @@ -120,6 +120,18 @@ public class JGroupsTest extends ReceiverAdapter helper.checkMessageReceivedWas("This message was sent with jgroups"); } + @Test + public void canCheckIsClusterActive() + { + JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory(); + + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(null); + assertEquals(false, messengerFactory.isClusterActive()); + + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("my-cluster-name"); + assertEquals(true, messengerFactory.isClusterActive()); + } + @Ignore("Not currently allowing multiple receivers per underlying Channel") @Test public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException diff --git a/source/java/org/alfresco/repo/cluster/Messenger.java b/source/java/org/alfresco/repo/cluster/Messenger.java index 0ef889c33f..27db9716f6 100644 --- a/source/java/org/alfresco/repo/cluster/Messenger.java +++ b/source/java/org/alfresco/repo/cluster/Messenger.java @@ -38,4 +38,6 @@ public interface Messenger void setReceiver(MessageReceiver receiver); boolean isConnected(); + + String getAddress(); } diff --git a/source/java/org/alfresco/repo/cluster/MessengerFactory.java b/source/java/org/alfresco/repo/cluster/MessengerFactory.java index 26f827063e..0c6c75a530 100644 --- a/source/java/org/alfresco/repo/cluster/MessengerFactory.java +++ b/source/java/org/alfresco/repo/cluster/MessengerFactory.java @@ -27,6 +27,14 @@ import java.io.Serializable; * @author Matt Ward */ public interface MessengerFactory -{ +{ + /** A catch-all for unknown application regions. */ + public static final String APP_REGION_DEFAULT = "DEFAULT"; + + /** The application region used by the EHCache heartbeat implementation over JGroups. */ + public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT"; + Messenger createMessenger(String appRegion); + + boolean isClusterActive(); } diff --git a/source/java/org/alfresco/repo/cluster/MessengerFactoryProvider.java b/source/java/org/alfresco/repo/cluster/MessengerFactoryProvider.java new file mode 100644 index 0000000000..3bc3630757 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/MessengerFactoryProvider.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.cluster; + +/** + * Static container for MessengerFactory. This allows code to obtain the correct + * {@link MessengerFactory} implementation where dependency injection is not available. + * + * @author Matt Ward + */ +public class MessengerFactoryProvider +{ + private static MessengerFactory instance; + + public void setInstance(MessengerFactory messengerFactory) + { + instance = messengerFactory; + } + + public static MessengerFactory getInstance() + { + if (instance == null) + { + throw new IllegalStateException("MessengerFactory instance not configured yet."); + } + return instance; + } +} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java index ebd3a85ebc..3ecc40b033 100644 --- a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java +++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.repo.cluster.MessengerFactory; import org.alfresco.util.PropertyCheck; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -68,10 +69,6 @@ import org.springframework.util.ResourceUtils; */ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean { - /** A catch-all for unknown application regions. */ - public static final String APP_REGION_DEFAULT = "DEFAULT"; - /** The application region used by the EHCache heartbeat implementation over JGroups. */ - public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT"; /** The UDP protocol config (default) */ public static final String DEFAULT_CONFIG_UDP = "classpath:alfresco/jgroups/alfresco-jgroups-UDP.xml"; /** The TCP protocol config */ @@ -102,7 +99,7 @@ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean clusterNamePrefix = null; configUrlsByAppRegion = new HashMap(5); configUrlsByAppRegion.put( - AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT, + MessengerFactory.APP_REGION_DEFAULT, AlfrescoJGroupsChannelFactory.DEFAULT_CONFIG_UDP); } @@ -167,7 +164,7 @@ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean String configUrlStr = configUrlsByAppRegion.get(appRegion); if (!PropertyCheck.isValidPropertyString(configUrlStr)) { - configUrlStr = configUrlsByAppRegion.get(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT); + configUrlStr = configUrlsByAppRegion.get(MessengerFactory.APP_REGION_DEFAULT); } if (configUrlStr == null) { @@ -432,7 +429,7 @@ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean try { // Check that there is a mapping for default - if (!configUrlsByAppRegion.containsKey(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT)) + if (!configUrlsByAppRegion.containsKey(MessengerFactory.APP_REGION_DEFAULT)) { throw new AlfrescoRuntimeException("A configuration URL must be defined for 'DEFAULT'"); }