From ada5b12a8ff7a4c7111b5ed604f2f13990aba988 Mon Sep 17 00:00:00 2001 From: Matt Ward Date: Fri, 10 Feb 2012 17:07:41 +0000 Subject: [PATCH] Messaging facade for clustered communications to allow implementations from JGroups or Hazelcast to be used. * PropertyBackedBeanExporter uses the facade, configured with JGroups messaging implementation. * Unfortunately, some remodelling required before this will work with Ehcache's CacheManagerPeerProviderFactory git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@33830 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../repo/cluster/HazelcastMessenger.java | 83 ++++++++ .../cluster/HazelcastMessengerFactory.java | 58 ++++++ .../HazelcastMessengerFactoryTest.java | 58 ++++++ .../repo/cluster/HazelcastMessengerTest.java | 79 ++++++++ .../alfresco/repo/cluster/HazelcastTest.java | 95 +++++++++ .../repo/cluster/JGroupsMessenger.java | 127 ++++++++++++ .../repo/cluster/JGroupsMessengerFactory.java | 52 +++++ .../repo/cluster/JGroupsMessengerTest.java | 115 +++++++++++ .../alfresco/repo/cluster/JGroupsTest.java | 180 ++++++++++++++++++ .../repo/cluster/MessageReceiver.java | 34 ++++ .../repo/cluster/MessageSendingException.java | 34 ++++ .../org/alfresco/repo/cluster/Messenger.java | 41 ++++ .../repo/cluster/MessengerFactory.java | 32 ++++ .../repo/cluster/MessengerTestHelper.java | 91 +++++++++ .../cluster-test/hazelcast-messenger-test.xml | 26 +++ .../cluster-test/jgroups-messenger-test.xml | 27 +++ 16 files changed, 1132 insertions(+) create mode 100644 source/java/org/alfresco/repo/cluster/HazelcastMessenger.java create mode 100644 source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java create mode 100644 source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java create mode 100644 source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java create mode 100644 source/java/org/alfresco/repo/cluster/HazelcastTest.java create mode 100644 source/java/org/alfresco/repo/cluster/JGroupsMessenger.java create mode 100644 source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java create mode 100644 source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java create mode 100644 source/java/org/alfresco/repo/cluster/JGroupsTest.java create mode 100644 source/java/org/alfresco/repo/cluster/MessageReceiver.java create mode 100644 source/java/org/alfresco/repo/cluster/MessageSendingException.java create mode 100644 source/java/org/alfresco/repo/cluster/Messenger.java create mode 100644 source/java/org/alfresco/repo/cluster/MessengerFactory.java create mode 100644 source/java/org/alfresco/repo/cluster/MessengerTestHelper.java create mode 100644 source/test-resources/cluster-test/hazelcast-messenger-test.xml create mode 100644 source/test-resources/cluster-test/jgroups-messenger-test.xml diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java new file mode 100644 index 0000000000..ec698262c0 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +import org.springframework.extensions.surf.util.ParameterCheck; + +import com.hazelcast.core.ITopic; +import com.hazelcast.core.MessageListener; + +/** + * Hazelcast-based implementation of the {@link Messenger} interface. + * + * @see HazelcastMessengerFactory + * @author Matt Ward + */ +public class HazelcastMessenger implements Messenger, MessageListener +{ + private ITopic topic; + private MessageReceiver receiverDelegate; + + + /** + * @param topic + */ + public HazelcastMessenger(ITopic topic) + { + this.topic = topic; + } + + + @Override + public void send(T message) + { + topic.publish(message); + } + + @Override + public void setReceiver(MessageReceiver receiver) + { + // Install a delegate to ready to handle incoming messages. + receiverDelegate = receiver; + // Start receiving messages. + topic.addMessageListener(this); + } + + @Override + public void onMessage(T message) + { + ParameterCheck.mandatory("message", message); + receiverDelegate.onReceive(message); + } + + protected String getTopicName() + { + return topic.getName(); + } + + + @Override + public boolean isConnected() + { + return true; + } +} diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java new file mode 100644 index 0000000000..0c5390be51 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +import org.alfresco.util.PropertyCheck; + +import com.hazelcast.core.ITopic; + +/** + * Hazelcast-based implementation of the {@link MessengerFactory} interface. + * The factory must be configured with an {@link ITopic} - which + * should be configured with a topic name that corresponds to an application + * region. + * + * @author Matt Ward + */ +public class HazelcastMessengerFactory implements MessengerFactory +{ + private ITopic topic; + + + /** + * @param topic the topic to set + */ + public void setTopic(ITopic topic) + { + this.topic = topic; + } + + /* + * @see org.alfresco.repo.cluster.MessengerFactory#createMessenger() + */ + @Override + public Messenger createMessenger() + { + PropertyCheck.mandatory(this, "topic", topic); + return new HazelcastMessenger(topic); + } +} diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java new file mode 100644 index 0000000000..81da050280 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerFactoryTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.hazelcast.core.ITopic; + + +/** + * Tests for the HazelcastMessengerFactory class. + * + * @author Matt Ward + */ +@RunWith(MockitoJUnitRunner.class) +public class HazelcastMessengerFactoryTest +{ + private HazelcastMessengerFactory factory; + private @Mock ITopic topic; + + @Before + public void setUp() + { + factory = new HazelcastMessengerFactory(); + factory.setTopic(topic); + } + + @Test + public void topicWrappedInMessenger() + { + when(topic.getName()).thenReturn("app-region"); + HazelcastMessenger messenger = (HazelcastMessenger) factory.createMessenger(); + assertEquals("app-region", messenger.getTopicName()); + } +} diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java b/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java new file mode 100644 index 0000000000..625009211e --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessengerTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.hazelcast.core.ITopic; + +/** + * Tests for the HazelcastMessenger class. + * + * @author Matt Ward + */ +@RunWith(MockitoJUnitRunner.class) +public class HazelcastMessengerTest +{ + private @Mock ITopic topic; + private HazelcastMessenger messenger; + private String receivedMsg; + + @Before + public void setUp() + { + messenger = new HazelcastMessenger(topic); + receivedMsg = null; + } + + @Test + public void canSendMessage() + { + messenger.send("Test string"); + verify(topic).publish("Test string"); + } + + @Test + public void canReceiveMessage() + { + messenger.setReceiver(new MessageReceiver() + { + @Override + public void onReceive(String message) + { + receivedMsg = new String(message); + } + }); + + // Hazelcast will call the onMessage method... + messenger.onMessage("Hazelcast is sending a message."); + + // setReceiver() should have resulted in a listener being registered with the topic. + verify(topic).addMessageListener(messenger); + + assertEquals("Hazelcast is sending a message.", receivedMsg); + } +} diff --git a/source/java/org/alfresco/repo/cluster/HazelcastTest.java b/source/java/org/alfresco/repo/cluster/HazelcastTest.java new file mode 100644 index 0000000000..4e6e2f5841 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/HazelcastTest.java @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + + +import org.alfresco.util.ApplicationContextHelper; +import org.jgroups.ChannelException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.springframework.context.ApplicationContext; + +import com.hazelcast.config.Config; +import com.hazelcast.config.GroupConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.ITopic; +import com.hazelcast.core.MessageListener; + +/** + * Tests for Hazelcast implementations of {@link Messenger} and related classes. + * These are integration tests and configured through a Spring test context file. + * + * @author Matt Ward + */ +public class HazelcastTest implements MessageListener +{ + private static ApplicationContext ctx; + private MessengerTestHelper helper; + + @BeforeClass + public static void setUpClass() + { + ctx = ApplicationContextHelper. + getApplicationContext(new String[] { "cluster-test/hazelcast-messenger-test.xml" }); + } + + @AfterClass + public static void tearDownClass() + { + ApplicationContextHelper.closeApplicationContext(); + } + + @Before + public void setUp() + { + helper = new MessengerTestHelper(); + } + + + @Test + public void canSendWithHazelcastMessengerFactory() throws InterruptedException, ChannelException + { + Config config = new Config(); + GroupConfig groupConfig = new GroupConfig(); + groupConfig.setName("testcluster"); + groupConfig.setPassword("secret"); + config.setGroupConfig(groupConfig); + HazelcastInstance hi = Hazelcast.newHazelcastInstance(config); + ITopic topic = hi.getTopic("testregion"); + + topic.addMessageListener(this); + + MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory"); + Messenger messenger = messengerFactory.createMessenger(); + messenger.send("Full test including spring."); + + helper.checkMessageReceivedWas("Full test including spring."); + } + + + @Override + public void onMessage(String message) + { + helper.setReceivedMsg(message); + } +} diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java b/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java new file mode 100644 index 0000000000..f96604ee39 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/JGroupsMessenger.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.jgroups.Channel; +import org.jgroups.Message; +import org.jgroups.ReceiverAdapter; + +/** + * JGroups implementation of the {@link Messenger} class. + * + * @author Matt Ward + */ +public class JGroupsMessenger extends ReceiverAdapter implements Messenger +{ + private final Channel channel; + private MessageReceiver receiverDelegate; + + + /** + * Construct a messenger that wraps a JGroups Channel. + * + * @param channel + */ + public JGroupsMessenger(Channel channel) + { + this.channel = channel; + } + + + @Override + public void send(T message) + { + try + { + // Serializing the message ourselves and passing a byte[] + // to Channel.send() as recommended by JGroups. + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bytes); + out.writeObject(message); + out.close(); + bytes.close(); + channel.send(null, null, bytes.toByteArray()); + } + catch (Throwable e) + { + throw new MessageSendingException(e); + } + } + + /* + * @see org.alfresco.repo.cluster.Messenger#setReceiver(org.alfresco.repo.cluster.MessageReceiver) + */ + @Override + public void setReceiver(MessageReceiver receiver) + { + // Make sure the delegate is installed, before starting to receive messages. + receiverDelegate = receiver; + // Start receiving messages and dispatch them to the delegate. + channel.setReceiver(this); + } + + + /* + * @see org.jgroups.ReceiverAdapter#receive(org.jgroups.Message) + */ + @Override + public void receive(Message msg) + { + // Deserializing the message ourselves rather than using + // the Message's getObject() method (as recommended by JGroups). + byte[] msgBytes = msg.getBuffer(); + ByteArrayInputStream bytes = new ByteArrayInputStream(msgBytes); + ObjectInput in; + try + { + in = new ObjectInputStream(bytes); + @SuppressWarnings("unchecked") + T payload = (T) in.readObject(); + in.close(); + bytes.close(); + // Pass the deserialized payload on to the receiver delegate + receiverDelegate.onReceive(payload); + } + catch (IOException e) + { + throw new RuntimeException("Couldn't receive object.", e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException("Couldn't receive object.", e); + } + } + + + @Override + public boolean isConnected() + { + return channel.isConnected(); + } +} diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java b/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java new file mode 100644 index 0000000000..65e104e929 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/JGroupsMessengerFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; +import org.alfresco.util.PropertyCheck; +import org.jgroups.Channel; +import org.springframework.beans.factory.annotation.Required; + +/** + * JGroups implementation of the {@link MessengerFactory} interface. + * + * @author Matt Ward + */ +public class JGroupsMessengerFactory implements MessengerFactory +{ + private String appRegion; + + + @Override + public Messenger createMessenger() + { + PropertyCheck.mandatory(this, "appRegion", appRegion); + Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + return new JGroupsMessenger(channel); + } + + @Required + public void setAppRegion(String appRegion) + { + this.appRegion = appRegion; + } +} diff --git a/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java b/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java new file mode 100644 index 0000000000..5d0c4e324c --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; + +import org.jgroups.Channel; +import org.jgroups.ChannelClosedException; +import org.jgroups.ChannelNotConnectedException; +import org.jgroups.Message; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Tests for the JGroupsMessenger class. + * + * @author Matt Ward + */ +public class JGroupsMessengerTest +{ + private Channel channel; + private JGroupsMessenger messenger; + protected String receivedMsg; + + @Before + public void setUp() + { + channel = Mockito.mock(Channel.class); + messenger = new JGroupsMessenger(channel); + receivedMsg = null; + } + + @Test + public void canSendMessage() throws ChannelNotConnectedException, ChannelClosedException, IOException + { + String testText = "This is a test message"; + byte[] testTextSer = serialize(testText); + // When a message is sent... + messenger.send(testText); + + // the underlying channel should have been used to send it, + // but will be called with a serialized version of the text. + verify(channel).send(null, null, testTextSer); + } + + + @Test + public void canReceiveMessage() throws IOException + { + MessageReceiver receiver = new MessageReceiver() + { + @Override + public void onReceive(String message) + { + receivedMsg = message; + } + }; + + messenger.setReceiver(receiver); + Message jgroupsMessage = new Message(null, null, serialize("JGroups message payload")); + // JGroups will call the receive method + messenger.receive(jgroupsMessage); + + // The Messenger should have installed itself as the message + // receiver for the underlying channel. + verify(channel).setReceiver(messenger); + + assertEquals("JGroups message payload", receivedMsg.toString()); + } + + @Test + public void canDelegateIsConnected() + { + Mockito.when(channel.isConnected()).thenReturn(true); + assertEquals(true, messenger.isConnected()); + + Mockito.when(channel.isConnected()).thenReturn(false); + assertEquals(false, messenger.isConnected()); + } + + private byte[] serialize(String text) throws IOException + { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bytes); + out.writeObject(text); + out.close(); + bytes.close(); + return bytes.toByteArray(); + } +} + diff --git a/source/java/org/alfresco/repo/cluster/JGroupsTest.java b/source/java/org/alfresco/repo/cluster/JGroupsTest.java new file mode 100644 index 0000000000..e0f928fec4 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/JGroupsTest.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.util.Collections; + +import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; +import org.alfresco.util.ApplicationContextHelper; +import org.jgroups.Channel; +import org.jgroups.ChannelException; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.ReceiverAdapter; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.springframework.context.ApplicationContext; + +/** + * Tests for the JGroups messaging abstractions. + * + * @author Matt Ward + */ +public class JGroupsTest extends ReceiverAdapter +{ + private static ApplicationContext ctx; + private MessengerTestHelper helper; + + @BeforeClass + public static void setUpClass() + { + ctx = ApplicationContextHelper. + getApplicationContext(new String[] { "cluster-test/jgroups-messenger-test.xml" }); + } + + @AfterClass + public static void tearDownClass() + { + ApplicationContextHelper.closeApplicationContext(); + } + + @Before + public void setUp() + { + helper = new MessengerTestHelper(); + } + + @Test + public void canSendWithJGroupsMessengerFactory() throws InterruptedException, ChannelException + { + Channel ch = new JChannel("udp.xml"); + ch.connect("testcluster:testregion"); + ch.setReceiver(this); + + MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory"); + Messenger messenger = messengerFactory.createMessenger(); + messenger.send("Full test including spring."); + + helper.checkMessageReceivedWas("Full test including spring."); + } + + @Test + public void canSendWithJGroupsMessengerFactoryWithoutSpring() throws InterruptedException, ChannelException + { + Channel ch = new JChannel("udp.xml"); + ch.connect("testcluster:testregion"); + ch.setReceiver(this); + + AlfrescoJGroupsChannelFactory channelFactory = new AlfrescoJGroupsChannelFactory(); + channelFactory.setClusterName("testcluster"); + channelFactory.setConfigUrlsByAppRegion(Collections.singletonMap("DEFAULT", "classpath:udp.xml")); + AlfrescoJGroupsChannelFactory.rebuildChannels(); + + JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory(); + messengerFactory.setAppRegion("testregion"); + Messenger messenger = messengerFactory.createMessenger(); + messenger.send("This is a test payload."); + + helper.checkMessageReceivedWas("This is a test payload."); + } + + @Test + public void canWrapRawChannels() throws ChannelException, InterruptedException + { + Channel sendCh = new JChannel("udp.xml"); + sendCh.connect("mycluster"); + Messenger messenger = new JGroupsMessenger(sendCh); + + Channel recvCh = new JChannel("udp.xml"); + recvCh.connect("mycluster"); + recvCh.setReceiver(this); + + messenger.send("This message was sent with jgroups"); + + helper.checkMessageReceivedWas("This message was sent with jgroups"); + } + + @Ignore("Not currently allowing multiple receivers per underlying Channel") + @Test + public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException + { + Channel ch = new JChannel("udp.xml"); + ch.connect("mycluster"); + Messenger sendMsgr = new JGroupsMessenger(ch); + + Messenger recvMsgr1 = new JGroupsMessenger(ch); + TestMessageReceiver r1 = new TestMessageReceiver(); + recvMsgr1.setReceiver(r1); + + Messenger recvMsgr2 = new JGroupsMessenger(ch); + TestMessageReceiver r2 = new TestMessageReceiver(); + recvMsgr2.setReceiver(r2); + + sendMsgr.send("This message was sent with jgroups"); + + Thread.sleep(50); + assertEquals("This message was sent with jgroups", new String(r1.receivedMsg)); + assertEquals("This message was sent with jgroups", new String(r2.receivedMsg)); + } + + @Override + public void receive(Message msg) + { + ByteArrayInputStream bytes = new ByteArrayInputStream(msg.getBuffer()); + ObjectInput in; + try + { + in = new ObjectInputStream(bytes); + String payload = (String) in.readObject(); + in.close(); + bytes.close(); + helper.setReceivedMsg(payload); + } + catch (IOException e) + { + throw new RuntimeException("Couldn't receive object.", e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException("Couldn't receive object.", e); + } + } + + + public static class TestMessageReceiver implements MessageReceiver + { + String receivedMsg; + + @Override + public void onReceive(String message) + { + receivedMsg = message; + } + } +} diff --git a/source/java/org/alfresco/repo/cluster/MessageReceiver.java b/source/java/org/alfresco/repo/cluster/MessageReceiver.java new file mode 100644 index 0000000000..474c4adac6 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/MessageReceiver.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +/** + * Implement this interface and supply to a {@link Messenger} using + * {@link Messenger#setReceiver(MessageReceiver)} in order to receive + * messages from other {@link Messenger}s. + * + * @author Matt Ward + */ +public interface MessageReceiver +{ + void onReceive(T message); +} diff --git a/source/java/org/alfresco/repo/cluster/MessageSendingException.java b/source/java/org/alfresco/repo/cluster/MessageSendingException.java new file mode 100644 index 0000000000..60d34a3174 --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/MessageSendingException.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.cluster; + +/** + * Thrown when unable to send a message using a {@link Messenger}. + * + * @author Matt Ward + */ +public class MessageSendingException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + + public MessageSendingException(Throwable e) + { + super("Unable to send message", e); + } +} diff --git a/source/java/org/alfresco/repo/cluster/Messenger.java b/source/java/org/alfresco/repo/cluster/Messenger.java new file mode 100644 index 0000000000..0ef889c33f --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/Messenger.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +/** + * Provides facilities for peer-to-peer messaging within a cluster. This interface + * is intended to act as a facade, allowing the actual implementation (e.g. JGroups + * or Hazelcast) to be decoupled as much as possible from the Alfresco code base. + *

+ * Instances of this class are parameterised with the type of message payload + * to send and receive. + * + * @author Matt Ward + */ +public interface Messenger +{ + void send(T message); + + void setReceiver(MessageReceiver receiver); + + boolean isConnected(); +} diff --git a/source/java/org/alfresco/repo/cluster/MessengerFactory.java b/source/java/org/alfresco/repo/cluster/MessengerFactory.java new file mode 100644 index 0000000000..6ee3eeca3d --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/MessengerFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import java.io.Serializable; + +/** + * Factory class responsible for creating instances of {@link Messenger} class. + * + * @author Matt Ward + */ +public interface MessengerFactory +{ + Messenger createMessenger(); +} diff --git a/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java b/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java new file mode 100644 index 0000000000..4cd9dcc85e --- /dev/null +++ b/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2005-2012 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.cluster; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; + +/** + * Helper class for testing Messenger related code. + * + * @author Matt Ward + */ +public class MessengerTestHelper +{ + private String receivedMsg; + private final static int SLEEP_MILLIS = 5; + + + public MessengerTestHelper() + { + setReceivedMsg(null); + } + + /** + * Try to avoid intermitten test failures by trying multiple times. Hopefully the messge + * will have been received after the very first sleep, but in a slow environment it may take longer. + * This also allows the sleep time to be lower - rather than waiting for say 500 ms, we can try 10 times + * at 5 ms - with a chance that we can return after the initial 5 ms. + * + * @param expectedMsg + * @throws InterruptedException + */ + public void checkMessageReceivedWas(String expectedMsg) + { + int tries = 0; + + while (tries < 10) + { + try + { + Thread.sleep(SLEEP_MILLIS); + } + catch (InterruptedException e) + { + // Carry on + e.printStackTrace(); + } + if (getReceivedMsg() != null) + { + assertEquals(expectedMsg, getReceivedMsg()); + return; + } + tries++; + } + fail("No message received, tried " + tries + + " times, sleeping " + SLEEP_MILLIS + "ms each time."); + } + + /** + * @return the receivedMsg + */ + public String getReceivedMsg() + { + return this.receivedMsg; + } + + /** + * @param receivedMsg the receivedMsg to set + */ + public void setReceivedMsg(String receivedMsg) + { + this.receivedMsg = receivedMsg; + } +} diff --git a/source/test-resources/cluster-test/hazelcast-messenger-test.xml b/source/test-resources/cluster-test/hazelcast-messenger-test.xml new file mode 100644 index 0000000000..b0425f1a0c --- /dev/null +++ b/source/test-resources/cluster-test/hazelcast-messenger-test.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/source/test-resources/cluster-test/jgroups-messenger-test.xml b/source/test-resources/cluster-test/jgroups-messenger-test.xml new file mode 100644 index 0000000000..46f89844df --- /dev/null +++ b/source/test-resources/cluster-test/jgroups-messenger-test.xml @@ -0,0 +1,27 @@ + + + + + + + testcluster + + + + + classpath:udp.xml + + + + + + + + + + + \ No newline at end of file