mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Merged BRANCHES/DEV/mward/cifs_hazelcast_conf to HEAD:
36614: ALF-13822: Renamed JGroupsKeepAliveHeartbeatReceiver. 36615: ALF-13822: renamed JGroupsKeepAliveHeartbeatSender. 36616: ALF-13822: renamed JGroupsRMICacheManagerPeerProvider. 36626: ALF-13822: renamed package from jgroups to cluster. 36642: ALF-13822: remove jgroupsChannelFactory from bootstrap-context.xml, exchange for Hazelcast. 36648: ALF-13822: remove jgroups references from RMICacheManagerPeerProvider. 36649: ALF-13822: Replaced jgroups with hazelcast in ehcache testing context. 36650: ALF-13822: Renamed ehcache test context and config so that jgroups not in name. 36651: ALF-13822: renamed jgroups package to ehcache. 36652: ALF-13822: Fix broken test from XML file renames. 36653: ALF-13822: rename file so that it doesn't have jgroups in the name. 36667: ALF-13822: removed some commented out jgroups code. 36679: ALF-13822: reimplemented membership change logging for KeepAliveHeartbeatReceiver. 36680: ALF-13822: remove redundant JGroups properties from clusterPropertySetter 36681: ALF-13822: remove redundant properties from repository.properties. 36682: ALF-13822: remove jgroups configuration files. 36683: ALF-13822: remove jgroups messenger abstraction classes. 36684: ALF-13822: removed jgroups test XML 36685: ALF-13822: remove AlfrescoJGroupsChannelFactory. 36686: ALF-13822: updated comment to include word hazelcast rather than jgroups. 36687: ALF-13822: removed jgroups from comment 36688: ALF-13822: removed jgroups from comment 36689: ALF-13822: removed jgroups comment as no longer relavent 36697: ALF-13822: removed jgroups from ant build files. 36704: ALF-13822: removed jgroups libraries. 36707: ALF-13822: removed jgroups source zip. 36708: ALF-13822: removed jgroups form installer license file. 36710: ALF-13822: removed jgroups lib from 3rd-party eclipse project's classpath. 36731: ALF-13822: removed mentions of jgroups from JLAN. 36761: ALF-13822: fixed hazelcast TCP config - removed ever-present loopback interface. 36762: ALF-13822: improved logging of KeepAliveHeartbeatReceiver and added toString() for HazelcastMessenger. 36990: ALF-13822: fixed AWS placeholder properties and added interface binding properties. git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@37008 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -170,24 +170,10 @@
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
<!-- Set up JGroups communication, if necessary -->
|
||||
<bean name="jgroupsChannelFactory" class="org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory">
|
||||
<property name="clusterName">
|
||||
<value>${alfresco.cluster.name}</value>
|
||||
</property>
|
||||
<property name="configUrlsByAppRegion">
|
||||
<map>
|
||||
<entry key="DEFAULT">
|
||||
<value>${alfresco.jgroups.configLocation}</value>
|
||||
</entry>
|
||||
</map>
|
||||
</property>
|
||||
<bean id="messengerFactory" class="org.alfresco.repo.cluster.HazelcastMessengerFactory">
|
||||
<property name="hazelcastInstanceFactory" ref="hazelcastInstanceFactory"/>
|
||||
</bean>
|
||||
|
||||
<bean id="messengerFactory"
|
||||
class="org.alfresco.repo.cluster.JGroupsMessengerFactory"
|
||||
depends-on="jgroupsChannelFactory"/>
|
||||
|
||||
<!-- Set up this class for static access, don't use this for DI - use messengerFactory instead -->
|
||||
<bean id="staticMessengerFactoryProvider" class="org.alfresco.repo.cluster.MessengerFactoryProvider">
|
||||
<property name="instance" ref="messengerFactory"/>
|
||||
|
@@ -6,8 +6,22 @@
|
||||
<property name="configFile" value="${alfresco.hazelcast.configLocation}"/>
|
||||
<property name="properties">
|
||||
<props>
|
||||
<!-- Common options -->
|
||||
<prop key="alfresco.cluster.name">${alfresco.cluster.name}</prop>
|
||||
<prop key="alfresco.hazelcast.password">${alfresco.hazelcast.password}</prop>
|
||||
<prop key="alfresco.hazelcast.specify.interface">${alfresco.hazelcast.specify.interface}</prop>
|
||||
<prop key="alfresco.hazelcast.bind.interface">${alfresco.hazelcast.bind.interface}</prop>
|
||||
|
||||
<!-- TCP/IP discovery options -->
|
||||
<prop key="alfresco.hazelcast.tcp.config">${alfresco.hazelcast.tcp.config}</prop>
|
||||
|
||||
<!-- AWS discovery options -->
|
||||
<prop key="alfresco.hazelcast.ec2.accesskey">${alfresco.hazelcast.ec2.accesskey}</prop>
|
||||
<prop key="alfresco.hazelcast.ec2.secretkey">${alfresco.hazelcast.ec2.secretkey}</prop>
|
||||
<prop key="alfresco.hazelcast.ec2.region">${alfresco.hazelcast.ec2.region}</prop>
|
||||
<prop key="alfresco.hazelcast.ec2.securitygroup">${alfresco.hazelcast.ec2.securitygroup}</prop>
|
||||
<prop key="alfresco.hazelcast.ec2.tagkey">${alfresco.hazelcast.ec2.tagkey}</prop>
|
||||
<prop key="alfresco.hazelcast.ec2.tagvalue">${alfresco.hazelcast.ec2.tagvalue}</prop>
|
||||
</props>
|
||||
</property>
|
||||
</bean>
|
||||
|
@@ -121,12 +121,6 @@
|
||||
<bean id="clusterPropertySetter" class="org.alfresco.config.SystemPropertiesSetterBean" init-method="init">
|
||||
<property name="propertyMap">
|
||||
<map>
|
||||
<entry key="jgroups.bind_addr">
|
||||
<value>${alfresco.jgroups.bind_address}</value>
|
||||
</entry>
|
||||
<entry key="alfresco.jgroups.bind_interface">
|
||||
<value>${alfresco.jgroups.bind_interface}</value>
|
||||
</entry>
|
||||
<entry key="alfresco.ehcache.rmi.hostname">
|
||||
<value>${alfresco.ehcache.rmi.hostname}</value>
|
||||
</entry>
|
||||
|
@@ -3,7 +3,7 @@
|
||||
path="java.io.tmpdir"/>
|
||||
|
||||
<!--
|
||||
The 'heartbeatInterval' property is the only one used for the JGroups-enabled implementation
|
||||
The 'heartbeatInterval' property is the only one used for the Hazelcast-enabled implementation
|
||||
-->
|
||||
<cacheManagerPeerProviderFactory
|
||||
class="org.alfresco.repo.cache.AlfrescoCacheManagerPeerProviderFactory"
|
||||
|
@@ -27,8 +27,8 @@
|
||||
<tag-value>${alfresco.hazelcast.ec2.tagvalue}</tag-value>
|
||||
</aws>
|
||||
</join>
|
||||
<interfaces enabled="false">
|
||||
<interface>10.10.1.*</interface>
|
||||
<interfaces enabled="${alfresco.hazelcast.specify.interface}">
|
||||
<interface>${alfresco.hazelcast.bind.interface}</interface>
|
||||
</interfaces>
|
||||
<symmetric-encryption enabled="false">
|
||||
<!--
|
||||
|
@@ -15,7 +15,6 @@
|
||||
</multicast>
|
||||
<tcp-ip enabled="true">
|
||||
${alfresco.hazelcast.tcp.config}
|
||||
<interface>127.0.0.1</interface>
|
||||
</tcp-ip>
|
||||
<aws enabled="false">
|
||||
<access-key>my-access-key</access-key>
|
||||
@@ -28,8 +27,8 @@
|
||||
<tag-value>hz-nodes</tag-value>
|
||||
</aws>
|
||||
</join>
|
||||
<interfaces enabled="false">
|
||||
<interface>10.10.1.*</interface>
|
||||
<interfaces enabled="${alfresco.hazelcast.specify.interface}">
|
||||
<interface>${alfresco.hazelcast.bind.interface}</interface>
|
||||
</interfaces>
|
||||
<symmetric-encryption enabled="false">
|
||||
<!--
|
||||
|
@@ -27,8 +27,8 @@
|
||||
<tag-value>hz-nodes</tag-value>
|
||||
</aws>
|
||||
</join>
|
||||
<interfaces enabled="false">
|
||||
<interface>10.10.1.*</interface>
|
||||
<interfaces enabled="${alfresco.hazelcast.specify.interface}">
|
||||
<interface>${alfresco.hazelcast.bind.interface}</interface>
|
||||
</interfaces>
|
||||
<symmetric-encryption enabled="false">
|
||||
<!--
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -1,65 +0,0 @@
|
||||
<!--
|
||||
TCP based stack, with flow control and message bundling. This is usually used when IP
|
||||
multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast).
|
||||
Options:
|
||||
bind_port="${alfresco.tcp.start_port:7800}"
|
||||
initial_hosts="${alfresco.tcp.initial_hosts:localhost[7800]}"
|
||||
port_range="${alfresco.tcp.port_range:3}"
|
||||
author: Bela Ban (JGroups)
|
||||
author: Derek Hulley (Alfresco)
|
||||
-->
|
||||
<config>
|
||||
<TCP bind_port="${alfresco.tcp.start_port:7800}"
|
||||
bind_interface="${alfresco.jgroups.bind_interface:}"
|
||||
loopback="true"
|
||||
recv_buf_size="20000000"
|
||||
send_buf_size="640000"
|
||||
discard_incompatible_packets="true"
|
||||
max_bundle_size="64000"
|
||||
max_bundle_timeout="30"
|
||||
enable_bundling="true"
|
||||
use_send_queues="false"
|
||||
sock_conn_timeout="300"
|
||||
skip_suspected_members="true"
|
||||
|
||||
thread_pool.enabled="true"
|
||||
thread_pool.min_threads="1"
|
||||
thread_pool.max_threads="25"
|
||||
thread_pool.keep_alive_time="5000"
|
||||
thread_pool.queue_enabled="false"
|
||||
thread_pool.queue_max_size="100"
|
||||
thread_pool.rejection_policy="run"
|
||||
|
||||
oob_thread_pool.enabled="true"
|
||||
oob_thread_pool.min_threads="1"
|
||||
oob_thread_pool.max_threads="8"
|
||||
oob_thread_pool.keep_alive_time="5000"
|
||||
oob_thread_pool.queue_enabled="false"
|
||||
oob_thread_pool.queue_max_size="100"
|
||||
oob_thread_pool.rejection_policy="run"/>
|
||||
|
||||
<TCPPING timeout="3000"
|
||||
initial_hosts="${alfresco.tcp.initial_hosts:localhost[7800]}"
|
||||
port_range="${alfresco.tcp.port_range:3}"
|
||||
num_initial_members="2"/>
|
||||
<MERGE2 max_interval="30000"
|
||||
min_interval="10000"/>
|
||||
<FD_SIMPLE timeout="10000" max_missed_hbs="10" />
|
||||
<VERIFY_SUSPECT timeout="1500" />
|
||||
<BARRIER />
|
||||
<pbcast.NAKACK
|
||||
use_mcast_xmit="false" gc_lag="0"
|
||||
retransmit_timeout="300,600,1200,2400,4800"
|
||||
discard_delivered_msgs="true"/>
|
||||
<UNICAST timeout="300,600,1200" />
|
||||
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
|
||||
max_bytes="400000"/>
|
||||
<VIEW_SYNC avg_send_interval="60000"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="3000"
|
||||
view_bundling="true"/>
|
||||
<FC max_credits="2000000"
|
||||
min_threshold="0.10"/>
|
||||
<FRAG2 frag_size="60000" />
|
||||
<pbcast.STREAMING_STATE_TRANSFER/>
|
||||
<!-- <pbcast.STATE_TRANSFER/> -->
|
||||
</config>
|
@@ -1,67 +0,0 @@
|
||||
|
||||
<!--
|
||||
Default stack using IP multicasting. It is similar to the "udp"
|
||||
stack in stacks.xml, but doesn't use streaming state transfer and flushing
|
||||
author: Bela Ban (JGroups)
|
||||
author: Derek Hulley (Alfresco)
|
||||
-->
|
||||
|
||||
<config>
|
||||
<UDP
|
||||
mcast_addr="${alfresco.udp.mcast_addr:230.0.0.1}"
|
||||
mcast_port="${alfresco.udp.mcast_port:4446}"
|
||||
tos="8"
|
||||
ucast_recv_buf_size="20000000"
|
||||
ucast_send_buf_size="640000"
|
||||
mcast_recv_buf_size="25000000"
|
||||
mcast_send_buf_size="640000"
|
||||
loopback="false"
|
||||
discard_incompatible_packets="true"
|
||||
max_bundle_size="64000"
|
||||
max_bundle_timeout="30"
|
||||
ip_ttl="${alfresco.udp.ip_ttl:2}"
|
||||
enable_bundling="true"
|
||||
enable_diagnostics="true"
|
||||
thread_naming_pattern="cl"
|
||||
|
||||
thread_pool.enabled="true"
|
||||
thread_pool.min_threads="2"
|
||||
thread_pool.max_threads="8"
|
||||
thread_pool.keep_alive_time="5000"
|
||||
thread_pool.queue_enabled="true"
|
||||
thread_pool.queue_max_size="10000"
|
||||
thread_pool.rejection_policy="discard"
|
||||
|
||||
oob_thread_pool.enabled="true"
|
||||
oob_thread_pool.min_threads="1"
|
||||
oob_thread_pool.max_threads="8"
|
||||
oob_thread_pool.keep_alive_time="5000"
|
||||
oob_thread_pool.queue_enabled="false"
|
||||
oob_thread_pool.queue_max_size="100"
|
||||
oob_thread_pool.rejection_policy="Run"/>
|
||||
|
||||
<PING timeout="2000"
|
||||
num_initial_members="2"/>
|
||||
<MERGE2 max_interval="30000"
|
||||
min_interval="10000"/>
|
||||
<FD_SIMPLE timeout="10000" max_missed_hbs="10" />
|
||||
<VERIFY_SUSPECT timeout="1500" />
|
||||
<BARRIER />
|
||||
<pbcast.NAKACK use_stats_for_retransmission="false"
|
||||
exponential_backoff="150"
|
||||
use_mcast_xmit="true" gc_lag="0"
|
||||
retransmit_timeout="50,300,600,1200"
|
||||
discard_delivered_msgs="true"/>
|
||||
<UNICAST timeout="300,600,1200"/>
|
||||
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
|
||||
max_bytes="1000000"/>
|
||||
<VIEW_SYNC avg_send_interval="60000" />
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="3000"
|
||||
view_bundling="true"/>
|
||||
<FC max_credits="500000"
|
||||
min_threshold="0.20"/>
|
||||
<FRAG2 frag_size="60000" />
|
||||
<!--pbcast.STREAMING_STATE_TRANSFER /-->
|
||||
<pbcast.STATE_TRANSFER />
|
||||
<!-- pbcast.FLUSH /-->
|
||||
</config>
|
@@ -129,8 +129,13 @@ alfresco.hazelcast.protocol=tcp
|
||||
# Location of the Hazelcast configuration file
|
||||
alfresco.hazelcast.configLocation=classpath:alfresco/hazelcast/hazelcast-${alfresco.hazelcast.protocol}.xml
|
||||
# XML elements to incorporate into Hazelcast config, in particular
|
||||
# hostnames to use for membership discovery
|
||||
alfresco.hazelcast.tcp.config=<hostname>localhost</hostname>
|
||||
# hostnames/IP addresses to use for membership discovery
|
||||
alfresco.hazelcast.tcp.config=<members></members>
|
||||
# Whether to bind to a specific host interface
|
||||
alfresco.hazelcast.specify.interface=false
|
||||
# The interface to bind to, if enabled above.
|
||||
alfresco.hazelcast.bind.interface=
|
||||
|
||||
# Amazon Web Services - EC2 discovery
|
||||
alfresco.hazelcast.ec2.accesskey=my-access-key
|
||||
alfresco.hazelcast.ec2.secretkey=my-secret-key
|
||||
@@ -149,17 +154,6 @@ alfresco.ehcache.rmi.remoteObjectPort=0
|
||||
alfresco.ehcache.rmi.port=0
|
||||
alfresco.ehcache.rmi.socketTimeoutMillis=5000
|
||||
|
||||
# The protocol stack to use from the JGroups configuration file
|
||||
# Use this property to select which communication method should be used.
|
||||
# The JGroups configuration file is build up using the protocol string
|
||||
alfresco.jgroups.defaultProtocol=UDP
|
||||
# The bind address and interface for JGroups to use; equivalent to -Djgroups.bind_addr and -Djgroups.bind_interface
|
||||
alfresco.jgroups.bind_address=
|
||||
alfresco.jgroups.bind_interface=
|
||||
# JGroups configuration (http://www.jgroups.org)
|
||||
# The location of the JGroups configuration file
|
||||
alfresco.jgroups.configLocation=classpath:alfresco/jgroups/alfresco-jgroups-${alfresco.jgroups.defaultProtocol}.xml
|
||||
|
||||
#
|
||||
# How long should shutdown wait to complete normally before
|
||||
# taking stronger action and calling System.exit()
|
||||
|
@@ -46,7 +46,7 @@ public class AlfrescoCacheManagerPeerProviderFactory extends CacheManagerPeerPro
|
||||
try
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Class clazz = Class.forName("org.alfresco.enterprise.repo.cache.jgroups.JGroupsRMICacheManagerPeerProvider$Factory");
|
||||
Class clazz = Class.forName("org.alfresco.enterprise.repo.cache.cluster.RMICacheManagerPeerProvider$Factory");
|
||||
factory = (CacheManagerPeerProviderFactory) clazz.newInstance();
|
||||
}
|
||||
catch (ClassNotFoundException e)
|
||||
@@ -55,7 +55,7 @@ public class AlfrescoCacheManagerPeerProviderFactory extends CacheManagerPeerPro
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
logger.error("Failed to instantiate JGroupsRMICacheManagerPeerProvider factory.", e);
|
||||
logger.error("Failed to instantiate RMICacheManagerPeerProvider factory.", e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@@ -36,8 +36,7 @@ import org.junit.runners.Suite.SuiteClasses;
|
||||
@SuiteClasses({
|
||||
org.alfresco.repo.cluster.HazelcastConfigFactoryBeanTest.class,
|
||||
org.alfresco.repo.cluster.HazelcastMessengerFactoryTest.class,
|
||||
org.alfresco.repo.cluster.HazelcastMessengerTest.class,
|
||||
org.alfresco.repo.cluster.JGroupsMessengerTest.class
|
||||
org.alfresco.repo.cluster.HazelcastMessengerTest.class
|
||||
})
|
||||
public class BuildSafeTestSuite
|
||||
{
|
||||
|
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
/**
|
||||
* Implementing classes can react to members joining or leaving the cluster.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public interface ClusterMembershipListener
|
||||
{
|
||||
void memberJoined(String member, String[] cluster);
|
||||
void memberLeft(String member, String[] cluster);
|
||||
}
|
@@ -36,8 +36,7 @@ import org.junit.runners.Suite.SuiteClasses;
|
||||
org.alfresco.repo.cluster.BuildSafeTestSuite.class,
|
||||
|
||||
// Additionally run these tests that cannot be run on the build servers.
|
||||
org.alfresco.repo.cluster.HazelcastTest.class,
|
||||
org.alfresco.repo.cluster.JGroupsTest.class
|
||||
org.alfresco.repo.cluster.HazelcastTest.class
|
||||
})
|
||||
public class ClusterTestSuite
|
||||
{
|
||||
|
@@ -21,9 +21,9 @@ package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.extensions.surf.util.ParameterCheck;
|
||||
|
||||
import com.hazelcast.core.ITopic;
|
||||
import com.hazelcast.core.MessageListener;
|
||||
@@ -40,6 +40,7 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
private MessageReceiver<T> receiverDelegate;
|
||||
private String address;
|
||||
private final static Log logger = LogFactory.getLog(HazelcastMessenger.class);
|
||||
|
||||
/**
|
||||
* @param topic
|
||||
*/
|
||||
@@ -55,7 +56,8 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
{
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("Sending " + message);
|
||||
String digest = StringUtils.abbreviate(message.toString(), 50);
|
||||
logger.trace("Sending [source: " + address + "]: " + digest);
|
||||
}
|
||||
topic.publish(message);
|
||||
}
|
||||
@@ -74,7 +76,8 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
{
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("Received (will be delegated to receiver): " + message);
|
||||
String digest = StringUtils.abbreviate(message.toString(), 50);
|
||||
logger.trace("Received [destination: " + address + "] (delegating to receiver): " + digest);
|
||||
}
|
||||
receiverDelegate.onReceive(message);
|
||||
}
|
||||
@@ -96,4 +99,13 @@ public class HazelcastMessenger<T extends Serializable> implements Messenger<T>,
|
||||
{
|
||||
return address;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "HazelcastMessenger[connected=" + isConnected() +
|
||||
", topic=" + getTopic() +
|
||||
", address=" + getAddress() + "]";
|
||||
}
|
||||
}
|
||||
|
@@ -20,13 +20,13 @@
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.hazelcast.config.Config;
|
||||
import com.hazelcast.config.GroupConfig;
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.core.ITopic;
|
||||
import com.hazelcast.core.Member;
|
||||
import com.hazelcast.core.MembershipEvent;
|
||||
import com.hazelcast.core.MembershipListener;
|
||||
|
||||
/**
|
||||
* Hazelcast-based implementation of the {@link MessengerFactory} interface.
|
||||
@@ -37,7 +37,7 @@ import com.hazelcast.core.ITopic;
|
||||
*/
|
||||
public class HazelcastMessengerFactory implements MessengerFactory
|
||||
{
|
||||
private HazelcastInstance hazelcast;
|
||||
private HazelcastInstanceFactory hazelcastInstanceFactory;
|
||||
|
||||
@Override
|
||||
public <T extends Serializable> Messenger<T> createMessenger(String appRegion)
|
||||
@@ -48,28 +48,70 @@ public class HazelcastMessengerFactory implements MessengerFactory
|
||||
@Override
|
||||
public <T extends Serializable> Messenger<T> createMessenger(String appRegion, boolean acceptLocalMessages)
|
||||
{
|
||||
if (!isClusterActive())
|
||||
{
|
||||
return new NullMessenger<T>();
|
||||
}
|
||||
// Clustering is enabled, create a messenger.
|
||||
HazelcastInstance hazelcast = hazelcastInstanceFactory.getInstance();
|
||||
ITopic<T> topic = hazelcast.getTopic(appRegion);
|
||||
String address = hazelcast.getCluster().getLocalMember().getInetSocketAddress().toString();
|
||||
return new HazelcastMessenger<T>(topic, address);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hazelcast the hazelcast to set
|
||||
* Provide the messenger factory with a means to obtain a HazelcastInstance.
|
||||
*
|
||||
* @param hazelcastInstanceFactory
|
||||
*/
|
||||
public void setHazelcast(HazelcastInstance hazelcast)
|
||||
public void setHazelcastInstanceFactory(HazelcastInstanceFactory hazelcastInstanceFactory)
|
||||
{
|
||||
this.hazelcast = hazelcast;
|
||||
this.hazelcastInstanceFactory = hazelcastInstanceFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClusterActive()
|
||||
{
|
||||
Config config = hazelcast.getConfig();
|
||||
if (config == null || config.getGroupConfig() == null)
|
||||
return hazelcastInstanceFactory.isClusteringEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addMembershipListener(final ClusterMembershipListener listener)
|
||||
{
|
||||
return false;
|
||||
if (isClusterActive())
|
||||
{
|
||||
HazelcastInstance hazelcast = hazelcastInstanceFactory.getInstance();
|
||||
hazelcast.getCluster().addMembershipListener(new MembershipListener()
|
||||
{
|
||||
@Override
|
||||
public void memberRemoved(MembershipEvent e)
|
||||
{
|
||||
listener.memberLeft(member(e), cluster(e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void memberAdded(MembershipEvent e)
|
||||
{
|
||||
listener.memberJoined(member(e), cluster(e));
|
||||
}
|
||||
|
||||
private String member(MembershipEvent e)
|
||||
{
|
||||
return e.getMember().getInetSocketAddress().toString();
|
||||
}
|
||||
|
||||
private String[] cluster(MembershipEvent e)
|
||||
{
|
||||
Set<Member> members = e.getCluster().getMembers();
|
||||
String[] cluster = new String[members.size()];
|
||||
int i = 0;
|
||||
for (Member m : members)
|
||||
{
|
||||
cluster[i++] = m.getInetSocketAddress().toString();
|
||||
}
|
||||
return cluster;
|
||||
}
|
||||
});
|
||||
}
|
||||
GroupConfig groupConfig = config.getGroupConfig();
|
||||
return StringUtils.hasText(groupConfig.getName());
|
||||
}
|
||||
}
|
||||
|
@@ -19,7 +19,7 @@
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@@ -31,8 +31,6 @@ import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.hazelcast.config.Config;
|
||||
import com.hazelcast.config.GroupConfig;
|
||||
import com.hazelcast.core.Cluster;
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.core.ITopic;
|
||||
@@ -48,19 +46,20 @@ import com.hazelcast.core.Member;
|
||||
public class HazelcastMessengerFactoryTest
|
||||
{
|
||||
private HazelcastMessengerFactory factory;
|
||||
private GroupConfig groupConfig;
|
||||
private @Mock HazelcastInstance hazelcast;
|
||||
private @Mock Member member;
|
||||
private @Mock Cluster cluster;
|
||||
private @Mock ITopic<String> topic;
|
||||
private @Mock Config config;
|
||||
private @Mock HazelcastInstanceFactory hazelcastInstanceFactory;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
factory = new HazelcastMessengerFactory();
|
||||
factory.setHazelcast(hazelcast);
|
||||
groupConfig = new GroupConfig();
|
||||
factory.setHazelcastInstanceFactory(hazelcastInstanceFactory);
|
||||
|
||||
when(hazelcastInstanceFactory.isClusteringEnabled()).thenReturn(true);
|
||||
when(hazelcastInstanceFactory.getInstance()).thenReturn(hazelcast);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -80,13 +79,9 @@ public class HazelcastMessengerFactoryTest
|
||||
@Test
|
||||
public void canCheckClusterIsActive()
|
||||
{
|
||||
when(hazelcast.getConfig()).thenReturn(config);
|
||||
when(config.getGroupConfig()).thenReturn(groupConfig);
|
||||
|
||||
groupConfig.setName("my-cluster-name");
|
||||
assertEquals(true, factory.isClusterActive());
|
||||
|
||||
groupConfig.setName("");
|
||||
when(hazelcastInstanceFactory.isClusteringEnabled()).thenReturn(false);
|
||||
assertEquals(false, factory.isClusterActive());
|
||||
}
|
||||
}
|
||||
|
@@ -29,8 +29,6 @@ import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import com.hazelcast.config.Config;
|
||||
import com.hazelcast.config.GroupConfig;
|
||||
import com.hazelcast.core.Hazelcast;
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.core.ITopic;
|
||||
@@ -46,6 +44,8 @@ public class HazelcastTest implements MessageListener<String>
|
||||
{
|
||||
private static ApplicationContext ctx;
|
||||
private MessengerTestHelper helper;
|
||||
private HazelcastInstanceFactory hiFactory;
|
||||
private HazelcastInstance hi;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass()
|
||||
@@ -65,14 +65,14 @@ public class HazelcastTest implements MessageListener<String>
|
||||
public void setUp()
|
||||
{
|
||||
helper = new MessengerTestHelper();
|
||||
hiFactory = ctx.getBean(HazelcastInstanceFactory.class);
|
||||
hi = hiFactory.getInstance();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void canSendWithHazelcastMessengerFactory() throws InterruptedException
|
||||
{
|
||||
Config config = createConfig();
|
||||
HazelcastInstance hi = Hazelcast.newHazelcastInstance(config);
|
||||
ITopic<String> topic = hi.getTopic("testregion");
|
||||
|
||||
topic.addMessageListener(this);
|
||||
@@ -93,8 +93,6 @@ public class HazelcastTest implements MessageListener<String>
|
||||
TestMessageReceiver r1 = new TestMessageReceiver();
|
||||
m1.setReceiver(r1);
|
||||
|
||||
Config config = createConfig();
|
||||
HazelcastInstance hi = Hazelcast.newHazelcastInstance(config);
|
||||
ITopic<String> topic2 = hi.getTopic("testregion");
|
||||
String address2 = hi.getCluster().getLocalMember().getInetSocketAddress().toString();
|
||||
Messenger<String> m2 = new HazelcastMessenger<String>(topic2, address2);
|
||||
@@ -112,14 +110,4 @@ public class HazelcastTest implements MessageListener<String>
|
||||
{
|
||||
helper.setReceivedMsg(message);
|
||||
}
|
||||
|
||||
private Config createConfig()
|
||||
{
|
||||
Config config = new Config();
|
||||
GroupConfig groupConfig = new GroupConfig();
|
||||
groupConfig.setName("testcluster");
|
||||
groupConfig.setPassword("secret");
|
||||
config.setGroupConfig(groupConfig);
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
@@ -1,144 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutput;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.Message;
|
||||
import org.jgroups.ReceiverAdapter;
|
||||
|
||||
/**
|
||||
* JGroups implementation of the {@link Messenger} class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsMessenger<T extends Serializable> extends ReceiverAdapter implements Messenger<T>
|
||||
{
|
||||
private final Channel channel;
|
||||
private MessageReceiver<T> receiverDelegate;
|
||||
private final static Log logger = LogFactory.getLog(JGroupsMessenger.class);
|
||||
|
||||
/**
|
||||
* Construct a messenger that wraps a JGroups Channel.
|
||||
*
|
||||
* @param channel
|
||||
*/
|
||||
public JGroupsMessenger(Channel channel)
|
||||
{
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void send(T message)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Serializing the message ourselves and passing a byte[]
|
||||
// to Channel.send() as recommended by JGroups.
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
ObjectOutput out = new ObjectOutputStream(bytes);
|
||||
out.writeObject(message);
|
||||
out.close();
|
||||
bytes.close();
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("Sending " + message);
|
||||
}
|
||||
channel.send(null, null, bytes.toByteArray());
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new MessageSendingException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.alfresco.repo.cluster.Messenger#setReceiver(org.alfresco.repo.cluster.MessageReceiver)
|
||||
*/
|
||||
@Override
|
||||
public void setReceiver(MessageReceiver<T> receiver)
|
||||
{
|
||||
// Make sure the delegate is installed, before starting to receive messages.
|
||||
receiverDelegate = receiver;
|
||||
// Start receiving messages and dispatch them to the delegate.
|
||||
channel.setReceiver(this);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* @see org.jgroups.ReceiverAdapter#receive(org.jgroups.Message)
|
||||
*/
|
||||
@Override
|
||||
public void receive(Message msg)
|
||||
{
|
||||
// Deserializing the message ourselves rather than using
|
||||
// the Message's getObject() method (as recommended by JGroups).
|
||||
byte[] msgBytes = msg.getBuffer();
|
||||
ByteArrayInputStream bytes = new ByteArrayInputStream(msgBytes);
|
||||
ObjectInput in;
|
||||
try
|
||||
{
|
||||
in = new ObjectInputStream(bytes);
|
||||
@SuppressWarnings("unchecked")
|
||||
T payload = (T) in.readObject();
|
||||
in.close();
|
||||
bytes.close();
|
||||
if (logger.isTraceEnabled())
|
||||
{
|
||||
logger.trace("Received (will be delegated to receiver): " + payload);
|
||||
}
|
||||
// Pass the deserialized payload on to the receiver delegate
|
||||
receiverDelegate.onReceive(payload);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
catch (ClassNotFoundException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isConnected()
|
||||
{
|
||||
return channel.isConnected();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getAddress()
|
||||
{
|
||||
return channel.getAddress().toString();
|
||||
}
|
||||
}
|
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
||||
import org.alfresco.util.ParameterCheck;
|
||||
import org.jgroups.Channel;
|
||||
|
||||
/**
|
||||
* JGroups implementation of the {@link MessengerFactory} interface.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsMessengerFactory implements MessengerFactory
|
||||
{
|
||||
@Override
|
||||
public <T extends Serializable> Messenger<T> createMessenger(String appRegion)
|
||||
{
|
||||
return createMessenger(appRegion, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends Serializable> Messenger<T> createMessenger(String appRegion, boolean acceptLocalMessages)
|
||||
{
|
||||
ParameterCheck.mandatory("appRegion", appRegion);
|
||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion, acceptLocalMessages);
|
||||
return new JGroupsMessenger<T>(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClusterActive()
|
||||
{
|
||||
return AlfrescoJGroupsChannelFactory.isClusterActive();
|
||||
}
|
||||
}
|
@@ -1,125 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectOutput;
|
||||
import java.io.ObjectOutputStream;
|
||||
|
||||
import org.jgroups.Address;
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.ChannelClosedException;
|
||||
import org.jgroups.ChannelNotConnectedException;
|
||||
import org.jgroups.Message;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Tests for the JGroupsMessenger class.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsMessengerTest
|
||||
{
|
||||
private Channel channel;
|
||||
private JGroupsMessenger<String> messenger;
|
||||
protected String receivedMsg;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
channel = Mockito.mock(Channel.class);
|
||||
messenger = new JGroupsMessenger<String>(channel);
|
||||
receivedMsg = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendMessage() throws ChannelNotConnectedException, ChannelClosedException, IOException
|
||||
{
|
||||
String testText = "This is a test message";
|
||||
byte[] testTextSer = serialize(testText);
|
||||
// When a message is sent...
|
||||
messenger.send(testText);
|
||||
|
||||
// the underlying channel should have been used to send it,
|
||||
// but will be called with a serialized version of the text.
|
||||
verify(channel).send(null, null, testTextSer);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void canReceiveMessage() throws IOException
|
||||
{
|
||||
MessageReceiver<String> receiver = new MessageReceiver<String>()
|
||||
{
|
||||
@Override
|
||||
public void onReceive(String message)
|
||||
{
|
||||
receivedMsg = message;
|
||||
}
|
||||
};
|
||||
|
||||
messenger.setReceiver(receiver);
|
||||
Message jgroupsMessage = new Message(null, null, serialize("JGroups message payload"));
|
||||
// JGroups will call the receive method
|
||||
messenger.receive(jgroupsMessage);
|
||||
|
||||
// The Messenger should have installed itself as the message
|
||||
// receiver for the underlying channel.
|
||||
verify(channel).setReceiver(messenger);
|
||||
|
||||
assertEquals("JGroups message payload", receivedMsg.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canDelegateIsConnected()
|
||||
{
|
||||
Mockito.when(channel.isConnected()).thenReturn(true);
|
||||
assertEquals(true, messenger.isConnected());
|
||||
|
||||
Mockito.when(channel.isConnected()).thenReturn(false);
|
||||
assertEquals(false, messenger.isConnected());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canDelegateGetAddress()
|
||||
{
|
||||
Address address = Mockito.mock(Address.class);
|
||||
Mockito.when(address.toString()).thenReturn("an-address");
|
||||
Mockito.when(channel.getAddress()).thenReturn(address);
|
||||
assertEquals("an-address", messenger.getAddress());
|
||||
}
|
||||
|
||||
private byte[] serialize(String text) throws IOException
|
||||
{
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
ObjectOutput out = new ObjectOutputStream(bytes);
|
||||
out.writeObject(text);
|
||||
out.close();
|
||||
bytes.close();
|
||||
return bytes.toByteArray();
|
||||
}
|
||||
}
|
||||
|
@@ -1,177 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.alfresco.repo.cluster.MessengerTestHelper.TestMessageReceiver;
|
||||
import org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory;
|
||||
import org.alfresco.util.ApplicationContextHelper;
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.ChannelException;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.Message;
|
||||
import org.jgroups.ReceiverAdapter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
/**
|
||||
* Tests for the JGroups messaging abstractions.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class JGroupsTest extends ReceiverAdapter
|
||||
{
|
||||
private static ApplicationContext ctx;
|
||||
private MessengerTestHelper helper;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass()
|
||||
{
|
||||
ctx = ApplicationContextHelper.
|
||||
getApplicationContext(new String[] { "cluster-test/jgroups-messenger-test.xml" });
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownClass()
|
||||
{
|
||||
ApplicationContextHelper.closeApplicationContext();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
helper = new MessengerTestHelper();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendWithJGroupsMessengerFactory() throws InterruptedException, ChannelException
|
||||
{
|
||||
Channel ch = new JChannel("udp.xml");
|
||||
ch.connect("testcluster:testregion");
|
||||
ch.setReceiver(this);
|
||||
|
||||
MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory");
|
||||
Messenger<String> messenger = messengerFactory.createMessenger("testregion");
|
||||
messenger.send("Full test including spring.");
|
||||
|
||||
helper.checkMessageReceivedWas("Full test including spring.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canSendWithJGroupsMessengerFactoryWithoutSpring() throws InterruptedException, ChannelException
|
||||
{
|
||||
Channel ch = new JChannel("udp.xml");
|
||||
ch.connect("testcluster:testregion");
|
||||
ch.setReceiver(this);
|
||||
|
||||
AlfrescoJGroupsChannelFactory channelFactory = new AlfrescoJGroupsChannelFactory();
|
||||
channelFactory.setClusterName("testcluster");
|
||||
channelFactory.setConfigUrlsByAppRegion(Collections.singletonMap("DEFAULT", "classpath:udp.xml"));
|
||||
AlfrescoJGroupsChannelFactory.rebuildChannels();
|
||||
|
||||
JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory();
|
||||
|
||||
Messenger<String> messenger = messengerFactory.createMessenger("testregion");
|
||||
messenger.send("This is a test payload.");
|
||||
|
||||
helper.checkMessageReceivedWas("This is a test payload.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canWrapRawChannels() throws ChannelException, InterruptedException
|
||||
{
|
||||
Channel sendCh = new JChannel("udp.xml");
|
||||
sendCh.connect("mycluster");
|
||||
Messenger<String> messenger = new JGroupsMessenger<String>(sendCh);
|
||||
|
||||
Channel recvCh = new JChannel("udp.xml");
|
||||
recvCh.connect("mycluster");
|
||||
recvCh.setReceiver(this);
|
||||
|
||||
messenger.send("This message was sent with jgroups");
|
||||
|
||||
helper.checkMessageReceivedWas("This message was sent with jgroups");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canCheckIsClusterActive()
|
||||
{
|
||||
JGroupsMessengerFactory messengerFactory = new JGroupsMessengerFactory();
|
||||
|
||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(null);
|
||||
assertEquals(false, messengerFactory.isClusterActive());
|
||||
|
||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("my-cluster-name");
|
||||
assertEquals(true, messengerFactory.isClusterActive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void messengerWillNotReceiveMessagesFromSelf() throws InterruptedException, ChannelException
|
||||
{
|
||||
MessengerFactory messengerFactory = (MessengerFactory) ctx.getBean("messengerFactory");
|
||||
Messenger<String> m1 = messengerFactory.createMessenger("testregion");
|
||||
TestMessageReceiver r1 = new TestMessageReceiver();
|
||||
m1.setReceiver(r1);
|
||||
|
||||
Channel ch2 = new JChannel("udp.xml");
|
||||
ch2.connect("testcluster:testregion");
|
||||
Messenger<String> m2 = new JGroupsMessenger<String>(ch2);
|
||||
TestMessageReceiver r2 = new TestMessageReceiver();
|
||||
m2.setReceiver(r2);
|
||||
|
||||
m1.send("This should be received by r2 but not r1");
|
||||
|
||||
r2.helper.checkMessageReceivedWas("This should be received by r2 but not r1");
|
||||
r1.helper.checkNoMessageReceived();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receive(Message msg)
|
||||
{
|
||||
ByteArrayInputStream bytes = new ByteArrayInputStream(msg.getBuffer());
|
||||
ObjectInput in;
|
||||
try
|
||||
{
|
||||
in = new ObjectInputStream(bytes);
|
||||
String payload = (String) in.readObject();
|
||||
in.close();
|
||||
bytes.close();
|
||||
helper.setReceivedMsg(payload);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
catch (ClassNotFoundException e)
|
||||
{
|
||||
throw new RuntimeException("Couldn't receive object.", e);
|
||||
}
|
||||
}
|
||||
}
|
@@ -23,8 +23,8 @@ import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Provides facilities for peer-to-peer messaging within a cluster. This interface
|
||||
* is intended to act as a facade, allowing the actual implementation (e.g. JGroups
|
||||
* or Hazelcast) to be decoupled as much as possible from the Alfresco code base.
|
||||
* is intended to act as a facade, allowing the actual implementation (e.g. Hazelcast)
|
||||
* to be decoupled as much as possible from the Alfresco code base.
|
||||
* <p>
|
||||
* Instances of this class are parameterised with the type of message payload
|
||||
* to send and receive.
|
||||
|
@@ -31,7 +31,7 @@ public interface MessengerFactory
|
||||
/** A catch-all for unknown application regions. */
|
||||
public static final String APP_REGION_DEFAULT = "DEFAULT";
|
||||
|
||||
/** The application region used by the EHCache heartbeat implementation over JGroups. */
|
||||
/** The application region used by the EHCache heartbeat implementation. */
|
||||
public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT";
|
||||
|
||||
<T extends Serializable> Messenger<T> createMessenger(String appRegion);
|
||||
@@ -39,4 +39,6 @@ public interface MessengerFactory
|
||||
<T extends Serializable> Messenger<T> createMessenger(String appRegion, boolean acceptLocalMessages);
|
||||
|
||||
boolean isClusterActive();
|
||||
|
||||
void addMembershipListener(ClusterMembershipListener membershipListener);
|
||||
}
|
||||
|
68
source/java/org/alfresco/repo/cluster/NullMessenger.java
Normal file
68
source/java/org/alfresco/repo/cluster/NullMessenger.java
Normal file
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2012 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* A do-nothing implementation of the {@link Messenger} interface.
|
||||
*
|
||||
* @author Matt Ward
|
||||
*/
|
||||
public class NullMessenger<T extends Serializable> implements Messenger<T>
|
||||
{
|
||||
private static final Log logger = LogFactory.getLog(NullMessenger.class);
|
||||
|
||||
@Override
|
||||
public void send(T message)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Throwing away message: " + message);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReceiver(MessageReceiver<T> receiver)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Throwing away receiver: " + receiver);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAddress()
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("getAddress() always returns loopback address: 127.0.0.1");
|
||||
}
|
||||
return "127.0.0.1";
|
||||
}
|
||||
}
|
@@ -1,966 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2010 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.jgroups;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.URL;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.alfresco.error.AlfrescoRuntimeException;
|
||||
import org.alfresco.repo.cluster.MessengerFactory;
|
||||
import org.alfresco.util.PropertyCheck;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.jgroups.Address;
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.ChannelClosedException;
|
||||
import org.jgroups.ChannelException;
|
||||
import org.jgroups.ChannelListener;
|
||||
import org.jgroups.ChannelNotConnectedException;
|
||||
import org.jgroups.Event;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.Message;
|
||||
import org.jgroups.Receiver;
|
||||
import org.jgroups.TimeoutException;
|
||||
import org.jgroups.UpHandler;
|
||||
import org.jgroups.View;
|
||||
import org.jgroups.protocols.LOOPBACK;
|
||||
import org.jgroups.stack.ProtocolStack;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
|
||||
import org.springframework.util.ResourceUtils;
|
||||
|
||||
/**
|
||||
* A cache peer provider that does heartbeat sending and receiving using JGroups.
|
||||
* <p>
|
||||
* The cluster name needs to be set before any communication is possible. This can be done using the
|
||||
* property {@link #setClusterName(String)}.
|
||||
* <p>
|
||||
* The channels provided to the callers will be proxies to underlying channels that will be hot-swappable.
|
||||
* This means that the client code can continue to use the channel references while the actual
|
||||
* implementation can be switched in and out as required.
|
||||
*
|
||||
* @author Derek Hulley
|
||||
* @since 2.1.3
|
||||
*/
|
||||
public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
|
||||
{
|
||||
/** The UDP protocol config (default) */
|
||||
public static final String DEFAULT_CONFIG_UDP = "classpath:alfresco/jgroups/alfresco-jgroups-UDP.xml";
|
||||
/** The TCP protocol config */
|
||||
public static final String DEFAULT_CONFIG_TCP = "classpath:alfresco/jgroups/alfresco-jgroups-TCP.xml";
|
||||
|
||||
private static Log logger = LogFactory.getLog(AlfrescoJGroupsChannelFactory.class);
|
||||
|
||||
// Synchronization locks
|
||||
private static ReadLock readLock;
|
||||
private static WriteLock writeLock;
|
||||
|
||||
// Values that are modified by the bean implementation
|
||||
private static String clusterNamePrefix;
|
||||
private static Map<String, String> configUrlsByAppRegion;
|
||||
|
||||
// Derived data
|
||||
/** A map that stores channel information by the application region. */
|
||||
private static final Map<String, ChannelProxy> channelsByAppRegion;
|
||||
|
||||
static
|
||||
{
|
||||
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); // Fair
|
||||
readLock = readWriteLock.readLock();
|
||||
writeLock = readWriteLock.writeLock();
|
||||
|
||||
channelsByAppRegion = new HashMap<String, ChannelProxy>(5);
|
||||
|
||||
clusterNamePrefix = null;
|
||||
configUrlsByAppRegion = new HashMap<String, String>(5);
|
||||
configUrlsByAppRegion.put(
|
||||
MessengerFactory.APP_REGION_DEFAULT,
|
||||
AlfrescoJGroupsChannelFactory.DEFAULT_CONFIG_UDP);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a cluster name was provided.
|
||||
*
|
||||
* @return Returns <tt>true</tt> if the cluster configuration is active,
|
||||
* i.e. a cluster name was provided
|
||||
*/
|
||||
public static boolean isClusterActive()
|
||||
{
|
||||
readLock.lock();
|
||||
try
|
||||
{
|
||||
return clusterNamePrefix != null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all channels. All the channels will be closed and will cease to function.
|
||||
*/
|
||||
private static void closeChannels()
|
||||
{
|
||||
for (Map.Entry<String, ChannelProxy> entry : channelsByAppRegion.entrySet())
|
||||
{
|
||||
ChannelProxy channelProxy = entry.getValue();
|
||||
|
||||
// Close the channel via the proxy
|
||||
try
|
||||
{
|
||||
channelProxy.close();
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("\n" +
|
||||
"Closed channel: " + channelProxy);
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
logger.warn(
|
||||
"Unable to close channel: \n" +
|
||||
" Channel: " + channelProxy,
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configuration URL to use for the given application region. This might default to the
|
||||
* {@link #APP_REGION_DEFAULT default app region}.
|
||||
*/
|
||||
private static String getConfigUrl(String appRegion)
|
||||
{
|
||||
readLock.lock();
|
||||
try
|
||||
{
|
||||
// Get the configuration to use
|
||||
String configUrlStr = configUrlsByAppRegion.get(appRegion);
|
||||
if (!PropertyCheck.isValidPropertyString(configUrlStr))
|
||||
{
|
||||
configUrlStr = configUrlsByAppRegion.get(MessengerFactory.APP_REGION_DEFAULT);
|
||||
}
|
||||
if (configUrlStr == null)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(
|
||||
"No protocol configuration was found for application region: \n" +
|
||||
" Cluster prefix: " + clusterNamePrefix + "\n" +
|
||||
" App region: " + appRegion + "\n" +
|
||||
" Regions defined: " + configUrlsByAppRegion);
|
||||
}
|
||||
return configUrlStr;
|
||||
}
|
||||
finally
|
||||
{
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Creates a channel for the cluster. This method should not be heavily used
|
||||
* as the checks and synchronizations will slow the calls. Returned channels can be
|
||||
* kept and will be modified directly using the factory-held references, if necessary.
|
||||
* <p>
|
||||
* The application region is used to determine the protocol configuration to apply.
|
||||
* <p>
|
||||
* This method returns a dummy channel if no cluster name has been provided.
|
||||
*
|
||||
* @param appRegion the application region identifier.
|
||||
* @return Returns a channel
|
||||
*/
|
||||
public static Channel getChannel(String appRegion, boolean acceptLocalMessages)
|
||||
{
|
||||
readLock.lock();
|
||||
try
|
||||
{
|
||||
ChannelProxy channelProxy = channelsByAppRegion.get(appRegion);
|
||||
if (channelProxy != null)
|
||||
{
|
||||
// This will do
|
||||
return channelProxy;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
readLock.unlock();
|
||||
}
|
||||
// Being here means that there is no channel yet
|
||||
// Go write
|
||||
writeLock.lock();
|
||||
try
|
||||
{
|
||||
ChannelProxy channelProxy = channelsByAppRegion.get(appRegion);
|
||||
if (channelProxy != null)
|
||||
{
|
||||
// This will do
|
||||
return channelProxy;
|
||||
}
|
||||
// Get the channel
|
||||
Channel channel = getChannelInternal(appRegion, acceptLocalMessages);
|
||||
// Proxy the channel
|
||||
channelProxy = new ChannelProxy(channel);
|
||||
// Store the channel to the map
|
||||
channelsByAppRegion.put(appRegion, channelProxy);
|
||||
// Done
|
||||
return channelProxy;
|
||||
}
|
||||
finally
|
||||
{
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a channel for the given cluster. The application region is used
|
||||
* to determine the protocol configuration to apply.
|
||||
*
|
||||
* @param appRegion the application region identifier.
|
||||
* @return Returns a channel
|
||||
*/
|
||||
/* All calls to this are ultimately wrapped in the writeLock. */
|
||||
private static /*synchronized*/ Channel getChannelInternal(String appRegion, boolean acceptLocalMessages)
|
||||
{
|
||||
Channel channel;
|
||||
URL configUrl = null;
|
||||
// If there is no cluster defined (yet) then we define a dummy channel
|
||||
if (AlfrescoJGroupsChannelFactory.clusterNamePrefix == null)
|
||||
{
|
||||
try
|
||||
{
|
||||
channel = new DummyJChannel();
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(
|
||||
"Failed to create dummy JGroups channel: \n" +
|
||||
" Cluster prefix: " + clusterNamePrefix + "\n" +
|
||||
" App region: " + appRegion,
|
||||
e);
|
||||
}
|
||||
}
|
||||
else // Create real channel
|
||||
{
|
||||
// Get the protocol configuration to use
|
||||
String configUrlStr = getConfigUrl(appRegion);
|
||||
try
|
||||
{
|
||||
// Construct the JChannel directly
|
||||
configUrl = ResourceUtils.getURL(configUrlStr);
|
||||
channel = new JChannel(configUrl);
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(
|
||||
"Failed to create JGroups channel: \n" +
|
||||
" Cluster prefix: " + clusterNamePrefix + "\n" +
|
||||
" App region: " + appRegion + "\n" +
|
||||
" Regions defined: " + configUrlsByAppRegion + "\n" +
|
||||
" Configuration URL: " + configUrlStr,
|
||||
e);
|
||||
}
|
||||
}
|
||||
// Initialise the channel
|
||||
try
|
||||
{
|
||||
String clusterName = clusterNamePrefix + ":" + appRegion;
|
||||
// Don't accept messages from self
|
||||
if(acceptLocalMessages)
|
||||
{
|
||||
channel.setOpt(Channel.LOCAL, Boolean.TRUE);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel.setOpt(Channel.LOCAL, Boolean.FALSE);
|
||||
}
|
||||
|
||||
// Connect
|
||||
channel.connect(clusterName);
|
||||
// Done
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("\n" +
|
||||
"Created JGroups channel: \n" +
|
||||
" Cluster prefix: " + clusterNamePrefix + "\n" +
|
||||
" App region: " + appRegion + "\n" +
|
||||
" Regions defined: " + configUrlsByAppRegion + "\n" +
|
||||
" Channel: " + channel + "\n" +
|
||||
" Configuration URL: " + configUrl);
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(
|
||||
"Failed to initialise JGroups channel: \n" +
|
||||
" Cluster prefix: " + clusterNamePrefix + "\n" +
|
||||
" App region: " + appRegion + "\n" +
|
||||
" Channel: " + channel + "\n" +
|
||||
" Configuration URL: " + configUrl,
|
||||
e);
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rebuild all the channels using the current cluster name and configuration mappings.
|
||||
*/
|
||||
public static void rebuildChannels()
|
||||
{
|
||||
writeLock.lock();
|
||||
try
|
||||
{
|
||||
rebuildChannelsInternal();
|
||||
}
|
||||
finally
|
||||
{
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw away all calculated values and rebuild. This means that the channel factory will
|
||||
* be reconstructed from scratch. All the channels are reconstructed - but this will not
|
||||
* affect any references to channels held outside this class as the values returned are proxies
|
||||
* on top of hot swappable implementations.
|
||||
* <p>
|
||||
* The old channel is closed before the new one is created, so it is possible for a channel
|
||||
* held by client code to be rendered unusable during the switch-over.
|
||||
*/
|
||||
/* All calls to this are ultimately wrapped in the writeLock. */
|
||||
private static /*synchronized*/ void rebuildChannelsInternal()
|
||||
{
|
||||
// Reprocess all the application regions with the new data
|
||||
for (Map.Entry<String, ChannelProxy> entry : channelsByAppRegion.entrySet())
|
||||
{
|
||||
String appRegion = entry.getKey();
|
||||
ChannelProxy channelProxy = entry.getValue();
|
||||
|
||||
// Get the old channel
|
||||
Channel oldChannel = channelProxy.getDelegate();
|
||||
|
||||
Boolean acceptLocalMessages = (Boolean)oldChannel.getOpt(Channel.LOCAL);
|
||||
|
||||
// Close the old channel.
|
||||
try
|
||||
{
|
||||
oldChannel.close();
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("\n" +
|
||||
"Closed old channel during channel rebuild: \n" +
|
||||
" Old channel: " + oldChannel);
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
logger.warn(
|
||||
"Unable to close old channel during channel rebuild: \n" +
|
||||
" Old channel: " + oldChannel,
|
||||
e);
|
||||
}
|
||||
|
||||
// Create the new channel
|
||||
Channel newChannel = getChannelInternal(appRegion, acceptLocalMessages.booleanValue());
|
||||
|
||||
// Now do the hot-swap
|
||||
channelProxy.swap(newChannel);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the prefix used to identify the different clusters. Each application region will
|
||||
* have a separate cluster name that will be:
|
||||
* <pre>
|
||||
* clusterNamePrefix:appRegion
|
||||
* </pre>
|
||||
* If no cluster name prefix is declared, the cluster is effectively disabled.
|
||||
* <p>
|
||||
* <b>NOTE: </b>The channels must be {@link #rebuildChannels() rebuilt}.
|
||||
*
|
||||
* @param clusterNamePrefix a prefix to append to the cluster names used
|
||||
*/
|
||||
public static void changeClusterNamePrefix(String clusterNamePrefix)
|
||||
{
|
||||
writeLock.lock();
|
||||
try
|
||||
{
|
||||
if (!PropertyCheck.isValidPropertyString(clusterNamePrefix))
|
||||
{
|
||||
// Clear everything out
|
||||
AlfrescoJGroupsChannelFactory.clusterNamePrefix = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.clusterNamePrefix = clusterNamePrefix;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a mapping between the application regions and the available JGroup protocol configurations.
|
||||
* The map <b>must</b> contain a mapping for application region 'DEFAULT'.
|
||||
* <p>
|
||||
* <b>NOTE: </b>The channels must be {@link #rebuildChannels() rebuilt}.
|
||||
*
|
||||
* @param configUrlsByAppRegion a mapping from application region (keys) to protocol configuration URLs (values)
|
||||
*/
|
||||
private static void changeConfigUrlsMapping(Map<String, String> configUrlsByAppRegion)
|
||||
{
|
||||
writeLock.lock();
|
||||
try
|
||||
{
|
||||
// Check that there is a mapping for default
|
||||
if (!configUrlsByAppRegion.containsKey(MessengerFactory.APP_REGION_DEFAULT))
|
||||
{
|
||||
throw new AlfrescoRuntimeException("A configuration URL must be defined for 'DEFAULT'");
|
||||
}
|
||||
AlfrescoJGroupsChannelFactory.configUrlsByAppRegion = configUrlsByAppRegion;
|
||||
}
|
||||
finally
|
||||
{
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bean-enabling constructor
|
||||
*/
|
||||
public AlfrescoJGroupsChannelFactory()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @see AlfrescoJGroupsChannelFactory#changeClusterNamePrefix(String)
|
||||
*/
|
||||
public void setClusterName(String clusterName)
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix(clusterName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see AlfrescoJGroupsChannelFactory#changeConfigUrlsMapping(Map)
|
||||
*/
|
||||
public void setConfigUrlsByAppRegion(Map<String, String> configUrlsByAppRegion)
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.changeConfigUrlsMapping(configUrlsByAppRegion);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link #setConfigUrlsByAppRegion(Map)}
|
||||
*/
|
||||
public void setProtocolStackMapping(Map<String, String> unused)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(
|
||||
"Properties 'protocolStackMapping' and 'jgroupsConfigurationUrl'" +
|
||||
" have been deprecated in favour of 'configUrlsByAppRegion'.");
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link #setConfigUrlsByAppRegion(Map)}
|
||||
*/
|
||||
public void setJgroupsConfigurationUrl(String configUrl)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(
|
||||
"Properties 'protocolStackMapping' and 'jgroupsConfigurationUrl'" +
|
||||
" have been deprecated in favour of 'configUrlsByAppRegion'.");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onBootstrap(ApplicationEvent event)
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.rebuildChannels();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onShutdown(ApplicationEvent event)
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.closeChannels();
|
||||
}
|
||||
|
||||
/**
|
||||
* A no-op JChannel using the "DUMMY_TP" protocol only
|
||||
*
|
||||
* @author Derek Hulley
|
||||
* @since 2.1.3
|
||||
*/
|
||||
private static class DummyJChannel extends JChannel
|
||||
{
|
||||
public DummyJChannel() throws ChannelException
|
||||
{
|
||||
super("org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory$DummyProtocol");
|
||||
}
|
||||
}
|
||||
|
||||
public static class DummyProtocol extends LOOPBACK
|
||||
{
|
||||
public DummyProtocol()
|
||||
{
|
||||
super();
|
||||
enable_diagnostics = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return "ALF_DUMMY";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object down(Event evt)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object up(Event evt)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A proxy channel that can be used to hot-swap underlying channels. All listeners
|
||||
* and the receiver will be carried over to the new underlying channel when it is
|
||||
* swapped out.
|
||||
*
|
||||
* @author Derek Hulley
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public static class ChannelProxy extends Channel
|
||||
{
|
||||
/*
|
||||
* Not synchronizing. Mostly swapping will be VERY rare and if there is a bit
|
||||
* of inconsistency it is not important.
|
||||
*/
|
||||
private Channel delegate;
|
||||
private UpHandler delegateUpHandler;
|
||||
private Set<ChannelListener> delegateChannelListeners;
|
||||
private Receiver delegateReceiver;
|
||||
|
||||
/**
|
||||
* @param delegate the real channel that will do the work
|
||||
*/
|
||||
public ChannelProxy(Channel delegate)
|
||||
{
|
||||
this.delegate = delegate;
|
||||
this.delegateChannelListeners = new HashSet<ChannelListener>(7);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the channel to which the implementation will delegate
|
||||
*/
|
||||
public Channel getDelegate()
|
||||
{
|
||||
return delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Swap the channel. The old delegate will be disconnected before the swap occurs.
|
||||
* This guarantees data consistency, assuming that any failures will be handled.
|
||||
* <p>
|
||||
* Note that the old delegate is not closed or shutdown.
|
||||
*
|
||||
* @param the new delegate
|
||||
* @return the old, disconnected delegate
|
||||
*/
|
||||
public synchronized Channel swap(Channel channel)
|
||||
{
|
||||
// Remove the listeners from the old channel
|
||||
delegate.setReceiver(null);
|
||||
for (ChannelListener delegateChannelListener : delegateChannelListeners)
|
||||
{
|
||||
delegate.removeChannelListener(delegateChannelListener);
|
||||
}
|
||||
delegate.setUpHandler(null);
|
||||
|
||||
Channel oldDelegate = delegate;
|
||||
|
||||
// Assign the new delegate and carry the listeners over
|
||||
delegate = channel;
|
||||
delegate.setReceiver(delegateReceiver);
|
||||
delegate.setOpt(Channel.LOCAL, oldDelegate.getOpt(Channel.LOCAL));
|
||||
for (ChannelListener delegateChannelListener : delegateChannelListeners)
|
||||
{
|
||||
delegate.addChannelListener(delegateChannelListener);
|
||||
}
|
||||
delegate.setUpHandler(delegateUpHandler);
|
||||
// Done
|
||||
return oldDelegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected org.jgroups.logging.Log getLog()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Address getAddress()
|
||||
{
|
||||
return delegate.getAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return delegate.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolStack getProtocolStack()
|
||||
{
|
||||
return delegate.getProtocolStack();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setReceiver(Receiver r)
|
||||
{
|
||||
delegateReceiver = r;
|
||||
delegate.setReceiver(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addChannelListener(ChannelListener listener)
|
||||
{
|
||||
if (listener == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
delegateChannelListeners.add(listener);
|
||||
delegate.addChannelListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeChannelListener(ChannelListener listener)
|
||||
{
|
||||
if (listener != null)
|
||||
{
|
||||
delegateChannelListeners.remove(listener);
|
||||
}
|
||||
delegate.removeChannelListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void clearChannelListeners()
|
||||
{
|
||||
delegateChannelListeners.clear();
|
||||
delegate.clearChannelListeners();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setUpHandler(UpHandler up_handler)
|
||||
{
|
||||
delegateUpHandler = up_handler;
|
||||
delegate.setUpHandler(up_handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockOk()
|
||||
{
|
||||
delegate.blockOk();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException
|
||||
{
|
||||
delegate.connect(cluster_name, target, state_id, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(String cluster_name) throws ChannelException
|
||||
{
|
||||
delegate.connect(cluster_name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect()
|
||||
{
|
||||
delegate.disconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void down(Event evt)
|
||||
{
|
||||
delegate.down(evt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object downcall(Event evt)
|
||||
{
|
||||
return delegate.downcall(evt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dumpQueue()
|
||||
{
|
||||
return delegate.dumpQueue();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public Map dumpStats()
|
||||
{
|
||||
return delegate.dumpStats();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
return delegate.equals(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean flushSupported()
|
||||
{
|
||||
return delegate.flushSupported();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
return delegate.getAllStates(targets, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelName()
|
||||
{
|
||||
return delegate.getChannelName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterName()
|
||||
{
|
||||
return delegate.getClusterName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getInfo()
|
||||
{
|
||||
return delegate.getInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Address getLocalAddress()
|
||||
{
|
||||
return delegate.getLocalAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumMessages()
|
||||
{
|
||||
return delegate.getNumMessages();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getOpt(int option)
|
||||
{
|
||||
return delegate.getOpt(option);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
return delegate.getState(target, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getState(Address target, String state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
return delegate.getState(target, state_id, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public View getView()
|
||||
{
|
||||
return delegate.getView();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return delegate.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected()
|
||||
{
|
||||
return delegate.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
return delegate.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() throws ChannelException
|
||||
{
|
||||
delegate.open();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
|
||||
{
|
||||
return delegate.peek(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
|
||||
{
|
||||
return delegate.receive(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnState(byte[] state, String state_id)
|
||||
{
|
||||
delegate.returnState(state, state_id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnState(byte[] state)
|
||||
{
|
||||
delegate.returnState(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
delegate.send(dst, src, obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
delegate.send(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setChannelListener(ChannelListener channel_listener)
|
||||
{
|
||||
delegate.setChannelListener(channel_listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInfo(String key, Object value)
|
||||
{
|
||||
delegate.setInfo(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOpt(int option, Object value)
|
||||
{
|
||||
delegate.setOpt(option, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown()
|
||||
{
|
||||
delegate.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean startFlush(boolean automatic_resume)
|
||||
{
|
||||
return delegate.startFlush(automatic_resume);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean startFlush(List<Address> flushParticipants, boolean automatic_resume)
|
||||
{
|
||||
return delegate.startFlush(flushParticipants, automatic_resume);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean startFlush(long timeout, boolean automatic_resume)
|
||||
{
|
||||
return delegate.startFlush(timeout, automatic_resume);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopFlush()
|
||||
{
|
||||
delegate.stopFlush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopFlush(List<Address> flushParticipants)
|
||||
{
|
||||
delegate.stopFlush(flushParticipants);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String toString()
|
||||
{
|
||||
if (delegate instanceof DummyJChannel)
|
||||
{
|
||||
return delegate.toString() + "(dummy)";
|
||||
}
|
||||
else
|
||||
{
|
||||
return delegate.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName(Address member)
|
||||
{
|
||||
return delegate.getName(member);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Address dst, Address src, byte[] buf) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
delegate.send(dst, src, buf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Address dst, Address src, byte[] buf, int offset, int length) throws ChannelNotConnectedException, ChannelClosedException
|
||||
{
|
||||
delegate.send(dst, src, buf, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setName(String name)
|
||||
{
|
||||
delegate.setName(name);
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,89 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2010 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.jgroups;
|
||||
|
||||
import org.jgroups.Channel;
|
||||
import org.jgroups.Message;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
/**
|
||||
* @see AlfrescoJGroupsChannelFactory
|
||||
*
|
||||
* @author Derek Hulley
|
||||
* @since 2.1.3
|
||||
*/
|
||||
public class AlfrescoJGroupsChannelFactoryTest extends TestCase
|
||||
{
|
||||
private static byte[] bytes = new byte[65536];
|
||||
static
|
||||
{
|
||||
for (int i = 0; i < bytes.length; i++)
|
||||
{
|
||||
bytes[i] = 1;
|
||||
}
|
||||
}
|
||||
|
||||
private String appRegion;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception
|
||||
{
|
||||
appRegion = getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the channel is behaving
|
||||
*/
|
||||
private void stressChannel(Channel channel) throws Exception
|
||||
{
|
||||
System.out.println("Test: " + getName());
|
||||
System.out.println(" Channel: " + channel);
|
||||
System.out.println(" Cluster: " + channel.getClusterName());
|
||||
channel.send(null, null, Boolean.TRUE);
|
||||
channel.send(new Message(null, null, bytes));
|
||||
}
|
||||
|
||||
public void testNoCluster() throws Exception
|
||||
{
|
||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false);
|
||||
stressChannel(channel);
|
||||
}
|
||||
|
||||
public void testBasicCluster() throws Exception
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("blah");
|
||||
AlfrescoJGroupsChannelFactory.rebuildChannels();
|
||||
Channel channel = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false);
|
||||
stressChannel(channel);
|
||||
}
|
||||
|
||||
public void testHotSwapCluster() throws Exception
|
||||
{
|
||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("ONE");
|
||||
AlfrescoJGroupsChannelFactory.rebuildChannels();
|
||||
Channel channel1 = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false);
|
||||
stressChannel(channel1);
|
||||
AlfrescoJGroupsChannelFactory.changeClusterNamePrefix("TWO");
|
||||
AlfrescoJGroupsChannelFactory.rebuildChannels();
|
||||
Channel channel2 = AlfrescoJGroupsChannelFactory.getChannel(appRegion, false);
|
||||
stressChannel(channel1);
|
||||
assertTrue("Channel reference must be the same", channel1 == channel2);
|
||||
}
|
||||
}
|
@@ -531,7 +531,6 @@ public abstract class AlfrescoTransactionSupport
|
||||
private final Set<LuceneIndexerAndSearcher> lucenes;
|
||||
private final LinkedHashSet<TransactionListener> listeners;
|
||||
private final Set<TransactionalCache<Serializable, Object>> transactionalCaches;
|
||||
// private final Set<JGroupsEhCacheListener> jgroupsEhCacheListeners;
|
||||
private final Map<Object, Object> resources;
|
||||
|
||||
/**
|
||||
@@ -548,7 +547,6 @@ public abstract class AlfrescoTransactionSupport
|
||||
lucenes = new HashSet<LuceneIndexerAndSearcher>(3);
|
||||
listeners = new LinkedHashSet<TransactionListener>(5);
|
||||
transactionalCaches = new HashSet<TransactionalCache<Serializable, Object>>(3);
|
||||
// jgroupsEhCacheListeners = new HashSet<JGroupsEhCacheListener>(3);
|
||||
resources = new HashMap<Object, Object>(17);
|
||||
}
|
||||
|
||||
@@ -602,10 +600,6 @@ public abstract class AlfrescoTransactionSupport
|
||||
{
|
||||
return transactionalCaches.add((TransactionalCache<Serializable, Object>)listener);
|
||||
}
|
||||
// else if (listener instanceof JGroupsEhCacheListener)
|
||||
// {
|
||||
// return jgroupsEhCacheListeners.add((JGroupsEhCacheListener)listener);
|
||||
// }
|
||||
else
|
||||
{
|
||||
return listeners.add(listener);
|
||||
@@ -709,12 +703,6 @@ public abstract class AlfrescoTransactionSupport
|
||||
{
|
||||
cache.beforeCommit(readOnly);
|
||||
}
|
||||
//
|
||||
// // Flush the JGroups listeners
|
||||
// for (JGroupsEhCacheListener listener : jgroupsEhCacheListeners)
|
||||
// {
|
||||
// listener.beforeCommit(readOnly);
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -2,26 +2,21 @@
|
||||
<!DOCTYPE beans PUBLIC '-//SPRING//DTD BEAN//EN' 'http://www.springframework.org/dtd/spring-beans.dtd'>
|
||||
|
||||
<beans>
|
||||
|
||||
<!--
|
||||
Unlike the JGroupsMessengerFactory, this messengerFactory bean doesn't require an explicit
|
||||
dependency (through the depends-on attribute) as it specifies the dependency through the
|
||||
reference to the hazelcastInstance bean.
|
||||
-->
|
||||
<bean id="messengerFactory" class="org.alfresco.repo.cluster.HazelcastMessengerFactory">
|
||||
<property name="hazelcast" ref="hazelcastInstance"/>
|
||||
<property name="hazelcastInstanceFactory" ref="hazelcastInstanceFactory"/>
|
||||
</bean>
|
||||
|
||||
<bean id="hazelcastInstance" class="com.hazelcast.core.Hazelcast" factory-method="newHazelcastInstance">
|
||||
<constructor-arg>
|
||||
<bean class="com.hazelcast.config.Config">
|
||||
<property name="groupConfig">
|
||||
<bean class="com.hazelcast.config.GroupConfig">
|
||||
<property name="name" value="testcluster"/>
|
||||
<property name="password" value="secret"/>
|
||||
</bean>
|
||||
<bean id="hazelcastConfig" class="org.alfresco.repo.cluster.HazelcastConfigFactoryBean">
|
||||
<property name="configFile" value="classpath:alfresco/hazelcast/hazelcast-udp.xml"/>
|
||||
<property name="properties">
|
||||
<props>
|
||||
<prop key="alfresco.cluster.name">test_hazelcast_cluster</prop>
|
||||
<prop key="alfresco.hazelcast.password">test_hazelcast_cluster_password</prop>
|
||||
</props>
|
||||
</property>
|
||||
</bean>
|
||||
</constructor-arg>
|
||||
|
||||
<bean id="hazelcastInstanceFactory" class="org.alfresco.repo.cluster.HazelcastInstanceFactory">
|
||||
<property name="config" ref="hazelcastConfig"/>
|
||||
</bean>
|
||||
</beans>
|
||||
|
@@ -1,22 +0,0 @@
|
||||
<?xml version='1.0' encoding='UTF-8'?>
|
||||
<!DOCTYPE beans PUBLIC '-//SPRING//DTD BEAN//EN' 'http://www.springframework.org/dtd/spring-beans.dtd'>
|
||||
|
||||
<beans>
|
||||
<bean name="jgroupsChannelFactory"
|
||||
class="org.alfresco.repo.jgroups.AlfrescoJGroupsChannelFactory">
|
||||
<property name="clusterName">
|
||||
<value>testcluster</value>
|
||||
</property>
|
||||
<property name="configUrlsByAppRegion">
|
||||
<map>
|
||||
<entry key="DEFAULT">
|
||||
<value>classpath:udp.xml</value>
|
||||
</entry>
|
||||
</map>
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
<bean id="messengerFactory"
|
||||
class="org.alfresco.repo.cluster.JGroupsMessengerFactory"
|
||||
depends-on="jgroupsChannelFactory"/>
|
||||
</beans>
|
Reference in New Issue
Block a user