diff --git a/source/java/org/alfresco/repo/content/caching/CachingContentStore.java b/source/java/org/alfresco/repo/content/caching/CachingContentStore.java index 858b969a23..b8c0f9721c 100644 --- a/source/java/org/alfresco/repo/content/caching/CachingContentStore.java +++ b/source/java/org/alfresco/repo/content/caching/CachingContentStore.java @@ -42,10 +42,20 @@ import org.springframework.beans.factory.annotation.Required; */ public class CachingContentStore implements ContentStore { + // NUM_LOCKS absolutely must be a power of 2 for the use of locks to be evenly balanced + private final static int numLocks = 32; + private final static Object[] locks; private ContentStore backingStore; private ContentCache cache; private boolean cacheOnInbound; + static { + locks = new Object[numLocks]; + for (int i = 0; i < numLocks; i++) + { + locks[i] = new Object(); + } + } public CachingContentStore() { @@ -136,17 +146,23 @@ public class CachingContentStore implements ContentStore */ @Override public ContentReader getReader(String contentUrl) - { - if (!cache.contains(contentUrl)) + { + // Synchronise on one of a pool of locks - which one is determined by a hash of the URL. + // This will stop the content from being read multiple times from the backing store + // when it should only be read once and cached versions should be returned after that. + synchronized(lock(contentUrl)) { - ContentReader bsReader = backingStore.getReader(contentUrl); - if (!cache.put(contentUrl, bsReader)) + if (!cache.contains(contentUrl)) { - // Content wasn't put into cache successfully. - return bsReader.getReader(); - } + ContentReader bsReader = backingStore.getReader(contentUrl); + if (!cache.put(contentUrl, bsReader)) + { + // Content wasn't put into cache successfully. + return bsReader.getReader(); + } + } } - + // TODO: what if, in the meantime this item has been deleted from the disk cache? return cache.getReader(contentUrl); } @@ -231,6 +247,17 @@ public class CachingContentStore implements ContentStore } + private Object lock(String s) + { + return locks[lockIndex(s)]; + } + + private int lockIndex(String s) + { + return s.hashCode() & (numLocks - 1); + } + + @Required public void setBackingStore(ContentStore backingStore) { diff --git a/source/java/org/alfresco/repo/content/caching/ContentCacheImpl.java b/source/java/org/alfresco/repo/content/caching/ContentCacheImpl.java index 5fcc7319ab..5b76fde2f9 100644 --- a/source/java/org/alfresco/repo/content/caching/ContentCacheImpl.java +++ b/source/java/org/alfresco/repo/content/caching/ContentCacheImpl.java @@ -20,7 +20,7 @@ package org.alfresco.repo.content.caching; import java.io.File; -import org.alfresco.repo.cache.EhCacheAdapter; +import org.alfresco.repo.cache.SimpleCache; import org.alfresco.repo.content.ContentStore; import org.alfresco.repo.content.filestore.FileContentReader; import org.alfresco.repo.content.filestore.FileContentWriter; @@ -45,7 +45,7 @@ public class ContentCacheImpl implements ContentCache private static final String CACHE_DIR = "caching_cs"; private static final String TMP_FILE_EXTENSION = ".tmp"; private final File cacheRoot = TempFileProvider.getLongLifeTempDir(CACHE_DIR); - private EhCacheAdapter memoryStore; + private SimpleCache memoryStore; @@ -169,7 +169,7 @@ public class ContentCacheImpl implements ContentCache * * @param memoryStore the memoryStore to set */ - public void setMemoryStore(EhCacheAdapter memoryStore) + public void setMemoryStore(SimpleCache memoryStore) { this.memoryStore = memoryStore; } diff --git a/source/java/org/alfresco/repo/content/caching/FullTest.java b/source/java/org/alfresco/repo/content/caching/FullTest.java index f662c9fa81..6fbe8f7b90 100644 --- a/source/java/org/alfresco/repo/content/caching/FullTest.java +++ b/source/java/org/alfresco/repo/content/caching/FullTest.java @@ -22,14 +22,9 @@ package org.alfresco.repo.content.caching; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import java.util.Locale; - import org.alfresco.repo.content.ContentContext; import org.alfresco.repo.content.filestore.FileContentStore; -import org.alfresco.service.cmr.repository.ContentAccessor; -import org.alfresco.service.cmr.repository.ContentIOException; import org.alfresco.service.cmr.repository.ContentReader; -import org.alfresco.service.cmr.repository.ContentStreamListener; import org.alfresco.service.cmr.repository.ContentWriter; import org.alfresco.util.ApplicationContextHelper; import org.junit.Before; diff --git a/source/java/org/alfresco/repo/content/caching/test/ConcurrentCachingStoreTest.java b/source/java/org/alfresco/repo/content/caching/test/ConcurrentCachingStoreTest.java new file mode 100644 index 0000000000..54da872c7b --- /dev/null +++ b/source/java/org/alfresco/repo/content/caching/test/ConcurrentCachingStoreTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2005-2011 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ + +package org.alfresco.repo.content.caching.test; + + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.alfresco.repo.content.caching.CachingContentStore; +import org.alfresco.service.cmr.repository.ContentReader; +import org.alfresco.util.ApplicationContextHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.context.ApplicationContext; + +/** + * Tests to ensure that the CachingContentStore works as expected under highly concurrent load. + * + * @author Matt Ward + */ +public class ConcurrentCachingStoreTest +{ + private static final Log log = LogFactory.getLog(ConcurrentCachingStoreTest.class); + // NUM_THREADS must be at least 2 x NUM_URLS to ensure each URLs is accessed by more than one thread. + private static final int NUM_THREADS = 2000; + private static final int NUM_URLS = 40; + private ApplicationContext ctx; + private CachingContentStore store; + private SlowContentStore backingStore; + + @Before + public void setUp() + { + String conf = "classpath:cachingstore/test-context.xml"; + String slowconf = "classpath:cachingstore/test-slow-context.xml"; + ctx = ApplicationContextHelper.getApplicationContext(new String[] { conf, slowconf }); + + store = (CachingContentStore) ctx.getBean("cachingContentStore"); + store.setCacheOnInbound(false); + + backingStore = (SlowContentStore) ctx.getBean("backingStore"); + } + + + @Test + public void concurrentReadsWillReadCacheOncePerURL() throws InterruptedException + { + // Attack with multiple threads + Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) + { + CacheReaderThread t = new CacheReaderThread(i, NUM_URLS); + threads[i] = t; + t.start(); + } + + for (int i = 0; i < threads.length; i++) + threads[i].join(); + + + log.debug("\nResults:"); + + // Check how many times the backing store was read from + int failedURLs = 0; + + for (Map.Entry entry : backingStore.getUrlHits().entrySet()) + { + String url = entry.getKey(); + long numHits = entry.getValue().get(); + log.debug("URL: " + url + ", hits: " + numHits); + + if (numHits > 1) failedURLs++; + } + + + // If any of the URLs were accessed more than once, then the test will fail. + if (failedURLs > 0) + Assert.fail(failedURLs + " URLs were requested more than once."); + } + + + + private class CacheReaderThread extends Thread + { + private final int threadNum; + private final int numUrls; + private int reads = 50; + + CacheReaderThread(int threadNum, int numUrls) { + super(CacheReaderThread.class.getSimpleName() + "-" + threadNum); + this.threadNum = threadNum; + this.numUrls = numUrls; + } + + @Override + public void run() + { + while (reads > 0) + { + String url = generateUrlToRead(); + ContentReader reader = store.getReader(url); + String content = reader.getContentString(); + log.debug("Thread: " + getName() + ", URL: " + url + ", content: " + content); + reads--; + } + } + + private String generateUrlToRead() + { + int urlNum = threadNum % numUrls; + return "store://2010/11/5/17/33/" + urlNum + ".bin"; + } + } +} diff --git a/source/java/org/alfresco/repo/content/caching/test/SlowContentStore.java b/source/java/org/alfresco/repo/content/caching/test/SlowContentStore.java index b1a4752b05..fb8dd634f7 100644 --- a/source/java/org/alfresco/repo/content/caching/test/SlowContentStore.java +++ b/source/java/org/alfresco/repo/content/caching/test/SlowContentStore.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.alfresco.repo.content.AbstractContentReader; import org.alfresco.repo.content.AbstractContentStore; @@ -39,7 +42,8 @@ import org.alfresco.service.cmr.repository.ContentWriter; */ class SlowContentStore extends AbstractContentStore { - + private ConcurrentMap urlHits = new ConcurrentHashMap(); + /* * @see org.alfresco.repo.content.ContentStore#isWriteSupported() */ @@ -55,6 +59,9 @@ class SlowContentStore extends AbstractContentStore @Override public ContentReader getReader(String contentUrl) { + urlHits.putIfAbsent(contentUrl, new AtomicLong(0)); + urlHits.get(contentUrl).incrementAndGet(); + return new SlowReader(contentUrl); } @@ -217,7 +224,16 @@ class SlowContentStore extends AbstractContentStore } - + /** + * Get statistics for which URLs have been asked for and the frequencies. + * + * @return Map of URL to frequency + */ + public ConcurrentMap getUrlHits() + { + return this.urlHits; + } + public static void main(String[] args) { SlowContentStore scs = new SlowContentStore();