diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java
index de6aebd83e..01ff41c885 100644
--- a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java
+++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java
@@ -24,194 +24,207 @@
*/
package org.alfresco.repo.cache.jgroups;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.alfresco.util.EqualsHelper;
-import org.alfresco.util.TraceableThreadFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.Message;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.View;
+import net.sf.ehcache.distribution.MulticastKeepaliveHeartbeatSender;
/**
- * Receives heartbeats from the {@link JGroupsKeepAliveHeartbeatSender JGroups heartbeat sender}.
+ * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
+ *
+ * Our own multicast heartbeats are ignored.
*
- * @author Derek Hulley
- * @since 2.1.3
+ * @author Greg Luck
+ * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 556 2007-10-29 02:06:30Z gregluck $
*/
-public class JGroupsKeepAliveHeartbeatReceiver extends ReceiverAdapter
+public abstract class JGroupsKeepaliveHeartbeatReceiver
{
- private static final int MAX_THREADS = 5;
-
- private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatReceiver.class);
-
- private final JGroupsRMICacheManagerPeerProvider peerProvider;
- private final JGroupsKeepAliveHeartbeatSender heartbeatSender;
- private final Channel channel;
- private boolean stopped;
- private View lastView;
- private final ThreadPoolExecutor threadPool;
- private final Set rmiUrlsProcessingQueue;
-
- public JGroupsKeepAliveHeartbeatReceiver(
- JGroupsRMICacheManagerPeerProvider peerProvider,
- JGroupsKeepAliveHeartbeatSender heartbeatSender,
- Channel channel)
- {
- this.peerProvider = peerProvider;
- this.heartbeatSender = heartbeatSender;
- this.channel = channel;
-
- this.rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet());
-
- // Build the thread pool
- TraceableThreadFactory threadFactory = new TraceableThreadFactory();
- threadFactory.setThreadDaemon(true);
- threadFactory.setThreadPriority(Thread.NORM_PRIORITY + 2);
-
- this.threadPool = new ThreadPoolExecutor(
- 1, // Maintain one threads
- 1, // We'll increase it, if necessary
- 60, // 1 minute until unused threads get trashed
- TimeUnit.SECONDS,
- new LinkedBlockingQueue(),
- threadFactory);
- }
-
- /**
- * Register to receive message on the channel
- */
- public void init()
- {
- channel.setReceiver(this);
- }
-
- /**
- * Set the stop flag.
- */
- public void dispose()
- {
- stopped = true;
- }
-
- @Override
- public byte[] getState()
- {
- return new byte[] {};
- }
-
- @Override
- public void setState(byte[] state)
- {
- // Nothing to do
- }
-
- @Override
- public void receive(Message message)
- {
- Address localAddress = heartbeatSender.getHeartbeatSourceAddress();
- Address remoteAddress = message.getSrc();
- // Ignore messages from ourselves
- if (EqualsHelper.nullSafeEquals(localAddress, remoteAddress))
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("\n" +
- "Ignoring cache peeer URLs heartbeat from self: " + message);
- }
- return;
- }
-
- String rmiUrls = new String(message.getBuffer());
- if (logger.isDebugEnabled())
- {
- logger.debug("\n" +
- "Received cache peer URLs heartbeat: \n" +
- " Message: " + message + "\n" +
- " Peers: " + rmiUrls);
- }
- // Quickly split them up
- StringTokenizer tokenizer = new StringTokenizer(rmiUrls, JGroupsKeepAliveHeartbeatSender.URL_DELIMITER, false);
- while (!stopped && tokenizer.hasMoreTokens())
- {
- String rmiUrl = tokenizer.nextToken();
- // Is it pending?
- if (rmiUrlsProcessingQueue.add(rmiUrl))
- {
- // Not pending. Shedule it.
- ProcessingRunnable runnable = new ProcessingRunnable(rmiUrl);
- threadPool.execute(runnable);
- }
- else
- {
- // It was already waiting to be processed
- // Increase the thread pool size
- int currentThreadPoolMaxSize = threadPool.getMaximumPoolSize();
- if (currentThreadPoolMaxSize < MAX_THREADS)
- {
- threadPool.setMaximumPoolSize(currentThreadPoolMaxSize + 1);
- }
- }
- }
- }
-
- /**
- * Worker class to go into thread pool
- *
- * @author Derek Hulley
- */
- private class ProcessingRunnable implements Runnable
- {
- private String rmiUrl;
- private ProcessingRunnable(String rmiUrl)
- {
- this.rmiUrl = rmiUrl;
- }
- public void run()
- {
- rmiUrlsProcessingQueue.remove(rmiUrl);
- if (stopped)
- {
- return;
- }
- peerProvider.registerPeer(rmiUrl);
- }
- }
-
- @Override
- public void viewAccepted(View newView)
- {
- if (EqualsHelper.nullSafeEquals(lastView, newView))
- {
- // No change, so ignore
- return;
- }
- int lastSize = (lastView == null) ? 0 : lastView.getMembers().size();
- int newSize = newView.getMembers().size();
- // Report
- if (newSize < lastSize)
- {
- logger.warn("\n" +
- "New cluster view with fewer members: \n" +
- " Last View: " + lastView + "\n" +
- " New View: " + newView);
- }
- else
- {
- logger.info("\n" +
- "New cluster view with additional members: \n" +
- " Last View: " + lastView + "\n" +
- " New View: " + newView);
- }
- lastView = newView;
- }
+// private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName());
+//
+// private ExecutorService processingThreadPool;
+// private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet());
+// private final InetAddress groupMulticastAddress;
+// private final Integer groupMulticastPort;
+// private MulticastReceiverThread receiverThread;
+// private MulticastSocket socket;
+// private boolean stopped;
+// private final MulticastRMICacheManagerPeerProvider peerProvider;
+//
+// /**
+// * Constructor.
+// *
+// * @param peerProvider
+// * @param multicastAddress
+// * @param multicastPort
+// */
+// public MulticastKeepaliveHeartbeatReceiver(
+// MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) {
+// this.peerProvider = peerProvider;
+// this.groupMulticastAddress = multicastAddress;
+// this.groupMulticastPort = multicastPort;
+// }
+//
+// /**
+// * Start.
+// *
+// * @throws IOException
+// */
+// final void init() throws IOException {
+// socket = new MulticastSocket(groupMulticastPort.intValue());
+// socket.joinGroup(groupMulticastAddress);
+// receiverThread = new MulticastReceiverThread();
+// receiverThread.start();
+// processingThreadPool = Executors.newCachedThreadPool();
+// }
+//
+// /**
+// * Shutdown the heartbeat.
+// */
+// public final void dispose() {
+// LOG.debug("dispose called");
+// processingThreadPool.shutdownNow();
+// stopped = true;
+// receiverThread.interrupt();
+// }
+//
+// /**
+// * A multicast receiver which continously receives heartbeats.
+// */
+// private final class MulticastReceiverThread extends Thread {
+//
+// /**
+// * Constructor
+// */
+// public MulticastReceiverThread() {
+// super("Multicast Heartbeat Receiver Thread");
+// setDaemon(true);
+// }
+//
+// public final void run() {
+// byte[] buf = new byte[PayloadUtil.MTU];
+// try {
+// while (!stopped) {
+// DatagramPacket packet = new DatagramPacket(buf, buf.length);
+// try {
+// socket.receive(packet);
+// byte[] payload = packet.getData();
+// processPayload(payload);
+//
+//
+// } catch (IOException e) {
+// if (!stopped) {
+// LOG.error("Error receiving heartbeat. " + e.getMessage() +
+// ". Initial cause was " + e.getMessage(), e);
+// }
+// }
+// }
+// } catch (Throwable t) {
+// LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing...");
+// }
+// }
+//
+// private void processPayload(byte[] compressedPayload) {
+// byte[] payload = PayloadUtil.ungzip(compressedPayload);
+// String rmiUrls = new String(payload);
+// if (self(rmiUrls)) {
+// return;
+// }
+// rmiUrls = rmiUrls.trim();
+// if (LOG.isTraceEnabled()) {
+// LOG.trace("rmiUrls received " + rmiUrls);
+// }
+// processRmiUrls(rmiUrls);
+// }
+//
+// /**
+// * This method forks a new executor to process the received heartbeat in a thread pool.
+// * That way each remote cache manager cannot interfere with others.
+// *
+// * In the worst case, we have as many concurrent threads as remote cache managers.
+// *
+// * @param rmiUrls
+// */
+// private void processRmiUrls(final String rmiUrls) {
+// if (rmiUrlsProcessingQueue.contains(rmiUrls)) {
+// if (LOG.isDebugEnabled()) {
+// LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: " + rmiUrls);
+// }
+// return;
+// }
+//
+// if (processingThreadPool == null) {
+// return;
+// }
+//
+// processingThreadPool.execute(new Runnable() {
+// public void run() {
+// try {
+// // Add the rmiUrls we are processing.
+// rmiUrlsProcessingQueue.add(rmiUrls);
+// for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls,
+// PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) {
+// if (stopped) {
+// return;
+// }
+// String rmiUrl = stringTokenizer.nextToken();
+// registerNotification(rmiUrl);
+// if (!peerProvider.peerUrls.containsKey(rmiUrl)) {
+// if (LOG.isDebugEnabled()) {
+// LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: " + rmiUrl);
+// }
+// return;
+// }
+// }
+// } finally {
+// // Remove the rmiUrls we just processed
+// rmiUrlsProcessingQueue.remove(rmiUrls);
+// }
+// }
+// });
+// }
+//
+//
+// /**
+// * @param rmiUrls
+// * @return true if our own hostname and listener port are found in the list. This then means we have
+// * caught our onw multicast, and should be ignored.
+// */
+// private boolean self(String rmiUrls) {
+// CacheManager cacheManager = peerProvider.getCacheManager();
+// CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener();
+// if (cacheManagerPeerListener == null) {
+// return false;
+// }
+// List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers();
+// if (boundCachePeers == null || boundCachePeers.size() == 0) {
+// return false;
+// }
+// CachePeer peer = (CachePeer) boundCachePeers.get(0);
+// String cacheManagerUrlBase = null;
+// try {
+// cacheManagerUrlBase = peer.getUrlBase();
+// } catch (RemoteException e) {
+// LOG.error("Error geting url base");
+// }
+// int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
+// return baseUrlMatch != -1;
+// }
+//
+// private void registerNotification(String rmiUrl) {
+// peerProvider.registerPeer(rmiUrl);
+// }
+//
+//
+// /**
+// * {@inheritDoc}
+// */
+// public final void interrupt() {
+// try {
+// socket.leaveGroup(groupMulticastAddress);
+// } catch (IOException e) {
+// LOG.error("Error leaving group");
+// }
+// socket.close();
+// super.interrupt();
+// }
+// }
+//
+//
}