diff --git a/config/alfresco/bootstrap-context.xml b/config/alfresco/bootstrap-context.xml index fd8d5bf0fe..7eea049604 100644 --- a/config/alfresco/bootstrap-context.xml +++ b/config/alfresco/bootstrap-context.xml @@ -170,24 +170,10 @@ - - - - ${alfresco.cluster.name} - - - - - ${alfresco.jgroups.configLocation} - - - + + - - diff --git a/config/alfresco/cluster-context.xml b/config/alfresco/cluster-context.xml index 2160c567ee..41be670422 100644 --- a/config/alfresco/cluster-context.xml +++ b/config/alfresco/cluster-context.xml @@ -6,8 +6,22 @@ + ${alfresco.cluster.name} ${alfresco.hazelcast.password} + ${alfresco.hazelcast.specify.interface} + ${alfresco.hazelcast.bind.interface} + + + ${alfresco.hazelcast.tcp.config} + + + ${alfresco.hazelcast.ec2.accesskey} + ${alfresco.hazelcast.ec2.secretkey} + ${alfresco.hazelcast.ec2.region} + ${alfresco.hazelcast.ec2.securitygroup} + ${alfresco.hazelcast.ec2.tagkey} + ${alfresco.hazelcast.ec2.tagvalue} diff --git a/config/alfresco/core-services-context.xml b/config/alfresco/core-services-context.xml index d25a0e712d..d4150af079 100644 --- a/config/alfresco/core-services-context.xml +++ b/config/alfresco/core-services-context.xml @@ -121,12 +121,6 @@ - - ${alfresco.jgroups.bind_address} - - - ${alfresco.jgroups.bind_interface} - ${alfresco.ehcache.rmi.hostname} diff --git a/config/alfresco/extension/ehcache-custom.xml.sample.cluster b/config/alfresco/extension/ehcache-custom.xml.sample.cluster index 2863383ad5..b731055290 100644 --- a/config/alfresco/extension/ehcache-custom.xml.sample.cluster +++ b/config/alfresco/extension/ehcache-custom.xml.sample.cluster @@ -3,7 +3,7 @@ path="java.io.tmpdir"/> ${alfresco.hazelcast.ec2.tagvalue} - - 10.10.1.* + + ${alfresco.hazelcast.bind.interface} - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/config/alfresco/jgroups/alfresco-jgroups-UDP.xml b/config/alfresco/jgroups/alfresco-jgroups-UDP.xml deleted file mode 100644 index 37b76982ff..0000000000 --- a/config/alfresco/jgroups/alfresco-jgroups-UDP.xml +++ /dev/null @@ -1,67 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - diff --git a/config/alfresco/repository.properties b/config/alfresco/repository.properties index a771be6684..883a9bda86 100644 --- a/config/alfresco/repository.properties +++ b/config/alfresco/repository.properties @@ -129,8 +129,13 @@ alfresco.hazelcast.protocol=tcp # Location of the Hazelcast configuration file alfresco.hazelcast.configLocation=classpath:alfresco/hazelcast/hazelcast-${alfresco.hazelcast.protocol}.xml # XML elements to incorporate into Hazelcast config, in particular -# hostnames to use for membership discovery -alfresco.hazelcast.tcp.config=localhost +# hostnames/IP addresses to use for membership discovery +alfresco.hazelcast.tcp.config= +# Whether to bind to a specific host interface +alfresco.hazelcast.specify.interface=false +# The interface to bind to, if enabled above. +alfresco.hazelcast.bind.interface= + # Amazon Web Services - EC2 discovery alfresco.hazelcast.ec2.accesskey=my-access-key alfresco.hazelcast.ec2.secretkey=my-secret-key @@ -149,17 +154,6 @@ alfresco.ehcache.rmi.remoteObjectPort=0 alfresco.ehcache.rmi.port=0 alfresco.ehcache.rmi.socketTimeoutMillis=5000 -# The protocol stack to use from the JGroups configuration file -# Use this property to select which communication method should be used. -# The JGroups configuration file is build up using the protocol string -alfresco.jgroups.defaultProtocol=UDP -# The bind address and interface for JGroups to use; equivalent to -Djgroups.bind_addr and -Djgroups.bind_interface -alfresco.jgroups.bind_address= -alfresco.jgroups.bind_interface= -# JGroups configuration (http://www.jgroups.org) -# The location of the JGroups configuration file -alfresco.jgroups.configLocation=classpath:alfresco/jgroups/alfresco-jgroups-${alfresco.jgroups.defaultProtocol}.xml - # # How long should shutdown wait to complete normally before # taking stronger action and calling System.exit() diff --git a/source/java/org/alfresco/repo/cache/AlfrescoCacheManagerPeerProviderFactory.java b/source/java/org/alfresco/repo/cache/AlfrescoCacheManagerPeerProviderFactory.java index 2d938a0456..6f6e74ef7a 100644 --- a/source/java/org/alfresco/repo/cache/AlfrescoCacheManagerPeerProviderFactory.java +++ b/source/java/org/alfresco/repo/cache/AlfrescoCacheManagerPeerProviderFactory.java @@ -46,7 +46,7 @@ public class AlfrescoCacheManagerPeerProviderFactory extends CacheManagerPeerPro try { @SuppressWarnings("unchecked") - Class clazz = Class.forName("org.alfresco.enterprise.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider$Factory"); + Class clazz = Class.forName("org.alfresco.enterprise.repo.cache.cluster.RMICacheManagerPeerProvider$Factory"); factory = (CacheManagerPeerProviderFactory) clazz.newInstance(); } catch (ClassNotFoundException e) @@ -55,7 +55,7 @@ public class AlfrescoCacheManagerPeerProviderFactory extends CacheManagerPeerPro } catch (Throwable e) { - logger.error("Failed to instantiate JGroupsRMICacheManagerPeerProvider factory.", e); + logger.error("Failed to instantiate RMICacheManagerPeerProvider factory.", e); } finally { diff --git a/source/java/org/alfresco/repo/cluster/BuildSafeTestSuite.java b/source/java/org/alfresco/repo/cluster/BuildSafeTestSuite.java index dfb4ecd079..b922fa0974 100644 --- a/source/java/org/alfresco/repo/cluster/BuildSafeTestSuite.java +++ b/source/java/org/alfresco/repo/cluster/BuildSafeTestSuite.java @@ -36,8 +36,7 @@ import org.junit.runners.Suite.SuiteClasses; @SuiteClasses({ org.alfresco.repo.cluster.HazelcastConfigFactoryBeanTest.class, org.alfresco.repo.cluster.HazelcastMessengerFactoryTest.class, - org.alfresco.repo.cluster.HazelcastMessengerTest.class, - org.alfresco.repo.cluster.JGroupsMessengerTest.class + org.alfresco.repo.cluster.HazelcastMessengerTest.class }) public class BuildSafeTestSuite { diff --git a/source/java/org/alfresco/repo/cluster/ClusterMembershipListener.java b/source/java/org/alfresco/repo/cluster/ClusterMembershipListener.java new file mode 100644 index 0000000000..2341986774 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/ClusterMembershipListener.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.cluster; + +/** + * Implementing classes can react to members joining or leaving the cluster. + * + * @author Matt Ward + */ +public interface ClusterMembershipListener +{ + void memberJoined(String member, String[] cluster); + void memberLeft(String member, String[] cluster); +} diff --git a/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java b/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java index 591868be84..502fba17a9 100644 --- a/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java +++ b/source/java/org/alfresco/repo/cluster/ClusterTestSuite.java @@ -36,8 +36,7 @@ import org.junit.runners.Suite.SuiteClasses; org.alfresco.repo.cluster.BuildSafeTestSuite.class, // Additionally run these tests that cannot be run on the build servers. - org.alfresco.repo.cluster.HazelcastTest.class, - org.alfresco.repo.cluster.JGroupsTest.class + org.alfresco.repo.cluster.HazelcastTest.class }) public class ClusterTestSuite { diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java index 28309a10b5..0a3d540304 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java @@ -21,9 +21,9 @@ package org.alfresco.repo.cluster; import java.io.Serializable; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.extensions.surf.util.ParameterCheck; import com.hazelcast.core.ITopic; import com.hazelcast.core.MessageListener; @@ -40,6 +40,7 @@ public class HazelcastMessenger implements Messenger, private MessageReceiver receiverDelegate; private String address; private final static Log logger = LogFactory.getLog(HazelcastMessenger.class); + /** * @param topic */ @@ -55,7 +56,8 @@ public class HazelcastMessenger implements Messenger, { if (logger.isTraceEnabled()) { - logger.trace("Sending " + message); + String digest = StringUtils.abbreviate(message.toString(), 50); + logger.trace("Sending [source: " + address + "]: " + digest); } topic.publish(message); } @@ -74,7 +76,8 @@ public class HazelcastMessenger implements Messenger, { if (logger.isTraceEnabled()) { - logger.trace("Received (will be delegated to receiver): " + message); + String digest = StringUtils.abbreviate(message.toString(), 50); + logger.trace("Received [destination: " + address + "] (delegating to receiver): " + digest); } receiverDelegate.onReceive(message); } @@ -95,5 +98,14 @@ public class HazelcastMessenger implements Messenger, public String getAddress() { return address; - } + } + + + @Override + public String toString() + { + return "HazelcastMessenger[connected=" + isConnected() + + ", topic=" + getTopic() + + ", address=" + getAddress() + "]"; + } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java index d4223ac59a..9ba93ac6dd 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java @@ -20,13 +20,13 @@ package org.alfresco.repo.cluster; import java.io.Serializable; +import java.util.Set; -import org.springframework.util.StringUtils; - -import com.hazelcast.config.Config; -import com.hazelcast.config.GroupConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; +import com.hazelcast.core.Member; +import com.hazelcast.core.MembershipEvent; +import com.hazelcast.core.MembershipListener; /** * Hazelcast-based implementation of the {@link MessengerFactory} interface. @@ -37,8 +37,8 @@ import com.hazelcast.core.ITopic; */ public class HazelcastMessengerFactory implements MessengerFactory { - private HazelcastInstance hazelcast; - + private HazelcastInstanceFactory hazelcastInstanceFactory; + @Override public Messenger createMessenger(String appRegion) { @@ -48,28 +48,70 @@ public class HazelcastMessengerFactory implements MessengerFactory @Override public Messenger createMessenger(String appRegion, boolean acceptLocalMessages) { + if (!isClusterActive()) + { + return new NullMessenger(); + } + // Clustering is enabled, create a messenger. + HazelcastInstance hazelcast = hazelcastInstanceFactory.getInstance(); ITopic topic = hazelcast.getTopic(appRegion); String address = hazelcast.getCluster().getLocalMember().getInetSocketAddress().toString(); return new HazelcastMessenger(topic, address); } /** - * @param hazelcast the hazelcast to set + * Provide the messenger factory with a means to obtain a HazelcastInstance. + * + * @param hazelcastInstanceFactory */ - public void setHazelcast(HazelcastInstance hazelcast) + public void setHazelcastInstanceFactory(HazelcastInstanceFactory hazelcastInstanceFactory) { - this.hazelcast = hazelcast; + this.hazelcastInstanceFactory = hazelcastInstanceFactory; } @Override public boolean isClusterActive() { - Config config = hazelcast.getConfig(); - if (config == null || config.getGroupConfig() == null) + return hazelcastInstanceFactory.isClusteringEnabled(); + } + + @Override + public void addMembershipListener(final ClusterMembershipListener listener) + { + if (isClusterActive()) { - return false; + HazelcastInstance hazelcast = hazelcastInstanceFactory.getInstance(); + hazelcast.getCluster().addMembershipListener(new MembershipListener() + { + @Override + public void memberRemoved(MembershipEvent e) + { + listener.memberLeft(member(e), cluster(e)); + } + + @Override + public void memberAdded(MembershipEvent e) + { + listener.memberJoined(member(e), cluster(e)); + } + + private String member(MembershipEvent e) + { + return e.getMember().getInetSocketAddress().toString(); + } + + private String[] cluster(MembershipEvent e) + { + Set members = e.getCluster().getMembers(); + String[] cluster = new String[members.size()]; + int i = 0; + for (Member m : members) + { + cluster[i++] = m.getInetSocketAddress().toString(); + } + return cluster; + } + }); } - GroupConfig groupConfig = config.getGroupConfig(); - return StringUtils.hasText(groupConfig.getName()); } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java index 037a7b9461..84e5d02e80 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java @@ -19,7 +19,7 @@ package org.alfresco.repo.cluster; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.mockito.Mockito.when; @@ -31,8 +31,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import com.hazelcast.config.Config; -import com.hazelcast.config.GroupConfig; import com.hazelcast.core.Cluster; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; @@ -48,19 +46,20 @@ import com.hazelcast.core.Member; public class HazelcastMessengerFactoryTest { private HazelcastMessengerFactory factory; - private GroupConfig groupConfig; private @Mock HazelcastInstance hazelcast; private @Mock Member member; private @Mock Cluster cluster; private @Mock ITopic topic; - private @Mock Config config; + private @Mock HazelcastInstanceFactory hazelcastInstanceFactory; @Before public void setUp() { factory = new HazelcastMessengerFactory(); - factory.setHazelcast(hazelcast); - groupConfig = new GroupConfig(); + factory.setHazelcastInstanceFactory(hazelcastInstanceFactory); + + when(hazelcastInstanceFactory.isClusteringEnabled()).thenReturn(true); + when(hazelcastInstanceFactory.getInstance()).thenReturn(hazelcast); } @Test @@ -80,13 +79,9 @@ public class HazelcastMessengerFactoryTest @Test public void canCheckClusterIsActive() { - when(hazelcast.getConfig()).thenReturn(config); - when(config.getGroupConfig()).thenReturn(groupConfig); - - groupConfig.setName("my-cluster-name"); assertEquals(true, factory.isClusterActive()); - groupConfig.setName(""); + when(hazelcastInstanceFactory.isClusteringEnabled()).thenReturn(false); assertEquals(false, factory.isClusterActive()); } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastTest.java b/source/java/org/alfresco/repo/cluster/HazelcastTest.java index 29ee9f6638..d161152711 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastTest.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastTest.java @@ -29,8 +29,6 @@ import org.junit.Ignore; import org.junit.Test; import org.springframework.context.ApplicationContext; -import com.hazelcast.config.Config; -import com.hazelcast.config.GroupConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; @@ -46,6 +44,8 @@ public class HazelcastTest implements MessageListener { private static ApplicationContext ctx; private MessengerTestHelper helper; + private HazelcastInstanceFactory hiFactory; + private HazelcastInstance hi; @BeforeClass public static void setUpClass() @@ -65,14 +65,14 @@ public class HazelcastTest implements MessageListener public void setUp() { helper = new MessengerTestHelper(); + hiFactory = ctx.getBean(HazelcastInstanceFactory.class); + hi = hiFactory.getInstance(); } @Test public void canSendWithHazelcastMessengerFactory() throws InterruptedException { - Config config = createConfig(); - HazelcastInstance hi = Hazelcast.newHazelcastInstance(config); ITopic topic = hi.getTopic("testregion"); topic.addMessageListener(this); @@ -93,8 +93,6 @@ public class HazelcastTest implements MessageListener TestMessageReceiver r1 = new TestMessageReceiver(); m1.setReceiver(r1); - Config config = createConfig(); - HazelcastInstance hi = Hazelcast.newHazelcastInstance(config); ITopic topic2 = hi.getTopic("testregion"); String address2 = hi.getCluster().getLocalMember().getInetSocketAddress().toString(); Messenger m2 = new HazelcastMessenger(topic2, address2); @@ -112,14 +110,4 @@ public class HazelcastTest implements MessageListener { helper.setReceivedMsg(message); } - - private Config createConfig() - { - Config config = new Config(); - GroupConfig groupConfig = new GroupConfig(); - groupConfig.setName("testcluster"); - groupConfig.setPassword("secret"); - config.setGroupConfig(groupConfig); - return config; - } } diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java b/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java deleted file mode 100644 index 1178902565..0000000000 --- a/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (C) 2005-2012 Alfresco Software Limited. - * - * This file is part of Alfresco - * - * Alfresco is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Alfresco is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ - -package org.alfresco.repo.cluster; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jgroups.Channel; -import org.jgroups.Message; -import org.jgroups.ReceiverAdapter; - -/** - * JGroups implementation of the {@link Messenger} class. - * - * @author Matt Ward - */ -public class JGroupsMessenger extends ReceiverAdapter implements Messenger -{ - private final Channel channel; - private MessageReceiver receiverDelegate; - private final static Log logger = LogFactory.getLog(JGroupsMessenger.class); - - /** - * Construct a messenger that wraps a JGroups Channel. - * - * @param channel - */ - public JGroupsMessenger(Channel channel) - { - this.channel = channel; - } - - - @Override - public void send(T message) - { - try - { - // Serializing the message ourselves and passing a byte[] - // to Channel.send() as recommended by JGroups. - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(bytes); - out.writeObject(message); - out.close(); - bytes.close(); - if (logger.isTraceEnabled()) - { - logger.trace("Sending " + message); - } - channel.send(null, null, bytes.toByteArray()); - } - catch (Throwable e) - { - throw new MessageSendingException(e); - } - } - - /* - * @see org.alfresco.repo.cluster.Messenger#setReceiver(org.alfresco.repo.cluster.MessageReceiver) - */ - @Override - public void setReceiver(MessageReceiver receiver) - { - // Make sure the delegate is installed, before starting to receive messages. - receiverDelegate = receiver; - // Start receiving messages and dispatch them to the delegate. - channel.setReceiver(this); - } - - - /* - * @see org.jgroups.ReceiverAdapter#receive(org.jgroups.Message) - */ - @Override - public void receive(Message msg) - { - // Deserializing the message ourselves rather than using - // the Message's getObject() method (as recommended by JGroups). - byte[] msgBytes = msg.getBuffer(); - ByteArrayInputStream bytes = new ByteArrayInputStream(msgBytes); - ObjectInput in; - try - { - in = new ObjectInputStream(bytes); - @SuppressWarnings("unchecked") - T payload = (T) in.readObject(); - in.close(); - bytes.close(); - if (logger.isTraceEnabled()) - { - logger.trace("Received (will be delegated to receiver): " + payload); - } - // Pass the deserialized payload on to the receiver delegate - receiverDelegate.onReceive(payload); - } - catch (IOException e) - { - throw new RuntimeException("Couldn't receive object.", e); - } - catch (ClassNotFoundException e) - { - throw new RuntimeException("Couldn't receive object.", e); - } - } - - - @Override - public boolean isConnected() - { - return channel.isConnected(); - } - - - @Override - public String getAddress() - { - return channel.getAddress().toString(); - } -} diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java b/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java deleted file mode 100644 index 09133d85d6..0000000000 --- a/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2005-2012 Alfresco Software Limited. - * - * This file is part of Alfresco - * - * Alfresco is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Alfresco is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ - -package org.alfresco.repo.cluster; - -import java.io.Serializable; - -import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; -import org.alfresco.util.ParameterCheck; -import org.jgroups.Channel; - -/** - * JGroups implementation of the {@link MessengerFactory} interface. - * - * @author Matt Ward - */ -public class JGroupsMessengerFactory implements MessengerFactory -{ - @Override - public Messenger createMessenger(String appRegion) - { - return createMessenger(appRegion, false); - } - - @Override - public Messenger createMessenger(String appRegion, boolean acceptLocalMessages) - { - ParameterCheck.mandatory("appRegion", appRegion); - Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion, acceptLocalMessages); - return new JGroupsMessenger(channel); - } - - @Override - public boolean isClusterActive() - { - return AlfrescoJGroupsChannelFactory.isClusterActive(); - } -} diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java b/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java deleted file mode 100644 index 413c01e76a..0000000000 --- a/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (C) 2005-2012 Alfresco Software Limited. - * - * This file is part of Alfresco - * - * Alfresco is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Alfresco is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ - -package org.alfresco.repo.cluster; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.verify; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; - -import org.jgroups.Address; -import org.jgroups.Channel; -import org.jgroups.ChannelClosedException; -import org.jgroups.ChannelNotConnectedException; -import org.jgroups.Message; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * Tests for the JGroupsMessenger class. - * - * @author Matt Ward - */ -public class JGroupsMessengerTest -{ - private Channel channel; - private JGroupsMessenger messenger; - protected String receivedMsg; - - @Before - public void setUp() - { - channel = Mockito.mock(Channel.class); - messenger = new JGroupsMessenger(channel); - receivedMsg = null; - } - - @Test - public void canSendMessage() throws ChannelNotConnectedException, ChannelClosedException, IOException - { - String testText = "This is a test message"; - byte[] testTextSer = serialize(testText); - // When a message is sent... - messenger.send(testText); - - // the underlying channel should have been used to send it, - // but will be called with a serialized version of the text. - verify(channel).send(null, null, testTextSer); - } - - - @Test - public void canReceiveMessage() throws IOException - { - MessageReceiver receiver = new MessageReceiver() - { - @Override - public void onReceive(String message) - { - receivedMsg = message; - } - }; - - messenger.setReceiver(receiver); - Message jgroupsMessage = new Message(null, null, serialize("JGroups message payload")); - // JGroups will call the receive method - messenger.receive(jgroupsMessage); - - // The Messenger should have installed itself as the message - // receiver for the underlying channel. - verify(channel).setReceiver(messenger); - - assertEquals("JGroups message payload", receivedMsg.toString()); - } - - @Test - public void canDelegateIsConnected() - { - Mockito.when(channel.isConnected()).thenReturn(true); - assertEquals(true, messenger.isConnected()); - - Mockito.when(channel.isConnected()).thenReturn(false); - assertEquals(false, messenger.isConnected()); - } - - @Test - public void canDelegateGetAddress() - { - Address address = Mockito.mock(Address.class); - Mockito.when(address.toString()).thenReturn("an-address"); - Mockito.when(channel.getAddress()).thenReturn(address); - assertEquals("an-address", messenger.getAddress()); - } - - private byte[] serialize(String text) throws IOException - { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(bytes); - out.writeObject(text); - out.close(); - bytes.close(); - return bytes.toByteArray(); - } -} - diff --git a/source/java/org/alfresco/repo/cluster/JGroupsTest.java b/source/java/org/alfresco/repo/cluster/JGroupsTest.java deleted file mode 100644 index 66fd420d1d..0000000000 --- a/source/java/org/alfresco/repo/cluster/JGroupsTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright (C) 2005-2012 Alfresco Software Limited. - * - * This file is part of Alfresco - * - * Alfresco is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Alfresco is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ - -package org.alfresco.repo.cluster; - -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.util.Collections; - -import org.alfresco.repo.cluster.MessengerTestHelper.TestMessageReceiver; -import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; -import org.alfresco.util.ApplicationContextHelper; -import org.jgroups.Channel; -import org.jgroups.ChannelException; -import org.jgroups.JChannel; -import org.jgroups.Message; -import org.jgroups.ReceiverAdapter; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.springframework.context.ApplicationContext; - -/** - * Tests for the JGroups messaging abstractions. - * - * @author Matt Ward - */ -public class JGroupsTest extends ReceiverAdapter -{ - private static ApplicationContext ctx; - private MessengerTestHelper helper; - - @BeforeClass - public static void setUpClass() - { - ctx = ApplicationContextHelper. - getApplicationContext(new String[] { "cluster-test/jgroups-messenger-test.xml" }); - } - - @AfterClass - public static void tearDownClass() - { - ApplicationContextHelper.closeApplicationContext(); - } - - @Before - public void setUp() - { - helper = new MessengerTestHelper(); - } - - @Test - public void canSendWithJGroupsMessengerFactory() throws InterruptedException, ChannelException - { - Channel ch = new JChannel("udp.xml"); - ch.connect("testcluster:testregion"); - ch.setReceiver(this); - - MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory"); - Messenger messenger = messengerFactory.createMessenger("testregion"); - messenger.send("Full test including spring."); - - helper.checkMessageReceivedWas("Full test including spring."); - } - - @Test - public void canSendWithJGroupsMessengerFactoryWithoutSpring() throws InterruptedException, ChannelException - { - Channel ch = new JChannel("udp.xml"); - ch.connect("testcluster:testregion"); - ch.setReceiver(this); - - AlfrescoJGroupsChannelFactory channelFactory = new AlfrescoJGroupsChannelFactory(); - channelFactory.setClusterName("testcluster"); - channelFactory.setConfigUrlsByAppRegion(Collections.singletonMap("DEFAULT", "classpath:udp.xml")); - AlfrescoJGroupsChannelFactory.rebuildChannels(); - - JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory(); - - Messenger messenger = messengerFactory.createMessenger("testregion"); - messenger.send("This is a test payload."); - - helper.checkMessageReceivedWas("This is a test payload."); - } - - @Test - public void canWrapRawChannels() throws ChannelException, InterruptedException - { - Channel sendCh = new JChannel("udp.xml"); - sendCh.connect("mycluster"); - Messenger messenger = new JGroupsMessenger(sendCh); - - Channel recvCh = new JChannel("udp.xml"); - recvCh.connect("mycluster"); - recvCh.setReceiver(this); - - messenger.send("This message was sent with jgroups"); - - helper.checkMessageReceivedWas("This message was sent with jgroups"); - } - - @Test - public void canCheckIsClusterActive() - { - JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory(); - - AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(null); - assertEquals(false, messengerFactory.isClusterActive()); - - AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("my-cluster-name"); - assertEquals(true, messengerFactory.isClusterActive()); - } - - @Test - public void messengerWillNotReceiveMessagesFromSelf() throws InterruptedException, ChannelException - { - MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory"); - Messenger m1 = messengerFactory.createMessenger("testregion"); - TestMessageReceiver r1 = new TestMessageReceiver(); - m1.setReceiver(r1); - - Channel ch2 = new JChannel("udp.xml"); - ch2.connect("testcluster:testregion"); - Messenger m2 = new JGroupsMessenger(ch2); - TestMessageReceiver r2 = new TestMessageReceiver(); - m2.setReceiver(r2); - - m1.send("This should be received by r2 but not r1"); - - r2.helper.checkMessageReceivedWas("This should be received by r2 but not r1"); - r1.helper.checkNoMessageReceived(); - } - - @Override - public void receive(Message msg) - { - ByteArrayInputStream bytes = new ByteArrayInputStream(msg.getBuffer()); - ObjectInput in; - try - { - in = new ObjectInputStream(bytes); - String payload = (String) in.readObject(); - in.close(); - bytes.close(); - helper.setReceivedMsg(payload); - } - catch (IOException e) - { - throw new RuntimeException("Couldn't receive object.", e); - } - catch (ClassNotFoundException e) - { - throw new RuntimeException("Couldn't receive object.", e); - } - } -} diff --git a/source/java/org/alfresco/repo/cluster/Messenger.java b/source/java/org/alfresco/repo/cluster/Messenger.java index 27db9716f6..465e112278 100644 --- a/source/java/org/alfresco/repo/cluster/Messenger.java +++ b/source/java/org/alfresco/repo/cluster/Messenger.java @@ -23,8 +23,8 @@ import java.io.Serializable; /** * Provides facilities for peer-to-peer messaging within a cluster. This interface - * is intended to act as a facade, allowing the actual implementation (e.g. JGroups - * or Hazelcast) to be decoupled as much as possible from the Alfresco code base. + * is intended to act as a facade, allowing the actual implementation (e.g. Hazelcast) + * to be decoupled as much as possible from the Alfresco code base. *

* Instances of this class are parameterised with the type of message payload * to send and receive. diff --git a/source/java/org/alfresco/repo/cluster/MessengerFactory.java b/source/java/org/alfresco/repo/cluster/MessengerFactory.java index e1bc891e8c..fc39ddfa7d 100644 --- a/source/java/org/alfresco/repo/cluster/MessengerFactory.java +++ b/source/java/org/alfresco/repo/cluster/MessengerFactory.java @@ -31,7 +31,7 @@ public interface MessengerFactory /** A catch-all for unknown application regions. */ public static final String APP_REGION_DEFAULT = "DEFAULT"; - /** The application region used by the EHCache heartbeat implementation over JGroups. */ + /** The application region used by the EHCache heartbeat implementation. */ public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT"; Messenger createMessenger(String appRegion); @@ -39,4 +39,6 @@ public interface MessengerFactory Messenger createMessenger(String appRegion, boolean acceptLocalMessages); boolean isClusterActive(); + + void addMembershipListener(ClusterMembershipListener membershipListener); } diff --git a/source/java/org/alfresco/repo/cluster/NullMessenger.java b/source/java/org/alfresco/repo/cluster/NullMessenger.java new file mode 100644 index 0000000000..55b63587aa --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/NullMessenger.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A do-nothing implementation of the {@link Messenger} interface. + * + * @author Matt Ward + */ +public class NullMessenger implements Messenger +{ + private static final Log logger = LogFactory.getLog(NullMessenger.class); + + @Override + public void send(T message) + { + if (logger.isDebugEnabled()) + { + logger.debug("Throwing away message: " + message); + } + } + + @Override + public void setReceiver(MessageReceiver receiver) + { + if (logger.isDebugEnabled()) + { + logger.debug("Throwing away receiver: " + receiver); + } + } + + @Override + public boolean isConnected() + { + return false; + } + + @Override + public String getAddress() + { + if (logger.isDebugEnabled()) + { + logger.debug("getAddress() always returns loopback address: 127.0.0.1"); + } + return "127.0.0.1"; + } +} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java deleted file mode 100644 index 2636af2c7c..0000000000 --- a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java +++ /dev/null @@ -1,966 +0,0 @@ -/* - * Copyright (C) 2005-2010 Alfresco Software Limited. - * - * This file is part of Alfresco - * - * Alfresco is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Alfresco is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ -package org.alfresco.repo.jgroups; - -import java.io.Serializable; -import java.net.URL; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Vector; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - -import org.alfresco.error.AlfrescoRuntimeException; -import org.alfresco.repo.cluster.MessengerFactory; -import org.alfresco.util.PropertyCheck; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jgroups.Address; -import org.jgroups.Channel; -import org.jgroups.ChannelClosedException; -import org.jgroups.ChannelException; -import org.jgroups.ChannelListener; -import org.jgroups.ChannelNotConnectedException; -import org.jgroups.Event; -import org.jgroups.JChannel; -import org.jgroups.Message; -import org.jgroups.Receiver; -import org.jgroups.TimeoutException; -import org.jgroups.UpHandler; -import org.jgroups.View; -import org.jgroups.protocols.LOOPBACK; -import org.jgroups.stack.ProtocolStack; -import org.springframework.context.ApplicationEvent; -import org.springframework.extensions.surf.util.AbstractLifecycleBean; -import org.springframework.util.ResourceUtils; - -/** - * A cache peer provider that does heartbeat sending and receiving using JGroups. - *

- * The cluster name needs to be set before any communication is possible. This can be done using the - * property {@link #setClusterName(String)}. - *

- * The channels provided to the callers will be proxies to underlying channels that will be hot-swappable. - * This means that the client code can continue to use the channel references while the actual - * implementation can be switched in and out as required. - * - * @author Derek Hulley - * @since 2.1.3 - */ -public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean -{ - /** The UDP protocol config (default) */ - public static final String DEFAULT_CONFIG_UDP = "classpath:alfresco/jgroups/alfresco-jgroups-UDP.xml"; - /** The TCP protocol config */ - public static final String DEFAULT_CONFIG_TCP = "classpath:alfresco/jgroups/alfresco-jgroups-TCP.xml"; - - private static Log logger = LogFactory.getLog(AlfrescoJGroupsChannelFactory.class); - - // Synchronization locks - private static ReadLock readLock; - private static WriteLock writeLock; - - // Values that are modified by the bean implementation - private static String clusterNamePrefix; - private static Map configUrlsByAppRegion; - - // Derived data - /** A map that stores channel information by the application region. */ - private static final Map channelsByAppRegion; - - static - { - ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); // Fair - readLock = readWriteLock.readLock(); - writeLock = readWriteLock.writeLock(); - - channelsByAppRegion = new HashMap(5); - - clusterNamePrefix = null; - configUrlsByAppRegion = new HashMap(5); - configUrlsByAppRegion.put( - MessengerFactory.APP_REGION_DEFAULT, - AlfrescoJGroupsChannelFactory.DEFAULT_CONFIG_UDP); - } - - /** - * Check if a cluster name was provided. - * - * @return Returns true if the cluster configuration is active, - * i.e. a cluster name was provided - */ - public static boolean isClusterActive() - { - readLock.lock(); - try - { - return clusterNamePrefix != null; - } - finally - { - readLock.unlock(); - } - } - - /** - * Close all channels. All the channels will be closed and will cease to function. - */ - private static void closeChannels() - { - for (Map.Entry entry : channelsByAppRegion.entrySet()) - { - ChannelProxy channelProxy = entry.getValue(); - - // Close the channel via the proxy - try - { - channelProxy.close(); - if (logger.isDebugEnabled()) - { - logger.debug("\n" + - "Closed channel: " + channelProxy); - } - } - catch (Throwable e) - { - logger.warn( - "Unable to close channel: \n" + - " Channel: " + channelProxy, - e); - } - } - } - - /** - * Returns the configuration URL to use for the given application region. This might default to the - * {@link #APP_REGION_DEFAULT default app region}. - */ - private static String getConfigUrl(String appRegion) - { - readLock.lock(); - try - { - // Get the configuration to use - String configUrlStr = configUrlsByAppRegion.get(appRegion); - if (!PropertyCheck.isValidPropertyString(configUrlStr)) - { - configUrlStr = configUrlsByAppRegion.get(MessengerFactory.APP_REGION_DEFAULT); - } - if (configUrlStr == null) - { - throw new AlfrescoRuntimeException( - "No protocol configuration was found for application region: \n" + - " Cluster prefix: " + clusterNamePrefix + "\n" + - " App region: " + appRegion + "\n" + - " Regions defined: " + configUrlsByAppRegion); - } - return configUrlStr; - } - finally - { - readLock.unlock(); - } - } - - /** - /** - * Creates a channel for the cluster. This method should not be heavily used - * as the checks and synchronizations will slow the calls. Returned channels can be - * kept and will be modified directly using the factory-held references, if necessary. - *

- * The application region is used to determine the protocol configuration to apply. - *

- * This method returns a dummy channel if no cluster name has been provided. - * - * @param appRegion the application region identifier. - * @return Returns a channel - */ - public static Channel getChannel(String appRegion, boolean acceptLocalMessages) - { - readLock.lock(); - try - { - ChannelProxy channelProxy = channelsByAppRegion.get(appRegion); - if (channelProxy != null) - { - // This will do - return channelProxy; - } - } - finally - { - readLock.unlock(); - } - // Being here means that there is no channel yet - // Go write - writeLock.lock(); - try - { - ChannelProxy channelProxy = channelsByAppRegion.get(appRegion); - if (channelProxy != null) - { - // This will do - return channelProxy; - } - // Get the channel - Channel channel = getChannelInternal(appRegion, acceptLocalMessages); - // Proxy the channel - channelProxy = new ChannelProxy(channel); - // Store the channel to the map - channelsByAppRegion.put(appRegion, channelProxy); - // Done - return channelProxy; - } - finally - { - writeLock.unlock(); - } - } - - /** - * Creates a channel for the given cluster. The application region is used - * to determine the protocol configuration to apply. - * - * @param appRegion the application region identifier. - * @return Returns a channel - */ - /* All calls to this are ultimately wrapped in the writeLock. */ - private static /*synchronized*/ Channel getChannelInternal(String appRegion, boolean acceptLocalMessages) - { - Channel channel; - URL configUrl = null; - // If there is no cluster defined (yet) then we define a dummy channel - if (AlfrescoJGroupsChannelFactory.clusterNamePrefix == null) - { - try - { - channel = new DummyJChannel(); - } - catch (Throwable e) - { - throw new AlfrescoRuntimeException( - "Failed to create dummy JGroups channel: \n" + - " Cluster prefix: " + clusterNamePrefix + "\n" + - " App region: " + appRegion, - e); - } - } - else // Create real channel - { - // Get the protocol configuration to use - String configUrlStr = getConfigUrl(appRegion); - try - { - // Construct the JChannel directly - configUrl = ResourceUtils.getURL(configUrlStr); - channel = new JChannel(configUrl); - } - catch (Throwable e) - { - throw new AlfrescoRuntimeException( - "Failed to create JGroups channel: \n" + - " Cluster prefix: " + clusterNamePrefix + "\n" + - " App region: " + appRegion + "\n" + - " Regions defined: " + configUrlsByAppRegion + "\n" + - " Configuration URL: " + configUrlStr, - e); - } - } - // Initialise the channel - try - { - String clusterName = clusterNamePrefix + ":" + appRegion; - // Don't accept messages from self - if(acceptLocalMessages) - { - channel.setOpt(Channel.LOCAL, Boolean.TRUE); - } - else - { - channel.setOpt(Channel.LOCAL, Boolean.FALSE); - } - - // Connect - channel.connect(clusterName); - // Done - if (logger.isDebugEnabled()) - { - logger.debug("\n" + - "Created JGroups channel: \n" + - " Cluster prefix: " + clusterNamePrefix + "\n" + - " App region: " + appRegion + "\n" + - " Regions defined: " + configUrlsByAppRegion + "\n" + - " Channel: " + channel + "\n" + - " Configuration URL: " + configUrl); - } - } - catch (Throwable e) - { - throw new AlfrescoRuntimeException( - "Failed to initialise JGroups channel: \n" + - " Cluster prefix: " + clusterNamePrefix + "\n" + - " App region: " + appRegion + "\n" + - " Channel: " + channel + "\n" + - " Configuration URL: " + configUrl, - e); - } - return channel; - } - - /** - * Rebuild all the channels using the current cluster name and configuration mappings. - */ - public static void rebuildChannels() - { - writeLock.lock(); - try - { - rebuildChannelsInternal(); - } - finally - { - writeLock.unlock(); - } - } - - /** - * Throw away all calculated values and rebuild. This means that the channel factory will - * be reconstructed from scratch. All the channels are reconstructed - but this will not - * affect any references to channels held outside this class as the values returned are proxies - * on top of hot swappable implementations. - *

- * The old channel is closed before the new one is created, so it is possible for a channel - * held by client code to be rendered unusable during the switch-over. - */ - /* All calls to this are ultimately wrapped in the writeLock. */ - private static /*synchronized*/ void rebuildChannelsInternal() - { - // Reprocess all the application regions with the new data - for (Map.Entry entry : channelsByAppRegion.entrySet()) - { - String appRegion = entry.getKey(); - ChannelProxy channelProxy = entry.getValue(); - - // Get the old channel - Channel oldChannel = channelProxy.getDelegate(); - - Boolean acceptLocalMessages = (Boolean)oldChannel.getOpt(Channel.LOCAL); - - // Close the old channel. - try - { - oldChannel.close(); - if (logger.isDebugEnabled()) - { - logger.debug("\n" + - "Closed old channel during channel rebuild: \n" + - " Old channel: " + oldChannel); - } - } - catch (Throwable e) - { - logger.warn( - "Unable to close old channel during channel rebuild: \n" + - " Old channel: " + oldChannel, - e); - } - - // Create the new channel - Channel newChannel = getChannelInternal(appRegion, acceptLocalMessages.booleanValue()); - - // Now do the hot-swap - channelProxy.swap(newChannel); - } - } - - /** - * Set the prefix used to identify the different clusters. Each application region will - * have a separate cluster name that will be: - *

-     *    clusterNamePrefix:appRegion
-     * 
- * If no cluster name prefix is declared, the cluster is effectively disabled. - *

- * NOTE: The channels must be {@link #rebuildChannels() rebuilt}. - * - * @param clusterNamePrefix a prefix to append to the cluster names used - */ - public static void changeClusterNamePrefix(String clusterNamePrefix) - { - writeLock.lock(); - try - { - if (!PropertyCheck.isValidPropertyString(clusterNamePrefix)) - { - // Clear everything out - AlfrescoJGroupsChannelFactory.clusterNamePrefix = null; - } - else - { - AlfrescoJGroupsChannelFactory.clusterNamePrefix = clusterNamePrefix; - } - } - finally - { - writeLock.unlock(); - } - } - - /** - * Configure a mapping between the application regions and the available JGroup protocol configurations. - * The map must contain a mapping for application region 'DEFAULT'. - *

- * NOTE: The channels must be {@link #rebuildChannels() rebuilt}. - * - * @param configUrlsByAppRegion a mapping from application region (keys) to protocol configuration URLs (values) - */ - private static void changeConfigUrlsMapping(Map configUrlsByAppRegion) - { - writeLock.lock(); - try - { - // Check that there is a mapping for default - if (!configUrlsByAppRegion.containsKey(MessengerFactory.APP_REGION_DEFAULT)) - { - throw new AlfrescoRuntimeException("A configuration URL must be defined for 'DEFAULT'"); - } - AlfrescoJGroupsChannelFactory.configUrlsByAppRegion = configUrlsByAppRegion; - } - finally - { - writeLock.unlock(); - } - } - - /** - * Bean-enabling constructor - */ - public AlfrescoJGroupsChannelFactory() - { - } - - /** - * @see AlfrescoJGroupsChannelFactory#changeClusterNamePrefix(String) - */ - public void setClusterName(String clusterName) - { - AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(clusterName); - } - - /** - * @see AlfrescoJGroupsChannelFactory#changeConfigUrlsMapping(Map) - */ - public void setConfigUrlsByAppRegion(Map configUrlsByAppRegion) - { - AlfrescoJGroupsChannelFactory.changeConfigUrlsMapping(configUrlsByAppRegion); - } - - /** - * @deprecated Use {@link #setConfigUrlsByAppRegion(Map)} - */ - public void setProtocolStackMapping(Map unused) - { - throw new AlfrescoRuntimeException( - "Properties 'protocolStackMapping' and 'jgroupsConfigurationUrl'" + - " have been deprecated in favour of 'configUrlsByAppRegion'."); - } - - /** - * @deprecated Use {@link #setConfigUrlsByAppRegion(Map)} - */ - public void setJgroupsConfigurationUrl(String configUrl) - { - throw new AlfrescoRuntimeException( - "Properties 'protocolStackMapping' and 'jgroupsConfigurationUrl'" + - " have been deprecated in favour of 'configUrlsByAppRegion'."); - } - - @Override - protected void onBootstrap(ApplicationEvent event) - { - AlfrescoJGroupsChannelFactory.rebuildChannels(); - } - - @Override - protected void onShutdown(ApplicationEvent event) - { - AlfrescoJGroupsChannelFactory.closeChannels(); - } - - /** - * A no-op JChannel using the "DUMMY_TP" protocol only - * - * @author Derek Hulley - * @since 2.1.3 - */ - private static class DummyJChannel extends JChannel - { - public DummyJChannel() throws ChannelException - { - super("org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory$DummyProtocol"); - } - } - - public static class DummyProtocol extends LOOPBACK - { - public DummyProtocol() - { - super(); - enable_diagnostics = false; - } - - @Override - public String getName() - { - return "ALF_DUMMY"; - } - - @Override - public Object down(Event evt) - { - return null; - } - - @Override - public Object up(Event evt) - { - return null; - } - } - - /** - * A proxy channel that can be used to hot-swap underlying channels. All listeners - * and the receiver will be carried over to the new underlying channel when it is - * swapped out. - * - * @author Derek Hulley - */ - @SuppressWarnings("deprecation") - public static class ChannelProxy extends Channel - { - /* - * Not synchronizing. Mostly swapping will be VERY rare and if there is a bit - * of inconsistency it is not important. - */ - private Channel delegate; - private UpHandler delegateUpHandler; - private Set delegateChannelListeners; - private Receiver delegateReceiver; - - /** - * @param delegate the real channel that will do the work - */ - public ChannelProxy(Channel delegate) - { - this.delegate = delegate; - this.delegateChannelListeners = new HashSet(7); - } - - /** - * @return Returns the channel to which the implementation will delegate - */ - public Channel getDelegate() - { - return delegate; - } - - /** - * Swap the channel. The old delegate will be disconnected before the swap occurs. - * This guarantees data consistency, assuming that any failures will be handled. - *

- * Note that the old delegate is not closed or shutdown. - * - * @param the new delegate - * @return the old, disconnected delegate - */ - public synchronized Channel swap(Channel channel) - { - // Remove the listeners from the old channel - delegate.setReceiver(null); - for (ChannelListener delegateChannelListener : delegateChannelListeners) - { - delegate.removeChannelListener(delegateChannelListener); - } - delegate.setUpHandler(null); - - Channel oldDelegate = delegate; - - // Assign the new delegate and carry the listeners over - delegate = channel; - delegate.setReceiver(delegateReceiver); - delegate.setOpt(Channel.LOCAL, oldDelegate.getOpt(Channel.LOCAL)); - for (ChannelListener delegateChannelListener : delegateChannelListeners) - { - delegate.addChannelListener(delegateChannelListener); - } - delegate.setUpHandler(delegateUpHandler); - // Done - return oldDelegate; - } - - @Override - protected org.jgroups.logging.Log getLog() - { - throw new UnsupportedOperationException(); - } - - @Override - public Address getAddress() - { - return delegate.getAddress(); - } - - @Override - public String getName() - { - return delegate.getName(); - } - - @Override - public ProtocolStack getProtocolStack() - { - return delegate.getProtocolStack(); - } - - @Override - public synchronized void setReceiver(Receiver r) - { - delegateReceiver = r; - delegate.setReceiver(r); - } - - @Override - public synchronized void addChannelListener(ChannelListener listener) - { - if (listener == null) - { - return; - } - delegateChannelListeners.add(listener); - delegate.addChannelListener(listener); - } - - @Override - public synchronized void removeChannelListener(ChannelListener listener) - { - if (listener != null) - { - delegateChannelListeners.remove(listener); - } - delegate.removeChannelListener(listener); - } - - @Override - public synchronized void clearChannelListeners() - { - delegateChannelListeners.clear(); - delegate.clearChannelListeners(); - } - - @Override - public synchronized void setUpHandler(UpHandler up_handler) - { - delegateUpHandler = up_handler; - delegate.setUpHandler(up_handler); - } - - @Override - public void blockOk() - { - delegate.blockOk(); - } - - @Override - public void close() - { - delegate.close(); - } - - @Override - public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException - { - delegate.connect(cluster_name, target, state_id, timeout); - } - - @Override - public void connect(String cluster_name) throws ChannelException - { - delegate.connect(cluster_name); - } - - @Override - public void disconnect() - { - delegate.disconnect(); - } - - @Override - public void down(Event evt) - { - delegate.down(evt); - } - - @Override - public Object downcall(Event evt) - { - return delegate.downcall(evt); - } - - @Override - public String dumpQueue() - { - return delegate.dumpQueue(); - } - - @Override - @SuppressWarnings({ "unchecked", "rawtypes" }) - public Map dumpStats() - { - return delegate.dumpStats(); - } - - @Override - public boolean equals(Object obj) - { - return delegate.equals(obj); - } - - @Override - public boolean flushSupported() - { - return delegate.flushSupported(); - } - - @SuppressWarnings("rawtypes") - @Override - public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException - { - return delegate.getAllStates(targets, timeout); - } - - @Override - public String getChannelName() - { - return delegate.getChannelName(); - } - - @Override - public String getClusterName() - { - return delegate.getClusterName(); - } - - @Override - public Map getInfo() - { - return delegate.getInfo(); - } - - @Override - public Address getLocalAddress() - { - return delegate.getLocalAddress(); - } - - @Override - public int getNumMessages() - { - return delegate.getNumMessages(); - } - - @Override - public Object getOpt(int option) - { - return delegate.getOpt(option); - } - - @Override - public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException - { - return delegate.getState(target, timeout); - } - - @Override - public boolean getState(Address target, String state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException - { - return delegate.getState(target, state_id, timeout); - } - - @Override - public View getView() - { - return delegate.getView(); - } - - @Override - public int hashCode() - { - return delegate.hashCode(); - } - - @Override - public boolean isConnected() - { - return delegate.isConnected(); - } - - @Override - public boolean isOpen() - { - return delegate.isOpen(); - } - - @Override - public void open() throws ChannelException - { - delegate.open(); - } - - @Override - public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException - { - return delegate.peek(timeout); - } - - @Override - public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException - { - return delegate.receive(timeout); - } - - @Override - public void returnState(byte[] state, String state_id) - { - delegate.returnState(state, state_id); - } - - @Override - public void returnState(byte[] state) - { - delegate.returnState(state); - } - - @Override - public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException - { - delegate.send(dst, src, obj); - } - - @Override - public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException - { - delegate.send(msg); - } - - @Override - public void setChannelListener(ChannelListener channel_listener) - { - delegate.setChannelListener(channel_listener); - } - - @Override - public void setInfo(String key, Object value) - { - delegate.setInfo(key, value); - } - - @Override - public void setOpt(int option, Object value) - { - delegate.setOpt(option, value); - } - - @Override - public void shutdown() - { - delegate.shutdown(); - } - - @Override - public boolean startFlush(boolean automatic_resume) - { - return delegate.startFlush(automatic_resume); - } - - @Override - public boolean startFlush(List

flushParticipants, boolean automatic_resume) - { - return delegate.startFlush(flushParticipants, automatic_resume); - } - - @Override - public boolean startFlush(long timeout, boolean automatic_resume) - { - return delegate.startFlush(timeout, automatic_resume); - } - - @Override - public void stopFlush() - { - delegate.stopFlush(); - } - - @Override - public void stopFlush(List
flushParticipants) - { - delegate.stopFlush(flushParticipants); - } - - @Override - public synchronized String toString() - { - if (delegate instanceof DummyJChannel) - { - return delegate.toString() + "(dummy)"; - } - else - { - return delegate.toString(); - } - } - - @Override - public String getName(Address member) - { - return delegate.getName(member); - } - - @Override - public void send(Address dst, Address src, byte[] buf) throws ChannelNotConnectedException, ChannelClosedException - { - delegate.send(dst, src, buf); - } - - @Override - public void send(Address dst, Address src, byte[] buf, int offset, int length) throws ChannelNotConnectedException, ChannelClosedException - { - delegate.send(dst, src, buf, offset, length); - } - - @Override - public void setName(String name) - { - delegate.setName(name); - } - } -} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java deleted file mode 100644 index 8a5b8731ef..0000000000 --- a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (C) 2005-2010 Alfresco Software Limited. - * - * This file is part of Alfresco - * - * Alfresco is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Alfresco is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with Alfresco. If not, see . - */ -package org.alfresco.repo.jgroups; - -import org.jgroups.Channel; -import org.jgroups.Message; - -import junit.framework.TestCase; - -/** - * @see AlfrescoJGroupsChannelFactory - * - * @author Derek Hulley - * @since 2.1.3 - */ -public class AlfrescoJGroupsChannelFactoryTest extends TestCase -{ - private static byte[] bytes = new byte[65536]; - static - { - for (int i = 0; i < bytes.length; i++) - { - bytes[i] = 1; - } - } - - private String appRegion; - - @Override - protected void setUp() throws Exception - { - appRegion = getName(); - } - - /** - * Check that the channel is behaving - */ - private void stressChannel(Channel channel) throws Exception - { - System.out.println("Test: " + getName()); - System.out.println(" Channel: " + channel); - System.out.println(" Cluster: " + channel.getClusterName()); - channel.send(null, null, Boolean.TRUE); - channel.send(new Message(null, null, bytes)); - } - - public void testNoCluster() throws Exception - { - Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false); - stressChannel(channel); - } - - public void testBasicCluster() throws Exception - { - AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah"); - AlfrescoJGroupsChannelFactory.rebuildChannels(); - Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false); - stressChannel(channel); - } - - public void testHotSwapCluster() throws Exception - { - AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE"); - AlfrescoJGroupsChannelFactory.rebuildChannels(); - Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false); - stressChannel(channel1); - AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO"); - AlfrescoJGroupsChannelFactory.rebuildChannels(); - Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false); - stressChannel(channel1); - assertTrue("Channel reference must be the same", channel1 == channel2); - } -} diff --git a/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java b/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java index 0a1a4be0b7..818059f25b 100644 --- a/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java +++ b/source/java/org/alfresco/repo/transaction/AlfrescoTransactionSupport.java @@ -531,7 +531,6 @@ public abstract class AlfrescoTransactionSupport private final Set lucenes; private final LinkedHashSet listeners; private final Set> transactionalCaches; -// private final Set jgroupsEhCacheListeners; private final Map resources; /** @@ -548,7 +547,6 @@ public abstract class AlfrescoTransactionSupport lucenes = new HashSet(3); listeners = new LinkedHashSet(5); transactionalCaches = new HashSet>(3); -// jgroupsEhCacheListeners = new HashSet(3); resources = new HashMap(17); } @@ -602,10 +600,6 @@ public abstract class AlfrescoTransactionSupport { return transactionalCaches.add((TransactionalCache)listener); } -// else if (listener instanceof JGroupsEhCacheListener) -// { -// return jgroupsEhCacheListeners.add((JGroupsEhCacheListener)listener); -// } else { return listeners.add(listener); @@ -709,12 +703,6 @@ public abstract class AlfrescoTransactionSupport { cache.beforeCommit(readOnly); } -// -// // Flush the JGroups listeners -// for (JGroupsEhCacheListener listener : jgroupsEhCacheListeners) -// { -// listener.beforeCommit(readOnly); -// } } /** diff --git a/source/test-resources/cluster-test/hazelcast-messenger-test.xml b/source/test-resources/cluster-test/hazelcast-messenger-test.xml index e73db0c7ff..b7c391e185 100644 --- a/source/test-resources/cluster-test/hazelcast-messenger-test.xml +++ b/source/test-resources/cluster-test/hazelcast-messenger-test.xml @@ -2,26 +2,21 @@ - - - + - - - - - - - - - - - + + + + + test_hazelcast_cluster + test_hazelcast_cluster_password + + + + + + diff --git a/source/test-resources/cluster-test/jgroups-messenger-test.xml b/source/test-resources/cluster-test/jgroups-messenger-test.xml deleted file mode 100644 index 66cefab25b..0000000000 --- a/source/test-resources/cluster-test/jgroups-messenger-test.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - - testcluster - - - - - classpath:udp.xml - - - - - - - \ No newline at end of file