Merge pull request #991 from Alfresco/fix/SEARCH-2415_SlowIndexingPerformance

Fix SEARCH-2415: slow indexing performance
Angel Borroy
2020-11-03 10:17:28 +01:00
committed by GitHub
4 changed files with 45 additions and 47 deletions


@@ -27,8 +27,6 @@ package org.alfresco.solr.tracker;
 import org.alfresco.solr.InformationServer;
 import org.alfresco.solr.client.SOLRAPIClient;
-import org.apache.solr.core.CoreDescriptorDecorator;
-import org.apache.solr.core.SolrCore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


@@ -53,7 +53,8 @@ public class ContentTracker extends ActivatableTracker
 {
     protected final static Logger LOGGER = LoggerFactory.getLogger(ContentTracker.class);

-    private static final int DEFAULT_CONTENT_TRACKER_MAX_PARALLELISM = 32;
+    // Keep this value at 1/4 of the other pools, as ContentTracker threads are heavier
+    private static final int DEFAULT_CONTENT_TRACKER_MAX_PARALLELISM = 8;

     private int contentTrackerParallelism;
     private int contentUpdateBatchSize;
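
The comment captures the tuning rationale: content tracker threads do heavier work per task, so their pool now defaults to a quarter of the 32 used by the other trackers. As a rough illustration of how such a default typically feeds a configurable pool size, consider the sketch below; the property key matches the template change at the end of this diff, but the helper itself is hypothetical, not the tracker's actual wiring.

    import java.util.Properties;

    // Hypothetical sketch: resolve the content tracker pool size from core
    // properties, falling back to the heavier-thread-aware default.
    public class PoolSizeSketch
    {
        static final int DEFAULT_CONTENT_TRACKER_MAX_PARALLELISM = 8;

        static int contentTrackerParallelism(Properties coreProperties)
        {
            return Integer.parseInt(coreProperties.getProperty(
                    "alfresco.content.tracker.maxParallelism",
                    String.valueOf(DEFAULT_CONTENT_TRACKER_MAX_PARALLELISM)));
        }

        public static void main(String[] args)
        {
            Properties props = new Properties(); // no override configured
            System.out.println(contentTrackerParallelism(props)); // prints 8
        }
    }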


@@ -948,30 +948,49 @@ public class MetadataTracker extends ActivatableTracker
                 break;
             }

-            final AtomicInteger counter = new AtomicInteger();
+            final AtomicInteger counterTransaction = new AtomicInteger();
             Collection<List<Transaction>> txBatches = transactions.getTransactions().stream()
                     .peek(txnsFound::add)
                     .filter(this::isTransactionIndexed)
-                    .collect(Collectors.groupingBy(transaction -> counter.getAndAdd(
+                    .collect(Collectors.groupingBy(transaction -> counterTransaction.getAndAdd(
                             (int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize))
                     .values();

             // Index batches of transactions and the nodes updated or deleted within the transaction
+            List<List<Node>> nodeBatches = new ArrayList<>();
             for (List<Transaction> batch : txBatches)
             {
                 // Index nodes contained in the transactions
                 long idTxBatch = System.currentTimeMillis();
-                int docCount = indexBatchOfTransactions(batch, idTrackerCycle, idTxBatch);
-                totalUpdatedDocs += docCount;
+                nodeBatches.addAll(buildBatchOfTransactions(batch, idTrackerCycle, idTxBatch));
+            }
+
+            // Counter used to identify the worker inside the parallel stream processing
+            final AtomicInteger counterBatch = new AtomicInteger(0);
+            long idThread = Thread.currentThread().getId();
+            totalUpdatedDocs += forkJoinPool.submit(() ->
+                nodeBatches.parallelStream().map(batch -> {
+                    int count = counterBatch.addAndGet(1);
+                    if (LOGGER.isTraceEnabled())
+                    {
+                        LOGGER.trace("{}:{}:{}-[CORE {}] indexing {} nodes ...",
+                                idThread, idTrackerCycle, count,
+                                coreName, batch.size());
+                    }
+                    new NodeIndexWorker(batch, infoSrv, idThread, idTrackerCycle, count).run();
+                    return batch.size();
+                }).reduce(0, Integer::sum)).get();
+
+            for (List<Transaction> batch : txBatches)
+            {
                 // Add the transactions as found to avoid processing them again in the next iteration
                 batch.forEach(txnsFound::add);

                 // Index the transactions
-                indexTransactionsAfterWorker(batch, idTrackerCycle, idTxBatch);
+                indexTransactionsAfterWorker(batch);

                 long endElapsed = System.nanoTime();
-                trackerStats.addElapsedNodeTime(docCount, endElapsed - startElapsed);
+                trackerStats.addElapsedNodeTime(totalUpdatedDocs, endElapsed - startElapsed);
                 startElapsed = endElapsed;
             }
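
This reshaped loop is the heart of the fix: node batches are built across all transaction batches first, and only then indexed through a single parallel stream submitted to the tracker's dedicated ForkJoinPool, so the pool stays saturated for the whole cycle instead of forking and joining once per transaction batch. Below is a minimal, dependency-free sketch of that flow; the doc counts, the node expansion, and the printouts are invented for illustration, not taken from the PR.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;

    public class TrackerFlowSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Stand-ins for the tracker's configuration (values invented for the demo)
            int transactionDocsBatchSize = 10;
            int nodeBatchSize = 3;
            ForkJoinPool forkJoinPool = new ForkJoinPool(4);

            // Each number plays the role of one transaction's updates + deletes count
            List<Integer> transactions = List.of(4, 4, 4, 7, 2, 9);

            // 1. Group transactions so each group covers ~transactionDocsBatchSize
            //    documents, using a running counter as the groupingBy key
            AtomicInteger counterTransaction = new AtomicInteger();
            Collection<List<Integer>> txBatches = transactions.stream()
                    .collect(Collectors.groupingBy(tx ->
                            counterTransaction.getAndAdd(tx) / transactionDocsBatchSize))
                    .values();

            // 2. Build every node batch up front (the role of buildBatchOfTransactions):
            //    expand each transaction into that many fake "nodes", then slice into
            //    batches of nodeBatchSize (Guava's Lists.partition in the real code)
            List<List<Integer>> nodeBatches = new ArrayList<>();
            for (List<Integer> batch : txBatches)
            {
                List<Integer> nodes = batch.stream()
                        .flatMap(tx -> Collections.nCopies(tx, tx).stream())
                        .collect(Collectors.toList());
                for (int i = 0; i < nodes.size(); i += nodeBatchSize)
                {
                    nodeBatches.add(nodes.subList(i, Math.min(i + nodeBatchSize, nodes.size())));
                }
            }

            // 3. Index all node batches in one parallel stream on the custom pool:
            //    tasks of a parallel stream started inside a ForkJoinPool run on
            //    that pool instead of the common pool
            AtomicInteger counterBatch = new AtomicInteger();
            int totalUpdatedDocs = forkJoinPool.submit(() ->
                    nodeBatches.parallelStream().map(batch -> {
                        System.out.printf("worker %d indexing %d nodes%n",
                                counterBatch.incrementAndGet(), batch.size());
                        return batch.size();
                    }).reduce(0, Integer::sum)).get();

            System.out.println("total docs indexed: " + totalUpdatedDocs); // 30
            forkJoinPool.shutdown();
        }
    }

Submitting the terminal operation from inside forkJoinPool.submit(...) is the standard idiom for steering a parallel stream onto a custom pool; it relies on the stream framework reusing the pool of the calling ForkJoinTask, an implementation detail of the JDK rather than a documented guarantee.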
@@ -1014,11 +1033,9 @@ public class MetadataTracker extends ActivatableTracker
     /**
      * Index transactions and update state of the tracker
      * @param txsIndexed List of transactions to be indexed
-     * @param idTrackerCycle Id of the Tracker Cycle being executed
-     * @param idTxBatch Id of the Transaction Batch being executed
      * @throws IOException
      */
-    private void indexTransactionsAfterWorker(List<Transaction> txsIndexed, long idTrackerCycle, long idTxBatch)
+    private void indexTransactionsAfterWorker(List<Transaction> txsIndexed)
             throws IOException
     {
         for (Transaction tx : txsIndexed)
@@ -1039,7 +1056,7 @@ public class MetadataTracker extends ActivatableTracker
     /**
-     * Index a batch of transactions.
+     * Build a batch of transactions.
      *
      * Updated or deleted nodes from these transactions are also packed into batches in order to get
      * the metadata of the nodes in smaller invocations to the Repository
@@ -1047,13 +1064,10 @@ public class MetadataTracker extends ActivatableTracker
      * @param txBatch Batch of transactions to be indexed
      * @param idTrackerCycle Id of the Tracker Cycle being executed
      * @param idTxBatch Id of the Transaction Batch being executed
-     * @return Number of nodes indexed and last node indexed
+     * @return List of Nodes to be indexed, split into batches of nodeBatchSize
      *
-     * @throws AuthenticationException
-     * @throws IOException
-     * @throws JSONException
      */
-    private int indexBatchOfTransactions(List<Transaction> txBatch, long idTrackerCycle, long idTxBatch)
+    private List<List<Node>> buildBatchOfTransactions(List<Transaction> txBatch, long idTrackerCycle, long idTxBatch)
             throws AuthenticationException, IOException, JSONException, ExecutionException, InterruptedException
     {
@@ -1070,7 +1084,7 @@ public class MetadataTracker extends ActivatableTracker
         // Skip getting nodes when no transactions left
         if (txIds.size() == 0)
         {
-            return 0;
+            return Collections.emptyList();
         }

         // Get Nodes Id properties for every transaction
@@ -1092,23 +1106,8 @@ public class MetadataTracker extends ActivatableTracker
         }

         // Group the nodes in batches of nodeBatchSize (or less)
-        List<List<Node>> nodeBatches = Lists.partition(nodes, nodeBatchSize);
-
-        // Counter used to identify the worker inside the parallel stream processing
-        AtomicInteger counter = new AtomicInteger(0);
-        long idThread = Thread.currentThread().getId();
-        return forkJoinPool.submit(() ->
-            nodeBatches.parallelStream().map(batch -> {
-                int count = counter.addAndGet(1);
-                if (LOGGER.isTraceEnabled())
-                {
-                    LOGGER.trace("{}:{}:{}:{}-[CORE {}] indexing {} nodes ...",
-                            idThread, idTrackerCycle, idTxBatch, count,
-                            coreName, batch.size());
-                }
-                new NodeIndexWorker(batch, infoSrv, idThread, idTrackerCycle, idTxBatch, count).run();
-                return batch.size();
-            }).reduce(0, Integer::sum)).get();
+        return Lists.partition(nodes, nodeBatchSize);
     }
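
Guava's Lists.partition, which now carries the whole batching step, returns consecutive sublists of the requested size, with a shorter final sublist when the list does not divide evenly; the sublists are views of the backing list, not copies. A quick standalone example:

    import com.google.common.collect.Lists;

    import java.util.List;

    public class PartitionExample
    {
        public static void main(String[] args)
        {
            List<Integer> nodes = List.of(1, 2, 3, 4, 5);
            // Consecutive sublists of size 2; the last one holds the remainder
            List<List<Integer>> batches = Lists.partition(nodes, 2);
            System.out.println(batches); // [[1, 2], [3, 4], [5]]
        }
    }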
@@ -1119,22 +1118,20 @@ public class MetadataTracker extends ActivatableTracker
    {
        InformationServer infoServer;
        List<Node> nodes;
-        // Unique Id for the worker > thread : trackerCycle : txBatch : worker
+        // Unique Id for the worker > thread : trackerCycle : worker
        long idThread;
        long idTrackerCycle;
-        long idTxBatch;
        int idWorker;

        // Link logger messages to parent Class MetadataTracker
        protected Logger LOGGER = LoggerFactory.getLogger(MetadataTracker.class);

-        NodeIndexWorker(List<Node> nodes, InformationServer infoServer, long idThread, long idTrackerCycle, long idTxBatch, int idWorker)
+        NodeIndexWorker(List<Node> nodes, InformationServer infoServer, long idThread, long idTrackerCycle, int idWorker)
        {
            this.infoServer = infoServer;
            this.nodes = nodes;
            this.idThread = idThread;
            this.idTrackerCycle = idTrackerCycle;
-            this.idTxBatch = idTxBatch;
            this.idWorker = idWorker;
        }
@@ -1148,8 +1145,8 @@ public class MetadataTracker extends ActivatableTracker
            }
            if (LOGGER.isTraceEnabled())
            {
-                LOGGER.trace("{}:{}:{}:{}-[CORE {}] ...indexed",
-                        idThread, idTrackerCycle, idTxBatch, idWorker, coreName);
+                LOGGER.trace("{}:{}:{}-[CORE {}] ...indexed",
+                        idThread, idTrackerCycle, idWorker, coreName);
            }
        }


@@ -139,10 +139,12 @@ alfresco.contentUpdateBatchSize=1000
 alfresco.cascadeNodeBatchSize=10

 # Trackers thread pools
-#alfresco.metadataTrackerMaxParallelism=
-#alfresco.aclTrackerMaxParallelism=
-#alfresco.contentTrackerMaxParallelism=
-#alfresco.cascadeTrackerMaxParallelism
+# Keep Content Tracker max threads at 1/4 of the other values,
+# as these threads are heavier than the others.
+#alfresco.metadata.tracker.maxParallelism=32
+#alfresco.acl.tracker.maxParallelism=32
+#alfresco.content.tracker.maxParallelism=8
+#alfresco.cascade.tracker.maxParallelism=32

 # Warming
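
The renamed keys follow a dotted alfresco.<area>.tracker.maxParallelism pattern, and the commented-out values now document the shipped defaults. To tune a core, the relevant line can be uncommented in its solrcore.properties; for example, to halve the metadata and content pools on a constrained host (values here are illustrative only):

    alfresco.metadata.tracker.maxParallelism=16
    alfresco.content.tracker.maxParallelism=4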