/*
 * Copyright (C) 2005-2013 Alfresco Software Limited.
 *
 * This file is part of Alfresco
 *
 * Alfresco is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Alfresco is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Alfresco. If not, see .
 */
package org.alfresco.repo.cache;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.alfresco.repo.tenant.TenantService;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.TransactionListener;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
/**
 * The base implementation for an asynchronously refreshed cache. Currently supports one value per tenant. Implementors
 * just need to provide buildCache(String tenanaId)
 * 
 * @author Andy
 * @since 4.1.3
 */
public abstract class AbstractAsynchronouslyRefreshedCache implements AsynchronouslyRefreshedCache, RefreshableCacheListener, Callable, BeanNameAware,
        InitializingBean, TransactionListener
{
    private static final String RESOURCE_KEY_TXN_DATA = "AbstractAsynchronouslyRefreshedCache.TxnData";
    
    private static Log logger = LogFactory.getLog(AbstractAsynchronouslyRefreshedCache.class);
    private enum RefreshState
    {
        IDLE, WAITING, RUNNING, DONE
    };
    private ThreadPoolExecutor threadPoolExecutor;
    private AsynchronouslyRefreshedCacheRegistry registry;
    private TenantService tenantService;
    // State
    private List listeners = new LinkedList();
    private final ReentrantReadWriteLock liveLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock refreshLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock runLock = new ReentrantReadWriteLock();
    private HashMap live = new HashMap();
    private LinkedHashSet refreshQueue = new LinkedHashSet();
    private String cacheId;
    private RefreshState refreshState = RefreshState.IDLE;
    private String resourceKeyTxnData;
    @Override
    public void register(RefreshableCacheListener listener)
    {
        listeners.add(listener);
    }
    /**
     * @param threadPool
     *            the threadPool to set
     */
    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor)
    {
        this.threadPoolExecutor = threadPoolExecutor;
    }
    /**
     * @param registry
     *            the registry to set
     */
    public void setRegistry(AsynchronouslyRefreshedCacheRegistry registry)
    {
        this.registry = registry;
    }
    /**
     * @param tenantService
     *            the tenantService to set
     */
    public void setTenantService(TenantService tenantService)
    {
        this.tenantService = tenantService;
    }
    public void init()
    {
        registry.register(this);
    }
    @Override
    public T get()
    {
        String tenantId = tenantService.getCurrentUserDomain();
        liveLock.readLock().lock();
        try
        {
            if (live.get(tenantId) != null)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("get() from cache");
                }
                return live.get(tenantId);
            }
        }
        finally
        {
            liveLock.readLock().unlock();
        }
        if (logger.isDebugEnabled())
        {
            logger.debug("get() miss, sechudling and waiting ...");
        }
        // There was nothing to return so we build and return
        Refresh refresh = null;
        refreshLock.writeLock().lock();
        try
        {
            // Is there anything we can wait for
            for (Refresh existing : refreshQueue)
            {
                if (existing.getTenantId().equals(tenantId))
                {
                    if (logger.isDebugEnabled())
                    {
                        logger.debug("get() found existing build to wait for  ...");
                    }
                    refresh = existing;
                }
            }
            if (refresh == null)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("get() building from scratch");
                }
                refresh = new Refresh(tenantId);
                refreshQueue.add(refresh);
            }
        }
        finally
        {
            refreshLock.writeLock().unlock();
        }
        submit();
        waitForBuild(refresh);
        return get();
    }
    public void forceInChangesForThisUncommittedTransaction()
    {
        String tenantId = tenantService.getCurrentUserDomain();
        if (logger.isDebugEnabled())
        {
            logger.debug("Building cache for tenant" + tenantId + " ......");
        }
        T cache = buildCache(tenantId);
        if (logger.isDebugEnabled())
        {
            logger.debug(".... cache built for tenant" + tenantId);
        }
        liveLock.writeLock().lock();
        try
        {
            live.put(tenantId, cache);
        }
        finally
        {
            liveLock.writeLock().unlock();
        }
    }
    protected void waitForBuild(Refresh refresh)
    {
        while (refresh.getState() != RefreshState.DONE)
        {
            synchronized (refresh)
            {
                try
                {
                    refresh.wait(100);
                }
                catch (InterruptedException e)
                {
                }
            }
        }
    }
    @Override
    public void refresh()
    {
        String tenantId = tenantService.getCurrentUserDomain();
        if (logger.isDebugEnabled())
        {
            logger.debug("Async cache refresh request: " + cacheId + " for tenant " + tenantId);
        }
        registry.broadcastEvent(new RefreshableCacheRefreshEvent(cacheId, tenantId), true);
    }
    @Override
    public void onRefreshableCacheEvent(RefreshableCacheEvent refreshableCacheEvent)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Async cache onRefreshableCacheEvent " + refreshableCacheEvent);
        }
        if (false == refreshableCacheEvent.getCacheId().equals(cacheId))
        {
            return;
        }
        // If in a transaction delay the refresh until after it commits
        if (AlfrescoTransactionSupport.getTransactionId() != null)
        {
            if (logger.isDebugEnabled())
            {
                logger.debug("Async cache adding" + refreshableCacheEvent.getTenantId() + " to post commit list");
            }
            TransactionData txData = getTransactionData();
            txData.tenantIds.add(refreshableCacheEvent.getTenantId());
        }
        else
        {
            LinkedHashSet tenantIds = new LinkedHashSet();
            tenantIds.add(refreshableCacheEvent.getTenantId());
            queueRefreshAndSubmit(tenantIds);
        }
    }
    
    /**
     * To be used in a transaction only.
     */
    private TransactionData getTransactionData()
    {
        TransactionData data = (TransactionData) AlfrescoTransactionSupport.getResource(resourceKeyTxnData);
        if (data == null)
        {
            data = new TransactionData();
            // create and initialize caches
            data.tenantIds = new LinkedHashSet();
            // ensure that we get the transaction callbacks as we have bound the unique
            // transactional caches to a common manager
            AlfrescoTransactionSupport.bindListener(this);
            AlfrescoTransactionSupport.bindResource(resourceKeyTxnData, data);
        }
        return data;
    }
    private void queueRefreshAndSubmit(LinkedHashSet tenantIds)
    {
        if((tenantIds == null) || (tenantIds.size() == 0))
        {
            return;
        }
        refreshLock.writeLock().lock();
        try
        {
            for (String tenantId : tenantIds)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("Async cache adding refresh to queue for "+tenantId);
                }
                refreshQueue.add(new Refresh(tenantId));
            }
        }
        finally
        {
            refreshLock.writeLock().unlock();
        }
        submit();
    }
    @Override
    public boolean isUpToDate()
    {
       String tenantId = tenantService.getCurrentUserDomain();
       refreshLock.readLock().lock();
       try
       {
           for(Refresh refresh : refreshQueue)
           {
               if(refresh.getTenantId().equals(tenantId))
               {
                   return false;
               }
           }
           if (AlfrescoTransactionSupport.getTransactionId() != null)
           {
               return (!getTransactionData().tenantIds.contains(tenantId));
           }
           else
           {
               return true;
           }
       }
       finally
       {
           refreshLock.readLock().unlock();
       }
    }
    
    /**
     * Must be run with runLock.writeLock
     */
    private Refresh getNextRefresh()
    {
        if (runLock.writeLock().isHeldByCurrentThread())
        {
            for (Refresh refresh : refreshQueue)
            {
                if (refresh.state == RefreshState.WAITING)
                {
                    return refresh;
                }
            }
            return null;
        }
        else
        {
            throw new IllegalStateException("Method should not be called without holding the write lock");
        }
    }
    /**
     * Must be run with runLock.writeLock
     */
    private int countWaiting()
    {
        int count = 0;
        if (runLock.writeLock().isHeldByCurrentThread())
        {
            refreshLock.readLock().lock();
            try
            {
                for (Refresh refresh : refreshQueue)
                {
                    if (refresh.state == RefreshState.WAITING)
                    {
                        count++;
                    }
                }
                return count;
            }
            finally
            {
                refreshLock.readLock().unlock();
            }
        }
        else
        {
            throw new IllegalStateException("Method should not be called without holding the write lock");
        }
    }
    private void submit()
    {
        runLock.writeLock().lock();
        try
        {
            if (refreshState == RefreshState.IDLE)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("submit() scheduling job");
                }
                threadPoolExecutor.submit(this);
                refreshState = RefreshState.WAITING;
            }
        }
        finally
        {
            runLock.writeLock().unlock();
        }
    }
    @Override
    public Void call()
    {
        try
        {
            doCall();
            return null;
        }
        catch (Exception e)
        {
            logger.error("Cache update failed (" + this.getCacheId() + ").", e);
            runLock.writeLock().lock();
            try
            {
                threadPoolExecutor.submit(this);
                refreshState = RefreshState.WAITING;
            }
            finally
            {
                runLock.writeLock().unlock();
            }
            return null;
        }
    }
    private void doCall() throws Exception
    {
        Refresh refresh = setUpRefresh();
        if (refresh == null)
        {
            return;
        }
        if (logger.isDebugEnabled())
        {
            logger.debug("Building cache for tenant" + refresh.getTenantId());
        }
        try
        {
            doRefresh(refresh);
        }
        catch (Exception e)
        {
            refresh.setState(RefreshState.WAITING);
            throw e;
        }
    }
    private void doRefresh(Refresh refresh)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Building cache for tenant" + refresh.getTenantId() + " ......");
        }
        T cache = buildCache(refresh.getTenantId());
        if (logger.isDebugEnabled())
        {
            logger.debug(".... cache built for tenant" + refresh.getTenantId());
        }
        liveLock.writeLock().lock();
        try
        {
            live.put(refresh.getTenantId(), cache);
        }
        finally
        {
            liveLock.writeLock().unlock();
        }
        if (logger.isDebugEnabled())
        {
            logger.debug("Cache entry updated for tenant" + refresh.getTenantId());
        }
        broadcastEvent(new RefreshableCacheRefreshedEvent(cacheId, refresh.tenantId));
        
        runLock.writeLock().lock();
        try
        {
            refreshLock.writeLock().lock();
            try
            {
                if (countWaiting() > 0)
                {
                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Rescheduling ... more work");
                    }
                    threadPoolExecutor.submit(this);
                    refreshState = RefreshState.WAITING;
                }
                else
                {
                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Nothing to do .... going idle");
                    }
                    refreshState = RefreshState.IDLE;
                }
                refresh.setState(RefreshState.DONE);
                refreshQueue.remove(refresh);
            }
            finally
            {
                refreshLock.writeLock().unlock();
            }
        }
        finally
        {
            runLock.writeLock().unlock();
        }
    }
    private Refresh setUpRefresh() throws Exception
    {
        Refresh refresh = null;
        runLock.writeLock().lock();
        try
        {
            if (refreshState == RefreshState.WAITING)
            {
                refreshLock.writeLock().lock();
                try
                {
                    refresh = getNextRefresh();
                    if (refresh != null)
                    {
                        refreshState = RefreshState.RUNNING;
                        refresh.setState(RefreshState.RUNNING);
                        return refresh;
                    }
                    else
                    {
                        refreshState = RefreshState.IDLE;
                        return null;
                    }
                }
                finally
                {
                    refreshLock.writeLock().unlock();
                }
            }
            else
            {
                return null;
            }
        }
        catch (Exception e)
        {
            if (refresh != null)
            {
                refresh.setState(RefreshState.WAITING);
            }
            throw e;
        }
        finally
        {
            runLock.writeLock().unlock();
        }
    }
    @Override
    public void setBeanName(String name)
    {
        cacheId = name;
    }
    @Override
    public String getCacheId()
    {
        return cacheId;
    }
    /**
     * Build the cache entry for the specific tenant.
     * This method is called in a thread-safe manner i.e. it is only ever called by a single
     * thread.
     */
    protected abstract T buildCache(String tenantId);
    private static class Refresh
    {
        private String tenantId;
        private volatile RefreshState state = RefreshState.WAITING;
        Refresh(String tenantId)
        {
            this.tenantId = tenantId;
        }
        /**
         * @return the tenantId
         */
        public String getTenantId()
        {
            return tenantId;
        }
        /**
         * @return the state
         */
        public RefreshState getState()
        {
            return state;
        }
        /**
         * @param state
         *            the state to set
         */
        public void setState(RefreshState state)
        {
            this.state = state;
        }
        @Override
        public int hashCode()
        {
            // The bucked is determined by the tenantId alone - we are going to change the state
            final int prime = 31;
            int result = 1;
            result = prime * result + ((tenantId == null) ? 0 : tenantId.hashCode());
            return result;
        }
        @Override
        public boolean equals(Object obj)
        {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Refresh other = (Refresh) obj;
            if (state != other.state)
                return false;
            if (tenantId == null)
            {
                if (other.tenantId != null)
                    return false;
            }
            else if (!tenantId.equals(other.tenantId))
                return false;
            return true;
        }
        @Override
        public String toString()
        {
            return "Refresh [tenantId=" + tenantId + ", state=" + state + ", hashCode()=" + hashCode() + "]";
        }
    }
    @Override
    public void afterPropertiesSet() throws Exception
    {
        PropertyCheck.mandatory(this, "threadPoolExecutor", threadPoolExecutor);
        PropertyCheck.mandatory(this, "tenantService", tenantService);
        PropertyCheck.mandatory(this, "registry", registry);
        registry.register(this);
        
        resourceKeyTxnData = RESOURCE_KEY_TXN_DATA + "." + cacheId;
    }
    public void broadcastEvent(RefreshableCacheEvent event)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Notifying cache listeners for " + getCacheId() + " " + event);
        }
        // If the system is up and running, broadcast the event immediately
        for (RefreshableCacheListener listener : this.listeners)
        {
            listener.onRefreshableCacheEvent(event);
        }
    }
    @Override
    public void flush()
    {
        // Nothing
    }
    @Override
    public void beforeCommit(boolean readOnly)
    {
        // Nothing
    }
    @Override
    public void beforeCompletion()
    {
        // Nothing
    }
    @Override
    public void afterCommit()
    {
        TransactionData txnData = getTransactionData();
        queueRefreshAndSubmit(txnData.tenantIds);
    }
    @Override
    public void afterRollback()
    {
        // Nothing
    }
    private static class TransactionData
    {
        LinkedHashSet tenantIds;
    }
}