Fixed AR-546:

VersionCounterDao is enclosed in non-propagating transactions (via config)
   version_counter row is locked while version number is incremented
   Added tests to ensure failure before fixing


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@2646 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley
2006-04-11 10:54:09 +00:00
parent d9a5b76e63
commit f870a17a66
7 changed files with 310 additions and 85 deletions

View File

@@ -17,6 +17,7 @@
package org.alfresco.repo.version.common.counter;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.namespace.QName;
/**
* Version counter DAO service interface.
@@ -25,6 +26,14 @@ import org.alfresco.service.cmr.repository.StoreRef;
*/
public interface VersionCounterDaoService
{
/**
* Helper method for simple patching
*
* @param nodeTypeQName not used
* @param storeRef the store to create a counter for
*/
public void beforeCreateStore(QName nodeTypeQName, StoreRef storeRef);
/**
* Get the next available version number for the specified store.
*

View File

@@ -16,75 +16,203 @@
*/
package org.alfresco.repo.version.common.counter;
import javax.transaction.UserTransaction;
import junit.framework.TestCase;
import org.alfresco.repo.transaction.TransactionUtil;
import org.alfresco.repo.transaction.TransactionUtil.TransactionWork;
import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.util.BaseSpringTest;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.ApplicationContextHelper;
import org.springframework.context.ApplicationContext;
/**
* @author Roy Wetherall
*/
public class VersionCounterDaoServiceTest extends BaseSpringTest
public class VersionCounterDaoServiceTest extends TestCase
{
/*
* Test store id's
*/
private final static String STORE_ID_1 = "test1_" + System.currentTimeMillis();
private final static String STORE_ID_2 = "test2_" + System.currentTimeMillis();
private static final String STORE_NONE = "test3_" + System.currentTimeMillis();;
private static ApplicationContext ctx = ApplicationContextHelper.getApplicationContext();
private StoreRef storeRef1;
private StoreRef storeRef2;
private TransactionService transactionService;
private NodeService nodeService;
private VersionCounterDaoService counter;
@Override
public void onSetUpInTransaction()
public void setUp() throws Exception
{
nodeService = (NodeService) applicationContext.getBean("dbNodeService");
counter = (VersionCounterDaoService) applicationContext.getBean("versionCounterDaoService");
ServiceRegistry serviceRegistry = (ServiceRegistry) ctx.getBean("ServiceRegistry");
transactionService = serviceRegistry.getTransactionService();
nodeService = serviceRegistry.getNodeService();
counter = (VersionCounterDaoService) ctx.getBean("versionCounterDaoService");
storeRef1 = nodeService.createStore(StoreRef.PROTOCOL_WORKSPACE, "test1_" + System.currentTimeMillis());
storeRef2 = nodeService.createStore(StoreRef.PROTOCOL_WORKSPACE, "test2_" + System.currentTimeMillis());
}
@Override
public void tearDown() throws Exception
{
synchronized(endWait)
{
endWait.notifyAll();
}
}
public void testSetUp() throws Exception
{
assertNotNull(nodeService);
assertNotNull(transactionService);
assertNotNull(counter);
}
/**
* Test nextVersionNumber
*/
public void testNextVersionNumber()
public void testNextVersionNumber() throws Exception
{
// Create the store references
StoreRef store1 = new StoreRef(StoreRef.PROTOCOL_WORKSPACE, VersionCounterDaoServiceTest.STORE_ID_1);
StoreRef store2 = new StoreRef(StoreRef.PROTOCOL_WORKSPACE, VersionCounterDaoServiceTest.STORE_ID_2);
StoreRef storeNone = new StoreRef(StoreRef.PROTOCOL_WORKSPACE, VersionCounterDaoServiceTest.STORE_NONE);
int store1Version0 = this.counter.nextVersionNumber(store1);
assertEquals(store1Version0, 1);
int store1Version1 = this.counter.nextVersionNumber(store1);
assertEquals(store1Version1, 2);
int store2Version0 = this.counter.nextVersionNumber(store2);
assertEquals(store2Version0, 1);
int store1Version2 = this.counter.nextVersionNumber(store1);
assertEquals(store1Version2, 3);
int store2Version1 = this.counter.nextVersionNumber(store2);
assertEquals(store2Version1, 2);
int store1Current = this.counter.currentVersionNumber(store1);
assertEquals(store1Current, 3);
int store2Current = this.counter.currentVersionNumber(store2);
assertEquals(store2Current, 2);
int storeNoneCurrent = this.counter.currentVersionNumber(storeNone);
assertEquals(storeNoneCurrent, 0);
// Need to clean-up since the version counter works in its own transaction
this.counter.resetVersionNumber(store1);
this.counter.resetVersionNumber(store2);
UserTransaction txn = transactionService.getUserTransaction();
try
{
txn.begin();
int store1Version0 = counter.nextVersionNumber(storeRef1);
assertEquals(1, store1Version0);
int store1Version1 = counter.nextVersionNumber(storeRef1);
assertEquals(2, store1Version1);
int store2Version0 = counter.nextVersionNumber(storeRef2);
assertEquals(1, store2Version0);
int store1Version2 = counter.nextVersionNumber(storeRef1);
assertEquals(3, store1Version2);
int store2Version1 = counter.nextVersionNumber(storeRef2);
assertEquals(2, store2Version1);
int store1Current = counter.currentVersionNumber(storeRef1);
assertEquals(3, store1Current);
int store2Current = counter.currentVersionNumber(storeRef2);
assertEquals(2, store2Current);
// Need to clean-up since the version counter works in its own transaction
counter.resetVersionNumber(storeRef1);
counter.resetVersionNumber(storeRef2);
}
finally
{
try { txn.rollback(); } catch (Throwable e) {}
}
}
public void testConcurrentVersionNumber() throws Throwable
{
VersionCounterThread[] threads = new VersionCounterThread[5];
for (int i = 0; i < threads.length; i++)
{
threads[i] = new VersionCounterThread("VersionCounterThread_" + i);
// start the thread
threads[i].start();
}
// now wait until all the threads are waiting
int iteration = 0;
while (waitCount < threads.length && iteration++ < 5000)
{
synchronized (this)
{
this.wait(20); // 20 ms wait
}
}
// reset wait count
this.waitCount = 0;
// kick them off
synchronized(beginWait)
{
beginWait.notifyAll();
}
// now wait until all the threads are waiting
while (waitCount < threads.length && iteration++ < 5000)
{
synchronized (this)
{
this.wait(20); // 20 ms wait
}
}
// let them finish
iteration = 0;
synchronized(endWait)
{
endWait.notifyAll();
}
// check for exceptions
for (VersionCounterThread thread : threads)
{
if (thread.error != null)
{
throw thread.error;
}
}
}
private Object beginWait = new String("BEGIN_WAIT");
private Object endWait = new String("END_WAIT");
private int waitCount = 0;
private class VersionCounterThread extends Thread
{
private Throwable error = new RuntimeException("Execution didn't complete");
public VersionCounterThread(String name)
{
super(name);
}
@Override
public void run()
{
TransactionWork<Object> versionWork = new TransactionWork<Object>()
{
public Object doWork() throws Exception
{
// wait for all other threads to enter into their transactions
synchronized(beginWait)
{
waitCount++;
beginWait.wait(10000L);
}
int startVersion = counter.currentVersionNumber(storeRef1);
// increment it
int incrementedVersion = counter.nextVersionNumber(storeRef1);
assertTrue("Version number was not incremented", incrementedVersion > startVersion);
// wait for all other threads to have finished their increments
synchronized(endWait)
{
waitCount++;
endWait.wait(10000L);
}
return null;
}
};
try
{
TransactionUtil.executeInNonPropagatingUserTransaction(transactionService, versionWork, false);
error = null;
}
catch (Throwable e)
{
error = e;
e.printStackTrace();
}
}
}
}

View File

@@ -19,8 +19,14 @@ package org.alfresco.repo.version.common.counter.hibernate;
import org.alfresco.repo.domain.StoreKey;
import org.alfresco.repo.domain.VersionCount;
import org.alfresco.repo.domain.hibernate.VersionCountImpl;
import org.alfresco.repo.node.NodeServicePolicies;
import org.alfresco.repo.policy.JavaBehaviour;
import org.alfresco.repo.policy.PolicyComponent;
import org.alfresco.repo.version.common.counter.VersionCounterDaoService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.hibernate.LockMode;
import org.springframework.orm.hibernate3.support.HibernateDaoSupport;
/**
@@ -33,28 +39,93 @@ import org.springframework.orm.hibernate3.support.HibernateDaoSupport;
*
* @author Derek Hulley
*/
public class HibernateVersionCounterDaoServiceImpl extends HibernateDaoSupport implements VersionCounterDaoService
public class HibernateVersionCounterDaoServiceImpl
extends HibernateDaoSupport
implements VersionCounterDaoService, NodeServicePolicies.BeforeCreateStorePolicy
{
private PolicyComponent policyComponent;
/**
* Retrieves or creates a version counter
* @param policyComponent the component to register behaviour
*/
public void setPolicyComponent(PolicyComponent policyComponent)
{
this.policyComponent = policyComponent;
}
/**
* Bind to receive notifications of store creations
*/
public void init()
{
policyComponent.bindClassBehaviour(
QName.createQName(NamespaceService.ALFRESCO_URI, "beforeCreateStore"),
this,
new JavaBehaviour(this, "beforeCreateStore"));
}
/**
* Create a version counter for the store
* @param nodeTypeQName
* @param storeRef
*/
public void beforeCreateStore(QName nodeTypeQName, StoreRef storeRef)
{
final StoreKey storeKey = new StoreKey(storeRef.getProtocol(), storeRef.getIdentifier());
VersionCount versionCount = (VersionCount) getHibernateTemplate().get(VersionCountImpl.class, storeKey);
if (versionCount != null)
{
// already exists
return;
}
versionCount = new VersionCountImpl();
versionCount.setKey(storeKey);
getHibernateTemplate().save(versionCount);
}
/**
* Retrieves or creates a version counter. This locks the counter against updates for the
* current transaction.
*
* @param storeKey
* @param storeKey the primary key of the counter
* @return Returns a current or new version counter
*/
private VersionCount getVersionCounter(StoreRef storeRef)
{
StoreKey storeKey = new StoreKey(storeRef.getProtocol(), storeRef.getIdentifier());
// get the version counter
VersionCount versionCounter = (VersionCount) getHibernateTemplate().get(VersionCountImpl.class, storeKey);
final StoreKey storeKey = new StoreKey(storeRef.getProtocol(), storeRef.getIdentifier());
// check if it exists
if (versionCounter == null)
VersionCount versionCount = (VersionCount) getHibernateTemplate().get(
VersionCountImpl.class,
storeKey,
LockMode.UPGRADE);
if (versionCount == null)
{
// create a new one
versionCounter = new VersionCountImpl();
versionCounter.setKey(storeKey);
getHibernateTemplate().save(versionCounter);
// This could fail on some databases with concurrent adds. But it is only those databases
// that don't lock the index against an addition of the row, and then it will only fail once.
versionCount = new VersionCountImpl();
versionCount.setKey(storeKey);
getHibernateTemplate().save(versionCount);
// debug
if (logger.isDebugEnabled())
{
logger.debug("Created version counter: \n" +
" Thread: " + Thread.currentThread().getName() + "\n" +
" Version count: " + versionCount.getVersionCount());
}
}
return versionCounter;
else
{
// debug
if (logger.isDebugEnabled())
{
logger.debug("Got version counter: \n" +
" Thread: " + Thread.currentThread().getName() + "\n" +
" Version count: " + versionCount.getVersionCount());
}
}
// done
return versionCount;
}
/**
@@ -63,12 +134,21 @@ public class HibernateVersionCounterDaoServiceImpl extends HibernateDaoSupport i
* @param storeRef the version store id
* @return the next version number
*/
public synchronized int nextVersionNumber(StoreRef storeRef)
public int nextVersionNumber(StoreRef storeRef)
{
// get the version counter
VersionCount versionCounter = getVersionCounter(storeRef);
VersionCount versionCount = getVersionCounter(storeRef);
// get an incremented count
return versionCounter.incrementVersionCount();
int nextCount = versionCount.incrementVersionCount();
// done
if (logger.isDebugEnabled())
{
logger.debug("Incremented version count: \n" +
" Thread: " + Thread.currentThread().getName() + "\n" +
" New version count: " + versionCount.getVersionCount());
}
return nextCount;
}
/**
@@ -77,7 +157,7 @@ public class HibernateVersionCounterDaoServiceImpl extends HibernateDaoSupport i
* @param storeRef the store reference
* @return the current version number, zero if no version yet allocated.
*/
public synchronized int currentVersionNumber(StoreRef storeRef)
public int currentVersionNumber(StoreRef storeRef)
{
// get the version counter
VersionCount versionCounter = getVersionCounter(storeRef);