diff --git a/source/java/org/alfresco/repo/content/AbstractContentAccessor.java b/source/java/org/alfresco/repo/content/AbstractContentAccessor.java
index 896cc3ce5f..746fcd7c0d 100644
--- a/source/java/org/alfresco/repo/content/AbstractContentAccessor.java
+++ b/source/java/org/alfresco/repo/content/AbstractContentAccessor.java
@@ -221,7 +221,9 @@ public abstract class AbstractContentAccessor implements ContentAccessor
/**
* Advise that listens for the completion of specific methods on the
- * {@link java.nio.channels.ByteChannel} interface.
+ * {@link java.nio.channels.ByteChannel} interface. This advise reacts
+ * only in the {@link #afterReturning(Object, Method, Object[], Object) afterReturning} phase
+ * so that the underlying stream methods have been successfully completed.
*
* @author Derek Hulley
*/
diff --git a/source/java/org/alfresco/repo/content/RoutingContentServiceTest.java b/source/java/org/alfresco/repo/content/RoutingContentServiceTest.java
index 82e4b19571..f626f7c0ea 100644
--- a/source/java/org/alfresco/repo/content/RoutingContentServiceTest.java
+++ b/source/java/org/alfresco/repo/content/RoutingContentServiceTest.java
@@ -40,6 +40,7 @@ import org.alfresco.repo.content.transform.ContentTransformer;
import org.alfresco.repo.policy.JavaBehaviour;
import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.security.authentication.AuthenticationComponent;
+import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.ContentData;
@@ -73,6 +74,7 @@ public class RoutingContentServiceTest extends TestCase
private static final String TEST_NAMESPACE = "http://www.alfresco.org/test/RoutingContentServiceTest";
+ private TransactionService transactionService;
private ContentService contentService;
private PolicyComponent policyComponent;
private NodeService nodeService;
@@ -88,6 +90,7 @@ public class RoutingContentServiceTest extends TestCase
@Override
public void setUp() throws Exception
{
+ transactionService = (TransactionService) ctx.getBean("transactionComponent");
nodeService = (NodeService) ctx.getBean("dbNodeService");
contentService = (ContentService) ctx.getBean(ServiceRegistry.CONTENT_SERVICE.getLocalName());
this.policyComponent = (PolicyComponent) ctx.getBean("policyComponent");
@@ -148,7 +151,6 @@ public class RoutingContentServiceTest extends TestCase
private UserTransaction getUserTransaction()
{
- TransactionService transactionService = (TransactionService) ctx.getBean("transactionComponent");
return (UserTransaction) transactionService.getUserTransaction();
}
@@ -552,58 +554,70 @@ public class RoutingContentServiceTest extends TestCase
txn.rollback();
}
+ /**
+ * Create several threads that will attempt to write to the same node property.
+ * The ContentWriter is handed to the thread, so this checks that the stream closure
+ * uses the transaction that called close
and not the transaction that
+ * fetched the ContentWriter
.
+ */
public synchronized void testConcurrentWritesWithMultipleTxns() throws Exception
{
+ // ensure that there is no content to read on the node
+ ContentReader reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
+ assertNull("Reader should not be available", reader);
+
// commit node so that threads can see node
txn.commit();
txn = null;
- UserTransaction txn = getUserTransaction();
- txn.begin();
-
- // ensure that there is no content to read on the node
- ContentReader reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
- assertNull("Reader should not be available", reader);
-
- ContentWriter threadWriter = contentService.getWriter(contentNodeRef, ContentModel.PROP_CONTENT, true);
String threadContent = "Thread content";
- WriteThread thread = new WriteThread(threadWriter, threadContent);
- // kick off thread
- thread.start();
- // wait for thread to get to its wait points
- while (!thread.isWaiting())
+ WriteThread[] writeThreads = new WriteThread[5];
+ for (int i = 0; i < writeThreads.length; i++)
{
- wait(10);
+ ContentWriter threadWriter = contentService.getWriter(contentNodeRef, ContentModel.PROP_CONTENT, true);
+ writeThreads[i] = new WriteThread(threadWriter, threadContent);
+ // Kick each thread off
+ writeThreads[i].start();
}
- // write to the content
- ContentWriter writer = contentService.getWriter(contentNodeRef, ContentModel.PROP_CONTENT, true);
- writer.putContent(SOME_CONTENT);
-
- // fire thread up again
- synchronized(threadWriter)
+ // Wait for all threads to be waiting
+ outer:
+ while (true)
{
- threadWriter.notifyAll();
+ // Wait for each thread to be in a transaction
+ for (int i = 0; i < writeThreads.length; i++)
+ {
+ if (!writeThreads[i].isWaiting())
+ {
+ wait(10);
+ continue outer;
+ }
+ }
+ // All threads were waiting
+ break outer;
}
- // thread is released - but we have to wait for it to complete
- while (!thread.isDone())
+
+ // Kick each thread into the stream close phase
+ for (int i = 0; i < writeThreads.length; i++)
{
- wait(10);
+ synchronized(writeThreads[i])
+ {
+ writeThreads[i].notifyAll();
+ }
+ }
+ // Wait for the threads to complete (one way or another)
+ for (int i = 0; i < writeThreads.length; i++)
+ {
+ while (!writeThreads[i].isDone())
+ {
+ wait(10);
+ }
}
- // the thread has finished and has committed its changes - check the visibility
- reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
- assertNotNull("Reader should now be available", reader);
- String checkContent = reader.getContentString();
- assertEquals("Content check failed", SOME_CONTENT, checkContent);
-
- // rollback the txn
- txn.rollback();
-
// check content has taken on thread's content
reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
assertNotNull("Reader should now be available", reader);
- checkContent = reader.getContentString();
+ String checkContent = reader.getContentString();
assertEquals("Content check failed", threadContent, checkContent);
}
@@ -666,14 +680,15 @@ public class RoutingContentServiceTest extends TestCase
* output stream before terminating.
*
* When firing thread up, be sure to call notify
on the
- * writer in order to let the thread run to completion.
+ * Thread instance in order to let the thread run to completion.
*/
private class WriteThread extends Thread
{
private ContentWriter writer;
private String content;
- private boolean isWaiting;
- private boolean isDone;
+ private volatile boolean isWaiting;
+ private volatile boolean isDone;
+ private volatile Throwable error;
public WriteThread(ContentWriter writer, String content)
{
@@ -681,6 +696,7 @@ public class RoutingContentServiceTest extends TestCase
this.content = content;
isWaiting = false;
isDone = false;
+ error = null;
}
public boolean isWaiting()
@@ -692,48 +708,62 @@ public class RoutingContentServiceTest extends TestCase
{
return isDone;
}
+
+ public Throwable getError()
+ {
+ return error;
+ }
public void run()
{
authenticationComponent.setSystemUserAsCurrentUser();
- isWaiting = false;
- isDone = false;
- UserTransaction txn = getUserTransaction();
- OutputStream os = writer.getContentOutputStream();
+ synchronized (this)
+ {
+ isWaiting = true;
+ try { this.wait(); } catch (InterruptedException e) {}; // wait until notified
+ }
+
+ final OutputStream os = writer.getContentOutputStream();
+ // Callback to write to the content in a new transaction
+ RetryingTransactionCallback