/*
 * Copyright (C) 2005-2010 Alfresco Software Limited.
 *
 * This file is part of Alfresco
 *
 * Alfresco is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Alfresco is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Alfresco. If not, see 
* The cluster name needs to be set before any communication is possible. This can be done using the * property {@link #setClusterName(String)}. *
 * The channels provided to the callers will be proxies to underlying channels that will be hot-swappable.
 * This means that the client code can continue to use the channel references while the actual
 * implementation can be switched in and out as required.
 *  
 * @author Derek Hulley
 * @since 2.1.3
 */
public class AlfrescoJGroupsChannelFactory extends AbstractLifecycleBean
{
    /** A catch-all for unknown application regions. */
    public static final String APP_REGION_DEFAULT = "DEFAULT";
    /** The application region used by the EHCache heartbeat implementation over JGroups. */
    public static final String APP_REGION_EHCACHE_HEARTBEAT = "EHCACHE_HEARTBEAT";
    /** The UDP protocol config (default) */
    public static final String DEFAULT_CONFIG_UDP = "classpath:alfresco/jgroups/alfresco-jgroups-UDP.xml";
    /** The TCP protocol config */
    public static final String DEFAULT_CONFIG_TCP = "classpath:alfresco/jgroups/alfresco-jgroups-TCP.xml";
    
    private static Log logger = LogFactory.getLog(AlfrescoJGroupsChannelFactory.class);
    
    // Synchronization locks
    private static ReadLock readLock;
    private static WriteLock writeLock;
    
    // Values that are modified by the bean implementation
    private static String clusterNamePrefix;
    private static Map 
     * The application region is used to determine the protocol configuration to apply.
     *  
     * This method returns a dummy channel if no cluster name has been provided.
     * 
     * @param appRegion             the application region identifier.
     * @return                      Returns a channel
     */
    public static Channel getChannel(String appRegion)
    {
        readLock.lock();
        try
        {
            ChannelProxy channelProxy = channelsByAppRegion.get(appRegion);
            if (channelProxy != null)
            {
                // This will do
                return channelProxy;
            }
        }
        finally
        {
            readLock.unlock();
        }
        // Being here means that there is no channel yet
        // Go write
        writeLock.lock();
        try
        {
            ChannelProxy channelProxy = channelsByAppRegion.get(appRegion);
            if (channelProxy != null)
            {
                // This will do
                return channelProxy;
            }
            // Get the channel
            Channel channel = getChannelInternal(appRegion);
            // Proxy the channel
            channelProxy = new ChannelProxy(channel);
            // Store the channel to the map
            channelsByAppRegion.put(appRegion, channelProxy);
            // Done
            return channelProxy;
        }
        finally
        {
            writeLock.unlock();
        }
    }
    
    /**
     * Creates a channel for the given cluster.  The application region is used
     * to determine the protocol configuration to apply.
     * 
     * @param appRegion             the application region identifier.
     * @return                      Returns a channel
     */
    /* All calls to this are ultimately wrapped in the writeLock. */
    private static /*synchronized*/ Channel getChannelInternal(String appRegion)
    {
        Channel channel;
        URL configUrl = null;
        // If there is no cluster defined (yet) then we define a dummy channel
        if (AlfrescoJGroupsChannelFactory.clusterNamePrefix == null)
        {
            try
            {
                channel = new DummyJChannel();
            }
            catch (Throwable e)
            {
                throw new AlfrescoRuntimeException(
                        "Failed to create dummy JGroups channel: \n" +
                        "   Cluster prefix:    " + clusterNamePrefix + "\n" +
                        "   App region:        " + appRegion,
                        e);
            }
        }
        else                    // Create real channel
        {
            // Get the protocol configuration to use
            String configUrlStr = getConfigUrl(appRegion);
            try
            {
                // Construct the JChannel directly
                configUrl = ResourceUtils.getURL(configUrlStr);
                channel = new JChannel(configUrl);
            }
            catch (Throwable e)
            {
                throw new AlfrescoRuntimeException(
                        "Failed to create JGroups channel: \n" +
                        "   Cluster prefix:    " + clusterNamePrefix + "\n" +
                        "   App region:        " + appRegion + "\n" +
                        "   Regions defined: " + configUrlsByAppRegion + "\n" +
                        "   Configuration URL: " + configUrlStr,
                        e);
            }
        }
        // Initialise the channel
        try
        {
            String clusterName = clusterNamePrefix + ":" + appRegion;
            // Don't accept messages from self
            channel.setOpt(Channel.LOCAL, Boolean.FALSE);
            // Connect
            channel.connect(clusterName);
            // Done
            if (logger.isDebugEnabled())
            {
                logger.debug("\n" +
                        "Created JGroups channel: \n" +
                        "   Cluster prefix:    " + clusterNamePrefix + "\n" +
                        "   App region:        " + appRegion + "\n" +
                        "   Regions defined: " + configUrlsByAppRegion + "\n" +
                        "   Channel:           " + channel + "\n" +
                        "   Configuration URL: " + configUrl);
            }
        }
        catch (Throwable e)
        {
            throw new AlfrescoRuntimeException(
                    "Failed to initialise JGroups channel: \n" +
                    "   Cluster prefix:    " + clusterNamePrefix + "\n" +
                    "   App region:        " + appRegion + "\n" +
                    "   Channel:           " + channel + "\n" +
                    "   Configuration URL: " + configUrl,
                    e);
        }
        return channel;
    }
    
    /**
     * Rebuild all the channels using the current cluster name and configuration mappings.
     */
    public static void rebuildChannels()
    {
        writeLock.lock();
        try
        {
            rebuildChannelsInternal();
        }
        finally
        {
            writeLock.unlock();
        }
    }
    
    /**
     * Throw away all calculated values and rebuild.  This means that the channel factory will
     * be reconstructed from scratch.  All the channels are reconstructed - but this will not
     * affect any references to channels held outside this class as the values returned are proxies
     * on top of hot swappable implementations.
     *  
     * The old channel is closed before the new one is created, so it is possible for a channel
     * held by client code to be rendered unusable during the switch-over.
     */
    /* All calls to this are ultimately wrapped in the writeLock. */
    private static /*synchronized*/ void rebuildChannelsInternal()
    {
        // Reprocess all the application regions with the new data
        for (Map.Entry 
     * NOTE: The channels must be {@link #rebuildChannels() rebuilt}.
     * 
     * @param clusterNamePrefix     a prefix to append to the cluster names used
     */
    public static void changeClusterNamePrefix(String clusterNamePrefix)
    {
        writeLock.lock();
        try
        {
            if (!PropertyCheck.isValidPropertyString(clusterNamePrefix))
            {
                // Clear everything out
                AlfrescoJGroupsChannelFactory.clusterNamePrefix = null;
            }
            else
            {
                AlfrescoJGroupsChannelFactory.clusterNamePrefix = clusterNamePrefix;
            }
        }
        finally
        {
            writeLock.unlock();
        }
    }
    
    /**
     * Configure a mapping between the application regions and the available JGroup protocol configurations.
     * The map must contain a mapping for application region 'DEFAULT'.
     *  
     * NOTE: The channels must be {@link #rebuildChannels() rebuilt}.
     * 
     * @param configUrlsByAppRegion     a mapping from application region (keys) to protocol configuration URLs (values)
     */
    private static void changeConfigUrlsMapping(Map 
         * Note that the old delegate is not closed or shutdown.
         * 
         * @param           the new delegate
         * @return          the old, disconnected delegate
         */
        public synchronized Channel swap(Channel channel)
        {
            // Remove the listeners from the old channel
            delegate.setReceiver(null);
            for (ChannelListener delegateChannelListener : delegateChannelListeners)
            {
                delegate.removeChannelListener(delegateChannelListener);
            }
            delegate.setUpHandler(null);
            
            Channel oldDelegate = delegate;
            
            // Assign the new delegate and carry the listeners over
            delegate = channel;
            delegate.setReceiver(delegateReceiver);
            for (ChannelListener delegateChannelListener : delegateChannelListeners)
            {
                delegate.addChannelListener(delegateChannelListener);
            }
            delegate.setUpHandler(delegateUpHandler);
            // Done
            return oldDelegate;
        }
        @Override
        protected org.jgroups.logging.Log getLog()
        {
            throw new UnsupportedOperationException();
        }
        @Override
        public Address getAddress()
        {
            return delegate.getAddress();
        }
        @Override
        public String getName()
        {
            return delegate.getName();
        }
        @Override
        public ProtocolStack getProtocolStack()
        {
            return delegate.getProtocolStack();
        }
        @Override
        public synchronized void setReceiver(Receiver r)
        {
            delegateReceiver = r;
            delegate.setReceiver(r);
        }
        @Override
        public synchronized void addChannelListener(ChannelListener listener)
        {
            if (listener == null)
            {
                return;
            }
            delegateChannelListeners.add(listener);
            delegate.addChannelListener(listener);
        }
        @Override
        public synchronized void removeChannelListener(ChannelListener listener)
        {
            if (listener != null)
            {
                delegateChannelListeners.remove(listener);
            }
            delegate.removeChannelListener(listener);
        }
        @Override
        public synchronized void clearChannelListeners()
        {
            delegateChannelListeners.clear();
            delegate.clearChannelListeners();
        }
        @Override
        public synchronized void setUpHandler(UpHandler up_handler)
        {
            delegateUpHandler = up_handler;
            delegate.setUpHandler(up_handler);
        }
        @Override
        public void blockOk()
        {
            delegate.blockOk();
        }
        @Override
        public void close()
        {
            delegate.close();
        }
        @Override
        public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException
        {
            delegate.connect(cluster_name, target, state_id, timeout);
        }
        @Override
        public void connect(String cluster_name) throws ChannelException
        {
            delegate.connect(cluster_name);
        }
        @Override
        public void disconnect()
        {
            delegate.disconnect();
        }
        @Override
        public void down(Event evt)
        {
            delegate.down(evt);
        }
        @Override
        public Object downcall(Event evt)
        {
            return delegate.downcall(evt);
        }
        @Override
        public String dumpQueue()
        {
            return delegate.dumpQueue();
        }
        @Override
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public Map dumpStats()
        {
            return delegate.dumpStats();
        }
        @Override
        public boolean equals(Object obj)
        {
            return delegate.equals(obj);
        }
        @Override
        public boolean flushSupported()
        {
            return delegate.flushSupported();
        }
        @SuppressWarnings("rawtypes")
        @Override
        public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException
        {
            return delegate.getAllStates(targets, timeout);
        }
        @Override
        public String getChannelName()
        {
            return delegate.getChannelName();
        }
        @Override
        public String getClusterName()
        {
            return delegate.getClusterName();
        }
        @Override
        public Map
     *    clusterNamePrefix:appRegion
     * 
     * If no cluster name prefix is declared, the cluster is effectively disabled.
     *