Merged enterprise features

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@2746 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Paul Holmes-Higgin
2006-05-03 18:34:13 +00:00
parent 405c00bd8e
commit c37ff8805c
83 changed files with 15809 additions and 9 deletions

View File

@@ -0,0 +1,277 @@
/*
* Copyright (C) 2005 Alfresco, Inc.
*
* Licensed under the Alfresco Network License. You may obtain a
* copy of the License at
*
* http://www.alfrescosoftware.com/legal/
*
* Please view the license relevant to your network subscription.
*
* BY CLICKING THE "I UNDERSTAND AND ACCEPT" BOX, OR INSTALLING,
* READING OR USING ALFRESCO'S Network SOFTWARE (THE "SOFTWARE"),
* YOU ARE AGREEING ON BEHALF OF THE ENTITY LICENSING THE SOFTWARE
* ("COMPANY") THAT COMPANY WILL BE BOUND BY AND IS BECOMING A PARTY TO
* THIS ALFRESCO NETWORK AGREEMENT ("AGREEMENT") AND THAT YOU HAVE THE
* AUTHORITY TO BIND COMPANY. IF COMPANY DOES NOT AGREE TO ALL OF THE
* TERMS OF THIS AGREEMENT, DO NOT SELECT THE "I UNDERSTAND AND AGREE"
* BOX AND DO NOT INSTALL THE SOFTWARE OR VIEW THE SOURCE CODE. COMPANY
* HAS NOT BECOME A LICENSEE OF, AND IS NOT AUTHORIZED TO USE THE
* SOFTWARE UNLESS AND UNTIL IT HAS AGREED TO BE BOUND BY THESE LICENSE
* TERMS. THE "EFFECTIVE DATE" FOR THIS AGREEMENT SHALL BE THE DAY YOU
* CHECK THE "I UNDERSTAND AND ACCEPT" BOX.
*/
package org.alfresco.repo.content.replication;
import java.util.Set;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.node.index.IndexRecovery;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
* This component performs one-way replication between to content stores.
* <p>
* It ensure that the content from the first store is copied to the second
* store where required, therefore primarily acting as a backup or
* replication mechanism.
* <p>
* Once started, this process runs continuously on a low-priority thread
* and cannot be restarted.
*
* @author Derek Hulley
*/
public class ContentStoreReplicator
{
private static Log logger = LogFactory.getLog(ContentStoreReplicator.class);
private ContentStore sourceStore;
private ContentStore targetStore;
/** used to ensure that this instance gets started once only */
private boolean started;
/** set this on to keep replicating and never stop. The default is <code>true</code>. */
private boolean runContinuously;
/** the time to wait between passes */
private long waitTime;
public ContentStoreReplicator()
{
this.started = false;
this.runContinuously = true;
this.waitTime = 60000L;
}
/**
* Set the source that content must be taken from
*
* @param sourceStore the content source
*/
public void setSourceStore(ContentStore sourceStore)
{
this.sourceStore = sourceStore;
}
/**
* Set the target that content must be written to
*
* @param targetStore the content target
*/
public void setTargetStore(ContentStore targetStore)
{
this.targetStore = targetStore;
}
/**
* Set whether the thread should run continuously or terminate after
* a first pass.
*
* @param runContinuously true to run continously (default)
*/
public void setRunContinuously(boolean runContinuously)
{
this.runContinuously = runContinuously;
}
/**
* Set the time to wait between replication passes (in seconds)
*
* @param waitTime the time between passes (in seconds). Default is 60s.
*/
public void setWaitTime(long waitTime)
{
// convert to millis
this.waitTime = waitTime * 1000L;
}
/**
* Kick off the replication thread. This method can be used once.
*/
public synchronized void start()
{
if (started)
{
throw new AlfrescoRuntimeException("This ContentStoreReplicator has already been started");
}
// create a low-priority, daemon thread to do the work
Runnable runnable = new ReplicationRunner();
Thread thread = new Thread(runnable);
thread.setPriority(Thread.MIN_PRIORITY);
thread.setDaemon(true);
// start it
thread.start();
}
/**
* Stateful thread runnable that performs the replication.
*
* @author Derek Hulley
*/
private class ReplicationRunner implements Runnable
{
public void run()
{
// keep this thread going permanently
while (true)
{
try
{
ContentStoreReplicator.this.replicate();
// check if the process should terminate
if (!runContinuously)
{
// the thread has caught up with all the available work and should not
// run continuously
if (logger.isDebugEnabled())
{
logger.debug("Thread quitting - first pass of replication complete:");
}
break;
}
// pause the the required wait time
synchronized(ContentStoreReplicator.this)
{
ContentStoreReplicator.this.wait(waitTime);
}
}
catch (InterruptedException e)
{
// ignore
}
catch (Throwable e)
{
// report
logger.error("Replication failure", e);
}
}
}
}
/**
* Perform a full replication of all source to target URLs.
*/
private void replicate()
{
// get all the URLs from the source
Set<String> sourceUrls = sourceStore.getUrls();
// get all the URLs from the target
Set<String> targetUrls = targetStore.getUrls();
// remove source URLs that are present in the target
sourceUrls.removeAll(targetUrls);
// ensure that each remaining source URL is present in the target
for (String contentUrl : sourceUrls)
{
replicate(contentUrl);
}
}
/**
* Checks if the target store has the URL, and if not, replicates the content.
* <p>
* Any failures are reported and not thrown, but the target URL is removed for
* good measure.
*
* @param contentUrl the URL to replicate
*/
private void replicate(String contentUrl)
{
try
{
// check that the target doesn't have it
if (targetStore.exists(contentUrl))
{
// ignore this as the target has it already
if (logger.isDebugEnabled())
{
logger.debug("No replication required - URL exists in target store: \n" +
" source store: " + sourceStore + "\n" +
" target store: " + targetStore + "\n" +
" content URL: " + contentUrl);
}
return;
}
// get a writer to the target store - this can fail if the content is there now
ContentWriter writer = targetStore.getWriter(null, contentUrl);
// get the source reader
ContentReader reader = sourceStore.getReader(contentUrl);
if (!reader.exists())
{
// the content may have disappeared from the source store
if (logger.isDebugEnabled())
{
logger.debug("Source store no longer has URL - no replication possible: \n" +
" source store: " + sourceStore + "\n" +
" target store: " + targetStore + "\n" +
" content URL: " + contentUrl);
}
return;
}
// copy from the reader to the writer
writer.putContent(reader);
}
catch (Throwable e)
{
logger.error("Failed to replicate URL - removing target content: \n" +
" source store: " + sourceStore + "\n" +
" target store: " + targetStore + "\n" +
" content URL: " + contentUrl,
e);
targetStore.delete(contentUrl);
}
}
/**
* Kicks off the {@link ContentStoreReplicator content store replicator}.
*
* @author Derek Hulley
*/
public class ContentStoreReplicatorJob implements Job
{
/** KEY_CONTENT_STORE_REPLICATOR = 'contentStoreReplicator' */
public static final String KEY_CONTENT_STORE_REPLICATOR = "contentStoreReplicator";
/**
* Forces a full index recovery using the {@link IndexRecovery recovery component} passed
* in via the job detail.
*/
public void execute(JobExecutionContext context) throws JobExecutionException
{
ContentStoreReplicator contentStoreReplicator = (ContentStoreReplicator) context.getJobDetail()
.getJobDataMap().get(KEY_CONTENT_STORE_REPLICATOR);
if (contentStoreReplicator == null)
{
throw new JobExecutionException("Missing job data: " + KEY_CONTENT_STORE_REPLICATOR);
}
// reindex
contentStoreReplicator.start();
}
}
}

View File

@@ -0,0 +1,160 @@
/*
* Copyright (C) 2005 Alfresco, Inc.
*
* Licensed under the Alfresco Network License. You may obtain a
* copy of the License at
*
* http://www.alfrescosoftware.com/legal/
*
* Please view the license relevant to your network subscription.
*
* BY CLICKING THE "I UNDERSTAND AND ACCEPT" BOX, OR INSTALLING,
* READING OR USING ALFRESCO'S Network SOFTWARE (THE "SOFTWARE"),
* YOU ARE AGREEING ON BEHALF OF THE ENTITY LICENSING THE SOFTWARE
* ("COMPANY") THAT COMPANY WILL BE BOUND BY AND IS BECOMING A PARTY TO
* THIS ALFRESCO NETWORK AGREEMENT ("AGREEMENT") AND THAT YOU HAVE THE
* AUTHORITY TO BIND COMPANY. IF COMPANY DOES NOT AGREE TO ALL OF THE
* TERMS OF THIS AGREEMENT, DO NOT SELECT THE "I UNDERSTAND AND AGREE"
* BOX AND DO NOT INSTALL THE SOFTWARE OR VIEW THE SOURCE CODE. COMPANY
* HAS NOT BECOME A LICENSEE OF, AND IS NOT AUTHORIZED TO USE THE
* SOFTWARE UNLESS AND UNTIL IT HAS AGREED TO BE BOUND BY THESE LICENSE
* TERMS. THE "EFFECTIVE DATE" FOR THIS AGREEMENT SHALL BE THE DAY YOU
* CHECK THE "I UNDERSTAND AND ACCEPT" BOX.
*/
package org.alfresco.repo.content.replication;
import java.io.File;
import java.util.Set;
import junit.framework.TestCase;
import org.alfresco.repo.content.AbstractContentStore;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.content.filestore.FileContentStore;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.util.GUID;
import org.alfresco.util.TempFileProvider;
/**
* Tests the content store replicator.
*
* @see org.alfresco.repo.content.replication.ContentStoreReplicator
*
* @author Derek Hulley
*/
@SuppressWarnings("unused")
public class ContentStoreReplicatorTest extends TestCase
{
private static final String SOME_CONTENT = "The No. 1 Ladies' Detective Agency";
private ContentStoreReplicator replicator;
private ContentStore sourceStore;
private ContentStore targetStore;
@Override
public void setUp() throws Exception
{
super.setUp();
File tempDir = TempFileProvider.getTempDir();
// create the source file store
String storeDir = tempDir.getAbsolutePath() + File.separatorChar + getName() + File.separatorChar + GUID.generate();
sourceStore = new FileContentStore(storeDir);
// create the target file store
storeDir = tempDir.getAbsolutePath() + File.separatorChar + getName() + File.separatorChar + GUID.generate();
targetStore = new FileContentStore(storeDir);
// create the replicator
replicator = new ContentStoreReplicator();
replicator.setSourceStore(sourceStore);
replicator.setTargetStore(targetStore);
replicator.setRunContinuously(false); // replicate once
replicator.setWaitTime(0);
}
/**
* Creates a source with some files and replicates in a single pass, checking the results.
*/
public void testSinglePassReplication() throws Exception
{
ContentWriter writer = sourceStore.getWriter(null, null);
writer.putContent("123");
// replicate
replicator.start();
// wait a second
synchronized(this)
{
this.wait(1000L);
}
assertTrue("Target store doesn't have content added to source",
targetStore.exists(writer.getContentUrl()));
// this was a single pass, so now more replication should be done
writer = sourceStore.getWriter(null, null);
writer.putContent("456");
// wait a second
synchronized(this)
{
this.wait(1000L);
}
assertFalse("Replication should have been single-pass",
targetStore.exists(writer.getContentUrl()));
}
/**
* Adds content to the source while the replicator is going as fast as possible.
* Just to make it more interesting, the content is sometimes put in the target
* store as well.
* <p>
* Afterwards, some content is removed from the the target.
* <p>
* Then, finally, a check is performed to ensure that the source and target are
* in synch.
*/
public void testContinuousReplication() throws Exception
{
replicator.setRunContinuously(true);
replicator.setWaitTime(0L);
replicator.start();
String duplicateUrl = AbstractContentStore.createNewUrl();
// start the replicator - it won't wait between iterations
for (int i = 0; i < 10; i++)
{
// put some content into both the target and source
duplicateUrl = AbstractContentStore.createNewUrl();
ContentWriter duplicateTargetWriter = targetStore.getWriter(null, duplicateUrl);
ContentWriter duplicateSourceWriter = sourceStore.getWriter(null, duplicateUrl);
duplicateTargetWriter.putContent("Duplicate Target Content: " + i);
duplicateSourceWriter.putContent(duplicateTargetWriter.getReader());
for (int j = 0; j < 100; j++)
{
// write content
ContentWriter writer = sourceStore.getWriter(null, null);
writer.putContent("Repeated put: " + j);
}
}
// remove the last duplicated URL from the target
targetStore.delete(duplicateUrl);
// allow time for the replicator to catch up
synchronized(this)
{
this.wait(1000L);
}
// check that we have an exact match of URLs
Set<String> sourceUrls = sourceStore.getUrls();
Set<String> targetUrls = targetStore.getUrls();
sourceUrls.containsAll(targetUrls);
targetUrls.contains(sourceUrls);
}
}

View File

@@ -0,0 +1,447 @@
/*
* Copyright (C) 2005 Alfresco, Inc.
*
* Licensed under the Alfresco Network License. You may obtain a
* copy of the License at
*
* http://www.alfrescosoftware.com/legal/
*
* Please view the license relevant to your network subscription.
*
* BY CLICKING THE "I UNDERSTAND AND ACCEPT" BOX, OR INSTALLING,
* READING OR USING ALFRESCO'S Network SOFTWARE (THE "SOFTWARE"),
* YOU ARE AGREEING ON BEHALF OF THE ENTITY LICENSING THE SOFTWARE
* ("COMPANY") THAT COMPANY WILL BE BOUND BY AND IS BECOMING A PARTY TO
* THIS ALFRESCO NETWORK AGREEMENT ("AGREEMENT") AND THAT YOU HAVE THE
* AUTHORITY TO BIND COMPANY. IF COMPANY DOES NOT AGREE TO ALL OF THE
* TERMS OF THIS AGREEMENT, DO NOT SELECT THE "I UNDERSTAND AND AGREE"
* BOX AND DO NOT INSTALL THE SOFTWARE OR VIEW THE SOURCE CODE. COMPANY
* HAS NOT BECOME A LICENSEE OF, AND IS NOT AUTHORIZED TO USE THE
* SOFTWARE UNLESS AND UNTIL IT HAS AGREED TO BE BOUND BY THESE LICENSE
* TERMS. THE "EFFECTIVE DATE" FOR THIS AGREEMENT SHALL BE THE DAY YOU
* CHECK THE "I UNDERSTAND AND ACCEPT" BOX.
*/
package org.alfresco.repo.content.replication;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.content.AbstractContentStore;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.service.cmr.repository.ContentIOException;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentStreamListener;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.transaction.TransactionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* </h1><u>Replicating Content Store</u></h1>
* <p>
* A content store implementation that is able to replicate content between stores.
* Content is not persisted by this store, but rather it relies on any number of
* child {@link org.alfresco.repo.content.ContentStore stores} to provide access to
* content readers and writers.
* <p>
* The order in which the stores appear in the list of stores participating is
* important. The first store in the list is known as the <i>primary store</i>.
* When the replicator goes to fetch content, the stores are searched
* from first to last. The stores should therefore be arranged in order of
* speed.
* <p>
* It supports the notion of inbound and/or outbound replication, both of which can be
* operational at the same time.
*
* </h2><u>Outbound Replication</u></h2>
* <p>
* When this is enabled, then the primary store is used for writes. When the
* content write completes (i.e. the write channel is closed) then the content
* is synchronously copied to all other stores. The write is therefore slowed
* down, but the content replication will occur <i>in-transaction</i>.
* <p>
* The {@link #setOutboundThreadPoolExecutor(boolean) outboundThreadPoolExecutor }
* property to enable asynchronous replication.<br>
* With asynchronous replication, there is always a risk that a failure
* occurs during the replication. Depending on the configuration of the server,
* further action may need to be taken to rectify the problem manually.
*
* </h2><u>Inbound Replication</u></h2>
* <p>
* This can be used to lazily replicate content onto the primary store. When
* content can't be found in the primary store, the other stores are checked
* in order. If content is found, then it is copied into the local store
* before being returned. Subsequent accesses will use the primary store.<br>
* This should be used where the secondary stores are much slower, such as in
* the case of a store against some kind of archival mechanism.
*
* <h2><u>No Replication</u></h2>
* <p>
* Content is not written to the primary store only. The other stores are
* only used to retrieve content and the primary store is not updated with
* the content.
*
* @author Derek Hulley
*/
public class ReplicatingContentStore extends AbstractContentStore
{
/*
* The replication process uses thread synchronization as it can
* decide to write content to specific URLs during requests for
* a reader.
* While this won't help the underlying stores if there are
* multiple replications on top of them, it will prevent repeated
* work from multiple threads entering an instance of this component
* looking for the same content at the same time.
*/
private static Log logger = LogFactory.getLog(ReplicatingContentStore.class);
private TransactionService transactionService;
private ContentStore primaryStore;
private List<ContentStore> secondaryStores;
private boolean inbound;
private boolean outbound;
private ThreadPoolExecutor outboundThreadPoolExecutor;
private Lock readLock;
private Lock writeLock;
/**
* Default constructor set <code>inbound = false</code> and <code>outbound = true</code>;
*/
public ReplicatingContentStore()
{
inbound = false;
outbound = true;
ReadWriteLock storeLock = new ReentrantReadWriteLock();
readLock = storeLock.readLock();
writeLock = storeLock.writeLock();
}
/**
* Required to ensure that content listeners are executed in a transaction
*
* @param transactionService
*/
public void setTransactionService(TransactionService transactionService)
{
this.transactionService = transactionService;
}
/**
* Set the primary store that content will be replicated to or from
*
* @param primaryStore the primary content store
*/
public void setPrimaryStore(ContentStore primaryStore)
{
this.primaryStore = primaryStore;
}
/**
* Set the secondary stores that this component will replicate to or from
*
* @param stores a list of stores to replicate to or from
*/
public void setSecondaryStores(List<ContentStore> secondaryStores)
{
this.secondaryStores = secondaryStores;
}
/**
* Set whether or not this component should replicate content to the
* primary store if not found.
*
* @param inbound true to pull content onto the primary store when found
* on one of the other stores
*/
public void setInbound(boolean inbound)
{
this.inbound = inbound;
}
/**
* Set whether or not this component should replicate content to all stores
* as it is written.
*
* @param outbound true to enable synchronous replication to all stores
*/
public void setOutbound(boolean outbound)
{
this.outbound = outbound;
}
/**
* Set the thread pool executer
*
* @param outboundThreadPoolExecutor set this to have the synchronization occur in a separate
* thread
*/
public void setOutboundThreadPoolExecutor(ThreadPoolExecutor outboundThreadPoolExecutor)
{
this.outboundThreadPoolExecutor = outboundThreadPoolExecutor;
}
/**
* Forwards the call directly to the first store in the list of stores.
*/
public ContentReader getReader(String contentUrl) throws ContentIOException
{
if (primaryStore == null)
{
throw new AlfrescoRuntimeException("ReplicatingContentStore not initialised");
}
// get a read lock so that we are sure that no replication is underway
ContentReader existingContentReader = null;
readLock.lock();
try
{
// get a reader from the primary store
ContentReader primaryReader = primaryStore.getReader(contentUrl);
// give it straight back if the content is there
if (primaryReader.exists())
{
return primaryReader;
}
// the content is not in the primary reader so we have to go looking for it
ContentReader secondaryContentReader = null;
for (ContentStore store : secondaryStores)
{
ContentReader reader = store.getReader(contentUrl);
if (reader.exists())
{
// found the content in a secondary store
secondaryContentReader = reader;
break;
}
}
// we already know that the primary has nothing
// drop out if no content was found
if (secondaryContentReader == null)
{
return primaryReader;
}
// secondary content was found
// return it if we are not doing inbound
if (!inbound)
{
return secondaryContentReader;
}
// we have to replicate inbound
existingContentReader = secondaryContentReader;
}
finally
{
readLock.unlock();
}
// -- a small gap for concurrent threads to get through --
// do inbound replication
writeLock.lock();
try
{
// double check the primary
ContentReader primaryContentReader = primaryStore.getReader(contentUrl);
if (primaryContentReader.exists())
{
// we were beaten to it
return primaryContentReader;
}
// get a writer
ContentWriter primaryContentWriter = primaryStore.getWriter(existingContentReader, contentUrl);
// copy it over
primaryContentWriter.putContent(existingContentReader);
// get a writer to the new content
primaryContentReader = primaryContentWriter.getReader();
// done
return primaryContentReader;
}
finally
{
writeLock.unlock();
}
}
/**
*
*/
public ContentWriter getWriter(ContentReader existingContentReader, String newContentUrl) throws ContentIOException
{
// get the writer
ContentWriter writer = primaryStore.getWriter(existingContentReader, newContentUrl);
// attach a replicating listener if outbound replication is on
if (outbound)
{
if (logger.isDebugEnabled())
{
logger.debug(
"Attaching " + (outboundThreadPoolExecutor == null ? "" : "a") + "synchronous " +
"replicating listener to local writer: \n" +
" primary store: " + primaryStore + "\n" +
" writer: " + writer);
}
// attach the listener
ContentStreamListener listener = new ReplicatingWriteListener(secondaryStores, writer, outboundThreadPoolExecutor);
writer.addListener(listener);
writer.setTransactionService(transactionService); // mandatory when listeners are added
}
// done
return writer;
}
/**
* Performs a delete on the local store and if outbound replication is on, propogates
* the delete to the other stores too.
*
* @return Returns the value returned by the delete on the primary store.
*/
public boolean delete(String contentUrl) throws ContentIOException
{
// delete on the primary store
boolean deleted = primaryStore.delete(contentUrl);
// propogate outbound deletions
if (outbound)
{
for (ContentStore store : secondaryStores)
{
store.delete(contentUrl);
}
// log
if (logger.isDebugEnabled())
{
logger.debug("Propagated content delete to " + secondaryStores.size() + " stores:" + contentUrl);
}
}
// done
if (logger.isDebugEnabled())
{
logger.debug("Deleted content for URL: " + contentUrl);
}
return deleted;
}
/**
* @return Returns the results as given by the primary store, and if inbound
* replication is active, merges the URLs from the secondary stores.
*/
public Set<String> getUrls(Date createdAfter, Date createdBefore) throws ContentIOException
{
Set<String> urls = new HashSet<String>(1024);
// add in URLs from primary store
Set<String> primaryUrls = primaryStore.getUrls(createdAfter, createdBefore);
urls.addAll(primaryUrls);
// add in URLs from secondary stores (they are visible for reads)
for (ContentStore secondaryStore : secondaryStores)
{
Set<String> secondaryUrls = secondaryStore.getUrls(createdAfter, createdBefore);
// merge them
urls.addAll(secondaryUrls);
}
// done
if (logger.isDebugEnabled())
{
logger.debug("Found " + urls.size() + " URLs, of which " + primaryUrls.size() + " are primary: \n" +
" created after: " + createdAfter + "\n" +
" created before: " + createdBefore);
}
return urls;
}
/**
* Replicates the content upon stream closure. If the thread pool is available,
* then the process will be asynchronous.
* <p>
* No transaction boundaries have been declared as the
* {@link ContentWriter#addListener(ContentStreamListener)} method indicates that
* all listeners will be called within a transaction.
*
* @author Derek Hulley
*/
public static class ReplicatingWriteListener implements ContentStreamListener
{
private List<ContentStore> stores;
private ContentWriter writer;
private ThreadPoolExecutor threadPoolExecutor;
public ReplicatingWriteListener(
List<ContentStore> stores,
ContentWriter writer,
ThreadPoolExecutor threadPoolExecutor)
{
this.stores = stores;
this.writer = writer;
this.threadPoolExecutor = threadPoolExecutor;
}
public void contentStreamClosed() throws ContentIOException
{
Runnable runnable = new ReplicateOnCloseRunnable();
if (threadPoolExecutor == null)
{
// execute direct
runnable.run();
}
else
{
threadPoolExecutor.execute(runnable);
}
}
/**
* Performs the actual replication work.
*
* @author Derek Hulley
*/
private class ReplicateOnCloseRunnable implements Runnable
{
public void run()
{
for (ContentStore store : stores)
{
try
{
// replicate the content to the store - we know the URL that we want to write to
ContentReader reader = writer.getReader();
String contentUrl = reader.getContentUrl();
// in order to replicate, we have to specify the URL that we are going to write to
ContentWriter replicatedWriter = store.getWriter(null, contentUrl);
// write it
replicatedWriter.putContent(reader);
if (logger.isDebugEnabled())
{
logger.debug("Replicated content to store: \n" +
" url: " + contentUrl + "\n" +
" to store: " + store);
}
}
catch (Throwable e)
{
throw new ContentIOException("Content replication failed: \n" +
" url: " + writer.getContentUrl() + "\n" +
" to store: " + store);
}
}
}
}
}
}

View File

@@ -0,0 +1,201 @@
/*
* Copyright (C) 2005 Alfresco, Inc.
*
* Licensed under the Alfresco Network License. You may obtain a
* copy of the License at
*
* http://www.alfrescosoftware.com/legal/
*
* Please view the license relevant to your network subscription.
*
* BY CLICKING THE "I UNDERSTAND AND ACCEPT" BOX, OR INSTALLING,
* READING OR USING ALFRESCO'S Network SOFTWARE (THE "SOFTWARE"),
* YOU ARE AGREEING ON BEHALF OF THE ENTITY LICENSING THE SOFTWARE
* ("COMPANY") THAT COMPANY WILL BE BOUND BY AND IS BECOMING A PARTY TO
* THIS ALFRESCO NETWORK AGREEMENT ("AGREEMENT") AND THAT YOU HAVE THE
* AUTHORITY TO BIND COMPANY. IF COMPANY DOES NOT AGREE TO ALL OF THE
* TERMS OF THIS AGREEMENT, DO NOT SELECT THE "I UNDERSTAND AND AGREE"
* BOX AND DO NOT INSTALL THE SOFTWARE OR VIEW THE SOURCE CODE. COMPANY
* HAS NOT BECOME A LICENSEE OF, AND IS NOT AUTHORIZED TO USE THE
* SOFTWARE UNLESS AND UNTIL IT HAS AGREED TO BE BOUND BY THESE LICENSE
* TERMS. THE "EFFECTIVE DATE" FOR THIS AGREEMENT SHALL BE THE DAY YOU
* CHECK THE "I UNDERSTAND AND ACCEPT" BOX.
*/
package org.alfresco.repo.content.replication;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.alfresco.repo.content.AbstractContentReadWriteTest;
import org.alfresco.repo.content.ContentStore;
import org.alfresco.repo.content.filestore.FileContentStore;
import org.alfresco.repo.transaction.DummyTransactionService;
import org.alfresco.service.cmr.repository.ContentReader;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.util.GUID;
import org.alfresco.util.TempFileProvider;
/**
* Tests read and write functionality for the replicating store.
* <p>
* By default, replication is off for both the inbound and outbound
* replication. Specific tests change this.
*
* @see org.alfresco.repo.content.replication.ReplicatingContentStore
*
* @author Derek Hulley
*/
public class ReplicatingContentStoreTest extends AbstractContentReadWriteTest
{
private static final String SOME_CONTENT = "The No. 1 Ladies' Detective Agency";
private ReplicatingContentStore replicatingStore;
private ContentStore primaryStore;
private List<ContentStore> secondaryStores;
@Override
public void setUp() throws Exception
{
super.setUp();
File tempDir = TempFileProvider.getTempDir();
// create a primary file store
String storeDir = tempDir.getAbsolutePath() + File.separatorChar + GUID.generate();
primaryStore = new FileContentStore(storeDir);
// create some secondary file stores
secondaryStores = new ArrayList<ContentStore>(3);
for (int i = 0; i < 3; i++)
{
storeDir = tempDir.getAbsolutePath() + File.separatorChar + GUID.generate();
ContentStore store = new FileContentStore(storeDir);
secondaryStores.add(store);
}
replicatingStore = new ReplicatingContentStore();
replicatingStore.setTransactionService(new DummyTransactionService());
replicatingStore.setPrimaryStore(primaryStore);
replicatingStore.setSecondaryStores(secondaryStores);
replicatingStore.setOutbound(false);
replicatingStore.setInbound(false);
}
@Override
public ContentStore getStore()
{
return replicatingStore;
}
/**
* Performs checks necessary to ensure the proper replication of content for the given
* URL
*/
private void checkForReplication(boolean inbound, boolean outbound, String contentUrl, String content)
{
if (inbound)
{
ContentReader reader = primaryStore.getReader(contentUrl);
assertTrue("Content was not replicated into the primary store", reader.exists());
assertEquals("The replicated content was incorrect", content, reader.getContentString());
}
if (outbound)
{
for (ContentStore store : secondaryStores)
{
ContentReader reader = store.getReader(contentUrl);
assertTrue("Content was not replicated out to the secondary stores within a second", reader.exists());
assertEquals("The replicated content was incorrect", content, reader.getContentString());
}
}
}
/**
* Checks that the url is present in each of the stores
*
* @param contentUrl
* @param mustExist true if the content must exist, false if it must <b>not</b> exist
*/
private void checkForUrl(String contentUrl, boolean mustExist)
{
// check that the URL is present for each of the stores
for (ContentStore store : secondaryStores)
{
Set<String> urls = store.getUrls();
assertTrue("URL of new content not present in store", urls.contains(contentUrl) == mustExist);
}
}
public void testNoReplication() throws Exception
{
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
checkForReplication(false, false, writer.getContentUrl(), SOME_CONTENT);
}
public void testOutboundReplication() throws Exception
{
replicatingStore.setOutbound(true);
// write some content
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
checkForReplication(false, true, contentUrl, SOME_CONTENT);
// check for outbound deletes
replicatingStore.delete(contentUrl);
checkForUrl(contentUrl, false);
}
public void testAsyncOutboundReplication() throws Exception
{
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
replicatingStore.setOutbound(true);
replicatingStore.setOutboundThreadPoolExecutor(tpe);
// write some content
ContentWriter writer = getWriter();
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
// wait for a second
synchronized(this)
{
this.wait(1000L);
}
checkForReplication(false, true, contentUrl, SOME_CONTENT);
// check for outbound deletes
replicatingStore.delete(contentUrl);
checkForUrl(contentUrl, false);
}
public void testInboundReplication() throws Exception
{
replicatingStore.setInbound(false);
// pick a secondary store and write some content to it
ContentStore secondaryStore = secondaryStores.get(2);
ContentWriter writer = secondaryStore.getWriter(null, null);
writer.putContent(SOME_CONTENT);
String contentUrl = writer.getContentUrl();
// get a reader from the replicating store
ContentReader reader = replicatingStore.getReader(contentUrl);
assertTrue("Reader must have been found in secondary store", reader.exists());
// set inbound replication on and repeat
replicatingStore.setInbound(true);
reader = replicatingStore.getReader(contentUrl);
// this time, it must have been replicated to the primary store
checkForReplication(true, false, contentUrl, SOME_CONTENT);
}
}