ALF-9613: added a concurrency test and fix

to ensure concurrent requests for content at a particular URL won't result in the backing store being asked for the same content multiple times.

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@29935 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Matt Ward
2011-08-19 16:32:32 +00:00
parent d6ec5f444b
commit 24354aae0a
5 changed files with 190 additions and 18 deletions

View File

@@ -42,10 +42,20 @@ import org.springframework.beans.factory.annotation.Required;
*/ */
public class CachingContentStore implements ContentStore public class CachingContentStore implements ContentStore
{ {
// NUM_LOCKS absolutely must be a power of 2 for the use of locks to be evenly balanced
private final static int numLocks = 32;
private final static Object[] locks;
private ContentStore backingStore; private ContentStore backingStore;
private ContentCache cache; private ContentCache cache;
private boolean cacheOnInbound; private boolean cacheOnInbound;
static {
locks = new Object[numLocks];
for (int i = 0; i < numLocks; i++)
{
locks[i] = new Object();
}
}
public CachingContentStore() public CachingContentStore()
{ {
@@ -136,17 +146,23 @@ public class CachingContentStore implements ContentStore
*/ */
@Override @Override
public ContentReader getReader(String contentUrl) public ContentReader getReader(String contentUrl)
{ {
if (!cache.contains(contentUrl)) // Synchronise on one of a pool of locks - which one is determined by a hash of the URL.
// This will stop the content from being read multiple times from the backing store
// when it should only be read once and cached versions should be returned after that.
synchronized(lock(contentUrl))
{ {
ContentReader bsReader = backingStore.getReader(contentUrl); if (!cache.contains(contentUrl))
if (!cache.put(contentUrl, bsReader))
{ {
// Content wasn't put into cache successfully. ContentReader bsReader = backingStore.getReader(contentUrl);
return bsReader.getReader(); if (!cache.put(contentUrl, bsReader))
} {
// Content wasn't put into cache successfully.
return bsReader.getReader();
}
}
} }
// TODO: what if, in the meantime this item has been deleted from the disk cache? // TODO: what if, in the meantime this item has been deleted from the disk cache?
return cache.getReader(contentUrl); return cache.getReader(contentUrl);
} }
@@ -231,6 +247,17 @@ public class CachingContentStore implements ContentStore
} }
private Object lock(String s)
{
return locks[lockIndex(s)];
}
private int lockIndex(String s)
{
return s.hashCode() & (numLocks - 1);
}
@Required @Required
public void setBackingStore(ContentStore backingStore) public void setBackingStore(ContentStore backingStore)
{ {

View File

@@ -20,7 +20,7 @@ package org.alfresco.repo.content.caching;
import java.io.File; import java.io.File;
import org.alfresco.repo.cache.EhCacheAdapter; import org.alfresco.repo.cache.SimpleCache;
import org.alfresco.repo.content.ContentStore; import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.content.filestore.FileContentReader; import org.alfresco.repo.content.filestore.FileContentReader;
import org.alfresco.repo.content.filestore.FileContentWriter; import org.alfresco.repo.content.filestore.FileContentWriter;
@@ -45,7 +45,7 @@ public class ContentCacheImpl implements ContentCache
private static final String CACHE_DIR = "caching_cs"; private static final String CACHE_DIR = "caching_cs";
private static final String TMP_FILE_EXTENSION = ".tmp"; private static final String TMP_FILE_EXTENSION = ".tmp";
private final File cacheRoot = TempFileProvider.getLongLifeTempDir(CACHE_DIR); private final File cacheRoot = TempFileProvider.getLongLifeTempDir(CACHE_DIR);
private EhCacheAdapter<String, String> memoryStore; private SimpleCache<String, String> memoryStore;
@@ -169,7 +169,7 @@ public class ContentCacheImpl implements ContentCache
* *
* @param memoryStore the memoryStore to set * @param memoryStore the memoryStore to set
*/ */
public void setMemoryStore(EhCacheAdapter<String, String> memoryStore) public void setMemoryStore(SimpleCache<String, String> memoryStore)
{ {
this.memoryStore = memoryStore; this.memoryStore = memoryStore;
} }

View File

@@ -22,14 +22,9 @@ package org.alfresco.repo.content.caching;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import java.util.Locale;
import org.alfresco.repo.content.ContentContext; import org.alfresco.repo.content.ContentContext;
import org.alfresco.repo.content.filestore.FileContentStore; import org.alfresco.repo.content.filestore.FileContentStore;
import org.alfresco.service.cmr.repository.ContentAccessor;
import org.alfresco.service.cmr.repository.ContentIOException;
import org.alfresco.service.cmr.repository.ContentReader; import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentStreamListener;
import org.alfresco.service.cmr.repository.ContentWriter; import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.util.ApplicationContextHelper; import org.alfresco.util.ApplicationContextHelper;
import org.junit.Before; import org.junit.Before;

View File

@@ -0,0 +1,134 @@
/*
* Copyright (C) 2005-2011 Alfresco Software Limited.
*
* This file is part of Alfresco
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.repo.content.caching.test;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.alfresco.repo.content.caching.CachingContentStore;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.util.ApplicationContextHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
/**
* Tests to ensure that the CachingContentStore works as expected under highly concurrent load.
*
* @author Matt Ward
*/
public class ConcurrentCachingStoreTest
{
private static final Log log = LogFactory.getLog(ConcurrentCachingStoreTest.class);
// NUM_THREADS must be at least 2 x NUM_URLS to ensure each URLs is accessed by more than one thread.
private static final int NUM_THREADS = 2000;
private static final int NUM_URLS = 40;
private ApplicationContext ctx;
private CachingContentStore store;
private SlowContentStore backingStore;
@Before
public void setUp()
{
String conf = "classpath:cachingstore/test-context.xml";
String slowconf = "classpath:cachingstore/test-slow-context.xml";
ctx = ApplicationContextHelper.getApplicationContext(new String[] { conf, slowconf });
store = (CachingContentStore) ctx.getBean("cachingContentStore");
store.setCacheOnInbound(false);
backingStore = (SlowContentStore) ctx.getBean("backingStore");
}
@Test
public void concurrentReadsWillReadCacheOncePerURL() throws InterruptedException
{
// Attack with multiple threads
Thread[] threads = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++)
{
CacheReaderThread t = new CacheReaderThread(i, NUM_URLS);
threads[i] = t;
t.start();
}
for (int i = 0; i < threads.length; i++)
threads[i].join();
log.debug("\nResults:");
// Check how many times the backing store was read from
int failedURLs = 0;
for (Map.Entry<String, AtomicLong> entry : backingStore.getUrlHits().entrySet())
{
String url = entry.getKey();
long numHits = entry.getValue().get();
log.debug("URL: " + url + ", hits: " + numHits);
if (numHits > 1) failedURLs++;
}
// If any of the URLs were accessed more than once, then the test will fail.
if (failedURLs > 0)
Assert.fail(failedURLs + " URLs were requested more than once.");
}
private class CacheReaderThread extends Thread
{
private final int threadNum;
private final int numUrls;
private int reads = 50;
CacheReaderThread(int threadNum, int numUrls) {
super(CacheReaderThread.class.getSimpleName() + "-" + threadNum);
this.threadNum = threadNum;
this.numUrls = numUrls;
}
@Override
public void run()
{
while (reads > 0)
{
String url = generateUrlToRead();
ContentReader reader = store.getReader(url);
String content = reader.getContentString();
log.debug("Thread: " + getName() + ", URL: " + url + ", content: " + content);
reads--;
}
}
private String generateUrlToRead()
{
int urlNum = threadNum % numUrls;
return "store://2010/11/5/17/33/" + urlNum + ".bin";
}
}
}

View File

@@ -23,6 +23,9 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.alfresco.repo.content.AbstractContentReader; import org.alfresco.repo.content.AbstractContentReader;
import org.alfresco.repo.content.AbstractContentStore; import org.alfresco.repo.content.AbstractContentStore;
@@ -39,7 +42,8 @@ import org.alfresco.service.cmr.repository.ContentWriter;
*/ */
class SlowContentStore extends AbstractContentStore class SlowContentStore extends AbstractContentStore
{ {
private ConcurrentMap<String, AtomicLong> urlHits = new ConcurrentHashMap<String, AtomicLong>();
/* /*
* @see org.alfresco.repo.content.ContentStore#isWriteSupported() * @see org.alfresco.repo.content.ContentStore#isWriteSupported()
*/ */
@@ -55,6 +59,9 @@ class SlowContentStore extends AbstractContentStore
@Override @Override
public ContentReader getReader(String contentUrl) public ContentReader getReader(String contentUrl)
{ {
urlHits.putIfAbsent(contentUrl, new AtomicLong(0));
urlHits.get(contentUrl).incrementAndGet();
return new SlowReader(contentUrl); return new SlowReader(contentUrl);
} }
@@ -217,7 +224,16 @@ class SlowContentStore extends AbstractContentStore
} }
/**
* Get statistics for which URLs have been asked for and the frequencies.
*
* @return Map of URL to frequency
*/
public ConcurrentMap<String, AtomicLong> getUrlHits()
{
return this.urlHits;
}
public static void main(String[] args) public static void main(String[] args)
{ {
SlowContentStore scs = new SlowContentStore(); SlowContentStore scs = new SlowContentStore();