added reconcile/reindex/retry/fix/purge services

2025-03-05 13:28:41 -05:00
parent 0ed41a39e4
commit 493f1f813d
37 changed files with 2662 additions and 5 deletions

View File

@@ -83,7 +83,7 @@
 <dependency>
     <groupId>com.inteligr8.alfresco</groupId>
     <artifactId>cxf-jaxrs-platform-module</artifactId>
-    <version>1.3.1-acs-v23.3</version>
+    <version>1.3.2-acs-v23.3</version>
     <type>amp</type>
 </dependency>

View File

@@ -91,7 +91,7 @@ public abstract class AbstractUnregisterNodeWebScript<T extends NodeParameterSet
 if (status == null) {
     this.logger.warn("Registered host/core status could not be retrieved: {}:{}/solr/{}", nodeHostname, nodePort, core);
 } else {
-    CoreMetadata coreMetadata = status.getStatus().getCores().get(core);
+    CoreMetadata coreMetadata = status.getCores().getByCore(core);
     if (coreMetadata == null || coreMetadata.getName() == null) {
         this.logger.warn("Registered core does not actually exist on the node host; could be a DNS issue: {}:{}/solr/{}", nodeHostname, nodePort, core);
     } else {

View File

@@ -36,15 +36,20 @@
 <dependency>
     <groupId>com.inteligr8.alfresco</groupId>
     <artifactId>asie-api</artifactId>
-    <version>1.0-SNAPSHOT-asie2</version>
+    <version>1.1-SNAPSHOT-asie2</version>
 </dependency>
 <dependency>
     <groupId>com.inteligr8</groupId>
     <artifactId>common-rest-client</artifactId>
-    <version>3.0.1-cxf</version>
+    <version>3.0.3-cxf</version>
 </dependency>
 <!-- Needed by this module, but provided by ACS -->
+<dependency>
+    <groupId>org.alfresco</groupId>
+    <artifactId>alfresco-data-model</artifactId>
+    <scope>provided</scope>
+</dependency>
 <dependency>
     <groupId>org.alfresco</groupId>
     <artifactId>alfresco-repository</artifactId>

View File

@@ -0,0 +1,119 @@
package com.inteligr8.alfresco.asie.rest;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.alfresco.model.ContentModel;
import org.alfresco.service.cmr.repository.InvalidNodeRefException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.extensions.webscripts.WebScriptException;
import org.springframework.extensions.webscripts.WebScriptRequest;
import org.springframework.extensions.webscripts.WebScriptResponse;
import org.springframework.http.HttpStatus;
import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
public abstract class AbstractAcsNodeActionWebScript extends AbstractAsieWebScript {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private NodeService nodeService;
@Override
public void executeAuthorized(WebScriptRequest request, WebScriptResponse response) throws IOException {
String nodeId = request.getServiceMatch().getTemplateVars().get("nodeId");
NodeRef nodeRef = new NodeRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, nodeId);
long nodeDbId = this.findNodeDbId(nodeRef);
this.logger.trace("Found node database ID: {}: {}", nodeId, nodeDbId);
try {
Map<String, Object> responseMap = new HashMap<>();
responseMap.put("nodeDbId", nodeDbId);
ActionCallback callback = new ActionCallback() {
@Override
public void success(ShardInstance instance) {
@SuppressWarnings("unchecked")
List<String> instances = (List<String>) responseMap.get("success");
if (instances == null)
responseMap.put("success", instances = new LinkedList<>());
instances.add(instance.getSpec());
}
@Override
public void scheduled(ShardInstance instance) {
@SuppressWarnings("unchecked")
List<String> instances = (List<String>) responseMap.get("scheduled");
if (instances == null)
responseMap.put("scheduled", instances = new LinkedList<>());
instances.add(instance.getSpec());
}
@Override
public void error(ShardInstance instance, String message) {
@SuppressWarnings("unchecked")
Map<String, Object> instances = (Map<String, Object>) responseMap.get("error");
if (instances == null)
responseMap.put("error", instances = new HashMap<>());
instances.put(instance.getSpec(), Collections.singletonMap("message", message));
}
@Override
public void unknownResult(ShardInstance instance) {
@SuppressWarnings("unchecked")
List<String> instances = (List<String>) responseMap.get("unknown");
if (instances == null)
responseMap.put("unknown", instances = new LinkedList<>());
instances.add(instance.getSpec());
}
};
this.executeAction(nodeDbId, callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS);
if (responseMap.containsKey("error")) {
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
} else if (responseMap.containsKey("scheduled")) {
response.setStatus(HttpStatus.ACCEPTED.value());
} else {
response.setStatus(HttpStatus.OK.value());
}
response.setContentType("application/json");
this.getObjectMapper().writeValue(response.getWriter(), responseMap);
} catch (UnsupportedOperationException uoe) {
throw new WebScriptException(HttpStatus.NOT_IMPLEMENTED.value(), uoe.getMessage(), uoe);
} catch (InterruptedException ie) {
throw new WebScriptException(HttpStatus.SERVICE_UNAVAILABLE.value(), "The execution was interrupted", ie);
} catch (TimeoutException te) {
throw new WebScriptException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "The execution may continue, but timed-out waiting", te);
}
}
protected abstract void executeAction(
long nodeDbId, ActionCallback callback,
long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException;
private long findNodeDbId(NodeRef nodeRef) {
try {
return (Long) this.nodeService.getProperty(nodeRef, ContentModel.PROP_NODE_DBID);
} catch (InvalidNodeRefException inre) {
throw new WebScriptException(HttpStatus.NOT_FOUND.value(), "The node does not exist");
}
}
}

View File

@@ -0,0 +1,102 @@
package com.inteligr8.alfresco.asie.rest;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.alfresco.model.ContentModel;
import org.alfresco.service.cmr.repository.InvalidNodeRefException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.extensions.webscripts.WebScriptException;
import org.springframework.extensions.webscripts.WebScriptRequest;
import org.springframework.extensions.webscripts.WebScriptResponse;
import org.springframework.http.HttpStatus;
import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
public abstract class AbstractActionWebScript extends AbstractAsieWebScript {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void executeAuthorized(WebScriptRequest request, WebScriptResponse response) throws IOException {
try {
Map<String, Object> responseMap = new HashMap<>();
ActionCallback callback = new ActionCallback() {
@Override
public void success(ShardInstance instance) {
@SuppressWarnings("unchecked")
List<String> instances = (List<String>) responseMap.get("success");
if (instances == null)
responseMap.put("success", instances = new LinkedList<>());
instances.add(instance.getSpec());
}
@Override
public void scheduled(ShardInstance instance) {
@SuppressWarnings("unchecked")
List<String> instances = (List<String>) responseMap.get("scheduled");
if (instances == null)
responseMap.put("scheduled", instances = new LinkedList<>());
instances.add(instance.getSpec());
}
@Override
public void error(ShardInstance instance, String message) {
@SuppressWarnings("unchecked")
Map<String, Object> instances = (Map<String, Object>) responseMap.get("error");
if (instances == null)
responseMap.put("error", instances = new HashMap<>());
instances.put(instance.getSpec(), Collections.singletonMap("message", message));
}
@Override
public void unknownResult(ShardInstance instance) {
@SuppressWarnings("unchecked")
List<String> instances = (List<String>) responseMap.get("unknown");
if (instances == null)
responseMap.put("unknown", instances = new LinkedList<>());
instances.add(instance.getSpec());
}
};
this.executeAction(callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS);
if (responseMap.containsKey("error")) {
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
} else if (responseMap.containsKey("scheduled")) {
response.setStatus(HttpStatus.ACCEPTED.value());
} else {
response.setStatus(HttpStatus.OK.value());
}
response.setContentType("application/json");
this.getObjectMapper().writeValue(response.getWriter(), responseMap);
} catch (UnsupportedOperationException uoe) {
throw new WebScriptException(HttpStatus.NOT_IMPLEMENTED.value(), uoe.getMessage(), uoe);
} catch (InterruptedException ie) {
throw new WebScriptException(HttpStatus.SERVICE_UNAVAILABLE.value(), "The execution was interrupted", ie);
} catch (TimeoutException te) {
throw new WebScriptException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "The execution may continue, but timed-out waiting", te);
}
}
protected abstract void executeAction(
ActionCallback callback,
long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException;
}

View File

@@ -0,0 +1,24 @@
package com.inteligr8.alfresco.asie.rest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.service.FixService;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
@Component(value = "webscript.com.inteligr8.alfresco.asie.fix.post")
public class FixWebScript extends AbstractActionWebScript {
@Autowired
private FixService fixService;
@Override
protected void executeAction(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
this.fixService.fix(callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,24 @@
package com.inteligr8.alfresco.asie.rest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.service.PurgeService;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
@Component(value = "webscript.com.inteligr8.alfresco.asie.purgeAcsNode.put")
public class PurgeAcsNodeWebScript extends AbstractAcsNodeActionWebScript {
@Autowired
private PurgeService purgeService;
@Override
protected void executeAction(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
this.purgeService.purge(nodeDbId, callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,128 @@
package com.inteligr8.alfresco.asie.rest;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.extensions.webscripts.WebScriptException;
import org.springframework.extensions.webscripts.WebScriptRequest;
import org.springframework.extensions.webscripts.WebScriptResponse;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.service.AcsReconcileService;
import com.inteligr8.alfresco.asie.spi.ReconcileCallback;
@Component(value = "webscript.com.inteligr8.alfresco.asie.reconcileAcsNodes.post")
public class ReconcileAcsNodesWebScript extends AbstractAsieWebScript {
@Autowired
private AcsReconcileService reconcileService;
@Override
public void executeAuthorized(WebScriptRequest request, WebScriptResponse response) throws IOException {
final int fromDbId = this.getRequestTemplateIntegerVariable(request, "fromDbId");
final int toDbId = this.getRequestTemplateIntegerVariable(request, "toDbId");
final boolean reindex = Boolean.TRUE.equals(this.getOptionalQueryParameter(request, "reindex", Boolean.class));
final boolean includeReconciled = Boolean.TRUE.equals(this.getOptionalQueryParameter(request, "includeReconciled", Boolean.class));
final Map<String, Object> responseMap = new HashMap<>();
ReconcileCallback callback = new ReconcileCallback() {
@Override
public void reconciled(long nodeDbId) {
if (includeReconciled) {
@SuppressWarnings("unchecked")
List<Long> unreconciledNodeDbIds = (List<Long>) responseMap.get("reconciled");
if (unreconciledNodeDbIds == null)
responseMap.put("reconciled", unreconciledNodeDbIds = new LinkedList<>());
unreconciledNodeDbIds.add(nodeDbId);
}
}
@Override
public void unreconciled(long nodeDbId) {
@SuppressWarnings("unchecked")
List<Long> unreconciledNodeDbIds = (List<Long>) responseMap.get("unreconciled");
if (unreconciledNodeDbIds == null)
responseMap.put("unreconciled", unreconciledNodeDbIds = new LinkedList<>());
unreconciledNodeDbIds.add(nodeDbId);
}
@Override
public void processed(long nodeDbId, Set<ShardInstance> instsReconciled, Set<ShardInstance> instsReconciling,
Map<ShardInstance, String> instsErrorMessages) {
if (!instsReconciled.isEmpty()) {
@SuppressWarnings("unchecked")
Map<Long, List<String>> nodeHosts = (Map<Long, List<String>>) responseMap.get("success");
if (nodeHosts == null)
responseMap.put("success", nodeHosts = new HashMap<>());
List<String> instances = new LinkedList<>();
for (ShardInstance instance : instsReconciled)
instances.add(instance.getSpec());
nodeHosts.put(nodeDbId, instances);
}
if (!instsReconciling.isEmpty()) {
@SuppressWarnings("unchecked")
Map<Long, List<String>> nodeHosts = (Map<Long, List<String>>) responseMap.get("scheduled");
if (nodeHosts == null)
responseMap.put("scheduled", nodeHosts = new HashMap<>());
List<String> instances = new LinkedList<>();
for (ShardInstance instance : instsReconciling)
instances.add(instance.getSpec());
nodeHosts.put(nodeDbId, instances);
}
if (!instsErrorMessages.isEmpty()) {
@SuppressWarnings("unchecked")
Map<Long, Map<String, Map<String, String>>> nodeHosts = (Map<Long, Map<String, Map<String, String>>>) responseMap.get("error");
if (nodeHosts == null)
responseMap.put("error", nodeHosts = new HashMap<>());
Map<String, Map<String, String>> nodeHost = new HashMap<>();
for (Entry<ShardInstance, String> message : instsErrorMessages.entrySet())
nodeHost.put(message.getKey().getSpec(), Collections.singletonMap("message", message.getValue()));
nodeHosts.put(nodeDbId, nodeHost);
}
}
};
try {
this.reconcileService.reconcile(fromDbId, toDbId, null, reindex, callback, 1L, TimeUnit.HOURS, 2L, TimeUnit.MINUTES);
if (responseMap.containsKey("error")) {
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
} else if (responseMap.containsKey("scheduled")) {
response.setStatus(HttpStatus.ACCEPTED.value());
} else {
response.setStatus(HttpStatus.OK.value());
}
response.setContentType("application/json");
this.getObjectMapper().writeValue(response.getWriter(), responseMap);
} catch (InterruptedException ie) {
throw new WebScriptException(HttpStatus.SERVICE_UNAVAILABLE.value(), "The reconciliation was interrupted", ie);
} catch (TimeoutException te) {
throw new WebScriptException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "The reconciliation may continue, but timed-out waiting", te);
}
}
private int getRequestTemplateIntegerVariable(WebScriptRequest request, String templateVariableName) {
String str = request.getServiceMatch().getTemplateVars().get(templateVariableName);
return Integer.valueOf(str);
}
}

View File

@@ -0,0 +1,24 @@
package com.inteligr8.alfresco.asie.rest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.service.ReindexService;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
@Component(value = "webscript.com.inteligr8.alfresco.asie.reindexAcsNode.put")
public class ReindexAcsNodeWebScript extends AbstractAcsNodeActionWebScript {
@Autowired
private ReindexService reindexService;
@Override
protected void executeAction(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
this.reindexService.reindex(nodeDbId, callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,24 @@
package com.inteligr8.alfresco.asie.rest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.service.RetryService;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
@Component(value = "webscript.com.inteligr8.alfresco.asie.retry.post")
public class RetryWebScript extends AbstractActionWebScript {
@Autowired
private RetryService retryService;
@Override
protected void executeAction(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
this.retryService.retry(callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,236 @@
package com.inteligr8.alfresco.asie.service;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.index.shard.Floc;
import org.alfresco.repo.index.shard.Shard;
import org.alfresco.repo.index.shard.ShardInstance;
import org.alfresco.repo.index.shard.ShardRegistry;
import org.alfresco.repo.index.shard.ShardState;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.cmr.search.SearchParameters;
import org.alfresco.service.cmr.search.SearchService;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import com.inteligr8.alfresco.asie.Constants;
import com.inteligr8.alfresco.asie.api.CoreAdminApi;
import com.inteligr8.alfresco.asie.model.ActionCoreResponse;
import com.inteligr8.alfresco.asie.model.ShardSet;
import com.inteligr8.alfresco.asie.model.SolrHost;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
import com.inteligr8.alfresco.asie.util.CompositeFuture;
import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor;
import com.inteligr8.solr.model.Action;
import com.inteligr8.solr.model.ActionResponse;
import com.inteligr8.solr.model.BaseResponse;
public abstract class AbstractActionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private NamespaceService namespaceService;
@Autowired
private ApiService apiService;
@Autowired
private ExecutorManager executorManager;
@Autowired(required = false)
@Qualifier(Constants.QUALIFIER_ASIE)
private ShardRegistry shardRegistry;
@Value("${inteligr8.asie.default.concurrentQueueSize:64}")
private int concurrentQueueSize;
@Value("${inteligr8.asie.default.concurrency:16}")
private int concurrency;
protected int getConcurrency() {
return this.concurrency;
}
protected int getConcurrentQueueSize() {
return this.concurrentQueueSize;
}
protected abstract String getThreadNamePrefix();
protected abstract String getActionName();
/**
* This method executes an action against all eligible Solr shard instances.
* The callback handles all the return values. This is the synchronous
* alternative to the other `action` method.
*
* There are two sets of parameters regarding timeouts. The queue timeouts
* are for how long the requesting thread should wait for a full queue to
* open up space for new executions. The execution timeouts are for how
* long the execution should be allowed to take once dequeued. There is no
* timeout for how long the execution is queued.
*
* @param callback A callback to process multiple returned values from the action.
* @param fullQueueTimeout A timeout for how long the calling thread should wait for space on the queue.
* @param fullQueueUnit The time units for the `fullQueueTimeout`.
* @param execTimeout A timeout for the elapsed time the execution should take when dequeued.
* @param execUnit The time units for the `execTimeout`.
* @throws TimeoutException Either the queue or execution timeout lapsed.
* @throws InterruptedException The execution was interrupted (server shutdown).
*/
protected void action(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
long fullQueueExpireTimeMillis = System.currentTimeMillis() + fullQueueUnit.toMillis(fullQueueTimeout);
Future<Void> future = this._action(callback, fullQueueExpireTimeMillis);
try {
future.get(execTimeout, execUnit);
} catch (ExecutionException ee) {
this.logger.error("Reindex thread failed: " + ee.getMessage(), ee);
}
}
/**
* This method executes an action against all eligible Solr shard instances.
* The callback handles all the return values. This is the asynchronous
* alternative to the other `action` method.
*
* This method may block indefinitely when the queue is full. Once all
* executions are queued, this will return a single future representing all
* the executions.
*
* @param callback A callback to process multiple returned values from the execution.
* @return A reference to the future or active executing task.
* @throws InterruptedException The execution was interrupted (server shutdown).
*/
protected Future<Void> action(ActionCallback callback) throws InterruptedException {
try {
return this._action(callback, null);
} catch (TimeoutException te) {
throw new RuntimeException("This should never happen: " + te.getMessage(), te);
}
}
private Future<Void> _action(ActionCallback callback, Long fullQueueExpireTimeMillis) throws TimeoutException, InterruptedException {
List<com.inteligr8.alfresco.asie.model.ShardInstance> eligibleInstances = this.findPossibleShardInstances();
this.logger.debug("Will attempt to {} {} shard instances", this.getActionName(), eligibleInstances.size());
CompositeFuture<Void> future = new CompositeFuture<>();
ThrottledThreadPoolExecutor executor = this.executorManager.createThrottled(
this.getThreadNamePrefix(),
this.getConcurrency(), this.getConcurrency(), this.getConcurrentQueueSize(),
1L, TimeUnit.MINUTES);
for (final com.inteligr8.alfresco.asie.model.ShardInstance instance : eligibleInstances) {
this.logger.trace("Will attempt to {} shard instance: {}", this.getActionName(), instance);
Callable<Void> callable = new Callable<>() {
@Override
public Void call() {
String core = instance.extractShard().getCoreName();
SolrHost host = instance.extractNode();
URL url = host.toUrl(apiService.isSecure() ? "https" : "http");
CoreAdminApi api = apiService.createApi(url.toString(), CoreAdminApi.class);
try {
logger.debug("Performing {} of shard instance: {}", getActionName(), instance);
BaseResponse apiResponse = execute(api, core);
logger.trace("Performed {} of shard instance: {}", getActionName(), instance);
Action action = null;
if (apiResponse instanceof ActionCoreResponse<?>) {
action = ((ActionCoreResponse<Action>) apiResponse).getCores().getByCore(core);
} else if (apiResponse instanceof ActionResponse<?>) {
action = ((ActionResponse<Action>) apiResponse).getAction();
}
if (action == null) {
callback.unknownResult(instance);
} else {
switch (action.getStatus()) {
case Scheduled:
callback.scheduled(instance);
break;
case Success:
callback.success(instance);
break;
default:
if (apiResponse instanceof com.inteligr8.alfresco.asie.model.BaseResponse) {
com.inteligr8.alfresco.asie.model.BaseResponse asieResponse = (com.inteligr8.alfresco.asie.model.BaseResponse) apiResponse;
logger.debug("Performance of {} of shard instance failed: {}: {}", getActionName(), instance, asieResponse.getException());
callback.error(instance, asieResponse.getException());
} else {
logger.debug("Performance of {} of shard instance failed: {}: {}", getActionName(), instance, apiResponse.getResponseHeader().getStatus());
callback.error(instance, String.valueOf(apiResponse.getResponseHeader().getStatus()));
}
}
}
} catch (Exception e) {
logger.error("An exception occurred", e);
callback.error(instance, e.getMessage());
}
return null;
}
};
if (fullQueueExpireTimeMillis == null) {
future.combine(executor.submit(callable, -1L, null));
} else {
future.combine(executor.submit(callable, fullQueueExpireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
}
return future;
}
protected abstract BaseResponse execute(CoreAdminApi api, String core);
private List<com.inteligr8.alfresco.asie.model.ShardInstance> findPossibleShardInstances() {
if (this.shardRegistry == null)
throw new UnsupportedOperationException("ACS instances without a sharding configuration are not yet implemented");
List<com.inteligr8.alfresco.asie.model.ShardInstance> instances = new LinkedList<>();
for (Entry<Floc, Map<Shard, Set<ShardState>>> floc : this.shardRegistry.getFlocs().entrySet()) {
if (!floc.getKey().getStoreRefs().contains(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE))
continue;
for (Entry<Shard, Set<ShardState>> shard : floc.getValue().entrySet()) {
for (ShardState shardState : shard.getValue())
instances.add(this.toModel(shardState.getShardInstance(), shardState));
}
}
this.logger.trace("Despite sharding, considering all shards and nodes: {}", instances);
return instances;
}
private com.inteligr8.alfresco.asie.model.ShardInstance toModel(ShardInstance instance, ShardState anyShardState) {
Floc floc = instance.getShard().getFloc();
ShardSet shardSet = ShardSet.from(floc, anyShardState);
SolrHost host = SolrHost.from(instance);
com.inteligr8.alfresco.asie.model.Shard shard = com.inteligr8.alfresco.asie.model.Shard.from(shardSet, instance.getShard().getInstance());
return com.inteligr8.alfresco.asie.model.ShardInstance.from(shard, host);
}
}
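
From a caller's perspective, the abstract `action` methods above come down to supplying an `ActionCallback` and choosing between the blocking and `Future`-returning variants. The following is a minimal, hypothetical caller sketch (not part of this commit) using a concrete subclass such as the FixService added later in this commit; the callback method names match those implemented by the web scripts above, and the logging is illustrative only.

    // hypothetical caller sketch; assumes an injected FixService and an SLF4J logger
    ActionCallback callback = new ActionCallback() {
        @Override
        public void success(ShardInstance instance) {
            // the shard instance completed the action synchronously
            logger.info("fixed: {}", instance.getSpec());
        }
        @Override
        public void scheduled(ShardInstance instance) {
            // the shard instance accepted the action and will finish it asynchronously
            logger.info("fix scheduled: {}", instance.getSpec());
        }
        @Override
        public void error(ShardInstance instance, String message) {
            logger.warn("fix failed: {}: {}", instance.getSpec(), message);
        }
        @Override
        public void unknownResult(ShardInstance instance) {
            logger.warn("fix result unknown: {}", instance.getSpec());
        }
    };
    // wait up to 10 seconds for queue space and 30 seconds for the executions themselves
    fixService.fix(callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS);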

View File

@@ -0,0 +1,268 @@
package com.inteligr8.alfresco.asie.service;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.index.shard.Floc;
import org.alfresco.repo.index.shard.Shard;
import org.alfresco.repo.index.shard.ShardInstance;
import org.alfresco.repo.index.shard.ShardRegistry;
import org.alfresco.repo.index.shard.ShardState;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.cmr.search.SearchParameters;
import org.alfresco.service.cmr.search.SearchService;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import com.inteligr8.alfresco.asie.Constants;
import com.inteligr8.alfresco.asie.api.CoreAdminApi;
import com.inteligr8.alfresco.asie.model.ActionCoreResponse;
import com.inteligr8.alfresco.asie.model.ShardSet;
import com.inteligr8.alfresco.asie.model.SolrHost;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
import com.inteligr8.alfresco.asie.util.CompositeFuture;
import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor;
import com.inteligr8.solr.model.Action;
import com.inteligr8.solr.model.ActionResponse;
import com.inteligr8.solr.model.BaseResponse;
public abstract class AbstractNodeActionService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private NamespaceService namespaceService;
@Autowired
private ApiService apiService;
@Autowired
private ExecutorManager executorManager;
@Autowired(required = false)
@Qualifier(Constants.QUALIFIER_ASIE)
private ShardRegistry shardRegistry;
@Value("${inteligr8.asie.default.concurrentQueueSize:64}")
private int concurrentQueueSize;
@Value("${inteligr8.asie.default.concurrency:16}")
private int concurrency;
protected int getConcurrency() {
return this.concurrency;
}
protected int getConcurrentQueueSize() {
return this.concurrentQueueSize;
}
protected abstract String getThreadNamePrefix();
protected abstract String getActionName();
/**
* This method executes an action on the specified node in Solr using its
* ACS unique database identifier. The callback handles all the return
* values. This is the synchronous alternative to the other `action`
* method.
*
* There are two sets of parameters regarding timeouts. The queue timeouts
* are for how long the requesting thread should wait for a full queue to
* open up space for new executions. The execution timeouts are for how
* long the execution should be allowed to take once dequeued. There is no
* timeout for how long the execution is queued.
*
* @param nodeDbId A node database ID.
* @param callback A callback to process multiple returned values from the action.
* @param fullQueueTimeout A timeout for how long the calling thread should wait for space on the queue.
* @param fullQueueUnit The time units for the `fullQueueTimeout`.
* @param execTimeout A timeout for the elapsed time the execution should take when dequeued.
* @param execUnit The time units for the `execTimeout`.
* @throws TimeoutException Either the queue or execution timeout lapsed.
* @throws InterruptedException The execution was interrupted (server shutdown).
*/
protected void action(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
long fullQueueExpireTimeMillis = System.currentTimeMillis() + fullQueueUnit.toMillis(fullQueueTimeout);
Future<Void> future = this._action(nodeDbId, callback, fullQueueExpireTimeMillis);
try {
future.get(execTimeout, execUnit);
} catch (ExecutionException ee) {
this.logger.error("Reindex thread failed: " + ee.getMessage(), ee);
}
}
/**
* This method executes an action on the specified node in Solr using its
* ACS unique database identifier. The callback handles all the return
* values. This is the asynchronous alternative to the other `action`
* method.
*
* This method may block indefinitely when the queue is full. Once all
* executions are queued, this will return a single future representing all
* the executions.
*
* @param nodeDbId A node database ID.
* @param callback A callback to process multiple returned values from the execution.
* @return A reference to the future or active executing task.
* @throws InterruptedException The execution was interrupted (server shutdown).
*/
protected Future<Void> action(long nodeDbId, ActionCallback callback) throws InterruptedException {
try {
return this._action(nodeDbId, callback, null);
} catch (TimeoutException te) {
throw new RuntimeException("This should never happen: " + te.getMessage(), te);
}
}
private Future<Void> _action(long nodeDbId, ActionCallback callback, Long fullQueueExpireTimeMillis) throws TimeoutException, InterruptedException {
List<com.inteligr8.alfresco.asie.model.ShardInstance> eligibleInstances = this.findPossibleShardInstances(nodeDbId);
this.logger.debug("Will attempt to {} ACS node against {} shard instances: {}", this.getActionName(), eligibleInstances.size(), nodeDbId);
CompositeFuture<Void> future = new CompositeFuture<>();
ThrottledThreadPoolExecutor executor = this.executorManager.createThrottled(
this.getThreadNamePrefix(),
this.getConcurrency(), this.getConcurrency(), this.getConcurrentQueueSize(),
1L, TimeUnit.MINUTES);
for (final com.inteligr8.alfresco.asie.model.ShardInstance instance : eligibleInstances) {
this.logger.trace("Will attempt to {} ACS node against shard instance: {}: {}", this.getActionName(), nodeDbId, instance);
Callable<Void> callable = new Callable<>() {
@Override
public Void call() {
String core = instance.extractShard().getCoreName();
SolrHost host = instance.extractNode();
URL url = host.toUrl(apiService.isSecure() ? "https" : "http");
CoreAdminApi api = apiService.createApi(url.toString(), CoreAdminApi.class);
try {
logger.debug("Performing {} of ACS node against shard instance: {}: {}", getActionName(), nodeDbId, instance);
BaseResponse apiResponse = execute(api, core, nodeDbId);
logger.trace("Performed {} of ACS node against shard instance: {}: {}", getActionName(), nodeDbId, instance);
Action action = null;
if (apiResponse instanceof ActionCoreResponse<?>) {
action = ((ActionCoreResponse<Action>) apiResponse).getCores().getByCore(core);
} else if (apiResponse instanceof ActionResponse<?>) {
action = ((ActionResponse<Action>) apiResponse).getAction();
}
if (action == null) {
callback.unknownResult(instance);
} else {
switch (action.getStatus()) {
case Scheduled:
callback.scheduled(instance);
break;
case Success:
callback.success(instance);
break;
default:
if (apiResponse instanceof com.inteligr8.alfresco.asie.model.BaseResponse) {
com.inteligr8.alfresco.asie.model.BaseResponse asieResponse = (com.inteligr8.alfresco.asie.model.BaseResponse) apiResponse;
logger.debug("Performance of {} of ACS node against shard instance failed: {}: {}: {}", getActionName(), nodeDbId, instance, asieResponse.getException());
callback.error(instance, asieResponse.getException());
} else {
logger.debug("Performance of {} of ACS node against shard instance failed: {}: {}: {}", getActionName(), nodeDbId, instance, apiResponse.getResponseHeader().getStatus());
callback.error(instance, String.valueOf(apiResponse.getResponseHeader().getStatus()));
}
}
}
} catch (Exception e) {
logger.error("An exception occurred", e);
callback.error(instance, e.getMessage());
}
return null;
}
};
if (fullQueueExpireTimeMillis == null) {
future.combine(executor.submit(callable, -1L, null));
} else {
future.combine(executor.submit(callable, fullQueueExpireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
}
return future;
}
protected abstract BaseResponse execute(CoreAdminApi api, String core, long nodeDbId);
private List<com.inteligr8.alfresco.asie.model.ShardInstance> findPossibleShardInstances(long nodeDbId) {
if (this.shardRegistry == null)
throw new UnsupportedOperationException("ACS instances without a sharding configuration are not yet implemented");
SearchParameters searchParams = new SearchParameters();
searchParams.setLanguage(SearchService.LANGUAGE_FTS_ALFRESCO);
searchParams.setQuery("@" + this.formatForFts(ContentModel.PROP_NODE_DBID) + ":" + nodeDbId);
List<com.inteligr8.alfresco.asie.model.ShardInstance> instances = new LinkedList<>();
List<ShardInstance> slicedInstances = this.shardRegistry.getIndexSlice(searchParams);
if (slicedInstances != null) {
this.logger.trace("Due to a sharding method, considering only applicable shards and their ASIE nodes: {}: {}", nodeDbId, slicedInstances);
for (ShardInstance instance : slicedInstances)
instances.add(this.toModel(instance));
} else {
for (Entry<Floc, Map<Shard, Set<ShardState>>> floc : this.shardRegistry.getFlocs().entrySet()) {
if (!floc.getKey().getStoreRefs().contains(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE))
continue;
for (Entry<Shard, Set<ShardState>> shard : floc.getValue().entrySet()) {
for (ShardState shardState : shard.getValue())
instances.add(this.toModel(shardState.getShardInstance(), shardState));
}
}
this.logger.trace("Despite sharding, considering all shards and nodes: {}: {}", nodeDbId, instances);
}
return instances;
}
private com.inteligr8.alfresco.asie.model.ShardInstance toModel(ShardInstance instance) {
// get any random shardState
Floc floc = instance.getShard().getFloc();
Map<Shard, Set<ShardState>> shardsStates = this.shardRegistry.getFlocs().get(floc);
if (shardsStates == null)
throw new IllegalStateException();
Set<ShardState> shardStates = shardsStates.get(instance.getShard());
if (shardStates == null || shardStates.isEmpty())
throw new IllegalStateException();
ShardState anyShardState = shardStates.iterator().next();
return this.toModel(instance, anyShardState);
}
private com.inteligr8.alfresco.asie.model.ShardInstance toModel(ShardInstance instance, ShardState anyShardState) {
Floc floc = instance.getShard().getFloc();
ShardSet shardSet = ShardSet.from(floc, anyShardState);
SolrHost host = SolrHost.from(instance);
com.inteligr8.alfresco.asie.model.Shard shard = com.inteligr8.alfresco.asie.model.Shard.from(shardSet, instance.getShard().getInstance());
return com.inteligr8.alfresco.asie.model.ShardInstance.from(shard, host);
}
private String formatForFts(QName qname) {
return qname.toPrefixString(this.namespaceService).replace("-", "\\-").replace(":", "\\:");
}
}
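
The `Future`-returning overload is the asynchronous entry point: it only blocks while queueing work, then returns a composite future covering every targeted shard instance. A minimal hypothetical sketch, assuming an injected ReindexService (added later in this commit), an `ActionCallback` like the one sketched above, and an enclosing method that declares `throws InterruptedException`:

    // hypothetical asynchronous usage sketch (not part of this commit)
    Future<Void> future = reindexService.reindex(nodeDbId, callback);
    // ... other work can proceed while the per-shard-instance calls run ...
    try {
        // bound the wait; per-instance outcomes are still delivered through the callback
        future.get(30L, TimeUnit.SECONDS);
    } catch (TimeoutException te) {
        // the underlying executions may still complete after this point
    } catch (ExecutionException ee) {
        // per-instance failures are reported via callback.error(), so this is unexpected
    }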

View File

@@ -0,0 +1,309 @@
package com.inteligr8.alfresco.asie.service;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.alfresco.model.ContentModel;
import org.alfresco.model.RenditionModel;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeRef.Status;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.cmr.search.QueryConsistency;
import org.alfresco.service.cmr.search.ResultSet;
import org.alfresco.service.cmr.search.SearchParameters;
import org.alfresco.service.cmr.search.SearchService;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.apache.commons.collections4.SetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.spi.ReconcileCallback;
import com.inteligr8.alfresco.asie.spi.ReindexCallback;
import com.inteligr8.alfresco.asie.util.CompositeFuture;
import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor;
@Component
public class AcsReconcileService implements InitializingBean, DisposableBean {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Logger reconcileLogger = LoggerFactory.getLogger("inteligr8.asie.reconcile");
private final Set<QName> ignoreNodesWithAspects = SetUtils.unmodifiableSet(
RenditionModel.ASPECT_RENDITION,
RenditionModel.ASPECT_RENDITION2);
@Autowired
private NodeService nodeService;
@Autowired
private NamespaceService namespaceService;
@Autowired
private SearchService searchService;
@Autowired
private ReindexService reindexService;
@Value("${inteligr8.asie.reconciliation.nodesChunkSize:250}")
private int nodesChunkSize;
@Value("${inteligr8.asie.reconciliation.nodeTimeoutSeconds:10}")
private int nodeTimeoutSeconds;
@Value("${inteligr8.asie.reconciliation.concurrentQueueSize:64}")
private int concurrentQueueSize;
@Value("${inteligr8.asie.reconciliation.concurrency:2}")
private int concurrency;
private ThrottledThreadPoolExecutor executor;
@Override
public void afterPropertiesSet() {
this.executor = new ThrottledThreadPoolExecutor(this.concurrency, this.concurrency, this.concurrentQueueSize, 1L, TimeUnit.MINUTES, "solr-reconcile");
this.executor.prestartAllCoreThreads();
}
@Override
public void destroy() {
this.executor.shutdown();
}
/**
* This method reconciles the specified node range between ACS and Solr.
* The node range is specified using the ACS unique database identifiers.
* There is no other reasonably efficient way to address such a range. The callback
* handles all the return values. This is the synchronous alternative to
* the other `reconcile` method.
*
* There are two sets of parameters regarding timeouts. The queue timeouts
* are for how long the requesting thread should wait for a full queue to
* open up space for new re-index executions. The execution timeouts are
* for how long the execution should be allowed to take once dequeued.
* There is no timeout for how long the execution is queued.
*
* @param fromDbId A node database ID, inclusive.
* @param toDbId A node database ID, exclusive.
* @param reindexUnreconciled For nodes not found in Solr, attempt to re-index against all applicable Solr instances.
* @param callback A callback to process multiple returned values from the re-index.
* @param queueTimeout A timeout for how long the calling thread should wait for space on the queue.
* @param queueUnit The time units for the `queueTimeout`.
* @param execTimeout A timeout for the elapsed time the reindex execution should take when dequeued.
* @param execUnit The time units for the `execTimeout`.
* @throws TimeoutException Either the queue or execution timeout lapsed.
* @throws InterruptedException The re-index was interrupted (server shutdown).
*/
public void reconcile(
long fromDbId, long toDbId, Integer nodesChunkSize,
boolean reindexUnreconciled,
ReconcileCallback callback,
long queueTimeout, TimeUnit queueUnit,
long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException {
if (nodesChunkSize == null)
nodesChunkSize = this.nodesChunkSize;
if (this.logger.isTraceEnabled())
this.logger.trace("reconcile({}, {}, {}, {}, {}, {})", fromDbId, toDbId, nodesChunkSize, reindexUnreconciled, queueUnit.toMillis(queueTimeout), execUnit.toMillis(execTimeout));
CompositeFuture<Void> future = new CompositeFuture<>();
for (long startDbId = fromDbId; startDbId < toDbId; startDbId += nodesChunkSize) {
long endDbId = Math.min(toDbId, startDbId + nodesChunkSize);
future.combine(this.reconcileChunk(startDbId, endDbId, reindexUnreconciled, callback, queueTimeout, queueUnit, execTimeout, execUnit));
future.purge(true);
}
try {
future.get(execTimeout, execUnit);
} catch (ExecutionException ee) {
this.logger.error("Reconciliation thread failed: " + ee.getMessage(), ee);
}
}
public Future<Void> reconcile(
long fromDbId, long toDbId, Integer nodesChunkSize,
boolean reindexUnreconciled,
ReconcileCallback callback) throws InterruptedException {
if (nodesChunkSize == null)
nodesChunkSize = this.nodesChunkSize;
this.logger.trace("reconcile({}, {}, {}, {})", fromDbId, toDbId, nodesChunkSize, reindexUnreconciled);
CompositeFuture<Void> future = new CompositeFuture<>();
try {
for (long startDbId = fromDbId; startDbId < toDbId; startDbId += nodesChunkSize) {
long endDbId = Math.min(toDbId, startDbId + nodesChunkSize);
future.combine(this.reconcileChunk(startDbId, endDbId, reindexUnreconciled, callback, -1L, null, -1L, null));
future.purge(true);
}
} catch (TimeoutException te) {
throw new RuntimeException("This should never happen: " + te.getMessage(), te);
}
return future;
}
protected Future<Void> reconcileChunk(
long fromDbId, long toDbId,
boolean reindexUnreconciled,
ReconcileCallback callback,
long queueTimeout, TimeUnit queueUnit,
long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException {
if (this.logger.isTraceEnabled())
this.logger.trace("reconcileChunk({}, {}, {}, {}, {})", fromDbId, toDbId, reindexUnreconciled, queueUnit.toMillis(queueTimeout), execUnit.toMillis(execTimeout));
int dbIdCount = (int) (toDbId - fromDbId);
SearchParameters searchParams = new SearchParameters();
searchParams.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE);
searchParams.setQueryConsistency(QueryConsistency.EVENTUAL); // force Solr
searchParams.setLanguage(SearchService.LANGUAGE_FTS_ALFRESCO);
searchParams.setQuery("@" + this.formatForFts(ContentModel.PROP_NODE_DBID) + ":[" + fromDbId + " TO " + toDbId + ">");
searchParams.setMaxItems(dbIdCount);
searchParams.setBulkFetchEnabled(false);
searchParams.setIncludeMetadata(false);
// a non-null entry means the node at that DB ID was found in Solr
// a null entry means it was not found in Solr (or does not exist in the DB)
NodeRef[] nodeRefs = new NodeRef[dbIdCount];
this.logger.trace("Querying for nodes in chunk: {}", searchParams.getQuery());
ResultSet nodes = this.searchService.query(searchParams);
this.logger.debug("Found {} of {} possible nodes in chunk: {}-{}", nodes.getNumberFound(), dbIdCount, fromDbId, toDbId);
for (NodeRef nodeRef : nodes.getNodeRefs()) {
Status nodeStatus = this.nodeService.getNodeStatus(nodeRef);
long nodeDbId = nodeStatus.getDbId();
if (nodeDbId < fromDbId || nodeDbId >= toDbId) {
this.logger.warn("An unexpected DB ID was included in the result set; ignoring: {} != [{}, {})", nodeDbId, fromDbId, toDbId);
continue;
}
int dbIdIndex = (int) (nodeDbId - fromDbId);
nodeRefs[dbIdIndex] = nodeRef;
}
CompositeFuture<Void> future = new CompositeFuture<>();
for (long _nodeDbId = fromDbId; _nodeDbId < toDbId; _nodeDbId++) {
final long nodeDbId = _nodeDbId;
this.logger.trace("Attempting to reconcile ACS node: {}", nodeDbId);
final int dbIdIndex = (int) (nodeDbId - fromDbId);
if (nodeRefs[dbIdIndex] != null) {
this.logger.trace("A node in the DB is already indexed in Solr: {}: {}", nodeDbId, nodeRefs[dbIdIndex]);
this.reconcileLogger.info("RECONCILED: {} <=> {}", nodeDbId, nodeRefs[dbIdIndex]);
callback.reconciled(nodeDbId);
continue;
}
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws InterruptedException, TimeoutException {
reconcile(nodeDbId, reindexUnreconciled, callback, execTimeout, execUnit);
return null;
}
};
if (queueTimeout < 0L) {
future.combine(this.executor.submit(callable, -1L, null));
} else {
future.combine(this.executor.submit(callable, queueTimeout, queueUnit));
}
}
return future;
}
public void reconcile(long nodeDbId,
boolean reindexUnreconciled,
ReconcileCallback callback,
long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException {
NodeRef nodeRef = this.nodeService.getNodeRef(nodeDbId);
if (nodeRef == null) {
this.logger.trace("No such ACS node: {}; skipping ...", nodeDbId);
return;
}
if (!StoreRef.STORE_REF_WORKSPACE_SPACESSTORE.equals(nodeRef.getStoreRef())) {
this.logger.trace("A deliberately ignored store in the DB is not indexed in Solr: {}: {}", nodeDbId, nodeRef);
return;
}
Set<QName> aspects = this.nodeService.getAspects(nodeRef);
aspects.retainAll(this.ignoreNodesWithAspects);
if (!aspects.isEmpty()) {
this.logger.trace("A deliberately ignored node in the DB is not indexed in Solr: {}: {}: {}", nodeDbId, nodeRef, aspects);
return;
}
if (!reindexUnreconciled) {
this.logger.debug("A node in the DB is not indexed in Solr: {}: {}", nodeDbId, nodeRef);
this.reconcileLogger.info("UNRECONCILED: {} <=> {}", nodeDbId, nodeRef);
callback.unreconciled(nodeDbId);
} else {
logger.debug("A node in the DB is not indexed in Solr; attempt to reindex: {}: {}", nodeDbId, nodeRef);
this.reindex(nodeDbId, nodeRef, callback, execTimeout, execUnit);
}
}
public void reindex(long nodeDbId, NodeRef nodeRef,
ReconcileCallback callback,
long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException {
Set<ShardInstance> syncHosts = new HashSet<>();
Set<ShardInstance> asyncHosts = new HashSet<>();
Map<ShardInstance, String> errorHosts = new HashMap<>();
ReindexCallback reindexCallback = new ReindexCallback() {
@Override
public void success(ShardInstance instance) {
reconcileLogger.info("REINDEXED: {} <=> {}", nodeDbId, nodeRef);
syncHosts.add(instance);
}
@Override
public void scheduled(ShardInstance instance) {
reconcileLogger.info("REINDEXING: {} <=> {}", nodeDbId, nodeRef);
asyncHosts.add(instance);
}
@Override
public void error(ShardInstance instance, String message) {
reconcileLogger.info("UNINDEXED: {} <=> {}", nodeDbId, nodeRef);
errorHosts.put(instance, message);
}
};
try {
if (execTimeout < 0L) {
this.reindexService.reindex(nodeDbId, reindexCallback).get();
} else {
this.reindexService.reindex(nodeDbId, reindexCallback).get(execTimeout, execUnit);
}
} catch (ExecutionException ee) {
throw new RuntimeException("An unexpected exception occurred: " + ee.getMessage(), ee);
}
if (callback != null)
callback.processed(nodeDbId, syncHosts, asyncHosts, errorHosts);
}
private String formatForFts(QName qname) {
return qname.toPrefixString(this.namespaceService).replace("-", "\\-").replace(":", "\\:");
}
}
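
To make the parameters above concrete, here is a minimal hypothetical caller (not part of this commit) that reconciles a DB ID range with the default chunk size and re-indexes anything missing from Solr; the timeout values mirror those used by ReconcileAcsNodesWebScript, and the callback bodies are illustrative only.

    // hypothetical usage sketch; assumes an injected AcsReconcileService
    ReconcileCallback callback = new ReconcileCallback() {
        @Override
        public void reconciled(long nodeDbId) {
            // the node exists in both the ACS database and Solr
        }
        @Override
        public void unreconciled(long nodeDbId) {
            // the node exists in the ACS database but not in Solr (only reported when re-indexing is disabled)
        }
        @Override
        public void processed(long nodeDbId, Set<ShardInstance> reindexed, Set<ShardInstance> reindexing,
                Map<ShardInstance, String> errors) {
            // per-shard-instance re-index outcomes for a node that was not found in Solr
        }
    };
    // reconcile DB IDs [1000, 2000) with the default chunk size, re-indexing unreconciled nodes;
    // wait up to 1 hour for queue space and 2 minutes per dequeued execution
    reconcileService.reconcile(1000L, 2000L, null, true, callback, 1L, TimeUnit.HOURS, 2L, TimeUnit.MINUTES);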

View File

@@ -99,7 +99,7 @@ public class ApiService implements InitializingBean {
 return solrSharedSecret == null ? null : new AuthorizationFilter() {
     @Override
     public void filter(ClientRequestContext requestContext) throws IOException {
-        logger.debug("Adding authorization headers for ASIE shared auth: {}", solrSharedSecretHeader);
+        logger.trace("Adding authorization headers for ASIE shared auth: {}", solrSharedSecretHeader);
         requestContext.getHeaders().putSingle(solrSharedSecretHeader, solrSharedSecret);
     }
 };

View File

@@ -0,0 +1,138 @@
package com.inteligr8.alfresco.asie.service;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor;
/**
* This class manages the instantiation and shutdown of `ExecutorService`
* instances that are "rarely" used.
*
* The default `ExecutorService` implementations do not support 0 core threads
* without undesirable side effects, so they hold threads and could hold them
* for weeks without being used.
*
* We will shut down and dereference the whole `ExecutorService` when there are
* no other hard references to it and the `keepAliveTime` has expired.
*/
@Component
public class ExecutorManager implements InitializingBean, DisposableBean, RemovalListener<String, ExecutorService> {
@Value("${inteligr8.asie.executors.expireTimeInMinutes:30}")
private int expireTimeInMinutes;
private Cache<String, ExecutorService> refCache;
private Cache<String, ExecutorService> expiringCache;
@Override
public void afterPropertiesSet() throws Exception {
// a weak value is collected once the executor is no longer strongly referenced
// the possible references are the caller temporarily using it and the `expiringCache` below (until that entry expires)
// this keeps the pool from being shut down after it expires while the caller is still referencing it
// ultimately, if it is cached at all, it will be in this cache and MAY also be in the `expiringCache`
this.refCache = CacheBuilder.newBuilder()
.initialCapacity(8)
.weakValues()
.removalListener(this)
.build();
this.expiringCache = CacheBuilder.newBuilder()
.initialCapacity(8)
.expireAfterAccess(this.expireTimeInMinutes, TimeUnit.MINUTES)
.build();
}
@Override
public void destroy() throws Exception {
this.refCache.invalidateAll();
this.refCache.cleanUp();
this.expiringCache.invalidateAll();
this.expiringCache.cleanUp();
}
@Override
public void onRemoval(RemovalNotification<String, ExecutorService> notification) {
notification.getValue().shutdown();
}
public ThrottledThreadPoolExecutor createThrottled(
String name,
int coreThreadPoolSize,
int maximumThreadPoolSize,
int maximumQueueSize,
long keepAliveTime,
TimeUnit unit) {
return this.createThrottled(name, coreThreadPoolSize, maximumThreadPoolSize, maximumQueueSize, keepAliveTime, unit, null);
}
public ThrottledThreadPoolExecutor createThrottled(
final String name,
final int coreThreadPoolSize,
final int maximumThreadPoolSize,
final int maximumQueueSize,
final long keepAliveTime,
final TimeUnit unit,
final RejectedExecutionHandler rejectedExecutionHandler) {
try {
// if it is already cached, reuse the cached executor; otherwise create one
final ExecutorService executor = this.refCache.get(name, new Callable<ThrottledThreadPoolExecutor>() {
@Override
public ThrottledThreadPoolExecutor call() {
ThrottledThreadPoolExecutor executor = null;
if (rejectedExecutionHandler == null) {
executor = new ThrottledThreadPoolExecutor(coreThreadPoolSize, maximumThreadPoolSize, maximumQueueSize,
keepAliveTime, unit,
name);
} else {
executor = new ThrottledThreadPoolExecutor(coreThreadPoolSize, maximumThreadPoolSize, maximumQueueSize,
keepAliveTime, unit,
name,
rejectedExecutionHandler);
}
executor.prestartAllCoreThreads();
return executor;
}
});
return (ThrottledThreadPoolExecutor) this.expiringCache.get(name, new Callable<ExecutorService>() {
@Override
public ExecutorService call() throws Exception {
return executor;
}
});
} catch (ExecutionException ee) {
throw new RuntimeException("This should never happen", ee);
}
}
public ExecutorService get(String name) {
// grab from the expiring cache first, so we can refresh its access-based expiration
ExecutorService executor = this.expiringCache.getIfPresent(name);
if (executor != null)
return executor;
executor = this.refCache.getIfPresent(name);
if (executor == null)
return null;
// the executor expired, but it was still referenced by the caller
// re-cache it
this.expiringCache.put(name, executor);
return executor;
}
}
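
As a usage illustration, the action services above obtain their pools roughly like this (a hypothetical sketch, not part of this commit; the name and sizing values mirror the defaults those services use):

    // hypothetical usage sketch; executorManager is an injected ExecutorManager
    ThrottledThreadPoolExecutor executor = executorManager.createThrottled(
            "solr-fix",      // pools are cached by name, so repeated calls reuse the same instance
            16, 16, 64,      // core threads, maximum threads, bounded queue size
            1L, TimeUnit.MINUTES);
    // once the caller drops its reference and the expiration window passes, the pool is shut down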

View File

@@ -0,0 +1,43 @@
package com.inteligr8.alfresco.asie.service;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.api.CoreAdminApi;
import com.inteligr8.alfresco.asie.model.ActionCoreResponse;
import com.inteligr8.alfresco.asie.model.core.FixAction;
import com.inteligr8.alfresco.asie.model.core.FixRequest;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
@Component
public class FixService extends AbstractActionService {
@Override
protected String getActionName() {
return "fix";
}
@Override
protected String getThreadNamePrefix() {
return "solr-fix";
}
@Override
protected ActionCoreResponse<FixAction> execute(CoreAdminApi api, String core) {
FixRequest apiRequest = new FixRequest().withCore(core);
return api.fix(apiRequest);
}
public Future<Void> fix(ActionCallback callback) throws InterruptedException {
return super.action(callback);
}
public void fix(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
super.action(callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,43 @@
package com.inteligr8.alfresco.asie.service;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.api.CoreAdminApi;
import com.inteligr8.alfresco.asie.model.ActionCoreResponse;
import com.inteligr8.alfresco.asie.model.core.PurgeRequest;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
import com.inteligr8.solr.model.Action;
@Component
public class PurgeService extends AbstractNodeActionService {
@Override
protected String getActionName() {
return "purge";
}
@Override
protected String getThreadNamePrefix() {
return "solr-purge";
}
@Override
protected ActionCoreResponse<Action> execute(CoreAdminApi api, String core, long nodeDbId) {
PurgeRequest apiRequest = new PurgeRequest().withCore(core).withNodeId(nodeDbId);
return api.purge(apiRequest);
}
public Future<Void> purge(long nodeDbId, ActionCallback callback) throws InterruptedException {
return super.action(nodeDbId, callback);
}
public void purge(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
super.action(nodeDbId, callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,43 @@
package com.inteligr8.alfresco.asie.service;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.api.CoreAdminApi;
import com.inteligr8.alfresco.asie.model.ActionCoreResponse;
import com.inteligr8.alfresco.asie.model.core.ReindexRequest;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
import com.inteligr8.solr.model.Action;
@Component
public class ReindexService extends AbstractNodeActionService {
@Override
protected String getActionName() {
return "re-index";
}
@Override
protected String getThreadNamePrefix() {
return "solr-reindex";
}
@Override
protected ActionCoreResponse<Action> execute(CoreAdminApi api, String core, long nodeDbId) {
ReindexRequest apiRequest = new ReindexRequest().withCore(core).withNodeId(nodeDbId);
return api.reindex(apiRequest);
}
public Future<Void> reindex(long nodeDbId, ActionCallback callback) throws InterruptedException {
return super.action(nodeDbId, callback);
}
public void reindex(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
super.action(nodeDbId, callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,43 @@
package com.inteligr8.alfresco.asie.service;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.api.CoreAdminApi;
import com.inteligr8.alfresco.asie.model.ActionCoreResponse;
import com.inteligr8.alfresco.asie.model.core.RetryAction;
import com.inteligr8.alfresco.asie.model.core.RetryRequest;
import com.inteligr8.alfresco.asie.spi.ActionCallback;
@Component
public class RetryService extends AbstractActionService {
@Override
protected String getActionName() {
return "retry";
}
@Override
protected String getThreadNamePrefix() {
return "solr-retry";
}
@Override
protected ActionCoreResponse<RetryAction> execute(CoreAdminApi api, String core) {
RetryRequest apiRequest = new RetryRequest().withCore(core);
return api.retry(apiRequest);
}
public Future<Void> retry(ActionCallback callback) throws InterruptedException {
return super.action(callback);
}
public void retry(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit,
long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException {
super.action(callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit);
}
}

View File

@@ -0,0 +1,15 @@
package com.inteligr8.alfresco.asie.spi;
import com.inteligr8.alfresco.asie.model.ShardInstance;
public interface ActionCallback {
void success(ShardInstance instance);
void scheduled(ShardInstance instance);
void error(ShardInstance instance, String message);
void unknownResult(ShardInstance instance);
}
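A minimal sketch of how a caller might pair one of the services above with this callback; the package, class, and method names below are illustrative and not part of the module:

package com.inteligr8.alfresco.asie.example;

import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.service.FixService;
import com.inteligr8.alfresco.asie.spi.ActionCallback;

@Component
public class LoggingFixTrigger {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private FixService fixService;

    public Future<Void> triggerFix() throws InterruptedException {
        // each Solr shard instance reports back through the callback
        return this.fixService.fix(new ActionCallback() {
            @Override
            public void success(ShardInstance instance) {
                logger.info("Fix completed synchronously: {}", instance);
            }
            @Override
            public void scheduled(ShardInstance instance) {
                logger.info("Fix scheduled asynchronously: {}", instance);
            }
            @Override
            public void error(ShardInstance instance, String message) {
                logger.warn("Fix failed: {}: {}", instance, message);
            }
            @Override
            public void unknownResult(ShardInstance instance) {
                logger.warn("Fix result unknown: {}", instance);
            }
        });
    }
}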

View File

@@ -0,0 +1,19 @@
package com.inteligr8.alfresco.asie.spi;
import java.util.Map;
import java.util.Set;
import com.inteligr8.alfresco.asie.model.ShardInstance;
public interface ReconcileCallback {
void reconciled(long nodeDbId);
void unreconciled(long nodeDbId);
void processed(long nodeDbId,
Set<ShardInstance> instsReconciled,
Set<ShardInstance> instsReconciling,
Map<ShardInstance, String> instsErrorMessages);
}
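The reconcile service that consumes this callback is not part of this hunk; the sketch below is only an illustrative implementation that logs under the `inteligr8.asie.reconcile` category mentioned elsewhere in this commit:

package com.inteligr8.alfresco.asie.example;

import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.spi.ReconcileCallback;

public class LoggingReconcileCallback implements ReconcileCallback {

    // mirrors the dedicated logger named in the reconcile webscript description
    private final Logger logger = LoggerFactory.getLogger("inteligr8.asie.reconcile");

    @Override
    public void reconciled(long nodeDbId) {
        this.logger.debug("Node is indexed: {}", nodeDbId);
    }

    @Override
    public void unreconciled(long nodeDbId) {
        this.logger.warn("Node is not indexed anywhere: {}", nodeDbId);
    }

    @Override
    public void processed(long nodeDbId,
            Set<ShardInstance> instsReconciled,
            Set<ShardInstance> instsReconciling,
            Map<ShardInstance, String> instsErrorMessages) {
        this.logger.info("Node {}: {} reconciled, {} reconciling, {} errors",
                nodeDbId, instsReconciled.size(), instsReconciling.size(), instsErrorMessages.size());
    }
}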

View File

@@ -0,0 +1,12 @@
package com.inteligr8.alfresco.asie.spi;
import com.inteligr8.alfresco.asie.model.ShardInstance;
public interface ReindexCallback extends ActionCallback {
@Override
default void unknownResult(ShardInstance instance) {
throw new IllegalStateException();
}
}

View File

@@ -0,0 +1,150 @@
package com.inteligr8.alfresco.asie.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.tuple.MutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompositeFuture<T> implements Future<T> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Collection<Future<T>> futures = new ConcurrentLinkedQueue<>();
public MutablePair<Integer, Integer> counts() {
MutablePair<Integer, Integer> counts = MutablePair.of(0, 0);
for (Future<T> future : this.futures) {
if (future.isDone()) {
counts.setRight(counts.getRight().intValue() + 1);
} else {
counts.setLeft(counts.getLeft().intValue() + 1);
}
}
return counts;
}
public int countIncomplete() {
return this.counts().getLeft().intValue();
}
public int countCompleted() {
return this.counts().getRight().intValue();
}
public void combine(Future<T> future) {
this.futures.add(future);
}
public void combine(Collection<Future<T>> futures) {
this.futures.addAll(futures);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = true;
for (Future<T> future : this.futures)
if (!future.cancel(mayInterruptIfRunning))
cancelled = false;
return cancelled;
}
public List<T> getList() throws InterruptedException, ExecutionException {
List<T> results = new ArrayList<>(this.futures.size());
for (Future<T> future : this.futures)
results.add(future.get());
return results;
}
@Override
public T get() throws InterruptedException, ExecutionException {
this.getList();
return null;
}
public List<T> getList(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long expireTimeMillis = System.currentTimeMillis() + unit.toMillis(timeout);
List<T> results = new ArrayList<>(this.futures.size());
for (Future<T> future : this.futures) {
if (future instanceof RunnableFuture<?>) {
this.logger.debug("Waiting {} ms since the start of the exectuion of the future to complete", unit.toMillis(timeout));
results.add(((RunnableFuture<T>) future).get(timeout, unit));
} else {
long remainingTimeMillis = expireTimeMillis - System.currentTimeMillis();
this.logger.debug("Waiting {} ms for the future to complete", remainingTimeMillis);
results.add(future.get(remainingTimeMillis, TimeUnit.MILLISECONDS));
}
}
return results;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
this.getList(timeout, unit);
return null;
}
@Override
public boolean isCancelled() {
for (Future<T> future : this.futures)
if (!future.isCancelled())
return false;
return true;
}
@Override
public boolean isDone() {
for (Future<T> future : this.futures)
if (!future.isDone())
return false;
return true;
}
/**
* Remove any futures that are done (or cancelled).
*
* @param includeCancelled `true` to purge cancelled futures; `false` to purge only completed futures
*/
public void purge(boolean includeCancelled) {
List<CompositeFuture<?>> cfutures = new LinkedList<>();
int removedCancelled = 0;
int removedDone = 0;
Iterator<Future<T>> i = this.futures.iterator();
while (i.hasNext()) {
Future<T> future = i.next();
if (future.isCancelled()) {
if (includeCancelled) {
removedCancelled++;
i.remove();
}
} else if (future.isDone()) {
removedDone++;
i.remove();
} else if (future instanceof CompositeFuture<?>) {
cfutures.add((CompositeFuture<?>) future);
}
}
this.logger.debug("Purged {} cancelled and {} completed futures", removedCancelled, removedDone);
for (CompositeFuture<?> cfuture : cfutures)
cfuture.purge(includeCancelled);
}
}

View File

@@ -0,0 +1,43 @@
package com.inteligr8.alfresco.asie.util;
public abstract class ElapsedInterruptableRunnable extends InterruptableRunnable {
private final Object runLock = new Object();
private boolean runOnce = true;
private boolean run = false;
private Long executionStartTimeMillis;
private Long executionEndTimeMillis;
public void setRunOnce(boolean runOnce) {
this.runOnce = runOnce;
}
@Override
public final void runInterruptable() throws InterruptedException {
synchronized (this.runLock) {
if (this.runOnce && this.run)
return;
this.run = true;
}
this.executionStartTimeMillis = System.currentTimeMillis();
try {
this.runElapsed();
} finally {
this.executionEndTimeMillis = System.currentTimeMillis();
}
}
protected abstract void runElapsed() throws InterruptedException;
public Long computeElapsedExecutionMillis() {
if (this.executionEndTimeMillis != null) {
return this.executionEndTimeMillis - this.executionStartTimeMillis;
} else if (this.executionStartTimeMillis != null) {
return System.currentTimeMillis() - this.executionStartTimeMillis;
} else {
return null;
}
}
}

View File

@@ -0,0 +1,75 @@
package com.inteligr8.alfresco.asie.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class InterruptableRunnable implements Runnable {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Object threadLock = new Object();
private boolean run = false;
private Thread runThread = null;
private boolean completed = false;
private boolean interrupted = false;
@Override
public final void run() {
synchronized (this.threadLock) {
this.run = true;
this.runThread = Thread.currentThread();
}
try {
this.runInterruptable();
this.completed = true;
} catch (InterruptedException ie) {
this.logger.debug("Runnable interrupted");
this.interrupted = true;
this.interrupted(ie);
} finally {
this.runThread = null;
}
}
protected abstract void runInterruptable() throws InterruptedException;
public boolean interrupt() {
synchronized (this.threadLock) {
if (this.runThread == null)
return false;
this.logger.trace("Runnable interrupting ...");
this.runThread.interrupt();
}
return true;
}
protected void interrupted(InterruptedException ie) {
}
public boolean isQueued() {
return !this.run;
}
public boolean isInterrupted() {
return this.interrupted;
}
public boolean isCompleted() {
return this.completed;
}
public boolean isFailed() {
synchronized (this.threadLock) {
return this.run && !this.completed && !this.interrupted && this.runThread == null;
}
}
public boolean isDone() {
synchronized (this.threadLock) {
return this.run && this.runThread == null;
}
}
}

View File

@@ -0,0 +1,72 @@
package com.inteligr8.alfresco.asie.util;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RunnableFuture<T> implements Future<T> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final ThreadPoolExecutor executor;
private final WaitableRunnable runnable;
public RunnableFuture(ThreadPoolExecutor executor, WaitableRunnable runnable) {
this.executor = executor;
this.runnable = runnable;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (this.executor.remove(this.runnable)) {
this.logger.debug("Cancelled runnable by removing from queue");
return true;
} else if (mayInterruptIfRunning && this.runnable.interrupt()) {
this.logger.debug("Cancelled runnable by interrupting it");
return true;
}
return false;
}
/**
* This method never times out; it will only return or throw when
* the runnable is complete or interrupted.
*
* @return Always `null`.
*/
@Override
public T get() throws InterruptedException {
this.runnable.waitUntilDone();
return null;
}
/**
* This method will timeout in the specified amount of time after the
* execution commences. It will wait indefinitely while the execution is
* still queued; the clock only starts once the runnable enters the
* executing state. This differs from the default `Future` contract.
*
* @param timeout A positive integer representing a period of time.
* @param unit The time unit of the `timeout` parameter.
* @return Always `null`.
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
this.runnable.waitUntilExecutionElapsed(timeout, unit);
return null;
}
@Override
public boolean isCancelled() {
return this.runnable.isInterrupted();
}
@Override
public boolean isDone() {
return this.runnable.isDone();
}
}

View File

@@ -0,0 +1,96 @@
package com.inteligr8.alfresco.asie.util;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ThrottledThreadPoolExecutor extends ThreadPoolExecutor {
public ThrottledThreadPoolExecutor(
int coreThreadPoolSize,
int maximumThreadPoolSize,
int maximumQueueSize,
long keepAliveTime,
TimeUnit unit,
String threadNamePrefix) {
super(coreThreadPoolSize, maximumThreadPoolSize, keepAliveTime, unit,
new ArrayBlockingQueue<>(maximumQueueSize),
new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.build());
}
public ThrottledThreadPoolExecutor(
int coreThreadPoolSize,
int maximumThreadPoolSize,
int maximumQueueSize,
long keepAliveTime,
TimeUnit unit,
String threadNamePrefix,
RejectedExecutionHandler rejectedExecutionHandler) {
super(coreThreadPoolSize, maximumThreadPoolSize, keepAliveTime, unit,
new ArrayBlockingQueue<>(maximumQueueSize),
new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.build(),
rejectedExecutionHandler);
}
/**
* @param timeout Negative to block indefinitely while the queue is full
*/
public <T> RunnableFuture<T> submit(Callable<T> task, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
WaitableRunnable wrunnable = new WaitableRunnable() {
@Override
protected void runWaitable() throws InterruptedException {
try {
task.call();
} catch (InterruptedException ie) {
throw ie;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
return new RunnableFuture<T>(this, this.submit(wrunnable, timeout, unit));
}
/**
* @param timeout Negative to block indefinitely while the queue is full
*/
public RunnableFuture<?> submit(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
WaitableRunnable wrunnable = new WaitableRunnable() {
@Override
protected void runWaitable() {
runnable.run();
}
};
return new RunnableFuture<Void>(this, this.submit(wrunnable, timeout, unit));
}
private WaitableRunnable submit(WaitableRunnable runnable, long throttlingBlockTimeout, TimeUnit throttlingBlockUnit) throws InterruptedException, TimeoutException {
// if no core threads are running, the queue won't be monitored for runnables
this.prestartAllCoreThreads();
if (throttlingBlockTimeout < 0L) {
this.getQueue().put(runnable);
} else {
if (!this.getQueue().offer(runnable, throttlingBlockTimeout, throttlingBlockUnit))
throw new TimeoutException();
}
return runnable;
}
}

View File

@@ -0,0 +1,60 @@
package com.inteligr8.alfresco.asie.util;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public abstract class WaitableRunnable extends ElapsedInterruptableRunnable {
private final Object lock = new Object();
private boolean done = false;
@Override
public final void runElapsed() throws InterruptedException {
this.done = false;
try {
this.runWaitable();
} finally {
synchronized (this.lock) {
this.done = true;
this.lock.notifyAll();
}
}
}
protected abstract void runWaitable() throws InterruptedException;
public void waitUntilDone() throws InterruptedException {
synchronized (this.lock) {
// loop to guard against spurious wake-ups
while (!this.done && !this.isDone())
    this.lock.wait();
}
}
public void waitUntilDone(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
synchronized (this.lock) {
if (!this.done && !this.isDone()) {
long waitTime = unit.toMillis(timeout);
long startTime = System.currentTimeMillis();
this.lock.wait(waitTime);
if (System.currentTimeMillis() - startTime >= waitTime)
throw new TimeoutException();
}
}
}
public void waitUntilExecutionElapsed(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
synchronized (this.lock) {
if (!this.done && !this.isDone()) {
while (this.isQueued())
Thread.sleep(50L);
Long elapsedExecutionMillis = this.computeElapsedExecutionMillis();
long waitTime = unit.toMillis(timeout) - elapsedExecutionMillis;
// a non-positive wait would make Object.wait() block forever (0) or throw (negative)
if (waitTime <= 0L)
    throw new TimeoutException();
long startTime = System.currentTimeMillis();
this.lock.wait(waitTime);
if (System.currentTimeMillis() - startTime >= waitTime)
throw new TimeoutException();
}
}
}
}

View File

@@ -0,0 +1,46 @@
<webscript xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://bitbucket.org/!api/2.0/snippets/inteligr8/AzMgbp/80fdd26a6b3769a63cdc6b54bf1f39e378545cf7/files/snippet.txt">
<!-- Naming & Organization -->
<shortname>Fix ASIE Indexes</shortname>
<family>Inteligr8 ASIE</family>
<description><![CDATA[
<p>Issue a 'fix' command to the ASIE indexes.
This call will attempt to fix all Solr nodes.
The fix operation is asynchronous and could fail on any Solr node without notification.</p>
<p>The following response body should be expected in most cases (202 and 500 status codes):</p>
<pre>
{
"scheduled": [
"solrHostAsync:8983/solr",
...
],
"error": [
"solrHostThatFailed:8983/solr": {
"message": "string"
},
...
]
}
</pre>
<p>The following status codes should be expected:</p>
<dl>
<dt>202</dt>
<dd>Accepted</dd>
</dl>
]]></description>
<!-- Endpoint Configuration -->
<url>/inteligr8/asie/acs/fix</url>
<format default="json">any</format>
<!-- Security -->
<authentication>user</authentication>
<!-- Functionality -->
<cache>
<never>false</never>
<public>false</public>
</cache>
</webscript>
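As a rough client-side sketch (not part of the module): the repository base URL, the HTTP verb, and the credentials below are assumptions, since the descriptor above does not show them (the verb is normally encoded in the descriptor's file name):

package com.inteligr8.alfresco.asie.example;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class FixWebScriptClient {

    public static void main(String[] args) throws Exception {
        // assumptions: local ACS on port 8080, webscripts under /alfresco/service, POST verb, admin/admin credentials
        String auth = Base64.getEncoder().encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/alfresco/service/inteligr8/asie/acs/fix"))
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // expect 202 with a JSON body listing scheduled and errored Solr hosts
        System.out.println(response.statusCode() + ": " + response.body());
    }
}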

View File

@@ -0,0 +1,63 @@
<webscript xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://bitbucket.org/!api/2.0/snippets/inteligr8/AzMgbp/80fdd26a6b3769a63cdc6b54bf1f39e378545cf7/files/snippet.txt">
<!-- Naming & Organization -->
<shortname>Purge ACS Node in ASIE Indexes</shortname>
<family>Inteligr8 ASIE</family>
<description><![CDATA[
<p>Purge the specified ACS node in the ASIE indexes.
This call will attempt to purge the ACS node on all applicable Solr nodes.
The purge operation could be synchronous or asynchronous and could fail on any Solr node.
If any Solr node failed synchronously in the execution, then expect a status code of 500.
The response body will still be identical to the 200/202 status codes.
If any Solr node is executing the purge asynchronously and there are no synchronous failures, then expect a status code of 202.</p>
<p>The following path parameters are supported:</p>
<dl>
<dt>nodeId</dt>
<dd>An ACS node ID.</dd>
</dl>
<p>The following response body should be expected in most cases (200, 202, and 500 status codes):</p>
<pre>
{
"nodeDbId": number,
"success": [
"solrHostSync:8983/solr",
...
],
"scheduled": [
"solrHostAsync:8983/solr",
...
],
"error": [
"solrHostThatFailed:8983/solr": {
"message": "string"
},
...
]
}
</pre>
<p>The following status codes should be expected:</p>
<dl>
<dt>200</dt>
<dd>OK</dd>
<dt>202</dt>
<dd>Accepted</dd>
<dt>400</dt>
<dd>The path or query parameters are invalid</dd>
</dl>
]]></description>
<!-- Endpoint Configuration -->
<url>/inteligr8/asie/acs/node/{nodeId}/purge</url>
<format default="json">any</format>
<!-- Security -->
<authentication>user</authentication>
<!-- Functionality -->
<cache>
<never>false</never>
<public>false</public>
</cache>
</webscript>

View File

@@ -0,0 +1,73 @@
<webscript xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://bitbucket.org/!api/2.0/snippets/inteligr8/AzMgbp/80fdd26a6b3769a63cdc6b54bf1f39e378545cf7/files/snippet.txt">
<!-- Naming & Organization -->
<shortname>Reconcile ACS Nodes against ASIE Indexes</shortname>
<family>Inteligr8 ASIE</family>
<description><![CDATA[
<p>Reconcile the ACS node range against the ASIE indexes.
This call will loop through the specified ACS node range, reconciling each node against the ASIE indexes.
It will discover all unindexed nodes within the range, logging the details with the logger `inteligr8.asie.reconcile`.
Optionally, a `REINDEX` command may be issued to ASIE for the unindexed nodes (see the `reindex` query parameter in the URL).</p>
<p>The following path parameters are supported:</p>
<dl>
<dt>fromDbId</dt>
<dd>A DB ID integer for the starting point of a range, inclusive.</dd>
<dt>toDbId</dt>
<dd>A DB ID integer for the ending point of a range, exclusive.</dd>
</dl>
<p>The following response body should be expected in most cases (200, 202, and 500 status codes):</p>
<pre>
{
"unreconciled": [
number, // node DB ID
...
],
"success": {
"number": [ // node DB ID
"solrHostSync:8983/solr",
...
],
...
},
"scheduled": {
"number": [ // node DB ID
"solrHostAsync:8983/solr",
...
],
...
},
"error": {
"number": [ // node DB ID
"solrHostThatFailed:8983/solr": {
"message": "string"
},
...
],
...
}
}
</pre>
<p>The following status codes should be expected:</p>
<dl>
<dt>202</dt>
<dd>Accepted</dd>
<dt>400</dt>
<dd>The path or query parameters are invalid</dd>
</dl>
]]></description>
<!-- Endpoint Configuration -->
<url>/inteligr8/asie/acs/nodes/{fromDbId}/{toDbId}/reconcile?reindex={reindex?}</url>
<format default="json">any</format>
<!-- Security -->
<authentication>user</authentication>
<!-- Functionality -->
<cache>
<never>false</never>
<public>false</public>
</cache>
</webscript>
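A similar client-side sketch for this endpoint; the base URL, HTTP verb, credentials, node DB ID range, and `reindex=true` value are illustrative assumptions only:

package com.inteligr8.alfresco.asie.example;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ReconcileWebScriptClient {

    public static void main(String[] args) throws Exception {
        // assumptions: local ACS on port 8080, webscripts under /alfresco/service, POST verb, admin/admin credentials;
        // the node DB ID range and reindex=true are illustrative values for the path/query parameters above
        long fromDbId = 1000L;
        long toDbId = 2000L;
        String auth = Base64.getEncoder().encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/alfresco/service/inteligr8/asie/acs/nodes/"
                        + fromDbId + "/" + toDbId + "/reconcile?reindex=true"))
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // expect 202; unreconciled node DB IDs are also logged under the inteligr8.asie.reconcile logger
        System.out.println(response.statusCode() + ": " + response.body());
    }
}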

View File

@@ -0,0 +1,63 @@
<webscript xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://bitbucket.org/!api/2.0/snippets/inteligr8/AzMgbp/80fdd26a6b3769a63cdc6b54bf1f39e378545cf7/files/snippet.txt">
<!-- Naming & Organization -->
<shortname>Reindex ACS Node in ASIE Indexes</shortname>
<family>Inteligr8 ASIE</family>
<description><![CDATA[
<p>Reindex the specified ACS node in the ASIE indexes.
This call will attempt to reindex the ACS node on all applicable Solr nodes.
The reindex operation could be synchronous or asynchronous and could fail on any Solr node.
If any Solr node failed synchronously in the execution, then expect a status code of 500.
The response body will still be identical to the 200/202 status codes.
If any Solr node is executing the reindex asynchronously and there are no synchronous failures, then expect a status code of 202.</p>
<p>The following path parameters are supported:</p>
<dl>
<dt>nodeId</dt>
<dd>An ACS node ID.</dd>
</dl>
<p>The following response body should be expected in most cases (200, 202, and 500 status codes):</p>
<pre>
{
"nodeDbId": number,
"success": [
"solrHostSync:8983/solr",
...
],
"scheduled": [
"solrHostAsync:8983/solr",
...
],
"error": [
"solrHostThatFailed:8983/solr": {
"message": "string"
},
...
]
}
</pre>
<p>The following status codes should be expected:</p>
<dl>
<dt>200</dt>
<dd>OK</dd>
<dt>202</dt>
<dd>Accepted</dd>
<dt>400</dt>
<dd>The path or query parameters are invalid</dd>
</dl>
]]></description>
<!-- Endpoint Configuration -->
<url>/inteligr8/asie/acs/node/{nodeId}/reindex</url>
<format default="json">any</format>
<!-- Security -->
<authentication>user</authentication>
<!-- Functionality -->
<cache>
<never>false</never>
<public>false</public>
</cache>
</webscript>

View File

@@ -0,0 +1,46 @@
<webscript xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://bitbucket.org/!api/2.0/snippets/inteligr8/AzMgbp/80fdd26a6b3769a63cdc6b54bf1f39e378545cf7/files/snippet.txt">
<!-- Naming & Organization -->
<shortname>Retry ASIE Indexes</shortname>
<family>Inteligr8 ASIE</family>
<description><![CDATA[
<p>Issue a 'retry' command to the ASIE indexes.
This call will attempt to retry all remembered failed ACS nodes on Solr nodes.
The retry operation is asynchronous and could fail on any Solr node without notification.</p>
<p>The following response body should be expected in most cases (202 and 500 status codes):</p>
<pre>
{
"scheduled": [
"solrHostAsync:8983/solr",
...
],
"error": [
"solrHostThatFailed:8983/solr": {
"message": "string"
},
...
]
}
</pre>
<p>The following status codes should be expected:</p>
<dl>
<dt>202</dt>
<dd>Accepted</dd>
</dl>
]]></description>
<!-- Endpoint Configuration -->
<url>/inteligr8/asie/acs/retry</url>
<format default="json">any</format>
<!-- Security -->
<authentication>user</authentication>
<!-- Functionality -->
<cache>
<never>false</never>
<public>false</public>
</cache>
</webscript>

View File

@@ -1,3 +1,6 @@
logger.inteligr8-asie.name=com.inteligr8.alfresco.asie
logger.inteligr8-asie.level=INFO
logger.reconcile.name=inteligr8.asie.reconcile
logger.reconcile.level=INFO

View File

@@ -0,0 +1,92 @@
package com.inteligr8.alfresco.asie.util;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompositeFutureUnitTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
public void test() throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService executor = Executors.newWorkStealingPool(3);
CompositeFuture<Void> futures = new CompositeFuture<Void>();
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(100L)));
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(200L)));
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(300L)));
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(400L)));
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(500L)));
Thread.sleep(50L);
Assert.assertEquals(Pair.of(5, 0), futures.counts());
Thread.sleep(100L);
Assert.assertEquals(Pair.of(4, 1), futures.counts());
Thread.sleep(100L);
Assert.assertEquals(Pair.of(3, 2), futures.counts());
Thread.sleep(100L);
Assert.assertEquals(Pair.of(2, 3), futures.counts());
Thread.sleep(100L);
Assert.assertEquals(Pair.of(2, 3), futures.counts());
Thread.sleep(100L);
Assert.assertEquals(Pair.of(1, 4), futures.counts());
Thread.sleep(200L);
Assert.assertEquals(Pair.of(0, 5), futures.counts());
long startTime = System.currentTimeMillis();
futures.get();
Assert.assertTrue(System.currentTimeMillis() - startTime < 5);
startTime = System.currentTimeMillis();
futures.get(10L, TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis() - startTime < 5);
}
private class Pausable implements Callable<Void> {
private static final MutableInt SEQUENCE = new MutableInt(1);
private final int seq;
private final long pauseInMillis;
public Pausable(long pauseInMillis) {
synchronized (SEQUENCE) {
this.seq = SEQUENCE.getAndIncrement();
}
this.pauseInMillis = pauseInMillis;
}
@Override
public Void call() throws InterruptedException {
logger.trace("Thread started execution: " + Thread.currentThread().getName() + ": " + this.seq);
Thread.sleep(this.pauseInMillis);
logger.trace("Thread ending execution: " + Thread.currentThread().getName() + ": " + this.seq);
return null;
}
}
}

View File

@@ -0,0 +1,146 @@
package com.inteligr8.alfresco.asie.util;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThrottledThreadPoolExecutorUnitTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Test(expected = RejectedExecutionException.class)
public void testHardRejection() throws InterruptedException, ExecutionException, TimeoutException {
ThrottledThreadPoolExecutor executor = new ThrottledThreadPoolExecutor(1, 1, 1, 10L, TimeUnit.SECONDS, "not-throttled");
try {
long startTime = System.currentTimeMillis();
CompositeFuture<Void> futures = new CompositeFuture<Void>();
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(100L)));
// confirm there is no blocking as the pool is filled
this.assertElapsed(startTime, 0L, 50L);
Assert.assertEquals(Pair.of(1, 0), futures.counts());
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(200L)));
// confirm there is no blocking as the queue is filled
this.assertElapsed(startTime, 0L, 50L);
Assert.assertEquals(Pair.of(2, 0), futures.counts());
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(300L)));
} finally {
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(2L, TimeUnit.SECONDS));
}
}
@Test
public void testBlocking() throws InterruptedException, ExecutionException, TimeoutException {
ThrottledThreadPoolExecutor executor = new ThrottledThreadPoolExecutor(2, 2, 2, 10L, TimeUnit.SECONDS, "throttled");
try {
long startTime = System.currentTimeMillis();
CompositeFuture<Void> futures = new CompositeFuture<Void>();
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(100L), 1L, TimeUnit.SECONDS));
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(200L), 1L, TimeUnit.SECONDS));
// confirm there is no blocking as the pool is filled
this.assertElapsed(startTime, 0L, 50L);
Assert.assertEquals(Pair.of(2, 0), futures.counts());
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(300L), 1L, TimeUnit.SECONDS));
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(400L), 1L, TimeUnit.SECONDS));
// confirm there is no blocking as the queue is filled
this.assertElapsed(startTime, 0L, 50L);
Assert.assertEquals(Pair.of(4, 0), futures.counts());
this.logger.trace("Execution submitting");
futures.combine(executor.submit(new Pausable(500L), 10L, TimeUnit.SECONDS));
// confirm there is blocking as the pool and queue are full
this.assertElapsed(startTime, 100L, 150L);
Assert.assertEquals(Pair.of(4, 1), futures.counts());
executor.shutdown();
Thread.sleep(150L); // 250ms
Assert.assertEquals(Pair.of(3, 2), futures.counts());
Thread.sleep(100L); // 350ms
Assert.assertEquals(Pair.of(3, 2), futures.counts());
Thread.sleep(100L); // 450ms
Assert.assertEquals(Pair.of(2, 3), futures.counts());
Thread.sleep(100L); // 550ms
Assert.assertEquals(Pair.of(2, 3), futures.counts());
Thread.sleep(100L); // 650ms
Assert.assertEquals(Pair.of(1, 4), futures.counts());
Thread.sleep(200L); // 850ms
Assert.assertEquals(Pair.of(1, 4), futures.counts());
Thread.sleep(100L); // 950 ms
Assert.assertEquals(Pair.of(0, 5), futures.counts());
startTime = System.currentTimeMillis();
futures.get();
this.assertElapsed(startTime, 0L, 25L);
startTime = System.currentTimeMillis();
futures.get(10L, TimeUnit.SECONDS);
this.assertElapsed(startTime, 0L, 25L);
} finally {
executor.awaitTermination(2L, TimeUnit.SECONDS);
}
}
private void assertElapsed(long startTime, long minRangeInclusive, long maxRangeExclusive) {
long elapsedTime = System.currentTimeMillis() - startTime;
Assert.assertTrue("The execution time was shorter than expected: " + elapsedTime + " ms < " + minRangeInclusive + " ms", elapsedTime >= minRangeInclusive);
Assert.assertTrue("The execution time was longer than expected: " + elapsedTime + " ms >= " + maxRangeExclusive + " ms", elapsedTime < maxRangeExclusive);
}
private class Pausable implements Callable<Void> {
private static final MutableInt SEQUENCE = new MutableInt(1);
private final int seq;
private final long pauseInMillis;
public Pausable(long pauseInMillis) {
synchronized (SEQUENCE) {
this.seq = SEQUENCE.getAndIncrement();
}
this.pauseInMillis = pauseInMillis;
}
@Override
public Void call() throws InterruptedException {
logger.trace("Thread started execution: " + Thread.currentThread().getName() + ": " + this.seq);
Thread.sleep(this.pauseInMillis);
logger.trace("Thread ending execution: " + Thread.currentThread().getName() + ": " + this.seq);
return null;
}
}
}

View File

@@ -0,0 +1,10 @@
rootLogger.level=trace
rootLogger.appenderRef.stdout.ref=STDOUT
logger.this.name=com.inteligr8.alfresco.asie
logger.this.level=trace
appender.stdout.type=Console
appender.stdout.name=STDOUT
appender.stdout.layout.type=PatternLayout
appender.stdout.layout.pattern=%d{ABSOLUTE_MICROS} %level{length=1} %c{1}: %m%n