mirror of
				https://github.com/Alfresco/alfresco-community-repo.git
				synced 2025-10-22 15:12:38 +00:00 
			
		
		
		
	Merged V3.1 to HEAD
13107: Remove accidentally added .project file.
   13106: Correction to 13104 - remove accidentally included merge marks
   13105: Correction to 13104 - remove accidentally included merge marks
   13104: Update continuous build to include Hyperic plugin amongst distributables
   13092: Fix ConnectionPoolMBean. Naming of boolean methods didn't match those on DBCP BasicDataSource
   13088: Bring hyperic plugin in sync with Derek's recent JMX changes
   13076: Extended properties available on ConnectionPoolMBean
   13045: Merged V3.0 to v3.1 (record only)
      13044: Merged V2.2 to V3.0
         13041: Merged V3.1 to V2.2
            13037: PostgreSQL upgrade scripts from 2.1.6, 2.2.0, and 2.2.1
git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@13522 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
			
			
This commit is contained in:
		| @@ -24,194 +24,207 @@ | ||||
|  */ | ||||
| package org.alfresco.repo.cache.jgroups; | ||||
|  | ||||
| import java.util.Collections; | ||||
| import java.util.HashSet; | ||||
| import java.util.Set; | ||||
| import java.util.StringTokenizer; | ||||
| import java.util.concurrent.LinkedBlockingQueue; | ||||
| import java.util.concurrent.ThreadPoolExecutor; | ||||
| import java.util.concurrent.TimeUnit; | ||||
|  | ||||
| import org.alfresco.util.EqualsHelper; | ||||
| import org.alfresco.util.TraceableThreadFactory; | ||||
| import org.apache.commons.logging.Log; | ||||
| import org.apache.commons.logging.LogFactory; | ||||
| import org.jgroups.Address; | ||||
| import org.jgroups.Channel; | ||||
| import org.jgroups.Message; | ||||
| import org.jgroups.ReceiverAdapter; | ||||
| import org.jgroups.View; | ||||
| import net.sf.ehcache.distribution.MulticastKeepaliveHeartbeatSender; | ||||
|  | ||||
| /** | ||||
|  * Receives heartbeats from the {@link JGroupsKeepAliveHeartbeatSender JGroups heartbeat sender}. | ||||
|  * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there. | ||||
|  * <p/> | ||||
|  * Our own multicast heartbeats are ignored. | ||||
|  * | ||||
|  * @author Derek Hulley | ||||
|  * @since 2.1.3 | ||||
|  * @author Greg Luck | ||||
|  * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 556 2007-10-29 02:06:30Z gregluck $ | ||||
|  */ | ||||
| public class JGroupsKeepAliveHeartbeatReceiver extends ReceiverAdapter | ||||
| public abstract class JGroupsKeepaliveHeartbeatReceiver | ||||
| { | ||||
|     private static final int MAX_THREADS = 5; | ||||
|      | ||||
|     private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatReceiver.class); | ||||
|      | ||||
|     private final JGroupsRMICacheManagerPeerProvider peerProvider; | ||||
|     private final JGroupsKeepAliveHeartbeatSender heartbeatSender; | ||||
|     private final Channel channel; | ||||
|     private boolean stopped; | ||||
|     private View lastView; | ||||
|     private final ThreadPoolExecutor threadPool; | ||||
|     private final Set<String> rmiUrlsProcessingQueue; | ||||
|      | ||||
|     public JGroupsKeepAliveHeartbeatReceiver( | ||||
|             JGroupsRMICacheManagerPeerProvider peerProvider, | ||||
|             JGroupsKeepAliveHeartbeatSender heartbeatSender, | ||||
|             Channel channel) | ||||
|     { | ||||
|         this.peerProvider = peerProvider; | ||||
|         this.heartbeatSender = heartbeatSender; | ||||
|         this.channel = channel; | ||||
|          | ||||
|         this.rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet<String>()); | ||||
|  | ||||
|         // Build the thread pool | ||||
|         TraceableThreadFactory threadFactory = new TraceableThreadFactory(); | ||||
|         threadFactory.setThreadDaemon(true); | ||||
|         threadFactory.setThreadPriority(Thread.NORM_PRIORITY + 2); | ||||
|          | ||||
|         this.threadPool = new ThreadPoolExecutor( | ||||
|                 1,                                          // Maintain one threads | ||||
|                 1,                                          // We'll increase it, if necessary | ||||
|                 60,                                         // 1 minute until unused threads get trashed | ||||
|                 TimeUnit.SECONDS, | ||||
|                 new LinkedBlockingQueue<Runnable>(), | ||||
|                 threadFactory); | ||||
|     } | ||||
|      | ||||
|     /** | ||||
|      * Register to receive message on the channel | ||||
|      */ | ||||
|     public void init() | ||||
|     { | ||||
|         channel.setReceiver(this); | ||||
|     } | ||||
|      | ||||
|     /** | ||||
|      * Set the stop flag. | ||||
|      */ | ||||
|     public void dispose() | ||||
|     { | ||||
|         stopped = true; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public byte[] getState() | ||||
|     { | ||||
|         return new byte[] {}; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void setState(byte[] state) | ||||
|     { | ||||
|         // Nothing to do | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void receive(Message message) | ||||
|     { | ||||
|         Address localAddress = heartbeatSender.getHeartbeatSourceAddress(); | ||||
|         Address remoteAddress = message.getSrc(); | ||||
|         // Ignore messages from ourselves | ||||
|         if (EqualsHelper.nullSafeEquals(localAddress, remoteAddress)) | ||||
|         { | ||||
|             if (logger.isDebugEnabled()) | ||||
|             { | ||||
|                 logger.debug("\n" + | ||||
|                         "Ignoring cache peeer URLs heartbeat from self: " + message); | ||||
|             } | ||||
|             return; | ||||
|         } | ||||
|          | ||||
|         String rmiUrls = new String(message.getBuffer()); | ||||
|         if (logger.isDebugEnabled()) | ||||
|         { | ||||
|             logger.debug("\n" + | ||||
|                     "Received cache peer URLs heartbeat: \n" + | ||||
|                     "   Message: " + message + "\n" + | ||||
|                     "   Peers:   " + rmiUrls); | ||||
|         } | ||||
|         // Quickly split them up | ||||
|         StringTokenizer tokenizer = new StringTokenizer(rmiUrls, JGroupsKeepAliveHeartbeatSender.URL_DELIMITER, false); | ||||
|         while (!stopped && tokenizer.hasMoreTokens()) | ||||
|         { | ||||
|             String rmiUrl = tokenizer.nextToken(); | ||||
|             // Is it pending? | ||||
|             if (rmiUrlsProcessingQueue.add(rmiUrl)) | ||||
|             { | ||||
|                 // Not pending.  Shedule it. | ||||
|                 ProcessingRunnable runnable = new ProcessingRunnable(rmiUrl); | ||||
|                 threadPool.execute(runnable); | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 // It was already waiting to be processed | ||||
|                 // Increase the thread pool size | ||||
|                 int currentThreadPoolMaxSize = threadPool.getMaximumPoolSize(); | ||||
|                 if (currentThreadPoolMaxSize < MAX_THREADS) | ||||
|                 { | ||||
|                     threadPool.setMaximumPoolSize(currentThreadPoolMaxSize + 1); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Worker class to go into thread pool | ||||
|      *  | ||||
|      * @author Derek Hulley | ||||
|      */ | ||||
|     private class ProcessingRunnable implements Runnable | ||||
|     { | ||||
|         private String rmiUrl; | ||||
|         private ProcessingRunnable(String rmiUrl) | ||||
|         { | ||||
|             this.rmiUrl = rmiUrl; | ||||
|         } | ||||
|         public void run() | ||||
|         { | ||||
|             rmiUrlsProcessingQueue.remove(rmiUrl); | ||||
|             if (stopped) | ||||
|             { | ||||
|                 return; | ||||
|             } | ||||
|             peerProvider.registerPeer(rmiUrl); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void viewAccepted(View newView) | ||||
|     { | ||||
|         if (EqualsHelper.nullSafeEquals(lastView, newView)) | ||||
|         { | ||||
|             // No change, so ignore | ||||
|             return; | ||||
|         } | ||||
|         int lastSize = (lastView == null) ? 0 : lastView.getMembers().size(); | ||||
|         int newSize = newView.getMembers().size(); | ||||
|         // Report | ||||
|         if (newSize < lastSize) | ||||
|         { | ||||
|             logger.warn("\n" + | ||||
|             		"New cluster view with fewer members: \n" + | ||||
|             		"   Last View: " + lastView + "\n" + | ||||
|             		"   New View:  " + newView); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             logger.info("\n" + | ||||
|                     "New cluster view with additional members: \n" + | ||||
|                     "   Last View: " + lastView + "\n" + | ||||
|                     "   New View:  " + newView); | ||||
|         } | ||||
|         lastView = newView; | ||||
|     } | ||||
| //    private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName()); | ||||
| // | ||||
| //    private ExecutorService processingThreadPool; | ||||
| //    private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet()); | ||||
| //    private final InetAddress groupMulticastAddress; | ||||
| //    private final Integer groupMulticastPort; | ||||
| //    private MulticastReceiverThread receiverThread; | ||||
| //    private MulticastSocket socket; | ||||
| //    private boolean stopped; | ||||
| //    private final MulticastRMICacheManagerPeerProvider peerProvider; | ||||
| // | ||||
| //    /** | ||||
| //     * Constructor. | ||||
| //     * | ||||
| //     * @param peerProvider | ||||
| //     * @param multicastAddress | ||||
| //     * @param multicastPort | ||||
| //     */ | ||||
| //    public MulticastKeepaliveHeartbeatReceiver( | ||||
| //            MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) { | ||||
| //        this.peerProvider = peerProvider; | ||||
| //        this.groupMulticastAddress = multicastAddress; | ||||
| //        this.groupMulticastPort = multicastPort; | ||||
| //    } | ||||
| // | ||||
| //    /** | ||||
| //     * Start. | ||||
| //     * | ||||
| //     * @throws IOException | ||||
| //     */ | ||||
| //    final void init() throws IOException { | ||||
| //        socket = new MulticastSocket(groupMulticastPort.intValue()); | ||||
| //        socket.joinGroup(groupMulticastAddress); | ||||
| //        receiverThread = new MulticastReceiverThread(); | ||||
| //        receiverThread.start(); | ||||
| //        processingThreadPool = Executors.newCachedThreadPool(); | ||||
| //    } | ||||
| // | ||||
| //    /** | ||||
| //     * Shutdown the heartbeat. | ||||
| //     */ | ||||
| //    public final void dispose() { | ||||
| //        LOG.debug("dispose called"); | ||||
| //        processingThreadPool.shutdownNow(); | ||||
| //        stopped = true; | ||||
| //        receiverThread.interrupt(); | ||||
| //    } | ||||
| // | ||||
| //    /** | ||||
| //     * A multicast receiver which continously receives heartbeats. | ||||
| //     */ | ||||
| //    private final class MulticastReceiverThread extends Thread { | ||||
| // | ||||
| //        /** | ||||
| //         * Constructor | ||||
| //         */ | ||||
| //        public MulticastReceiverThread() { | ||||
| //            super("Multicast Heartbeat Receiver Thread"); | ||||
| //            setDaemon(true); | ||||
| //        } | ||||
| // | ||||
| //        public final void run() { | ||||
| //            byte[] buf = new byte[PayloadUtil.MTU]; | ||||
| //            try { | ||||
| //                while (!stopped) { | ||||
| //                    DatagramPacket packet = new DatagramPacket(buf, buf.length); | ||||
| //                    try { | ||||
| //                        socket.receive(packet); | ||||
| //                        byte[] payload = packet.getData(); | ||||
| //                        processPayload(payload); | ||||
| // | ||||
| // | ||||
| //                    } catch (IOException e) { | ||||
| //                        if (!stopped) { | ||||
| //                            LOG.error("Error receiving heartbeat. " + e.getMessage() + | ||||
| //                                    ". Initial cause was " + e.getMessage(), e); | ||||
| //                        } | ||||
| //                    } | ||||
| //                } | ||||
| //            } catch (Throwable t) { | ||||
| //                LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing..."); | ||||
| //            } | ||||
| //        } | ||||
| // | ||||
| //        private void processPayload(byte[] compressedPayload) { | ||||
| //            byte[] payload = PayloadUtil.ungzip(compressedPayload); | ||||
| //            String rmiUrls = new String(payload); | ||||
| //            if (self(rmiUrls)) { | ||||
| //                return; | ||||
| //            } | ||||
| //            rmiUrls = rmiUrls.trim(); | ||||
| //            if (LOG.isTraceEnabled()) { | ||||
| //                LOG.trace("rmiUrls received " + rmiUrls); | ||||
| //            } | ||||
| //            processRmiUrls(rmiUrls); | ||||
| //        } | ||||
| // | ||||
| //        /** | ||||
| //         * This method forks a new executor to process the received heartbeat in a thread pool. | ||||
| //         * That way each remote cache manager cannot interfere with others. | ||||
| //         * <p/> | ||||
| //         * In the worst case, we have as many concurrent threads as remote cache managers. | ||||
| //         * | ||||
| //         * @param rmiUrls | ||||
| //         */ | ||||
| //        private void processRmiUrls(final String rmiUrls) { | ||||
| //            if (rmiUrlsProcessingQueue.contains(rmiUrls)) { | ||||
| //                if (LOG.isDebugEnabled()) { | ||||
| //                    LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: " + rmiUrls); | ||||
| //                } | ||||
| //                return; | ||||
| //            } | ||||
| // | ||||
| //            if (processingThreadPool == null) { | ||||
| //                return; | ||||
| //            } | ||||
| // | ||||
| //            processingThreadPool.execute(new Runnable() { | ||||
| //                public void run() { | ||||
| //                    try { | ||||
| //                        // Add the rmiUrls we are processing. | ||||
| //                        rmiUrlsProcessingQueue.add(rmiUrls); | ||||
| //                        for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls, | ||||
| //                                PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) { | ||||
| //                            if (stopped) { | ||||
| //                                return; | ||||
| //                            } | ||||
| //                            String rmiUrl = stringTokenizer.nextToken(); | ||||
| //                            registerNotification(rmiUrl); | ||||
| //                            if (!peerProvider.peerUrls.containsKey(rmiUrl)) { | ||||
| //                                if (LOG.isDebugEnabled()) { | ||||
| //                                    LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: " + rmiUrl); | ||||
| //                                } | ||||
| //                                return; | ||||
| //                            } | ||||
| //                        } | ||||
| //                    } finally { | ||||
| //                        // Remove the rmiUrls we just processed | ||||
| //                        rmiUrlsProcessingQueue.remove(rmiUrls); | ||||
| //                    } | ||||
| //                } | ||||
| //            }); | ||||
| //        } | ||||
| // | ||||
| // | ||||
| //        /** | ||||
| //         * @param rmiUrls | ||||
| //         * @return true if our own hostname and listener port are found in the list. This then means we have | ||||
| //         *         caught our onw multicast, and should be ignored. | ||||
| //         */ | ||||
| //        private boolean self(String rmiUrls) { | ||||
| //            CacheManager cacheManager = peerProvider.getCacheManager(); | ||||
| //            CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener(); | ||||
| //            if (cacheManagerPeerListener == null) { | ||||
| //                return false; | ||||
| //            } | ||||
| //            List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers(); | ||||
| //            if (boundCachePeers == null || boundCachePeers.size() == 0) { | ||||
| //                return false; | ||||
| //            } | ||||
| //            CachePeer peer = (CachePeer) boundCachePeers.get(0); | ||||
| //            String cacheManagerUrlBase = null; | ||||
| //            try { | ||||
| //                cacheManagerUrlBase = peer.getUrlBase(); | ||||
| //            } catch (RemoteException e) { | ||||
| //                LOG.error("Error geting url base"); | ||||
| //            } | ||||
| //            int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase); | ||||
| //            return baseUrlMatch != -1; | ||||
| //        } | ||||
| // | ||||
| //        private void registerNotification(String rmiUrl) { | ||||
| //            peerProvider.registerPeer(rmiUrl); | ||||
| //        } | ||||
| // | ||||
| // | ||||
| //        /** | ||||
| //         * {@inheritDoc} | ||||
| //         */ | ||||
| //        public final void interrupt() { | ||||
| //            try { | ||||
| //                socket.leaveGroup(groupMulticastAddress); | ||||
| //            } catch (IOException e) { | ||||
| //                LOG.error("Error leaving group"); | ||||
| //            } | ||||
| //            socket.close(); | ||||
| //            super.interrupt(); | ||||
| //        } | ||||
| //    } | ||||
| // | ||||
| // | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user