mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Messaging facade for clustered communications to allow implementations from JGroups or Hazelcast to be used.
* PropertyBackedBeanExporter uses the facade, configured with JGroups messaging implementation. * Unfortunately, some remodelling required before this will work with Ehcache's CacheManagerPeerProviderFactory git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@33830 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.springframework.extensions.surf.util.ParameterCheck;
|
||||
|
||||
import com.hazelcast.core.ITopic;
|
||||
import com.hazelcast.core.MessageListener;
|
||||
|
||||
/**
|
||||
* Hazelcast-based implementation of the {@link Messenger} interface.
|
||||
*
|
||||
* @see HazelcastMessengerFactory
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class HazelcastMessenger<T extends Serializable> implements Messenger<T>, MessageListener<T>
|
||||
{
|
||||
private ITopic<T> topic;
|
||||
private MessageReceiver<T> receiverDelegate;
|
||||
|
||||
|
||||
/**
|
||||
* @param topic
|
||||
*/
|
||||
public HazelcastMessenger(ITopic<T> topic)
|
||||
{
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void send(T message)
|
||||
{
|
||||
topic.publish(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReceiver(MessageReceiver<T> receiver)
|
||||
{
|
||||
// Install a delegate to ready to handle incoming messages.
|
||||
receiverDelegate = receiver;
|
||||
// Start receiving messages.
|
||||
topic.addMessageListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(T message)
|
||||
{
|
||||
ParameterCheck.mandatory("message", message);
|
||||
receiverDelegate.onReceive(message);
|
||||
}
|
||||
|
||||
protected String getTopicName()
|
||||
{
|
||||
return topic.getName();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isConnected()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.alfresco.util.PropertyCheck;
|
||||
|
||||
import com.hazelcast.core.ITopic;
|
||||
|
||||
/**
|
||||
* Hazelcast-based implementation of the {@link MessengerFactory} interface.
|
||||
* The factory must be configured with an {@link ITopic} - which
|
||||
* should be configured with a topic name that corresponds to an application
|
||||
* region.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class HazelcastMessengerFactory implements MessengerFactory
|
||||
{
|
||||
private ITopic<Object> topic;
|
||||
|
||||
|
||||
/**
|
||||
* @param topic the topic to set
|
||||
*/
|
||||
public void setTopic(ITopic<Object> topic)
|
||||
{
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.alfresco.repo.cluster.MessengerFactory#createMessenger()
|
||||
*/
|
||||
@Override
|
||||
public <T extends Serializable> Messenger<T> createMessenger()
|
||||
{
|
||||
PropertyCheck.mandatory(this, "topic", topic);
|
||||
return new HazelcastMessenger(topic);
|
||||
}
|
||||
}
|
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.*;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.hazelcast.core.ITopic;
|
||||
|
||||
|
||||
/**
|
||||
* Tests for the HazelcastMessengerFactory class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class HazelcastMessengerFactoryTest
|
||||
{
|
||||
private HazelcastMessengerFactory factory;
|
||||
private @Mock ITopic<Object> topic;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
factory = new HazelcastMessengerFactory();
|
||||
factory.setTopic(topic);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void topicWrappedInMessenger()
|
||||
{
|
||||
when(topic.getName()).thenReturn("app-region");
|
||||
HazelcastMessenger messenger = (HazelcastMessenger) factory.createMessenger();
|
||||
assertEquals("app-region", messenger.getTopicName());
|
||||
}
|
||||
}
|
@@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.hazelcast.core.ITopic;
|
||||
|
||||
/**
|
||||
* Tests for the HazelcastMessenger class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class HazelcastMessengerTest
|
||||
{
|
||||
private @Mock ITopic<String> topic;
|
||||
private HazelcastMessenger<String> messenger;
|
||||
private String receivedMsg;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
messenger = new HazelcastMessenger<String>(topic);
|
||||
receivedMsg = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendMessage()
|
||||
{
|
||||
messenger.send("Test string");
|
||||
verify(topic).publish("Test string");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canReceiveMessage()
|
||||
{
|
||||
messenger.setReceiver(new MessageReceiver<String>()
|
||||
{
|
||||
@Override
|
||||
public void onReceive(String message)
|
||||
{
|
||||
receivedMsg = new String(message);
|
||||
}
|
||||
});
|
||||
|
||||
// Hazelcast will call the onMessage method...
|
||||
messenger.onMessage("Hazelcast is sending a message.");
|
||||
|
||||
// setReceiver() should have resulted in a listener being registered with the topic.
|
||||
verify(topic).addMessageListener(messenger);
|
||||
|
||||
assertEquals("Hazelcast is sending a message.", receivedMsg);
|
||||
}
|
||||
}
|
95
source/java/org/alfresco/repo/cluster/HazelcastTest.java
Normal file
95
source/java/org/alfresco/repo/cluster/HazelcastTest.java
Normal file
@@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
|
||||
import org.alfresco.util.ApplicationContextHelper;
|
||||
import org.jgroups.ChannelException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import com.hazelcast.config.Config;
|
||||
import com.hazelcast.config.GroupConfig;
|
||||
import com.hazelcast.core.Hazelcast;
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.core.ITopic;
|
||||
import com.hazelcast.core.MessageListener;
|
||||
|
||||
/**
|
||||
* Tests for Hazelcast implementations of {@link Messenger} and related classes.
|
||||
* These are integration tests and configured through a Spring test context file.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class HazelcastTest implements MessageListener<String>
|
||||
{
|
||||
private static ApplicationContext ctx;
|
||||
private MessengerTestHelper helper;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass()
|
||||
{
|
||||
ctx = ApplicationContextHelper.
|
||||
getApplicationContext(new String[] { "cluster-test/hazelcast-messenger-test.xml" });
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownClass()
|
||||
{
|
||||
ApplicationContextHelper.closeApplicationContext();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
helper = new MessengerTestHelper();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void canSendWithHazelcastMessengerFactory() throws InterruptedException, ChannelException
|
||||
{
|
||||
Config config = new Config();
|
||||
GroupConfig groupConfig = new GroupConfig();
|
||||
groupConfig.setName("testcluster");
|
||||
groupConfig.setPassword("secret");
|
||||
config.setGroupConfig(groupConfig);
|
||||
HazelcastInstance hi = Hazelcast.newHazelcastInstance(config);
|
||||
ITopic<String> topic = hi.getTopic("testregion");
|
||||
|
||||
topic.addMessageListener(this);
|
||||
|
||||
MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory");
|
||||
Messenger<String> messenger = messengerFactory.createMessenger();
|
||||
messenger.send("Full test including spring.");
|
||||
|
||||
helper.checkMessageReceivedWas("Full test including spring.");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onMessage(String message)
|
||||
{
|
||||
helper.setReceivedMsg(message);
|
||||
}
|
||||
}
|
127
source/java/org/alfresco/repo/cluster/JGroupsMessenger.java
Normal file
127
source/java/org/alfresco/repo/cluster/JGroupsMessenger.java
Normal file
@@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutput;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.Message;
|
||||
import org.jgroups.ReceiverAdapter;
|
||||
|
||||
/**
|
||||
* JGroups implementation of the {@link Messenger} class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsMessenger<T extends Serializable> extends ReceiverAdapter implements Messenger<T>
|
||||
{
|
||||
private final Channel channel;
|
||||
private MessageReceiver<T> receiverDelegate;
|
||||
|
||||
|
||||
/**
|
||||
* Construct a messenger that wraps a JGroups Channel.
|
||||
*
|
||||
* @param channel
|
||||
*/
|
||||
public JGroupsMessenger(Channel channel)
|
||||
{
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void send(T message)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Serializing the message ourselves and passing a byte[]
|
||||
// to Channel.send() as recommended by JGroups.
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
ObjectOutput out = new ObjectOutputStream(bytes);
|
||||
out.writeObject(message);
|
||||
out.close();
|
||||
bytes.close();
|
||||
channel.send(null, null, bytes.toByteArray());
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new MessageSendingException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.alfresco.repo.cluster.Messenger#setReceiver(org.alfresco.repo.cluster.MessageReceiver)
|
||||
*/
|
||||
@Override
|
||||
public void setReceiver(MessageReceiver<T> receiver)
|
||||
{
|
||||
// Make sure the delegate is installed, before starting to receive messages.
|
||||
receiverDelegate = receiver;
|
||||
// Start receiving messages and dispatch them to the delegate.
|
||||
channel.setReceiver(this);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* @see org.jgroups.ReceiverAdapter#receive(org.jgroups.Message)
|
||||
*/
|
||||
@Override
|
||||
public void receive(Message msg)
|
||||
{
|
||||
// Deserializing the message ourselves rather than using
|
||||
// the Message's getObject() method (as recommended by JGroups).
|
||||
byte[] msgBytes = msg.getBuffer();
|
||||
ByteArrayInputStream bytes = new ByteArrayInputStream(msgBytes);
|
||||
ObjectInput in;
|
||||
try
|
||||
{
|
||||
in = new ObjectInputStream(bytes);
|
||||
@SuppressWarnings("unchecked")
|
||||
T payload = (T) in.readObject();
|
||||
in.close();
|
||||
bytes.close();
|
||||
// Pass the deserialized payload on to the receiver delegate
|
||||
receiverDelegate.onReceive(payload);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
catch (ClassNotFoundException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isConnected()
|
||||
{
|
||||
return channel.isConnected();
|
||||
}
|
||||
}
|
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
||||
import org.alfresco.util.PropertyCheck;
|
||||
import org.jgroups.Channel;
|
||||
import org.springframework.beans.factory.annotation.Required;
|
||||
|
||||
/**
|
||||
* JGroups implementation of the {@link MessengerFactory} interface.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsMessengerFactory implements MessengerFactory
|
||||
{
|
||||
private String appRegion;
|
||||
|
||||
|
||||
@Override
|
||||
public <T extends Serializable> Messenger<T> createMessenger()
|
||||
{
|
||||
PropertyCheck.mandatory(this, "appRegion", appRegion);
|
||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
|
||||
return new JGroupsMessenger<T>(channel);
|
||||
}
|
||||
|
||||
@Required
|
||||
public void setAppRegion(String appRegion)
|
||||
{
|
||||
this.appRegion = appRegion;
|
||||
}
|
||||
}
|
115
source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java
Normal file
115
source/java/org/alfresco/repo/cluster/JGroupsMessengerTest.java
Normal file
@@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectOutput;
|
||||
import java.io.ObjectOutputStream;
|
||||
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.ChannelClosedException;
|
||||
import org.jgroups.ChannelNotConnectedException;
|
||||
import org.jgroups.Message;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Tests for the JGroupsMessenger class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsMessengerTest
|
||||
{
|
||||
private Channel channel;
|
||||
private JGroupsMessenger<String> messenger;
|
||||
protected String receivedMsg;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
channel = Mockito.mock(Channel.class);
|
||||
messenger = new JGroupsMessenger<String>(channel);
|
||||
receivedMsg = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendMessage() throws ChannelNotConnectedException, ChannelClosedException, IOException
|
||||
{
|
||||
String testText = "This is a test message";
|
||||
byte[] testTextSer = serialize(testText);
|
||||
// When a message is sent...
|
||||
messenger.send(testText);
|
||||
|
||||
// the underlying channel should have been used to send it,
|
||||
// but will be called with a serialized version of the text.
|
||||
verify(channel).send(null, null, testTextSer);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void canReceiveMessage() throws IOException
|
||||
{
|
||||
MessageReceiver<String> receiver = new MessageReceiver<String>()
|
||||
{
|
||||
@Override
|
||||
public void onReceive(String message)
|
||||
{
|
||||
receivedMsg = message;
|
||||
}
|
||||
};
|
||||
|
||||
messenger.setReceiver(receiver);
|
||||
Message jgroupsMessage = new Message(null, null, serialize("JGroups message payload"));
|
||||
// JGroups will call the receive method
|
||||
messenger.receive(jgroupsMessage);
|
||||
|
||||
// The Messenger should have installed itself as the message
|
||||
// receiver for the underlying channel.
|
||||
verify(channel).setReceiver(messenger);
|
||||
|
||||
assertEquals("JGroups message payload", receivedMsg.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canDelegateIsConnected()
|
||||
{
|
||||
Mockito.when(channel.isConnected()).thenReturn(true);
|
||||
assertEquals(true, messenger.isConnected());
|
||||
|
||||
Mockito.when(channel.isConnected()).thenReturn(false);
|
||||
assertEquals(false, messenger.isConnected());
|
||||
}
|
||||
|
||||
private byte[] serialize(String text) throws IOException
|
||||
{
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
ObjectOutput out = new ObjectOutputStream(bytes);
|
||||
out.writeObject(text);
|
||||
out.close();
|
||||
bytes.close();
|
||||
return bytes.toByteArray();
|
||||
}
|
||||
}
|
||||
|
180
source/java/org/alfresco/repo/cluster/JGroupsTest.java
Normal file
180
source/java/org/alfresco/repo/cluster/JGroupsTest.java
Normal file
@@ -0,0 +1,180 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
||||
import org.alfresco.util.ApplicationContextHelper;
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.ChannelException;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.Message;
|
||||
import org.jgroups.ReceiverAdapter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
/**
|
||||
* Tests for the JGroups messaging abstractions.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsTest extends ReceiverAdapter
|
||||
{
|
||||
private static ApplicationContext ctx;
|
||||
private MessengerTestHelper helper;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass()
|
||||
{
|
||||
ctx = ApplicationContextHelper.
|
||||
getApplicationContext(new String[] { "cluster-test/jgroups-messenger-test.xml" });
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownClass()
|
||||
{
|
||||
ApplicationContextHelper.closeApplicationContext();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
helper = new MessengerTestHelper();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendWithJGroupsMessengerFactory() throws InterruptedException, ChannelException
|
||||
{
|
||||
Channel ch = new JChannel("udp.xml");
|
||||
ch.connect("testcluster:testregion");
|
||||
ch.setReceiver(this);
|
||||
|
||||
MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory");
|
||||
Messenger<String> messenger = messengerFactory.createMessenger();
|
||||
messenger.send("Full test including spring.");
|
||||
|
||||
helper.checkMessageReceivedWas("Full test including spring.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendWithJGroupsMessengerFactoryWithoutSpring() throws InterruptedException, ChannelException
|
||||
{
|
||||
Channel ch = new JChannel("udp.xml");
|
||||
ch.connect("testcluster:testregion");
|
||||
ch.setReceiver(this);
|
||||
|
||||
AlfrescoJGroupsChannelFactory channelFactory = new AlfrescoJGroupsChannelFactory();
|
||||
channelFactory.setClusterName("testcluster");
|
||||
channelFactory.setConfigUrlsByAppRegion(Collections.singletonMap("DEFAULT", "classpath:udp.xml"));
|
||||
AlfrescoJGroupsChannelFactory.rebuildChannels();
|
||||
|
||||
JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory();
|
||||
messengerFactory.setAppRegion("testregion");
|
||||
Messenger<String> messenger = messengerFactory.createMessenger();
|
||||
messenger.send("This is a test payload.");
|
||||
|
||||
helper.checkMessageReceivedWas("This is a test payload.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canWrapRawChannels() throws ChannelException, InterruptedException
|
||||
{
|
||||
Channel sendCh = new JChannel("udp.xml");
|
||||
sendCh.connect("mycluster");
|
||||
Messenger<String> messenger = new JGroupsMessenger<String>(sendCh);
|
||||
|
||||
Channel recvCh = new JChannel("udp.xml");
|
||||
recvCh.connect("mycluster");
|
||||
recvCh.setReceiver(this);
|
||||
|
||||
messenger.send("This message was sent with jgroups");
|
||||
|
||||
helper.checkMessageReceivedWas("This message was sent with jgroups");
|
||||
}
|
||||
|
||||
@Ignore("Not currently allowing multiple receivers per underlying Channel")
|
||||
@Test
|
||||
public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException
|
||||
{
|
||||
Channel ch = new JChannel("udp.xml");
|
||||
ch.connect("mycluster");
|
||||
Messenger<String> sendMsgr = new JGroupsMessenger<String>(ch);
|
||||
|
||||
Messenger<String> recvMsgr1 = new JGroupsMessenger<String>(ch);
|
||||
TestMessageReceiver r1 = new TestMessageReceiver();
|
||||
recvMsgr1.setReceiver(r1);
|
||||
|
||||
Messenger<String> recvMsgr2 = new JGroupsMessenger<String>(ch);
|
||||
TestMessageReceiver r2 = new TestMessageReceiver();
|
||||
recvMsgr2.setReceiver(r2);
|
||||
|
||||
sendMsgr.send("This message was sent with jgroups");
|
||||
|
||||
Thread.sleep(50);
|
||||
assertEquals("This message was sent with jgroups", new String(r1.receivedMsg));
|
||||
assertEquals("This message was sent with jgroups", new String(r2.receivedMsg));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receive(Message msg)
|
||||
{
|
||||
ByteArrayInputStream bytes = new ByteArrayInputStream(msg.getBuffer());
|
||||
ObjectInput in;
|
||||
try
|
||||
{
|
||||
in = new ObjectInputStream(bytes);
|
||||
String payload = (String) in.readObject();
|
||||
in.close();
|
||||
bytes.close();
|
||||
helper.setReceivedMsg(payload);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
catch (ClassNotFoundException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class TestMessageReceiver implements MessageReceiver<String>
|
||||
{
|
||||
String receivedMsg;
|
||||
|
||||
@Override
|
||||
public void onReceive(String message)
|
||||
{
|
||||
receivedMsg = message;
|
||||
}
|
||||
}
|
||||
}
|
34
source/java/org/alfresco/repo/cluster/MessageReceiver.java
Normal file
34
source/java/org/alfresco/repo/cluster/MessageReceiver.java
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Implement this interface and supply to a {@link Messenger} using
|
||||
* {@link Messenger#setReceiver(MessageReceiver)} in order to receive
|
||||
* messages from other {@link Messenger}s.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public interface MessageReceiver<T extends Serializable>
|
||||
{
|
||||
void onReceive(T message);
|
||||
}
|
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
/**
|
||||
* Thrown when unable to send a message using a {@link Messenger}.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class MessageSendingException extends RuntimeException
|
||||
{
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public MessageSendingException(Throwable e)
|
||||
{
|
||||
super("Unable to send message", e);
|
||||
}
|
||||
}
|
41
source/java/org/alfresco/repo/cluster/Messenger.java
Normal file
41
source/java/org/alfresco/repo/cluster/Messenger.java
Normal file
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Provides facilities for peer-to-peer messaging within a cluster. This interface
|
||||
* is intended to act as a facade, allowing the actual implementation (e.g. JGroups
|
||||
* or Hazelcast) to be decoupled as much as possible from the Alfresco code base.
|
||||
* <p>
|
||||
* Instances of this class are parameterised with the type of message payload
|
||||
* to send and receive.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public interface Messenger<T extends Serializable>
|
||||
{
|
||||
void send(T message);
|
||||
|
||||
void setReceiver(MessageReceiver<T> receiver);
|
||||
|
||||
boolean isConnected();
|
||||
}
|
32
source/java/org/alfresco/repo/cluster/MessengerFactory.java
Normal file
32
source/java/org/alfresco/repo/cluster/MessengerFactory.java
Normal file
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Factory class responsible for creating instances of {@link Messenger} class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public interface MessengerFactory
|
||||
{
|
||||
<T extends Serializable> Messenger<T> createMessenger();
|
||||
}
|
@@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Helper class for testing Messenger related code.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class MessengerTestHelper
|
||||
{
|
||||
private String receivedMsg;
|
||||
private final static int SLEEP_MILLIS = 5;
|
||||
|
||||
|
||||
public MessengerTestHelper()
|
||||
{
|
||||
setReceivedMsg(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to avoid intermitten test failures by trying multiple times. Hopefully the messge
|
||||
* will have been received after the very first sleep, but in a slow environment it may take longer.
|
||||
* This also allows the sleep time to be lower - rather than waiting for say 500 ms, we can try 10 times
|
||||
* at 5 ms - with a chance that we can return after the initial 5 ms.
|
||||
*
|
||||
* @param expectedMsg
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void checkMessageReceivedWas(String expectedMsg)
|
||||
{
|
||||
int tries = 0;
|
||||
|
||||
while (tries < 10)
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(SLEEP_MILLIS);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
// Carry on
|
||||
e.printStackTrace();
|
||||
}
|
||||
if (getReceivedMsg() != null)
|
||||
{
|
||||
assertEquals(expectedMsg, getReceivedMsg());
|
||||
return;
|
||||
}
|
||||
tries++;
|
||||
}
|
||||
fail("No message received, tried " + tries +
|
||||
" times, sleeping " + SLEEP_MILLIS + "ms each time.");
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the receivedMsg
|
||||
*/
|
||||
public String getReceivedMsg()
|
||||
{
|
||||
return this.receivedMsg;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param receivedMsg the receivedMsg to set
|
||||
*/
|
||||
public void setReceivedMsg(String receivedMsg)
|
||||
{
|
||||
this.receivedMsg = receivedMsg;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user