add reconcile throttling waits
This commit is contained in:
@@ -75,6 +75,12 @@ public class AcsReconcileService implements DisposableBean {
|
||||
@Value("${inteligr8.asie.reconciliation.concurrency}")
|
||||
private int concurrency;
|
||||
|
||||
@Value("${inteligr8.asie.reconciliation.waitAfterSolrNodeActionMillis}")
|
||||
private long waitAfterSolrNodeActionMillis;
|
||||
|
||||
@Value("${inteligr8.asie.reconciliation.waitAfterSolrNodeReconcileMillis}")
|
||||
private long waitAfterSolrNodeReconcileMillis;
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
ExecutorService executor = this.executorManager.get("solr-reconcile");
|
||||
@@ -238,6 +244,7 @@ public class AcsReconcileService implements DisposableBean {
|
||||
this.logger.trace("Attempting to reconcile ACS node: {}", nodeDbId);
|
||||
|
||||
Callable<Void> callable;
|
||||
boolean callingSolr = false;
|
||||
|
||||
final int dbIdIndex = (int) (nodeDbId - fromDbId);
|
||||
if (nodeRefs[dbIdIndex] != null) {
|
||||
@@ -257,6 +264,8 @@ public class AcsReconcileService implements DisposableBean {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
if (reindexReconciled)
|
||||
callingSolr = true;
|
||||
} else {
|
||||
callable = new Callable<Void>() {
|
||||
@Override
|
||||
@@ -265,6 +274,7 @@ public class AcsReconcileService implements DisposableBean {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
callingSolr = true;
|
||||
}
|
||||
|
||||
if (queueTimeout < 0L) {
|
||||
@@ -272,36 +282,42 @@ public class AcsReconcileService implements DisposableBean {
|
||||
} else {
|
||||
future.combine(executor.submit(callable, queueTimeout, queueUnit));
|
||||
}
|
||||
|
||||
if (callingSolr && this.waitAfterSolrNodeReconcileMillis > 0L) {
|
||||
this.logger.trace("Waiting between each node reconcile");
|
||||
Thread.sleep(this.waitAfterSolrNodeReconcileMillis);
|
||||
}
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
public void reconcile(long nodeDbId,
|
||||
public boolean reconcile(long nodeDbId,
|
||||
boolean index,
|
||||
ReconcileCallback callback) throws InterruptedException, TimeoutException {
|
||||
NodeRef nodeRef = this.nodeService.getNodeRef(nodeDbId);
|
||||
if (nodeRef == null) {
|
||||
this.logger.trace("No such ACS node: {}; skipping ...", nodeDbId);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!StoreRef.STORE_REF_WORKSPACE_SPACESSTORE.equals(nodeRef.getStoreRef())) {
|
||||
this.logger.trace("A deliberately ignored store in the DB is not indexed in Solr: {}: {}", nodeDbId, nodeRef);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
Set<QName> aspects = this.nodeService.getAspects(nodeRef);
|
||||
aspects.retainAll(this.ignoreNodesWithAspects);
|
||||
if (!aspects.isEmpty()) {
|
||||
this.logger.trace("A deliberately ignored node in the DB is not indexed in Solr: {}: {}: {}", nodeDbId, nodeRef, aspects);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!index) {
|
||||
this.logger.debug("A node in the DB is not indexed in Solr: {}: {}", nodeDbId, nodeRef);
|
||||
this.reconcileLogger.info("UNRECONCILED: {} <=> {}", nodeDbId, nodeRef);
|
||||
callback.unreconciled(nodeDbId);
|
||||
return false;
|
||||
} else {
|
||||
this.logger.debug("A node in the DB is not indexed in Solr; attempt to index: {}: {}", nodeDbId, nodeRef);
|
||||
this.index(nodeDbId, nodeRef, callback);
|
||||
@@ -309,6 +325,7 @@ public class AcsReconcileService implements DisposableBean {
|
||||
// its results will be logged
|
||||
// the reconcile thread will continue independently
|
||||
// the callback will lag
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -343,7 +360,13 @@ public class AcsReconcileService implements DisposableBean {
|
||||
}
|
||||
};
|
||||
|
||||
return this.indexService.index(nodeDbId, indexCallback);
|
||||
Future<Void> future = this.indexService.index(nodeDbId, indexCallback);
|
||||
|
||||
if (this.waitAfterSolrNodeActionMillis > 0L) {
|
||||
Thread.sleep(this.waitAfterSolrNodeActionMillis);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
public Future<Void> reindex(long nodeDbId, NodeRef nodeRef, ReconcileCallback callback) throws InterruptedException {
|
||||
@@ -377,7 +400,13 @@ public class AcsReconcileService implements DisposableBean {
|
||||
}
|
||||
};
|
||||
|
||||
return this.reindexService.reindex(nodeDbId, reindexCallback);
|
||||
Future<Void> future = this.reindexService.reindex(nodeDbId, reindexCallback);
|
||||
|
||||
if (this.waitAfterSolrNodeActionMillis > 0L) {
|
||||
Thread.sleep(this.waitAfterSolrNodeActionMillis);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
private String formatForFts(QName qname) {
|
||||
|
||||
@@ -19,6 +19,8 @@ inteligr8.asie.reconciliation.nodesChunkSize=250
|
||||
inteligr8.asie.reconciliation.nodeTimeoutSeconds=10
|
||||
inteligr8.asie.reconciliation.concurrentQueueSize=32
|
||||
inteligr8.asie.reconciliation.concurrency=2
|
||||
inteligr8.asie.reconciliation.waitAfterSolrNodeActionMillis=0
|
||||
inteligr8.asie.reconciliation.waitAfterSolrNodeReconcileMillis=0
|
||||
|
||||
# Action (like indexing and re-indexing) configuration
|
||||
inteligr8.asie.default.concurrentQueueSize=32
|
||||
|
||||
Reference in New Issue
Block a user