diff --git a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java index bfedc4b576..28309a10b5 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastMessenger.java @@ -21,6 +21,8 @@ package org.alfresco.repo.cluster; import java.io.Serializable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.extensions.surf.util.ParameterCheck; import com.hazelcast.core.ITopic; @@ -37,7 +39,7 @@ public class HazelcastMessenger implements Messenger, private ITopic topic; private MessageReceiver receiverDelegate; private String address; - + private final static Log logger = LogFactory.getLog(HazelcastMessenger.class); /** * @param topic */ @@ -51,6 +53,10 @@ public class HazelcastMessenger implements Messenger, @Override public void send(T message) { + if (logger.isTraceEnabled()) + { + logger.trace("Sending " + message); + } topic.publish(message); } @@ -66,7 +72,10 @@ public class HazelcastMessenger implements Messenger, @Override public void onMessage(T message) { - ParameterCheck.mandatory("message", message); + if (logger.isTraceEnabled()) + { + logger.trace("Received (will be delegated to receiver): " + message); + } receiverDelegate.onReceive(message); } @@ -86,5 +95,5 @@ public class HazelcastMessenger implements Messenger, public String getAddress() { return address; - } + } } diff --git a/source/java/org/alfresco/repo/cluster/HazelcastTest.java b/source/java/org/alfresco/repo/cluster/HazelcastTest.java index f8ed604887..29ee9f6638 100644 --- a/source/java/org/alfresco/repo/cluster/HazelcastTest.java +++ b/source/java/org/alfresco/repo/cluster/HazelcastTest.java @@ -20,11 +20,12 @@ package org.alfresco.repo.cluster; +import org.alfresco.repo.cluster.MessengerTestHelper.TestMessageReceiver; import org.alfresco.util.ApplicationContextHelper; -import org.jgroups.ChannelException; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.springframework.context.ApplicationContext; @@ -68,13 +69,9 @@ public class HazelcastTest implements MessageListener @Test - public void canSendWithHazelcastMessengerFactory() throws InterruptedException, ChannelException + public void canSendWithHazelcastMessengerFactory() throws InterruptedException { - Config config = new Config(); - GroupConfig groupConfig = new GroupConfig(); - groupConfig.setName("testcluster"); - groupConfig.setPassword("secret"); - config.setGroupConfig(groupConfig); + Config config = createConfig(); HazelcastInstance hi = Hazelcast.newHazelcastInstance(config); ITopic topic = hi.getTopic("testregion"); @@ -87,10 +84,42 @@ public class HazelcastTest implements MessageListener helper.checkMessageReceivedWas("Full test including spring."); } + @Ignore("Behaviour not yet implemented.") + @Test + public void messengerWillNotReceiveMessagesFromSelf() throws InterruptedException + { + MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory"); + Messenger m1 = messengerFactory.createMessenger("testregion"); + TestMessageReceiver r1 = new TestMessageReceiver(); + m1.setReceiver(r1); + + Config config = createConfig(); + HazelcastInstance hi = Hazelcast.newHazelcastInstance(config); + ITopic topic2 = hi.getTopic("testregion"); + String address2 = hi.getCluster().getLocalMember().getInetSocketAddress().toString(); + Messenger m2 = new HazelcastMessenger(topic2, address2); + TestMessageReceiver r2 = new TestMessageReceiver(); + m2.setReceiver(r2); + + m1.send("This should be received by r2 but not r1"); + + r2.helper.checkMessageReceivedWas("This should be received by r2 but not r1"); + r1.helper.checkNoMessageReceived(); + } @Override public void onMessage(String message) { helper.setReceivedMsg(message); } + + private Config createConfig() + { + Config config = new Config(); + GroupConfig groupConfig = new GroupConfig(); + groupConfig.setName("testcluster"); + groupConfig.setPassword("secret"); + config.setGroupConfig(groupConfig); + return config; + } } diff --git a/source/java/org/alfresco/repo/cluster/JGroupsTest.java b/source/java/org/alfresco/repo/cluster/JGroupsTest.java index 40c84f6229..66fd420d1d 100644 --- a/source/java/org/alfresco/repo/cluster/JGroupsTest.java +++ b/source/java/org/alfresco/repo/cluster/JGroupsTest.java @@ -19,7 +19,7 @@ package org.alfresco.repo.cluster; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -27,6 +27,7 @@ import java.io.ObjectInput; import java.io.ObjectInputStream; import java.util.Collections; +import org.alfresco.repo.cluster.MessengerTestHelper.TestMessageReceiver; import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; import org.alfresco.util.ApplicationContextHelper; import org.jgroups.Channel; @@ -37,7 +38,6 @@ import org.jgroups.ReceiverAdapter; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.springframework.context.ApplicationContext; @@ -132,27 +132,24 @@ public class JGroupsTest extends ReceiverAdapter assertEquals(true, messengerFactory.isClusterActive()); } - @Ignore("Not currently allowing multiple receivers per underlying Channel") @Test - public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException + public void messengerWillNotReceiveMessagesFromSelf() throws InterruptedException, ChannelException { - Channel ch = new JChannel("udp.xml"); - ch.connect("mycluster"); - Messenger sendMsgr = new JGroupsMessenger(ch); - - Messenger recvMsgr1 = new JGroupsMessenger(ch); + MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory"); + Messenger m1 = messengerFactory.createMessenger("testregion"); TestMessageReceiver r1 = new TestMessageReceiver(); - recvMsgr1.setReceiver(r1); + m1.setReceiver(r1); - Messenger recvMsgr2 = new JGroupsMessenger(ch); + Channel ch2 = new JChannel("udp.xml"); + ch2.connect("testcluster:testregion"); + Messenger m2 = new JGroupsMessenger(ch2); TestMessageReceiver r2 = new TestMessageReceiver(); - recvMsgr2.setReceiver(r2); + m2.setReceiver(r2); - sendMsgr.send("This message was sent with jgroups"); + m1.send("This should be received by r2 but not r1"); - Thread.sleep(50); - assertEquals("This message was sent with jgroups", new String(r1.receivedMsg)); - assertEquals("This message was sent with jgroups", new String(r2.receivedMsg)); + r2.helper.checkMessageReceivedWas("This should be received by r2 but not r1"); + r1.helper.checkNoMessageReceived(); } @Override @@ -177,16 +174,4 @@ public class JGroupsTest extends ReceiverAdapter throw new RuntimeException("Couldn't receive object.", e); } } - - - public static class TestMessageReceiver implements MessageReceiver - { - String receivedMsg; - - @Override - public void onReceive(String message) - { - receivedMsg = message; - } - } } diff --git a/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java b/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java index 58b49254cf..cdab15a0d4 100644 --- a/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java +++ b/source/java/org/alfresco/repo/cluster/MessengerTestHelper.java @@ -31,6 +31,7 @@ public class MessengerTestHelper { private String receivedMsg; private final static int SLEEP_MILLIS = 50; + private static final int MAX_TRIES = 30; public MessengerTestHelper() @@ -51,7 +52,7 @@ public class MessengerTestHelper { int tries = 0; - while (tries < 30) + while (tries < MAX_TRIES) { try { @@ -72,6 +73,32 @@ public class MessengerTestHelper fail("No message received, tried " + tries + " times, sleeping " + SLEEP_MILLIS + "ms each time."); } + + /** + * Assert that no message was received in the given period. + */ + public void checkNoMessageReceived() + { + int tries = 0; + + while (tries < MAX_TRIES) + { + try + { + Thread.sleep(SLEEP_MILLIS); + } + catch (InterruptedException e) + { + // Carry on + e.printStackTrace(); + } + if (getReceivedMsg() != null) + { + fail("Message received but should NOT have been. Message was: " + getReceivedMsg()); + } + tries++; + } + } /** * @return the receivedMsg @@ -88,4 +115,16 @@ public class MessengerTestHelper { this.receivedMsg = receivedMsg; } + + + public static class TestMessageReceiver implements MessageReceiver + { + MessengerTestHelper helper = new MessengerTestHelper(); + + @Override + public void onReceive(String message) + { + helper.setReceivedMsg(message); + } + } } diff --git a/source/test-resources/cluster-test/hazelcast-messenger-test.xml b/source/test-resources/cluster-test/hazelcast-messenger-test.xml index 73ad0663b4..e73db0c7ff 100644 --- a/source/test-resources/cluster-test/hazelcast-messenger-test.xml +++ b/source/test-resources/cluster-test/hazelcast-messenger-test.xml @@ -3,6 +3,11 @@ +