Files
alfresco-community-repo/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatSender.java
Derek Hulley adc940f961 Merged V3.1 to HEAD
Merged DEV/LIVECYCLE-3.1 to V3.1
      12665: Merged V2.1-A to DEV\LIVECYCLE-3.1
         8615: Cluster startup and property setting enhancements
         8657: Fixed shutdown procedure for JGroups
         8676: Enable system property overriding of more JGroups TCP stack properties
               -  ${alfresco.tcp.start_port:7800}
               -  ${alfresco.tcp.port_range:3}
         8678: More logging of cluster view changes and channel factory config during startup
      12667: Merged V2.1-A to DEV/LIVECYCLE-3.1
         9188: Index recovery job only calls through if property 'alfresco.cluster.name' has been set
         9197: Fixed unit test after bean property name change
      12793: Merged V2.1-A to DEV/LIVECYCLE-3.1
         7765: Requested mimetypes
         8526: Updated Mimetypes
         8610: Mimetype changes
         Many branding and other non-core changes were omitted
      12848: Fixed JAWS-223: Adobe LC Hibernate Dialect Loading
             - Hibernate dialect can be null or empty and will be autodetected from the database metadata
             - Property 'hibernate.dialect' is set on the System
             - iBatis loading (activities) checks for 'hibernate.dialect'
             - SchemaBootstrap checks for 'hibernate.dialect'
      12854: Merged V2.1-A to DEV/LIVECYCLE-3.1
         8681: Fixed mimetype 'application/photoshop'
      12856: Merged V2.1-A to DEV/LIVECYCLE-3.1
         9008: Fixed ADB-64: NPE when applying aspect cm:mlDocument
      12857: Merged V2.1-A to DEV/LIVECYCLE-3.1
         9032: ACT-2303: "Namespace is displayed in the Node browser is www.alfresco.org ...
   ___________________________________________________________________
   Modified: svn:mergeinfo
      Merged /alfresco/BRANCHES/V2.1-A:r7765,8526,8610,8615,8657,8676,8678,9188,9197
      Merged /alfresco/BRANCHES/V3.1:r12894
      Merged /alfresco/BRANCHES/DEV/LIVECYCLE-3.1:r12665,12667,12793,12848,12854,12856-12857


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@13518 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
2009-03-10 13:29:24 +00:00

257 lines
8.5 KiB
Java

/*
* Copyright (C) 2005-2008 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have recieved a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.repo.cache.jgroups;
import java.rmi.RemoteException;
import java.util.List;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.distribution.CachePeer;
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
import org.alfresco.util.VmShutdownListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Message;
/**
* Sends heartbeats to a JGroups cluster containing a list of RMI URLs.
*
* @author Derek Hulley
* @since 2.1.3
*/
public final class JGroupsKeepAliveHeartbeatSender
{
private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatSender.class);
public static final String URL_DELIMITER = "|";
/** Used to detect the necessary changes to the outgoing heartbeat messages */
private static String lastHeartbeatSendUrls = "";
public static synchronized String getLastHeartbeatSendUrls()
{
return lastHeartbeatSendUrls;
}
public static synchronized void setLastHeartbeatSendUrls(String heartbeatUrls)
{
lastHeartbeatSendUrls = heartbeatUrls;
}
private VmShutdownListener vmShutdownListener;
private final CacheManager cacheManager;
private final Channel heartbeatChannel;
private long heartBeatInterval;
private boolean stopped;
private HeartbeatSenderThread serverThread;
private Address heartbeatSourceAddress;
/**
* @param cacheManager the bound CacheManager
* @param heartbeatChannel the cluster channel to use
* @param heartBeatInterval the time between heartbeats
*/
public JGroupsKeepAliveHeartbeatSender(
CacheManager cacheManager,
Channel heartbeatChannel,
long heartBeatInterval)
{
this.vmShutdownListener = new VmShutdownListener("JGroupsKeepAliveHeartbeatSender");
this.cacheManager = cacheManager;
this.heartbeatChannel = heartbeatChannel;
this.heartBeatInterval = heartBeatInterval;
}
/**
* @return Return the heartbeat interval (milliseconds)
*/
public long getHeartBeatInterval()
{
return heartBeatInterval;
}
/**
* @return Returns the last heartbeat source address
*/
/*package*/ synchronized Address getHeartbeatSourceAddress()
{
return heartbeatSourceAddress;
}
/**
* @param heartbeatSourceAddress the source address
*/
private synchronized void setHeartbeatSourceAddress(Address heartbeatSourceAddress)
{
this.heartbeatSourceAddress = heartbeatSourceAddress;
}
/**
* Start the heartbeat thread
*/
public void init()
{
serverThread = new HeartbeatSenderThread();
serverThread.setDaemon(true);
serverThread.start();
}
/**
* Shutdown this heartbeat sender
*/
public final synchronized void dispose()
{
stopped = true;
notifyAll();
serverThread.interrupt();
}
/**
* A thread which sends a multicast heartbeat every second
*/
private class HeartbeatSenderThread extends Thread
{
private Message heartBeatMessage;
private int lastCachePeersHash;
/**
* Constructor
*/
public HeartbeatSenderThread()
{
super("JGroupsKeepAliveHeartbeatSender");
setDaemon(true);
}
public final void run()
{
if (logger.isDebugEnabled())
{
logger.debug("\n" +
"Starting cache peer URLs heartbeat");
}
while (!stopped && !vmShutdownListener.isVmShuttingDown())
{
try
{
if (AlfrescoJGroupsChannelFactory.isClusterActive())
{
Message message = getCachePeersMessage();
if (logger.isDebugEnabled())
{
logger.debug("\n" +
"Sending cache peer URLs heartbeat: \n" +
" Message: " + message + "\n" +
" Peers: " + new String(message.getBuffer()));
}
heartbeatChannel.send(message);
// Record the source address
setHeartbeatSourceAddress(message.getSrc());
}
}
catch (Throwable e)
{
logger.debug("Heartbeat sending failed: ", e);
}
// Quick exit if necessary
if (stopped || vmShutdownListener.isVmShuttingDown())
{
break;
}
// Wait for the next heartbeat
synchronized (this)
{
try
{
wait(heartBeatInterval);
}
catch (InterruptedException e)
{
}
}
}
}
/**
* Gets the message containing the peer URLs to send in the next heartbeat.
*/
private Message getCachePeersMessage()
{
@SuppressWarnings("unchecked")
List<CachePeer> localCachePeers = cacheManager.getCachePeerListener().getBoundCachePeers();
int newCachePeersHash = localCachePeers.hashCode();
if (heartBeatMessage == null || lastCachePeersHash != newCachePeersHash)
{
lastCachePeersHash = newCachePeersHash;
String peerUrls = assembleUrlList(localCachePeers);
JGroupsKeepAliveHeartbeatSender.setLastHeartbeatSendUrls(peerUrls);
// Convert to message
byte[] peerUrlsBytes = peerUrls.getBytes();
heartBeatMessage = new Message(null, null, peerUrlsBytes);
if (logger.isDebugEnabled())
{
logger.debug("\n" +
"Heartbeat message updated: \n" +
" URLs: " + peerUrls + "\n" +
" Message: " + heartBeatMessage);
}
}
return heartBeatMessage;
}
/**
* Builds a String of cache peer URLs of the form url1|url2|url3
*/
public String assembleUrlList(List<CachePeer> localCachePeers)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < localCachePeers.size(); i++)
{
CachePeer cachePeer = (CachePeer) localCachePeers.get(i);
String rmiUrl = null;
try
{
rmiUrl = cachePeer.getUrl();
}
catch (RemoteException e)
{
logger.error("This should never be thrown as it is called locally");
}
if (i != localCachePeers.size() - 1)
{
sb.append(rmiUrl).append(URL_DELIMITER);
}
else
{
sb.append(rmiUrl);
}
}
return sb.toString();
}
}
}