ACS-406: S3 Connector: support for content direct access urls throws … (#1080)

-  added implementation for CachingContentStore and AggregatingContentStore
  -  added tests
This commit is contained in:
Cristian Turlica
2020-07-07 12:31:59 +03:00
committed by GitHub
parent 8885b75e0b
commit b16e62761b
4 changed files with 1347 additions and 1045 deletions

View File

@@ -23,8 +23,9 @@
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.content.caching;
package org.alfresco.repo.content.caching;
import java.util.Date;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -39,440 +40,451 @@ import org.alfresco.service.cmr.repository.ContentIOException;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentStreamListener;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.cmr.repository.DirectAccessUrl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
/**
* Implementation of ContentStore that wraps any other ContentStore (the backing store)
* transparently providing caching of content in that backing store.
* <p>
* CachingContentStore should only be used to wrap content stores that are significantly
* slower that FileContentStore - otherwise performance may actually degrade from its use.
* <p>
* It is important that cacheOnInbound is set to true for exceptionally slow backing stores.
* <p>
* This store handles the {@link FileContentStore#SPOOF_PROTOCOL} and can be used to wrap stores
* that do not handle the protocol out of the box e.g. the S3 connector's store.
*
* @author Matt Ward
*/
public class CachingContentStore implements ContentStore, ApplicationEventPublisherAware, BeanNameAware
{
private final static Log log = LogFactory.getLog(CachingContentStore.class);
// NUM_LOCKS absolutely must be a power of 2 for the use of locks to be evenly balanced
private final static int numLocks = 256;
private final static ReentrantReadWriteLock[] locks;
private ContentStore backingStore;
private ContentCache cache;
private QuotaManagerStrategy quota = new UnlimitedQuotaStrategy();
private boolean cacheOnInbound;
private int maxCacheTries = 2;
private ApplicationEventPublisher eventPublisher;
private String beanName;
static
{
locks = new ReentrantReadWriteLock[numLocks];
for (int i = 0; i < numLocks; i++)
{
locks[i] = new ReentrantReadWriteLock();
}
}
public CachingContentStore()
{
}
public CachingContentStore(ContentStore backingStore, ContentCache cache, boolean cacheOnInbound)
{
this.backingStore = backingStore;
this.cache = cache;
this.cacheOnInbound = cacheOnInbound;
}
/**
* Initialisation method, should be called once the CachingContentStore has been constructed.
*/
public void init()
{
eventPublisher.publishEvent(new CachingContentStoreCreatedEvent(this));
}
@Override
public boolean isContentUrlSupported(String contentUrl)
{
return backingStore.isContentUrlSupported(contentUrl);
}
@Override
public boolean isWriteSupported()
{
return backingStore.isWriteSupported();
}
@Override
public long getSpaceFree()
{
return backingStore.getSpaceFree();
}
@Override
public long getSpaceTotal()
{
return backingStore.getSpaceTotal();
}
@Override
public String getRootLocation()
{
return backingStore.getRootLocation();
}
/**
* {@inheritDoc}
* <p>
* For {@link #SPOOF_PROTOCOL spoofed} URLs, the URL always exists.
*/
@Override
public boolean exists(String contentUrl)
{
if (contentUrl.startsWith(FileContentStore.SPOOF_PROTOCOL))
{
return true;
}
else
{
return backingStore.exists(contentUrl);
}
}
/**
* {@inheritDoc}
* <p>
* This store handles the {@link FileContentStore#SPOOF_PROTOCOL} so that underlying stores do not need
* to implement anything <a href="https://issues.alfresco.com/jira/browse/ACE-4516">related to spoofing</a>.
*/
@Override
public ContentReader getReader(String contentUrl)
{
// Handle the spoofed URL
if (contentUrl.startsWith(FileContentStore.SPOOF_PROTOCOL))
{
return new SpoofedTextContentReader(contentUrl);
}
// Use pool of locks - which one is determined by a hash of the URL.
// This will stop the content from being read/cached multiple times from the backing store
// when it should only be read once - cached versions should be returned after that.
ReadLock readLock = readWriteLock(contentUrl).readLock();
readLock.lock();
try
{
if (cache.contains(contentUrl))
{
return cache.getReader(contentUrl);
}
}
catch(CacheMissException e)
{
// Fall through to cacheAndRead(url);
}
finally
{
readLock.unlock();
}
return cacheAndRead(contentUrl);
}
private ContentReader cacheAndRead(String url)
{
WriteLock writeLock = readWriteLock(url).writeLock();
writeLock.lock();
try
{
for (int i = 0; i < maxCacheTries; i++)
{
ContentReader backingStoreReader = backingStore.getReader(url);
long contentSize = backingStoreReader.getSize();
if (!quota.beforeWritingCacheFile(contentSize))
{
return backingStoreReader;
}
ContentReader reader = attemptCacheAndRead(url, backingStoreReader);
if (reader != null)
{
boolean keepCacheFile = quota.afterWritingCacheFile(contentSize);
if (keepCacheFile)
{
return reader;
}
else
{
// Quota strategy has requested cache file not to be kept.
cache.deleteFile(url);
cache.remove(url);
return backingStore.getReader(url);
}
}
}
// Have tried multiple times to cache the item and read it back from the cache
// but there is a recurring problem - give up and return the item from the backing store.
if (log.isWarnEnabled())
{
log.warn("Attempted " + maxCacheTries + " times to cache content item and failed - "
+ "returning reader from backing store instead [" +
"backingStore=" + backingStore +
", url=" + url +
"]");
}
return backingStore.getReader(url);
}
finally
{
writeLock.unlock();
}
}
/**
* Attempt to read content into a cached file and return a reader onto it. If the content is
* already in the cache (possibly due to a race condition between the read/write locks) then
* a reader onto that content is returned.
* <p>
* If it is not possible to cache the content and/or get a reader onto the cached content then
* <code>null</code> is returned and the method ensure that the URL is not stored in the cache.
*
* @param url URL to cache.
* @return A reader onto the cached content file or null if unable to provide one.
*/
private ContentReader attemptCacheAndRead(String url, ContentReader backingStoreReader)
{
ContentReader reader = null;
try
{
if (!cache.contains(url))
{
if (cache.put(url, backingStoreReader))
{
reader = cache.getReader(url);
}
}
else
{
reader = cache.getReader(url);
}
}
catch(CacheMissException e)
{
cache.remove(url);
}
return reader;
}
@Override
public ContentWriter getWriter(final ContentContext context)
{
if (cacheOnInbound)
{
final ContentWriter bsWriter = backingStore.getWriter(context);
if (!quota.beforeWritingCacheFile(0))
{
return bsWriter;
}
// Writing will be performed straight to the cache.
final String url = bsWriter.getContentUrl();
final BackingStoreAwareCacheWriter cacheWriter = new BackingStoreAwareCacheWriter(cache.getWriter(url), bsWriter);
// When finished writing perform these actions.
cacheWriter.addListener(new ContentStreamListener()
{
@Override
public void contentStreamClosed() throws ContentIOException
{
// Finished writing to the cache, so copy to the backing store -
// ensuring that the encoding attributes are set to the same as for the cache writer.
bsWriter.setEncoding(cacheWriter.getEncoding());
bsWriter.setLocale(cacheWriter.getLocale());
bsWriter.setMimetype(cacheWriter.getMimetype());
bsWriter.putContent(cacheWriter.getReader());
boolean contentUrlChanged = !url.equals(bsWriter.getContentUrl());
// MNT-11758 fix, re-cache files for which content url has changed after write to backing store (e.g. XAM, Centera)
if (!quota.afterWritingCacheFile(cacheWriter.getSize()) || contentUrlChanged)
{
if (contentUrlChanged)
{
// MNT-11758 fix, cache file with new and correct contentUrl after write operation to backing store completed
cache.put(bsWriter.getContentUrl(), cacheWriter.getReader());
}
// Quota manager has requested that the new cache file is not kept.
cache.deleteFile(url);
cache.remove(url);
}
}
});
return cacheWriter;
}
else
{
// No need to invalidate the cache for this content URL, since a content URL
// is only ever written to once.
return backingStore.getWriter(context);
}
}
@Override
public boolean delete(String contentUrl)
{
if (contentUrl.startsWith(FileContentStore.SPOOF_PROTOCOL))
{
// This is not a failure but the content can never actually be deleted
return false;
}
ReentrantReadWriteLock readWriteLock = readWriteLock(contentUrl);
ReadLock readLock = readWriteLock.readLock();
readLock.lock();
try
{
if (!cache.contains(contentUrl))
{
// The item isn't in the cache, so simply delete from the backing store
return backingStore.delete(contentUrl);
}
}
finally
{
readLock.unlock();
}
WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try
{
// Double check the content still exists in the cache
if (cache.contains(contentUrl))
{
// The item is in the cache, so remove.
cache.remove(contentUrl);
}
// Whether the item was in the cache or not, it must still be deleted from the backing store.
return backingStore.delete(contentUrl);
}
finally
{
writeLock.unlock();
}
}
/**
* Get a ReentrantReadWriteLock for a given URL. The lock is from a pool rather than
* per URL, so some contention is expected.
*
* @param url String
* @return ReentrantReadWriteLock
*/
public ReentrantReadWriteLock readWriteLock(String url)
{
return locks[lockIndex(url)];
}
private int lockIndex(String url)
{
return url.hashCode() & (numLocks - 1);
}
@Required
public void setBackingStore(ContentStore backingStore)
{
this.backingStore = backingStore;
}
public String getBackingStoreType()
{
return backingStore.getClass().getName();
}
public String getBackingStoreDescription()
{
return backingStore.toString();
}
@Required
public void setCache(ContentCache cache)
{
this.cache = cache;
}
public ContentCache getCache()
{
return this.cache;
}
public void setCacheOnInbound(boolean cacheOnInbound)
{
this.cacheOnInbound = cacheOnInbound;
}
public boolean isCacheOnInbound()
{
return this.cacheOnInbound;
}
public int getMaxCacheTries()
{
return this.maxCacheTries;
}
public void setMaxCacheTries(int maxCacheTries)
{
this.maxCacheTries = maxCacheTries;
}
/**
* Sets the QuotaManagerStrategy that will be used.
*
* @param quota QuotaManagerStrategy
*/
@Required
public void setQuota(QuotaManagerStrategy quota)
{
this.quota = quota;
}
public QuotaManagerStrategy getQuota()
{
return this.quota;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
{
this.eventPublisher = applicationEventPublisher;
}
@Override
public void setBeanName(String name)
{
this.beanName = name;
}
public String getBeanName()
{
return this.beanName;
}
}
/**
* Implementation of ContentStore that wraps any other ContentStore (the backing store)
* transparently providing caching of content in that backing store.
* <p>
* CachingContentStore should only be used to wrap content stores that are significantly
* slower that FileContentStore - otherwise performance may actually degrade from its use.
* <p>
* It is important that cacheOnInbound is set to true for exceptionally slow backing stores.
* <p>
* This store handles the {@link FileContentStore#SPOOF_PROTOCOL} and can be used to wrap stores
* that do not handle the protocol out of the box e.g. the S3 connector's store.
*
* @author Matt Ward
*/
public class CachingContentStore implements ContentStore, ApplicationEventPublisherAware, BeanNameAware
{
private final static Log log = LogFactory.getLog(CachingContentStore.class);
// NUM_LOCKS absolutely must be a power of 2 for the use of locks to be evenly balanced
private final static int numLocks = 256;
private final static ReentrantReadWriteLock[] locks;
private ContentStore backingStore;
private ContentCache cache;
private QuotaManagerStrategy quota = new UnlimitedQuotaStrategy();
private boolean cacheOnInbound;
private int maxCacheTries = 2;
private ApplicationEventPublisher eventPublisher;
private String beanName;
static
{
locks = new ReentrantReadWriteLock[numLocks];
for (int i = 0; i < numLocks; i++)
{
locks[i] = new ReentrantReadWriteLock();
}
}
public CachingContentStore()
{
}
public CachingContentStore(ContentStore backingStore, ContentCache cache, boolean cacheOnInbound)
{
this.backingStore = backingStore;
this.cache = cache;
this.cacheOnInbound = cacheOnInbound;
}
/**
* Initialisation method, should be called once the CachingContentStore has been constructed.
*/
public void init()
{
eventPublisher.publishEvent(new CachingContentStoreCreatedEvent(this));
}
@Override
public boolean isContentUrlSupported(String contentUrl)
{
return backingStore.isContentUrlSupported(contentUrl);
}
@Override
public boolean isWriteSupported()
{
return backingStore.isWriteSupported();
}
@Override
public long getSpaceFree()
{
return backingStore.getSpaceFree();
}
@Override
public long getSpaceTotal()
{
return backingStore.getSpaceTotal();
}
@Override
public String getRootLocation()
{
return backingStore.getRootLocation();
}
/**
* {@inheritDoc}
* <p>
* For {@link #SPOOF_PROTOCOL spoofed} URLs, the URL always exists.
*/
@Override
public boolean exists(String contentUrl)
{
if (contentUrl.startsWith(FileContentStore.SPOOF_PROTOCOL))
{
return true;
}
else
{
return backingStore.exists(contentUrl);
}
}
/**
* {@inheritDoc}
* <p>
* This store handles the {@link FileContentStore#SPOOF_PROTOCOL} so that underlying stores do not need
* to implement anything <a href="https://issues.alfresco.com/jira/browse/ACE-4516">related to spoofing</a>.
*/
@Override
public ContentReader getReader(String contentUrl)
{
// Handle the spoofed URL
if (contentUrl.startsWith(FileContentStore.SPOOF_PROTOCOL))
{
return new SpoofedTextContentReader(contentUrl);
}
// Use pool of locks - which one is determined by a hash of the URL.
// This will stop the content from being read/cached multiple times from the backing store
// when it should only be read once - cached versions should be returned after that.
ReadLock readLock = readWriteLock(contentUrl).readLock();
readLock.lock();
try
{
if (cache.contains(contentUrl))
{
return cache.getReader(contentUrl);
}
}
catch(CacheMissException e)
{
// Fall through to cacheAndRead(url);
}
finally
{
readLock.unlock();
}
return cacheAndRead(contentUrl);
}
private ContentReader cacheAndRead(String url)
{
WriteLock writeLock = readWriteLock(url).writeLock();
writeLock.lock();
try
{
for (int i = 0; i < maxCacheTries; i++)
{
ContentReader backingStoreReader = backingStore.getReader(url);
long contentSize = backingStoreReader.getSize();
if (!quota.beforeWritingCacheFile(contentSize))
{
return backingStoreReader;
}
ContentReader reader = attemptCacheAndRead(url, backingStoreReader);
if (reader != null)
{
boolean keepCacheFile = quota.afterWritingCacheFile(contentSize);
if (keepCacheFile)
{
return reader;
}
else
{
// Quota strategy has requested cache file not to be kept.
cache.deleteFile(url);
cache.remove(url);
return backingStore.getReader(url);
}
}
}
// Have tried multiple times to cache the item and read it back from the cache
// but there is a recurring problem - give up and return the item from the backing store.
if (log.isWarnEnabled())
{
log.warn("Attempted " + maxCacheTries + " times to cache content item and failed - "
+ "returning reader from backing store instead [" +
"backingStore=" + backingStore +
", url=" + url +
"]");
}
return backingStore.getReader(url);
}
finally
{
writeLock.unlock();
}
}
/**
* Attempt to read content into a cached file and return a reader onto it. If the content is
* already in the cache (possibly due to a race condition between the read/write locks) then
* a reader onto that content is returned.
* <p>
* If it is not possible to cache the content and/or get a reader onto the cached content then
* <code>null</code> is returned and the method ensure that the URL is not stored in the cache.
*
* @param url URL to cache.
* @return A reader onto the cached content file or null if unable to provide one.
*/
private ContentReader attemptCacheAndRead(String url, ContentReader backingStoreReader)
{
ContentReader reader = null;
try
{
if (!cache.contains(url))
{
if (cache.put(url, backingStoreReader))
{
reader = cache.getReader(url);
}
}
else
{
reader = cache.getReader(url);
}
}
catch(CacheMissException e)
{
cache.remove(url);
}
return reader;
}
@Override
public ContentWriter getWriter(final ContentContext context)
{
if (cacheOnInbound)
{
final ContentWriter bsWriter = backingStore.getWriter(context);
if (!quota.beforeWritingCacheFile(0))
{
return bsWriter;
}
// Writing will be performed straight to the cache.
final String url = bsWriter.getContentUrl();
final BackingStoreAwareCacheWriter cacheWriter = new BackingStoreAwareCacheWriter(cache.getWriter(url), bsWriter);
// When finished writing perform these actions.
cacheWriter.addListener(new ContentStreamListener()
{
@Override
public void contentStreamClosed() throws ContentIOException
{
// Finished writing to the cache, so copy to the backing store -
// ensuring that the encoding attributes are set to the same as for the cache writer.
bsWriter.setEncoding(cacheWriter.getEncoding());
bsWriter.setLocale(cacheWriter.getLocale());
bsWriter.setMimetype(cacheWriter.getMimetype());
bsWriter.putContent(cacheWriter.getReader());
boolean contentUrlChanged = !url.equals(bsWriter.getContentUrl());
// MNT-11758 fix, re-cache files for which content url has changed after write to backing store (e.g. XAM, Centera)
if (!quota.afterWritingCacheFile(cacheWriter.getSize()) || contentUrlChanged)
{
if (contentUrlChanged)
{
// MNT-11758 fix, cache file with new and correct contentUrl after write operation to backing store completed
cache.put(bsWriter.getContentUrl(), cacheWriter.getReader());
}
// Quota manager has requested that the new cache file is not kept.
cache.deleteFile(url);
cache.remove(url);
}
}
});
return cacheWriter;
}
else
{
// No need to invalidate the cache for this content URL, since a content URL
// is only ever written to once.
return backingStore.getWriter(context);
}
}
@Override
public boolean delete(String contentUrl)
{
if (contentUrl.startsWith(FileContentStore.SPOOF_PROTOCOL))
{
// This is not a failure but the content can never actually be deleted
return false;
}
ReentrantReadWriteLock readWriteLock = readWriteLock(contentUrl);
ReadLock readLock = readWriteLock.readLock();
readLock.lock();
try
{
if (!cache.contains(contentUrl))
{
// The item isn't in the cache, so simply delete from the backing store
return backingStore.delete(contentUrl);
}
}
finally
{
readLock.unlock();
}
WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try
{
// Double check the content still exists in the cache
if (cache.contains(contentUrl))
{
// The item is in the cache, so remove.
cache.remove(contentUrl);
}
// Whether the item was in the cache or not, it must still be deleted from the backing store.
return backingStore.delete(contentUrl);
}
finally
{
writeLock.unlock();
}
}
/**
* Get a ReentrantReadWriteLock for a given URL. The lock is from a pool rather than
* per URL, so some contention is expected.
*
* @param url String
* @return ReentrantReadWriteLock
*/
public ReentrantReadWriteLock readWriteLock(String url)
{
return locks[lockIndex(url)];
}
private int lockIndex(String url)
{
return url.hashCode() & (numLocks - 1);
}
@Required
public void setBackingStore(ContentStore backingStore)
{
this.backingStore = backingStore;
}
public String getBackingStoreType()
{
return backingStore.getClass().getName();
}
public String getBackingStoreDescription()
{
return backingStore.toString();
}
@Required
public void setCache(ContentCache cache)
{
this.cache = cache;
}
public ContentCache getCache()
{
return this.cache;
}
public void setCacheOnInbound(boolean cacheOnInbound)
{
this.cacheOnInbound = cacheOnInbound;
}
public boolean isCacheOnInbound()
{
return this.cacheOnInbound;
}
public int getMaxCacheTries()
{
return this.maxCacheTries;
}
public void setMaxCacheTries(int maxCacheTries)
{
this.maxCacheTries = maxCacheTries;
}
/**
* Sets the QuotaManagerStrategy that will be used.
*
* @param quota QuotaManagerStrategy
*/
@Required
public void setQuota(QuotaManagerStrategy quota)
{
this.quota = quota;
}
public QuotaManagerStrategy getQuota()
{
return this.quota;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
{
this.eventPublisher = applicationEventPublisher;
}
@Override
public void setBeanName(String name)
{
this.beanName = name;
}
public String getBeanName()
{
return this.beanName;
}
public boolean isDirectAccessSupported()
{
return backingStore.isDirectAccessSupported();
}
public DirectAccessUrl getDirectAccessUrl(String contentUrl, Date expiresAt)
{
return backingStore.getDirectAccessUrl(contentUrl, expiresAt);
}
}

View File

@@ -25,6 +25,7 @@
*/
package org.alfresco.repo.content.replication;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -39,6 +40,7 @@ import org.alfresco.repo.content.caching.CachingContentStore;
import org.alfresco.service.cmr.repository.ContentIOException;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.cmr.repository.DirectAccessUrl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -262,4 +264,115 @@ public class AggregatingContentStore extends AbstractContentStore
}
return deleted;
}
/**
* @return Returns <tt>true</tt> if at least one store supports direct access
*/
public boolean isDirectAccessSupported()
{
// Check the primary store
boolean isDirectAccessSupported = primaryStore.isDirectAccessSupported();
if (!isDirectAccessSupported)
{
// Direct access is not supported by the primary store so we have to check the
// other stores
for (ContentStore store : secondaryStores)
{
isDirectAccessSupported = store.isDirectAccessSupported();
if (isDirectAccessSupported)
{
break;
}
}
}
return isDirectAccessSupported;
}
public DirectAccessUrl getDirectAccessUrl(String contentUrl, Date expiresAt)
{
if (primaryStore == null)
{
throw new AlfrescoRuntimeException("ReplicatingContentStore not initialised");
}
// get a read lock so that we are sure that no replication is underway
readLock.lock();
try
{
// Keep track of the unsupported state of the content URL - it might be a rubbish URL
boolean contentUrlSupported = true;
boolean directAccessUrlSupported = true;
DirectAccessUrl directAccessUrl = null;
// Check the primary store
try
{
directAccessUrl = primaryStore.getDirectAccessUrl(contentUrl, expiresAt);
}
catch (UnsupportedOperationException e)
{
// The store does not support direct access URL
directAccessUrlSupported = false;
}
catch (UnsupportedContentUrlException e)
{
// The store can't handle the content URL
contentUrlSupported = false;
}
if (directAccessUrl != null)
{
return directAccessUrl;
}
// the content is not in the primary store so we have to go looking for it
for (ContentStore store : secondaryStores)
{
try
{
directAccessUrl = store.getDirectAccessUrl(contentUrl, expiresAt);
}
catch (UnsupportedOperationException e)
{
// The store does not support direct access URL
directAccessUrlSupported = false;
}
catch (UnsupportedContentUrlException e)
{
// The store can't handle the content URL
contentUrlSupported = false;
}
if (directAccessUrl != null)
{
break;
}
}
if (directAccessUrl == null)
{
if (!directAccessUrlSupported)
{
// The direct access URL was not supported
throw new UnsupportedOperationException("Retrieving direct access URLs is not supported by this content store.");
}
else if (!contentUrlSupported)
{
// The content URL was not supported
throw new UnsupportedContentUrlException(this, contentUrl);
}
}
return directAccessUrl;
}
finally
{
readLock.unlock();
}
}
}

View File

@@ -23,8 +23,8 @@
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.content.replication;
package org.alfresco.repo.content.replication;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -32,136 +32,280 @@ import java.util.List;
import org.alfresco.repo.content.AbstractWritableContentStoreTest;
import org.alfresco.repo.content.ContentContext;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.content.UnsupportedContentUrlException;
import org.alfresco.repo.content.filestore.FileContentStore;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.cmr.repository.DirectAccessUrl;
import org.alfresco.test_category.OwnJVMTestsCategory;
import org.alfresco.util.GUID;
import org.alfresco.util.TempFileProvider;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests read and write functionality for the aggregating store.
* <p>
*
* @see org.alfresco.repo.content.replication.AggregatingContentStore
*
* @author Derek Hulley
* @author Mark Rogers
*/
@Category(OwnJVMTestsCategory.class)
public class AggregatingContentStoreTest extends AbstractWritableContentStoreTest
{
private static final String SOME_CONTENT = "The No. 1 Ladies' Detective Agency";
private AggregatingContentStore aggregatingStore;
private ContentStore primaryStore;
private List<ContentStore> secondaryStores;
@Before
public void before() throws Exception
{
File tempDir = TempFileProvider.getTempDir();
// create a primary file store
String storeDir = tempDir.getAbsolutePath() + File.separatorChar + GUID.generate();
primaryStore = new FileContentStore(ctx, storeDir);
// create some secondary file stores
secondaryStores = new ArrayList<ContentStore>(3);
for (int i = 0; i < 4; i++)
{
storeDir = tempDir.getAbsolutePath() + File.separatorChar + GUID.generate();
FileContentStore store = new FileContentStore(ctx, storeDir);
secondaryStores.add(store);
}
// Create the aggregating store
aggregatingStore = new AggregatingContentStore();
aggregatingStore.setPrimaryStore(primaryStore);
aggregatingStore.setSecondaryStores(secondaryStores);
}
@Override
public ContentStore getStore()
{
return aggregatingStore;
}
/**
* Get a writer into the store. This test class assumes that the store is writable and
* that it therefore supports the ability to write content.
*
* @return
* Returns a writer for new content
*/
protected ContentWriter getWriter()
{
ContentStore store = getStore();
return store.getWriter(ContentStore.NEW_CONTENT_CONTEXT);
}
/**
* {@inheritDoc}
* <p>
* This implementation creates some content in the store and returns the new content URL.
*/
protected String getExistingContentUrl()
{
ContentWriter writer = getWriter();
writer.putContent("Content for getExistingContentUrl");
return writer.getContentUrl();
}
public void testAddContent() throws Exception
{
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
checkForUrl(contentUrl, true);
}
/**
* Checks that the url is present in each of the stores
*
* @param contentUrl String
* @param mustExist true if the content must exist, false if it must <b>not</b> exist
*/
private void checkForUrl(String contentUrl, boolean mustExist)
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests read and write functionality for the aggregating store.
* <p>
*
* @see org.alfresco.repo.content.replication.AggregatingContentStore
*
* @author Derek Hulley
* @author Mark Rogers
*/
@Category(OwnJVMTestsCategory.class)
public class AggregatingContentStoreTest extends AbstractWritableContentStoreTest
{
private static final String SOME_CONTENT = "The No. 1 Ladies' Detective Agency";
private AggregatingContentStore aggregatingStore;
private ContentStore primaryStore;
private List<ContentStore> secondaryStores;
@Mock
ContentStore primaryStoreMock;
@Mock
ContentStore secondaryStoreMock;
@Mock
AggregatingContentStore aggregatingContentStoreMock;
@Rule
public MockitoRule rule = MockitoJUnit.rule();
@Before
public void before() throws Exception
{
File tempDir = TempFileProvider.getTempDir();
// create a primary file store
String storeDir = tempDir.getAbsolutePath() + File.separatorChar + GUID.generate();
primaryStore = new FileContentStore(ctx, storeDir);
// create some secondary file stores
secondaryStores = new ArrayList<ContentStore>(3);
for (int i = 0; i < 4; i++)
{
storeDir = tempDir.getAbsolutePath() + File.separatorChar + GUID.generate();
FileContentStore store = new FileContentStore(ctx, storeDir);
secondaryStores.add(store);
}
// Create the aggregating store
aggregatingStore = new AggregatingContentStore();
aggregatingStore.setPrimaryStore(primaryStore);
aggregatingStore.setSecondaryStores(secondaryStores);
}
@Override
public ContentStore getStore()
{
return aggregatingStore;
}
/**
* Get a writer into the store. This test class assumes that the store is writable and
* that it therefore supports the ability to write content.
*
* @return
* Returns a writer for new content
*/
protected ContentWriter getWriter()
{
ContentStore store = getStore();
return store.getWriter(ContentStore.NEW_CONTENT_CONTEXT);
}
/**
* {@inheritDoc}
* <p>
* This implementation creates some content in the store and returns the new content URL.
*/
protected String getExistingContentUrl()
{
ContentWriter writer = getWriter();
writer.putContent("Content for getExistingContentUrl");
return writer.getContentUrl();
}
public void testAddContent() throws Exception
{
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
checkForUrl(contentUrl, true);
}
/**
* Checks that the url is present in each of the stores
*
* @param contentUrl String
* @param mustExist true if the content must exist, false if it must <b>not</b> exist
*/
private void checkForUrl(String contentUrl, boolean mustExist)
{
ContentReader reader = getReader(contentUrl);
assertEquals("Reader state differs from expected: " + reader, mustExist, reader.exists());
}
public void testDelete() throws Exception
{
// write some content
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
ContentReader reader = primaryStore.getReader(contentUrl);
assertTrue("Content was not in the primary store", reader.exists());
assertEquals("The content was incorrect", SOME_CONTENT, reader.getContentString());
getStore().delete(contentUrl);
checkForUrl(contentUrl, false);
}
public void testReadFromSecondaryStore()
{
// pick a secondary store and write some content to it
ContentStore secondaryStore = secondaryStores.get(2);
ContentWriter writer = secondaryStore.getWriter(ContentContext.NULL_CONTEXT);
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
checkForUrl(contentUrl, true);
}
}
assertEquals("Reader state differs from expected: " + reader, mustExist, reader.exists());
}
public void testDelete() throws Exception
{
// write some content
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
ContentReader reader = primaryStore.getReader(contentUrl);
assertTrue("Content was not in the primary store", reader.exists());
assertEquals("The content was incorrect", SOME_CONTENT, reader.getContentString());
getStore().delete(contentUrl);
checkForUrl(contentUrl, false);
}
public void testReadFromSecondaryStore()
{
// pick a secondary store and write some content to it
ContentStore secondaryStore = secondaryStores.get(2);
ContentWriter writer = secondaryStore.getWriter(ContentContext.NULL_CONTEXT);
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
checkForUrl(contentUrl, true);
}
@Test
public void testIsDirectAccessSupported()
{
// Create the aggregating store
AggregatingContentStore aggStore = new AggregatingContentStore();
aggStore.setPrimaryStore(primaryStoreMock);
aggStore.setSecondaryStores(List.of(secondaryStoreMock));
// By default it is unsupported
assertFalse(aggStore.isDirectAccessSupported());
// Supported if at least one store supports direct access
{
when(primaryStoreMock.isDirectAccessSupported()).thenReturn(false);
when(secondaryStoreMock.isDirectAccessSupported()).thenReturn(true);
assertTrue(aggStore.isDirectAccessSupported());
when(primaryStoreMock.isDirectAccessSupported()).thenReturn(true);
when(secondaryStoreMock.isDirectAccessSupported()).thenReturn(true);
assertTrue(aggStore.isDirectAccessSupported());
when(primaryStoreMock.isDirectAccessSupported()).thenReturn(true);
when(secondaryStoreMock.isDirectAccessSupported()).thenReturn(false);
assertTrue(aggStore.isDirectAccessSupported());
}
}
@Test
public void testGetDirectAccessUrl()
{
// Create the aggregating store
AggregatingContentStore aggStore = new AggregatingContentStore();
aggStore.setPrimaryStore(primaryStoreMock);
aggStore.setSecondaryStores(List.of(secondaryStoreMock));
UnsupportedOperationException unsupportedExc = new UnsupportedOperationException();
UnsupportedContentUrlException unsupportedContentUrlExc = new UnsupportedContentUrlException(aggStore, "");
// By default it is unsupported
DirectAccessUrl directAccessUrl = aggStore.getDirectAccessUrl("url", null);
assertNull(directAccessUrl);
// Direct access not supported
try
{
when(primaryStoreMock.getDirectAccessUrl(eq("urlDANotSupported"), any())).thenThrow(unsupportedExc);
when(secondaryStoreMock.getDirectAccessUrl(eq("urlDANotSupported"), any())).thenThrow(unsupportedExc);
aggStore.getDirectAccessUrl("urlDANotSupported", null);
fail();
}
catch (UnsupportedOperationException e)
{
// Expected
}
try
{
when(primaryStoreMock.getDirectAccessUrl(eq("urlDANotSupported"), any())).thenThrow(unsupportedContentUrlExc);
when(secondaryStoreMock.getDirectAccessUrl(eq("urlDANotSupported"), any())).thenThrow(unsupportedExc);
aggStore.getDirectAccessUrl("urlDANotSupported", null);
fail();
}
catch (UnsupportedOperationException e)
{
// Expected
}
try
{
when(primaryStoreMock.getDirectAccessUrl(eq("urlDANotSupported"), any())).thenThrow(unsupportedExc);
when(secondaryStoreMock.getDirectAccessUrl(eq("urlDANotSupported"), any())).thenThrow(unsupportedContentUrlExc);
aggStore.getDirectAccessUrl("urlDANotSupported", null);
fail();
}
catch (UnsupportedOperationException e)
{
// Expected
}
// Content url not supported
try
{
when(primaryStoreMock.getDirectAccessUrl(eq("urlNotSupported"), any())).thenThrow(unsupportedContentUrlExc);
when(secondaryStoreMock.getDirectAccessUrl(eq("urlNotSupported"), any())).thenThrow(unsupportedContentUrlExc);
aggStore.getDirectAccessUrl("urlNotSupported", null);
fail();
}
catch (UnsupportedContentUrlException e)
{
// Expected
}
when(primaryStoreMock.getDirectAccessUrl(eq("urlPriSupported"), any())).thenReturn(new DirectAccessUrl());
when(secondaryStoreMock.getDirectAccessUrl(eq("urlPriSupported"), any())).thenThrow(unsupportedExc);
directAccessUrl = aggStore.getDirectAccessUrl("urlPriSupported", null);
assertNotNull(directAccessUrl);
when(primaryStoreMock.getDirectAccessUrl(eq("urlPriSupported"), any())).thenReturn(new DirectAccessUrl());
when(secondaryStoreMock.getDirectAccessUrl(eq("urlPriSupported"), any())).thenThrow(unsupportedContentUrlExc);
directAccessUrl = aggStore.getDirectAccessUrl("urlPriSupported", null);
assertNotNull(directAccessUrl);
when(primaryStoreMock.getDirectAccessUrl(eq("urlSecSupported"), any())).thenThrow(unsupportedExc);
when(secondaryStoreMock.getDirectAccessUrl(eq("urlSecSupported"), any())).thenReturn(new DirectAccessUrl());
directAccessUrl = aggStore.getDirectAccessUrl("urlSecSupported", null);
assertNotNull(directAccessUrl);
when(primaryStoreMock.getDirectAccessUrl(eq("urlSecSupported"), any())).thenThrow(unsupportedContentUrlExc);
when(secondaryStoreMock.getDirectAccessUrl(eq("urlSecSupported"), any())).thenReturn(new DirectAccessUrl());
directAccessUrl = aggStore.getDirectAccessUrl("urlSecSupported", null);
assertNotNull(directAccessUrl);
when(primaryStoreMock.getDirectAccessUrl(eq("urlPriSupported"), any())).thenReturn(new DirectAccessUrl());
when(secondaryStoreMock.getDirectAccessUrl(eq("urlSecSupported"), any())).thenReturn(new DirectAccessUrl());
directAccessUrl = aggStore.getDirectAccessUrl("urlPriSupported", null);
assertNotNull(directAccessUrl);
directAccessUrl = aggStore.getDirectAccessUrl("urlSecSupported", null);
assertNotNull(directAccessUrl);
}
}