mirror of
https://github.com/Alfresco/SearchServices.git
synced 2025-09-17 14:21:20 +00:00
Merge pull request #1493 from Alfresco/servicepack/MNT-23072
servicepack/MNT-23072
This commit is contained in:
@@ -94,10 +94,6 @@ public class MetadataTracker extends ActivatableTracker
|
|||||||
private final ConcurrentLinkedQueue<Long> nodesToIndex = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<Long> nodesToIndex = new ConcurrentLinkedQueue<>();
|
||||||
private final ConcurrentLinkedQueue<Long> nodesToPurge = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<Long> nodesToPurge = new ConcurrentLinkedQueue<>();
|
||||||
private final ConcurrentLinkedQueue<String> queriesToReindex = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<String> queriesToReindex = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
private final boolean isRunningInProduction =
|
|
||||||
!Boolean.parseBoolean(System.getProperty("alfresco.test", "false"));
|
|
||||||
|
|
||||||
private ForkJoinPool forkJoinPool;
|
private ForkJoinPool forkJoinPool;
|
||||||
|
|
||||||
// Share run and write locks across all MetadataTracker threads
|
// Share run and write locks across all MetadataTracker threads
|
||||||
@@ -186,7 +182,7 @@ public class MetadataTracker extends ActivatableTracker
|
|||||||
|
|
||||||
// In order to apply performance optimizations, checking the availability of Repo Web Scripts is required.
|
// In order to apply performance optimizations, checking the availability of Repo Web Scripts is required.
|
||||||
// As these services are available from ACS 6.2
|
// As these services are available from ACS 6.2
|
||||||
if (checkRepoServicesAvailability && isRunningInProduction)
|
if (checkRepoServicesAvailability)
|
||||||
{
|
{
|
||||||
// Try invoking getNextTxCommitTime service
|
// Try invoking getNextTxCommitTime service
|
||||||
try
|
try
|
||||||
@@ -863,7 +859,7 @@ public class MetadataTracker extends ActivatableTracker
|
|||||||
latestTransaction.setCommitTimeMs(transactions.getMaxTxnCommitTime());
|
latestTransaction.setCommitTimeMs(transactions.getMaxTxnCommitTime());
|
||||||
latestTransaction.setId(transactions.getMaxTxnId());
|
latestTransaction.setId(transactions.getMaxTxnId());
|
||||||
|
|
||||||
if (!isTransactionIndexed(latestTransaction))
|
if (isTransactionToBeIndexed(latestTransaction))
|
||||||
{
|
{
|
||||||
transactions = new Transactions(Collections.singletonList(latestTransaction), transactions.getMaxTxnCommitTime(),
|
transactions = new Transactions(Collections.singletonList(latestTransaction), transactions.getMaxTxnCommitTime(),
|
||||||
transactions.getMaxTxnId());
|
transactions.getMaxTxnId());
|
||||||
@@ -880,7 +876,7 @@ public class MetadataTracker extends ActivatableTracker
|
|||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isTransactionIndexed(Transaction transaction)
|
boolean isTransactionToBeIndexed(Transaction transaction)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -990,7 +986,7 @@ public class MetadataTracker extends ActivatableTracker
|
|||||||
final AtomicInteger counterTransaction = new AtomicInteger();
|
final AtomicInteger counterTransaction = new AtomicInteger();
|
||||||
Collection<List<Transaction>> txBatches = transactions.getTransactions().stream()
|
Collection<List<Transaction>> txBatches = transactions.getTransactions().stream()
|
||||||
.peek(txnsFound::add)
|
.peek(txnsFound::add)
|
||||||
.filter(this::isTransactionIndexed)
|
.filter(this::isTransactionToBeIndexed)
|
||||||
.collect(Collectors.groupingBy(transaction -> counterTransaction.getAndAdd(
|
.collect(Collectors.groupingBy(transaction -> counterTransaction.getAndAdd(
|
||||||
(int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize))
|
(int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize))
|
||||||
.values();
|
.values();
|
||||||
|
@@ -2,7 +2,7 @@
|
|||||||
* #%L
|
* #%L
|
||||||
* Alfresco Search Services
|
* Alfresco Search Services
|
||||||
* %%
|
* %%
|
||||||
* Copyright (C) 2005 - 2020 Alfresco Software Limited
|
* Copyright (C) 2005 - 2022 Alfresco Software Limited
|
||||||
* %%
|
* %%
|
||||||
* This file is part of the Alfresco software.
|
* This file is part of the Alfresco software.
|
||||||
* If the software was purchased under a paid Alfresco license, the terms of
|
* If the software was purchased under a paid Alfresco license, the terms of
|
||||||
@@ -23,7 +23,6 @@
|
|||||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.alfresco.solr.tracker;
|
package org.alfresco.solr.tracker;
|
||||||
|
|
||||||
import org.alfresco.solr.AbstractAlfrescoDistributedIT;
|
import org.alfresco.solr.AbstractAlfrescoDistributedIT;
|
||||||
@@ -33,10 +32,12 @@ import org.alfresco.solr.client.AclChangeSet;
|
|||||||
import org.alfresco.solr.client.AclReaders;
|
import org.alfresco.solr.client.AclReaders;
|
||||||
import org.alfresco.solr.client.Node;
|
import org.alfresco.solr.client.Node;
|
||||||
import org.alfresco.solr.client.NodeMetaData;
|
import org.alfresco.solr.client.NodeMetaData;
|
||||||
|
import org.alfresco.solr.client.SOLRAPIQueueClient;
|
||||||
import org.alfresco.solr.client.Transaction;
|
import org.alfresco.solr.client.Transaction;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.search.TermQuery;
|
import org.apache.lucene.search.TermQuery;
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@@ -47,6 +48,7 @@ import java.util.List;
|
|||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import static org.alfresco.repo.search.adaptor.QueryConstants.FIELD_DOC_TYPE;
|
import static org.alfresco.repo.search.adaptor.QueryConstants.FIELD_DOC_TYPE;
|
||||||
|
import static org.alfresco.solr.AlfrescoSolrUtils.MAX_WAIT_TIME;
|
||||||
import static org.alfresco.solr.AlfrescoSolrUtils.getAcl;
|
import static org.alfresco.solr.AlfrescoSolrUtils.getAcl;
|
||||||
import static org.alfresco.solr.AlfrescoSolrUtils.getAclChangeSet;
|
import static org.alfresco.solr.AlfrescoSolrUtils.getAclChangeSet;
|
||||||
import static org.alfresco.solr.AlfrescoSolrUtils.getAclReaders;
|
import static org.alfresco.solr.AlfrescoSolrUtils.getAclReaders;
|
||||||
@@ -54,7 +56,6 @@ import static org.alfresco.solr.AlfrescoSolrUtils.getNode;
|
|||||||
import static org.alfresco.solr.AlfrescoSolrUtils.getNodeMetaData;
|
import static org.alfresco.solr.AlfrescoSolrUtils.getNodeMetaData;
|
||||||
import static org.alfresco.solr.AlfrescoSolrUtils.getTransaction;
|
import static org.alfresco.solr.AlfrescoSolrUtils.getTransaction;
|
||||||
import static org.alfresco.solr.AlfrescoSolrUtils.indexAclChangeSet;
|
import static org.alfresco.solr.AlfrescoSolrUtils.indexAclChangeSet;
|
||||||
import static org.alfresco.solr.AlfrescoSolrUtils.list;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Joel
|
* @author Joel
|
||||||
@@ -65,7 +66,7 @@ public class DistributedDbidRangeAlfrescoSolrTrackerIT extends AbstractAlfrescoD
|
|||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void initData() throws Throwable
|
public static void initData() throws Throwable
|
||||||
{
|
{
|
||||||
initSolrServers(2, getSimpleClassName(), getShardMethod());
|
initSolrServers(3, getSimpleClassName(), getShardMethod());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
@@ -74,18 +75,24 @@ public class DistributedDbidRangeAlfrescoSolrTrackerIT extends AbstractAlfrescoD
|
|||||||
dismissSolrServers();
|
dismissSolrServers();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@After
|
||||||
public void testDbIdRange() throws Exception
|
public void deleteDataFromIndex() throws Exception
|
||||||
{
|
{
|
||||||
putHandleDefaults();
|
SOLRAPIQueueClient.TRANSACTION_QUEUE.clear();
|
||||||
|
SOLRAPIQueueClient.NODE_MAP.clear();
|
||||||
|
deleteByQueryAllClients("*:*");
|
||||||
|
waitForDocCount(new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 0, MAX_WAIT_TIME);
|
||||||
|
}
|
||||||
|
|
||||||
int numAcls = 250;
|
private List<Acl> createAcls(int numAcls)
|
||||||
|
{
|
||||||
AclChangeSet bulkAclChangeSet = getAclChangeSet(numAcls);
|
AclChangeSet bulkAclChangeSet = getAclChangeSet(numAcls);
|
||||||
|
|
||||||
List<Acl> bulkAcls = new ArrayList<>();
|
List<Acl> bulkAcls = new ArrayList<>();
|
||||||
List<AclReaders> bulkAclReaders = new ArrayList<>();
|
List<AclReaders> bulkAclReaders = new ArrayList<>();
|
||||||
|
|
||||||
for(int i=0; i<numAcls; i++) {
|
for(int i=0; i<numAcls; i++)
|
||||||
|
{
|
||||||
Acl bulkAcl = getAcl(bulkAclChangeSet);
|
Acl bulkAcl = getAcl(bulkAclChangeSet);
|
||||||
bulkAcls.add(bulkAcl);
|
bulkAcls.add(bulkAcl);
|
||||||
bulkAclReaders.add(getAclReaders(bulkAclChangeSet,
|
bulkAclReaders.add(getAclReaders(bulkAclChangeSet,
|
||||||
@@ -99,6 +106,17 @@ public class DistributedDbidRangeAlfrescoSolrTrackerIT extends AbstractAlfrescoD
|
|||||||
bulkAcls,
|
bulkAcls,
|
||||||
bulkAclReaders);
|
bulkAclReaders);
|
||||||
|
|
||||||
|
return bulkAcls;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDbIdRange() throws Exception
|
||||||
|
{
|
||||||
|
putHandleDefaults();
|
||||||
|
|
||||||
|
int numAcls = 250;
|
||||||
|
var bulkAcls = createAcls(numAcls);
|
||||||
|
|
||||||
int numNodes = 150;
|
int numNodes = 150;
|
||||||
List<Node> nodes = new ArrayList<>();
|
List<Node> nodes = new ArrayList<>();
|
||||||
List<NodeMetaData> nodeMetaDatas = new ArrayList<>();
|
List<NodeMetaData> nodeMetaDatas = new ArrayList<>();
|
||||||
@@ -108,14 +126,14 @@ public class DistributedDbidRangeAlfrescoSolrTrackerIT extends AbstractAlfrescoD
|
|||||||
for(int i=0; i<numNodes; i++)
|
for(int i=0; i<numNodes; i++)
|
||||||
{
|
{
|
||||||
int aclIndex = i % numAcls;
|
int aclIndex = i % numAcls;
|
||||||
Node node = getNode((long)i, bigTxn, bulkAcls.get(aclIndex), Node.SolrApiNodeStatus.UPDATED);
|
Node node = getNode(i, bigTxn, bulkAcls.get(aclIndex), Node.SolrApiNodeStatus.UPDATED);
|
||||||
nodes.add(node);
|
nodes.add(node);
|
||||||
NodeMetaData nodeMetaData = getNodeMetaData(node, bigTxn, bulkAcls.get(aclIndex), "mike", null, false);
|
NodeMetaData nodeMetaData = getNodeMetaData(node, bigTxn, bulkAcls.get(aclIndex), "mike", null, false);
|
||||||
nodeMetaDatas.add(nodeMetaData);
|
nodeMetaDatas.add(nodeMetaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
indexTransaction(bigTxn, nodes, nodeMetaDatas);
|
indexTransaction(bigTxn, nodes, nodeMetaDatas);
|
||||||
waitForDocCount(new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), numNodes, 100000);
|
waitForDocCount(new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), numNodes, MAX_WAIT_TIME);
|
||||||
waitForDocCountAllCores(new TermQuery(new Term(FIELD_DOC_TYPE, SolrInformationServer.DOC_TYPE_ACL)), numAcls, 80000);
|
waitForDocCountAllCores(new TermQuery(new Term(FIELD_DOC_TYPE, SolrInformationServer.DOC_TYPE_ACL)), numAcls, 80000);
|
||||||
|
|
||||||
//The test framework has ranges 0-100, 100-200, ...
|
//The test framework has ranges 0-100, 100-200, ...
|
||||||
@@ -123,6 +141,41 @@ public class DistributedDbidRangeAlfrescoSolrTrackerIT extends AbstractAlfrescoD
|
|||||||
assertShardCount(1, new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 50);
|
assertShardCount(1, new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 50);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIndexLastTransaction() throws Exception
|
||||||
|
{
|
||||||
|
var acls = createAcls(1);
|
||||||
|
int latestIndexedTransactionId = 0;
|
||||||
|
|
||||||
|
// index 50 trx for each shard
|
||||||
|
// Test configuration uses the following ranges: 0-100, 100-200, 200-300 (300-400 in case of a fourth shard which is not the case here)
|
||||||
|
for (int shardIndex = 0; shardIndex < 3; shardIndex++)
|
||||||
|
{
|
||||||
|
for (int nodeNumber = 0; nodeNumber< 50; nodeNumber++)
|
||||||
|
{
|
||||||
|
Transaction trx = getTransaction(0, 1);
|
||||||
|
trx.setId(latestIndexedTransactionId++);
|
||||||
|
var node = getNode(shardIndex*100 + 10 + nodeNumber, trx, acls.get(0), Node.SolrApiNodeStatus.UPDATED);
|
||||||
|
indexTransaction(trx, List.of(node), List.of(getNodeMetaData(node, trx, acls.get(0), "mike", null, false)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForDocCount(new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 150, MAX_WAIT_TIME);
|
||||||
|
|
||||||
|
// Check all the shards have indexed the last transaction
|
||||||
|
assertShardCount(0, new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 50);
|
||||||
|
assertShardCount(1, new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 50);
|
||||||
|
assertShardCount(2, new TermQuery(new Term("content@s___t@{http://www.alfresco.org/model/content/1.0}content", "world")), 50);
|
||||||
|
|
||||||
|
// check the last transaction has been indexed in all the shards
|
||||||
|
|
||||||
|
var latestIndexedTransactionIdAsString = String.valueOf(latestIndexedTransactionId - 1);
|
||||||
|
|
||||||
|
assertShardCount(0, new TermQuery(new Term("S_TXID", latestIndexedTransactionIdAsString)), 1);
|
||||||
|
assertShardCount(1, new TermQuery(new Term("S_TXID", latestIndexedTransactionIdAsString)), 1);
|
||||||
|
assertShardCount(2, new TermQuery(new Term("S_TXID", latestIndexedTransactionIdAsString)), 1);
|
||||||
|
}
|
||||||
|
|
||||||
protected static Properties getShardMethod()
|
protected static Properties getShardMethod()
|
||||||
{
|
{
|
||||||
Properties prop = new Properties();
|
Properties prop = new Properties();
|
||||||
|
@@ -26,9 +26,6 @@
|
|||||||
|
|
||||||
package org.alfresco.solr.tracker;
|
package org.alfresco.solr.tracker;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
import static org.mockito.Mockito.*;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -36,7 +33,6 @@ import java.util.Properties;
|
|||||||
|
|
||||||
import org.alfresco.httpclient.AuthenticationException;
|
import org.alfresco.httpclient.AuthenticationException;
|
||||||
import org.alfresco.repo.index.shard.ShardState;
|
import org.alfresco.repo.index.shard.ShardState;
|
||||||
import org.alfresco.solr.AlfrescoCoreAdminHandler;
|
|
||||||
import org.alfresco.solr.InformationServer;
|
import org.alfresco.solr.InformationServer;
|
||||||
import org.alfresco.solr.NodeReport;
|
import org.alfresco.solr.NodeReport;
|
||||||
import org.alfresco.solr.TrackerState;
|
import org.alfresco.solr.TrackerState;
|
||||||
@@ -56,6 +52,23 @@ import org.mockito.Mock;
|
|||||||
import org.mockito.Spy;
|
import org.mockito.Spy;
|
||||||
import org.mockito.junit.MockitoJUnitRunner;
|
import org.mockito.junit.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.ArgumentMatchers.isNull;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.inOrder;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class MetadataTrackerTest
|
public class MetadataTrackerTest
|
||||||
{
|
{
|
||||||
@@ -75,6 +88,9 @@ public class MetadataTrackerTest
|
|||||||
@Mock
|
@Mock
|
||||||
private TrackerStats trackerStats;
|
private TrackerStats trackerStats;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private TrackerState trackerState;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp()
|
public void setUp()
|
||||||
{
|
{
|
||||||
@@ -84,12 +100,9 @@ public class MetadataTrackerTest
|
|||||||
this.metadataTracker = spy(new MetadataTracker(props, repositoryClient, coreName, srv));
|
this.metadataTracker = spy(new MetadataTracker(props, repositoryClient, coreName, srv));
|
||||||
|
|
||||||
ModelTracker modelTracker = mock(ModelTracker.class);
|
ModelTracker modelTracker = mock(ModelTracker.class);
|
||||||
when(modelTracker.hasModels()).thenReturn(true);
|
|
||||||
AlfrescoCoreAdminHandler adminHandler = mock(AlfrescoCoreAdminHandler.class);
|
|
||||||
TrackerRegistry registry = new TrackerRegistry();
|
TrackerRegistry registry = new TrackerRegistry();
|
||||||
registry.setModelTracker(modelTracker);
|
registry.setModelTracker(modelTracker);
|
||||||
when(adminHandler.getTrackerRegistry()).thenReturn(registry);
|
metadataTracker.state = trackerState;
|
||||||
when(srv.getAdminHandler()).thenReturn(adminHandler);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -188,6 +201,79 @@ public class MetadataTrackerTest
|
|||||||
assertEquals(TX_ID, nodeReport.getDbTx());
|
assertEquals(TX_ID, nodeReport.getDbTx());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void incomingCommitTimeIsLesserThanLastIndexedTxCommitTime_transactionShouldBeMarkedAsIndexed() throws Exception {
|
||||||
|
var incomingTransactionCommitTime = 10L;
|
||||||
|
var lastIndexedTransactionCommitTime = incomingTransactionCommitTime + 1;
|
||||||
|
|
||||||
|
var incomingTransaction = new Transaction();
|
||||||
|
incomingTransaction.setId(1);
|
||||||
|
incomingTransaction.setCommitTimeMs(incomingTransactionCommitTime);
|
||||||
|
|
||||||
|
when(srv.txnInIndex(incomingTransaction.getId(), true)).thenReturn(true);
|
||||||
|
when(trackerState.getLastIndexedTxCommitTime()).thenReturn(lastIndexedTransactionCommitTime);
|
||||||
|
|
||||||
|
assertFalse(metadataTracker.isTransactionToBeIndexed(incomingTransaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void incomingCommitTimeIsLesserThanLastIndexedTxCommitTimeButTheTransactionIsNotIndexed_transactionShouldBeMarkedAsToBeIndexed() throws Exception {
|
||||||
|
var incomingTransactionCommitTime = 10L;
|
||||||
|
var lastIndexedTransactionCommitTime = incomingTransactionCommitTime + 1;
|
||||||
|
|
||||||
|
var incomingTransaction = new Transaction();
|
||||||
|
incomingTransaction.setId(1);
|
||||||
|
incomingTransaction.setCommitTimeMs(incomingTransactionCommitTime);
|
||||||
|
|
||||||
|
when(srv.txnInIndex(incomingTransaction.getId(), true)).thenReturn(false);
|
||||||
|
when(trackerState.getLastIndexedTxCommitTime()).thenReturn(lastIndexedTransactionCommitTime);
|
||||||
|
|
||||||
|
assertTrue(metadataTracker.isTransactionToBeIndexed(incomingTransaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void incomingCommitTimeIsGreaterThanLastIndexedTxCommitTime_transactionShouldBeMarkedAsToBeIndexed() {
|
||||||
|
var lastIndexedTransactionCommitTime = 10L;
|
||||||
|
var incomingTransactionCommitTime = lastIndexedTransactionCommitTime + 1;
|
||||||
|
|
||||||
|
var incomingTransaction = new Transaction();
|
||||||
|
incomingTransaction.setId(1);
|
||||||
|
incomingTransaction.setCommitTimeMs(incomingTransactionCommitTime);
|
||||||
|
|
||||||
|
when(trackerState.getLastIndexedTxCommitTime()).thenReturn(lastIndexedTransactionCommitTime);
|
||||||
|
|
||||||
|
assertTrue(metadataTracker.isTransactionToBeIndexed(incomingTransaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void incomingCommitTimeIsGreaterThanLastIndexedTxCommitTimeButTheTransactionIsAlreadyIndexed_transactionShouldBeMarkedAsToBeIndexed() {
|
||||||
|
var lastIndexedTransactionCommitTime = 10L;
|
||||||
|
var incomingTransactionCommitTime = lastIndexedTransactionCommitTime + 1;
|
||||||
|
|
||||||
|
var incomingTransaction = new Transaction();
|
||||||
|
incomingTransaction.setId(1);
|
||||||
|
incomingTransaction.setCommitTimeMs(incomingTransactionCommitTime);
|
||||||
|
|
||||||
|
when(trackerState.getLastIndexedTxCommitTime()).thenReturn(lastIndexedTransactionCommitTime);
|
||||||
|
|
||||||
|
assertTrue(metadataTracker.isTransactionToBeIndexed(incomingTransaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void anIOExceptionIsRaised_transactionShouldBeMarkedAsToBeIndexed() throws Exception {
|
||||||
|
var incomingTransactionCommitTime = 10L;
|
||||||
|
var lastIndexedTransactionCommitTime = incomingTransactionCommitTime + 1;
|
||||||
|
|
||||||
|
var incomingTransaction = new Transaction();
|
||||||
|
incomingTransaction.setId(1);
|
||||||
|
incomingTransaction.setCommitTimeMs(incomingTransactionCommitTime);
|
||||||
|
|
||||||
|
when(srv.txnInIndex(incomingTransaction.getId(), true)).thenThrow(new IOException());
|
||||||
|
when(trackerState.getLastIndexedTxCommitTime()).thenReturn(lastIndexedTransactionCommitTime);
|
||||||
|
|
||||||
|
assertTrue(metadataTracker.isTransactionToBeIndexed(incomingTransaction));
|
||||||
|
}
|
||||||
|
|
||||||
private Node getNode()
|
private Node getNode()
|
||||||
{
|
{
|
||||||
Node node = new Node();
|
Node node = new Node();
|
||||||
@@ -195,16 +281,4 @@ public class MetadataTrackerTest
|
|||||||
node.setTxnId(TX_ID);
|
node.setTxnId(TX_ID);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
@Ignore("Superseded by AlfrescoSolrTrackerTest")
|
|
||||||
public void testGetFullNodesForDbTransaction() throws AuthenticationException, IOException, JSONException
|
|
||||||
{
|
|
||||||
List<Node> nodes = getNodes();
|
|
||||||
when(repositoryClient.getNodes(any(GetNodesParameters.class), anyInt())).thenReturn(nodes);
|
|
||||||
|
|
||||||
List<Node> nodes4Tx = this.metadataTracker.getFullNodesForDbTransaction(TX_ID);
|
|
||||||
|
|
||||||
assertSame(nodes4Tx, nodes);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@@ -36,15 +36,18 @@ import java.net.ConnectException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.alfresco.httpclient.Response;
|
import org.alfresco.httpclient.Response;
|
||||||
import org.alfresco.repo.dictionary.NamespaceDAO;
|
import org.alfresco.repo.dictionary.NamespaceDAO;
|
||||||
import org.alfresco.repo.index.shard.ShardState;
|
import org.alfresco.repo.index.shard.ShardState;
|
||||||
import org.alfresco.service.namespace.QName;
|
import org.alfresco.service.namespace.QName;
|
||||||
|
import org.alfresco.util.Pair;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
import org.json.JSONException;
|
import org.json.JSONException;
|
||||||
|
|
||||||
@@ -161,6 +164,30 @@ public class SOLRAPIQueueClient extends SOLRAPIClient
|
|||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Long getNextTxCommitTime(String coreName, Long fromCommitTime) throws NoSuchMethodException
|
||||||
|
{
|
||||||
|
throw new NoSuchMethodException();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pair<Long, Long> getTxIntervalCommitTime(String coreName, Long fromNodeId, Long toNodeId)
|
||||||
|
{
|
||||||
|
List<Long> transactionCommitTimestamps = TRANSACTION_QUEUE.stream()
|
||||||
|
.filter(txn -> NODE_MAP.get(txn.getId())
|
||||||
|
.stream()
|
||||||
|
.anyMatch(node -> node.getId() >= fromNodeId && node.getId() <= toNodeId))
|
||||||
|
.map(tx -> tx.getCommitTimeMs())
|
||||||
|
.sorted()
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
if (transactionCommitTimestamps.size() > 0)
|
||||||
|
{
|
||||||
|
return new Pair<>( transactionCommitTimestamps.get(0), transactionCommitTimestamps.get(transactionCommitTimestamps.size() - 1));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return new Pair<>(-1l, -1l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Transactions getTransactions(Long fromCommitTime, Long minTxnId, Long toCommitTime, Long maxTxnId, int maxResults) throws IOException, JSONException
|
public Transactions getTransactions(Long fromCommitTime, Long minTxnId, Long toCommitTime, Long maxTxnId, int maxResults) throws IOException, JSONException
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user