mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
Merged HEAD-BUG-FIX (5.0/Cloud) to HEAD (5.0/Cloud)
76994: Merged PLATFORM1 (5.0/Cloud) to HEAD-BUG-FIX (5.0/Cloud) 70127: MNT-9882 - refactoring of caches git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@77833 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -1,730 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2013 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.cache;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.alfresco.repo.tenant.TenantService;
|
||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||
import org.alfresco.repo.transaction.TransactionListener;
|
||||
import org.alfresco.util.PropertyCheck;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
||||
/**
|
||||
* The base implementation for an asynchronously refreshed cache. Currently supports one value per tenant. Implementors
|
||||
* just need to provide buildCache(String tenanaId)
|
||||
*
|
||||
* @author Andy
|
||||
* @since 4.1.3
|
||||
*/
|
||||
public abstract class AbstractAsynchronouslyRefreshedCache<T> implements AsynchronouslyRefreshedCache<T>, RefreshableCacheListener, Callable<Void>, BeanNameAware,
|
||||
InitializingBean, TransactionListener
|
||||
{
|
||||
private static final String RESOURCE_KEY_TXN_DATA = "AbstractAsynchronouslyRefreshedCache.TxnData";
|
||||
|
||||
private static Log logger = LogFactory.getLog(AbstractAsynchronouslyRefreshedCache.class);
|
||||
|
||||
private enum RefreshState
|
||||
{
|
||||
IDLE, WAITING, RUNNING, DONE
|
||||
};
|
||||
|
||||
private ThreadPoolExecutor threadPoolExecutor;
|
||||
private AsynchronouslyRefreshedCacheRegistry registry;
|
||||
private TenantService tenantService;
|
||||
|
||||
// State
|
||||
|
||||
private List<RefreshableCacheListener> listeners = new LinkedList<RefreshableCacheListener>();
|
||||
private final ReentrantReadWriteLock liveLock = new ReentrantReadWriteLock();
|
||||
private final ReentrantReadWriteLock refreshLock = new ReentrantReadWriteLock();
|
||||
private final ReentrantReadWriteLock runLock = new ReentrantReadWriteLock();
|
||||
private HashMap<String, T> live = new HashMap<String, T>();
|
||||
private LinkedHashSet<Refresh> refreshQueue = new LinkedHashSet<Refresh>();
|
||||
private String cacheId;
|
||||
private RefreshState refreshState = RefreshState.IDLE;
|
||||
private String resourceKeyTxnData;
|
||||
|
||||
@Override
|
||||
public void register(RefreshableCacheListener listener)
|
||||
{
|
||||
listeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param threadPool
|
||||
* the threadPool to set
|
||||
*/
|
||||
public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor)
|
||||
{
|
||||
this.threadPoolExecutor = threadPoolExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param registry
|
||||
* the registry to set
|
||||
*/
|
||||
public void setRegistry(AsynchronouslyRefreshedCacheRegistry registry)
|
||||
{
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tenantService
|
||||
* the tenantService to set
|
||||
*/
|
||||
public void setTenantService(TenantService tenantService)
|
||||
{
|
||||
this.tenantService = tenantService;
|
||||
}
|
||||
|
||||
public void init()
|
||||
{
|
||||
registry.register(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
liveLock.readLock().lock();
|
||||
try
|
||||
{
|
||||
if (live.get(tenantId) != null)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("get() from cache");
|
||||
}
|
||||
return live.get(tenantId);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
liveLock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("get() miss, sechudling and waiting ...");
|
||||
}
|
||||
|
||||
// There was nothing to return so we build and return
|
||||
Refresh refresh = null;
|
||||
refreshLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
// Is there anything we can wait for
|
||||
for (Refresh existing : refreshQueue)
|
||||
{
|
||||
if (existing.getTenantId().equals(tenantId))
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("get() found existing build to wait for ...");
|
||||
}
|
||||
refresh = existing;
|
||||
}
|
||||
}
|
||||
|
||||
if (refresh == null)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("get() building from scratch");
|
||||
}
|
||||
refresh = new Refresh(tenantId);
|
||||
refreshQueue.add(refresh);
|
||||
}
|
||||
|
||||
}
|
||||
finally
|
||||
{
|
||||
refreshLock.writeLock().unlock();
|
||||
}
|
||||
submit();
|
||||
waitForBuild(refresh);
|
||||
|
||||
return get();
|
||||
}
|
||||
|
||||
public void forceInChangesForThisUncommittedTransaction()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Building cache for tenant " + tenantId + " ......");
|
||||
}
|
||||
T cache = buildCache(tenantId);
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug(".... cache built for tenant " + tenantId);
|
||||
}
|
||||
|
||||
liveLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
live.put(tenantId, cache);
|
||||
}
|
||||
finally
|
||||
{
|
||||
liveLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void waitForBuild(Refresh refresh)
|
||||
{
|
||||
while (refresh.getState() != RefreshState.DONE)
|
||||
{
|
||||
synchronized (refresh)
|
||||
{
|
||||
try
|
||||
{
|
||||
refresh.wait(100);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Async cache refresh request: " + cacheId + " for tenant " + tenantId);
|
||||
}
|
||||
registry.broadcastEvent(new RefreshableCacheRefreshEvent(cacheId, tenantId), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRefreshableCacheEvent(RefreshableCacheEvent refreshableCacheEvent)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Async cache onRefreshableCacheEvent " + refreshableCacheEvent);
|
||||
}
|
||||
if (false == refreshableCacheEvent.getCacheId().equals(cacheId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// If in a transaction delay the refresh until after it commits
|
||||
|
||||
if (AlfrescoTransactionSupport.getTransactionId() != null)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Async cache adding" + refreshableCacheEvent.getTenantId() + " to post commit list");
|
||||
}
|
||||
TransactionData txData = getTransactionData();
|
||||
txData.tenantIds.add(refreshableCacheEvent.getTenantId());
|
||||
}
|
||||
else
|
||||
{
|
||||
LinkedHashSet<String> tenantIds = new LinkedHashSet<String>();
|
||||
tenantIds.add(refreshableCacheEvent.getTenantId());
|
||||
queueRefreshAndSubmit(tenantIds);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* To be used in a transaction only.
|
||||
*/
|
||||
private TransactionData getTransactionData()
|
||||
{
|
||||
TransactionData data = (TransactionData) AlfrescoTransactionSupport.getResource(resourceKeyTxnData);
|
||||
if (data == null)
|
||||
{
|
||||
data = new TransactionData();
|
||||
// create and initialize caches
|
||||
data.tenantIds = new LinkedHashSet<String>();
|
||||
|
||||
// ensure that we get the transaction callbacks as we have bound the unique
|
||||
// transactional caches to a common manager
|
||||
AlfrescoTransactionSupport.bindListener(this);
|
||||
AlfrescoTransactionSupport.bindResource(resourceKeyTxnData, data);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
private void queueRefreshAndSubmit(LinkedHashSet<String> tenantIds)
|
||||
{
|
||||
if((tenantIds == null) || (tenantIds.size() == 0))
|
||||
{
|
||||
return;
|
||||
}
|
||||
refreshLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
for (String tenantId : tenantIds)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Async cache adding refresh to queue for "+tenantId);
|
||||
}
|
||||
refreshQueue.add(new Refresh(tenantId));
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
refreshLock.writeLock().unlock();
|
||||
}
|
||||
submit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUpToDate()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
refreshLock.readLock().lock();
|
||||
try
|
||||
{
|
||||
for(Refresh refresh : refreshQueue)
|
||||
{
|
||||
if(refresh.getTenantId().equals(tenantId))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (AlfrescoTransactionSupport.getTransactionId() != null)
|
||||
{
|
||||
return (!getTransactionData().tenantIds.contains(tenantId));
|
||||
}
|
||||
else
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
refreshLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Must be run with runLock.writeLock
|
||||
*/
|
||||
private Refresh getNextRefresh()
|
||||
{
|
||||
if (runLock.writeLock().isHeldByCurrentThread())
|
||||
{
|
||||
for (Refresh refresh : refreshQueue)
|
||||
{
|
||||
if (refresh.state == RefreshState.WAITING)
|
||||
{
|
||||
return refresh;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new IllegalStateException("Method should not be called without holding the write lock");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Must be run with runLock.writeLock
|
||||
*/
|
||||
private int countWaiting()
|
||||
{
|
||||
int count = 0;
|
||||
if (runLock.writeLock().isHeldByCurrentThread())
|
||||
{
|
||||
refreshLock.readLock().lock();
|
||||
try
|
||||
{
|
||||
for (Refresh refresh : refreshQueue)
|
||||
{
|
||||
if (refresh.state == RefreshState.WAITING)
|
||||
{
|
||||
count++;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
finally
|
||||
{
|
||||
refreshLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new IllegalStateException("Method should not be called without holding the write lock");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void submit()
|
||||
{
|
||||
runLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
if (refreshState == RefreshState.IDLE)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("submit() scheduling job");
|
||||
}
|
||||
threadPoolExecutor.submit(this);
|
||||
refreshState = RefreshState.WAITING;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
runLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void call()
|
||||
{
|
||||
try
|
||||
{
|
||||
doCall();
|
||||
return null;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
logger.error("Cache update failed (" + this.getCacheId() + ").", e);
|
||||
runLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
threadPoolExecutor.submit(this);
|
||||
refreshState = RefreshState.WAITING;
|
||||
}
|
||||
finally
|
||||
{
|
||||
runLock.writeLock().unlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void doCall() throws Exception
|
||||
{
|
||||
Refresh refresh = setUpRefresh();
|
||||
if (refresh == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Building cache for tenant" + refresh.getTenantId());
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
doRefresh(refresh);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
refresh.setState(RefreshState.WAITING);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private void doRefresh(Refresh refresh)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Building cache for tenant" + refresh.getTenantId() + " ......");
|
||||
}
|
||||
T cache = buildCache(refresh.getTenantId());
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug(".... cache built for tenant" + refresh.getTenantId());
|
||||
}
|
||||
|
||||
liveLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
live.put(refresh.getTenantId(), cache);
|
||||
}
|
||||
finally
|
||||
{
|
||||
liveLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Cache entry updated for tenant" + refresh.getTenantId());
|
||||
}
|
||||
|
||||
broadcastEvent(new RefreshableCacheRefreshedEvent(cacheId, refresh.tenantId));
|
||||
|
||||
runLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
refreshLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
if (countWaiting() > 0)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Rescheduling ... more work");
|
||||
}
|
||||
threadPoolExecutor.submit(this);
|
||||
refreshState = RefreshState.WAITING;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Nothing to do .... going idle");
|
||||
}
|
||||
refreshState = RefreshState.IDLE;
|
||||
}
|
||||
refresh.setState(RefreshState.DONE);
|
||||
refreshQueue.remove(refresh);
|
||||
}
|
||||
finally
|
||||
{
|
||||
refreshLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
runLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private Refresh setUpRefresh() throws Exception
|
||||
{
|
||||
Refresh refresh = null;
|
||||
runLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
if (refreshState == RefreshState.WAITING)
|
||||
{
|
||||
refreshLock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
refresh = getNextRefresh();
|
||||
if (refresh != null)
|
||||
{
|
||||
refreshState = RefreshState.RUNNING;
|
||||
refresh.setState(RefreshState.RUNNING);
|
||||
return refresh;
|
||||
}
|
||||
else
|
||||
{
|
||||
refreshState = RefreshState.IDLE;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
refreshLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
if (refresh != null)
|
||||
{
|
||||
refresh.setState(RefreshState.WAITING);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
finally
|
||||
{
|
||||
runLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBeanName(String name)
|
||||
{
|
||||
cacheId = name;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCacheId()
|
||||
{
|
||||
return cacheId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the cache entry for the specific tenant.
|
||||
* This method is called in a thread-safe manner i.e. it is only ever called by a single
|
||||
* thread.
|
||||
*/
|
||||
protected abstract T buildCache(String tenantId);
|
||||
|
||||
private static class Refresh
|
||||
{
|
||||
private String tenantId;
|
||||
|
||||
private volatile RefreshState state = RefreshState.WAITING;
|
||||
|
||||
Refresh(String tenantId)
|
||||
{
|
||||
this.tenantId = tenantId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the tenantId
|
||||
*/
|
||||
public String getTenantId()
|
||||
{
|
||||
return tenantId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the state
|
||||
*/
|
||||
public RefreshState getState()
|
||||
{
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param state
|
||||
* the state to set
|
||||
*/
|
||||
public void setState(RefreshState state)
|
||||
{
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
// The bucked is determined by the tenantId alone - we are going to change the state
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((tenantId == null) ? 0 : tenantId.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
Refresh other = (Refresh) obj;
|
||||
if (state != other.state)
|
||||
return false;
|
||||
if (tenantId == null)
|
||||
{
|
||||
if (other.tenantId != null)
|
||||
return false;
|
||||
}
|
||||
else if (!tenantId.equals(other.tenantId))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "Refresh [tenantId=" + tenantId + ", state=" + state + ", hashCode()=" + hashCode() + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception
|
||||
{
|
||||
PropertyCheck.mandatory(this, "threadPoolExecutor", threadPoolExecutor);
|
||||
PropertyCheck.mandatory(this, "tenantService", tenantService);
|
||||
PropertyCheck.mandatory(this, "registry", registry);
|
||||
registry.register(this);
|
||||
|
||||
resourceKeyTxnData = RESOURCE_KEY_TXN_DATA + "." + cacheId;
|
||||
|
||||
}
|
||||
|
||||
public void broadcastEvent(RefreshableCacheEvent event)
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Notifying cache listeners for " + getCacheId() + " " + event);
|
||||
}
|
||||
// If the system is up and running, broadcast the event immediately
|
||||
for (RefreshableCacheListener listener : this.listeners)
|
||||
{
|
||||
listener.onRefreshableCacheEvent(event);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush()
|
||||
{
|
||||
// Nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeCommit(boolean readOnly)
|
||||
{
|
||||
// Nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeCompletion()
|
||||
{
|
||||
// Nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit()
|
||||
{
|
||||
TransactionData txnData = getTransactionData();
|
||||
queueRefreshAndSubmit(txnData.tenantIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterRollback()
|
||||
{
|
||||
// Nothing
|
||||
}
|
||||
|
||||
private static class TransactionData
|
||||
{
|
||||
LinkedHashSet<String> tenantIds;
|
||||
}
|
||||
}
|
107
source/java/org/alfresco/repo/cache/AbstractMTAsynchronouslyRefreshedCache.java
vendored
Normal file
107
source/java/org/alfresco/repo/cache/AbstractMTAsynchronouslyRefreshedCache.java
vendored
Normal file
@@ -0,0 +1,107 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2013 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.cache;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.alfresco.repo.tenant.TenantService;
|
||||
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
|
||||
import org.alfresco.repo.transaction.TransactionListener;
|
||||
import org.alfresco.util.PropertyCheck;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
||||
/**
|
||||
* The base implementation for Multi-tenant asynchronously refreshed cache. Currently supports one value per tenant.
|
||||
*
|
||||
* Implementors just need to provide buildCache(String tennantId)
|
||||
*
|
||||
* @author Andy
|
||||
* @since 4.1.3
|
||||
*/
|
||||
public abstract class AbstractMTAsynchronouslyRefreshedCache<T>
|
||||
extends org.alfresco.util.cache.AbstractAsynchronouslyRefreshedCache<T>
|
||||
implements AsynchronouslyRefreshedCache<T>, InitializingBean
|
||||
{
|
||||
|
||||
private static Log logger = LogFactory.getLog(AbstractMTAsynchronouslyRefreshedCache.class);
|
||||
|
||||
private TenantService tenantService;
|
||||
|
||||
/**
|
||||
* @param tenantService
|
||||
* the tenantService to set
|
||||
*/
|
||||
public void setTenantService(TenantService tenantService)
|
||||
{
|
||||
this.tenantService = tenantService;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public T get()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
return get(tenantId);
|
||||
}
|
||||
|
||||
public void forceInChangesForThisUncommittedTransaction()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
forceInChangesForThisUncommittedTransaction(tenantId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
refresh(tenantId);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isUpToDate()
|
||||
{
|
||||
String tenantId = tenantService.getCurrentUserDomain();
|
||||
return isUpToDate(tenantId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the cache entry for the specific tenant.
|
||||
* This method is called in a thread-safe manner i.e. it is only ever called by a single
|
||||
* thread.
|
||||
*/
|
||||
protected abstract T buildCache(String tenantId);
|
||||
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception
|
||||
{
|
||||
PropertyCheck.mandatory(this, "tenantService", tenantService);
|
||||
super.afterPropertiesSet();
|
||||
}
|
||||
}
|
@@ -1,74 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2013 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.repo.cache;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Base registry implementation
|
||||
*
|
||||
* @author Andy
|
||||
*/
|
||||
public class DefaultAsynchronouslyRefreshedCacheRegistry implements AsynchronouslyRefreshedCacheRegistry
|
||||
{
|
||||
private static Log logger = LogFactory.getLog(DefaultAsynchronouslyRefreshedCacheRegistry.class);
|
||||
|
||||
private List<RefreshableCacheListener> listeners = new LinkedList<RefreshableCacheListener>();
|
||||
|
||||
@Override
|
||||
public void register(RefreshableCacheListener listener)
|
||||
{
|
||||
if(logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Listener added for " + listener.getCacheId());
|
||||
}
|
||||
listeners.add(listener);
|
||||
}
|
||||
|
||||
public void broadcastEvent(RefreshableCacheEvent event, boolean toAll)
|
||||
{
|
||||
// If the system is up and running, broadcast the event immediately
|
||||
for (RefreshableCacheListener listener : this.listeners)
|
||||
{
|
||||
if (toAll)
|
||||
{
|
||||
if(logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Delivering event (" + event + ") to listener (" + listener + ").");
|
||||
}
|
||||
listener.onRefreshableCacheEvent(event);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (listener.getCacheId().equals(event.getCacheId()))
|
||||
{
|
||||
if(logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Delivering event (" + event + ") to listener (" + listener + ").");
|
||||
}
|
||||
listener.onRefreshableCacheEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -40,12 +40,12 @@ public interface RefreshableCache <T>
|
||||
*/
|
||||
public void refresh();
|
||||
|
||||
/**
|
||||
* Register to be informed when the cache is updated in the background.
|
||||
*
|
||||
* Note: it is up to the implementation to provide any transactional wrapping.
|
||||
* Transactional wrapping is not required to invalidate a shared cache entry directly via a transactional cache
|
||||
* @param listener
|
||||
*/
|
||||
void register(RefreshableCacheListener listener);
|
||||
// /**
|
||||
// * Register to be informed when the cache is updated in the background.
|
||||
// *
|
||||
// * Note: it is up to the implementation to provide any transactional wrapping.
|
||||
// * Transactional wrapping is not required to invalidate a shared cache entry directly via a transactional cache
|
||||
// * @param listener
|
||||
// */
|
||||
// void register(RefreshableCacheListener listener);
|
||||
}
|
||||
|
@@ -23,19 +23,7 @@ package org.alfresco.repo.cache;
|
||||
*
|
||||
* @author Andy
|
||||
*/
|
||||
public interface RefreshableCacheListener
|
||||
public interface RefreshableCacheListener extends org.alfresco.util.cache.RefreshableCacheListener
|
||||
{
|
||||
/**
|
||||
* Callback made when a cache refresh occurs
|
||||
*
|
||||
* @param the cache event
|
||||
*/
|
||||
public void onRefreshableCacheEvent(RefreshableCacheEvent refreshableCacheEvent);
|
||||
|
||||
/**
|
||||
* Cache id so broadcast can be constrained to matching caches
|
||||
*
|
||||
* @return the cache ID
|
||||
*/
|
||||
public String getCacheId();
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user