mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Merged V3.1 to HEAD
12896: Merged DEV/LIVECYCLE-3.1 to V3.1 12859: Merged V2.1-A to DEV/LIVECYCLE-3.1 9040: Fixed WebService client code to take dynamic webapp name 12865: Merged V2.1-A to DEV/LIVECYCLE-3.1 9040 integration: Added 'repository.webapp' (defaults to 'alfresco') for Webservice clients 12868: JAWS-142: Adobe LC JGroups Clustering - JGroups communications and factories remain in 'repository' project - JGroups EHCache integration moved to 'enterpriserepository' project - Default factory for EHCache cluster config is aware of open-enterprise split - Default EHCache config still works as normal - JGroups EHCache config still enabled by setting 'alfresco.cluster.name' property 12887: Merged V2.1-A to DEV\LIVECYCLE-31 8619: Hard-coded "admin" usage in non-test classes only 12906: Re-deleted files after merge mix-up ___________________________________________________________________ Modified: svn:mergeinfo Merged /alfresco/BRANCHES/V3.1:r12896,12906 git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@13524 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -98,7 +98,7 @@
|
|||||||
</property>
|
</property>
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
<!-- Set up the JGroups clustering, if necessary -->
|
<!-- Set up JGroups communication, if necessary -->
|
||||||
<bean name="jgroupsChannelFactory" class="org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory">
|
<bean name="jgroupsChannelFactory" class="org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory">
|
||||||
<property name="clusterName">
|
<property name="clusterName">
|
||||||
<value>${alfresco.cluster.name}</value>
|
<value>${alfresco.cluster.name}</value>
|
||||||
|
@@ -60,9 +60,6 @@
|
|||||||
|
|
||||||
<!-- Abstract bean definition defining base definition for content service -->
|
<!-- Abstract bean definition defining base definition for content service -->
|
||||||
<bean id="baseContentService" class="org.alfresco.repo.content.RoutingContentService" abstract="true" init-method="init">
|
<bean id="baseContentService" class="org.alfresco.repo.content.RoutingContentService" abstract="true" init-method="init">
|
||||||
<property name="transactionService">
|
|
||||||
<ref bean="transactionService" />
|
|
||||||
</property>
|
|
||||||
<property name="retryingTransactionHelper">
|
<property name="retryingTransactionHelper">
|
||||||
<ref bean="retryingTransactionHelper"/>
|
<ref bean="retryingTransactionHelper"/>
|
||||||
</property>
|
</property>
|
||||||
|
@@ -2,8 +2,15 @@
|
|||||||
<diskStore
|
<diskStore
|
||||||
path="java.io.tmpdir"/>
|
path="java.io.tmpdir"/>
|
||||||
|
|
||||||
|
<!--
|
||||||
|
The 'heartbeatInterval' property is the only one used for the JGroups-enabled implementation
|
||||||
|
-->
|
||||||
<cacheManagerPeerProviderFactory
|
<cacheManagerPeerProviderFactory
|
||||||
class="org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider$Factory"
|
class="org.alfresco.repo.cache.AlfrescoCacheManagerPeerProviderFactory"
|
||||||
|
properties="heartbeatInterval=5000,
|
||||||
|
peerDiscovery=automatic,
|
||||||
|
multicastGroupAddress=230.0.0.1,
|
||||||
|
multicastGroupPort=4446"/>
|
||||||
/>
|
/>
|
||||||
|
|
||||||
<cacheManagerPeerListenerFactory
|
<cacheManagerPeerListenerFactory
|
||||||
|
59
source/java/org/alfresco/repo/cache/AlfrescoCacheManagerPeerProviderFactory.java
vendored
Normal file
59
source/java/org/alfresco/repo/cache/AlfrescoCacheManagerPeerProviderFactory.java
vendored
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
package org.alfresco.repo.cache;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import net.sf.ehcache.CacheManager;
|
||||||
|
import net.sf.ehcache.distribution.CacheManagerPeerProvider;
|
||||||
|
import net.sf.ehcache.distribution.CacheManagerPeerProviderFactory;
|
||||||
|
import net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Alfresco's <tt>CacheManagerPeerProviderFactory</tt> that defers to the community or
|
||||||
|
* enterprise factories.
|
||||||
|
*
|
||||||
|
* @author Derek Hulley
|
||||||
|
* @since 3.1
|
||||||
|
*/
|
||||||
|
public class AlfrescoCacheManagerPeerProviderFactory extends CacheManagerPeerProviderFactory
|
||||||
|
{
|
||||||
|
private static Log logger = LogFactory.getLog(AlfrescoCacheManagerPeerProviderFactory.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CacheManagerPeerProvider createCachePeerProvider(CacheManager cacheManager, Properties properties)
|
||||||
|
{
|
||||||
|
CacheManagerPeerProviderFactory factory = null;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Class clazz = Class.forName("org.alfresco.enterprise.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider$Factory");
|
||||||
|
factory = (CacheManagerPeerProviderFactory) clazz.newInstance();
|
||||||
|
}
|
||||||
|
catch (ClassNotFoundException e)
|
||||||
|
{
|
||||||
|
// Entirely expected if the Enterprise-level code is not present
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
logger.error("Failed to instantiate JGroupsRMICacheManagerPeerProvider factory.", e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (factory == null)
|
||||||
|
{
|
||||||
|
// Use EHCache's default implementation
|
||||||
|
factory = new RMICacheManagerPeerProviderFactory();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled())
|
||||||
|
{
|
||||||
|
logger.debug("Using peer provider factory: " + factory.getClass().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
return factory.createCachePeerProvider(cacheManager, properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@@ -1,126 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2008 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.cache;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.net.URL;
|
|
||||||
|
|
||||||
import net.sf.ehcache.CacheManager;
|
|
||||||
|
|
||||||
import org.alfresco.error.AlfrescoRuntimeException;
|
|
||||||
import org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider;
|
|
||||||
import org.alfresco.util.AbstractLifecycleBean;
|
|
||||||
import org.alfresco.util.PropertyCheck;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.jgroups.JChannel;
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
|
||||||
import org.springframework.util.ResourceUtils;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A bootstrap bean that sets up the Alfresco-specific cache manager.
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public class AlfrescoEhCacheBootstrap extends AbstractLifecycleBean
|
|
||||||
{
|
|
||||||
private static Log logger = LogFactory.getLog(AlfrescoEhCacheBootstrap.class);
|
|
||||||
|
|
||||||
private JChannel ehcacheHeartbeatChannel;
|
|
||||||
private String clusterName;
|
|
||||||
|
|
||||||
public AlfrescoEhCacheBootstrap()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setEhcacheHeartbeatChannel(JChannel channel)
|
|
||||||
{
|
|
||||||
this.ehcacheHeartbeatChannel = channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setClusterName(String clusterName)
|
|
||||||
{
|
|
||||||
this.clusterName = clusterName;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onBootstrap(ApplicationEvent event)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
CacheManager cacheManager = InternalEhCacheManagerFactoryBean.getInstance();
|
|
||||||
// We need only proceed if the cache-manager doesn't already have a peer mechanims
|
|
||||||
if( cacheManager.getCacheManagerPeerProvider() != null)
|
|
||||||
{
|
|
||||||
logger.info("Cache cluster config enabled using ehcache-custom.xml");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
setupCaches(cacheManager);
|
|
||||||
}
|
|
||||||
catch (Throwable e)
|
|
||||||
{
|
|
||||||
throw new AlfrescoRuntimeException("Failed to bootstrap the EHCache cluster", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onShutdown(ApplicationEvent event)
|
|
||||||
{
|
|
||||||
// Shut the cache manager down
|
|
||||||
CacheManager cacheManager = InternalEhCacheManagerFactoryBean.getInstance();
|
|
||||||
cacheManager.shutdown();
|
|
||||||
// Close the channel
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (ehcacheHeartbeatChannel != null && ehcacheHeartbeatChannel.isOpen())
|
|
||||||
{
|
|
||||||
ehcacheHeartbeatChannel.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Throwable e)
|
|
||||||
{
|
|
||||||
logger.error("Error during shutdown: ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds the necessary peer mechanisms to the caches and cache manager.
|
|
||||||
*
|
|
||||||
* @param cacheManager the cache manager to add the factories to
|
|
||||||
*/
|
|
||||||
private void setupCaches(CacheManager cacheManager)
|
|
||||||
{
|
|
||||||
if (cacheManager.getCachePeerProvider() != null)
|
|
||||||
{
|
|
||||||
throw new RuntimeException("Double check for cache manager peer provider failed");
|
|
||||||
}
|
|
||||||
JGroupsRMICacheManagerPeerProvider peerProvider = new JGroupsRMICacheManagerPeerProvider(
|
|
||||||
cacheManager,
|
|
||||||
ehcacheHeartbeatChannel,
|
|
||||||
JGroupsRMICacheManagerPeerProvider.DEFAULT_HEARTBEAT_INTERVAL);
|
|
||||||
}
|
|
||||||
}
|
|
@@ -25,10 +25,12 @@
|
|||||||
package org.alfresco.repo.cache;
|
package org.alfresco.repo.cache;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
import net.sf.ehcache.CacheException;
|
import net.sf.ehcache.CacheException;
|
||||||
import net.sf.ehcache.CacheManager;
|
import net.sf.ehcache.CacheManager;
|
||||||
|
|
||||||
|
import org.alfresco.error.AlfrescoRuntimeException;
|
||||||
import org.alfresco.util.PropertyCheck;
|
import org.alfresco.util.PropertyCheck;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@@ -65,8 +67,17 @@ public class EhCacheManagerFactoryBean implements FactoryBean, InitializingBean,
|
|||||||
{
|
{
|
||||||
PropertyCheck.mandatory(this, "configLocation", configLocation);
|
PropertyCheck.mandatory(this, "configLocation", configLocation);
|
||||||
|
|
||||||
logger.info("Initializing EHCache CacheManager");
|
// Double-check the config location or EHCache will throw an NPE
|
||||||
this.cacheManager = new CacheManager(this.configLocation.getURL());
|
try
|
||||||
|
{
|
||||||
|
URL configUrl = this.configLocation.getURL();
|
||||||
|
logger.info("Initializing EHCache CacheManager using URL: " + configLocation);
|
||||||
|
this.cacheManager = new CacheManager(configUrl);
|
||||||
|
}
|
||||||
|
catch (IOException e)
|
||||||
|
{
|
||||||
|
throw new AlfrescoRuntimeException("Unabled to read EHCache configuration file at " + configLocation, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object getObject()
|
public Object getObject()
|
||||||
@@ -74,6 +85,7 @@ public class EhCacheManagerFactoryBean implements FactoryBean, InitializingBean,
|
|||||||
return this.cacheManager;
|
return this.cacheManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public Class getObjectType()
|
public Class getObjectType()
|
||||||
{
|
{
|
||||||
return (this.cacheManager != null ? this.cacheManager.getClass() : CacheManager.class);
|
return (this.cacheManager != null ? this.cacheManager.getClass() : CacheManager.class);
|
||||||
|
@@ -1,249 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2007 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.cache.jgroups;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
import net.sf.ehcache.Cache;
|
|
||||||
import net.sf.ehcache.CacheManager;
|
|
||||||
import net.sf.ehcache.Ehcache;
|
|
||||||
import net.sf.ehcache.Element;
|
|
||||||
import net.sf.ehcache.Status;
|
|
||||||
|
|
||||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
|
||||||
import org.springframework.context.ApplicationContext;
|
|
||||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see JGroupsRMICacheManagerPeerProvider
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
*/
|
|
||||||
public class JGroupsEhCacheTest extends TestCase
|
|
||||||
{
|
|
||||||
private static final String CACHE_INVALIDATION = "org.alresco.test.cache.invalidation";
|
|
||||||
private static final String CACHE_REPLICATION = "org.alresco.test.cache.replication";
|
|
||||||
private static final String CACHE_NOT_CLUSTERED = "org.alresco.test.cache.not-clustered";
|
|
||||||
private static final String BEAN_CACHE_MANAGER = "ehCacheManager";
|
|
||||||
|
|
||||||
private static final String KEY_A = "A";
|
|
||||||
private static final String KEY_B = "B";
|
|
||||||
private static final String KEY_C = "C";
|
|
||||||
private static final String VALUE_A = "AAA";
|
|
||||||
private static final String VALUE_B = "BBB";
|
|
||||||
private static final String VALUE_C = "CCC";
|
|
||||||
|
|
||||||
private static ApplicationContext ctx = new ClassPathXmlApplicationContext(
|
|
||||||
new String[] {
|
|
||||||
"classpath:jgroups/ehcache-jgroups-cluster-test-context.xml",}
|
|
||||||
);
|
|
||||||
private CacheManager cacheManager;
|
|
||||||
private Cache cacheInvalidation;
|
|
||||||
private Cache cacheReplication;
|
|
||||||
private Cache cacheNotClustered;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setUp() throws Exception
|
|
||||||
{
|
|
||||||
cacheManager = (CacheManager) ctx.getBean(BEAN_CACHE_MANAGER);
|
|
||||||
cacheInvalidation = cacheManager.getCache(CACHE_INVALIDATION);
|
|
||||||
cacheReplication = cacheManager.getCache(CACHE_REPLICATION);
|
|
||||||
cacheNotClustered = cacheManager.getCache(CACHE_NOT_CLUSTERED);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void tearDown()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSetUp() throws Exception
|
|
||||||
{
|
|
||||||
assertNotNull(cacheManager);
|
|
||||||
CacheManager cacheManagerCheck = (CacheManager) ctx.getBean(BEAN_CACHE_MANAGER);
|
|
||||||
assertTrue(cacheManager == cacheManagerCheck);
|
|
||||||
|
|
||||||
// Check that the cache manager is active
|
|
||||||
assertTrue("Cache manager is not alive", cacheManager.getStatus() == Status.STATUS_ALIVE);
|
|
||||||
|
|
||||||
// Check that the caches are available
|
|
||||||
assertNotNull("Cache not found: " + CACHE_INVALIDATION, cacheInvalidation);
|
|
||||||
assertNotNull("Cache not found: " + CACHE_REPLICATION, cacheReplication);
|
|
||||||
assertNotNull("Cache not found: " + CACHE_NOT_CLUSTERED, cacheReplication);
|
|
||||||
|
|
||||||
// Make sure that the cache manager is cluster-enabled
|
|
||||||
assertNotNull("CacheManagerPeerProvider is not present", cacheManager.getCacheManagerPeerProvider());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Loops, checking the names of the caches present in the URLs being sent to the heartbeat. The test will
|
|
||||||
* loop for 6s (maximum heartbeat duration) or until the correct cache names are present.
|
|
||||||
*/
|
|
||||||
private synchronized void checkHeartbeatCacheNamesPresent(String ... cacheNames)
|
|
||||||
{
|
|
||||||
Set<String> lookingFor = new HashSet<String>(Arrays.asList(cacheNames));
|
|
||||||
|
|
||||||
long startTimeMs = System.currentTimeMillis();
|
|
||||||
while (System.currentTimeMillis() - startTimeMs < 6000)
|
|
||||||
{
|
|
||||||
String urls = JGroupsKeepAliveHeartbeatSender.getLastHeartbeatSendUrls();
|
|
||||||
for (String cacheName : cacheNames)
|
|
||||||
{
|
|
||||||
if (urls.contains(cacheName))
|
|
||||||
{
|
|
||||||
lookingFor.remove(cacheName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Did we eliminate all the caches?
|
|
||||||
if (lookingFor.size() == 0)
|
|
||||||
{
|
|
||||||
// All of the given caches are present
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try { wait(100); } catch (InterruptedException e) {}
|
|
||||||
}
|
|
||||||
// Some of the caches are STILL NOT in the heartbeat
|
|
||||||
fail("Caches did not appear in the heartbeat: " + lookingFor);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Loops, checking the names of the caches are <b>not</b> present in the URLs being sent to the heartbeat.
|
|
||||||
* The test will loop for 6s (maximum heartbeat duration) or until the caches are no longer present.
|
|
||||||
*/
|
|
||||||
private synchronized void checkHeartbeatCacheNamesNotPresent(String ... cacheNames)
|
|
||||||
{
|
|
||||||
Set<String> notLookingFor = new HashSet<String>(Arrays.asList(cacheNames));
|
|
||||||
|
|
||||||
long startTimeMs = System.currentTimeMillis();
|
|
||||||
while (System.currentTimeMillis() - startTimeMs < 6000)
|
|
||||||
{
|
|
||||||
String urls = JGroupsKeepAliveHeartbeatSender.getLastHeartbeatSendUrls();
|
|
||||||
for (String cacheName : cacheNames)
|
|
||||||
{
|
|
||||||
if (!urls.contains(cacheName))
|
|
||||||
{
|
|
||||||
notLookingFor.remove(cacheName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Did we eliminate all the caches?
|
|
||||||
if (notLookingFor.size() == 0)
|
|
||||||
{
|
|
||||||
// None of the given caches are present
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try { wait(100); } catch (InterruptedException e) {}
|
|
||||||
}
|
|
||||||
// Some of the caches are STILL in the heartbeat
|
|
||||||
fail("Caches were not removed from the heartbeat: " + notLookingFor);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that the default heartbeat is as expected
|
|
||||||
*/
|
|
||||||
public void testDefaultHeartbeat() throws Exception
|
|
||||||
{
|
|
||||||
checkHeartbeatCacheNamesPresent(CACHE_INVALIDATION, CACHE_REPLICATION);
|
|
||||||
checkHeartbeatCacheNamesNotPresent(CACHE_NOT_CLUSTERED);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manipulate the invalidating cache
|
|
||||||
*/
|
|
||||||
public void testInvalidatingCache() throws Exception
|
|
||||||
{
|
|
||||||
cacheInvalidation.put(new Element(KEY_A, VALUE_A));
|
|
||||||
cacheInvalidation.put(new Element(KEY_B, VALUE_B));
|
|
||||||
cacheInvalidation.put(new Element(KEY_C, VALUE_C));
|
|
||||||
|
|
||||||
cacheInvalidation.put(new Element(KEY_A, VALUE_C));
|
|
||||||
cacheInvalidation.put(new Element(KEY_B, VALUE_A));
|
|
||||||
cacheInvalidation.put(new Element(KEY_C, VALUE_B));
|
|
||||||
|
|
||||||
cacheInvalidation.remove(KEY_A);
|
|
||||||
cacheInvalidation.remove(KEY_B);
|
|
||||||
cacheInvalidation.remove(KEY_C);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manipulate the replicating cache
|
|
||||||
*/
|
|
||||||
public void testReplicatingCache() throws Exception
|
|
||||||
{
|
|
||||||
cacheReplication.put(new Element(KEY_A, VALUE_A));
|
|
||||||
cacheReplication.put(new Element(KEY_B, VALUE_B));
|
|
||||||
cacheReplication.put(new Element(KEY_C, VALUE_C));
|
|
||||||
|
|
||||||
cacheReplication.put(new Element(KEY_A, VALUE_C));
|
|
||||||
cacheReplication.put(new Element(KEY_B, VALUE_A));
|
|
||||||
cacheReplication.put(new Element(KEY_C, VALUE_B));
|
|
||||||
|
|
||||||
cacheReplication.remove(KEY_A);
|
|
||||||
cacheReplication.remove(KEY_B);
|
|
||||||
cacheReplication.remove(KEY_C);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manipulate the non-clustered cache
|
|
||||||
*/
|
|
||||||
public void testNonClusteredCache() throws Exception
|
|
||||||
{
|
|
||||||
cacheNotClustered.put(new Element(KEY_A, VALUE_A));
|
|
||||||
cacheNotClustered.put(new Element(KEY_B, VALUE_B));
|
|
||||||
cacheNotClustered.put(new Element(KEY_C, VALUE_C));
|
|
||||||
|
|
||||||
cacheNotClustered.put(new Element(KEY_A, VALUE_C));
|
|
||||||
cacheNotClustered.put(new Element(KEY_B, VALUE_A));
|
|
||||||
cacheNotClustered.put(new Element(KEY_C, VALUE_B));
|
|
||||||
|
|
||||||
cacheNotClustered.remove(KEY_A);
|
|
||||||
cacheNotClustered.remove(KEY_B);
|
|
||||||
cacheNotClustered.remove(KEY_C);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Starts up a second VM and manipulates the cache
|
|
||||||
*/
|
|
||||||
public static void main(String ... args)
|
|
||||||
{
|
|
||||||
CacheManager cacheManager = (CacheManager) ctx.getBean(BEAN_CACHE_MANAGER);
|
|
||||||
Ehcache cacheInvalidation = cacheManager.getCache(CACHE_INVALIDATION);
|
|
||||||
Ehcache cacheReplication = cacheManager.getCache(CACHE_REPLICATION);
|
|
||||||
Ehcache cacheNotClustered = cacheManager.getCache(CACHE_NOT_CLUSTERED);
|
|
||||||
|
|
||||||
Element[] elements = new Element[] {
|
|
||||||
new Element(KEY_A, VALUE_A),
|
|
||||||
new Element(KEY_B, VALUE_B),
|
|
||||||
new Element(KEY_C, VALUE_C),
|
|
||||||
};
|
|
||||||
|
|
||||||
synchronized (cacheManager)
|
|
||||||
{
|
|
||||||
try { cacheManager.wait(0); } catch (Throwable e) {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,230 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2008 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.cache.jgroups;
|
|
||||||
|
|
||||||
import net.sf.ehcache.distribution.MulticastKeepaliveHeartbeatSender;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
|
|
||||||
* <p/>
|
|
||||||
* Our own multicast heartbeats are ignored.
|
|
||||||
*
|
|
||||||
* @author Greg Luck
|
|
||||||
* @version $Id: MulticastKeepaliveHeartbeatReceiver.java 556 2007-10-29 02:06:30Z gregluck $
|
|
||||||
*/
|
|
||||||
public abstract class JGroupsKeepaliveHeartbeatReceiver
|
|
||||||
{
|
|
||||||
// private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName());
|
|
||||||
//
|
|
||||||
// private ExecutorService processingThreadPool;
|
|
||||||
// private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet());
|
|
||||||
// private final InetAddress groupMulticastAddress;
|
|
||||||
// private final Integer groupMulticastPort;
|
|
||||||
// private MulticastReceiverThread receiverThread;
|
|
||||||
// private MulticastSocket socket;
|
|
||||||
// private boolean stopped;
|
|
||||||
// private final MulticastRMICacheManagerPeerProvider peerProvider;
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Constructor.
|
|
||||||
// *
|
|
||||||
// * @param peerProvider
|
|
||||||
// * @param multicastAddress
|
|
||||||
// * @param multicastPort
|
|
||||||
// */
|
|
||||||
// public MulticastKeepaliveHeartbeatReceiver(
|
|
||||||
// MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) {
|
|
||||||
// this.peerProvider = peerProvider;
|
|
||||||
// this.groupMulticastAddress = multicastAddress;
|
|
||||||
// this.groupMulticastPort = multicastPort;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Start.
|
|
||||||
// *
|
|
||||||
// * @throws IOException
|
|
||||||
// */
|
|
||||||
// final void init() throws IOException {
|
|
||||||
// socket = new MulticastSocket(groupMulticastPort.intValue());
|
|
||||||
// socket.joinGroup(groupMulticastAddress);
|
|
||||||
// receiverThread = new MulticastReceiverThread();
|
|
||||||
// receiverThread.start();
|
|
||||||
// processingThreadPool = Executors.newCachedThreadPool();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Shutdown the heartbeat.
|
|
||||||
// */
|
|
||||||
// public final void dispose() {
|
|
||||||
// LOG.debug("dispose called");
|
|
||||||
// processingThreadPool.shutdownNow();
|
|
||||||
// stopped = true;
|
|
||||||
// receiverThread.interrupt();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * A multicast receiver which continously receives heartbeats.
|
|
||||||
// */
|
|
||||||
// private final class MulticastReceiverThread extends Thread {
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Constructor
|
|
||||||
// */
|
|
||||||
// public MulticastReceiverThread() {
|
|
||||||
// super("Multicast Heartbeat Receiver Thread");
|
|
||||||
// setDaemon(true);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// public final void run() {
|
|
||||||
// byte[] buf = new byte[PayloadUtil.MTU];
|
|
||||||
// try {
|
|
||||||
// while (!stopped) {
|
|
||||||
// DatagramPacket packet = new DatagramPacket(buf, buf.length);
|
|
||||||
// try {
|
|
||||||
// socket.receive(packet);
|
|
||||||
// byte[] payload = packet.getData();
|
|
||||||
// processPayload(payload);
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// } catch (IOException e) {
|
|
||||||
// if (!stopped) {
|
|
||||||
// LOG.error("Error receiving heartbeat. " + e.getMessage() +
|
|
||||||
// ". Initial cause was " + e.getMessage(), e);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } catch (Throwable t) {
|
|
||||||
// LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing...");
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void processPayload(byte[] compressedPayload) {
|
|
||||||
// byte[] payload = PayloadUtil.ungzip(compressedPayload);
|
|
||||||
// String rmiUrls = new String(payload);
|
|
||||||
// if (self(rmiUrls)) {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// rmiUrls = rmiUrls.trim();
|
|
||||||
// if (LOG.isTraceEnabled()) {
|
|
||||||
// LOG.trace("rmiUrls received " + rmiUrls);
|
|
||||||
// }
|
|
||||||
// processRmiUrls(rmiUrls);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * This method forks a new executor to process the received heartbeat in a thread pool.
|
|
||||||
// * That way each remote cache manager cannot interfere with others.
|
|
||||||
// * <p/>
|
|
||||||
// * In the worst case, we have as many concurrent threads as remote cache managers.
|
|
||||||
// *
|
|
||||||
// * @param rmiUrls
|
|
||||||
// */
|
|
||||||
// private void processRmiUrls(final String rmiUrls) {
|
|
||||||
// if (rmiUrlsProcessingQueue.contains(rmiUrls)) {
|
|
||||||
// if (LOG.isDebugEnabled()) {
|
|
||||||
// LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: " + rmiUrls);
|
|
||||||
// }
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (processingThreadPool == null) {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// processingThreadPool.execute(new Runnable() {
|
|
||||||
// public void run() {
|
|
||||||
// try {
|
|
||||||
// // Add the rmiUrls we are processing.
|
|
||||||
// rmiUrlsProcessingQueue.add(rmiUrls);
|
|
||||||
// for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls,
|
|
||||||
// PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) {
|
|
||||||
// if (stopped) {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// String rmiUrl = stringTokenizer.nextToken();
|
|
||||||
// registerNotification(rmiUrl);
|
|
||||||
// if (!peerProvider.peerUrls.containsKey(rmiUrl)) {
|
|
||||||
// if (LOG.isDebugEnabled()) {
|
|
||||||
// LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: " + rmiUrl);
|
|
||||||
// }
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } finally {
|
|
||||||
// // Remove the rmiUrls we just processed
|
|
||||||
// rmiUrlsProcessingQueue.remove(rmiUrls);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * @param rmiUrls
|
|
||||||
// * @return true if our own hostname and listener port are found in the list. This then means we have
|
|
||||||
// * caught our onw multicast, and should be ignored.
|
|
||||||
// */
|
|
||||||
// private boolean self(String rmiUrls) {
|
|
||||||
// CacheManager cacheManager = peerProvider.getCacheManager();
|
|
||||||
// CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener();
|
|
||||||
// if (cacheManagerPeerListener == null) {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
// List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers();
|
|
||||||
// if (boundCachePeers == null || boundCachePeers.size() == 0) {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
// CachePeer peer = (CachePeer) boundCachePeers.get(0);
|
|
||||||
// String cacheManagerUrlBase = null;
|
|
||||||
// try {
|
|
||||||
// cacheManagerUrlBase = peer.getUrlBase();
|
|
||||||
// } catch (RemoteException e) {
|
|
||||||
// LOG.error("Error geting url base");
|
|
||||||
// }
|
|
||||||
// int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
|
|
||||||
// return baseUrlMatch != -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void registerNotification(String rmiUrl) {
|
|
||||||
// peerProvider.registerPeer(rmiUrl);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * {@inheritDoc}
|
|
||||||
// */
|
|
||||||
// public final void interrupt() {
|
|
||||||
// try {
|
|
||||||
// socket.leaveGroup(groupMulticastAddress);
|
|
||||||
// } catch (IOException e) {
|
|
||||||
// LOG.error("Error leaving group");
|
|
||||||
// }
|
|
||||||
// socket.close();
|
|
||||||
// super.interrupt();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
}
|
|
@@ -1,256 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2008 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.cache.jgroups;
|
|
||||||
|
|
||||||
import java.rmi.RemoteException;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import net.sf.ehcache.CacheManager;
|
|
||||||
import net.sf.ehcache.distribution.CachePeer;
|
|
||||||
|
|
||||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
|
||||||
import org.alfresco.util.VmShutdownListener;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.jgroups.Address;
|
|
||||||
import org.jgroups.Channel;
|
|
||||||
import org.jgroups.Message;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sends heartbeats to a JGroups cluster containing a list of RMI URLs.
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public final class JGroupsKeepAliveHeartbeatSender
|
|
||||||
{
|
|
||||||
private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatSender.class);
|
|
||||||
|
|
||||||
public static final String URL_DELIMITER = "|";
|
|
||||||
|
|
||||||
/** Used to detect the necessary changes to the outgoing heartbeat messages */
|
|
||||||
private static String lastHeartbeatSendUrls = "";
|
|
||||||
public static synchronized String getLastHeartbeatSendUrls()
|
|
||||||
{
|
|
||||||
return lastHeartbeatSendUrls;
|
|
||||||
}
|
|
||||||
public static synchronized void setLastHeartbeatSendUrls(String heartbeatUrls)
|
|
||||||
{
|
|
||||||
lastHeartbeatSendUrls = heartbeatUrls;
|
|
||||||
}
|
|
||||||
|
|
||||||
private VmShutdownListener vmShutdownListener;
|
|
||||||
|
|
||||||
private final CacheManager cacheManager;
|
|
||||||
private final Channel heartbeatChannel;
|
|
||||||
private long heartBeatInterval;
|
|
||||||
private boolean stopped;
|
|
||||||
private HeartbeatSenderThread serverThread;
|
|
||||||
private Address heartbeatSourceAddress;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param cacheManager the bound CacheManager
|
|
||||||
* @param heartbeatChannel the cluster channel to use
|
|
||||||
* @param heartBeatInterval the time between heartbeats
|
|
||||||
*/
|
|
||||||
public JGroupsKeepAliveHeartbeatSender(
|
|
||||||
CacheManager cacheManager,
|
|
||||||
Channel heartbeatChannel,
|
|
||||||
long heartBeatInterval)
|
|
||||||
{
|
|
||||||
this.vmShutdownListener = new VmShutdownListener("JGroupsKeepAliveHeartbeatSender");
|
|
||||||
this.cacheManager = cacheManager;
|
|
||||||
this.heartbeatChannel = heartbeatChannel;
|
|
||||||
this.heartBeatInterval = heartBeatInterval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return Return the heartbeat interval (milliseconds)
|
|
||||||
*/
|
|
||||||
public long getHeartBeatInterval()
|
|
||||||
{
|
|
||||||
return heartBeatInterval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return Returns the last heartbeat source address
|
|
||||||
*/
|
|
||||||
/*package*/ synchronized Address getHeartbeatSourceAddress()
|
|
||||||
{
|
|
||||||
return heartbeatSourceAddress;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* @param heartbeatSourceAddress the source address
|
|
||||||
*/
|
|
||||||
private synchronized void setHeartbeatSourceAddress(Address heartbeatSourceAddress)
|
|
||||||
{
|
|
||||||
this.heartbeatSourceAddress = heartbeatSourceAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start the heartbeat thread
|
|
||||||
*/
|
|
||||||
public void init()
|
|
||||||
{
|
|
||||||
serverThread = new HeartbeatSenderThread();
|
|
||||||
serverThread.setDaemon(true);
|
|
||||||
serverThread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Shutdown this heartbeat sender
|
|
||||||
*/
|
|
||||||
public final synchronized void dispose()
|
|
||||||
{
|
|
||||||
stopped = true;
|
|
||||||
notifyAll();
|
|
||||||
serverThread.interrupt();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A thread which sends a multicast heartbeat every second
|
|
||||||
*/
|
|
||||||
private class HeartbeatSenderThread extends Thread
|
|
||||||
{
|
|
||||||
private Message heartBeatMessage;
|
|
||||||
private int lastCachePeersHash;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
*/
|
|
||||||
public HeartbeatSenderThread()
|
|
||||||
{
|
|
||||||
super("JGroupsKeepAliveHeartbeatSender");
|
|
||||||
setDaemon(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void run()
|
|
||||||
{
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug("\n" +
|
|
||||||
"Starting cache peer URLs heartbeat");
|
|
||||||
}
|
|
||||||
while (!stopped && !vmShutdownListener.isVmShuttingDown())
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (AlfrescoJGroupsChannelFactory.isClusterActive())
|
|
||||||
{
|
|
||||||
Message message = getCachePeersMessage();
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug("\n" +
|
|
||||||
"Sending cache peer URLs heartbeat: \n" +
|
|
||||||
" Message: " + message + "\n" +
|
|
||||||
" Peers: " + new String(message.getBuffer()));
|
|
||||||
}
|
|
||||||
heartbeatChannel.send(message);
|
|
||||||
// Record the source address
|
|
||||||
setHeartbeatSourceAddress(message.getSrc());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Throwable e)
|
|
||||||
{
|
|
||||||
logger.debug("Heartbeat sending failed: ", e);
|
|
||||||
}
|
|
||||||
// Quick exit if necessary
|
|
||||||
if (stopped || vmShutdownListener.isVmShuttingDown())
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// Wait for the next heartbeat
|
|
||||||
synchronized (this)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
wait(heartBeatInterval);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the message containing the peer URLs to send in the next heartbeat.
|
|
||||||
*/
|
|
||||||
private Message getCachePeersMessage()
|
|
||||||
{
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
List<CachePeer> localCachePeers = cacheManager.getCachePeerListener().getBoundCachePeers();
|
|
||||||
int newCachePeersHash = localCachePeers.hashCode();
|
|
||||||
if (heartBeatMessage == null || lastCachePeersHash != newCachePeersHash)
|
|
||||||
{
|
|
||||||
lastCachePeersHash = newCachePeersHash;
|
|
||||||
|
|
||||||
String peerUrls = assembleUrlList(localCachePeers);
|
|
||||||
JGroupsKeepAliveHeartbeatSender.setLastHeartbeatSendUrls(peerUrls);
|
|
||||||
// Convert to message
|
|
||||||
byte[] peerUrlsBytes = peerUrls.getBytes();
|
|
||||||
heartBeatMessage = new Message(null, null, peerUrlsBytes);
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug("\n" +
|
|
||||||
"Heartbeat message updated: \n" +
|
|
||||||
" URLs: " + peerUrls + "\n" +
|
|
||||||
" Message: " + heartBeatMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return heartBeatMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Builds a String of cache peer URLs of the form url1|url2|url3
|
|
||||||
*/
|
|
||||||
public String assembleUrlList(List<CachePeer> localCachePeers)
|
|
||||||
{
|
|
||||||
StringBuffer sb = new StringBuffer();
|
|
||||||
for (int i = 0; i < localCachePeers.size(); i++)
|
|
||||||
{
|
|
||||||
CachePeer cachePeer = (CachePeer) localCachePeers.get(i);
|
|
||||||
String rmiUrl = null;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
rmiUrl = cachePeer.getUrl();
|
|
||||||
}
|
|
||||||
catch (RemoteException e)
|
|
||||||
{
|
|
||||||
logger.error("This should never be thrown as it is called locally");
|
|
||||||
}
|
|
||||||
if (i != localCachePeers.size() - 1)
|
|
||||||
{
|
|
||||||
sb.append(rmiUrl).append(URL_DELIMITER);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
sb.append(rmiUrl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,449 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2008 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.cache.jgroups;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.rmi.Naming;
|
|
||||||
import java.rmi.NotBoundException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
||||||
|
|
||||||
import net.sf.ehcache.CacheException;
|
|
||||||
import net.sf.ehcache.CacheManager;
|
|
||||||
import net.sf.ehcache.Ehcache;
|
|
||||||
import net.sf.ehcache.distribution.CacheManagerPeerProvider;
|
|
||||||
import net.sf.ehcache.distribution.CacheManagerPeerProviderFactory;
|
|
||||||
import net.sf.ehcache.distribution.CachePeer;
|
|
||||||
|
|
||||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.jgroups.Channel;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A cache peer provider that does heartbeat sending and receiving using JGroups.
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public class JGroupsRMICacheManagerPeerProvider implements CacheManagerPeerProvider
|
|
||||||
{
|
|
||||||
public static final int DEFAULT_HEARTBEAT_INTERVAL = 5000;
|
|
||||||
public static final int MINIMUM_HEARTBEAT_INTERVAL = 1000;
|
|
||||||
/**
|
|
||||||
* the heartbeat signal time in milliseconds.<br/>
|
|
||||||
* The default is {@link #DEFAULT_HEARTBEAT_INTERVAL}.
|
|
||||||
*/
|
|
||||||
public static final String PROP_HEARTBEAT_INTERVAL = "heartbeatInterval";
|
|
||||||
|
|
||||||
private static Log logger = LogFactory.getLog(JGroupsRMICacheManagerPeerProvider.class);
|
|
||||||
|
|
||||||
private final JGroupsKeepAliveHeartbeatSender heartbeatSender;
|
|
||||||
private final JGroupsKeepAliveHeartbeatReceiver heartbeatReceiver;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Store the entries referenced first by cache name and then by the RMI URL.
|
|
||||||
* This looks like duplicated data, but the fastest lookup needs to be a retrieval of
|
|
||||||
* the list of URLs for a given cache. All other access is infrequent.
|
|
||||||
*/
|
|
||||||
private final Map<String, Map<String, CachePeerEntry>> cachePeersByUrlByCacheName;
|
|
||||||
|
|
||||||
private final long staleAge;
|
|
||||||
private final ReadLock peersReadLock;
|
|
||||||
private final WriteLock peersWriteLock;
|
|
||||||
|
|
||||||
public JGroupsRMICacheManagerPeerProvider(CacheManager cacheManager, Channel heartbeatChannel, long heartbeatInterval)
|
|
||||||
{
|
|
||||||
this.heartbeatSender = new JGroupsKeepAliveHeartbeatSender(cacheManager, heartbeatChannel, heartbeatInterval);
|
|
||||||
this.heartbeatReceiver = new JGroupsKeepAliveHeartbeatReceiver(this, heartbeatSender, heartbeatChannel);
|
|
||||||
|
|
||||||
cachePeersByUrlByCacheName = new HashMap<String, Map<String, CachePeerEntry>>(103);
|
|
||||||
|
|
||||||
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
|
|
||||||
peersReadLock = readWriteLock.readLock();
|
|
||||||
peersWriteLock = readWriteLock.writeLock();
|
|
||||||
|
|
||||||
// Calculate the age that a peer entry must be before we consider it stale.
|
|
||||||
// This is the method used in the Multicast EHCache code, so I guess it must be OK...
|
|
||||||
// Ofcourse, it's better to send to peers that are broken than to evict peers when they
|
|
||||||
// are just having some trouble sending their heartbeats for some reason.
|
|
||||||
staleAge = (heartbeatInterval * 2 + 100) * 1000 * 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void init()
|
|
||||||
{
|
|
||||||
heartbeatSender.init();
|
|
||||||
heartbeatReceiver.init();
|
|
||||||
}
|
|
||||||
|
|
||||||
private String extractCacheName(String rmiUrl)
|
|
||||||
{
|
|
||||||
return rmiUrl.substring(rmiUrl.lastIndexOf('/') + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Lazy map creation using appropriate synchronization
|
|
||||||
* @return the entry if it exists otherwise <tt>null</tt>
|
|
||||||
*/
|
|
||||||
private CachePeerEntry getCachePeerEntry(String cacheName, String rmiUrl)
|
|
||||||
{
|
|
||||||
Map<String, CachePeerEntry> peerEntriesByUrl = getPeerEntriesByUrl(cacheName);
|
|
||||||
|
|
||||||
peersReadLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return peerEntriesByUrl.get(rmiUrl);
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersReadLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Lazy map creation using appropriate synchronization
|
|
||||||
* @return never null
|
|
||||||
*/
|
|
||||||
private Map<String, CachePeerEntry> getPeerEntriesByUrl(String cacheName)
|
|
||||||
{
|
|
||||||
Map<String, CachePeerEntry> peerEntriesByUrl;
|
|
||||||
peersReadLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
peerEntriesByUrl = cachePeersByUrlByCacheName.get(cacheName);
|
|
||||||
if (peerEntriesByUrl != null)
|
|
||||||
{
|
|
||||||
return peerEntriesByUrl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersReadLock.unlock();
|
|
||||||
}
|
|
||||||
peersWriteLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// Double check in the write lock
|
|
||||||
peerEntriesByUrl = cachePeersByUrlByCacheName.get(cacheName);
|
|
||||||
if (peerEntriesByUrl != null)
|
|
||||||
{
|
|
||||||
return peerEntriesByUrl;
|
|
||||||
}
|
|
||||||
peerEntriesByUrl = new HashMap<String, CachePeerEntry>(7);
|
|
||||||
cachePeersByUrlByCacheName.put(cacheName, peerEntriesByUrl);
|
|
||||||
return peerEntriesByUrl;
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersWriteLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs the actual RMI setup necessary to create a cache peer.
|
|
||||||
* @param rmiUrl the RMI URL of the peer
|
|
||||||
* @return Returns the cache peer
|
|
||||||
*/
|
|
||||||
/* Always called from a write block */
|
|
||||||
private /*synchronized*/ CachePeer registerPeerImpl(String rmiUrl)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
CachePeer cachePeer = (CachePeer) Naming.lookup(rmiUrl);
|
|
||||||
return cachePeer;
|
|
||||||
}
|
|
||||||
catch (NotBoundException e) // Pretty ordinary
|
|
||||||
{
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug(
|
|
||||||
"Unable to lookup remote cache peer for " + rmiUrl + ". " +
|
|
||||||
"Removing from peer list.",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (IOException e) // Some network issue
|
|
||||||
{
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug(
|
|
||||||
"Unable to lookup remote cache peer for " + rmiUrl + ". " +
|
|
||||||
"Removing from peer list.",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Throwable e) // More serious
|
|
||||||
{
|
|
||||||
logger.error(
|
|
||||||
"Unable to lookup remote cache peer for " + rmiUrl + ". " +
|
|
||||||
"Removing from peer list.",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
// Only errors
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void registerPeer(String rmiUrl)
|
|
||||||
{
|
|
||||||
String cacheName = extractCacheName(rmiUrl);
|
|
||||||
CachePeerEntry peerEntry = getCachePeerEntry(cacheName, rmiUrl);
|
|
||||||
if (peerEntry != null && !peerEntry.isStale(staleAge))
|
|
||||||
{
|
|
||||||
// It is already there and is still current
|
|
||||||
peerEntry.updateTimestamp();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// There is no entry
|
|
||||||
peersWriteLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// Double check
|
|
||||||
peerEntry = getCachePeerEntry(cacheName, rmiUrl);
|
|
||||||
if (peerEntry != null && !peerEntry.isStale(staleAge))
|
|
||||||
{
|
|
||||||
// It has just appeared. Ofcourse, it will be current
|
|
||||||
peerEntry.updateTimestamp();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// Create a new one
|
|
||||||
CachePeer cachePeer = registerPeerImpl(rmiUrl);
|
|
||||||
if (cachePeer == null)
|
|
||||||
{
|
|
||||||
// It can be null, ie. the RMI URL is not valid.
|
|
||||||
// This is not an error and we just ignore it
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// Cache it
|
|
||||||
peerEntry = new CachePeerEntry(cachePeer, rmiUrl);
|
|
||||||
Map<String, CachePeerEntry> peerEntriesByUrl = getPeerEntriesByUrl(cacheName);
|
|
||||||
peerEntriesByUrl.put(rmiUrl, peerEntry);
|
|
||||||
// Done
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug("Registered new cache peer with URL: " + rmiUrl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersWriteLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void unregisterPeer(String rmiUrl)
|
|
||||||
{
|
|
||||||
String cacheName = extractCacheName(rmiUrl);
|
|
||||||
Map<String, CachePeerEntry> peerEntriesByUrl = getPeerEntriesByUrl(cacheName);
|
|
||||||
peersWriteLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
peerEntriesByUrl.remove(rmiUrl);
|
|
||||||
// Done
|
|
||||||
if (logger.isDebugEnabled())
|
|
||||||
{
|
|
||||||
logger.debug("Unregistered cache peer with URL: " + rmiUrl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersWriteLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<CachePeer> listRemoteCachePeers(Ehcache cache) throws CacheException
|
|
||||||
{
|
|
||||||
String cacheName = cache.getName();
|
|
||||||
Map<String, CachePeerEntry> peerEntriesByUrl = getPeerEntriesByUrl(cacheName);
|
|
||||||
List<CachePeer> cachePeers = new ArrayList<CachePeer>(peerEntriesByUrl.size());
|
|
||||||
List<String> staleUrlEntries = null;
|
|
||||||
peersReadLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
for (CachePeerEntry peerEntry : peerEntriesByUrl.values())
|
|
||||||
{
|
|
||||||
if (peerEntry.isStale(staleAge))
|
|
||||||
{
|
|
||||||
if (staleUrlEntries == null)
|
|
||||||
{
|
|
||||||
staleUrlEntries = new ArrayList<String>(4);
|
|
||||||
}
|
|
||||||
// Old
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
cachePeers.add(peerEntry.getCachePeer());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersReadLock.unlock();
|
|
||||||
}
|
|
||||||
// Clean up stale URL entries
|
|
||||||
if (staleUrlEntries != null)
|
|
||||||
{
|
|
||||||
for (String rmiUrl : staleUrlEntries)
|
|
||||||
{
|
|
||||||
unregisterPeer(rmiUrl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Done
|
|
||||||
return cachePeers;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean stale(Date date)
|
|
||||||
{
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getTimeForClusterToForm()
|
|
||||||
{
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Drops all the peer references
|
|
||||||
*/
|
|
||||||
public void dispose() throws CacheException
|
|
||||||
{
|
|
||||||
peersWriteLock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
//TODO
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
peersWriteLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A factory that can be given in the ehcache-default.xml or ehcache-custom.xml.
|
|
||||||
* When using this factory, it is not necessary to provide individual cache
|
|
||||||
* cluster configurations provided that the Alfresco bootstrap class, AlfrescoEhCacheBootstrap,
|
|
||||||
* is used.
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public static class Factory extends CacheManagerPeerProviderFactory
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public CacheManagerPeerProvider createCachePeerProvider(CacheManager cacheManager, Properties properties)
|
|
||||||
{
|
|
||||||
if (properties == null)
|
|
||||||
{
|
|
||||||
properties = new Properties();
|
|
||||||
}
|
|
||||||
|
|
||||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(AlfrescoJGroupsChannelFactory.APP_REGION_EHCACHE_HEARTBEAT);
|
|
||||||
|
|
||||||
long heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
String heartBeatIntervalStr = properties.getProperty(PROP_HEARTBEAT_INTERVAL);
|
|
||||||
if (heartBeatIntervalStr != null)
|
|
||||||
{
|
|
||||||
heartbeatInterval = Long.parseLong(heartBeatIntervalStr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (NumberFormatException e)
|
|
||||||
{
|
|
||||||
throw new RuntimeException(
|
|
||||||
"The property " + PROP_HEARTBEAT_INTERVAL +
|
|
||||||
" must be a valid integer greater than " + MINIMUM_HEARTBEAT_INTERVAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (heartbeatInterval < MINIMUM_HEARTBEAT_INTERVAL)
|
|
||||||
{
|
|
||||||
throw new RuntimeException(
|
|
||||||
"The minimum value for property " + PROP_HEARTBEAT_INTERVAL +
|
|
||||||
" is " + MINIMUM_HEARTBEAT_INTERVAL + "ms");
|
|
||||||
}
|
|
||||||
return new JGroupsRMICacheManagerPeerProvider(cacheManager, channel, heartbeatInterval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Map entry to keep references to EHCache peers along with the necessary timestamping.
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public static class CachePeerEntry
|
|
||||||
{
|
|
||||||
private final CachePeer cachePeer;
|
|
||||||
private String rmiUrl;
|
|
||||||
private long timestamp;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param cachePeer the remote cache peer
|
|
||||||
* @param rmiUrl the RMI URL for the peer
|
|
||||||
*/
|
|
||||||
public CachePeerEntry(CachePeer cachePeer, String rmiUrl)
|
|
||||||
{
|
|
||||||
this.cachePeer = cachePeer;
|
|
||||||
this.rmiUrl = rmiUrl;
|
|
||||||
this.timestamp = System.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString()
|
|
||||||
{
|
|
||||||
return rmiUrl;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CachePeer getCachePeer()
|
|
||||||
{
|
|
||||||
return cachePeer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getRmiUrl()
|
|
||||||
{
|
|
||||||
return rmiUrl;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Refreshes the peer's timestamp.
|
|
||||||
*/
|
|
||||||
public void updateTimestamp()
|
|
||||||
{
|
|
||||||
timestamp = System.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param age the maximum age (nanoseconds) before the peer is considered old
|
|
||||||
* @return Returns <tt>true</tt> if the cache peer is older than the given time (nanoseconds)
|
|
||||||
*/
|
|
||||||
public boolean isStale(long age)
|
|
||||||
{
|
|
||||||
return (System.nanoTime() - age) > timestamp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,77 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2008 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.jgroups;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import org.jgroups.JChannel;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A cache peer provider that does heartbeat sending and receiving using JGroups.
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public class AlfrescoJChannelFactory
|
|
||||||
{
|
|
||||||
private static final Map<String, JChannel> channels;
|
|
||||||
|
|
||||||
static
|
|
||||||
{
|
|
||||||
channels = new ConcurrentHashMap<String, JChannel>(5);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param clusterName the name of the cluster. The effectively groups the
|
|
||||||
* machines that will be interacting with each other.
|
|
||||||
* @param applicationRegion the application region.
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public static JChannel getChannel(String clusterName, String applicationRegion)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Synchronization is handled by the Map
|
|
||||||
*/
|
|
||||||
StringBuilder sb = new StringBuilder(100).append(clusterName).append("-").append(applicationRegion);
|
|
||||||
String fullClusterName = sb.toString();
|
|
||||||
// Get the channel
|
|
||||||
JChannel channel = channels.get(fullClusterName);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static synchronized void newChannel(String clusterName, String clusterRegion)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* TODO: Make into a Spring factory bean so that it can be used as a direct bean reference
|
|
||||||
*/
|
|
||||||
private AlfrescoJChannelFactory()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@@ -1,94 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2005-2008 Alfresco Software Limited.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
|
|
||||||
* As a special exception to the terms and conditions of version 2.0 of
|
|
||||||
* the GPL, you may redistribute this Program in connection with Free/Libre
|
|
||||||
* and Open Source Software ("FLOSS") applications as described in Alfresco's
|
|
||||||
* FLOSS exception. You should have recieved a copy of the text describing
|
|
||||||
* the FLOSS exception, and it is also available here:
|
|
||||||
* http://www.alfresco.com/legal/licensing"
|
|
||||||
*/
|
|
||||||
package org.alfresco.repo.jgroups;
|
|
||||||
|
|
||||||
import org.jgroups.Channel;
|
|
||||||
import org.jgroups.Message;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see AlfrescoJChannelFactoryTest
|
|
||||||
*
|
|
||||||
* @author Derek Hulley
|
|
||||||
* @since 2.1.3
|
|
||||||
*/
|
|
||||||
public class AlfrescoJChannelFactoryTest extends TestCase
|
|
||||||
{
|
|
||||||
private static byte[] bytes = new byte[65536];
|
|
||||||
static
|
|
||||||
{
|
|
||||||
for (int i = 0; i < bytes.length; i++)
|
|
||||||
{
|
|
||||||
bytes[i] = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private AlfrescoJGroupsChannelFactory factory;
|
|
||||||
private String appRegion;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setUp() throws Exception
|
|
||||||
{
|
|
||||||
factory = new AlfrescoJGroupsChannelFactory();
|
|
||||||
appRegion = getName();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that the channel is behaving
|
|
||||||
*/
|
|
||||||
private void stressChannel(Channel channel) throws Exception
|
|
||||||
{
|
|
||||||
System.out.println("Test: " + getName());
|
|
||||||
System.out.println(" Channel: " + channel);
|
|
||||||
System.out.println(" Cluster: " + channel.getClusterName());
|
|
||||||
channel.send(null, null, Boolean.TRUE);
|
|
||||||
channel.send(new Message(null, null, bytes));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testNoCluster() throws Exception
|
|
||||||
{
|
|
||||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
|
|
||||||
stressChannel(channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testBasicCluster() throws Exception
|
|
||||||
{
|
|
||||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah");
|
|
||||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
|
|
||||||
stressChannel(channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testHotSwapCluster() throws Exception
|
|
||||||
{
|
|
||||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE");
|
|
||||||
Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
|
|
||||||
stressChannel(channel1);
|
|
||||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO");
|
|
||||||
Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion);
|
|
||||||
stressChannel(channel1);
|
|
||||||
assertTrue("Channel reference must be the same", channel1 == channel2);
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,19 +0,0 @@
|
|||||||
<?xml version='1.0' encoding='UTF-8'?>
|
|
||||||
<!DOCTYPE beans PUBLIC '-//SPRING//DTD BEAN//EN' 'http://www.springframework.org/dtd/spring-beans.dtd'>
|
|
||||||
|
|
||||||
<!-- Beans to load test ehcache configuration for JGroups clustering support -->
|
|
||||||
<beans>
|
|
||||||
|
|
||||||
<bean name="ehCacheManager" class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean" >
|
|
||||||
<property name="configLocation">
|
|
||||||
<value>classpath:jgroups/ehcache-jgroups-cluster-test.xml</value>
|
|
||||||
</property>
|
|
||||||
</bean>
|
|
||||||
|
|
||||||
<bean name="jgroupsChannelFactory" class="org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory">
|
|
||||||
<property name="clusterName">
|
|
||||||
<value>ehcache-jgroups-cluster-test</value>
|
|
||||||
</property>
|
|
||||||
</bean>
|
|
||||||
|
|
||||||
</beans>
|
|
@@ -1,83 +0,0 @@
|
|||||||
<ehcache>
|
|
||||||
<!--
|
|
||||||
Initialises EHCache with the JGroup-enabled cluster support.
|
|
||||||
At the moment, the default replication mechanism remains RMI with the
|
|
||||||
heartbeat functionality being replaced by a JGroups channel.
|
|
||||||
|
|
||||||
The JGroups heartbeats are only activated if the following bean is configured:
|
|
||||||
org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory
|
|
||||||
Without it, the JGroups heartbeats will all be absorbed locally and there will
|
|
||||||
be no communication with any other Alfresco instances.
|
|
||||||
-->
|
|
||||||
|
|
||||||
<diskStore
|
|
||||||
path="java.io.tmpdir"/>
|
|
||||||
|
|
||||||
<cacheManagerPeerProviderFactory
|
|
||||||
class="org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider$Factory"
|
|
||||||
/>
|
|
||||||
|
|
||||||
<cacheManagerPeerListenerFactory
|
|
||||||
class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory"
|
|
||||||
/>
|
|
||||||
|
|
||||||
<defaultCache
|
|
||||||
maxElementsInMemory="5000"
|
|
||||||
eternal="true"
|
|
||||||
timeToIdleSeconds="0"
|
|
||||||
timeToLiveSeconds="0"
|
|
||||||
overflowToDisk="false">
|
|
||||||
|
|
||||||
<cacheEventListenerFactory
|
|
||||||
class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
|
|
||||||
properties="replicatePuts = false,
|
|
||||||
replicateUpdates = true,
|
|
||||||
replicateRemovals = true,
|
|
||||||
replicateUpdatesViaCopy = false,
|
|
||||||
replicateAsynchronously = false"/>
|
|
||||||
</defaultCache>
|
|
||||||
|
|
||||||
<!-- Clustered: Invalidation -->
|
|
||||||
<cache
|
|
||||||
name="org.alresco.test.cache.invalidation"
|
|
||||||
maxElementsInMemory="500"
|
|
||||||
eternal="true"
|
|
||||||
timeToLiveSeconds="0"
|
|
||||||
overflowToDisk="false">
|
|
||||||
|
|
||||||
<cacheEventListenerFactory
|
|
||||||
class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
|
|
||||||
properties="replicatePuts = false,
|
|
||||||
replicateUpdates = true,
|
|
||||||
replicateRemovals = true,
|
|
||||||
replicateUpdatesViaCopy = false,
|
|
||||||
replicateAsynchronously = false"/>
|
|
||||||
</cache>
|
|
||||||
|
|
||||||
<!-- Clustered: Replication -->
|
|
||||||
<cache
|
|
||||||
name="org.alresco.test.cache.replication"
|
|
||||||
maxElementsInMemory="1000"
|
|
||||||
eternal="true"
|
|
||||||
overflowToDisk="true">
|
|
||||||
|
|
||||||
<cacheEventListenerFactory
|
|
||||||
class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
|
|
||||||
properties="replicatePuts = true,
|
|
||||||
replicateUpdates = true,
|
|
||||||
replicateRemovals = true,
|
|
||||||
replicateUpdatesViaCopy = true,
|
|
||||||
replicateAsynchronously = false"/>
|
|
||||||
</cache>
|
|
||||||
|
|
||||||
<!-- Not clustered -->
|
|
||||||
<cache
|
|
||||||
name="org.alresco.test.cache.not-clustered"
|
|
||||||
maxElementsInMemory="1000"
|
|
||||||
eternal="true"
|
|
||||||
overflowToDisk="true">
|
|
||||||
</cache>
|
|
||||||
|
|
||||||
</ehcache>
|
|
||||||
|
|
||||||
|
|
Reference in New Issue
Block a user