diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java index 718414413..2df15c416 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/tracker/MetadataTracker.java @@ -849,8 +849,11 @@ public class MetadataTracker extends ActivatableTracker infoSrv.txnInIndex(transaction.getId(), true)); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("{}-[CORE {}] Skipping Transaction Id {} as it was already indexed", - Thread.currentThread().getId(), coreName, transaction.getId()); + if (isInIndex) + { + LOGGER.trace("{}-[CORE {}] Skipping Transaction Id {} as it was already indexed", + Thread.currentThread().getId(), coreName, transaction.getId()); + } } return !isInIndex; } @@ -914,11 +917,12 @@ public class MetadataTracker extends ActivatableTracker state.getTimeToStopIndexing()); } - + long idTrackerCycle = System.currentTimeMillis(); if (transactions.getTransactions().size() > 0) { - LOGGER.info("{}-[CORE {}] Found {} transactions after lastTxCommitTime {}, transactions from {} to {}", + LOGGER.info("{}:{}-[CORE {}] Found {} transactions after lastTxCommitTime {}, transactions from {} to {}", Thread.currentThread().getId(), + idTrackerCycle, coreName, transactions.getTransactions().size(), fromCommitTime, @@ -927,8 +931,9 @@ public class MetadataTracker extends ActivatableTracker } else { - LOGGER.info("{}-[CORE {}] No transaction found after lastTxCommitTime {}", + LOGGER.info("{}:{}-[CORE {}] No transaction found after lastTxCommitTime {}", Thread.currentThread().getId(), + idTrackerCycle, coreName, ((txnsFound.size() > 0) ? txnsFound.getLast().getCommitTimeMs() : state.getLastIndexedTxCommitTime())); @@ -943,18 +948,10 @@ public class MetadataTracker extends ActivatableTracker break; } - long transaction_number = transactions.getTransactions().size(); final AtomicInteger counter = new AtomicInteger(); Collection> txBatches = transactions.getTransactions().stream() .peek(txnsFound::add) .filter(this::isTransactionIndexed) - .peek(transaction -> { - if (LOGGER.isTraceEnabled()) - { - LOGGER.trace("{}-[CORE {}] Tracking {} Transactions. Current Transaction Id to be indexed: {}", - Thread.currentThread().getId(), coreName, transaction_number, transaction.getId()); - } - }) .collect(Collectors.groupingBy(transaction -> counter.getAndAdd( (int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize)) .values(); @@ -964,14 +961,15 @@ public class MetadataTracker extends ActivatableTracker { // Index nodes contained in the transactions - int docCount = indexBatchOfTransactions(batch); + long idTxBatch = System.currentTimeMillis(); + int docCount = indexBatchOfTransactions(batch, idTrackerCycle, idTxBatch); totalUpdatedDocs += docCount; // Add the transactions as found to avoid processing them again in the next iteration batch.forEach(txnsFound::add); // Index the transactions - indexTransactionsAfterWorker(batch); + indexTransactionsAfterWorker(batch, idTrackerCycle, idTxBatch); long endElapsed = System.nanoTime(); trackerStats.addElapsedNodeTime(docCount, endElapsed - startElapsed); startElapsed = endElapsed; @@ -1016,9 +1014,11 @@ public class MetadataTracker extends ActivatableTracker /** * Index transactions and update state of the tracker * @param txsIndexed List of transactions to be indexed + * @param idTrackerCycle Id of the Tracker Cycle being executed + * @param idTxBatch Id of the Transaction Batch being executed * @throws IOException */ - private void indexTransactionsAfterWorker(List txsIndexed) + private void indexTransactionsAfterWorker(List txsIndexed, long idTrackerCycle, long idTxBatch) throws IOException { for (Transaction tx : txsIndexed) @@ -1045,13 +1045,15 @@ public class MetadataTracker extends ActivatableTracker * the metadata of the nodes in smaller invocations to Repository * * @param txBatch Batch of transactions to be indexed + * @param idTrackerCycle Id of the Tracker Cycle being executed + * @param idTxBatch Id of the Transaction Batch being executed * @return Number of nodes indexed and last node indexed * * @throws AuthenticationException * @throws IOException * @throws JSONException */ - private int indexBatchOfTransactions(List txBatch) + private int indexBatchOfTransactions(List txBatch, long idTrackerCycle, long idTxBatch) throws AuthenticationException, IOException, JSONException, ExecutionException, InterruptedException { @@ -1084,16 +1086,27 @@ public class MetadataTracker extends ActivatableTracker if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{}-[CORE {}] Found {} Nodes to be indexed from Transactions: {}", - Thread.currentThread().getId(), coreName, nodes.size(), txIds); + LOGGER.debug("{}:{}:{}-[CORE {}] Indexing {} Nodes from Transactions: {}", + Thread.currentThread().getId(), idTrackerCycle, idTxBatch, + coreName, nodes.size(), txIds); } // Group the nodes in batches of nodeBatchSize (or less) List> nodeBatches = Lists.partition(nodes, nodeBatchSize); + // Counter used to identify the worker inside the parallel stream processing + AtomicInteger counter = new AtomicInteger(0); + long idThread = Thread.currentThread().getId(); return forkJoinPool.submit(() -> nodeBatches.parallelStream().map(batch -> { - new NodeIndexWorker(batch, infoSrv).run(); + int count = counter.addAndGet(1); + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("{}:{}:{}:{}-[CORE {}] indexing {} nodes ...", + idThread, idTrackerCycle, idTxBatch, count, + coreName, batch.size()); + } + new NodeIndexWorker(batch, infoSrv, idThread, idTrackerCycle, idTxBatch, count).run(); return batch.size(); }).reduce(0, Integer::sum)).get(); } @@ -1106,11 +1119,23 @@ public class MetadataTracker extends ActivatableTracker { InformationServer infoServer; List nodes; + // Unique Id for the worker > thread : trackerCycle : txBatch : worker + long idThread; + long idTrackerCycle; + long idTxBatch; + int idWorker; + + // Link logger messages to parent Class MetadataTracker + protected Logger LOGGER = LoggerFactory.getLogger(MetadataTracker.class); - NodeIndexWorker(List nodes, InformationServer infoServer) + NodeIndexWorker(List nodes, InformationServer infoServer, long idThread, long idTrackerCycle, long idTxBatch, int idWorker) { this.infoServer = infoServer; this.nodes = nodes; + this.idThread = idThread; + this.idTrackerCycle = idTrackerCycle; + this.idTxBatch = idTxBatch; + this.idWorker = idWorker; } @Override @@ -1121,12 +1146,17 @@ public class MetadataTracker extends ActivatableTracker { this.infoServer.indexNodes(filteredNodes, true); } + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("{}:{}:{}:{}-[CORE {}] ...indexed", + idThread, idTrackerCycle, idTxBatch, idWorker, coreName); + } } @Override protected void onFail(Throwable failCausedBy) { - setRollback(true, failCausedBy); + setRollback(true, failCausedBy); } private List filterNodes(List nodes)