From 4fdb2978840787b2221a0904a6ae71c00958e76c Mon Sep 17 00:00:00 2001 From: Derek Hulley Date: Tue, 10 Mar 2009 13:23:30 +0000 Subject: [PATCH] Merged V3.1 to HEAD Merged DEV/LIVECYCLE-3.1 to V3.1 12581: Merged V2.1-A to DEV/LIVECYCLE-3.1 8675: Set FileServers configuration properties using system properties 12603: Merged V2.1-A to DEV/LIVECYCLE-3.1 8702: Added bindto as a property-enabled value 8703: Added bindto as a property-enabled value 8705: Filter out a bind to address of '0.0.0.0' 12631: Merged V2.1-A to DEV/LIVECYCLE-3.1 8551: JGroups Channel factory 12633: Merged V2.1-A to DEV\LIVECYCLE-3.1 8552: Wired in JGroups heartbeats 8554: Re-delete file 8573: Failed hearbeat still must still wait before trying again 8582: JGroups EHCache heartbeat fixes for TCP stack ___________________________________________________________________ Modified: svn:mergeinfo Merged /alfresco/BRANCHES/V2.1-A:r8551-8552,8554,8573,8582,8675,8702-8705 Merged /alfresco/BRANCHES/V3.1:r12893 Merged /alfresco/BRANCHES/DEV/LIVECYCLE-3.1:r12581,12603,12631,12633 git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@13517 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261 --- config/alfresco/bootstrap-context.xml | 7 + config/alfresco/core-services-context.xml | 2 - config/alfresco/ehcache-default.xml | 20 +- .../ehcache-custom.xml.sample.cluster | 27 +- .../extension/file-servers-custom.xml | 2 +- config/alfresco/file-servers.properties | 8 + config/alfresco/file-servers.xml | 15 +- config/alfresco/jgroups-default.xml | 116 +++ config/alfresco/network-protocol-context.xml | 3 + config/alfresco/repository.properties | 4 + .../filesys/ServerConfigurationBean.java | 13 +- .../repo/cache/AlfrescoEhCacheBootstrap.java | 126 +++ .../cache/jgroups/JGroupsEhCacheTest.java | 249 +++++ .../JGroupsKeepAliveHeartbeatReceiver.java | 197 ++++ .../JGroupsKeepAliveHeartbeatSender.java | 247 +++++ .../JGroupsKeepaliveHeartbeatReceiver.java | 230 +++++ .../JGroupsRMICacheManagerPeerProvider.java | 449 +++++++++ .../repo/jgroups/AlfrescoJChannelFactory.java | 77 ++ .../jgroups/AlfrescoJChannelFactoryTest.java | 94 ++ .../AlfrescoJGroupsChannelFactory.java | 862 ++++++++++++++++++ .../AlfrescoJGroupsChannelFactoryTest.java | 92 ++ .../ehcache-jgroups-cluster-test-context.xml | 19 + .../jgroups/ehcache-jgroups-cluster-test.xml | 83 ++ 23 files changed, 2889 insertions(+), 53 deletions(-) create mode 100644 config/alfresco/jgroups-default.xml create mode 100644 source/java/org/alfresco/repo/cache/AlfrescoEhCacheBootstrap.java create mode 100644 source/java/org/alfresco/repo/cache/jgroups/JGroupsEhCacheTest.java create mode 100644 source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java create mode 100644 source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatSender.java create mode 100644 source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepaliveHeartbeatReceiver.java create mode 100644 source/java/org/alfresco/repo/cache/jgroups/JGroupsRMICacheManagerPeerProvider.java create mode 100644 source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactory.java create mode 100644 source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java create mode 100644 source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java create mode 100644 source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java create mode 100644 source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml create mode 100644 
source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml diff --git a/config/alfresco/bootstrap-context.xml b/config/alfresco/bootstrap-context.xml index ac2e4144cb..bd991ef1d7 100644 --- a/config/alfresco/bootstrap-context.xml +++ b/config/alfresco/bootstrap-context.xml @@ -98,6 +98,13 @@ + + + + ${system.cluster.name} + + + diff --git a/config/alfresco/core-services-context.xml b/config/alfresco/core-services-context.xml index ba60e10c57..7c99d30b6c 100644 --- a/config/alfresco/core-services-context.xml +++ b/config/alfresco/core-services-context.xml @@ -50,8 +50,6 @@ - - diff --git a/config/alfresco/ehcache-default.xml b/config/alfresco/ehcache-default.xml index 2f4cde0a7d..84f1057c32 100644 --- a/config/alfresco/ehcache-default.xml +++ b/config/alfresco/ehcache-default.xml @@ -2,15 +2,9 @@ - + path="java.io.tmpdir" + /> + - + - - + class="org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider$Factory" + /> - - - - - - - - - + /> - + workspace://SpacesStore /app:company_home diff --git a/config/alfresco/file-servers.properties b/config/alfresco/file-servers.properties index 277a4b9a2f..42ebedfef8 100644 --- a/config/alfresco/file-servers.properties +++ b/config/alfresco/file-servers.properties @@ -1,3 +1,11 @@ +filesystem.name=Alfresco + +cifs.enabled=true cifs.localname=${localname} cifs.domain= cifs.broadcast=255.255.255.255 +cifs.bindto=0.0.0.0 + +ftp.enabled=true + +nfs.enabled=false diff --git a/config/alfresco/file-servers.xml b/config/alfresco/file-servers.xml index 4672aacc70..603f19f345 100644 --- a/config/alfresco/file-servers.xml +++ b/config/alfresco/file-servers.xml @@ -1,16 +1,18 @@ - + Alfresco CIFS Server ${cifs.broadcast} + + ${cifs.bindto} - + @@ -39,7 +41,7 @@ - + - + + + workspace://SpacesStore /app:company_home diff --git a/config/alfresco/jgroups-default.xml b/config/alfresco/jgroups-default.xml new file mode 100644 index 0000000000..b95066fac3 --- /dev/null +++ b/config/alfresco/jgroups-default.xml @@ -0,0 +1,116 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/config/alfresco/network-protocol-context.xml b/config/alfresco/network-protocol-context.xml index e67b48a04b..7a821f7222 100644 --- a/config/alfresco/network-protocol-context.xml +++ b/config/alfresco/network-protocol-context.xml @@ -21,6 +21,9 @@ classpath:alfresco/file-servers.properties + + SYSTEM_PROPERTIES_MODE_OVERRIDE + diff --git a/config/alfresco/repository.properties b/config/alfresco/repository.properties index b98d535180..f92c899687 100644 --- a/config/alfresco/repository.properties +++ b/config/alfresco/repository.properties @@ -59,6 +59,10 @@ index.tracking.minRecordPurgeAgeDays=30 # Change the failure behaviour of the configuration checker system.bootstrap.config_check.strict=true +# The name of the cluster +# Leave this empty to disable cluster entry +system.cluster.name= + # # How long should shutdown wait to complete normally before # taking stronger action and calling System.exit() diff --git a/source/java/org/alfresco/filesys/ServerConfigurationBean.java b/source/java/org/alfresco/filesys/ServerConfigurationBean.java index 6662fa79a6..c4847f8ae0 100644 --- a/source/java/org/alfresco/filesys/ServerConfigurationBean.java +++ b/source/java/org/alfresco/filesys/ServerConfigurationBean.java @@ -136,6 +136,10 @@ public class ServerConfigurationBean extends ServerConfiguration implements Appl public static final String SERVER_CONFIGURATION = "fileServerConfiguration"; + // IP address representing null + + public static final 
String BIND_TO_IGNORE = "0.0.0.0"; + // SMB/CIFS session debug type strings // // Must match the bit mask order @@ -803,7 +807,8 @@ public class ServerConfigurationBean extends ServerConfiguration implements Appl cifsConfig.setSMBBindAddress(bindAddr); } - else { + else if (!elem.getValue().equals(BIND_TO_IGNORE)) + { // Validate the bind address @@ -957,7 +962,7 @@ public class ServerConfigurationBean extends ServerConfiguration implements Appl // Check for a bind address String bindto = elem.getAttribute("bindto"); - if (bindto != null && bindto.length() > 0) + if (bindto != null && bindto.length() > 0 && !bindto.equals(BIND_TO_IGNORE)) { // Validate the bind address @@ -1137,7 +1142,7 @@ public class ServerConfigurationBean extends ServerConfiguration implements Appl // Check for a bind address String attr = elem.getAttribute("bindto"); - if ( attr != null && attr.length() > 0) { + if ( attr != null && attr.length() > 0 && !attr.equals(BIND_TO_IGNORE)) { // Validate the bind address @@ -1624,7 +1629,7 @@ public class ServerConfigurationBean extends ServerConfiguration implements Appl // Check for a bind address elem = config.getConfigElement("bindto"); - if ( elem != null) { + if ( elem != null && !elem.getValue().equals(BIND_TO_IGNORE)) { // Validate the bind address diff --git a/source/java/org/alfresco/repo/cache/AlfrescoEhCacheBootstrap.java b/source/java/org/alfresco/repo/cache/AlfrescoEhCacheBootstrap.java new file mode 100644 index 0000000000..d2f33eb25f --- /dev/null +++ b/source/java/org/alfresco/repo/cache/AlfrescoEhCacheBootstrap.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.cache; + +import java.io.FileNotFoundException; +import java.net.URL; + +import net.sf.ehcache.CacheManager; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider; +import org.alfresco.util.AbstractLifecycleBean; +import org.alfresco.util.PropertyCheck; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jgroups.JChannel; +import org.springframework.context.ApplicationEvent; +import org.springframework.util.ResourceUtils; + +/** + * A bootstrap bean that sets up the Alfresco-specific cache manager. 
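Editorial sketch, not part of the patch: the bootstrap bean described here attaches the JGroups peer provider only when ehcache-custom.xml has not already configured a peer mechanism. The fragment below restates that decision using only types that appear in this patch; the connected Channel is assumed to come from the Alfresco channel factory, and the init() call is this sketch's assumption about how the heartbeat threads get started, since the bean's own setupCaches() leaves that wiring implicit.

    import net.sf.ehcache.CacheManager;
    import org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider;
    import org.jgroups.Channel;

    public class ManualClusterWiringSketch
    {
        /**
         * Attach the JGroups-driven peer provider to an existing CacheManager,
         * but only if no peer mechanism was configured in ehcache-custom.xml.
         */
        public static void wireIfUnclustered(CacheManager cacheManager, Channel heartbeatChannel)
        {
            if (cacheManager.getCacheManagerPeerProvider() != null)
            {
                // ehcache-custom.xml already set up clustering; leave it alone
                return;
            }
            JGroupsRMICacheManagerPeerProvider peerProvider = new JGroupsRMICacheManagerPeerProvider(
                    cacheManager,
                    heartbeatChannel,
                    JGroupsRMICacheManagerPeerProvider.DEFAULT_HEARTBEAT_INTERVAL);
            // Assumption of this sketch: start the heartbeat sender and receiver here
            peerProvider.init();
        }
    }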
+ * + * @author Derek Hulley + * @since 2.1.3 + */ +public class AlfrescoEhCacheBootstrap extends AbstractLifecycleBean +{ + private static Log logger = LogFactory.getLog(AlfrescoEhCacheBootstrap.class); + + private JChannel ehcacheHeartbeatChannel; + private String clusterName; + + public AlfrescoEhCacheBootstrap() + { + } + + public void setEhcacheHeartbeatChannel(JChannel channel) + { + this.ehcacheHeartbeatChannel = channel; + } + + public void setClusterName(String clusterName) + { + this.clusterName = clusterName; + } + + @Override + protected void onBootstrap(ApplicationEvent event) + { + try + { + CacheManager cacheManager = InternalEhCacheManagerFactoryBean.getInstance(); + // We need only proceed if the cache-manager doesn't already have a peer mechanims + if( cacheManager.getCacheManagerPeerProvider() != null) + { + logger.info("Cache cluster config enabled using ehcache-custom.xml"); + return; + } + + setupCaches(cacheManager); + } + catch (Throwable e) + { + throw new AlfrescoRuntimeException("Failed to bootstrap the EHCache cluster", e); + } + } + + @Override + protected void onShutdown(ApplicationEvent event) + { + // Shut the cache manager down + CacheManager cacheManager = InternalEhCacheManagerFactoryBean.getInstance(); + cacheManager.shutdown(); + // Close the channel + try + { + if (ehcacheHeartbeatChannel != null && ehcacheHeartbeatChannel.isOpen()) + { + ehcacheHeartbeatChannel.close(); + } + } + catch (Throwable e) + { + logger.error("Error during shutdown: ", e); + } + } + + /** + * Adds the necessary peer mechanisms to the caches and cache manager. + * + * @param cacheManager the cache manager to add the factories to + */ + private void setupCaches(CacheManager cacheManager) + { + if (cacheManager.getCachePeerProvider() != null) + { + throw new RuntimeException("Double check for cache manager peer provider failed"); + } + JGroupsRMICacheManagerPeerProvider peerProvider = new JGroupsRMICacheManagerPeerProvider( + cacheManager, + ehcacheHeartbeatChannel, + JGroupsRMICacheManagerPeerProvider.DEFAULT_HEARTBEAT_INTERVAL); + } +} diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsEhCacheTest.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsEhCacheTest.java new file mode 100644 index 0000000000..6690ec878c --- /dev/null +++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsEhCacheTest.java @@ -0,0 +1,249 @@ +/* + * Copyright (C) 2005-2007 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. 
You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.cache.jgroups; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import junit.framework.TestCase; +import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.Element; +import net.sf.ehcache.Status; + +import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +/** + * @see JGroupsRMICacheManagerPeerProvider + * + * @author Derek Hulley + */ +public class JGroupsEhCacheTest extends TestCase +{ + private static final String CACHE_INVALIDATION = "org.alresco.test.cache.invalidation"; + private static final String CACHE_REPLICATION = "org.alresco.test.cache.replication"; + private static final String CACHE_NOT_CLUSTERED = "org.alresco.test.cache.not-clustered"; + private static final String BEAN_CACHE_MANAGER = "ehCacheManager"; + + private static final String KEY_A = "A"; + private static final String KEY_B = "B"; + private static final String KEY_C = "C"; + private static final String VALUE_A = "AAA"; + private static final String VALUE_B = "BBB"; + private static final String VALUE_C = "CCC"; + + private static ApplicationContext ctx = new ClassPathXmlApplicationContext( + new String[] { + "classpath:jgroups/ehcache-jgroups-cluster-test-context.xml",} + ); + private CacheManager cacheManager; + private Cache cacheInvalidation; + private Cache cacheReplication; + private Cache cacheNotClustered; + + @Override + public void setUp() throws Exception + { + cacheManager = (CacheManager) ctx.getBean(BEAN_CACHE_MANAGER); + cacheInvalidation = cacheManager.getCache(CACHE_INVALIDATION); + cacheReplication = cacheManager.getCache(CACHE_REPLICATION); + cacheNotClustered = cacheManager.getCache(CACHE_NOT_CLUSTERED); + } + + @Override + public void tearDown() + { + } + + public void testSetUp() throws Exception + { + assertNotNull(cacheManager); + CacheManager cacheManagerCheck = (CacheManager) ctx.getBean(BEAN_CACHE_MANAGER); + assertTrue(cacheManager == cacheManagerCheck); + + // Check that the cache manager is active + assertTrue("Cache manager is not alive", cacheManager.getStatus() == Status.STATUS_ALIVE); + + // Check that the caches are available + assertNotNull("Cache not found: " + CACHE_INVALIDATION, cacheInvalidation); + assertNotNull("Cache not found: " + CACHE_REPLICATION, cacheReplication); + assertNotNull("Cache not found: " + CACHE_NOT_CLUSTERED, cacheReplication); + + // Make sure that the cache manager is cluster-enabled + assertNotNull("CacheManagerPeerProvider is not present", cacheManager.getCacheManagerPeerProvider()); + } + + /** + * Loops, checking the names of the caches present in the URLs being sent to the heartbeat. The test will + * loop for 6s (maximum heartbeat duration) or until the correct cache names are present. + */ + private synchronized void checkHeartbeatCacheNamesPresent(String ... 
cacheNames) + { + Set lookingFor = new HashSet(Arrays.asList(cacheNames)); + + long startTimeMs = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTimeMs < 6000) + { + String urls = JGroupsKeepAliveHeartbeatSender.getLastHeartbeatSendUrls(); + for (String cacheName : cacheNames) + { + if (urls.contains(cacheName)) + { + lookingFor.remove(cacheName); + } + } + // Did we eliminate all the caches? + if (lookingFor.size() == 0) + { + // All of the given caches are present + return; + } + try { wait(100); } catch (InterruptedException e) {} + } + // Some of the caches are STILL NOT in the heartbeat + fail("Caches did not appear in the heartbeat: " + lookingFor); + } + + /** + * Loops, checking the names of the caches are not present in the URLs being sent to the heartbeat. + * The test will loop for 6s (maximum heartbeat duration) or until the caches are no longer present. + */ + private synchronized void checkHeartbeatCacheNamesNotPresent(String ... cacheNames) + { + Set notLookingFor = new HashSet(Arrays.asList(cacheNames)); + + long startTimeMs = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTimeMs < 6000) + { + String urls = JGroupsKeepAliveHeartbeatSender.getLastHeartbeatSendUrls(); + for (String cacheName : cacheNames) + { + if (!urls.contains(cacheName)) + { + notLookingFor.remove(cacheName); + } + } + // Did we eliminate all the caches? + if (notLookingFor.size() == 0) + { + // None of the given caches are present + return; + } + try { wait(100); } catch (InterruptedException e) {} + } + // Some of the caches are STILL in the heartbeat + fail("Caches were not removed from the heartbeat: " + notLookingFor); + } + + /** + * Check that the default heartbeat is as expected + */ + public void testDefaultHeartbeat() throws Exception + { + checkHeartbeatCacheNamesPresent(CACHE_INVALIDATION, CACHE_REPLICATION); + checkHeartbeatCacheNamesNotPresent(CACHE_NOT_CLUSTERED); + } + + /** + * Manipulate the invalidating cache + */ + public void testInvalidatingCache() throws Exception + { + cacheInvalidation.put(new Element(KEY_A, VALUE_A)); + cacheInvalidation.put(new Element(KEY_B, VALUE_B)); + cacheInvalidation.put(new Element(KEY_C, VALUE_C)); + + cacheInvalidation.put(new Element(KEY_A, VALUE_C)); + cacheInvalidation.put(new Element(KEY_B, VALUE_A)); + cacheInvalidation.put(new Element(KEY_C, VALUE_B)); + + cacheInvalidation.remove(KEY_A); + cacheInvalidation.remove(KEY_B); + cacheInvalidation.remove(KEY_C); + } + + /** + * Manipulate the replicating cache + */ + public void testReplicatingCache() throws Exception + { + cacheReplication.put(new Element(KEY_A, VALUE_A)); + cacheReplication.put(new Element(KEY_B, VALUE_B)); + cacheReplication.put(new Element(KEY_C, VALUE_C)); + + cacheReplication.put(new Element(KEY_A, VALUE_C)); + cacheReplication.put(new Element(KEY_B, VALUE_A)); + cacheReplication.put(new Element(KEY_C, VALUE_B)); + + cacheReplication.remove(KEY_A); + cacheReplication.remove(KEY_B); + cacheReplication.remove(KEY_C); + } + + /** + * Manipulate the non-clustered cache + */ + public void testNonClusteredCache() throws Exception + { + cacheNotClustered.put(new Element(KEY_A, VALUE_A)); + cacheNotClustered.put(new Element(KEY_B, VALUE_B)); + cacheNotClustered.put(new Element(KEY_C, VALUE_C)); + + cacheNotClustered.put(new Element(KEY_A, VALUE_C)); + cacheNotClustered.put(new Element(KEY_B, VALUE_A)); + cacheNotClustered.put(new Element(KEY_C, VALUE_B)); + + cacheNotClustered.remove(KEY_A); + cacheNotClustered.remove(KEY_B); + 
cacheNotClustered.remove(KEY_C); + } + + /** + * Starts up a second VM and manipulates the cache + */ + public static void main(String ... args) + { + CacheManager cacheManager = (CacheManager) ctx.getBean(BEAN_CACHE_MANAGER); + Ehcache cacheInvalidation = cacheManager.getCache(CACHE_INVALIDATION); + Ehcache cacheReplication = cacheManager.getCache(CACHE_REPLICATION); + Ehcache cacheNotClustered = cacheManager.getCache(CACHE_NOT_CLUSTERED); + + Element[] elements = new Element[] { + new Element(KEY_A, VALUE_A), + new Element(KEY_B, VALUE_B), + new Element(KEY_C, VALUE_C), + }; + + synchronized (cacheManager) + { + try { cacheManager.wait(0); } catch (Throwable e) {} + } + } +} diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java new file mode 100644 index 0000000000..df3a14ff03 --- /dev/null +++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatReceiver.java @@ -0,0 +1,197 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.cache.jgroups; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.alfresco.util.EqualsHelper; +import org.alfresco.util.TraceableThreadFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.Message; +import org.jgroups.ReceiverAdapter; +import org.jgroups.View; + +/** + * Receives heartbeats from the {@link JGroupsKeepAliveHeartbeatSender JGroups heartbeat sender}. 
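Editorial sketch, not part of the patch: the receiver introduced below is built on the standard JGroups ReceiverAdapter callback. The stand-alone class that follows shows that bare pattern, registering a receiver on a channel and printing each heartbeat payload as it arrives; the cluster name is made up.

    import org.jgroups.JChannel;
    import org.jgroups.Message;
    import org.jgroups.ReceiverAdapter;

    public class HeartbeatLoggerSketch extends ReceiverAdapter
    {
        @Override
        public void receive(Message message)
        {
            // The sender in this patch encodes peer RMI URLs as a '|'-delimited string
            String urls = new String(message.getBuffer());
            System.out.println("Heartbeat from " + message.getSrc() + ": " + urls);
        }

        public static void main(String[] args) throws Exception
        {
            JChannel channel = new JChannel();                  // default protocol stack
            channel.setReceiver(new HeartbeatLoggerSketch());   // register before connecting
            channel.connect("MYCLUSTER-EHCacheHeartbeat");      // hypothetical cluster name
            Thread.sleep(60000);                                // listen for a minute
            channel.close();
        }
    }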
+ * + * @author Derek Hulley + * @since 2.1.3 + */ +public class JGroupsKeepAliveHeartbeatReceiver extends ReceiverAdapter +{ + private static final int MAX_THREADS = 5; + + private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatReceiver.class); + + private final JGroupsRMICacheManagerPeerProvider peerProvider; + private final JGroupsKeepAliveHeartbeatSender heartbeatSender; + private final Channel channel; + private boolean stopped; + private final ThreadPoolExecutor threadPool; + private final Set rmiUrlsProcessingQueue; + + public JGroupsKeepAliveHeartbeatReceiver( + JGroupsRMICacheManagerPeerProvider peerProvider, + JGroupsKeepAliveHeartbeatSender heartbeatSender, + Channel channel) + { + this.peerProvider = peerProvider; + this.heartbeatSender = heartbeatSender; + this.channel = channel; + + this.rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet()); + + // Build the thread pool + TraceableThreadFactory threadFactory = new TraceableThreadFactory(); + threadFactory.setThreadDaemon(true); + threadFactory.setThreadPriority(Thread.NORM_PRIORITY + 2); + + this.threadPool = new ThreadPoolExecutor( + 1, // Maintain one threads + 1, // We'll increase it, if necessary + 60, // 1 minute until unused threads get trashed + TimeUnit.SECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + + /** + * Register to receive message on the channel + */ + public void init() + { + channel.setReceiver(this); + } + + /** + * Set the stop flag. + */ + public void dispose() + { + stopped = true; + } + + @Override + public byte[] getState() + { + return new byte[] {}; + } + + @Override + public void setState(byte[] state) + { + // Nothing to do + } + + @Override + public void receive(Message message) + { + Address localAddress = heartbeatSender.getHeartbeatSourceAddress(); + Address remoteAddress = message.getSrc(); + // Ignore messages from ourselves + if (EqualsHelper.nullSafeEquals(localAddress, remoteAddress)) + { + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Ignoring cache peeer URLs heartbeat from self: " + message); + } + return; + } + + String rmiUrls = new String(message.getBuffer()); + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Received cache peer URLs heartbeat: \n" + + " Message: " + message + "\n" + + " Peers: " + rmiUrls); + } + // Quickly split them up + StringTokenizer tokenizer = new StringTokenizer(rmiUrls, JGroupsKeepAliveHeartbeatSender.URL_DELIMITER, false); + while (!stopped && tokenizer.hasMoreTokens()) + { + String rmiUrl = tokenizer.nextToken(); + // Is it pending? + if (rmiUrlsProcessingQueue.add(rmiUrl)) + { + // Not pending. Shedule it. 
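// (Editorial note, not part of the patch) rmiUrlsProcessingQueue.add() does double
// duty at this point: 'true' means the URL was not already queued, so a worker is
// scheduled for it; 'false' means a backlog is forming, so the pool below is allowed
// to grow, one thread at a time, up to MAX_THREADS.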
+ ProcessingRunnable runnable = new ProcessingRunnable(rmiUrl); + threadPool.execute(runnable); + } + else + { + // It was already waiting to be processed + // Increase the thread pool size + int currentThreadPoolMaxSize = threadPool.getMaximumPoolSize(); + if (currentThreadPoolMaxSize < MAX_THREADS) + { + threadPool.setMaximumPoolSize(currentThreadPoolMaxSize + 1); + } + } + } + } + + /** + * Worker class to go into thread pool + * + * @author Derek Hulley + */ + private class ProcessingRunnable implements Runnable + { + private String rmiUrl; + private ProcessingRunnable(String rmiUrl) + { + this.rmiUrl = rmiUrl; + } + public void run() + { + rmiUrlsProcessingQueue.remove(rmiUrl); + if (stopped) + { + return; + } + peerProvider.registerPeer(rmiUrl); + } + } + + @Override + public void viewAccepted(View newView) + { + if (logger.isDebugEnabled()) + { + logger.debug("Cluster view changed: " + newView); + } + } +} diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatSender.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatSender.java new file mode 100644 index 0000000000..b8ee2d3197 --- /dev/null +++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepAliveHeartbeatSender.java @@ -0,0 +1,247 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.cache.jgroups; + +import java.rmi.RemoteException; +import java.util.List; + +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.distribution.CachePeer; + +import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.Message; + +/** + * Sends heartbeats to a JGroups cluster containing a list of RMI URLs. 
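Editorial sketch, not part of the patch: on the wire a heartbeat is a single JGroups message whose payload is the '|'-delimited list of locally bound RMI cache peer URLs. The fragment below sends one such message by hand; the cluster name and the URLs are invented.

    import org.jgroups.JChannel;
    import org.jgroups.Message;

    public class HeartbeatSendSketch
    {
        public static void main(String[] args) throws Exception
        {
            JChannel channel = new JChannel();
            channel.connect("MYCLUSTER-EHCacheHeartbeat");   // hypothetical cluster name

            // The payload is simply the '|'-delimited list of RMI cache peer URLs
            // currently bound by the local ehcache RMI listener (values made up here)
            String peerUrls = "//host-a:40001/org.alfresco.cache.one"
                            + "|//host-a:40001/org.alfresco.cache.two";
            Message heartbeat = new Message(null, null, peerUrls.getBytes());
            channel.send(heartbeat);

            channel.close();
        }
    }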
+ * + * @author Derek Hulley + * @since 2.1.3 + */ +public final class JGroupsKeepAliveHeartbeatSender +{ + private static Log logger = LogFactory.getLog(JGroupsKeepAliveHeartbeatSender.class); + + public static final String URL_DELIMITER = "|"; + + /** Used to detect the necessary changes to the outgoing heartbeat messages */ + private static String lastHeartbeatSendUrls = ""; + public static synchronized String getLastHeartbeatSendUrls() + { + return lastHeartbeatSendUrls; + } + public static synchronized void setLastHeartbeatSendUrls(String heartbeatUrls) + { + lastHeartbeatSendUrls = heartbeatUrls; + } + + private final CacheManager cacheManager; + private final Channel heartbeatChannel; + private long heartBeatInterval; + private boolean stopped; + private HeartbeatSenderThread serverThread; + private Address heartbeatSourceAddress; + + /** + * @param cacheManager the bound CacheManager + * @param heartbeatChannel the cluster channel to use + * @param heartBeatInterval the time between heartbeats + */ + public JGroupsKeepAliveHeartbeatSender( + CacheManager cacheManager, + Channel heartbeatChannel, + long heartBeatInterval) + { + + this.cacheManager = cacheManager; + this.heartbeatChannel = heartbeatChannel; + this.heartBeatInterval = heartBeatInterval; + } + + /** + * @return Return the heartbeat interval (milliseconds) + */ + public long getHeartBeatInterval() + { + return heartBeatInterval; + } + + /** + * @return Returns the last heartbeat source address + */ + /*package*/ synchronized Address getHeartbeatSourceAddress() + { + return heartbeatSourceAddress; + } + /** + * @param heartbeatSourceAddress the source address + */ + private synchronized void setHeartbeatSourceAddress(Address heartbeatSourceAddress) + { + this.heartbeatSourceAddress = heartbeatSourceAddress; + } + + /** + * Start the heartbeat thread + */ + public void init() + { + serverThread = new HeartbeatSenderThread(); + serverThread.start(); + } + + /** + * Shutdown this heartbeat sender + */ + public final synchronized void dispose() + { + stopped = true; + notifyAll(); + serverThread.interrupt(); + } + + /** + * A thread which sends a multicast heartbeat every second + */ + private class HeartbeatSenderThread extends Thread + { + private Message heartBeatMessage; + private int lastCachePeersHash; + + /** + * Constructor + */ + public HeartbeatSenderThread() + { + super("JGroupsKeepAliveHeartbeatSender"); + setDaemon(true); + } + + public final void run() + { + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Starting cache peer URLs heartbeat"); + } + while (!stopped) + { + try + { + if (AlfrescoJGroupsChannelFactory.isClusterActive()) + { + Message message = getCachePeersMessage(); + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Sending cache peer URLs heartbeat: \n" + + " Message: " + message + "\n" + + " Peers: " + new String(message.getBuffer())); + } + heartbeatChannel.send(message); + // Record the source address + setHeartbeatSourceAddress(message.getSrc()); + } + } + catch (Throwable e) + { + logger.debug("Heartbeat sending failed: ", e); + } + // Wait for the next heartbeat + synchronized (this) + { + try + { + wait(heartBeatInterval); + } + catch (InterruptedException e) + { + } + } + } + } + + /** + * Gets the message containing the peer URLs to send in the next heartbeat. 
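Editorial sketch, not part of the patch: getCachePeersMessage() hashes the list of locally bound peers and rebuilds the cached heartbeat message only when that hash changes. The class below reproduces the same rebuild-on-change idea over plain strings, with made-up URLs.

    public class HeartbeatPayloadSketch
    {
        private byte[] cachedPayload;
        private int lastPeersHash;

        /** Rebuild the delimited payload only when the bound peer URLs change. */
        public byte[] getPayload(String[] boundPeerUrls)
        {
            int newHash = java.util.Arrays.hashCode(boundPeerUrls);
            if (cachedPayload == null || newHash != lastPeersHash)
            {
                lastPeersHash = newHash;
                StringBuilder sb = new StringBuilder();
                for (String url : boundPeerUrls)
                {
                    if (sb.length() != 0)
                    {
                        sb.append("|");   // URL_DELIMITER in the enclosing sender class
                    }
                    sb.append(url);
                }
                cachedPayload = sb.toString().getBytes();
            }
            return cachedPayload;
        }

        public static void main(String[] args)
        {
            HeartbeatPayloadSketch sketch = new HeartbeatPayloadSketch();
            String[] urls = new String[] {
                    "//host-a:40001/org.alfresco.cache.one",
                    "//host-a:40001/org.alfresco.cache.two" };
            System.out.println(new String(sketch.getPayload(urls)));
        }
    }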
+ */ + private Message getCachePeersMessage() + { + @SuppressWarnings("unchecked") + List localCachePeers = cacheManager.getCachePeerListener().getBoundCachePeers(); + int newCachePeersHash = localCachePeers.hashCode(); + if (heartBeatMessage == null || lastCachePeersHash != newCachePeersHash) + { + lastCachePeersHash = newCachePeersHash; + + String peerUrls = assembleUrlList(localCachePeers); + JGroupsKeepAliveHeartbeatSender.setLastHeartbeatSendUrls(peerUrls); + // Convert to message + byte[] peerUrlsBytes = peerUrls.getBytes(); + heartBeatMessage = new Message(null, null, peerUrlsBytes); + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Heartbeat message updated: \n" + + " URLs: " + peerUrls + "\n" + + " Message: " + heartBeatMessage); + } + } + return heartBeatMessage; + } + + /** + * Builds a String of cache peer URLs of the form url1|url2|url3 + */ + public String assembleUrlList(List localCachePeers) + { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < localCachePeers.size(); i++) + { + CachePeer cachePeer = (CachePeer) localCachePeers.get(i); + String rmiUrl = null; + try + { + rmiUrl = cachePeer.getUrl(); + } + catch (RemoteException e) + { + logger.error("This should never be thrown as it is called locally"); + } + if (i != localCachePeers.size() - 1) + { + sb.append(rmiUrl).append(URL_DELIMITER); + } + else + { + sb.append(rmiUrl); + } + } + return sb.toString(); + } + } +} diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepaliveHeartbeatReceiver.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepaliveHeartbeatReceiver.java new file mode 100644 index 0000000000..01ff41c885 --- /dev/null +++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsKeepaliveHeartbeatReceiver.java @@ -0,0 +1,230 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.cache.jgroups; + +import net.sf.ehcache.distribution.MulticastKeepaliveHeartbeatSender; + +/** + * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there. + *

+ * Our own multicast heartbeats are ignored. + * + * @author Greg Luck + * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 556 2007-10-29 02:06:30Z gregluck $ + */ +public abstract class JGroupsKeepaliveHeartbeatReceiver +{ +// private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName()); +// +// private ExecutorService processingThreadPool; +// private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet()); +// private final InetAddress groupMulticastAddress; +// private final Integer groupMulticastPort; +// private MulticastReceiverThread receiverThread; +// private MulticastSocket socket; +// private boolean stopped; +// private final MulticastRMICacheManagerPeerProvider peerProvider; +// +// /** +// * Constructor. +// * +// * @param peerProvider +// * @param multicastAddress +// * @param multicastPort +// */ +// public MulticastKeepaliveHeartbeatReceiver( +// MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) { +// this.peerProvider = peerProvider; +// this.groupMulticastAddress = multicastAddress; +// this.groupMulticastPort = multicastPort; +// } +// +// /** +// * Start. +// * +// * @throws IOException +// */ +// final void init() throws IOException { +// socket = new MulticastSocket(groupMulticastPort.intValue()); +// socket.joinGroup(groupMulticastAddress); +// receiverThread = new MulticastReceiverThread(); +// receiverThread.start(); +// processingThreadPool = Executors.newCachedThreadPool(); +// } +// +// /** +// * Shutdown the heartbeat. +// */ +// public final void dispose() { +// LOG.debug("dispose called"); +// processingThreadPool.shutdownNow(); +// stopped = true; +// receiverThread.interrupt(); +// } +// +// /** +// * A multicast receiver which continously receives heartbeats. +// */ +// private final class MulticastReceiverThread extends Thread { +// +// /** +// * Constructor +// */ +// public MulticastReceiverThread() { +// super("Multicast Heartbeat Receiver Thread"); +// setDaemon(true); +// } +// +// public final void run() { +// byte[] buf = new byte[PayloadUtil.MTU]; +// try { +// while (!stopped) { +// DatagramPacket packet = new DatagramPacket(buf, buf.length); +// try { +// socket.receive(packet); +// byte[] payload = packet.getData(); +// processPayload(payload); +// +// +// } catch (IOException e) { +// if (!stopped) { +// LOG.error("Error receiving heartbeat. " + e.getMessage() + +// ". Initial cause was " + e.getMessage(), e); +// } +// } +// } +// } catch (Throwable t) { +// LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing..."); +// } +// } +// +// private void processPayload(byte[] compressedPayload) { +// byte[] payload = PayloadUtil.ungzip(compressedPayload); +// String rmiUrls = new String(payload); +// if (self(rmiUrls)) { +// return; +// } +// rmiUrls = rmiUrls.trim(); +// if (LOG.isTraceEnabled()) { +// LOG.trace("rmiUrls received " + rmiUrls); +// } +// processRmiUrls(rmiUrls); +// } +// +// /** +// * This method forks a new executor to process the received heartbeat in a thread pool. +// * That way each remote cache manager cannot interfere with others. +// *

+// * In the worst case, we have as many concurrent threads as remote cache managers. +// * +// * @param rmiUrls +// */ +// private void processRmiUrls(final String rmiUrls) { +// if (rmiUrlsProcessingQueue.contains(rmiUrls)) { +// if (LOG.isDebugEnabled()) { +// LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: " + rmiUrls); +// } +// return; +// } +// +// if (processingThreadPool == null) { +// return; +// } +// +// processingThreadPool.execute(new Runnable() { +// public void run() { +// try { +// // Add the rmiUrls we are processing. +// rmiUrlsProcessingQueue.add(rmiUrls); +// for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls, +// PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) { +// if (stopped) { +// return; +// } +// String rmiUrl = stringTokenizer.nextToken(); +// registerNotification(rmiUrl); +// if (!peerProvider.peerUrls.containsKey(rmiUrl)) { +// if (LOG.isDebugEnabled()) { +// LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: " + rmiUrl); +// } +// return; +// } +// } +// } finally { +// // Remove the rmiUrls we just processed +// rmiUrlsProcessingQueue.remove(rmiUrls); +// } +// } +// }); +// } +// +// +// /** +// * @param rmiUrls +// * @return true if our own hostname and listener port are found in the list. This then means we have +// * caught our onw multicast, and should be ignored. +// */ +// private boolean self(String rmiUrls) { +// CacheManager cacheManager = peerProvider.getCacheManager(); +// CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener(); +// if (cacheManagerPeerListener == null) { +// return false; +// } +// List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers(); +// if (boundCachePeers == null || boundCachePeers.size() == 0) { +// return false; +// } +// CachePeer peer = (CachePeer) boundCachePeers.get(0); +// String cacheManagerUrlBase = null; +// try { +// cacheManagerUrlBase = peer.getUrlBase(); +// } catch (RemoteException e) { +// LOG.error("Error geting url base"); +// } +// int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase); +// return baseUrlMatch != -1; +// } +// +// private void registerNotification(String rmiUrl) { +// peerProvider.registerPeer(rmiUrl); +// } +// +// +// /** +// * {@inheritDoc} +// */ +// public final void interrupt() { +// try { +// socket.leaveGroup(groupMulticastAddress); +// } catch (IOException e) { +// LOG.error("Error leaving group"); +// } +// socket.close(); +// super.interrupt(); +// } +// } +// +// +} diff --git a/source/java/org/alfresco/repo/cache/jgroups/JGroupsRMICacheManagerPeerProvider.java b/source/java/org/alfresco/repo/cache/jgroups/JGroupsRMICacheManagerPeerProvider.java new file mode 100644 index 0000000000..ff7f5d7984 --- /dev/null +++ b/source/java/org/alfresco/repo/cache/jgroups/JGroupsRMICacheManagerPeerProvider.java @@ -0,0 +1,449 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.cache.jgroups; + +import java.io.IOException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import net.sf.ehcache.CacheException; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.distribution.CacheManagerPeerProvider; +import net.sf.ehcache.distribution.CacheManagerPeerProviderFactory; +import net.sf.ehcache.distribution.CachePeer; + +import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jgroups.Channel; + +/** + * A cache peer provider that does heartbeat sending and receiving using JGroups. + * + * @author Derek Hulley + * @since 2.1.3 + */ +public class JGroupsRMICacheManagerPeerProvider implements CacheManagerPeerProvider +{ + public static final int DEFAULT_HEARTBEAT_INTERVAL = 5000; + public static final int MINIMUM_HEARTBEAT_INTERVAL = 1000; + /** + * the heartbeat signal time in milliseconds.
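Editorial sketch, not part of the patch: the heartbeatInterval property declared just below is read by the Factory defined towards the end of this file, which ehcache-default.xml now names as the cacheManagerPeerProviderFactory. The fragment shows the equivalent programmatic call; building the Properties by hand is this sketch's stand-in for ehcache's own parsing of the factory's properties attribute.

    import java.util.Properties;

    import net.sf.ehcache.CacheManager;
    import net.sf.ehcache.distribution.CacheManagerPeerProvider;
    import net.sf.ehcache.distribution.CacheManagerPeerProviderFactory;

    import org.alfresco.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider;

    public class PeerProviderFactorySketch
    {
        public static CacheManagerPeerProvider create(CacheManager cacheManager)
        {
            // "heartbeatInterval" is the only key this factory reads; 5000 ms is the default
            Properties properties = new Properties();
            properties.setProperty(
                    JGroupsRMICacheManagerPeerProvider.PROP_HEARTBEAT_INTERVAL, "5000");

            CacheManagerPeerProviderFactory factory = new JGroupsRMICacheManagerPeerProvider.Factory();
            return factory.createCachePeerProvider(cacheManager, properties);
        }
    }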
+ * The default is {@link #DEFAULT_HEARTBEAT_INTERVAL}. + */ + public static final String PROP_HEARTBEAT_INTERVAL = "heartbeatInterval"; + + private static Log logger = LogFactory.getLog(JGroupsRMICacheManagerPeerProvider.class); + + private final JGroupsKeepAliveHeartbeatSender heartbeatSender; + private final JGroupsKeepAliveHeartbeatReceiver heartbeatReceiver; + + /** + * Store the entries referenced first by cache name and then by the RMI URL. + * This looks like duplicated data, but the fastest lookup needs to be a retrieval of + * the list of URLs for a given cache. All other access is infrequent. + */ + private final Map> cachePeersByUrlByCacheName; + + private final long staleAge; + private final ReadLock peersReadLock; + private final WriteLock peersWriteLock; + + public JGroupsRMICacheManagerPeerProvider(CacheManager cacheManager, Channel heartbeatChannel, long heartbeatInterval) + { + this.heartbeatSender = new JGroupsKeepAliveHeartbeatSender(cacheManager, heartbeatChannel, heartbeatInterval); + this.heartbeatReceiver = new JGroupsKeepAliveHeartbeatReceiver(this, heartbeatSender, heartbeatChannel); + + cachePeersByUrlByCacheName = new HashMap>(103); + + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); + peersReadLock = readWriteLock.readLock(); + peersWriteLock = readWriteLock.writeLock(); + + // Calculate the age that a peer entry must be before we consider it stale. + // This is the method used in the Multicast EHCache code, so I guess it must be OK... + // Ofcourse, it's better to send to peers that are broken than to evict peers when they + // are just having some trouble sending their heartbeats for some reason. + staleAge = (heartbeatInterval * 2 + 100) * 1000 * 1000; + } + + public void init() + { + heartbeatSender.init(); + heartbeatReceiver.init(); + } + + private String extractCacheName(String rmiUrl) + { + return rmiUrl.substring(rmiUrl.lastIndexOf('/') + 1); + } + + /** + * Lazy map creation using appropriate synchronization + * @return the entry if it exists otherwise null + */ + private CachePeerEntry getCachePeerEntry(String cacheName, String rmiUrl) + { + Map peerEntriesByUrl = getPeerEntriesByUrl(cacheName); + + peersReadLock.lock(); + try + { + return peerEntriesByUrl.get(rmiUrl); + } + finally + { + peersReadLock.unlock(); + } + } + + /** + * Lazy map creation using appropriate synchronization + * @return never null + */ + private Map getPeerEntriesByUrl(String cacheName) + { + Map peerEntriesByUrl; + peersReadLock.lock(); + try + { + peerEntriesByUrl = cachePeersByUrlByCacheName.get(cacheName); + if (peerEntriesByUrl != null) + { + return peerEntriesByUrl; + } + } + finally + { + peersReadLock.unlock(); + } + peersWriteLock.lock(); + try + { + // Double check in the write lock + peerEntriesByUrl = cachePeersByUrlByCacheName.get(cacheName); + if (peerEntriesByUrl != null) + { + return peerEntriesByUrl; + } + peerEntriesByUrl = new HashMap(7); + cachePeersByUrlByCacheName.put(cacheName, peerEntriesByUrl); + return peerEntriesByUrl; + } + finally + { + peersWriteLock.unlock(); + } + } + + /** + * Performs the actual RMI setup necessary to create a cache peer. 
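Editorial sketch, not part of the patch: the RMI setup referred to here is a plain java.rmi.Naming lookup of the remote CachePeer. The helper below shows that lookup in isolation, with the same tolerant handling of NotBoundException and I/O failures this class uses; the example URL in the comment is made up.

    import java.io.IOException;
    import java.rmi.Naming;
    import java.rmi.NotBoundException;

    import net.sf.ehcache.distribution.CachePeer;

    public class RmiPeerLookupSketch
    {
        public static CachePeer lookup(String rmiUrl)
        {
            // rmiUrl uses the standard ehcache RMI peer form, for example
            // //192.168.1.10:40001/org.alfresco.cache.someCache (hypothetical)
            try
            {
                return (CachePeer) Naming.lookup(rmiUrl);
            }
            catch (NotBoundException e)
            {
                // Normal: the remote listener has not exported this cache (yet)
                return null;
            }
            catch (IOException e)
            {
                // Network trouble; treat as "no peer" and let a later heartbeat retry
                return null;
            }
        }
    }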
+ * @param rmiUrl the RMI URL of the peer + * @return Returns the cache peer + */ + /* Always called from a write block */ + private /*synchronized*/ CachePeer registerPeerImpl(String rmiUrl) + { + try + { + CachePeer cachePeer = (CachePeer) Naming.lookup(rmiUrl); + return cachePeer; + } + catch (NotBoundException e) // Pretty ordinary + { + if (logger.isDebugEnabled()) + { + logger.debug( + "Unable to lookup remote cache peer for " + rmiUrl + ". " + + "Removing from peer list.", + e); + } + } + catch (IOException e) // Some network issue + { + if (logger.isDebugEnabled()) + { + logger.debug( + "Unable to lookup remote cache peer for " + rmiUrl + ". " + + "Removing from peer list.", + e); + } + } + catch (Throwable e) // More serious + { + logger.error( + "Unable to lookup remote cache peer for " + rmiUrl + ". " + + "Removing from peer list.", + e); + } + // Only errors + return null; + } + + public void registerPeer(String rmiUrl) + { + String cacheName = extractCacheName(rmiUrl); + CachePeerEntry peerEntry = getCachePeerEntry(cacheName, rmiUrl); + if (peerEntry != null && !peerEntry.isStale(staleAge)) + { + // It is already there and is still current + peerEntry.updateTimestamp(); + return; + } + // There is no entry + peersWriteLock.lock(); + try + { + // Double check + peerEntry = getCachePeerEntry(cacheName, rmiUrl); + if (peerEntry != null && !peerEntry.isStale(staleAge)) + { + // It has just appeared. Ofcourse, it will be current + peerEntry.updateTimestamp(); + return; + } + // Create a new one + CachePeer cachePeer = registerPeerImpl(rmiUrl); + if (cachePeer == null) + { + // It can be null, ie. the RMI URL is not valid. + // This is not an error and we just ignore it + return; + } + // Cache it + peerEntry = new CachePeerEntry(cachePeer, rmiUrl); + Map peerEntriesByUrl = getPeerEntriesByUrl(cacheName); + peerEntriesByUrl.put(rmiUrl, peerEntry); + // Done + if (logger.isDebugEnabled()) + { + logger.debug("Registered new cache peer with URL: " + rmiUrl); + } + } + finally + { + peersWriteLock.unlock(); + } + } + + public void unregisterPeer(String rmiUrl) + { + String cacheName = extractCacheName(rmiUrl); + Map peerEntriesByUrl = getPeerEntriesByUrl(cacheName); + peersWriteLock.lock(); + try + { + peerEntriesByUrl.remove(rmiUrl); + // Done + if (logger.isDebugEnabled()) + { + logger.debug("Unregistered cache peer with URL: " + rmiUrl); + } + } + finally + { + peersWriteLock.unlock(); + } + } + + public List listRemoteCachePeers(Ehcache cache) throws CacheException + { + String cacheName = cache.getName(); + Map peerEntriesByUrl = getPeerEntriesByUrl(cacheName); + List cachePeers = new ArrayList(peerEntriesByUrl.size()); + List staleUrlEntries = null; + peersReadLock.lock(); + try + { + for (CachePeerEntry peerEntry : peerEntriesByUrl.values()) + { + if (peerEntry.isStale(staleAge)) + { + if (staleUrlEntries == null) + { + staleUrlEntries = new ArrayList(4); + } + // Old + continue; + } + cachePeers.add(peerEntry.getCachePeer()); + } + } + finally + { + peersReadLock.unlock(); + } + // Clean up stale URL entries + if (staleUrlEntries != null) + { + for (String rmiUrl : staleUrlEntries) + { + unregisterPeer(rmiUrl); + } + } + // Done + return cachePeers; + } + + protected boolean stale(Date date) + { + throw new UnsupportedOperationException(); + } + + public long getTimeForClusterToForm() + { + throw new UnsupportedOperationException(); + } + + /** + * Drops all the peer references + */ + public void dispose() throws CacheException + { + peersWriteLock.lock(); + try + { + //TODO + } 
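// (Editorial note, not part of the patch) The TODO above is the whole body of
// dispose(), whose javadoc says it drops all the peer references; clearing
// cachePeersByUrlByCacheName here, under peersWriteLock as the surrounding
// try/finally already arranges, would be the natural implementation.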
+ finally + { + peersWriteLock.unlock(); + } + } + + /** + * A factory that can be given in the ehcache-default.xml or ehcache-custom.xml. + * When using this factory, it is not necessary to provide individual cache + * cluster configurations provided that the Alfresco bootstrap class, AlfrescoEhCacheBootstrap, + * is used. + * + * @author Derek Hulley + * @since 2.1.3 + */ + public static class Factory extends CacheManagerPeerProviderFactory + { + @Override + public CacheManagerPeerProvider createCachePeerProvider(CacheManager cacheManager, Properties properties) + { + if (properties == null) + { + properties = new Properties(); + } + + Channel channel = AlfrescoJGroupsChannelFactory.getChannel(AlfrescoJGroupsChannelFactory.APP_REGION_EHCACHE_HEARTBEAT); + + long heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL; + try + { + String heartBeatIntervalStr = properties.getProperty(PROP_HEARTBEAT_INTERVAL); + if (heartBeatIntervalStr != null) + { + heartbeatInterval = Long.parseLong(heartBeatIntervalStr); + } + } + catch (NumberFormatException e) + { + throw new RuntimeException( + "The property " + PROP_HEARTBEAT_INTERVAL + + " must be a valid integer greater than " + MINIMUM_HEARTBEAT_INTERVAL); + } + + if (heartbeatInterval < MINIMUM_HEARTBEAT_INTERVAL) + { + throw new RuntimeException( + "The minimum value for property " + PROP_HEARTBEAT_INTERVAL + + " is " + MINIMUM_HEARTBEAT_INTERVAL + "ms"); + } + return new JGroupsRMICacheManagerPeerProvider(cacheManager, channel, heartbeatInterval); + } + } + + /** + * Map entry to keep references to EHCache peers along with the necessary timestamping. + * + * @author Derek Hulley + * @since 2.1.3 + */ + public static class CachePeerEntry + { + private final CachePeer cachePeer; + private String rmiUrl; + private long timestamp; + + /** + * @param cachePeer the remote cache peer + * @param rmiUrl the RMI URL for the peer + */ + public CachePeerEntry(CachePeer cachePeer, String rmiUrl) + { + this.cachePeer = cachePeer; + this.rmiUrl = rmiUrl; + this.timestamp = System.nanoTime(); + } + + @Override + public String toString() + { + return rmiUrl; + } + + public CachePeer getCachePeer() + { + return cachePeer; + } + + public String getRmiUrl() + { + return rmiUrl; + } + + /** + * Refreshes the peer's timestamp. + */ + public void updateTimestamp() + { + timestamp = System.nanoTime(); + } + + /** + * @param age the maximum age (nanoseconds) before the peer is considered old + * @return Returns true if the cache peer is older than the given time (nanoseconds) + */ + public boolean isStale(long age) + { + return (System.nanoTime() - age) > timestamp; + } + } +} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactory.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactory.java new file mode 100644 index 0000000000..df6e45eb18 --- /dev/null +++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactory.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.jgroups; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.jgroups.JChannel; + +/** + * A cache peer provider that does heartbeat sending and receiving using JGroups. + * + * @author Derek Hulley + * @since 2.1.3 + */ +public class AlfrescoJChannelFactory +{ + private static final Map channels; + + static + { + channels = new ConcurrentHashMap(5); + } + + /** + * + * @param clusterName the name of the cluster. The effectively groups the + * machines that will be interacting with each other. + * @param applicationRegion the application region. + * @return + */ + public static JChannel getChannel(String clusterName, String applicationRegion) + { + /* + * Synchronization is handled by the Map + */ + StringBuilder sb = new StringBuilder(100).append(clusterName).append("-").append(applicationRegion); + String fullClusterName = sb.toString(); + // Get the channel + JChannel channel = channels.get(fullClusterName); + } + + private static synchronized void newChannel(String clusterName, String clusterRegion) + { + + } + + /** + * TODO: Make into a Spring factory bean so that it can be used as a direct bean reference + */ + private AlfrescoJChannelFactory() + { + } + +} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java new file mode 100644 index 0000000000..adbf6c6ce3 --- /dev/null +++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJChannelFactoryTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. 
You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.jgroups; + +import org.jgroups.Channel; +import org.jgroups.Message; + +import junit.framework.TestCase; + +/** + * @see AlfrescoJChannelFactoryTest + * + * @author Derek Hulley + * @since 2.1.3 + */ +public class AlfrescoJChannelFactoryTest extends TestCase +{ + private static byte[] bytes = new byte[65536]; + static + { + for (int i = 0; i < bytes.length; i++) + { + bytes[i] = 1; + } + } + + private AlfrescoJGroupsChannelFactory factory; + private String appRegion; + + @Override + protected void setUp() throws Exception + { + factory = new AlfrescoJGroupsChannelFactory(); + appRegion = getName(); + } + + /** + * Check that the channel is behaving + */ + private void stressChannel(Channel channel) throws Exception + { + System.out.println("Test: " + getName()); + System.out.println(" Channel: " + channel); + System.out.println(" Cluster: " + channel.getClusterName()); + channel.send(null, null, Boolean.TRUE); + channel.send(new Message(null, null, bytes)); + } + + public void testNoCluster() throws Exception + { + Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel); + } + + public void testBasicCluster() throws Exception + { + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah"); + Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel); + } + + public void testHotSwapCluster() throws Exception + { + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE"); + Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel1); + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO"); + Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel1); + assertTrue("Channel reference must be the same", channel1 == channel2); + } +} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java new file mode 100644 index 0000000000..4452d845c5 --- /dev/null +++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactory.java @@ -0,0 +1,862 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. 
You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.jgroups; + +import java.io.FileNotFoundException; +import java.io.Serializable; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.util.AbstractLifecycleBean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.ChannelClosedException; +import org.jgroups.ChannelException; +import org.jgroups.ChannelListener; +import org.jgroups.ChannelNotConnectedException; +import org.jgroups.Event; +import org.jgroups.JChannel; +import org.jgroups.JChannelFactory; +import org.jgroups.Message; +import org.jgroups.Receiver; +import org.jgroups.TimeoutException; +import org.jgroups.UpHandler; +import org.jgroups.View; +import org.springframework.context.ApplicationEvent; +import org.springframework.util.ResourceUtils; + +/** + * A cache peer provider that does heartbeat sending and receiving using JGroups. + *

+ * <p/>
+ * The cluster name needs to be set before any communication is possible.  This can be done using the
+ * system property {@link #PROP_CLUSTER_NAME_PREFIX -Dalfresco.cluster-name-prefix}=MyCluster
+ * or by declaring a bean
+ * <pre>
+ *    <bean class="org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory">
+ *       <property name="clusterNamePrefix">
+ *          <value>MyCluster</value>
+ *       </property>
+ *    </bean>
+ * </pre>
+ * <p/>
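+ * The prefix can also be set directly in code.  This is a minimal sketch with an illustrative
+ * prefix value; in a normal deployment the Spring lifecycle triggers the subsequent channel
+ * rebuild:
+ * <pre>
+ *    new AlfrescoJGroupsChannelFactory().setClusterNamePrefix("MyCluster");
+ * </pre>
+ * <p/>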
+ * The channels provided to the callers will be proxies to underlying channels that will be hot-swappable. + * This means that the client code can continue to use the channel references while the actual + * implementation can be switched in and out as required. + * + * @see #PROP_CLUSTER_NAME_PREFIX + * + * @author Derek Hulley + * @since 2.1.3 + */ +public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean +{ + /** A catch-all for unknown application regions. */ + public static final String APP_REGION_DEFAULT = "DEFAULT"; + /** The application region used by the EHCache heartbeat implementation over JGroups. */ + public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT"; + /** The UDP protocol stack (default) */ + public static final String PROTOCOL_STACK_UDP = "UDP"; + /** The TCP protocol stack */ + public static final String PROTOCOL_STACK_TCP = "TCP"; + + + public static final String PROP_CLUSTER_NAME_PREFIX = "alfresco.cluster-name-prefix"; + public static final String CUSTOM_CONFIGURATION_FILE = "classpath:alfresco/extension/jgroups-custom.xml"; + public static final String DEFAULT_CONFIGURATION_FILE = "classpath:alfresco/jgroups-default.xml"; + + private static Log logger = LogFactory.getLog(AlfrescoJGroupsChannelFactory.class); + + // Synchronization locks + private static ReadLock readLock; + private static WriteLock writeLock; + + // Values that are modified by the bean implementation + private static String clusterNamePrefix; + private static URL configUrl; + private static Map stacksByAppRegion; + + // Derived data + /** A map that stores channel information by the application region. */ + private static final Map channels; + private static JChannelFactory channelFactory; + + static + { + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); // Fair + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + + channels = new HashMap(5); + + clusterNamePrefix = null; + configUrl = null; + stacksByAppRegion = new HashMap(5); + stacksByAppRegion.put( + AlfrescoJGroupsChannelFactory.APP_REGION_EHCACHE_HEARTBEAT, + AlfrescoJGroupsChannelFactory.PROTOCOL_STACK_UDP); + stacksByAppRegion.put( + AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT, + AlfrescoJGroupsChannelFactory.PROTOCOL_STACK_UDP); + } + + /** + * Check if a cluster name was provided. + * + * @return Returns true if the cluster configuration is active, + * i.e. a cluster name was provided + */ + public static boolean isClusterActive() + { + readLock.lock(); + try + { + return clusterNamePrefix != null; + } + finally + { + readLock.unlock(); + } + } + + /** + * Close all channels. All the channels will continue to function, but will be replaced + * internally with dummy channels. Effectively, all the cluster communications will be + * closed down. + */ + private static void closeChannels() + { + changeClusterNamePrefix(null); + } + + /** + * Creates a channel for the cluster. This method should not be heavily used + * as the checks and synchronizations will slow the calls. Returns channels can be + * kept and will be modified directly using the factory-held references, if necessary. + *

+ * <p/>
+ * The application region is used to determine the protocol stack to apply.
+ * <p/>
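+ * A minimal usage sketch (the payload value is illustrative; the region constant is defined
+ * on this class):
+ * <pre>
+ *    Channel channel = AlfrescoJGroupsChannelFactory.getChannel(
+ *            AlfrescoJGroupsChannelFactory.APP_REGION_EHCACHE_HEARTBEAT);
+ *    channel.send(new Message(null, null, "ping"));
+ * </pre>
+ * <p/>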
+ * This method returns a dummy channel if no cluster name has been provided. + * + * @param appRegion the application region identifier. + * @return Returns a channel + */ + public static Channel getChannel(String appRegion) + { + readLock.lock(); + try + { + ChannelProxy channelProxy = channels.get(appRegion); + if (channelProxy != null) + { + // This will do + return channelProxy; + } + } + finally + { + readLock.unlock(); + } + // Being here means that there is no channel yet + // Go write + writeLock.lock(); + try + { + // Double check + ChannelProxy channelProxy = channels.get(appRegion); + if (channelProxy != null) + { + // This will do + return channelProxy; + } + // Get the channel + Channel channel = getChannelInternal(appRegion); + // Proxy the channel + channelProxy = new ChannelProxy(channel); + // Store the channel to the map + channels.put(appRegion, channelProxy); + // Done + return channelProxy; + } + finally + { + writeLock.unlock(); + } + } + + /** + * Creates a channel for the given cluster. The application region is used + * to determine the protocol stack to apply. + * + * @param appRegion the application region identifier. + * @return Returns a channel + */ + /* All calls to this are ultimately wrapped in the writeLock. */ + private static /*synchronized*/ Channel getChannelInternal(String appRegion) + { + Channel channel; + // If there is no cluster defined (yet) then we define a dummy channel + if (AlfrescoJGroupsChannelFactory.clusterNamePrefix == null) + { + try + { + channel = new DummyJChannel(); + } + catch (Throwable e) + { + throw new AlfrescoRuntimeException( + "Failed to create dummy JGroups channel: \n" + + " Cluster prefix: " + clusterNamePrefix + "\n" + + " App region: " + appRegion, + e); + } + } + else // Create real channel + { + JChannelFactory channelFactory = getChannelFactory(); + // Get the protocol stack to use + String stack = stacksByAppRegion.get(appRegion); + if (stack == null) + { + stack = stacksByAppRegion.get(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT); + } + if (stack == null) + { + throw new AlfrescoRuntimeException( + "No protocol stack was found for application region: \n" + + " Cluster prefix: " + clusterNamePrefix + "\n" + + " App region: " + appRegion + "\n" + + " Regions defined: " + stacksByAppRegion); + } + try + { + // Get the stack config from the factory (we are not using MUX) + String config = channelFactory.getConfig(stack); + channel = new JChannel(config); + } + catch (Throwable e) + { + throw new AlfrescoRuntimeException( + "Failed to create JGroups channel: \n" + + " Cluster prefix: " + clusterNamePrefix + "\n" + + " App region: " + appRegion + "\n" + + " Protocol stack: " + stack + "\n" + + " Configuration URL: " + AlfrescoJGroupsChannelFactory.configUrl, + e); + } + } + // Initialise the channel + try + { + String clusterName = clusterNamePrefix + ":" + appRegion; + // Set reconnect property + channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); + // Don't accept messages from self + channel.setOpt(Channel.LOCAL, Boolean.FALSE); + // No state transfer + channel.setOpt(Channel.AUTO_GETSTATE, Boolean.FALSE); + // Connect + channel.connect(clusterName, null, null, 5000L); + // Done + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Created JGroups channel: \n" + + " Cluster prefix: " + clusterNamePrefix + "\n" + + " App region: " + appRegion + "\n" + + " Channel: " + channel + "\n" + + " Configuration URL: " + AlfrescoJGroupsChannelFactory.configUrl); + } + } + catch (Throwable e) + { + throw new 
AlfrescoRuntimeException( + "Failed to initialise JGroups channel: \n" + + " Cluster prefix: " + clusterNamePrefix + "\n" + + " App region: " + appRegion + "\n" + + " Channel: " + channel + "\n" + + " Configuration URL: " + AlfrescoJGroupsChannelFactory.configUrl, + e); + } + return channel; + } + + /** + * Builds and initializes a JChannelFactory + */ + /* All calls to this are ultimately wrapped in the writeLock. */ + private static /*synchronized*/ JChannelFactory getChannelFactory() + { + if (AlfrescoJGroupsChannelFactory.channelFactory != null) + { + return AlfrescoJGroupsChannelFactory.channelFactory; + } + // Set the config location to use + if (AlfrescoJGroupsChannelFactory.configUrl == null) + { + // This was not set by the bean so set it using the default mechanism + try + { + AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(CUSTOM_CONFIGURATION_FILE); + } + catch (FileNotFoundException e) + { + // try the alfresco default + try + { + AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(DEFAULT_CONFIGURATION_FILE); + } + catch (FileNotFoundException ee) + { + throw new AlfrescoRuntimeException("Missing default JGroups config: " + DEFAULT_CONFIGURATION_FILE); + } + } + } + try + { + // Construct factory + AlfrescoJGroupsChannelFactory.channelFactory = new JChannelFactory(); + channelFactory.setMultiplexerConfig(AlfrescoJGroupsChannelFactory.configUrl); + } + catch (Throwable e) + { + throw new AlfrescoRuntimeException( + "Failed to construct JChannelFactory using config: " + AlfrescoJGroupsChannelFactory.configUrl, + e); + } + // done + if (logger.isDebugEnabled()) + { + logger.debug("\n" + + "Created JChannelFactory: \n" + + " configuration: " + AlfrescoJGroupsChannelFactory.configUrl); + } + return AlfrescoJGroupsChannelFactory.channelFactory; + } + + /** + * Throw away all calculated values and rebuild. This means that the channel factory will + * be reconstructed from scratch. All the channels are reconstructed - but this will not + * affect any references to channels held outside this class as the values returned are proxies + * on top of hot swappable implementations. + */ + /* All calls to this are ultimately wrapped in the writeLock. */ + private static /*synchronized*/ void rebuildChannels() + { + // First throw away the channel factory. It will be fetched lazily. + AlfrescoJGroupsChannelFactory.channelFactory = null; + + // Reprocess all the application regions with the new data + for (Map.Entry entry : channels.entrySet()) + { + String appRegion = entry.getKey(); + ChannelProxy channelProxy = entry.getValue(); + + // Create the new channel + Channel newChannel = getChannelInternal(appRegion); + + // Now do the hot-swap + Channel oldChannel = channelProxy.swap(newChannel); + // Close the old channel + try + { + oldChannel.close(); + } + catch (Throwable e) + { + logger.warn( + "Unable to close old channel during channel rebuild: \n" + + " Old channel: " + oldChannel, + e); + } + } + } + + /** + * Set the prefix used to identify the different clusters. Each application region will + * have a separate cluster name that will be: + *
+     * <pre>
+     *    clusterNamePrefix:appRegion
+     * </pre>
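+     * For example, with an illustrative prefix and assuming no channel has yet been created
+     * for the region, the composed name would be <code>MyCluster:EHCACHE_HEARTBEAT</code>:
+     * <pre>
+     *    AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("MyCluster");
+     *    Channel channel = AlfrescoJGroupsChannelFactory.getChannel(
+     *            AlfrescoJGroupsChannelFactory.APP_REGION_EHCACHE_HEARTBEAT);
+     *    // channel.getClusterName() would return "MyCluster:EHCACHE_HEARTBEAT"
+     * </pre>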
+ * If no cluster name prefix is declared, the cluster is effectively disabled. + * + * @param clusterNamePrefix a prefix to append to the cluster names used + */ + public static void changeClusterNamePrefix(String clusterNamePrefix) + { + writeLock.lock(); + try + { + if (clusterNamePrefix == null || clusterNamePrefix.length() == 0) + { + // Clear everything out + AlfrescoJGroupsChannelFactory.clusterNamePrefix = null; + } + AlfrescoJGroupsChannelFactory.clusterNamePrefix = clusterNamePrefix; + } + finally + { + writeLock.unlock(); + } + } + + /** + * Configure a mapping between the application regions and the available JGroup protocol stacks. + * The map must contain a mapping for application region 'DEFAULT'. + * + * @param protocolMap a mapping from application region (keys) to protocol stacks (values) + */ + public static void changeProtocolStackMapping(Map protocolMap) + { + writeLock.lock(); + try + { + // Check that there is a mapping for default + if (!protocolMap.containsKey(AlfrescoJGroupsChannelFactory.APP_REGION_DEFAULT)) + { + throw new AlfrescoRuntimeException("A protocol stack must be defined for 'DEFAULT'"); + } + stacksByAppRegion = protocolMap; + } + finally + { + writeLock.unlock(); + } + } + + /** + * Set the URL location of the JGroups configuration file. This must refer to a MUX-compatible + * configuration file. + * + * @param configUrl a url of the form file:... or classpath: + */ + public static void changeJgroupsConfigurationUrl(String configUrl) + { + writeLock.lock(); + try + { + AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(configUrl); + } + catch (FileNotFoundException e) + { + throw new AlfrescoRuntimeException( + "Failed to set property 'jgroupsConfigurationUrl'. The url is invalid: " + configUrl, + e); + } + finally + { + writeLock.unlock(); + } + } + + /** + * Bean-enabling constructor + */ + public AlfrescoJGroupsChannelFactory() + { + } + + /** + * @see AlfrescoJGroupsChannelFactory#changeClusterName(String) + */ + public void setClusterNamePrefix(String clusterNamePrefix) + { + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(clusterNamePrefix); + } + + /** + * @see AlfrescoJGroupsChannelFactory#changeProtocolStackMapping(Map) + */ + public void setProtocolStackMapping(Map protocolMap) + { + AlfrescoJGroupsChannelFactory.changeProtocolStackMapping(protocolMap); + } + + /** + * Set the URL location of the JGroups configuration file. This must refer to a MUX-compatible + * configuration file. + * + * @param configUrl a url of the form file:... or classpath: + */ + public void setJgroupsConfigurationUrl(String configUrl) + { + try + { + AlfrescoJGroupsChannelFactory.configUrl = ResourceUtils.getURL(configUrl); + } + catch (FileNotFoundException e) + { + throw new AlfrescoRuntimeException( + "Failed to set property 'jgroupsConfigurationUrl'. The url is invalid: " + configUrl, + e); + } + } + + @Override + protected void onBootstrap(ApplicationEvent event) + { + AlfrescoJGroupsChannelFactory.rebuildChannels(); + } + + @Override + protected void onShutdown(ApplicationEvent event) + { + AlfrescoJGroupsChannelFactory.closeChannels(); + } + + /** + * A no-op JChannel using the "DUMMY_TP" protocol only + * + * @author Derek Hulley + * @since 2.1.3 + */ + private static class DummyJChannel extends JChannel + { + public DummyJChannel() throws ChannelException + { + super("DUMMY_TP:UDP(mcast_addr=224.10.10.200;mcast_port=5679)"); + } + } + + /** + * A proxy channel that can be used to hot-swap underlying channels. 
All listeners + * and the receiver will be carried over to the new underlying channel when it is + * swapped out. + * + * @author Derek Hulley + */ + @SuppressWarnings("deprecation") + public static class ChannelProxy extends Channel + { + /* + * Not synchronizing. Mostly swapping will be VERY rare and if there is a bit + * of inconsistency it is not important. + */ + private Channel delegate; + private UpHandler delegateUpHandler; + private Set delegateChannelListeners; + private Receiver delegateReceiver; + + public ChannelProxy(Channel delegate) + { + this.delegate = delegate; + this.delegateChannelListeners = new HashSet(7); + } + + /** + * Swap the channel. The old delegate will be disconnected before the swap occurs. + * This guarantees data consistency, assuming that any failures will be handled. + * + * @param the new delegate + * @return the old, disconnected delegate + */ + public Channel swap(Channel channel) + { + // Remove the listeners from the old channel + delegate.setReceiver(null); + for (ChannelListener delegateChannelListener : delegateChannelListeners) + { + delegate.removeChannelListener(delegateChannelListener); + } + delegate.setUpHandler(null); + + // Close the old delegate + delegate.close(); + Channel oldDelegage = delegate; + + // Assign the new delegate and carry the listeners over + delegate = channel; + delegate.setReceiver(delegateReceiver); + for (ChannelListener delegateChannelListener : delegateChannelListeners) + { + delegate.addChannelListener(delegateChannelListener); + } + delegate.setUpHandler(delegateUpHandler); + // Done + return oldDelegage; + } + + @Override + protected Log getLog() + { + throw new UnsupportedOperationException(); + } + + public void setReceiver(Receiver r) + { + delegateReceiver = r; + delegate.setReceiver(r); + } + + public void addChannelListener(ChannelListener listener) + { + if (listener == null) + { + return; + } + delegateChannelListeners.add(listener); + delegate.addChannelListener(listener); + } + + public void removeChannelListener(ChannelListener listener) + { + if (listener != null) + { + delegateChannelListeners.remove(listener); + } + delegate.removeChannelListener(listener); + } + + public void clearChannelListeners() + { + delegateChannelListeners.clear(); + delegate.clearChannelListeners(); + } + + public void setUpHandler(UpHandler up_handler) + { + delegateUpHandler = up_handler; + delegate.setUpHandler(up_handler); + } + + public void blockOk() + { + delegate.blockOk(); + } + + public void close() + { + delegate.close(); + } + + public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException + { + delegate.connect(cluster_name, target, state_id, timeout); + } + + public void connect(String cluster_name) throws ChannelException + { + delegate.connect(cluster_name); + } + + public void disconnect() + { + delegate.disconnect(); + } + + public void down(Event evt) + { + delegate.down(evt); + } + + public Object downcall(Event evt) + { + return delegate.downcall(evt); + } + + public String dumpQueue() + { + return delegate.dumpQueue(); + } + + @SuppressWarnings("unchecked") + public Map dumpStats() + { + return delegate.dumpStats(); + } + + public boolean equals(Object obj) + { + return delegate.equals(obj); + } + + public boolean flushSupported() + { + return delegate.flushSupported(); + } + + @SuppressWarnings("unchecked") + public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException + { + return 
delegate.getAllStates(targets, timeout); + } + + public String getChannelName() + { + return delegate.getChannelName(); + } + + public String getClusterName() + { + return delegate.getClusterName(); + } + + public Map getInfo() + { + return delegate.getInfo(); + } + + public Address getLocalAddress() + { + return delegate.getLocalAddress(); + } + + public int getNumMessages() + { + return delegate.getNumMessages(); + } + + public Object getOpt(int option) + { + return delegate.getOpt(option); + } + + public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException + { + return delegate.getState(target, timeout); + } + + public boolean getState(Address target, String state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException + { + return delegate.getState(target, state_id, timeout); + } + + public View getView() + { + return delegate.getView(); + } + + public int hashCode() + { + return delegate.hashCode(); + } + + public boolean isConnected() + { + return delegate.isConnected(); + } + + public boolean isOpen() + { + return delegate.isOpen(); + } + + public void open() throws ChannelException + { + delegate.open(); + } + + public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException + { + return delegate.peek(timeout); + } + + public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException + { + return delegate.receive(timeout); + } + + public void returnState(byte[] state, String state_id) + { + delegate.returnState(state, state_id); + } + + public void returnState(byte[] state) + { + delegate.returnState(state); + } + + public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException + { + delegate.send(dst, src, obj); + } + + public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException + { + delegate.send(msg); + } + + public void setChannelListener(ChannelListener channel_listener) + { + delegate.setChannelListener(channel_listener); + } + + public void setInfo(String key, Object value) + { + delegate.setInfo(key, value); + } + + public void setOpt(int option, Object value) + { + delegate.setOpt(option, value); + } + + public void shutdown() + { + delegate.shutdown(); + } + + public boolean startFlush(boolean automatic_resume) + { + return delegate.startFlush(automatic_resume); + } + + public boolean startFlush(List
<Address> flushParticipants, boolean automatic_resume)
+        {
+            return delegate.startFlush(flushParticipants, automatic_resume);
+        }
+
+        public boolean startFlush(long timeout, boolean automatic_resume)
+        {
+            return delegate.startFlush(timeout, automatic_resume);
+        }
+
+        public void stopFlush()
+        {
+            delegate.stopFlush();
+        }
+
+        public void stopFlush(List<Address>
flushParticipants) + { + delegate.stopFlush(flushParticipants); + } + + public String toString() + { + return delegate.toString(); + } + } +} diff --git a/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java new file mode 100644 index 0000000000..2c35095e94 --- /dev/null +++ b/source/java/org/alfresco/repo/jgroups/AlfrescoJGroupsChannelFactoryTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2005-2008 Alfresco Software Limited. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + * As a special exception to the terms and conditions of version 2.0 of + * the GPL, you may redistribute this Program in connection with Free/Libre + * and Open Source Software ("FLOSS") applications as described in Alfresco's + * FLOSS exception. You should have recieved a copy of the text describing + * the FLOSS exception, and it is also available here: + * http://www.alfresco.com/legal/licensing" + */ +package org.alfresco.repo.jgroups; + +import org.jgroups.Channel; +import org.jgroups.Message; + +import junit.framework.TestCase; + +/** + * @see AlfrescoJGroupsChannelFactory + * + * @author Derek Hulley + * @since 2.1.3 + */ +public class AlfrescoJGroupsChannelFactoryTest extends TestCase +{ + private static byte[] bytes = new byte[65536]; + static + { + for (int i = 0; i < bytes.length; i++) + { + bytes[i] = 1; + } + } + + private String appRegion; + + @Override + protected void setUp() throws Exception + { + appRegion = getName(); + } + + /** + * Check that the channel is behaving + */ + private void stressChannel(Channel channel) throws Exception + { + System.out.println("Test: " + getName()); + System.out.println(" Channel: " + channel); + System.out.println(" Cluster: " + channel.getClusterName()); + channel.send(null, null, Boolean.TRUE); + channel.send(new Message(null, null, bytes)); + } + + public void testNoCluster() throws Exception + { + Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel); + } + + public void testBasicCluster() throws Exception + { + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah"); + Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel); + } + + public void testHotSwapCluster() throws Exception + { + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE"); + Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel1); + AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO"); + Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion); + stressChannel(channel1); + assertTrue("Channel reference must be the same", channel1 == channel2); + } +} diff --git a/source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml 
b/source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml new file mode 100644 index 0000000000..d5be11c2b3 --- /dev/null +++ b/source/test-resources/jgroups/ehcache-jgroups-cluster-test-context.xml @@ -0,0 +1,19 @@ + + + + + + + + + classpath:jgroups/ehcache-jgroups-cluster-test.xml + + + + + + ehcache-jgroups-cluster-test + + + + \ No newline at end of file diff --git a/source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml b/source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml new file mode 100644 index 0000000000..a3da88e41f --- /dev/null +++ b/source/test-resources/jgroups/ehcache-jgroups-cluster-test.xml @@ -0,0 +1,83 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +