Messaging facade: finished changing jgroups heartbeating over to facade.

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@33886 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Matt Ward
2012-02-14 12:27:57 +00:00
parent 4f1d0adc4d
commit e40bd92c21
16 changed files with 211 additions and 14 deletions

View File

@@ -182,6 +182,11 @@
<bean id="messengerFactory" <bean id="messengerFactory"
class="org.alfresco.repo.cluster.JGroupsMessengerFactory" class="org.alfresco.repo.cluster.JGroupsMessengerFactory"
depends-on="jgroupsChannelFactory"/> depends-on="jgroupsChannelFactory"/>
<!-- Set up this class for static access, don't use this for DI - use messengerFactory instead -->
<bean id="staticMessengerFactoryProvider" class="org.alfresco.repo.cluster.MessengerFactoryProvider">
<property name="instance" ref="messengerFactory"/>
</bean>
<bean id="encryptionChecker" class="org.alfresco.encryption.EncryptionChecker"> <bean id="encryptionChecker" class="org.alfresco.encryption.EncryptionChecker">
<property name="transactionService" ref="transactionService"/> <property name="transactionService" ref="transactionService"/>

View File

@@ -13,7 +13,8 @@
<!-- EH Cache Manager to produce shared EH Caches --> <!-- EH Cache Manager to produce shared EH Caches -->
<!-- ============================================ --> <!-- ============================================ -->
<bean name="internalEHCacheManager" class="org.alfresco.repo.cache.InternalEhCacheManagerFactoryBean" depends-on="clusterPropertySetter" /> <bean name="internalEHCacheManager" class="org.alfresco.repo.cache.InternalEhCacheManagerFactoryBean"
depends-on="staticMessengerFactoryProvider,clusterPropertySetter" />
<!-- ===================================== --> <!-- ===================================== -->

View File

@@ -0,0 +1,40 @@
/*
* Copyright (C) 2005-2012 Alfresco Software Limited.
*
* This file is part of Alfresco
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.repo.cluster;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
/**
* Test suite for the org.alfresco.repo.cluster package.
*
* @author Matt Ward
*/
@RunWith(Suite.class)
@SuiteClasses({
org.alfresco.repo.cluster.HazelcastMessengerFactoryTest.class,
org.alfresco.repo.cluster.HazelcastMessengerTest.class,
org.alfresco.repo.cluster.HazelcastTest.class,
org.alfresco.repo.cluster.JGroupsMessengerTest.class,
org.alfresco.repo.cluster.JGroupsTest.class
})
public class ClusterTestSuite
{
}

View File

@@ -36,14 +36,15 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
{ {
private ITopic<T> topic; private ITopic<T> topic;
private MessageReceiver<T> receiverDelegate; private MessageReceiver<T> receiverDelegate;
private String address;
/** /**
* @param topic * @param topic
*/ */
public HazelcastMessenger(ITopic<T> topic) public HazelcastMessenger(ITopic<T> topic, String address)
{ {
this.topic = topic; this.topic = topic;
this.address = address;
} }
@@ -79,4 +80,11 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
{ {
return topic; return topic;
} }
@Override
public String getAddress()
{
return address;
}
} }

View File

@@ -21,6 +21,10 @@ package org.alfresco.repo.cluster;
import java.io.Serializable; import java.io.Serializable;
import org.springframework.util.StringUtils;
import com.hazelcast.config.Config;
import com.hazelcast.config.GroupConfig;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic; import com.hazelcast.core.ITopic;
@@ -39,7 +43,8 @@ public class HazelcastMessengerFactory implements MessengerFactory
public <T extends Serializable> Messenger<T> createMessenger(String appRegion) public <T extends Serializable> Messenger<T> createMessenger(String appRegion)
{ {
ITopic<T> topic = hazelcast.getTopic(appRegion); ITopic<T> topic = hazelcast.getTopic(appRegion);
return new HazelcastMessenger<T>(topic); String address = hazelcast.getCluster().getLocalMember().getInetSocketAddress().toString();
return new HazelcastMessenger<T>(topic, address);
} }
/** /**
@@ -49,4 +54,16 @@ public class HazelcastMessengerFactory implements MessengerFactory
{ {
this.hazelcast = hazelcast; this.hazelcast = hazelcast;
} }
@Override
public boolean isClusterActive()
{
Config config = hazelcast.getConfig();
if (config == null || config.getGroupConfig() == null)
{
return false;
}
GroupConfig groupConfig = config.getGroupConfig();
return StringUtils.hasText(groupConfig.getName());
}
} }

View File

@@ -19,17 +19,24 @@
package org.alfresco.repo.cluster; package org.alfresco.repo.cluster;
import static org.junit.Assert.*;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import com.hazelcast.config.Config;
import com.hazelcast.config.GroupConfig;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic; import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
/** /**
@@ -41,23 +48,45 @@ import com.hazelcast.core.ITopic;
public class HazelcastMessengerFactoryTest public class HazelcastMessengerFactoryTest
{ {
private HazelcastMessengerFactory factory; private HazelcastMessengerFactory factory;
private GroupConfig groupConfig;
private @Mock HazelcastInstance hazelcast; private @Mock HazelcastInstance hazelcast;
private @Mock Member member;
private @Mock Cluster cluster;
private @Mock ITopic<String> topic; private @Mock ITopic<String> topic;
private @Mock Config config;
@Before @Before
public void setUp() public void setUp()
{ {
factory = new HazelcastMessengerFactory(); factory = new HazelcastMessengerFactory();
factory.setHazelcast(hazelcast); factory.setHazelcast(hazelcast);
groupConfig = new GroupConfig();
} }
@Test @Test
public void topicWrappedInMessenger() public void topicWrappedInMessenger()
{ {
when(hazelcast.<String>getTopic("app-region")).thenReturn(topic); when(hazelcast.<String>getTopic("app-region")).thenReturn(topic);
when(hazelcast.getCluster()).thenReturn(cluster);
when(cluster.getLocalMember()).thenReturn(member);
when(member.getInetSocketAddress()).thenReturn(InetSocketAddress.createUnresolved("a-host-name", 1234));
Messenger<String> messenger = factory.createMessenger("app-region"); Messenger<String> messenger = factory.createMessenger("app-region");
assertSame(topic, ((HazelcastMessenger<String>) messenger).getTopic()); assertSame(topic, ((HazelcastMessenger<String>) messenger).getTopic());
assertEquals("a-host-name:1234", messenger.getAddress());
}
@Test
public void canCheckClusterIsActive()
{
when(hazelcast.getConfig()).thenReturn(config);
when(config.getGroupConfig()).thenReturn(groupConfig);
groupConfig.setName("my-cluster-name");
assertEquals(true, factory.isClusterActive());
groupConfig.setName("");
assertEquals(false, factory.isClusterActive());
} }
} }

View File

@@ -45,7 +45,7 @@ public class HazelcastMessengerTest
@Before @Before
public void setUp() public void setUp()
{ {
messenger = new HazelcastMessenger<String>(topic); messenger = new HazelcastMessenger<String>(topic, "address");
receivedMsg = null; receivedMsg = null;
} }

View File

@@ -57,6 +57,7 @@ public class HazelcastTest implements MessageListener<String>
public static void tearDownClass() public static void tearDownClass()
{ {
ApplicationContextHelper.closeApplicationContext(); ApplicationContextHelper.closeApplicationContext();
Hazelcast.shutdownAll();
} }
@Before @Before

View File

@@ -28,6 +28,8 @@ import java.io.ObjectOutput;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Channel; import org.jgroups.Channel;
import org.jgroups.Message; import org.jgroups.Message;
import org.jgroups.ReceiverAdapter; import org.jgroups.ReceiverAdapter;
@@ -41,7 +43,7 @@ public class JGroupsMessenger<T extends Serializable> extends ReceiverAdapter im
{ {
private final Channel channel; private final Channel channel;
private MessageReceiver<T> receiverDelegate; private MessageReceiver<T> receiverDelegate;
private final static Log logger = LogFactory.getLog(JGroupsMessenger.class);
/** /**
* Construct a messenger that wraps a JGroups Channel. * Construct a messenger that wraps a JGroups Channel.
@@ -66,6 +68,10 @@ public class JGroupsMessenger<T extends Serializable> extends ReceiverAdapter im
out.writeObject(message); out.writeObject(message);
out.close(); out.close();
bytes.close(); bytes.close();
if (logger.isTraceEnabled())
{
logger.trace("Sending " + message);
}
channel.send(null, null, bytes.toByteArray()); channel.send(null, null, bytes.toByteArray());
} }
catch (Throwable e) catch (Throwable e)
@@ -105,6 +111,10 @@ public class JGroupsMessenger<T extends Serializable> extends ReceiverAdapter im
T payload = (T) in.readObject(); T payload = (T) in.readObject();
in.close(); in.close();
bytes.close(); bytes.close();
if (logger.isTraceEnabled())
{
logger.trace("Received (will be delegated to receiver): " + payload);
}
// Pass the deserialized payload on to the receiver delegate // Pass the deserialized payload on to the receiver delegate
receiverDelegate.onReceive(payload); receiverDelegate.onReceive(payload);
} }
@@ -124,4 +134,11 @@ public class JGroupsMessenger<T extends Serializable> extends ReceiverAdapter im
{ {
return channel.isConnected(); return channel.isConnected();
} }
@Override
public String getAddress()
{
return channel.getAddress().toString();
}
} }

View File

@@ -38,5 +38,11 @@ public class JGroupsMessengerFactory implements MessengerFactory
ParameterCheck.mandatory("appRegion", appRegion); ParameterCheck.mandatory("appRegion", appRegion);
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
return new JGroupsMessenger<T>(channel); return new JGroupsMessenger<T>(channel);
}
@Override
public boolean isClusterActive()
{
return AlfrescoJGroupsChannelFactory.isClusterActive();
} }
} }

View File

@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.ObjectOutput; import java.io.ObjectOutput;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import org.jgroups.Address;
import org.jgroups.Channel; import org.jgroups.Channel;
import org.jgroups.ChannelClosedException; import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException; import org.jgroups.ChannelNotConnectedException;
@@ -102,6 +103,15 @@ public class JGroupsMessengerTest
assertEquals(false, messenger.isConnected()); assertEquals(false, messenger.isConnected());
} }
@Test
public void canDelegateGetAddress()
{
Address address = Mockito.mock(Address.class);
Mockito.when(address.toString()).thenReturn("an-address");
Mockito.when(channel.getAddress()).thenReturn(address);
assertEquals("an-address", messenger.getAddress());
}
private byte[] serialize(String text) throws IOException private byte[] serialize(String text) throws IOException
{ {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(); ByteArrayOutputStream bytes = new ByteArrayOutputStream();

View File

@@ -120,6 +120,18 @@ public class JGroupsTest extends ReceiverAdapter
helper.checkMessageReceivedWas("This message was sent with jgroups"); helper.checkMessageReceivedWas("This message was sent with jgroups");
} }
@Test
public void canCheckIsClusterActive()
{
JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory();
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(null);
assertEquals(false, messengerFactory.isClusterActive());
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("my-cluster-name");
assertEquals(true, messengerFactory.isClusterActive());
}
@Ignore("Not currently allowing multiple receivers per underlying Channel") @Ignore("Not currently allowing multiple receivers per underlying Channel")
@Test @Test
public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException public void canReceiveFromMultipleMessageReceivers() throws InterruptedException, ChannelException

View File

@@ -38,4 +38,6 @@ public interface Messenger<T extends Serializable>
void setReceiver(MessageReceiver<T> receiver); void setReceiver(MessageReceiver<T> receiver);
boolean isConnected(); boolean isConnected();
String getAddress();
} }

View File

@@ -27,6 +27,14 @@ import java.io.Serializable;
* @author Matt Ward * @author Matt Ward
*/ */
public interface MessengerFactory public interface MessengerFactory
{ {
/** A catch-all for unknown application regions. */
public static final String APP_REGION_DEFAULT = "DEFAULT";
/** The application region used by the EHCache heartbeat implementation over JGroups. */
public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT";
<T extends Serializable> Messenger<T> createMessenger(String appRegion); <T extends Serializable> Messenger<T> createMessenger(String appRegion);
boolean isClusterActive();
} }

View File

@@ -0,0 +1,44 @@
/*
* Copyright (C) 2005-2012 Alfresco Software Limited.
*
* This file is part of Alfresco
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.repo.cluster;
/**
* Static container for MessengerFactory. This allows code to obtain the correct
* {@link MessengerFactory} implementation where dependency injection is not available.
*
* @author Matt Ward
*/
public class MessengerFactoryProvider
{
private static MessengerFactory instance;
public void setInstance(MessengerFactory messengerFactory)
{
instance = messengerFactory;
}
public static MessengerFactory getInstance()
{
if (instance == null)
{
throw new IllegalStateException("MessengerFactory instance not configured yet.");
}
return instance;
}
}

View File

@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.cluster.MessengerFactory;
import org.alfresco.util.PropertyCheck; import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@@ -68,10 +69,6 @@ import org.springframework.util.ResourceUtils;
*/ */
public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
{ {
/** A catch-all for unknown application regions. */
public static final String APP_REGION_DEFAULT = "DEFAULT";
/** The application region used by the EHCache heartbeat implementation over JGroups. */
public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT";
/** The UDP protocol config (default) */ /** The UDP protocol config (default) */
public static final String DEFAULT_CONFIG_UDP = "classpath:alfresco/jgroups/alfresco-jgroups-UDP.xml"; public static final String DEFAULT_CONFIG_UDP = "classpath:alfresco/jgroups/alfresco-jgroups-UDP.xml";
/** The TCP protocol config */ /** The TCP protocol config */
@@ -102,7 +99,7 @@ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
clusterNamePrefix = null; clusterNamePrefix = null;
configUrlsByAppRegion = new HashMap<String, String>(5); configUrlsByAppRegion = new HashMap<String, String>(5);
configUrlsByAppRegion.put( configUrlsByAppRegion.put(
AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT, MessengerFactory.APP_REGION_DEFAULT,
AlfrescoJGroupsChannelFactory.DEFAULT_CONFIG_UDP); AlfrescoJGroupsChannelFactory.DEFAULT_CONFIG_UDP);
} }
@@ -167,7 +164,7 @@ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
String configUrlStr = configUrlsByAppRegion.get(appRegion); String configUrlStr = configUrlsByAppRegion.get(appRegion);
if (!PropertyCheck.isValidPropertyString(configUrlStr)) if (!PropertyCheck.isValidPropertyString(configUrlStr))
{ {
configUrlStr = configUrlsByAppRegion.get(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT); configUrlStr = configUrlsByAppRegion.get(MessengerFactory.APP_REGION_DEFAULT);
} }
if (configUrlStr == null) if (configUrlStr == null)
{ {
@@ -432,7 +429,7 @@ public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
try try
{ {
// Check that there is a mapping for default // Check that there is a mapping for default
if (!configUrlsByAppRegion.containsKey(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT)) if (!configUrlsByAppRegion.containsKey(MessengerFactory.APP_REGION_DEFAULT))
{ {
throw new AlfrescoRuntimeException("A configuration URL must be defined for 'DEFAULT'"); throw new AlfrescoRuntimeException("A configuration URL must be defined for 'DEFAULT'");
} }