diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ActivatableTracker.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ActivatableTracker.java index cd6e3d3d8..dad53b3fc 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ActivatableTracker.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ActivatableTracker.java @@ -27,8 +27,6 @@ package org.alfresco.solr.tracker; import org.alfresco.solr.InformationServer; import org.alfresco.solr.client.SOLRAPIClient; -import org.apache.solr.core.CoreDescriptorDecorator; -import org.apache.solr.core.SolrCore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ContentTracker.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ContentTracker.java index 8f77105e7..d6f2de2df 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ContentTracker.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/ContentTracker.java @@ -53,7 +53,8 @@ public class ContentTracker extends ActivatableTracker { protected final static Logger LOGGER = LoggerFactory.getLogger(ContentTracker.class); - private static final int DEFAULT_CONTENT_TRACKER_MAX_PARALLELISM = 32; + // Keep this value to 1/4 of all the other pools, as ContentTracker Threads are heavier + private static final int DEFAULT_CONTENT_TRACKER_MAX_PARALLELISM = 8; private int contentTrackerParallelism; private int contentUpdateBatchSize; diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java index 2df15c416..e65eb7f41 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java @@ -948,33 +948,52 @@ public class MetadataTracker extends ActivatableTracker break; } - final AtomicInteger counter = new AtomicInteger(); + final AtomicInteger counterTransaction = new AtomicInteger(); Collection> txBatches = transactions.getTransactions().stream() .peek(txnsFound::add) .filter(this::isTransactionIndexed) - .collect(Collectors.groupingBy(transaction -> counter.getAndAdd( + .collect(Collectors.groupingBy(transaction -> counterTransaction.getAndAdd( (int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize)) .values(); // Index batches of transactions and the nodes updated or deleted within the transaction + List> nodeBatches = new ArrayList<>(); for (List batch : txBatches) { // Index nodes contained in the transactions long idTxBatch = System.currentTimeMillis(); - int docCount = indexBatchOfTransactions(batch, idTrackerCycle, idTxBatch); - totalUpdatedDocs += docCount; + nodeBatches.addAll(buildBatchOfTransactions(batch, idTrackerCycle, idTxBatch)); + } + + // Counter used to identify the worker inside the parallel stream processing + final AtomicInteger counterBatch = new AtomicInteger(0); + long idThread = Thread.currentThread().getId(); + totalUpdatedDocs += forkJoinPool.submit(() -> + nodeBatches.parallelStream().map(batch -> { + int count = counterBatch.addAndGet(1); + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("{}:{}:{}-[CORE {}] indexing {} nodes ...", + idThread, idTrackerCycle, count, + coreName, batch.size()); + } + new NodeIndexWorker(batch, infoSrv, idThread, idTrackerCycle, count).run(); + return batch.size(); + }).reduce(0, Integer::sum)).get(); + for (List batch : txBatches) + { // Add the transactions as found to avoid processing them again in the next iteration batch.forEach(txnsFound::add); - + // Index the transactions - indexTransactionsAfterWorker(batch, idTrackerCycle, idTxBatch); + indexTransactionsAfterWorker(batch); long endElapsed = System.nanoTime(); - trackerStats.addElapsedNodeTime(docCount, endElapsed - startElapsed); + trackerStats.addElapsedNodeTime(totalUpdatedDocs, endElapsed - startElapsed); startElapsed = endElapsed; } - + setLastTxCommitTimeAndTxIdInTrackerState(transactions); } catch(Exception e) @@ -1014,11 +1033,9 @@ public class MetadataTracker extends ActivatableTracker /** * Index transactions and update state of the tracker * @param txsIndexed List of transactions to be indexed - * @param idTrackerCycle Id of the Tracker Cycle being executed - * @param idTxBatch Id of the Transaction Batch being executed * @throws IOException */ - private void indexTransactionsAfterWorker(List txsIndexed, long idTrackerCycle, long idTxBatch) + private void indexTransactionsAfterWorker(List txsIndexed) throws IOException { for (Transaction tx : txsIndexed) @@ -1039,7 +1056,7 @@ public class MetadataTracker extends ActivatableTracker /** - * Index a batch of transactions. + * Build a batch of transactions. * * Updated or deleted nodes from these transactions are also packed into batches in order to get * the metadata of the nodes in smaller invocations to Repository @@ -1047,13 +1064,10 @@ public class MetadataTracker extends ActivatableTracker * @param txBatch Batch of transactions to be indexed * @param idTrackerCycle Id of the Tracker Cycle being executed * @param idTxBatch Id of the Transaction Batch being executed - * @return Number of nodes indexed and last node indexed + * @return List of Nodes to be indexed splitted by nodeBatchSize count * - * @throws AuthenticationException - * @throws IOException - * @throws JSONException */ - private int indexBatchOfTransactions(List txBatch, long idTrackerCycle, long idTxBatch) + private List> buildBatchOfTransactions(List txBatch, long idTrackerCycle, long idTxBatch) throws AuthenticationException, IOException, JSONException, ExecutionException, InterruptedException { @@ -1070,7 +1084,7 @@ public class MetadataTracker extends ActivatableTracker // Skip getting nodes when no transactions left if (txIds.size() == 0) { - return 0; + return Collections.emptyList(); } // Get Nodes Id properties for every transaction @@ -1092,23 +1106,8 @@ public class MetadataTracker extends ActivatableTracker } // Group the nodes in batches of nodeBatchSize (or less) - List> nodeBatches = Lists.partition(nodes, nodeBatchSize); + return Lists.partition(nodes, nodeBatchSize); - // Counter used to identify the worker inside the parallel stream processing - AtomicInteger counter = new AtomicInteger(0); - long idThread = Thread.currentThread().getId(); - return forkJoinPool.submit(() -> - nodeBatches.parallelStream().map(batch -> { - int count = counter.addAndGet(1); - if (LOGGER.isTraceEnabled()) - { - LOGGER.trace("{}:{}:{}:{}-[CORE {}] indexing {} nodes ...", - idThread, idTrackerCycle, idTxBatch, count, - coreName, batch.size()); - } - new NodeIndexWorker(batch, infoSrv, idThread, idTrackerCycle, idTxBatch, count).run(); - return batch.size(); - }).reduce(0, Integer::sum)).get(); } @@ -1119,22 +1118,20 @@ public class MetadataTracker extends ActivatableTracker { InformationServer infoServer; List nodes; - // Unique Id for the worker > thread : trackerCycle : txBatch : worker + // Unique Id for the worker > thread : trackerCycle : worker long idThread; long idTrackerCycle; - long idTxBatch; int idWorker; // Link logger messages to parent Class MetadataTracker protected Logger LOGGER = LoggerFactory.getLogger(MetadataTracker.class); - NodeIndexWorker(List nodes, InformationServer infoServer, long idThread, long idTrackerCycle, long idTxBatch, int idWorker) + NodeIndexWorker(List nodes, InformationServer infoServer, long idThread, long idTrackerCycle, int idWorker) { this.infoServer = infoServer; this.nodes = nodes; this.idThread = idThread; this.idTrackerCycle = idTrackerCycle; - this.idTxBatch = idTxBatch; this.idWorker = idWorker; } @@ -1148,8 +1145,8 @@ public class MetadataTracker extends ActivatableTracker } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("{}:{}:{}:{}-[CORE {}] ...indexed", - idThread, idTrackerCycle, idTxBatch, idWorker, coreName); + LOGGER.trace("{}:{}:{}-[CORE {}] ...indexed", + idThread, idTrackerCycle, idWorker, coreName); } } diff --git a/search-services/alfresco-search/src/main/resources/solr/instance/templates/rerank/conf/solrcore.properties b/search-services/alfresco-search/src/main/resources/solr/instance/templates/rerank/conf/solrcore.properties index 36a308911..d9c844da0 100644 --- a/search-services/alfresco-search/src/main/resources/solr/instance/templates/rerank/conf/solrcore.properties +++ b/search-services/alfresco-search/src/main/resources/solr/instance/templates/rerank/conf/solrcore.properties @@ -139,10 +139,12 @@ alfresco.contentUpdateBatchSize=1000 alfresco.cascadeNodeBatchSize=10 # Trackers thread pools -#alfresco.metadataTrackerMaxParallelism= -#alfresco.aclTrackerMaxParallelism= -#alfresco.contentTrackerMaxParallelism= -#alfresco.cascadeTrackerMaxParallelism +# Keep Content Tracker max threads to 1/4 of other values, +# as this threads are heavier than the other ones. +#alfresco.metadata.tracker.maxParallelism=32 +#alfresco.acl.tracker.maxParallelism=32 +#alfresco.content.tracker.maxParallelism=8 +#alfresco.cascade.tracker.maxParallelism=32 # Warming