From bc91efa1e097a428eff77490c2ed2b1332b0a289 Mon Sep 17 00:00:00 2001 From: Dave Ward Date: Tue, 10 Mar 2009 13:51:31 +0000 Subject: [PATCH] Merged V3.1 to HEAD 13107: Remove accidentally added .project file. 13106: Correction to 13104 - remove accidentally included merge marks 13105: Correction to 13104 - remove accidentally included merge marks 13104: Update continuous build to include Hyperic plugin amongst distributables 13092: Fix ConnectionPoolMBean. Naming of boolean methods didn't match those on DBCP BasicDataSource 13088: Bring hyperic plugin in sync with Derek's recent JMX changes 13076: Extended properties available on ConnectionPoolMBean 13045: Merged V3.0 to v3.1 (record only) 13044: Merged V2.2 to V3.0 13041: Merged V3.1 to V2.2 13037: PostgreSQL upgrade scripts from 2.1.6, 2.2.0, and 2.2.1 git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@13522 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- .../JGroupsKeepAliveHeartbeatReceiver.java | 383 +++++++++--------- 1 file changed, 198 insertions(+), 185 deletions(-) diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java index de6aebd83e..01ff41c885 100644 --- a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java +++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java @@ -24,194 +24,207 @@ */ package org.alfresco.repo.cache.jgroups; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.StringTokenizer; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.alfresco.util.EqualsHelper; -import org.alfresco.util.TraceableThreadFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jgroups.Address; -import org.jgroups.Channel; -import org.jgroups.Message; -import org.jgroups.ReceiverAdapter; -import org.jgroups.View; +import net.sf.ehcache.distribution.MulticastKeepaliveHeartbeatSender; /** - * Receives heartbeats from the {@link JGroupsKeepAliveHeartbeatSender JGroups heartbeat sender}. + * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there. + *

+ * Our own multicast heartbeats are ignored. * - * @author Derek Hulley - * @since 2.1.3 + * @author Greg Luck + * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 556 2007-10-29 02:06:30Z gregluck $ */ -public class JGroupsKeepAliveHeartbeatReceiver extends ReceiverAdapter +public abstract class JGroupsKeepaliveHeartbeatReceiver { - private static final int MAX_THREADS = 5; - - private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatReceiver.class); - - private final JGroupsRMICacheManagerPeerProvider peerProvider; - private final JGroupsKeepAliveHeartbeatSender heartbeatSender; - private final Channel channel; - private boolean stopped; - private View lastView; - private final ThreadPoolExecutor threadPool; - private final Set rmiUrlsProcessingQueue; - - public JGroupsKeepAliveHeartbeatReceiver( - JGroupsRMICacheManagerPeerProvider peerProvider, - JGroupsKeepAliveHeartbeatSender heartbeatSender, - Channel channel) - { - this.peerProvider = peerProvider; - this.heartbeatSender = heartbeatSender; - this.channel = channel; - - this.rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet()); - - // Build the thread pool - TraceableThreadFactory threadFactory = new TraceableThreadFactory(); - threadFactory.setThreadDaemon(true); - threadFactory.setThreadPriority(Thread.NORM_PRIORITY + 2); - - this.threadPool = new ThreadPoolExecutor( - 1, // Maintain one threads - 1, // We'll increase it, if necessary - 60, // 1 minute until unused threads get trashed - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - threadFactory); - } - - /** - * Register to receive message on the channel - */ - public void init() - { - channel.setReceiver(this); - } - - /** - * Set the stop flag. - */ - public void dispose() - { - stopped = true; - } - - @Override - public byte[] getState() - { - return new byte[] {}; - } - - @Override - public void setState(byte[] state) - { - // Nothing to do - } - - @Override - public void receive(Message message) - { - Address localAddress = heartbeatSender.getHeartbeatSourceAddress(); - Address remoteAddress = message.getSrc(); - // Ignore messages from ourselves - if (EqualsHelper.nullSafeEquals(localAddress, remoteAddress)) - { - if (logger.isDebugEnabled()) - { - logger.debug("\n" + - "Ignoring cache peeer URLs heartbeat from self: " + message); - } - return; - } - - String rmiUrls = new String(message.getBuffer()); - if (logger.isDebugEnabled()) - { - logger.debug("\n" + - "Received cache peer URLs heartbeat: \n" + - " Message: " + message + "\n" + - " Peers: " + rmiUrls); - } - // Quickly split them up - StringTokenizer tokenizer = new StringTokenizer(rmiUrls, JGroupsKeepAliveHeartbeatSender.URL_DELIMITER, false); - while (!stopped && tokenizer.hasMoreTokens()) - { - String rmiUrl = tokenizer.nextToken(); - // Is it pending? - if (rmiUrlsProcessingQueue.add(rmiUrl)) - { - // Not pending. Shedule it. - ProcessingRunnable runnable = new ProcessingRunnable(rmiUrl); - threadPool.execute(runnable); - } - else - { - // It was already waiting to be processed - // Increase the thread pool size - int currentThreadPoolMaxSize = threadPool.getMaximumPoolSize(); - if (currentThreadPoolMaxSize < MAX_THREADS) - { - threadPool.setMaximumPoolSize(currentThreadPoolMaxSize + 1); - } - } - } - } - - /** - * Worker class to go into thread pool - * - * @author Derek Hulley - */ - private class ProcessingRunnable implements Runnable - { - private String rmiUrl; - private ProcessingRunnable(String rmiUrl) - { - this.rmiUrl = rmiUrl; - } - public void run() - { - rmiUrlsProcessingQueue.remove(rmiUrl); - if (stopped) - { - return; - } - peerProvider.registerPeer(rmiUrl); - } - } - - @Override - public void viewAccepted(View newView) - { - if (EqualsHelper.nullSafeEquals(lastView, newView)) - { - // No change, so ignore - return; - } - int lastSize = (lastView == null) ? 0 : lastView.getMembers().size(); - int newSize = newView.getMembers().size(); - // Report - if (newSize < lastSize) - { - logger.warn("\n" + - "New cluster view with fewer members: \n" + - " Last View: " + lastView + "\n" + - " New View: " + newView); - } - else - { - logger.info("\n" + - "New cluster view with additional members: \n" + - " Last View: " + lastView + "\n" + - " New View: " + newView); - } - lastView = newView; - } +// private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName()); +// +// private ExecutorService processingThreadPool; +// private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet()); +// private final InetAddress groupMulticastAddress; +// private final Integer groupMulticastPort; +// private MulticastReceiverThread receiverThread; +// private MulticastSocket socket; +// private boolean stopped; +// private final MulticastRMICacheManagerPeerProvider peerProvider; +// +// /** +// * Constructor. +// * +// * @param peerProvider +// * @param multicastAddress +// * @param multicastPort +// */ +// public MulticastKeepaliveHeartbeatReceiver( +// MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) { +// this.peerProvider = peerProvider; +// this.groupMulticastAddress = multicastAddress; +// this.groupMulticastPort = multicastPort; +// } +// +// /** +// * Start. +// * +// * @throws IOException +// */ +// final void init() throws IOException { +// socket = new MulticastSocket(groupMulticastPort.intValue()); +// socket.joinGroup(groupMulticastAddress); +// receiverThread = new MulticastReceiverThread(); +// receiverThread.start(); +// processingThreadPool = Executors.newCachedThreadPool(); +// } +// +// /** +// * Shutdown the heartbeat. +// */ +// public final void dispose() { +// LOG.debug("dispose called"); +// processingThreadPool.shutdownNow(); +// stopped = true; +// receiverThread.interrupt(); +// } +// +// /** +// * A multicast receiver which continously receives heartbeats. +// */ +// private final class MulticastReceiverThread extends Thread { +// +// /** +// * Constructor +// */ +// public MulticastReceiverThread() { +// super("Multicast Heartbeat Receiver Thread"); +// setDaemon(true); +// } +// +// public final void run() { +// byte[] buf = new byte[PayloadUtil.MTU]; +// try { +// while (!stopped) { +// DatagramPacket packet = new DatagramPacket(buf, buf.length); +// try { +// socket.receive(packet); +// byte[] payload = packet.getData(); +// processPayload(payload); +// +// +// } catch (IOException e) { +// if (!stopped) { +// LOG.error("Error receiving heartbeat. " + e.getMessage() + +// ". Initial cause was " + e.getMessage(), e); +// } +// } +// } +// } catch (Throwable t) { +// LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing..."); +// } +// } +// +// private void processPayload(byte[] compressedPayload) { +// byte[] payload = PayloadUtil.ungzip(compressedPayload); +// String rmiUrls = new String(payload); +// if (self(rmiUrls)) { +// return; +// } +// rmiUrls = rmiUrls.trim(); +// if (LOG.isTraceEnabled()) { +// LOG.trace("rmiUrls received " + rmiUrls); +// } +// processRmiUrls(rmiUrls); +// } +// +// /** +// * This method forks a new executor to process the received heartbeat in a thread pool. +// * That way each remote cache manager cannot interfere with others. +// *

+// * In the worst case, we have as many concurrent threads as remote cache managers. +// * +// * @param rmiUrls +// */ +// private void processRmiUrls(final String rmiUrls) { +// if (rmiUrlsProcessingQueue.contains(rmiUrls)) { +// if (LOG.isDebugEnabled()) { +// LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: " + rmiUrls); +// } +// return; +// } +// +// if (processingThreadPool == null) { +// return; +// } +// +// processingThreadPool.execute(new Runnable() { +// public void run() { +// try { +// // Add the rmiUrls we are processing. +// rmiUrlsProcessingQueue.add(rmiUrls); +// for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls, +// PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) { +// if (stopped) { +// return; +// } +// String rmiUrl = stringTokenizer.nextToken(); +// registerNotification(rmiUrl); +// if (!peerProvider.peerUrls.containsKey(rmiUrl)) { +// if (LOG.isDebugEnabled()) { +// LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: " + rmiUrl); +// } +// return; +// } +// } +// } finally { +// // Remove the rmiUrls we just processed +// rmiUrlsProcessingQueue.remove(rmiUrls); +// } +// } +// }); +// } +// +// +// /** +// * @param rmiUrls +// * @return true if our own hostname and listener port are found in the list. This then means we have +// * caught our onw multicast, and should be ignored. +// */ +// private boolean self(String rmiUrls) { +// CacheManager cacheManager = peerProvider.getCacheManager(); +// CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener(); +// if (cacheManagerPeerListener == null) { +// return false; +// } +// List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers(); +// if (boundCachePeers == null || boundCachePeers.size() == 0) { +// return false; +// } +// CachePeer peer = (CachePeer) boundCachePeers.get(0); +// String cacheManagerUrlBase = null; +// try { +// cacheManagerUrlBase = peer.getUrlBase(); +// } catch (RemoteException e) { +// LOG.error("Error geting url base"); +// } +// int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase); +// return baseUrlMatch != -1; +// } +// +// private void registerNotification(String rmiUrl) { +// peerProvider.registerPeer(rmiUrl); +// } +// +// +// /** +// * {@inheritDoc} +// */ +// public final void interrupt() { +// try { +// socket.leaveGroup(groupMulticastAddress); +// } catch (IOException e) { +// LOG.error("Error leaving group"); +// } +// socket.close(); +// super.interrupt(); +// } +// } +// +// }