mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Messenger facade, a few miscellaneous bits
* more logging * comments * more tests git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@33894 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -21,6 +21,8 @@ package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.extensions.surf.util.ParameterCheck;
|
||||
|
||||
import com.hazelcast.core.ITopic;
|
||||
@@ -37,7 +39,7 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
private ITopic<T> topic;
|
||||
private MessageReceiver<T> receiverDelegate;
|
||||
private String address;
|
||||
|
||||
private final static Log logger = LogFactory.getLog(HazelcastMessenger.class);
|
||||
/**
|
||||
* @param topic
|
||||
*/
|
||||
@@ -51,6 +53,10 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
@Override
|
||||
public void send(T message)
|
||||
{
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("Sending " + message);
|
||||
}
|
||||
topic.publish(message);
|
||||
}
|
||||
|
||||
@@ -66,7 +72,10 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
@Override
|
||||
public void onMessage(T message)
|
||||
{
|
||||
ParameterCheck.mandatory("message", message);
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("Received (will be delegated to receiver): " + message);
|
||||
}
|
||||
receiverDelegate.onReceive(message);
|
||||
}
|
||||
|
||||
@@ -86,5 +95,5 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
public String getAddress()
|
||||
{
|
||||
return address;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -20,11 +20,12 @@
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
|
||||
import org.alfresco.repo.cluster.MessengerTestHelper.TestMessageReceiver;
|
||||
import org.alfresco.util.ApplicationContextHelper;
|
||||
import org.jgroups.ChannelException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
@@ -68,13 +69,9 @@ public class HazelcastTest implements MessageListener<String>
|
||||
|
||||
|
||||
@Test
|
||||
public void canSendWithHazelcastMessengerFactory() throws InterruptedException, ChannelException
|
||||
public void canSendWithHazelcastMessengerFactory() throws InterruptedException
|
||||
{
|
||||
Config config = new Config();
|
||||
GroupConfig groupConfig = new GroupConfig();
|
||||
groupConfig.setName("testcluster");
|
||||
groupConfig.setPassword("secret");
|
||||
config.setGroupConfig(groupConfig);
|
||||
Config config = createConfig();
|
||||
HazelcastInstance hi = Hazelcast.newHazelcastInstance(config);
|
||||
ITopic<String> topic = hi.getTopic("testregion");
|
||||
|
||||
@@ -87,10 +84,42 @@ public class HazelcastTest implements MessageListener<String>
|
||||
helper.checkMessageReceivedWas("Full test including spring.");
|
||||
}
|
||||
|
||||
@Ignore("Behaviour not yet implemented.")
|
||||
@Test
|
||||
public void messengerWillNotReceiveMessagesFromSelf() throws InterruptedException
|
||||
{
|
||||
MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory");
|
||||
Messenger<String> m1 = messengerFactory.createMessenger("testregion");
|
||||
TestMessageReceiver r1 = new TestMessageReceiver();
|
||||
m1.setReceiver(r1);
|
||||
|
||||
Config config = createConfig();
|
||||
HazelcastInstance hi = Hazelcast.newHazelcastInstance(config);
|
||||
ITopic<String> topic2 = hi.getTopic("testregion");
|
||||
String address2 = hi.getCluster().getLocalMember().getInetSocketAddress().toString();
|
||||
Messenger<String> m2 = new HazelcastMessenger<String>(topic2, address2);
|
||||
TestMessageReceiver r2 = new TestMessageReceiver();
|
||||
m2.setReceiver(r2);
|
||||
|
||||
m1.send("This should be received by r2 but not r1");
|
||||
|
||||
r2.helper.checkMessageReceivedWas("This should be received by r2 but not r1");
|
||||
r1.helper.checkNoMessageReceived();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message)
|
||||
{
|
||||
helper.setReceivedMsg(message);
|
||||
}
|
||||
|
||||
private Config createConfig()
|
||||
{
|
||||
Config config = new Config();
|
||||
GroupConfig groupConfig = new GroupConfig();
|
||||
groupConfig.setName("testcluster");
|
||||
groupConfig.setPassword("secret");
|
||||
config.setGroupConfig(groupConfig);
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
@@ -19,7 +19,7 @@
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
@@ -27,6 +27,7 @@ import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.alfresco.repo.cluster.MessengerTestHelper.TestMessageReceiver;
|
||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
||||
import org.alfresco.util.ApplicationContextHelper;
|
||||
import org.jgroups.Channel;
|
||||
@@ -37,7 +38,6 @@ import org.jgroups.ReceiverAdapter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
@@ -132,27 +132,24 @@ public class JGroupsTest extends ReceiverAdapter
|
||||
assertEquals(true, messengerFactory.isClusterActive());
|
||||
}
|
||||
|
||||
@Ignore("Not currently allowing multiple receivers per underlying Channel")
|
||||
@Test
|
||||
public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException
|
||||
public void messengerWillNotReceiveMessagesFromSelf() throws InterruptedException, ChannelException
|
||||
{
|
||||
Channel ch = new JChannel("udp.xml");
|
||||
ch.connect("mycluster");
|
||||
Messenger<String> sendMsgr = new JGroupsMessenger<String>(ch);
|
||||
|
||||
Messenger<String> recvMsgr1 = new JGroupsMessenger<String>(ch);
|
||||
MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory");
|
||||
Messenger<String> m1 = messengerFactory.createMessenger("testregion");
|
||||
TestMessageReceiver r1 = new TestMessageReceiver();
|
||||
recvMsgr1.setReceiver(r1);
|
||||
m1.setReceiver(r1);
|
||||
|
||||
Messenger<String> recvMsgr2 = new JGroupsMessenger<String>(ch);
|
||||
Channel ch2 = new JChannel("udp.xml");
|
||||
ch2.connect("testcluster:testregion");
|
||||
Messenger<String> m2 = new JGroupsMessenger<String>(ch2);
|
||||
TestMessageReceiver r2 = new TestMessageReceiver();
|
||||
recvMsgr2.setReceiver(r2);
|
||||
m2.setReceiver(r2);
|
||||
|
||||
sendMsgr.send("This message was sent with jgroups");
|
||||
m1.send("This should be received by r2 but not r1");
|
||||
|
||||
Thread.sleep(50);
|
||||
assertEquals("This message was sent with jgroups", new String(r1.receivedMsg));
|
||||
assertEquals("This message was sent with jgroups", new String(r2.receivedMsg));
|
||||
r2.helper.checkMessageReceivedWas("This should be received by r2 but not r1");
|
||||
r1.helper.checkNoMessageReceived();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -177,16 +174,4 @@ public class JGroupsTest extends ReceiverAdapter
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class TestMessageReceiver implements MessageReceiver<String>
|
||||
{
|
||||
String receivedMsg;
|
||||
|
||||
@Override
|
||||
public void onReceive(String message)
|
||||
{
|
||||
receivedMsg = message;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -31,6 +31,7 @@ public class MessengerTestHelper
|
||||
{
|
||||
private String receivedMsg;
|
||||
private final static int SLEEP_MILLIS = 50;
|
||||
private static final int MAX_TRIES = 30;
|
||||
|
||||
|
||||
public MessengerTestHelper()
|
||||
@@ -51,7 +52,7 @@ public class MessengerTestHelper
|
||||
{
|
||||
int tries = 0;
|
||||
|
||||
while (tries < 30)
|
||||
while (tries < MAX_TRIES)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -72,6 +73,32 @@ public class MessengerTestHelper
|
||||
fail("No message received, tried " + tries +
|
||||
" times, sleeping " + SLEEP_MILLIS + "ms each time.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that no message was received in the given period.
|
||||
*/
|
||||
public void checkNoMessageReceived()
|
||||
{
|
||||
int tries = 0;
|
||||
|
||||
while (tries < MAX_TRIES)
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(SLEEP_MILLIS);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
// Carry on
|
||||
e.printStackTrace();
|
||||
}
|
||||
if (getReceivedMsg() != null)
|
||||
{
|
||||
fail("Message received but should NOT have been. Message was: " + getReceivedMsg());
|
||||
}
|
||||
tries++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the receivedMsg
|
||||
@@ -88,4 +115,16 @@ public class MessengerTestHelper
|
||||
{
|
||||
this.receivedMsg = receivedMsg;
|
||||
}
|
||||
|
||||
|
||||
public static class TestMessageReceiver implements MessageReceiver<String>
|
||||
{
|
||||
MessengerTestHelper helper = new MessengerTestHelper();
|
||||
|
||||
@Override
|
||||
public void onReceive(String message)
|
||||
{
|
||||
helper.setReceivedMsg(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -3,6 +3,11 @@
|
||||
|
||||
<beans>
|
||||
|
||||
<!--
|
||||
Unlike the JGroupsMessengerFactory, this messengerFactory bean doesn't require an explicit
|
||||
dependency (through the depends-on attribute) as it specifies the dependency through the
|
||||
reference to the hazelcastInstance bean.
|
||||
-->
|
||||
<bean id="messengerFactory" class="org.alfresco.repo.cluster.HazelcastMessengerFactory">
|
||||
<property name="hazelcast" ref="hazelcastInstance"/>
|
||||
</bean>
|
||||
|
Reference in New Issue
Block a user