Merge pull request #921 from Alfresco/fix/SEARCH-2375_MetadataTrackerLogs

SEARCH-2375: Added TRACE messages to report indexing progress from wo…
This commit is contained in:
Angel Borroy
2020-09-03 10:58:24 +02:00
committed by GitHub

View File

@@ -849,8 +849,11 @@ public class MetadataTracker extends ActivatableTracker
infoSrv.txnInIndex(transaction.getId(), true)); infoSrv.txnInIndex(transaction.getId(), true));
if (LOGGER.isTraceEnabled()) if (LOGGER.isTraceEnabled())
{ {
LOGGER.trace("{}-[CORE {}] Skipping Transaction Id {} as it was already indexed", if (isInIndex)
Thread.currentThread().getId(), coreName, transaction.getId()); {
LOGGER.trace("{}-[CORE {}] Skipping Transaction Id {} as it was already indexed",
Thread.currentThread().getId(), coreName, transaction.getId());
}
} }
return !isInIndex; return !isInIndex;
} }
@@ -914,11 +917,12 @@ public class MetadataTracker extends ActivatableTracker
state.getTimeToStopIndexing()); state.getTimeToStopIndexing());
} }
long idTrackerCycle = System.currentTimeMillis();
if (transactions.getTransactions().size() > 0) if (transactions.getTransactions().size() > 0)
{ {
LOGGER.info("{}-[CORE {}] Found {} transactions after lastTxCommitTime {}, transactions from {} to {}", LOGGER.info("{}:{}-[CORE {}] Found {} transactions after lastTxCommitTime {}, transactions from {} to {}",
Thread.currentThread().getId(), Thread.currentThread().getId(),
idTrackerCycle,
coreName, coreName,
transactions.getTransactions().size(), transactions.getTransactions().size(),
fromCommitTime, fromCommitTime,
@@ -927,8 +931,9 @@ public class MetadataTracker extends ActivatableTracker
} }
else else
{ {
LOGGER.info("{}-[CORE {}] No transaction found after lastTxCommitTime {}", LOGGER.info("{}:{}-[CORE {}] No transaction found after lastTxCommitTime {}",
Thread.currentThread().getId(), Thread.currentThread().getId(),
idTrackerCycle,
coreName, coreName,
((txnsFound.size() > 0) ? txnsFound.getLast().getCommitTimeMs() ((txnsFound.size() > 0) ? txnsFound.getLast().getCommitTimeMs()
: state.getLastIndexedTxCommitTime())); : state.getLastIndexedTxCommitTime()));
@@ -943,18 +948,10 @@ public class MetadataTracker extends ActivatableTracker
break; break;
} }
long transaction_number = transactions.getTransactions().size();
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
Collection<List<Transaction>> txBatches = transactions.getTransactions().stream() Collection<List<Transaction>> txBatches = transactions.getTransactions().stream()
.peek(txnsFound::add) .peek(txnsFound::add)
.filter(this::isTransactionIndexed) .filter(this::isTransactionIndexed)
.peek(transaction -> {
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("{}-[CORE {}] Tracking {} Transactions. Current Transaction Id to be indexed: {}",
Thread.currentThread().getId(), coreName, transaction_number, transaction.getId());
}
})
.collect(Collectors.groupingBy(transaction -> counter.getAndAdd( .collect(Collectors.groupingBy(transaction -> counter.getAndAdd(
(int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize)) (int) (transaction.getDeletes() + transaction.getUpdates())) / transactionDocsBatchSize))
.values(); .values();
@@ -964,14 +961,15 @@ public class MetadataTracker extends ActivatableTracker
{ {
// Index nodes contained in the transactions // Index nodes contained in the transactions
int docCount = indexBatchOfTransactions(batch); long idTxBatch = System.currentTimeMillis();
int docCount = indexBatchOfTransactions(batch, idTrackerCycle, idTxBatch);
totalUpdatedDocs += docCount; totalUpdatedDocs += docCount;
// Add the transactions as found to avoid processing them again in the next iteration // Add the transactions as found to avoid processing them again in the next iteration
batch.forEach(txnsFound::add); batch.forEach(txnsFound::add);
// Index the transactions // Index the transactions
indexTransactionsAfterWorker(batch); indexTransactionsAfterWorker(batch, idTrackerCycle, idTxBatch);
long endElapsed = System.nanoTime(); long endElapsed = System.nanoTime();
trackerStats.addElapsedNodeTime(docCount, endElapsed - startElapsed); trackerStats.addElapsedNodeTime(docCount, endElapsed - startElapsed);
startElapsed = endElapsed; startElapsed = endElapsed;
@@ -1016,9 +1014,11 @@ public class MetadataTracker extends ActivatableTracker
/** /**
* Index transactions and update state of the tracker * Index transactions and update state of the tracker
* @param txsIndexed List of transactions to be indexed * @param txsIndexed List of transactions to be indexed
* @param idTrackerCycle Id of the Tracker Cycle being executed
* @param idTxBatch Id of the Transaction Batch being executed
* @throws IOException * @throws IOException
*/ */
private void indexTransactionsAfterWorker(List<Transaction> txsIndexed) private void indexTransactionsAfterWorker(List<Transaction> txsIndexed, long idTrackerCycle, long idTxBatch)
throws IOException throws IOException
{ {
for (Transaction tx : txsIndexed) for (Transaction tx : txsIndexed)
@@ -1045,13 +1045,15 @@ public class MetadataTracker extends ActivatableTracker
* the metadata of the nodes in smaller invocations to Repository * the metadata of the nodes in smaller invocations to Repository
* *
* @param txBatch Batch of transactions to be indexed * @param txBatch Batch of transactions to be indexed
* @param idTrackerCycle Id of the Tracker Cycle being executed
* @param idTxBatch Id of the Transaction Batch being executed
* @return Number of nodes indexed and last node indexed * @return Number of nodes indexed and last node indexed
* *
* @throws AuthenticationException * @throws AuthenticationException
* @throws IOException * @throws IOException
* @throws JSONException * @throws JSONException
*/ */
private int indexBatchOfTransactions(List<Transaction> txBatch) private int indexBatchOfTransactions(List<Transaction> txBatch, long idTrackerCycle, long idTxBatch)
throws AuthenticationException, IOException, JSONException, ExecutionException, InterruptedException throws AuthenticationException, IOException, JSONException, ExecutionException, InterruptedException
{ {
@@ -1084,16 +1086,27 @@ public class MetadataTracker extends ActivatableTracker
if (LOGGER.isDebugEnabled()) if (LOGGER.isDebugEnabled())
{ {
LOGGER.debug("{}-[CORE {}] Found {} Nodes to be indexed from Transactions: {}", LOGGER.debug("{}:{}:{}-[CORE {}] Indexing {} Nodes from Transactions: {}",
Thread.currentThread().getId(), coreName, nodes.size(), txIds); Thread.currentThread().getId(), idTrackerCycle, idTxBatch,
coreName, nodes.size(), txIds);
} }
// Group the nodes in batches of nodeBatchSize (or less) // Group the nodes in batches of nodeBatchSize (or less)
List<List<Node>> nodeBatches = Lists.partition(nodes, nodeBatchSize); List<List<Node>> nodeBatches = Lists.partition(nodes, nodeBatchSize);
// Counter used to identify the worker inside the parallel stream processing
AtomicInteger counter = new AtomicInteger(0);
long idThread = Thread.currentThread().getId();
return forkJoinPool.submit(() -> return forkJoinPool.submit(() ->
nodeBatches.parallelStream().map(batch -> { nodeBatches.parallelStream().map(batch -> {
new NodeIndexWorker(batch, infoSrv).run(); int count = counter.addAndGet(1);
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("{}:{}:{}:{}-[CORE {}] indexing {} nodes ...",
idThread, idTrackerCycle, idTxBatch, count,
coreName, batch.size());
}
new NodeIndexWorker(batch, infoSrv, idThread, idTrackerCycle, idTxBatch, count).run();
return batch.size(); return batch.size();
}).reduce(0, Integer::sum)).get(); }).reduce(0, Integer::sum)).get();
} }
@@ -1106,11 +1119,23 @@ public class MetadataTracker extends ActivatableTracker
{ {
InformationServer infoServer; InformationServer infoServer;
List<Node> nodes; List<Node> nodes;
// Unique Id for the worker > thread : trackerCycle : txBatch : worker
long idThread;
long idTrackerCycle;
long idTxBatch;
int idWorker;
// Link logger messages to parent Class MetadataTracker
protected Logger LOGGER = LoggerFactory.getLogger(MetadataTracker.class);
NodeIndexWorker(List<Node> nodes, InformationServer infoServer) NodeIndexWorker(List<Node> nodes, InformationServer infoServer, long idThread, long idTrackerCycle, long idTxBatch, int idWorker)
{ {
this.infoServer = infoServer; this.infoServer = infoServer;
this.nodes = nodes; this.nodes = nodes;
this.idThread = idThread;
this.idTrackerCycle = idTrackerCycle;
this.idTxBatch = idTxBatch;
this.idWorker = idWorker;
} }
@Override @Override
@@ -1121,12 +1146,17 @@ public class MetadataTracker extends ActivatableTracker
{ {
this.infoServer.indexNodes(filteredNodes, true); this.infoServer.indexNodes(filteredNodes, true);
} }
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("{}:{}:{}:{}-[CORE {}] ...indexed",
idThread, idTrackerCycle, idTxBatch, idWorker, coreName);
}
} }
@Override @Override
protected void onFail(Throwable failCausedBy) protected void onFail(Throwable failCausedBy)
{ {
setRollback(true, failCausedBy); setRollback(true, failCausedBy);
} }
private List<Node> filterNodes(List<Node> nodes) private List<Node> filterNodes(List<Node> nodes)