Fixed ContentData entity cleanup when null value repeatedly applied to cm:content property

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@14826 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2009-06-22 12:08:21 +00:00
parent d5c07c5646
commit 5fe2c1507b
4 changed files with 107 additions and 68 deletions

View File

@@ -221,7 +221,9 @@ public abstract class AbstractContentAccessor implements ContentAccessor
/** /**
* Advise that listens for the completion of specific methods on the * Advise that listens for the completion of specific methods on the
* {@link java.nio.channels.ByteChannel} interface. * {@link java.nio.channels.ByteChannel} interface. This advise reacts
* only in the {@link #afterReturning(Object, Method, Object[], Object) afterReturning} phase
* so that the underlying stream methods have been successfully completed.
* *
* @author Derek Hulley * @author Derek Hulley
*/ */

View File

@@ -40,6 +40,7 @@ import org.alfresco.repo.content.transform.ContentTransformer;
import org.alfresco.repo.policy.JavaBehaviour; import org.alfresco.repo.policy.JavaBehaviour;
import org.alfresco.repo.policy.PolicyComponent; import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.security.authentication.AuthenticationComponent; import org.alfresco.repo.security.authentication.AuthenticationComponent;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.ServiceRegistry; import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.ContentData; import org.alfresco.service.cmr.repository.ContentData;
@@ -73,6 +74,7 @@ public class RoutingContentServiceTest extends TestCase
private static final String TEST_NAMESPACE = "http://www.alfresco.org/test/RoutingContentServiceTest"; private static final String TEST_NAMESPACE = "http://www.alfresco.org/test/RoutingContentServiceTest";
private TransactionService transactionService;
private ContentService contentService; private ContentService contentService;
private PolicyComponent policyComponent; private PolicyComponent policyComponent;
private NodeService nodeService; private NodeService nodeService;
@@ -88,6 +90,7 @@ public class RoutingContentServiceTest extends TestCase
@Override @Override
public void setUp() throws Exception public void setUp() throws Exception
{ {
transactionService = (TransactionService) ctx.getBean("transactionComponent");
nodeService = (NodeService) ctx.getBean("dbNodeService"); nodeService = (NodeService) ctx.getBean("dbNodeService");
contentService = (ContentService) ctx.getBean(ServiceRegistry.CONTENT_SERVICE.getLocalName()); contentService = (ContentService) ctx.getBean(ServiceRegistry.CONTENT_SERVICE.getLocalName());
this.policyComponent = (PolicyComponent) ctx.getBean("policyComponent"); this.policyComponent = (PolicyComponent) ctx.getBean("policyComponent");
@@ -148,7 +151,6 @@ public class RoutingContentServiceTest extends TestCase
private UserTransaction getUserTransaction() private UserTransaction getUserTransaction()
{ {
TransactionService transactionService = (TransactionService) ctx.getBean("transactionComponent");
return (UserTransaction) transactionService.getUserTransaction(); return (UserTransaction) transactionService.getUserTransaction();
} }
@@ -552,58 +554,70 @@ public class RoutingContentServiceTest extends TestCase
txn.rollback(); txn.rollback();
} }
/**
* Create several threads that will attempt to write to the same node property.
* The ContentWriter is handed to the thread, so this checks that the stream closure
* uses the transaction that called <code>close</code> and not the transaction that
* fetched the <code>ContentWriter</code>.
*/
public synchronized void testConcurrentWritesWithMultipleTxns() throws Exception public synchronized void testConcurrentWritesWithMultipleTxns() throws Exception
{ {
// ensure that there is no content to read on the node
ContentReader reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
assertNull("Reader should not be available", reader);
// commit node so that threads can see node // commit node so that threads can see node
txn.commit(); txn.commit();
txn = null; txn = null;
UserTransaction txn = getUserTransaction();
txn.begin();
// ensure that there is no content to read on the node
ContentReader reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
assertNull("Reader should not be available", reader);
ContentWriter threadWriter = contentService.getWriter(contentNodeRef, ContentModel.PROP_CONTENT, true);
String threadContent = "Thread content"; String threadContent = "Thread content";
WriteThread thread = new WriteThread(threadWriter, threadContent); WriteThread[] writeThreads = new WriteThread[5];
// kick off thread for (int i = 0; i < writeThreads.length; i++)
thread.start();
// wait for thread to get to its wait points
while (!thread.isWaiting())
{ {
wait(10); ContentWriter threadWriter = contentService.getWriter(contentNodeRef, ContentModel.PROP_CONTENT, true);
writeThreads[i] = new WriteThread(threadWriter, threadContent);
// Kick each thread off
writeThreads[i].start();
} }
// write to the content // Wait for all threads to be waiting
ContentWriter writer = contentService.getWriter(contentNodeRef, ContentModel.PROP_CONTENT, true); outer:
writer.putContent(SOME_CONTENT); while (true)
// fire thread up again
synchronized(threadWriter)
{ {
threadWriter.notifyAll(); // Wait for each thread to be in a transaction
for (int i = 0; i < writeThreads.length; i++)
{
if (!writeThreads[i].isWaiting())
{
wait(10);
continue outer;
}
}
// All threads were waiting
break outer;
} }
// thread is released - but we have to wait for it to complete
while (!thread.isDone()) // Kick each thread into the stream close phase
for (int i = 0; i < writeThreads.length; i++)
{ {
wait(10); synchronized(writeThreads[i])
{
writeThreads[i].notifyAll();
}
}
// Wait for the threads to complete (one way or another)
for (int i = 0; i < writeThreads.length; i++)
{
while (!writeThreads[i].isDone())
{
wait(10);
}
} }
// the thread has finished and has committed its changes - check the visibility
reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
assertNotNull("Reader should now be available", reader);
String checkContent = reader.getContentString();
assertEquals("Content check failed", SOME_CONTENT, checkContent);
// rollback the txn
txn.rollback();
// check content has taken on thread's content // check content has taken on thread's content
reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT); reader = contentService.getReader(contentNodeRef, ContentModel.PROP_CONTENT);
assertNotNull("Reader should now be available", reader); assertNotNull("Reader should now be available", reader);
checkContent = reader.getContentString(); String checkContent = reader.getContentString();
assertEquals("Content check failed", threadContent, checkContent); assertEquals("Content check failed", threadContent, checkContent);
} }
@@ -666,14 +680,15 @@ public class RoutingContentServiceTest extends TestCase
* output stream before terminating. * output stream before terminating.
* <p> * <p>
* When firing thread up, be sure to call <code>notify</code> on the * When firing thread up, be sure to call <code>notify</code> on the
* writer in order to let the thread run to completion. * Thread instance in order to let the thread run to completion.
*/ */
private class WriteThread extends Thread private class WriteThread extends Thread
{ {
private ContentWriter writer; private ContentWriter writer;
private String content; private String content;
private boolean isWaiting; private volatile boolean isWaiting;
private boolean isDone; private volatile boolean isDone;
private volatile Throwable error;
public WriteThread(ContentWriter writer, String content) public WriteThread(ContentWriter writer, String content)
{ {
@@ -681,6 +696,7 @@ public class RoutingContentServiceTest extends TestCase
this.content = content; this.content = content;
isWaiting = false; isWaiting = false;
isDone = false; isDone = false;
error = null;
} }
public boolean isWaiting() public boolean isWaiting()
@@ -692,48 +708,62 @@ public class RoutingContentServiceTest extends TestCase
{ {
return isDone; return isDone;
} }
public Throwable getError()
{
return error;
}
public void run() public void run()
{ {
authenticationComponent.setSystemUserAsCurrentUser(); authenticationComponent.setSystemUserAsCurrentUser();
isWaiting = false; synchronized (this)
isDone = false; {
UserTransaction txn = getUserTransaction(); isWaiting = true;
OutputStream os = writer.getContentOutputStream(); try { this.wait(); } catch (InterruptedException e) {}; // wait until notified
}
final OutputStream os = writer.getContentOutputStream();
// Callback to write to the content in a new transaction
RetryingTransactionCallback<Void> callback = new RetryingTransactionCallback<Void>()
{
public Void execute() throws Throwable
{
try
{
// put the content
if (writer.getEncoding() == null)
{
os.write(content.getBytes());
}
else
{
os.write(content.getBytes(writer.getEncoding()));
}
os.close();
}
finally
{
if (os != null)
{
try { os.close(); } catch (IOException e) {}
}
}
return null;
}
};
try try
{ {
txn.begin(); // not testing transactions - this is not a safe pattern transactionService.getRetryingTransactionHelper().doInTransaction(callback);
// put the content
if (writer.getEncoding() == null)
{
os.write(content.getBytes());
}
else
{
os.write(content.getBytes(writer.getEncoding()));
}
synchronized (writer)
{
isWaiting = true;
writer.wait(); // wait until notified
}
os.close();
os = null;
txn.commit();
} }
catch (Throwable e) catch (Throwable e)
{ {
try {txn.rollback(); } catch (Exception ee) {}
e.printStackTrace(); e.printStackTrace();
throw new RuntimeException("Failed writing to output stream for writer: " + writer, e); error = e;
} }
finally finally
{ {
if (os != null)
{
try { os.close(); } catch (IOException e) {}
}
isDone = true; isDone = true;
} }
} }

View File

@@ -159,6 +159,11 @@ public class ContentDataDAOImpl extends AbstractContentDataDAOImpl
{ {
// Get the content urls // Get the content urls
ContentDataEntity contentDataEntity = getContentDataEntity(id); ContentDataEntity contentDataEntity = getContentDataEntity(id);
// This might be null as there is no constraint ensuring that the node points to a valid ContentData entity
if (contentDataEntity == null)
{
continue;
}
// Only check the content URLs if one is present // Only check the content URLs if one is present
String contentUrl = contentDataEntity.getContentUrl(); String contentUrl = contentDataEntity.getContentUrl();
Long contentUrlId = contentDataEntity.getContentUrlId(); Long contentUrlId = contentDataEntity.getContentUrlId();

View File

@@ -1348,7 +1348,9 @@ public class HibernateNodeDaoServiceImpl extends HibernateDaoSupport implements
if (propertyDef != null && propertyDef.getDataType().getName().equals(DataTypeDefinition.CONTENT)) if (propertyDef != null && propertyDef.getDataType().getName().equals(DataTypeDefinition.CONTENT))
{ {
Set<Long> contentQNamesToRemoveIds = Collections.singleton(qnameId); Set<Long> contentQNamesToRemoveIds = Collections.singleton(qnameId);
contentDataDAO.deleteContentDataForNode(node.getId(), contentQNamesToRemoveIds); contentDataDAO.deleteContentDataForNode(
node.getId(),
contentQNamesToRemoveIds);
} }
Map<PropertyMapKey, NodePropertyValue> persistableProperties = new HashMap<PropertyMapKey, NodePropertyValue>(3); Map<PropertyMapKey, NodePropertyValue> persistableProperties = new HashMap<PropertyMapKey, NodePropertyValue>(3);