SEARCH-2782 commit time as event time (#377)

This commit is contained in:
Davide
2021-04-20 16:32:41 +02:00
committed by GitHub
parent 8a6a76d191
commit 9d711213cc
9 changed files with 246 additions and 95 deletions

View File

@@ -563,6 +563,7 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
Long txnId = txn.getId(); Long txnId = txn.getId();
// Update it // Update it
Long now = System.currentTimeMillis(); Long now = System.currentTimeMillis();
txn.setCommitTimeMs(now);
updateTransaction(txnId, now); updateTransaction(txnId, now);
} }
} }
@@ -604,6 +605,17 @@ public abstract class AbstractNodeDAOImpl implements NodeDAO, BatchingDAO
return txn; return txn;
} }
public Long getCurrentTransactionCommitTime()
{
Long commitTime = null;
TransactionEntity resource = AlfrescoTransactionSupport.getResource(KEY_TRANSACTION);
if(resource != null)
{
commitTime = resource.getCommitTimeMs();
}
return commitTime;
}
public Long getCurrentTransactionId(boolean ensureNew) public Long getCurrentTransactionId(boolean ensureNew)
{ {
TransactionEntity txn; TransactionEntity txn;

View File

@@ -76,6 +76,13 @@ public interface NodeDAO extends NodeBulkLoader
* Transaction * Transaction
*/ */
/**
* @return the commit time of the current transaction entry or <tt>null</tt> if
* there have not been any modifications to nodes registered in the
* transaction.
*/
Long getCurrentTransactionCommitTime();
/** /**
* @param ensureNew <tt>true</tt> to ensure that a new transaction entry is created * @param ensureNew <tt>true</tt> to ensure that a new transaction entry is created
* if the current transaction does not have one. * if the current transaction does not have one.

View File

@@ -182,4 +182,11 @@ public interface AclDAO
* @return Long * @return Long
*/ */
public Long getMaxChangeSetIdByCommitTime(long maxCommitTime); public Long getMaxChangeSetIdByCommitTime(long maxCommitTime);
/**
* @return the commit time of the current ACL change set entry or <tt>null</tt> if
* there have not been any modifications.
*/
public Long getCurrentChangeSetCommitTime();
} }

View File

@@ -1637,6 +1637,7 @@ public class AclDAOImpl implements AclDAO
} }
private static final String RESOURCE_KEY_ACL_CHANGE_SET_ID = "acl.change.set.id"; private static final String RESOURCE_KEY_ACL_CHANGE_SET_ID = "acl.change.set.id";
private static final String RESOURCE_KEY_ACL_CHANGE_SET_COMMIT_TIME_MS = "acl.change.commit.set.time.ms";
private UpdateChangeSetListener updateChangeSetListener = new UpdateChangeSetListener(); private UpdateChangeSetListener updateChangeSetListener = new UpdateChangeSetListener();
/** /**
@@ -1662,9 +1663,17 @@ public class AclDAOImpl implements AclDAO
} }
// Update it // Update it
long commitTimeMs = System.currentTimeMillis(); long commitTimeMs = System.currentTimeMillis();
AlfrescoTransactionSupport.bindResource(RESOURCE_KEY_ACL_CHANGE_SET_COMMIT_TIME_MS, commitTimeMs);
aclCrudDAO.updateAclChangeSet(changeSetId, commitTimeMs); aclCrudDAO.updateAclChangeSet(changeSetId, commitTimeMs);
} }
} }
@Override
public Long getCurrentChangeSetCommitTime()
{
return AlfrescoTransactionSupport.getResource(RESOURCE_KEY_ACL_CHANGE_SET_COMMIT_TIME_MS);
}
/** /**
* Support to get the current ACL change set and bind this to the transaction. So we only make one new version of an * Support to get the current ACL change set and bind this to the transaction. So we only make one new version of an
* ACL per change set. If something is in the current change set we can update it. * ACL per change set. If something is in the current change set we can update it.

View File

@@ -27,12 +27,16 @@ package org.alfresco.repo.event2;
import java.io.Serializable; import java.io.Serializable;
import java.net.URI; import java.net.URI;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Deque; import java.util.Deque;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.domain.node.TransactionEntity;
import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.RepoEvent; import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.repo.event2.filter.ChildAssociationTypeFilter; import org.alfresco.repo.event2.filter.ChildAssociationTypeFilter;
@@ -92,6 +96,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
private TransactionService transactionService; private TransactionService transactionService;
private PersonService personService; private PersonService personService;
protected NodeResourceHelper nodeResourceHelper; protected NodeResourceHelper nodeResourceHelper;
protected NodeDAO nodeDAO;
private EventGeneratorQueue eventGeneratorQueue; private EventGeneratorQueue eventGeneratorQueue;
private NodeTypeFilter nodeTypeFilter; private NodeTypeFilter nodeTypeFilter;
@@ -111,6 +116,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
PropertyCheck.mandatory(this, "transactionService", transactionService); PropertyCheck.mandatory(this, "transactionService", transactionService);
PropertyCheck.mandatory(this, "personService", personService); PropertyCheck.mandatory(this, "personService", personService);
PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper); PropertyCheck.mandatory(this, "nodeResourceHelper", nodeResourceHelper);
PropertyCheck.mandatory(this, "nodeDAO", nodeDAO);
PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue); PropertyCheck.mandatory(this, "eventGeneratorQueue", eventGeneratorQueue);
this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter(); this.nodeTypeFilter = eventFilterRegistry.getNodeTypeFilter();
@@ -144,6 +150,11 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
new JavaBehaviour(this, "beforeDeleteAssociation")); new JavaBehaviour(this, "beforeDeleteAssociation"));
} }
public void setNodeDAO(NodeDAO nodeDAO)
{
this.nodeDAO = nodeDAO;
}
public void setPolicyComponent(PolicyComponent policyComponent) public void setPolicyComponent(PolicyComponent policyComponent)
{ {
this.policyComponent = policyComponent; this.policyComponent = policyComponent;
@@ -366,15 +377,22 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
return (childAssociationTypeFilter.isExcluded(childAssocType) || (userFilter.isExcluded(user))); return (childAssociationTypeFilter.isExcluded(childAssocType) || (userFilter.isExcluded(user)));
} }
private EventInfo getEventInfo(String user) protected EventInfo getEventInfo(String user)
{ {
return new EventInfo().setTimestamp(ZonedDateTime.now()) return new EventInfo().setTimestamp(getCurrentTransactionTimestamp())
.setId(UUID.randomUUID().toString()) .setId(UUID.randomUUID().toString())
.setTxnId(AlfrescoTransactionSupport.getTransactionId()) .setTxnId(AlfrescoTransactionSupport.getTransactionId())
.setPrincipal(user) .setPrincipal(user)
.setSource(URI.create("/" + descriptorService.getCurrentRepositoryDescriptor().getId())); .setSource(URI.create("/" + descriptorService.getCurrentRepositoryDescriptor().getId()));
} }
private ZonedDateTime getCurrentTransactionTimestamp()
{
Long currentTransactionCommitTime = nodeDAO.getCurrentTransactionCommitTime();
Instant commitTimeMs = Instant.ofEpochMilli(currentTransactionCommitTime);
return ZonedDateTime.ofInstant(commitTimeMs, ZoneOffset.UTC);
}
@Override @Override
protected void onBootstrap(ApplicationEvent applicationEvent) protected void onBootstrap(ApplicationEvent applicationEvent)
{ {
@@ -391,6 +409,8 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
{ {
@Override @Override
public void afterCommit() public void afterCommit()
{
if(isTransactionCommitted())
{ {
try try
{ {
@@ -423,6 +443,7 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
LOGGER.error("Unexpected error while sending repository events", e); LOGGER.error("Unexpected error while sending repository events", e);
} }
} }
}
protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator) protected void sendEvent(NodeRef nodeRef, EventConsolidator consolidator)
{ {
@@ -430,6 +451,15 @@ public class EventGenerator extends AbstractLifecycleBean implements Initializin
eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo)); eventGeneratorQueue.accept(()-> createEvent(nodeRef, consolidator, eventInfo));
} }
/**
* @return true if a node transaction is not only active, but also committed with modifications.
* This means that a {@link TransactionEntity} object was created.
*/
protected boolean isTransactionCommitted()
{
return nodeDAO.getCurrentTransactionCommitTime() != null;
}
private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo) private RepoEvent<?> createEvent(NodeRef nodeRef, EventConsolidator consolidator, EventInfo eventInfo)
{ {
String user = eventInfo.getPrincipal(); String user = eventInfo.getPrincipal();

View File

@@ -42,6 +42,7 @@
<property name="personService" ref="personService"/> <property name="personService" ref="personService"/>
<property name="nodeResourceHelper" ref="nodeResourceHelper"/> <property name="nodeResourceHelper" ref="nodeResourceHelper"/>
<property name="eventGeneratorQueue" ref="eventGeneratorQueue"/> <property name="eventGeneratorQueue" ref="eventGeneratorQueue"/>
<property name="nodeDAO" ref="nodeDAO"/>
</bean> </bean>
<bean id="baseNodeResourceHelper" abstract="true"> <bean id="baseNodeResourceHelper" abstract="true">

View File

@@ -92,12 +92,10 @@ public class NodeDAOTest extends TestCase
public void testTransaction() throws Throwable public void testTransaction() throws Throwable
{ {
final boolean[] newTxn = new boolean[] {false}; final boolean[] newTxn = new boolean[] {false};
RetryingTransactionCallback<Long> getTxnIdCallback = new RetryingTransactionCallback<Long>() RetryingTransactionCallback<Pair<Long, Long>> getTxnIdCallback = () -> {
{ Long currentTransactionId = nodeDAO.getCurrentTransactionId(newTxn[0]);
public Long execute() throws Throwable Long currentTransactionCommitTime = nodeDAO.getCurrentTransactionCommitTime();
{ return new Pair<>(currentTransactionId, currentTransactionCommitTime);
return nodeDAO.getCurrentTransactionId(newTxn[0]);
}
}; };
// No txn // No txn
try try
@@ -110,14 +108,24 @@ public class NodeDAOTest extends TestCase
// Expected // Expected
} }
// Read-only // Read-only
assertNull("No Txn ID should be present in read-only txn", txnHelper.doInTransaction(getTxnIdCallback, true)); Pair<Long, Long> txn0 = txnHelper.doInTransaction(getTxnIdCallback);
Long txnId0 = txn0.getFirst();
Long commitTime0 = txn0.getSecond();
assertNull("No Txn ID should be present in read-only txn", txnId0);
assertNull("No Txn Commit time should be present in read-only txn", commitTime0);
// First success // First success
Long txnId1 = txnHelper.doInTransaction(getTxnIdCallback); Pair<Long, Long> txn1 = txnHelper.doInTransaction(getTxnIdCallback);
Long txnId1 = txn1.getFirst();
Long commitTime1 = txn1.getSecond();
assertNull("No Txn ID should be present in untouched txn", txnId1); assertNull("No Txn ID should be present in untouched txn", txnId1);
assertNull("No Txn Commit time should be present in untouched txn", commitTime1);
// Second success // Second success
newTxn[0] = true; newTxn[0] = true;
Long txnId2 = txnHelper.doInTransaction(getTxnIdCallback); Pair<Long, Long> txn2 = txnHelper.doInTransaction(getTxnIdCallback);
Long txnId2 = txn2.getFirst();
Long commitTime2 = txn2.getSecond();
assertNotNull("Txn ID should be present by forcing it", txnId2); assertNotNull("Txn ID should be present by forcing it", txnId2);
assertNotNull("Txn commit time should be present by forcing it", commitTime2);
} }
public void testSelectNodePropertiesByTypes() throws Exception public void testSelectNodePropertiesByTypes() throws Exception

View File

@@ -26,9 +26,14 @@
package org.alfresco.repo.event2; package org.alfresco.repo.event2;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import org.alfresco.model.ContentModel; import org.alfresco.model.ContentModel;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.domain.node.Transaction;
import org.alfresco.repo.event.v1.model.EventData; import org.alfresco.repo.event.v1.model.EventData;
import org.alfresco.repo.event.v1.model.EventType; import org.alfresco.repo.event.v1.model.EventType;
import org.alfresco.repo.event.v1.model.NodeResource; import org.alfresco.repo.event.v1.model.NodeResource;
@@ -38,6 +43,7 @@ import org.alfresco.service.namespace.QName;
import org.alfresco.util.GUID; import org.alfresco.util.GUID;
import org.alfresco.util.PropertyMap; import org.alfresco.util.PropertyMap;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
/** /**
* @author Iulian Aftene * @author Iulian Aftene
@@ -45,6 +51,9 @@ import org.junit.Test;
public class CreateRepoEventIT extends AbstractContextAwareRepoEvent public class CreateRepoEventIT extends AbstractContextAwareRepoEvent
{ {
@Autowired
private NodeDAO nodeDAO;
@Test @Test
public void testCreateEvent() public void testCreateEvent()
{ {
@@ -151,7 +160,30 @@ public class CreateRepoEventIT extends AbstractContextAwareRepoEvent
} }
@Test @Test
public void testCteateMultipleNodesInTheSameTransaction() public void testEventTimestampEqualsToTransactionCommitTime()
{
String name = "TestFile-" + System.currentTimeMillis() + ".txt";
PropertyMap propertyMap = new PropertyMap();
propertyMap.put(ContentModel.PROP_NAME, name);
//create a node and return the transaction id required later
Long transactionId = retryingTransactionHelper.doInTransaction(() -> {
nodeService.createNode(rootNodeRef, ContentModel.ASSOC_CHILDREN,
QName.createQName(TEST_NAMESPACE, GUID.generate()), ContentModel.TYPE_CONTENT, propertyMap).getChildRef();
return nodeDAO.getCurrentTransactionId(false);
});
RepoEvent<EventData<NodeResource>> resultRepoEvent = getRepoEvent(1);
Transaction transaction = nodeDAO.getTxnById(transactionId);
Instant commitTimeMs = Instant.ofEpochMilli(transaction.getCommitTimeMs());
ZonedDateTime timestamp = ZonedDateTime.ofInstant(commitTimeMs, ZoneOffset.UTC);
assertEquals(timestamp, resultRepoEvent.getTime());
}
@Test
public void testCreateMultipleNodesInTheSameTransaction()
{ {
retryingTransactionHelper.doInTransaction(() -> { retryingTransactionHelper.doInTransaction(() -> {
for (int i = 0; i < 3; i++) for (int i = 0; i < 3; i++)

View File

@@ -31,8 +31,16 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status; import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction; import javax.transaction.UserTransaction;
import junit.framework.TestCase; import junit.framework.TestCase;
@@ -54,6 +62,7 @@ import org.alfresco.repo.security.permissions.SimpleAccessControlEntry;
import org.alfresco.repo.security.permissions.SimpleAccessControlListProperties; import org.alfresco.repo.security.permissions.SimpleAccessControlListProperties;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport; import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState; import org.alfresco.repo.transaction.AlfrescoTransactionSupport.TxnReadState;
import org.alfresco.repo.transaction.TransactionListener;
import org.alfresco.service.ServiceRegistry; import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.cmr.dictionary.DictionaryService; import org.alfresco.service.cmr.dictionary.DictionaryService;
import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeRef;
@@ -71,9 +80,14 @@ import org.alfresco.test_category.OwnJVMTestsCategory;
import org.alfresco.util.ApplicationContextHelper; import org.alfresco.util.ApplicationContextHelper;
import org.alfresco.util.EqualsHelper; import org.alfresco.util.EqualsHelper;
import org.alfresco.util.testing.category.DBTests; import org.alfresco.util.testing.category.DBTests;
import org.alfresco.util.transaction.TransactionListenerAdapter;
import org.awaitility.Awaitility;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
@Category({OwnJVMTestsCategory.class, DBTests.class}) @Category({OwnJVMTestsCategory.class, DBTests.class})
public class AclDaoComponentTest extends TestCase public class AclDaoComponentTest extends TestCase
{ {
@@ -245,6 +259,37 @@ public class AclDaoComponentTest extends TestCase
assertEquals(aclDaoComponent.getAccessControlListProperties(aclProps.getId()), aclProps); assertEquals(aclDaoComponent.getAccessControlListProperties(aclProps.getId()), aclProps);
} }
public void testGetCurrentACLChangeSet()
throws HeuristicRollbackException, RollbackException, HeuristicMixedException, SystemException
{
SimpleAccessControlListProperties properties = new SimpleAccessControlListProperties();
properties.setAclType(ACLType.DEFINING);
properties.setVersioned(true);
Long id = aclDaoComponent.createAccessControlList(properties).getId();
AccessControlListProperties aclProps = aclDaoComponent.getAccessControlListProperties(id);
assertEquals(aclProps.getAclType(), ACLType.DEFINING);
assertEquals(aclProps.getAclVersion(), Long.valueOf(1l));
assertEquals(aclProps.getInherits(), Boolean.TRUE);
AtomicBoolean afterCommit = new AtomicBoolean();
AlfrescoTransactionSupport.bindListener(new TransactionListenerAdapter() {
@Override
public void afterCommit()
{
//The commit time is available only after a transaction is committed
assertNotNull(aclDaoComponent.getCurrentChangeSetCommitTime());
afterCommit.set(true);
}
});
testTX.commit();
await("Commit time not null")
.atMost(3, TimeUnit.SECONDS)
.untilAtomic(afterCommit, equalTo(true));
}
public void testCreateShared() public void testCreateShared()
{ {
SimpleAccessControlListProperties properties = new SimpleAccessControlListProperties(); SimpleAccessControlListProperties properties = new SimpleAccessControlListProperties();