(5);
+ }
+
+ /**
+ *
+ * @param clusterName the name of the cluster. The effectively groups the
+ * machines that will be interacting with each other.
+ * @param applicationRegion the application region.
+ * @return
+ */
+ public static JChannel getChannel(String clusterName, String applicationRegion)
+ {
+ /*
+ * Synchronization is handled by the Map
+ */
+ StringBuilder sb = new StringBuilder(100).append(clusterName).append("-").append(applicationRegion);
+ String fullClusterName = sb.toString();
+ // Get the channel
+ JChannel channel = channels.get(fullClusterName);
+ }
+
+ private static synchronized void newChannel(String clusterName, String clusterRegion)
+ {
+
+ }
+
+ /**
+ * TODO: Make into a Spring factory bean so that it can be used as a direct bean reference
+ */
+ private AlfrescoJChannelFactory()
+ {
+ }
+
+}
diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java
new file mode 100644
index 0000000000..adbf6c6ce3
--- /dev/null
+++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2005-2008 Alfresco Software Limited.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+ * As a special exception to the terms and conditions of version 2.0 of
+ * the GPL, you may redistribute this Program in connection with Free/Libre
+ * and Open Source Software ("FLOSS") applications as described in Alfresco's
+ * FLOSS exception. You should have recieved a copy of the text describing
+ * the FLOSS exception, and it is also available here:
+ * http://www.alfresco.com/legal/licensing"
+ */
+package org.alfresco.repo.jgroups;
+
+import org.jgroups.Channel;
+import org.jgroups.Message;
+
+import junit.framework.TestCase;
+
+/**
+ * @see AlfrescoJChannelFactoryTest
+ *
+ * @author Derek Hulley
+ * @since 2.1.3
+ */
+public class AlfrescoJChannelFactoryTest extends TestCase
+{
+ private static byte[] bytes = new byte[65536];
+ static
+ {
+ for (int i = 0; i < bytes.length; i++)
+ {
+ bytes[i] = 1;
+ }
+ }
+
+ private AlfrescoJGroupsChannelFactory factory;
+ private String appRegion;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ factory = new AlfrescoJGroupsChannelFactory();
+ appRegion = getName();
+ }
+
+ /**
+ * Check that the channel is behaving
+ */
+ private void stressChannel(Channel channel) throws Exception
+ {
+ System.out.println("Test: " + getName());
+ System.out.println(" Channel: " + channel);
+ System.out.println(" Cluster: " + channel.getClusterName());
+ channel.send(null, null, Boolean.TRUE);
+ channel.send(new Message(null, null, bytes));
+ }
+
+ public void testNoCluster() throws Exception
+ {
+ Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel);
+ }
+
+ public void testBasicCluster() throws Exception
+ {
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah");
+ Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel);
+ }
+
+ public void testHotSwapCluster() throws Exception
+ {
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE");
+ Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel1);
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO");
+ Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel1);
+ assertTrue("Channel reference must be the same", channel1 == channel2);
+ }
+}
diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java
new file mode 100644
index 0000000000..4452d845c5
--- /dev/null
+++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java
@@ -0,0 +1,862 @@
+/*
+ * Copyright (C) 2005-2008 Alfresco Software Limited.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+ * As a special exception to the terms and conditions of version 2.0 of
+ * the GPL, you may redistribute this Program in connection with Free/Libre
+ * and Open Source Software ("FLOSS") applications as described in Alfresco's
+ * FLOSS exception. You should have recieved a copy of the text describing
+ * the FLOSS exception, and it is also available here:
+ * http://www.alfresco.com/legal/licensing"
+ */
+package org.alfresco.repo.jgroups;
+
+import java.io.FileNotFoundException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.alfresco.error.AlfrescoRuntimeException;
+import org.alfresco.util.AbstractLifecycleBean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListener;
+import org.jgroups.ChannelNotConnectedException;
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.JChannelFactory;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.TimeoutException;
+import org.jgroups.UpHandler;
+import org.jgroups.View;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.util.ResourceUtils;
+
+/**
+ * A cache peer provider that does heartbeat sending and receiving using JGroups.
+ *
+ * The cluster name needs to be set before any communication is possible. This can be done using the
+ * system property
+ * {@link #PROP_CLUSTER_NAME_PREFIX -Dalfresco.cluster-name-prefix}=MyCluster
+ * or by declaring a bean
+ *
+ *
+ *
+ * MyCluster
+ *
+ *
+ *
+ *
+ * The channels provided to the callers will be proxies to underlying channels that will be hot-swappable.
+ * This means that the client code can continue to use the channel references while the actual
+ * implementation can be switched in and out as required.
+ *
+ * @see #PROP_CLUSTER_NAME_PREFIX
+ *
+ * @author Derek Hulley
+ * @since 2.1.3
+ */
+public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
+{
+ /** A catch-all for unknown application regions. */
+ public static final String APP_REGION_DEFAULT = "DEFAULT";
+ /** The application region used by the EHCache heartbeat implementation over JGroups. */
+ public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT";
+ /** The UDP protocol stack (default) */
+ public static final String PROTOCOL_STACK_UDP = "UDP";
+ /** The TCP protocol stack */
+ public static final String PROTOCOL_STACK_TCP = "TCP";
+
+
+ public static final String PROP_CLUSTER_NAME_PREFIX = "alfresco.cluster-name-prefix";
+ public static final String CUSTOM_CONFIGURATION_FILE = "classpath:alfresco/extension/jgroups-custom.xml";
+ public static final String DEFAULT_CONFIGURATION_FILE = "classpath:alfresco/jgroups-default.xml";
+
+ private static Log logger = LogFactory.getLog(AlfrescoJGroupsChannelFactory.class);
+
+ // Synchronization locks
+ private static ReadLock readLock;
+ private static WriteLock writeLock;
+
+ // Values that are modified by the bean implementation
+ private static String clusterNamePrefix;
+ private static URL configUrl;
+ private static Map stacksByAppRegion;
+
+ // Derived data
+ /** A map that stores channel information by the application region. */
+ private static final Map channels;
+ private static JChannelFactory channelFactory;
+
+ static
+ {
+ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); // Fair
+ readLock = readWriteLock.readLock();
+ writeLock = readWriteLock.writeLock();
+
+ channels = new HashMap(5);
+
+ clusterNamePrefix = null;
+ configUrl = null;
+ stacksByAppRegion = new HashMap(5);
+ stacksByAppRegion.put(
+ AlfrescoJGroupsChannelFactory.APP_REGION_EHCACHE_HEARTBEAT,
+ AlfrescoJGroupsChannelFactory.PROTOCOL_STACK_UDP);
+ stacksByAppRegion.put(
+ AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT,
+ AlfrescoJGroupsChannelFactory.PROTOCOL_STACK_UDP);
+ }
+
+ /**
+ * Check if a cluster name was provided.
+ *
+ * @return Returns true if the cluster configuration is active,
+ * i.e. a cluster name was provided
+ */
+ public static boolean isClusterActive()
+ {
+ readLock.lock();
+ try
+ {
+ return clusterNamePrefix != null;
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Close all channels. All the channels will continue to function, but will be replaced
+ * internally with dummy channels. Effectively, all the cluster communications will be
+ * closed down.
+ */
+ private static void closeChannels()
+ {
+ changeClusterNamePrefix(null);
+ }
+
+ /**
+ * Creates a channel for the cluster. This method should not be heavily used
+ * as the checks and synchronizations will slow the calls. Returns channels can be
+ * kept and will be modified directly using the factory-held references, if necessary.
+ *
+ * The application region is used to determine the protocol stack to apply.
+ *
+ * This method returns a dummy channel if no cluster name has been provided.
+ *
+ * @param appRegion the application region identifier.
+ * @return Returns a channel
+ */
+ public static Channel getChannel(String appRegion)
+ {
+ readLock.lock();
+ try
+ {
+ ChannelProxy channelProxy = channels.get(appRegion);
+ if (channelProxy != null)
+ {
+ // This will do
+ return channelProxy;
+ }
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+ // Being here means that there is no channel yet
+ // Go write
+ writeLock.lock();
+ try
+ {
+ // Double check
+ ChannelProxy channelProxy = channels.get(appRegion);
+ if (channelProxy != null)
+ {
+ // This will do
+ return channelProxy;
+ }
+ // Get the channel
+ Channel channel = getChannelInternal(appRegion);
+ // Proxy the channel
+ channelProxy = new ChannelProxy(channel);
+ // Store the channel to the map
+ channels.put(appRegion, channelProxy);
+ // Done
+ return channelProxy;
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Creates a channel for the given cluster. The application region is used
+ * to determine the protocol stack to apply.
+ *
+ * @param appRegion the application region identifier.
+ * @return Returns a channel
+ */
+ /* All calls to this are ultimately wrapped in the writeLock. */
+ private static /*synchronized*/ Channel getChannelInternal(String appRegion)
+ {
+ Channel channel;
+ // If there is no cluster defined (yet) then we define a dummy channel
+ if (AlfrescoJGroupsChannelFactory.clusterNamePrefix == null)
+ {
+ try
+ {
+ channel = new DummyJChannel();
+ }
+ catch (Throwable e)
+ {
+ throw new AlfrescoRuntimeException(
+ "Failed to create dummy JGroups channel: \n" +
+ " Cluster prefix: " + clusterNamePrefix + "\n" +
+ " App region: " + appRegion,
+ e);
+ }
+ }
+ else // Create real channel
+ {
+ JChannelFactory channelFactory = getChannelFactory();
+ // Get the protocol stack to use
+ String stack = stacksByAppRegion.get(appRegion);
+ if (stack == null)
+ {
+ stack = stacksByAppRegion.get(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT);
+ }
+ if (stack == null)
+ {
+ throw new AlfrescoRuntimeException(
+ "No protocol stack was found for application region: \n" +
+ " Cluster prefix: " + clusterNamePrefix + "\n" +
+ " App region: " + appRegion + "\n" +
+ " Regions defined: " + stacksByAppRegion);
+ }
+ try
+ {
+ // Get the stack config from the factory (we are not using MUX)
+ String config = channelFactory.getConfig(stack);
+ channel = new JChannel(config);
+ }
+ catch (Throwable e)
+ {
+ throw new AlfrescoRuntimeException(
+ "Failed to create JGroups channel: \n" +
+ " Cluster prefix: " + clusterNamePrefix + "\n" +
+ " App region: " + appRegion + "\n" +
+ " Protocol stack: " + stack + "\n" +
+ " Configuration URL: " + AlfrescoJGroupsChannelFactory.configUrl,
+ e);
+ }
+ }
+ // Initialise the channel
+ try
+ {
+ String clusterName = clusterNamePrefix + ":" + appRegion;
+ // Set reconnect property
+ channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+ // Don't accept messages from self
+ channel.setOpt(Channel.LOCAL, Boolean.FALSE);
+ // No state transfer
+ channel.setOpt(Channel.AUTO_GETSTATE, Boolean.FALSE);
+ // Connect
+ channel.connect(clusterName, null, null, 5000L);
+ // Done
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("\n" +
+ "Created JGroups channel: \n" +
+ " Cluster prefix: " + clusterNamePrefix + "\n" +
+ " App region: " + appRegion + "\n" +
+ " Channel: " + channel + "\n" +
+ " Configuration URL: " + AlfrescoJGroupsChannelFactory.configUrl);
+ }
+ }
+ catch (Throwable e)
+ {
+ throw new AlfrescoRuntimeException(
+ "Failed to initialise JGroups channel: \n" +
+ " Cluster prefix: " + clusterNamePrefix + "\n" +
+ " App region: " + appRegion + "\n" +
+ " Channel: " + channel + "\n" +
+ " Configuration URL: " + AlfrescoJGroupsChannelFactory.configUrl,
+ e);
+ }
+ return channel;
+ }
+
+ /**
+ * Builds and initializes a JChannelFactory
+ */
+ /* All calls to this are ultimately wrapped in the writeLock. */
+ private static /*synchronized*/ JChannelFactory getChannelFactory()
+ {
+ if (AlfrescoJGroupsChannelFactory.channelFactory != null)
+ {
+ return AlfrescoJGroupsChannelFactory.channelFactory;
+ }
+ // Set the config location to use
+ if (AlfrescoJGroupsChannelFactory.configUrl == null)
+ {
+ // This was not set by the bean so set it using the default mechanism
+ try
+ {
+ AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(CUSTOM_CONFIGURATION_FILE);
+ }
+ catch (FileNotFoundException e)
+ {
+ // try the alfresco default
+ try
+ {
+ AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(DEFAULT_CONFIGURATION_FILE);
+ }
+ catch (FileNotFoundException ee)
+ {
+ throw new AlfrescoRuntimeException("Missing default JGroups config: " + DEFAULT_CONFIGURATION_FILE);
+ }
+ }
+ }
+ try
+ {
+ // Construct factory
+ AlfrescoJGroupsChannelFactory.channelFactory = new JChannelFactory();
+ channelFactory.setMultiplexerConfig(AlfrescoJGroupsChannelFactory.configUrl);
+ }
+ catch (Throwable e)
+ {
+ throw new AlfrescoRuntimeException(
+ "Failed to construct JChannelFactory using config: " + AlfrescoJGroupsChannelFactory.configUrl,
+ e);
+ }
+ // done
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("\n" +
+ "Created JChannelFactory: \n" +
+ " configuration: " + AlfrescoJGroupsChannelFactory.configUrl);
+ }
+ return AlfrescoJGroupsChannelFactory.channelFactory;
+ }
+
+ /**
+ * Throw away all calculated values and rebuild. This means that the channel factory will
+ * be reconstructed from scratch. All the channels are reconstructed - but this will not
+ * affect any references to channels held outside this class as the values returned are proxies
+ * on top of hot swappable implementations.
+ */
+ /* All calls to this are ultimately wrapped in the writeLock. */
+ private static /*synchronized*/ void rebuildChannels()
+ {
+ // First throw away the channel factory. It will be fetched lazily.
+ AlfrescoJGroupsChannelFactory.channelFactory = null;
+
+ // Reprocess all the application regions with the new data
+ for (Map.Entry entry : channels.entrySet())
+ {
+ String appRegion = entry.getKey();
+ ChannelProxy channelProxy = entry.getValue();
+
+ // Create the new channel
+ Channel newChannel = getChannelInternal(appRegion);
+
+ // Now do the hot-swap
+ Channel oldChannel = channelProxy.swap(newChannel);
+ // Close the old channel
+ try
+ {
+ oldChannel.close();
+ }
+ catch (Throwable e)
+ {
+ logger.warn(
+ "Unable to close old channel during channel rebuild: \n" +
+ " Old channel: " + oldChannel,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Set the prefix used to identify the different clusters. Each application region will
+ * have a separate cluster name that will be:
+ *
+ * clusterNamePrefix:appRegion
+ *
+ * If no cluster name prefix is declared, the cluster is effectively disabled.
+ *
+ * @param clusterNamePrefix a prefix to append to the cluster names used
+ */
+ public static void changeClusterNamePrefix(String clusterNamePrefix)
+ {
+ writeLock.lock();
+ try
+ {
+ if (clusterNamePrefix == null || clusterNamePrefix.length() == 0)
+ {
+ // Clear everything out
+ AlfrescoJGroupsChannelFactory.clusterNamePrefix = null;
+ }
+ AlfrescoJGroupsChannelFactory.clusterNamePrefix = clusterNamePrefix;
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Configure a mapping between the application regions and the available JGroup protocol stacks.
+ * The map must contain a mapping for application region 'DEFAULT'.
+ *
+ * @param protocolMap a mapping from application region (keys) to protocol stacks (values)
+ */
+ public static void changeProtocolStackMapping(Map protocolMap)
+ {
+ writeLock.lock();
+ try
+ {
+ // Check that there is a mapping for default
+ if (!protocolMap.containsKey(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT))
+ {
+ throw new AlfrescoRuntimeException("A protocol stack must be defined for 'DEFAULT'");
+ }
+ stacksByAppRegion = protocolMap;
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Set the URL location of the JGroups configuration file. This must refer to a MUX-compatible
+ * configuration file.
+ *
+ * @param configUrl a url of the form file:... or classpath:
+ */
+ public static void changeJgroupsConfigurationUrl(String configUrl)
+ {
+ writeLock.lock();
+ try
+ {
+ AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(configUrl);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new AlfrescoRuntimeException(
+ "Failed to set property 'jgroupsConfigurationUrl'. The url is invalid: " + configUrl,
+ e);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Bean-enabling constructor
+ */
+ public AlfrescoJGroupsChannelFactory()
+ {
+ }
+
+ /**
+ * @see AlfrescoJGroupsChannelFactory#changeClusterName(String)
+ */
+ public void setClusterNamePrefix(String clusterNamePrefix)
+ {
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(clusterNamePrefix);
+ }
+
+ /**
+ * @see AlfrescoJGroupsChannelFactory#changeProtocolStackMapping(Map)
+ */
+ public void setProtocolStackMapping(Map protocolMap)
+ {
+ AlfrescoJGroupsChannelFactory.changeProtocolStackMapping(protocolMap);
+ }
+
+ /**
+ * Set the URL location of the JGroups configuration file. This must refer to a MUX-compatible
+ * configuration file.
+ *
+ * @param configUrl a url of the form file:... or classpath:
+ */
+ public void setJgroupsConfigurationUrl(String configUrl)
+ {
+ try
+ {
+ AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(configUrl);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new AlfrescoRuntimeException(
+ "Failed to set property 'jgroupsConfigurationUrl'. The url is invalid: " + configUrl,
+ e);
+ }
+ }
+
+ @Override
+ protected void onBootstrap(ApplicationEvent event)
+ {
+ AlfrescoJGroupsChannelFactory.rebuildChannels();
+ }
+
+ @Override
+ protected void onShutdown(ApplicationEvent event)
+ {
+ AlfrescoJGroupsChannelFactory.closeChannels();
+ }
+
+ /**
+ * A no-op JChannel using the "DUMMY_TP" protocol only
+ *
+ * @author Derek Hulley
+ * @since 2.1.3
+ */
+ private static class DummyJChannel extends JChannel
+ {
+ public DummyJChannel() throws ChannelException
+ {
+ super("DUMMY_TP:UDP(mcast_addr=224.10.10.200;mcast_port=5679)");
+ }
+ }
+
+ /**
+ * A proxy channel that can be used to hot-swap underlying channels. All listeners
+ * and the receiver will be carried over to the new underlying channel when it is
+ * swapped out.
+ *
+ * @author Derek Hulley
+ */
+ @SuppressWarnings("deprecation")
+ public static class ChannelProxy extends Channel
+ {
+ /*
+ * Not synchronizing. Mostly swapping will be VERY rare and if there is a bit
+ * of inconsistency it is not important.
+ */
+ private Channel delegate;
+ private UpHandler delegateUpHandler;
+ private Set delegateChannelListeners;
+ private Receiver delegateReceiver;
+
+ public ChannelProxy(Channel delegate)
+ {
+ this.delegate = delegate;
+ this.delegateChannelListeners = new HashSet(7);
+ }
+
+ /**
+ * Swap the channel. The old delegate will be disconnected before the swap occurs.
+ * This guarantees data consistency, assuming that any failures will be handled.
+ *
+ * @param the new delegate
+ * @return the old, disconnected delegate
+ */
+ public Channel swap(Channel channel)
+ {
+ // Remove the listeners from the old channel
+ delegate.setReceiver(null);
+ for (ChannelListener delegateChannelListener : delegateChannelListeners)
+ {
+ delegate.removeChannelListener(delegateChannelListener);
+ }
+ delegate.setUpHandler(null);
+
+ // Close the old delegate
+ delegate.close();
+ Channel oldDelegage = delegate;
+
+ // Assign the new delegate and carry the listeners over
+ delegate = channel;
+ delegate.setReceiver(delegateReceiver);
+ for (ChannelListener delegateChannelListener : delegateChannelListeners)
+ {
+ delegate.addChannelListener(delegateChannelListener);
+ }
+ delegate.setUpHandler(delegateUpHandler);
+ // Done
+ return oldDelegage;
+ }
+
+ @Override
+ protected Log getLog()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setReceiver(Receiver r)
+ {
+ delegateReceiver = r;
+ delegate.setReceiver(r);
+ }
+
+ public void addChannelListener(ChannelListener listener)
+ {
+ if (listener == null)
+ {
+ return;
+ }
+ delegateChannelListeners.add(listener);
+ delegate.addChannelListener(listener);
+ }
+
+ public void removeChannelListener(ChannelListener listener)
+ {
+ if (listener != null)
+ {
+ delegateChannelListeners.remove(listener);
+ }
+ delegate.removeChannelListener(listener);
+ }
+
+ public void clearChannelListeners()
+ {
+ delegateChannelListeners.clear();
+ delegate.clearChannelListeners();
+ }
+
+ public void setUpHandler(UpHandler up_handler)
+ {
+ delegateUpHandler = up_handler;
+ delegate.setUpHandler(up_handler);
+ }
+
+ public void blockOk()
+ {
+ delegate.blockOk();
+ }
+
+ public void close()
+ {
+ delegate.close();
+ }
+
+ public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException
+ {
+ delegate.connect(cluster_name, target, state_id, timeout);
+ }
+
+ public void connect(String cluster_name) throws ChannelException
+ {
+ delegate.connect(cluster_name);
+ }
+
+ public void disconnect()
+ {
+ delegate.disconnect();
+ }
+
+ public void down(Event evt)
+ {
+ delegate.down(evt);
+ }
+
+ public Object downcall(Event evt)
+ {
+ return delegate.downcall(evt);
+ }
+
+ public String dumpQueue()
+ {
+ return delegate.dumpQueue();
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map dumpStats()
+ {
+ return delegate.dumpStats();
+ }
+
+ public boolean equals(Object obj)
+ {
+ return delegate.equals(obj);
+ }
+
+ public boolean flushSupported()
+ {
+ return delegate.flushSupported();
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ return delegate.getAllStates(targets, timeout);
+ }
+
+ public String getChannelName()
+ {
+ return delegate.getChannelName();
+ }
+
+ public String getClusterName()
+ {
+ return delegate.getClusterName();
+ }
+
+ public Map getInfo()
+ {
+ return delegate.getInfo();
+ }
+
+ public Address getLocalAddress()
+ {
+ return delegate.getLocalAddress();
+ }
+
+ public int getNumMessages()
+ {
+ return delegate.getNumMessages();
+ }
+
+ public Object getOpt(int option)
+ {
+ return delegate.getOpt(option);
+ }
+
+ public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ return delegate.getState(target, timeout);
+ }
+
+ public boolean getState(Address target, String state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ return delegate.getState(target, state_id, timeout);
+ }
+
+ public View getView()
+ {
+ return delegate.getView();
+ }
+
+ public int hashCode()
+ {
+ return delegate.hashCode();
+ }
+
+ public boolean isConnected()
+ {
+ return delegate.isConnected();
+ }
+
+ public boolean isOpen()
+ {
+ return delegate.isOpen();
+ }
+
+ public void open() throws ChannelException
+ {
+ delegate.open();
+ }
+
+ public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+ {
+ return delegate.peek(timeout);
+ }
+
+ public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+ {
+ return delegate.receive(timeout);
+ }
+
+ public void returnState(byte[] state, String state_id)
+ {
+ delegate.returnState(state, state_id);
+ }
+
+ public void returnState(byte[] state)
+ {
+ delegate.returnState(state);
+ }
+
+ public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ delegate.send(dst, src, obj);
+ }
+
+ public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ delegate.send(msg);
+ }
+
+ public void setChannelListener(ChannelListener channel_listener)
+ {
+ delegate.setChannelListener(channel_listener);
+ }
+
+ public void setInfo(String key, Object value)
+ {
+ delegate.setInfo(key, value);
+ }
+
+ public void setOpt(int option, Object value)
+ {
+ delegate.setOpt(option, value);
+ }
+
+ public void shutdown()
+ {
+ delegate.shutdown();
+ }
+
+ public boolean startFlush(boolean automatic_resume)
+ {
+ return delegate.startFlush(automatic_resume);
+ }
+
+ public boolean startFlush(List flushParticipants, boolean automatic_resume)
+ {
+ return delegate.startFlush(flushParticipants, automatic_resume);
+ }
+
+ public boolean startFlush(long timeout, boolean automatic_resume)
+ {
+ return delegate.startFlush(timeout, automatic_resume);
+ }
+
+ public void stopFlush()
+ {
+ delegate.stopFlush();
+ }
+
+ public void stopFlush(List flushParticipants)
+ {
+ delegate.stopFlush(flushParticipants);
+ }
+
+ public String toString()
+ {
+ return delegate.toString();
+ }
+ }
+}
diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java
new file mode 100644
index 0000000000..2c35095e94
--- /dev/null
+++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2005-2008 Alfresco Software Limited.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+ * As a special exception to the terms and conditions of version 2.0 of
+ * the GPL, you may redistribute this Program in connection with Free/Libre
+ * and Open Source Software ("FLOSS") applications as described in Alfresco's
+ * FLOSS exception. You should have recieved a copy of the text describing
+ * the FLOSS exception, and it is also available here:
+ * http://www.alfresco.com/legal/licensing"
+ */
+package org.alfresco.repo.jgroups;
+
+import org.jgroups.Channel;
+import org.jgroups.Message;
+
+import junit.framework.TestCase;
+
+/**
+ * @see AlfrescoJGroupsChannelFactory
+ *
+ * @author Derek Hulley
+ * @since 2.1.3
+ */
+public class AlfrescoJGroupsChannelFactoryTest extends TestCase
+{
+ private static byte[] bytes = new byte[65536];
+ static
+ {
+ for (int i = 0; i < bytes.length; i++)
+ {
+ bytes[i] = 1;
+ }
+ }
+
+ private String appRegion;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ appRegion = getName();
+ }
+
+ /**
+ * Check that the channel is behaving
+ */
+ private void stressChannel(Channel channel) throws Exception
+ {
+ System.out.println("Test: " + getName());
+ System.out.println(" Channel: " + channel);
+ System.out.println(" Cluster: " + channel.getClusterName());
+ channel.send(null, null, Boolean.TRUE);
+ channel.send(new Message(null, null, bytes));
+ }
+
+ public void testNoCluster() throws Exception
+ {
+ Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel);
+ }
+
+ public void testBasicCluster() throws Exception
+ {
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah");
+ Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel);
+ }
+
+ public void testHotSwapCluster() throws Exception
+ {
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE");
+ Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel1);
+ AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO");
+ Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
+ stressChannel(channel1);
+ assertTrue("Channel reference must be the same", channel1 == channel2);
+ }
+}
diff --git a/source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml b/source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml
new file mode 100644
index 0000000000..d5be11c2b3
--- /dev/null
+++ b/source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+ classpath:jgroups/ehcache-jgroups-cluster-test.xml
+
+
+
+
+
+ ehcache-jgroups-cluster-test
+
+
+
+
\ No newline at end of file
diff --git a/source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml b/source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml
new file mode 100644
index 0000000000..a3da88e41f
--- /dev/null
+++ b/source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml
@@ -0,0 +1,83 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+