From 493f1f813d3a7f129282bdb513f7c766646c27c6 Mon Sep 17 00:00:00 2001 From: Brian Long Date: Wed, 5 Mar 2025 13:28:41 -0500 Subject: [PATCH] added reconcile/reindex/retry/fix/purge services --- enterprise-module/pom.xml | 2 +- .../rest/AbstractUnregisterNodeWebScript.java | 2 +- shared/pom.xml | 9 +- .../rest/AbstractAcsNodeActionWebScript.java | 119 +++++++ .../asie/rest/AbstractActionWebScript.java | 102 ++++++ .../alfresco/asie/rest/FixWebScript.java | 24 ++ .../asie/rest/PurgeAcsNodeWebScript.java | 24 ++ .../asie/rest/ReconcileAcsNodesWebScript.java | 128 ++++++++ .../asie/rest/ReindexAcsNodeWebScript.java | 24 ++ .../alfresco/asie/rest/RetryWebScript.java | 24 ++ .../asie/service/AbstractActionService.java | 236 +++++++++++++ .../service/AbstractNodeActionService.java | 268 +++++++++++++++ .../asie/service/AcsReconcileService.java | 309 ++++++++++++++++++ .../alfresco/asie/service/ApiService.java | 2 +- .../asie/service/ExecutorManager.java | 138 ++++++++ .../alfresco/asie/service/FixService.java | 43 +++ .../alfresco/asie/service/PurgeService.java | 43 +++ .../alfresco/asie/service/ReindexService.java | 43 +++ .../alfresco/asie/service/RetryService.java | 43 +++ .../alfresco/asie/spi/ActionCallback.java | 15 + .../alfresco/asie/spi/ReconcileCallback.java | 19 ++ .../alfresco/asie/spi/ReindexCallback.java | 12 + .../alfresco/asie/util/CompositeFuture.java | 150 +++++++++ .../util/ElapsedInterruptableRunnable.java | 43 +++ .../asie/util/InterruptableRunnable.java | 75 +++++ .../alfresco/asie/util/RunnableFuture.java | 72 ++++ .../util/ThrottledThreadPoolExecutor.java | 96 ++++++ .../alfresco/asie/util/WaitableRunnable.java | 60 ++++ .../inteligr8/alfresco/asie/fix.post.desc.xml | 46 +++ .../alfresco/asie/purgeAcsNode.put.desc.xml | 63 ++++ .../asie/reconcileAcsNodes.post.desc.xml | 73 +++++ .../alfresco/asie/reindexAcsNode.put.desc.xml | 63 ++++ .../alfresco/asie/retry.post.desc.xml | 46 +++ .../log4j2.properties | 3 + .../asie/util/CompositeFutureUnitTest.java | 92 ++++++ .../ThrottledThreadPoolExecutorUnitTest.java | 146 +++++++++ .../src/test/resources/log4j2-test.properties | 10 + 37 files changed, 2662 insertions(+), 5 deletions(-) create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractAcsNodeActionWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractActionWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/FixWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/PurgeAcsNodeWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReconcileAcsNodesWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReindexAcsNodeWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/rest/RetryWebScript.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractActionService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractNodeActionService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/AcsReconcileService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/ExecutorManager.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/FixService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/PurgeService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/ReindexService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/service/RetryService.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/spi/ActionCallback.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReconcileCallback.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReindexCallback.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/util/CompositeFuture.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/util/ElapsedInterruptableRunnable.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/util/InterruptableRunnable.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/util/RunnableFuture.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutor.java create mode 100644 shared/src/main/java/com/inteligr8/alfresco/asie/util/WaitableRunnable.java create mode 100644 shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/fix.post.desc.xml create mode 100644 shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/purgeAcsNode.put.desc.xml create mode 100644 shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reconcileAcsNodes.post.desc.xml create mode 100644 shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reindexAcsNode.put.desc.xml create mode 100644 shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/retry.post.desc.xml create mode 100644 shared/src/test/java/com/inteligr8/alfresco/asie/util/CompositeFutureUnitTest.java create mode 100644 shared/src/test/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutorUnitTest.java create mode 100644 shared/src/test/resources/log4j2-test.properties diff --git a/enterprise-module/pom.xml b/enterprise-module/pom.xml index 58085c4..796bffe 100644 --- a/enterprise-module/pom.xml +++ b/enterprise-module/pom.xml @@ -83,7 +83,7 @@ com.inteligr8.alfresco cxf-jaxrs-platform-module - 1.3.1-acs-v23.3 + 1.3.2-acs-v23.3 amp diff --git a/enterprise-module/src/main/java/com/inteligr8/alfresco/asie/enterprise/rest/AbstractUnregisterNodeWebScript.java b/enterprise-module/src/main/java/com/inteligr8/alfresco/asie/enterprise/rest/AbstractUnregisterNodeWebScript.java index 7824f2c..cef7d4c 100755 --- a/enterprise-module/src/main/java/com/inteligr8/alfresco/asie/enterprise/rest/AbstractUnregisterNodeWebScript.java +++ b/enterprise-module/src/main/java/com/inteligr8/alfresco/asie/enterprise/rest/AbstractUnregisterNodeWebScript.java @@ -91,7 +91,7 @@ public abstract class AbstractUnregisterNodeWebScript com.inteligr8.alfresco asie-api - 1.0-SNAPSHOT-asie2 + 1.1-SNAPSHOT-asie2 com.inteligr8 common-rest-client - 3.0.1-cxf + 3.0.3-cxf + + org.alfresco + alfresco-data-model + provided + org.alfresco alfresco-repository diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractAcsNodeActionWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractAcsNodeActionWebScript.java new file mode 100644 index 0000000..e333913 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractAcsNodeActionWebScript.java @@ -0,0 +1,119 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.alfresco.model.ContentModel; +import org.alfresco.service.cmr.repository.InvalidNodeRefException; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.cmr.repository.StoreRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.extensions.webscripts.WebScriptException; +import org.springframework.extensions.webscripts.WebScriptRequest; +import org.springframework.extensions.webscripts.WebScriptResponse; +import org.springframework.http.HttpStatus; + +import com.inteligr8.alfresco.asie.model.ShardInstance; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +public abstract class AbstractAcsNodeActionWebScript extends AbstractAsieWebScript { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private NodeService nodeService; + + @Override + public void executeAuthorized(WebScriptRequest request, WebScriptResponse response) throws IOException { + String nodeId = request.getServiceMatch().getTemplateVars().get("nodeId"); + NodeRef nodeRef = new NodeRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, nodeId); + long nodeDbId = this.findNodeDbId(nodeRef); + this.logger.trace("Found node database ID: {}: {}", nodeId, nodeDbId); + + try { + Map responseMap = new HashMap<>(); + responseMap.put("nodeDbId", nodeDbId); + + ActionCallback callback = new ActionCallback() { + + @Override + public void success(ShardInstance instance) { + @SuppressWarnings("unchecked") + List instances = (List) responseMap.get("success"); + if (instances == null) + responseMap.put("success", instances = new LinkedList<>()); + instances.add(instance.getSpec()); + } + + @Override + public void scheduled(ShardInstance instance) { + @SuppressWarnings("unchecked") + List instances = (List) responseMap.get("scheduled"); + if (instances == null) + responseMap.put("scheduled", instances = new LinkedList<>()); + instances.add(instance.getSpec()); + } + + @Override + public void error(ShardInstance instance, String message) { + @SuppressWarnings("unchecked") + Map instances = (Map) responseMap.get("error"); + if (instances == null) + responseMap.put("error", instances = new HashMap<>()); + instances.put(instance.getSpec(), Collections.singletonMap("message", message)); + } + + @Override + public void unknownResult(ShardInstance instance) { + @SuppressWarnings("unchecked") + List instances = (List) responseMap.get("unknown"); + if (instances == null) + responseMap.put("unknown", instances = new LinkedList<>()); + instances.add(instance.getSpec()); + } + }; + + this.executeAction(nodeDbId, callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS); + + if (responseMap.containsKey("error")) { + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + } else if (responseMap.containsKey("scheduled")) { + response.setStatus(HttpStatus.ACCEPTED.value()); + } else { + response.setStatus(HttpStatus.OK.value()); + } + + response.setContentType("application/json"); + this.getObjectMapper().writeValue(response.getWriter(), responseMap); + } catch (UnsupportedOperationException uoe) { + throw new WebScriptException(HttpStatus.NOT_IMPLEMENTED.value(), uoe.getMessage(), uoe); + } catch (InterruptedException ie) { + throw new WebScriptException(HttpStatus.SERVICE_UNAVAILABLE.value(), "The execution was interrupted", ie); + } catch (TimeoutException te) { + throw new WebScriptException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "The execution may continue, but timed-out waiting", te); + } + } + + protected abstract void executeAction( + long nodeDbId, ActionCallback callback, + long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException; + + private long findNodeDbId(NodeRef nodeRef) { + try { + return (Long) this.nodeService.getProperty(nodeRef, ContentModel.PROP_NODE_DBID); + } catch (InvalidNodeRefException inre) { + throw new WebScriptException(HttpStatus.NOT_FOUND.value(), "The node does not exist"); + } + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractActionWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractActionWebScript.java new file mode 100644 index 0000000..dd2be85 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/AbstractActionWebScript.java @@ -0,0 +1,102 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.alfresco.model.ContentModel; +import org.alfresco.service.cmr.repository.InvalidNodeRefException; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.cmr.repository.StoreRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.extensions.webscripts.WebScriptException; +import org.springframework.extensions.webscripts.WebScriptRequest; +import org.springframework.extensions.webscripts.WebScriptResponse; +import org.springframework.http.HttpStatus; + +import com.inteligr8.alfresco.asie.model.ShardInstance; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +public abstract class AbstractActionWebScript extends AbstractAsieWebScript { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Override + public void executeAuthorized(WebScriptRequest request, WebScriptResponse response) throws IOException { + try { + Map responseMap = new HashMap<>(); + + ActionCallback callback = new ActionCallback() { + + @Override + public void success(ShardInstance instance) { + @SuppressWarnings("unchecked") + List instances = (List) responseMap.get("success"); + if (instances == null) + responseMap.put("success", instances = new LinkedList<>()); + instances.add(instance.getSpec()); + } + + @Override + public void scheduled(ShardInstance instance) { + @SuppressWarnings("unchecked") + List instances = (List) responseMap.get("scheduled"); + if (instances == null) + responseMap.put("scheduled", instances = new LinkedList<>()); + instances.add(instance.getSpec()); + } + + @Override + public void error(ShardInstance instance, String message) { + @SuppressWarnings("unchecked") + Map instances = (Map) responseMap.get("error"); + if (instances == null) + responseMap.put("error", instances = new HashMap<>()); + instances.put(instance.getSpec(), Collections.singletonMap("message", message)); + } + + @Override + public void unknownResult(ShardInstance instance) { + @SuppressWarnings("unchecked") + List instances = (List) responseMap.get("unknown"); + if (instances == null) + responseMap.put("unknown", instances = new LinkedList<>()); + instances.add(instance.getSpec()); + } + }; + + this.executeAction(callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS); + + if (responseMap.containsKey("error")) { + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + } else if (responseMap.containsKey("scheduled")) { + response.setStatus(HttpStatus.ACCEPTED.value()); + } else { + response.setStatus(HttpStatus.OK.value()); + } + + response.setContentType("application/json"); + this.getObjectMapper().writeValue(response.getWriter(), responseMap); + } catch (UnsupportedOperationException uoe) { + throw new WebScriptException(HttpStatus.NOT_IMPLEMENTED.value(), uoe.getMessage(), uoe); + } catch (InterruptedException ie) { + throw new WebScriptException(HttpStatus.SERVICE_UNAVAILABLE.value(), "The execution was interrupted", ie); + } catch (TimeoutException te) { + throw new WebScriptException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "The execution may continue, but timed-out waiting", te); + } + } + + protected abstract void executeAction( + ActionCallback callback, + long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException; + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/FixWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/FixWebScript.java new file mode 100644 index 0000000..a4bd437 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/FixWebScript.java @@ -0,0 +1,24 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.service.FixService; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +@Component(value = "webscript.com.inteligr8.alfresco.asie.fix.post") +public class FixWebScript extends AbstractActionWebScript { + + @Autowired + private FixService fixSerivce; + + @Override + protected void executeAction(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + this.fixSerivce.fix(callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/PurgeAcsNodeWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/PurgeAcsNodeWebScript.java new file mode 100644 index 0000000..504b593 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/PurgeAcsNodeWebScript.java @@ -0,0 +1,24 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.service.PurgeService; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +@Component(value = "webscript.com.inteligr8.alfresco.asie.purgeAcsNode.put") +public class PurgeAcsNodeWebScript extends AbstractAcsNodeActionWebScript { + + @Autowired + private PurgeService purgeSerivce; + + @Override + protected void executeAction(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + this.purgeSerivce.purge(nodeDbId, callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReconcileAcsNodesWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReconcileAcsNodesWebScript.java new file mode 100644 index 0000000..a7e5a46 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReconcileAcsNodesWebScript.java @@ -0,0 +1,128 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.extensions.webscripts.WebScriptException; +import org.springframework.extensions.webscripts.WebScriptRequest; +import org.springframework.extensions.webscripts.WebScriptResponse; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.model.ShardInstance; +import com.inteligr8.alfresco.asie.service.AcsReconcileService; +import com.inteligr8.alfresco.asie.spi.ReconcileCallback; + +@Component(value = "webscript.com.inteligr8.alfresco.asie.reconcileAcsNodes.post") +public class ReconcileAcsNodesWebScript extends AbstractAsieWebScript { + + @Autowired + private AcsReconcileService reconcileService; + + @Override + public void executeAuthorized(WebScriptRequest request, WebScriptResponse response) throws IOException { + final int fromDbId = this.getRequestTemplateIntegerVariable(request, "fromDbId"); + final int toDbId = this.getRequestTemplateIntegerVariable(request, "toDbId"); + final boolean reindex = Boolean.TRUE.equals(this.getOptionalQueryParameter(request, "reindex", Boolean.class)); + final boolean includeReconciled = Boolean.TRUE.equals(this.getOptionalQueryParameter(request, "includeReconciled", Boolean.class)); + + final Map responseMap = new HashMap<>(); + + ReconcileCallback callback = new ReconcileCallback() { + + @Override + public void reconciled(long nodeDbId) { + if (includeReconciled) { + @SuppressWarnings("unchecked") + List unreconciledNodeDbIds = (List) responseMap.get("reconciled"); + if (unreconciledNodeDbIds == null) + responseMap.put("reconciled", unreconciledNodeDbIds = new LinkedList<>()); + unreconciledNodeDbIds.add(nodeDbId); + } + } + + @Override + public void unreconciled(long nodeDbId) { + @SuppressWarnings("unchecked") + List unreconciledNodeDbIds = (List) responseMap.get("unreconciled"); + if (unreconciledNodeDbIds == null) + responseMap.put("unreconciled", unreconciledNodeDbIds = new LinkedList<>()); + unreconciledNodeDbIds.add(nodeDbId); + } + + @Override + public void processed(long nodeDbId, Set instsReconciled, Set instsReconciling, + Map instsErrorMessages) { + if (!instsReconciled.isEmpty()) { + @SuppressWarnings("unchecked") + Map> nodeHosts = (Map>) responseMap.get("success"); + if (nodeHosts == null) + responseMap.put("success", nodeHosts = new HashMap<>()); + + List instances = new LinkedList<>(); + for (ShardInstance instance : instsReconciled) + instances.add(instance.getSpec()); + nodeHosts.put(nodeDbId, instances); + } + + if (!instsReconciling.isEmpty()) { + @SuppressWarnings("unchecked") + Map> nodeHosts = (Map>) responseMap.get("scheduled"); + if (nodeHosts == null) + responseMap.put("scheduled", nodeHosts = new HashMap<>()); + + List instances = new LinkedList<>(); + for (ShardInstance instance : instsReconciled) + instances.add(instance.getSpec()); + nodeHosts.put(nodeDbId, instances); + } + + if (!instsErrorMessages.isEmpty()) { + @SuppressWarnings("unchecked") + Map>> nodeHosts = (Map>>) responseMap.get("error"); + if (nodeHosts == null) + responseMap.put("error", nodeHosts = new HashMap<>()); + + Map> nodeHost = new HashMap<>(); + for (Entry message : instsErrorMessages.entrySet()) + nodeHost.put(message.getKey().getSpec(), Collections.singletonMap("message", message.getValue())); + nodeHosts.put(nodeDbId, nodeHost); + } + } + }; + + try { + this.reconcileService.reconcile(fromDbId, toDbId, null, reindex, callback, 1L, TimeUnit.HOURS, 2L, TimeUnit.MINUTES); + + if (responseMap.containsKey("error")) { + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + } else if (responseMap.containsKey("scheduled")) { + response.setStatus(HttpStatus.ACCEPTED.value()); + } else { + response.setStatus(HttpStatus.OK.value()); + } + + response.setContentType("application/json"); + this.getObjectMapper().writeValue(response.getWriter(), responseMap); + } catch (InterruptedException ie) { + throw new WebScriptException(HttpStatus.SERVICE_UNAVAILABLE.value(), "The reindex was interrupted", ie); + } catch (TimeoutException te) { + throw new WebScriptException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "The reindex may continue, but timed-out waiting", te); + } + } + + private int getRequestTemplateIntegerVariable(WebScriptRequest request, String templateVariableName) { + String str = request.getServiceMatch().getTemplateVars().get(templateVariableName); + return Integer.valueOf(str); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReindexAcsNodeWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReindexAcsNodeWebScript.java new file mode 100644 index 0000000..ed764fa --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/ReindexAcsNodeWebScript.java @@ -0,0 +1,24 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.service.ReindexService; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +@Component(value = "webscript.com.inteligr8.alfresco.asie.reindexAcsNode.put") +public class ReindexAcsNodeWebScript extends AbstractAcsNodeActionWebScript { + + @Autowired + private ReindexService reindexSerivce; + + @Override + protected void executeAction(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + this.reindexSerivce.reindex(nodeDbId, callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/rest/RetryWebScript.java b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/RetryWebScript.java new file mode 100644 index 0000000..1ab65e9 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/rest/RetryWebScript.java @@ -0,0 +1,24 @@ +package com.inteligr8.alfresco.asie.rest; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.service.RetryService; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +@Component(value = "webscript.com.inteligr8.alfresco.asie.retry.post") +public class RetryWebScript extends AbstractActionWebScript { + + @Autowired + private RetryService retrySerivce; + + @Override + protected void executeAction(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + this.retrySerivce.retry(callback, 10L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractActionService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractActionService.java new file mode 100644 index 0000000..e3bea63 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractActionService.java @@ -0,0 +1,236 @@ +package com.inteligr8.alfresco.asie.service; + +import java.net.URL; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.alfresco.model.ContentModel; +import org.alfresco.repo.index.shard.Floc; +import org.alfresco.repo.index.shard.Shard; +import org.alfresco.repo.index.shard.ShardInstance; +import org.alfresco.repo.index.shard.ShardRegistry; +import org.alfresco.repo.index.shard.ShardState; +import org.alfresco.service.cmr.repository.StoreRef; +import org.alfresco.service.cmr.search.SearchParameters; +import org.alfresco.service.cmr.search.SearchService; +import org.alfresco.service.namespace.NamespaceService; +import org.alfresco.service.namespace.QName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; + +import com.inteligr8.alfresco.asie.Constants; +import com.inteligr8.alfresco.asie.api.CoreAdminApi; +import com.inteligr8.alfresco.asie.model.ActionCoreResponse; +import com.inteligr8.alfresco.asie.model.ShardSet; +import com.inteligr8.alfresco.asie.model.SolrHost; +import com.inteligr8.alfresco.asie.spi.ActionCallback; +import com.inteligr8.alfresco.asie.util.CompositeFuture; +import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor; +import com.inteligr8.solr.model.Action; +import com.inteligr8.solr.model.ActionResponse; +import com.inteligr8.solr.model.BaseResponse; + +public abstract class AbstractActionService { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private NamespaceService namespaceService; + + @Autowired + private ApiService apiService; + + @Autowired + private ExecutorManager executorManager; + + @Autowired(required = false) + @Qualifier(Constants.QUALIFIER_ASIE) + private ShardRegistry shardRegistry; + + @Value("${inteligr8.asie.default.concurrentQueueSize:64}") + private int concurrentQueueSize; + + @Value("${inteligr8.asie.default.concurrency:16}") + private int concurrency; + + protected int getConcurrency() { + return this.concurrency; + } + + protected int getConcurrentQueueSize() { + return this.concurrentQueueSize; + } + + protected abstract String getThreadNamePrefix(); + + protected abstract String getActionName(); + + /** + * This method executes an action on the specified node in Solr using its + * ACS unique database identifier. The callback handles all the return + * values. This is the synchronous alternative to the other `action` + * method. + * + * There are two sets of parameters regarding timeouts. The queue timeouts + * are for how long the requesting thread should wait for a full queue to + * open up space for new executions. The execution timeouts are for how + * long the execution should be allowed to take once dequeued. There is no + * timeout for how long the execution is queued. + * + * @param callback A callback to process multiple returned values from the action. + * @param fullQueueTimeout A timeout for how long the calling thread should wait for space on the queue. + * @param fullQueueUnit The time units for the `queueTimeout`. + * @param execTimeout A timeout for the elapsed time the execution should take when dequeued. + * @param execUnit The time units for the `execTimeout`. + * @throws TimeoutException Either the queue or execution timeout lapsed. + * @throws InterruptedException The execution was interrupted (server shutdown). + */ + protected void action(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + long fullQueueExpireTimeMillis = System.currentTimeMillis() + fullQueueUnit.toMillis(fullQueueTimeout); + Future future = this._action(callback, fullQueueExpireTimeMillis); + try { + future.get(execTimeout, execUnit); + } catch (ExecutionException ee) { + this.logger.error("Reindex thread failed: " + ee.getMessage(), ee); + } + } + + /** + * This method executes an action on the specified node in Solr using its + * ACS unique database identifier. The callback handles all the return + * values. This is the asynchronous alternative to the other `action` + * method. + * + * This method may block indefinitely when the queue is full. Once all + * executions are queued, this will return a single future representing all + * the executions. + * + * @param callback A callback to process multiple returned values from the execution. + * @return A reference to the future or active executing task. + * @throws InterruptedException The execution was interrupted (server shutdown). + */ + protected Future action(ActionCallback callback) throws InterruptedException { + try { + return this._action(callback, null); + } catch (TimeoutException te) { + throw new RuntimeException("This should never happen: " + te.getMessage(), te); + } + } + + private Future _action(ActionCallback callback, Long fullQueueExpireTimeMillis) throws TimeoutException, InterruptedException { + List eligibleInstances = this.findPossibleShardInstances(); + this.logger.debug("Will attempt to {} {} shard instances", this.getActionName(), eligibleInstances.size()); + + CompositeFuture future = new CompositeFuture<>(); + + ThrottledThreadPoolExecutor executor = this.executorManager.createThrottled( + this.getThreadNamePrefix(), + this.getConcurrency(), this.getConcurrency(), this.getConcurrentQueueSize(), + 1L, TimeUnit.MINUTES); + + for (final com.inteligr8.alfresco.asie.model.ShardInstance instance : eligibleInstances) { + this.logger.trace("Will attempt to {} shard instance: {}", this.getActionName(), instance); + + Callable callable = new Callable<>() { + @Override + public Void call() { + String core = instance.extractShard().getCoreName(); + SolrHost host = instance.extractNode(); + URL url = host.toUrl(apiService.isSecure() ? "https" : "http"); + CoreAdminApi api = apiService.createApi(url.toString(), CoreAdminApi.class); + + try { + logger.debug("Performing {} of shard instance: {}", getActionName(), instance); + BaseResponse apiResponse = execute(api, core); + logger.trace("Performed {} of shard instance: {}", getActionName(), instance); + + Action action = null; + if (apiResponse instanceof ActionCoreResponse) { + action = ((ActionCoreResponse) apiResponse).getCores().getByCore(core); + } else if (apiResponse instanceof ActionResponse) { + action = ((ActionResponse) apiResponse).getAction(); + } + + if (action == null) { + callback.unknownResult(instance); + } else { + switch (action.getStatus()) { + case Scheduled: + callback.scheduled(instance); + break; + case Success: + callback.success(instance); + break; + default: + if (apiResponse instanceof com.inteligr8.alfresco.asie.model.BaseResponse) { + com.inteligr8.alfresco.asie.model.BaseResponse asieResponse = (com.inteligr8.alfresco.asie.model.BaseResponse) apiResponse; + logger.debug("Performance of {} of shard instance failed: {}: {}", getActionName(), instance, asieResponse.getException()); + callback.error(instance, asieResponse.getException()); + } else { + logger.debug("Performance of {} of shard instance failed: {}: {}", getActionName(), instance, apiResponse.getResponseHeader().getStatus()); + callback.error(instance, String.valueOf(apiResponse.getResponseHeader().getStatus())); + } + } + } + } catch (Exception e) { + logger.error("An exception occurred", e); + callback.error(instance, e.getMessage()); + } + + return null; + } + }; + + if (fullQueueExpireTimeMillis == null) { + future.combine(executor.submit(callable, -1L, null)); + } else { + future.combine(executor.submit(callable, fullQueueExpireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + } + } + + return future; + } + + protected abstract BaseResponse execute(CoreAdminApi api, String core); + + private List findPossibleShardInstances() { + if (this.shardRegistry == null) + throw new UnsupportedOperationException("ACS instances without a sharding configuration are not yet implemented"); + + List instances = new LinkedList<>(); + + for (Entry>> floc : this.shardRegistry.getFlocs().entrySet()) { + if (!floc.getKey().getStoreRefs().contains(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE)) + continue; + for (Entry> shard : floc.getValue().entrySet()) { + for (ShardState shardState : shard.getValue()) + instances.add(this.toModel(shardState.getShardInstance(), shardState)); + } + } + + this.logger.trace("Despite sharding, considering all shards and nodes: {}", instances); + + return instances; + } + + private com.inteligr8.alfresco.asie.model.ShardInstance toModel(ShardInstance instance, ShardState anyShardState) { + Floc floc = instance.getShard().getFloc(); + + ShardSet shardSet = ShardSet.from(floc, anyShardState); + SolrHost host = SolrHost.from(instance); + com.inteligr8.alfresco.asie.model.Shard shard = com.inteligr8.alfresco.asie.model.Shard.from(shardSet, instance.getShard().getInstance()); + return com.inteligr8.alfresco.asie.model.ShardInstance.from(shard, host); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractNodeActionService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractNodeActionService.java new file mode 100644 index 0000000..8beadc7 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/AbstractNodeActionService.java @@ -0,0 +1,268 @@ +package com.inteligr8.alfresco.asie.service; + +import java.net.URL; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.alfresco.model.ContentModel; +import org.alfresco.repo.index.shard.Floc; +import org.alfresco.repo.index.shard.Shard; +import org.alfresco.repo.index.shard.ShardInstance; +import org.alfresco.repo.index.shard.ShardRegistry; +import org.alfresco.repo.index.shard.ShardState; +import org.alfresco.service.cmr.repository.StoreRef; +import org.alfresco.service.cmr.search.SearchParameters; +import org.alfresco.service.cmr.search.SearchService; +import org.alfresco.service.namespace.NamespaceService; +import org.alfresco.service.namespace.QName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; + +import com.inteligr8.alfresco.asie.Constants; +import com.inteligr8.alfresco.asie.api.CoreAdminApi; +import com.inteligr8.alfresco.asie.model.ActionCoreResponse; +import com.inteligr8.alfresco.asie.model.ShardSet; +import com.inteligr8.alfresco.asie.model.SolrHost; +import com.inteligr8.alfresco.asie.spi.ActionCallback; +import com.inteligr8.alfresco.asie.util.CompositeFuture; +import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor; +import com.inteligr8.solr.model.Action; +import com.inteligr8.solr.model.ActionResponse; +import com.inteligr8.solr.model.BaseResponse; + +public abstract class AbstractNodeActionService { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private NamespaceService namespaceService; + + @Autowired + private ApiService apiService; + + @Autowired + private ExecutorManager executorManager; + + @Autowired(required = false) + @Qualifier(Constants.QUALIFIER_ASIE) + private ShardRegistry shardRegistry; + + @Value("${inteligr8.asie.default.concurrentQueueSize:64}") + private int concurrentQueueSize; + + @Value("${inteligr8.asie.default.concurrency:16}") + private int concurrency; + + protected int getConcurrency() { + return this.concurrency; + } + + protected int getConcurrentQueueSize() { + return this.concurrentQueueSize; + } + + protected abstract String getThreadNamePrefix(); + + protected abstract String getActionName(); + + /** + * This method executes an action on the specified node in Solr using its + * ACS unique database identifier. The callback handles all the return + * values. This is the synchronous alternative to the other `action` + * method. + * + * There are two sets of parameters regarding timeouts. The queue timeouts + * are for how long the requesting thread should wait for a full queue to + * open up space for new executions. The execution timeouts are for how + * long the execution should be allowed to take once dequeued. There is no + * timeout for how long the execution is queued. + * + * @param nodeDbId A node database ID. + * @param callback A callback to process multiple returned values from the action. + * @param fullQueueTimeout A timeout for how long the calling thread should wait for space on the queue. + * @param fullQueueUnit The time units for the `queueTimeout`. + * @param execTimeout A timeout for the elapsed time the execution should take when dequeued. + * @param execUnit The time units for the `execTimeout`. + * @throws TimeoutException Either the queue or execution timeout lapsed. + * @throws InterruptedException The execution was interrupted (server shutdown). + */ + protected void action(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + long fullQueueExpireTimeMillis = System.currentTimeMillis() + fullQueueUnit.toMillis(fullQueueTimeout); + Future future = this._action(nodeDbId, callback, fullQueueExpireTimeMillis); + try { + future.get(execTimeout, execUnit); + } catch (ExecutionException ee) { + this.logger.error("Reindex thread failed: " + ee.getMessage(), ee); + } + } + + /** + * This method executes an action on the specified node in Solr using its + * ACS unique database identifier. The callback handles all the return + * values. This is the asynchronous alternative to the other `action` + * method. + * + * This method may block indefinitely when the queue is full. Once all + * executions are queued, this will return a single future representing all + * the executions. + * + * @param nodeDbId A node database ID. + * @param callback A callback to process multiple returned values from the execution. + * @return A reference to the future or active executing task. + * @throws InterruptedException The execution was interrupted (server shutdown). + */ + protected Future action(long nodeDbId, ActionCallback callback) throws InterruptedException { + try { + return this._action(nodeDbId, callback, null); + } catch (TimeoutException te) { + throw new RuntimeException("This should never happen: " + te.getMessage(), te); + } + } + + private Future _action(long nodeDbId, ActionCallback callback, Long fullQueueExpireTimeMillis) throws TimeoutException, InterruptedException { + List eligibleInstances = this.findPossibleShardInstances(nodeDbId); + this.logger.debug("Will attempt to {} ACS node against {} shard instances: {}", this.getActionName(), eligibleInstances.size(), nodeDbId); + + CompositeFuture future = new CompositeFuture<>(); + + ThrottledThreadPoolExecutor executor = this.executorManager.createThrottled( + this.getThreadNamePrefix(), + this.getConcurrency(), this.getConcurrency(), this.getConcurrentQueueSize(), + 1L, TimeUnit.MINUTES); + + for (final com.inteligr8.alfresco.asie.model.ShardInstance instance : eligibleInstances) { + this.logger.trace("Will attempt to {} ACS node against shard instance: {}: {}", this.getActionName(), nodeDbId, instance); + + Callable callable = new Callable<>() { + @Override + public Void call() { + String core = instance.extractShard().getCoreName(); + SolrHost host = instance.extractNode(); + URL url = host.toUrl(apiService.isSecure() ? "https" : "http"); + CoreAdminApi api = apiService.createApi(url.toString(), CoreAdminApi.class); + + try { + logger.debug("Performing {} of ACS node against shard instance: {}: {}", getActionName(), nodeDbId, instance); + BaseResponse apiResponse = execute(api, core, nodeDbId); + logger.trace("Performed {} of ACS node against shard instance: {}: {}", getActionName(), nodeDbId, instance); + + Action action = null; + if (apiResponse instanceof ActionCoreResponse) { + action = ((ActionCoreResponse) apiResponse).getCores().getByCore(core); + } else if (apiResponse instanceof ActionResponse) { + action = ((ActionResponse) apiResponse).getAction(); + } + + if (action == null) { + callback.unknownResult(instance); + } else { + switch (action.getStatus()) { + case Scheduled: + callback.scheduled(instance); + break; + case Success: + callback.success(instance); + break; + default: + if (apiResponse instanceof com.inteligr8.alfresco.asie.model.BaseResponse) { + com.inteligr8.alfresco.asie.model.BaseResponse asieResponse = (com.inteligr8.alfresco.asie.model.BaseResponse) apiResponse; + logger.debug("Performance of {} of ACS node against shard instance failed: {}: {}: {}", getActionName(), nodeDbId, instance, asieResponse.getException()); + callback.error(instance, asieResponse.getException()); + } else { + logger.debug("Performance of {} of ACS node against shard instance failed: {}: {}: {}", getActionName(), nodeDbId, instance, apiResponse.getResponseHeader().getStatus()); + callback.error(instance, String.valueOf(apiResponse.getResponseHeader().getStatus())); + } + } + } + } catch (Exception e) { + logger.error("An exception occurred", e); + callback.error(instance, e.getMessage()); + } + + return null; + } + }; + + if (fullQueueExpireTimeMillis == null) { + future.combine(executor.submit(callable, -1L, null)); + } else { + future.combine(executor.submit(callable, fullQueueExpireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + } + } + + return future; + } + + protected abstract BaseResponse execute(CoreAdminApi api, String core, long nodeDbId); + + private List findPossibleShardInstances(long nodeDbId) { + if (this.shardRegistry == null) + throw new UnsupportedOperationException("ACS instances without a sharding configuration are not yet implemented"); + + SearchParameters searchParams = new SearchParameters(); + searchParams.setLanguage(SearchService.LANGUAGE_FTS_ALFRESCO); + searchParams.setQuery("@" + this.formatForFts(ContentModel.PROP_NODE_DBID) + ":" + nodeDbId); + + List instances = new LinkedList<>(); + + List slicedInstances = this.shardRegistry.getIndexSlice(searchParams); + if (slicedInstances != null) { + this.logger.trace("Due to a sharding method, considering only applicable shards and their ASIE nodes: {}: {}", nodeDbId, slicedInstances); + + for (ShardInstance instance : slicedInstances) + instances.add(this.toModel(instance)); + } else { + for (Entry>> floc : this.shardRegistry.getFlocs().entrySet()) { + if (!floc.getKey().getStoreRefs().contains(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE)) + continue; + for (Entry> shard : floc.getValue().entrySet()) { + for (ShardState shardState : shard.getValue()) + instances.add(this.toModel(shardState.getShardInstance(), shardState)); + } + } + + this.logger.trace("Despite sharding, considering all shards and nodes: {}: {}", nodeDbId, instances); + } + + return instances; + } + + private com.inteligr8.alfresco.asie.model.ShardInstance toModel(ShardInstance instance) { + // get any random shardState + Floc floc = instance.getShard().getFloc(); + Map> shardsStates = this.shardRegistry.getFlocs().get(floc); + if (shardsStates == null) + throw new IllegalStateException(); + Set shardStates = shardsStates.get(instance.getShard()); + if (shardStates == null || shardStates.isEmpty()) + throw new IllegalStateException(); + ShardState anyShardState = shardStates.iterator().next(); + + return this.toModel(instance, anyShardState); + } + + private com.inteligr8.alfresco.asie.model.ShardInstance toModel(ShardInstance instance, ShardState anyShardState) { + Floc floc = instance.getShard().getFloc(); + + ShardSet shardSet = ShardSet.from(floc, anyShardState); + SolrHost host = SolrHost.from(instance); + com.inteligr8.alfresco.asie.model.Shard shard = com.inteligr8.alfresco.asie.model.Shard.from(shardSet, instance.getShard().getInstance()); + return com.inteligr8.alfresco.asie.model.ShardInstance.from(shard, host); + } + + private String formatForFts(QName qname) { + return qname.toPrefixString(this.namespaceService).replace("-", "\\-").replace(":", "\\:"); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/AcsReconcileService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/AcsReconcileService.java new file mode 100644 index 0000000..048f373 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/AcsReconcileService.java @@ -0,0 +1,309 @@ +package com.inteligr8.alfresco.asie.service; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.alfresco.model.ContentModel; +import org.alfresco.model.RenditionModel; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.repository.NodeRef.Status; +import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.cmr.repository.StoreRef; +import org.alfresco.service.cmr.search.QueryConsistency; +import org.alfresco.service.cmr.search.ResultSet; +import org.alfresco.service.cmr.search.SearchParameters; +import org.alfresco.service.cmr.search.SearchService; +import org.alfresco.service.namespace.NamespaceService; +import org.alfresco.service.namespace.QName; +import org.apache.commons.collections4.SetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.model.ShardInstance; +import com.inteligr8.alfresco.asie.spi.ReconcileCallback; +import com.inteligr8.alfresco.asie.spi.ReindexCallback; +import com.inteligr8.alfresco.asie.util.CompositeFuture; +import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor; + +@Component +public class AcsReconcileService implements InitializingBean, DisposableBean { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger reconcileLogger = LoggerFactory.getLogger("inteligr8.asie.reconcile"); + private final Set ignoreNodesWithAspects = SetUtils.unmodifiableSet( + RenditionModel.ASPECT_RENDITION, + RenditionModel.ASPECT_RENDITION2); + + @Autowired + private NodeService nodeService; + + @Autowired + private NamespaceService namespaceService; + + @Autowired + private SearchService searchService; + + @Autowired + private ReindexService reindexService; + + @Value("${inteligr8.asie.reconciliation.nodesChunkSize:250}") + private int nodesChunkSize; + + @Value("${inteligr8.asie.reconciliation.nodeTimeoutSeconds:10}") + private int nodeTimeoutSeconds; + + @Value("${inteligr8.asie.reconciliation.concurrentQueueSize:64}") + private int concurrentQueueSize; + + @Value("${inteligr8.asie.reconciliation.concurrency:2}") + private int concurrency; + + private ThrottledThreadPoolExecutor executor; + + @Override + public void afterPropertiesSet() { + this.executor = new ThrottledThreadPoolExecutor(this.concurrency, this.concurrency, this.concurrentQueueSize, 1L, TimeUnit.MINUTES, "solr-reconcile"); + this.executor.prestartAllCoreThreads(); + } + + @Override + public void destroy() { + this.executor.shutdown(); + } + + /** + * This method reconciles the specified node range between ACS and Solr. + * The node range is specified using the ACS unique database identifiers. + * There is no other reasonably efficient attack vector. The callback + * handles all the return values. This is the synchronous alternative to + * the other `reconcile` method. + * + * There are two sets of parameters regarding timeouts. The queue timeouts + * are for how long the requesting thread should wait for a full queue to + * open up space for new re-index executions. The execution timeouts are + * for how long the execution should be allowed to take once dequeued. + * There is no timeout for how long the execution is queued. + * + * @param fromDbId A node database ID, inclusive. + * @param toDbId A node database ID, exclusive. + * @param reindexUnreconciled For nodes not found in Solr, attempt to re-index against all applicable Solr instances. + * @param callback A callback to process multiple returned values from the re-index. + * @param queueTimeout A timeout for how long the calling thread should wait for space on the queue. + * @param queueUnit The time units for the `queueTimeout`. + * @param execTimeout A timeout for the elapsed time the reindex execution should take when dequeued. + * @param execUnit The time units for the `execTimeout`. + * @throws TimeoutException Either the queue or execution timeout lapsed. + * @throws InterruptedException The re-index was interrupted (server shutdown). + */ + public void reconcile( + long fromDbId, long toDbId, Integer nodesChunkSize, + boolean reindexUnreconciled, + ReconcileCallback callback, + long queueTimeout, TimeUnit queueUnit, + long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException { + if (nodesChunkSize == null) + nodesChunkSize = this.nodesChunkSize; + if (this.logger.isTraceEnabled()) + this.logger.trace("reconcile({}, {}, {}, {}, {}, {})", fromDbId, toDbId, nodesChunkSize, reindexUnreconciled, queueUnit.toMillis(queueTimeout), execUnit.toMillis(execTimeout)); + + CompositeFuture future = new CompositeFuture<>(); + + for (long startDbId = fromDbId; startDbId < toDbId; startDbId += nodesChunkSize) { + long endDbId = Math.min(toDbId, startDbId + nodesChunkSize); + future.combine(this.reconcileChunk(startDbId, endDbId, reindexUnreconciled, callback, queueTimeout, queueUnit, execTimeout, execUnit)); + future.purge(true); + } + + try { + future.get(execTimeout, execUnit); + } catch (ExecutionException ee) { + this.logger.error("Reconciliation thread failed: " + ee.getMessage(), ee); + } + } + + public Future reconcile( + long fromDbId, long toDbId, Integer nodesChunkSize, + boolean reindexUnreconciled, + ReconcileCallback callback) throws InterruptedException { + if (nodesChunkSize == null) + nodesChunkSize = this.nodesChunkSize; + this.logger.trace("reconcile({}, {}, {}, {})", fromDbId, toDbId, nodesChunkSize, reindexUnreconciled); + + CompositeFuture future = new CompositeFuture<>(); + + try { + for (long startDbId = fromDbId; startDbId < toDbId; startDbId += nodesChunkSize) { + long endDbId = Math.min(toDbId, startDbId + nodesChunkSize); + future.combine(this.reconcileChunk(startDbId, endDbId, reindexUnreconciled, callback, -1L, null, -1L, null)); + future.purge(true); + } + } catch (TimeoutException te) { + throw new RuntimeException("This should never happen: " + te.getMessage(), te); + } + + return future; + } + + protected Future reconcileChunk( + long fromDbId, long toDbId, + boolean reindexUnreconciled, + ReconcileCallback callback, + long queueTimeout, TimeUnit queueUnit, + long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException { + if (this.logger.isTraceEnabled()) + this.logger.trace("reconcileChunk({}, {}, {}, {}, {})", fromDbId, toDbId, reindexUnreconciled, queueUnit.toMillis(queueTimeout), execUnit.toMillis(execTimeout)); + + int dbIdCount = (int) (toDbId - fromDbId); + + SearchParameters searchParams = new SearchParameters(); + searchParams.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + searchParams.setQueryConsistency(QueryConsistency.EVENTUAL); // force Solr + searchParams.setLanguage(SearchService.LANGUAGE_FTS_ALFRESCO); + searchParams.setQuery("@" + this.formatForFts(ContentModel.PROP_NODE_DBID) + ":[" + fromDbId + " TO " + toDbId + ">"); + searchParams.setMaxItems(dbIdCount); + searchParams.setBulkFetchEnabled(false); + searchParams.setIncludeMetadata(false); + + // `null`: unknown; or does not exist in DB + // `true`: exists in DB and Solr + // `false`: exists in DB, but not Solr + NodeRef[] nodeRefs = new NodeRef[dbIdCount]; + + this.logger.trace("Querying for nodes in chunk: {}", searchParams.getQuery()); + ResultSet nodes = this.searchService.query(searchParams); + this.logger.debug("Found {} of {} possible nodes in chunk: {}-{}", nodes.getNumberFound(), dbIdCount, fromDbId, toDbId); + for (NodeRef nodeRef : nodes.getNodeRefs()) { + Status nodeStatus = this.nodeService.getNodeStatus(nodeRef); + long nodeDbId = nodeStatus.getDbId(); + if (nodeDbId < fromDbId || nodeDbId >= toDbId) { + this.logger.warn("An unexpected DB ID was included in the result set; ignoring: {} != [{}, {})", nodeDbId, fromDbId, toDbId); + continue; + } + + int dbIdIndex = (int) (nodeDbId - fromDbId); + nodeRefs[dbIdIndex] = nodeRef; + } + + CompositeFuture future = new CompositeFuture<>(); + + for (long _nodeDbId = fromDbId; _nodeDbId < toDbId; _nodeDbId++) { + final long nodeDbId = _nodeDbId; + this.logger.trace("Attempting to reconcile ACS node: {}", nodeDbId); + + final int dbIdIndex = (int) (nodeDbId - fromDbId); + if (nodeRefs[dbIdIndex] != null) { + this.logger.trace("A node in the DB is already indexed in Solr: {}: {}", nodeDbId, nodeRefs[dbIdIndex]); + this.reconcileLogger.info("RECONCILED: {} <=> {}", nodeDbId, nodeRefs[dbIdIndex]); + callback.reconciled(nodeDbId); + continue; + } + + Callable callable = new Callable() { + @Override + public Void call() throws InterruptedException, TimeoutException { + reconcile(nodeDbId, reindexUnreconciled, callback, execTimeout, execUnit); + return null; + } + }; + + if (queueTimeout < 0L) { + future.combine(this.executor.submit(callable, -1L, null)); + } else { + future.combine(this.executor.submit(callable, queueTimeout, queueUnit)); + } + } + + return future; + } + + public void reconcile(long nodeDbId, + boolean reindexUnreconciled, + ReconcileCallback callback, + long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException { + NodeRef nodeRef = this.nodeService.getNodeRef(nodeDbId); + if (nodeRef == null) { + this.logger.trace("No such ACS node: {}; skipping ...", nodeDbId); + return; + } + + if (!StoreRef.STORE_REF_WORKSPACE_SPACESSTORE.equals(nodeRef.getStoreRef())) { + this.logger.trace("A deliberately ignored store in the DB is not indexed in Solr: {}: {}", nodeDbId, nodeRef); + return; + } + + Set aspects = this.nodeService.getAspects(nodeRef); + aspects.retainAll(this.ignoreNodesWithAspects); + if (!aspects.isEmpty()) { + this.logger.trace("A deliberately ignored node in the DB is not indexed in Solr: {}: {}: {}", nodeDbId, nodeRef, aspects); + return; + } + + if (!reindexUnreconciled) { + this.logger.debug("A node in the DB is not indexed in Solr: {}: {}", nodeDbId, nodeRef); + this.reconcileLogger.info("UNRECONCILED: {} <=> {}", nodeDbId, nodeRef); + callback.unreconciled(nodeDbId); + } else { + logger.debug("A node in the DB is not indexed in Solr; attempt to reindex: {}: {}", nodeDbId, nodeRef); + this.reindex(nodeDbId, nodeRef, callback, execTimeout, execUnit); + } + } + + public void reindex(long nodeDbId, NodeRef nodeRef, + ReconcileCallback callback, + long execTimeout, TimeUnit execUnit) throws InterruptedException, TimeoutException { + Set syncHosts = new HashSet<>(); + Set asyncHosts = new HashSet<>(); + Map errorHosts = new HashMap<>(); + + ReindexCallback reindexCallback = new ReindexCallback() { + + @Override + public void success(ShardInstance instance) { + reconcileLogger.info("REINDEXED: {} <=> {}", nodeDbId, nodeRef); + syncHosts.add(instance); + } + + @Override + public void scheduled(ShardInstance instance) { + reconcileLogger.info("REINDEXING: {} <=> {}", nodeDbId, nodeRef); + asyncHosts.add(instance); + } + + @Override + public void error(ShardInstance instance, String message) { + reconcileLogger.info("UNINDEXED: {} <=> {}", nodeDbId, nodeRef); + errorHosts.put(instance, message); + } + }; + + try { + if (execTimeout < 0L) { + this.reindexService.reindex(nodeDbId, reindexCallback).get(); + } else { + this.reindexService.reindex(nodeDbId, reindexCallback).get(execTimeout, execUnit); + } + } catch (ExecutionException ee) { + throw new RuntimeException("An unexpected exception occurred: " + ee.getMessage(), ee); + } + + if (callback != null) + callback.processed(nodeDbId, syncHosts, asyncHosts, errorHosts); + } + + private String formatForFts(QName qname) { + return qname.toPrefixString(this.namespaceService).replace("-", "\\-").replace(":", "\\:"); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/ApiService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/ApiService.java index 305beed..f4703f3 100644 --- a/shared/src/main/java/com/inteligr8/alfresco/asie/service/ApiService.java +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/ApiService.java @@ -99,7 +99,7 @@ public class ApiService implements InitializingBean { return solrSharedSecret == null ? null : new AuthorizationFilter() { @Override public void filter(ClientRequestContext requestContext) throws IOException { - logger.debug("Adding authorization headers for ASIE shared auth: {}", solrSharedSecretHeader); + logger.trace("Adding authorization headers for ASIE shared auth: {}", solrSharedSecretHeader); requestContext.getHeaders().putSingle(solrSharedSecretHeader, solrSharedSecret); } }; diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/ExecutorManager.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/ExecutorManager.java new file mode 100644 index 0000000..d3f09c4 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/ExecutorManager.java @@ -0,0 +1,138 @@ +package com.inteligr8.alfresco.asie.service; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.inteligr8.alfresco.asie.util.ThrottledThreadPoolExecutor; + +/** + * This class manages the instantiation and shutdown of `ExecutorService` + * instances that are "rarely" used. + * + * The default implementation of each `ExecutorService` do not support 0 core + * threads unless you want undesirable effects. So they hold threads and could + * hold them for weeks without being used. + * + * We will shutdown and dereference the whole `ExecutorService` when there are + * no other hard references and after the `keepAliveTime` expires. + */ +@Component +public class ExecutorManager implements InitializingBean, DisposableBean, RemovalListener { + + @Value("${inteligr8.asie.executors.expireTimeInMinutes:30}") + private int expireTimeInMinutes; + + private Cache refCache; + private Cache expiringCache; + + @Override + public void afterPropertiesSet() throws Exception { + // a weak value happens when the executor is no longer referenced + // the possible references are by the caller temporarily using and the `expiringCache` (below; so it expired) + // this keeps the pool from being shutdown after it expires if the caller is still referencing it + // ultimately, if it is cached, it will be in this cache and MAY be in the `expiringCache`. + this.refCache = CacheBuilder.newBuilder() + .initialCapacity(8) + .weakValues() + .removalListener(this) + .build(); + + this.expiringCache = CacheBuilder.newBuilder() + .initialCapacity(8) + .expireAfterAccess(this.expireTimeInMinutes, TimeUnit.MINUTES) + .build(); + } + + @Override + public void destroy() throws Exception { + this.refCache.invalidateAll(); + this.refCache.cleanUp(); + this.expiringCache.invalidateAll(); + this.expiringCache.cleanUp(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + notification.getValue().shutdown(); + } + + public ThrottledThreadPoolExecutor createThrottled( + String name, + int coreThreadPoolSize, + int maximumThreadPoolSize, + int maximumQueueSize, + long keepAliveTime, + TimeUnit unit) { + return this.createThrottled(name, coreThreadPoolSize, maximumThreadPoolSize, maximumQueueSize, keepAliveTime, unit, null); + } + + public ThrottledThreadPoolExecutor createThrottled( + final String name, + final int coreThreadPoolSize, + final int maximumThreadPoolSize, + final int maximumQueueSize, + final long keepAliveTime, + final TimeUnit unit, + final RejectedExecutionHandler rejectedExecutionHandler) { + try { + // if it is already cached, reuse the cache; otherwise create one + final ExecutorService executor = this.refCache.get(name, new Callable() { + @Override + public ThrottledThreadPoolExecutor call() { + ThrottledThreadPoolExecutor executor = null; + if (rejectedExecutionHandler == null) { + executor = new ThrottledThreadPoolExecutor(coreThreadPoolSize, maximumThreadPoolSize, maximumQueueSize, + keepAliveTime, unit, + name); + } else { + executor = new ThrottledThreadPoolExecutor(coreThreadPoolSize, maximumThreadPoolSize, maximumQueueSize, + keepAliveTime, unit, + name, + rejectedExecutionHandler); + } + + executor.prestartAllCoreThreads(); + return executor; + } + }); + + return (ThrottledThreadPoolExecutor) this.expiringCache.get(name, new Callable() { + @Override + public ExecutorService call() throws Exception { + return executor; + } + }); + } catch (ExecutionException ee) { + throw new RuntimeException("This should never happen", ee); + } + } + + public ExecutorService get(String name) { + // grab from the expiring cache first, so we can + ExecutorService executor = this.expiringCache.getIfPresent(name); + if (executor != null) + return executor; + + executor = this.refCache.getIfPresent(name); + if (executor == null) + return null; + + // the executor expired, but it was still referenced by the caller + // re-cache it + this.expiringCache.put(name, executor); + return executor; + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/FixService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/FixService.java new file mode 100644 index 0000000..01e47e2 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/FixService.java @@ -0,0 +1,43 @@ +package com.inteligr8.alfresco.asie.service; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.api.CoreAdminApi; +import com.inteligr8.alfresco.asie.model.ActionCoreResponse; +import com.inteligr8.alfresco.asie.model.core.FixAction; +import com.inteligr8.alfresco.asie.model.core.FixRequest; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +@Component +public class FixService extends AbstractActionService { + + @Override + protected String getActionName() { + return "fix"; + } + + @Override + protected String getThreadNamePrefix() { + return "solr-fix"; + } + + @Override + protected ActionCoreResponse execute(CoreAdminApi api, String core) { + FixRequest apiRequest = new FixRequest().withCore(core); + return api.fix(apiRequest); + } + + public Future fix(ActionCallback callback) throws InterruptedException { + return super.action(callback); + } + + public void fix(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + super.action(callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/PurgeService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/PurgeService.java new file mode 100644 index 0000000..ca7cecb --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/PurgeService.java @@ -0,0 +1,43 @@ +package com.inteligr8.alfresco.asie.service; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.api.CoreAdminApi; +import com.inteligr8.alfresco.asie.model.ActionCoreResponse; +import com.inteligr8.alfresco.asie.model.core.PurgeRequest; +import com.inteligr8.alfresco.asie.spi.ActionCallback; +import com.inteligr8.solr.model.Action; + +@Component +public class PurgeService extends AbstractNodeActionService { + + @Override + protected String getActionName() { + return "purge"; + } + + @Override + protected String getThreadNamePrefix() { + return "solr-purge"; + } + + @Override + protected ActionCoreResponse execute(CoreAdminApi api, String core, long nodeDbId) { + PurgeRequest apiRequest = new PurgeRequest().withCore(core).withNodeId(nodeDbId); + return api.purge(apiRequest); + } + + public Future purge(long nodeDbId, ActionCallback callback) throws InterruptedException { + return super.action(nodeDbId, callback); + } + + public void purge(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + super.action(nodeDbId, callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/ReindexService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/ReindexService.java new file mode 100644 index 0000000..ff8374f --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/ReindexService.java @@ -0,0 +1,43 @@ +package com.inteligr8.alfresco.asie.service; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.api.CoreAdminApi; +import com.inteligr8.alfresco.asie.model.ActionCoreResponse; +import com.inteligr8.alfresco.asie.model.core.ReindexRequest; +import com.inteligr8.alfresco.asie.spi.ActionCallback; +import com.inteligr8.solr.model.Action; + +@Component +public class ReindexService extends AbstractNodeActionService { + + @Override + protected String getActionName() { + return "re-index"; + } + + @Override + protected String getThreadNamePrefix() { + return "solr-reindex"; + } + + @Override + protected ActionCoreResponse execute(CoreAdminApi api, String core, long nodeDbId) { + ReindexRequest apiRequest = new ReindexRequest().withCore(core).withNodeId(nodeDbId); + return api.reindex(apiRequest); + } + + public Future reindex(long nodeDbId, ActionCallback callback) throws InterruptedException { + return super.action(nodeDbId, callback); + } + + public void reindex(long nodeDbId, ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + super.action(nodeDbId, callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/service/RetryService.java b/shared/src/main/java/com/inteligr8/alfresco/asie/service/RetryService.java new file mode 100644 index 0000000..3edee85 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/service/RetryService.java @@ -0,0 +1,43 @@ +package com.inteligr8.alfresco.asie.service; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.stereotype.Component; + +import com.inteligr8.alfresco.asie.api.CoreAdminApi; +import com.inteligr8.alfresco.asie.model.ActionCoreResponse; +import com.inteligr8.alfresco.asie.model.core.RetryAction; +import com.inteligr8.alfresco.asie.model.core.RetryRequest; +import com.inteligr8.alfresco.asie.spi.ActionCallback; + +@Component +public class RetryService extends AbstractActionService { + + @Override + protected String getActionName() { + return "retry"; + } + + @Override + protected String getThreadNamePrefix() { + return "solr-retry"; + } + + @Override + protected ActionCoreResponse execute(CoreAdminApi api, String core) { + RetryRequest apiRequest = new RetryRequest().withCore(core); + return api.retry(apiRequest); + } + + public Future retry(ActionCallback callback) throws InterruptedException { + return super.action(callback); + } + + public void retry(ActionCallback callback, long fullQueueTimeout, TimeUnit fullQueueUnit, + long execTimeout, TimeUnit execUnit) throws TimeoutException, InterruptedException { + super.action(callback, fullQueueTimeout, fullQueueUnit, execTimeout, execUnit); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ActionCallback.java b/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ActionCallback.java new file mode 100644 index 0000000..52d1049 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ActionCallback.java @@ -0,0 +1,15 @@ +package com.inteligr8.alfresco.asie.spi; + +import com.inteligr8.alfresco.asie.model.ShardInstance; + +public interface ActionCallback { + + void success(ShardInstance instance); + + void scheduled(ShardInstance instance); + + void error(ShardInstance instance, String message); + + void unknownResult(ShardInstance instance); + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReconcileCallback.java b/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReconcileCallback.java new file mode 100644 index 0000000..e80ee5c --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReconcileCallback.java @@ -0,0 +1,19 @@ +package com.inteligr8.alfresco.asie.spi; + +import java.util.Map; +import java.util.Set; + +import com.inteligr8.alfresco.asie.model.ShardInstance; + +public interface ReconcileCallback { + + void reconciled(long nodeDbId); + + void unreconciled(long nodeDbId); + + void processed(long nodeDbId, + Set instsReconciled, + Set instsReconciling, + Map instsErrorMessages); + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReindexCallback.java b/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReindexCallback.java new file mode 100644 index 0000000..6442259 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/spi/ReindexCallback.java @@ -0,0 +1,12 @@ +package com.inteligr8.alfresco.asie.spi; + +import com.inteligr8.alfresco.asie.model.ShardInstance; + +public interface ReindexCallback extends ActionCallback { + + @Override + default void unknownResult(ShardInstance instance) { + throw new IllegalStateException(); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/util/CompositeFuture.java b/shared/src/main/java/com/inteligr8/alfresco/asie/util/CompositeFuture.java new file mode 100644 index 0000000..d51319d --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/util/CompositeFuture.java @@ -0,0 +1,150 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompositeFuture implements Future { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Collection> futures = new ConcurrentLinkedQueue<>(); + + public MutablePair counts() { + MutablePair counts = MutablePair.of(0, 0); + for (Future future : this.futures) { + if (future.isDone()) { + counts.setRight(counts.getRight().intValue() + 1); + } else { + counts.setLeft(counts.getLeft().intValue() + 1); + } + } + + return counts; + } + + public int countIncomplete() { + return this.counts().getLeft().intValue(); + } + + public int countCompleted() { + return this.counts().getRight().intValue(); + } + + public void combine(Future future) { + this.futures.add(future); + } + + public void combine(Collection> futures) { + this.futures.addAll(futures); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = true; + for (Future future : this.futures) + if (!future.cancel(mayInterruptIfRunning)) + cancelled = false; + + return cancelled; + } + + public List getList() throws InterruptedException, ExecutionException { + List results = new ArrayList<>(this.futures.size()); + for (Future future : this.futures) + results.add(future.get()); + + return results; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + this.getList(); + return null; + } + + public List getList(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long expireTimeMillis = System.currentTimeMillis() + unit.toMillis(timeout); + + List results = new ArrayList<>(this.futures.size()); + for (Future future : this.futures) { + if (future instanceof RunnableFuture) { + this.logger.debug("Waiting {} ms since the start of the exectuion of the future to complete", unit.toMillis(timeout)); + results.add(((RunnableFuture) future).get(timeout, unit)); + } else { + long remainingTimeMillis = expireTimeMillis - System.currentTimeMillis(); + this.logger.debug("Waiting {} ms for the future to complete", remainingTimeMillis); + results.add(future.get(remainingTimeMillis, TimeUnit.MILLISECONDS)); + } + } + + return results; + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + this.getList(timeout, unit); + return null; + } + + @Override + public boolean isCancelled() { + for (Future future : this.futures) + if (!future.isCancelled()) + return false; + + return true; + } + + @Override + public boolean isDone() { + for (Future future : this.futures) + if (!future.isDone()) + return false; + + return true; + } + + /** + * Remove any futures that are done (or cancelled). + * + * @param includeCancelled `true` to purge cancelled futures; `false` to purge only completed futures + */ + public void purge(boolean includeCancelled) { + List> cfutures = new LinkedList<>(); + int removedCancelled = 0; + int removedDone = 0; + + Iterator> i = this.futures.iterator(); + while (i.hasNext()) { + Future future = i.next(); + if (future.isCancelled()) { + if (includeCancelled) { + removedCancelled++; + i.remove(); + } + } else if (future.isDone()) { + removedDone++; + i.remove(); + } else if (future instanceof CompositeFuture) { + cfutures.add((CompositeFuture) future); + } + } + + this.logger.debug("Purged {} cancelled and {} completed futures", removedCancelled, removedDone); + + for (CompositeFuture cfuture : cfutures) + cfuture.purge(includeCancelled); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/util/ElapsedInterruptableRunnable.java b/shared/src/main/java/com/inteligr8/alfresco/asie/util/ElapsedInterruptableRunnable.java new file mode 100644 index 0000000..6892acf --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/util/ElapsedInterruptableRunnable.java @@ -0,0 +1,43 @@ +package com.inteligr8.alfresco.asie.util; + +public abstract class ElapsedInterruptableRunnable extends InterruptableRunnable { + + private final Object runLock = new Object(); + private boolean runOnce = true; + private boolean run = false; + private Long executionStartTimeMillis; + private Long executionEndTimeMillis; + + public void setRunOnce(boolean runOnce) { + this.runOnce = runOnce; + } + + @Override + public final void runInterruptable() throws InterruptedException { + synchronized (this.runLock) { + if (this.runOnce && this.run) + return; + this.run = true; + } + + this.executionStartTimeMillis = System.currentTimeMillis(); + try { + this.runElapsed(); + } finally { + this.executionEndTimeMillis = System.currentTimeMillis(); + } + } + + protected abstract void runElapsed() throws InterruptedException; + + public Long computeElapsedExecutionMillis() { + if (this.executionEndTimeMillis != null) { + return this.executionEndTimeMillis - this.executionStartTimeMillis; + } else if (this.executionStartTimeMillis != null) { + return System.currentTimeMillis() - this.executionStartTimeMillis; + } else { + return null; + } + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/util/InterruptableRunnable.java b/shared/src/main/java/com/inteligr8/alfresco/asie/util/InterruptableRunnable.java new file mode 100644 index 0000000..f77391d --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/util/InterruptableRunnable.java @@ -0,0 +1,75 @@ +package com.inteligr8.alfresco.asie.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class InterruptableRunnable implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Object threadLock = new Object(); + private boolean run = false; + private Thread runThread = null; + private boolean completed = false; + private boolean interrupted = false; + + @Override + public final void run() { + synchronized (this.threadLock) { + this.run = true; + this.runThread = Thread.currentThread(); + } + + try { + this.runInterruptable(); + this.completed = true; + } catch (InterruptedException ie) { + this.logger.debug("Runnable interrupted"); + this.interrupted = true; + this.interrupted(ie); + } finally { + this.runThread = null; + } + } + + protected abstract void runInterruptable() throws InterruptedException; + + public boolean interrupt() { + synchronized (this.threadLock) { + if (this.runThread == null) + return false; + + this.logger.trace("Runnable interrupting ..."); + this.runThread.interrupt(); + } + + return true; + } + + protected void interrupted(InterruptedException ie) { + } + + public boolean isQueued() { + return !this.run; + } + + public boolean isInterrupted() { + return this.interrupted; + } + + public boolean isCompleted() { + return this.completed; + } + + public boolean isFailed() { + synchronized (this.threadLock) { + return this.run && !this.completed && !this.interrupted && this.runThread == null; + } + } + + public boolean isDone() { + synchronized (this.threadLock) { + return this.run && this.runThread == null; + } + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/util/RunnableFuture.java b/shared/src/main/java/com/inteligr8/alfresco/asie/util/RunnableFuture.java new file mode 100644 index 0000000..fb293aa --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/util/RunnableFuture.java @@ -0,0 +1,72 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RunnableFuture implements Future { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final ThreadPoolExecutor executor; + private final WaitableRunnable runnable; + + public RunnableFuture(ThreadPoolExecutor executor, WaitableRunnable runnable) { + this.executor = executor; + this.runnable = runnable; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (this.executor.remove(this.runnable)) { + this.logger.debug("Cancelled runnable by removing from queue"); + return true; + } else if (mayInterruptIfRunning && this.runnable.interrupt()) { + this.logger.debug("Cancelled runnable by interrupting it"); + return true; + } + return false; + } + + /** + * This method will not never timeout and will only return or throw when + * the runnable is complete or interrupted. + * + * @return Always `null`. + */ + @Override + public T get() throws InterruptedException { + this.runnable.waitUntilDone(); + return null; + } + + /** + * This method will timeout in the specified amount of time after the + * execution commences. This will wait indefinitely as the execution is + * queued. Once it is in the executing state, the clock starts. This + * alters the default behavior of the `Future` interface. + * + * @param timeout A positive integer representing a period of time. + * @param unit The time unit of the `timeout` parameter. + * @return Always `null`. + */ + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + this.runnable.waitUntilExecutionElapsed(timeout, unit); + return null; + } + + @Override + public boolean isCancelled() { + return this.runnable.isInterrupted(); + } + + @Override + public boolean isDone() { + return this.runnable.isDone(); + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutor.java b/shared/src/main/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutor.java new file mode 100644 index 0000000..dcf1bcf --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutor.java @@ -0,0 +1,96 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class ThrottledThreadPoolExecutor extends ThreadPoolExecutor { + + public ThrottledThreadPoolExecutor( + int coreThreadPoolSize, + int maximumThreadPoolSize, + int maximumQueueSize, + long keepAliveTime, + TimeUnit unit, + String threadNamePrefix) { + super(coreThreadPoolSize, maximumThreadPoolSize, keepAliveTime, unit, + new ArrayBlockingQueue<>(maximumQueueSize), + new ThreadFactoryBuilder() + .setNameFormat(threadNamePrefix + "-%d") + .build()); + } + + public ThrottledThreadPoolExecutor( + int coreThreadPoolSize, + int maximumThreadPoolSize, + int maximumQueueSize, + long keepAliveTime, + TimeUnit unit, + String threadNamePrefix, + RejectedExecutionHandler rejectedExecutionHandler) { + super(coreThreadPoolSize, maximumThreadPoolSize, keepAliveTime, unit, + new ArrayBlockingQueue<>(maximumQueueSize), + new ThreadFactoryBuilder() + .setNameFormat(threadNamePrefix + "-%d") + .build(), + rejectedExecutionHandler); + } + + /** + * @param timeout Negative to not wait + */ + public RunnableFuture submit(Callable task, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + WaitableRunnable wrunnable = new WaitableRunnable() { + + @Override + protected void runWaitable() throws InterruptedException { + try { + task.call(); + } catch (InterruptedException ie) { + throw ie; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + }; + + return new RunnableFuture(this, this.submit(wrunnable, timeout, unit)); + } + + /** + * @param timeout Negative to not wait + */ + public RunnableFuture submit(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + WaitableRunnable wrunnable = new WaitableRunnable() { + + @Override + protected void runWaitable() { + runnable.run(); + } + + }; + + return new RunnableFuture(this, this.submit(wrunnable, timeout, unit)); + } + + private WaitableRunnable submit(WaitableRunnable runnable, long throttlingBlockTimeout, TimeUnit throttlingBlockUnit) throws InterruptedException, TimeoutException { + // if no core threads are running, the queue won't be monitored for runnables + this.prestartAllCoreThreads(); + + if (throttlingBlockTimeout < 0L) { + this.getQueue().put(runnable); + } else { + if (!this.getQueue().offer(runnable, throttlingBlockTimeout, throttlingBlockUnit)) + throw new TimeoutException(); + } + + return runnable; + } + +} diff --git a/shared/src/main/java/com/inteligr8/alfresco/asie/util/WaitableRunnable.java b/shared/src/main/java/com/inteligr8/alfresco/asie/util/WaitableRunnable.java new file mode 100644 index 0000000..e2783c6 --- /dev/null +++ b/shared/src/main/java/com/inteligr8/alfresco/asie/util/WaitableRunnable.java @@ -0,0 +1,60 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class WaitableRunnable extends ElapsedInterruptableRunnable { + + private final Object lock = new Object(); + private boolean done = false; + + @Override + public final void runElapsed() throws InterruptedException { + this.done = false; + try { + this.runWaitable(); + } finally { + synchronized (this.lock) { + this.done = true; + this.lock.notifyAll(); + } + } + } + + protected abstract void runWaitable() throws InterruptedException; + + public void waitUntilDone() throws InterruptedException { + synchronized (this.lock) { + if (!this.done && !this.isDone()) + this.lock.wait(); + } + } + + public void waitUntilDone(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + synchronized (this.lock) { + if (!this.done && !this.isDone()) { + long waitTime = unit.toMillis(timeout); + long startTime = System.currentTimeMillis(); + this.lock.wait(waitTime); + if (System.currentTimeMillis() - startTime >= waitTime) + throw new TimeoutException(); + } + } + } + + public void waitUntilExecutionElapsed(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + synchronized (this.lock) { + if (!this.done && !this.isDone()) { + while (this.isQueued()) + Thread.sleep(50L); + Long elapsedExecutionMillis = this.computeElapsedExecutionMillis(); + long waitTime = unit.toMillis(timeout) - elapsedExecutionMillis; + long startTime = System.currentTimeMillis(); + this.lock.wait(waitTime); + if (System.currentTimeMillis() - startTime >= waitTime) + throw new TimeoutException(); + } + } + } + +} diff --git a/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/fix.post.desc.xml b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/fix.post.desc.xml new file mode 100644 index 0000000..71855e7 --- /dev/null +++ b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/fix.post.desc.xml @@ -0,0 +1,46 @@ + + + + Fix ASIE Indexes + Inteligr8 ASIE + Issue a 'fix' command to the ASIE indexes. + This call will attempt to fix all Solr nodes. + The fix operation is asynchronous and could fail on any Solr node without notification.

+

The following response body should be expected in most cases (202 and 500 status codes):

+
+			{
+				"scheduled": [
+					"solrHostAsync:8983/solr",
+					...
+				],
+				"error": [
+					"solrHostThatFailed:8983/solr": {
+						"message": "string"
+					},
+					...
+				]
+			}
+		
+

The following status codes should be expected:

+
+
202
+
Accepted
+
+ ]]>
+ + + /inteligr8/asie/acs/fix + any + + + user + + + + false + false + + +
\ No newline at end of file diff --git a/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/purgeAcsNode.put.desc.xml b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/purgeAcsNode.put.desc.xml new file mode 100644 index 0000000..231a589 --- /dev/null +++ b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/purgeAcsNode.put.desc.xml @@ -0,0 +1,63 @@ + + + + Purge ACS Node in ASIE Indexes + Inteligr8 ASIE + Purge the specified ACS node in the ASIE indexes. + This call will attempt to purge the ACS node on all applicable Solr nodes. + The purge operation could be synchronous or asynchronous and could fail on any Solr node. + If any Solr node failed synchronously in the execution, then expect a status code of 500. + The response body will still be identical to the 200/202 status codes. + If any Solr node is executing the purge asynchronously and there are no synchronous failures, then expect a status code of 202.

+

The following path parameters are supported:

+
+
nodeId
+
An ACS node ID.
+
+

The following response body should be expected in most cases (200, 202, and 500 status codes):

+
+			{
+				"nodeDbId": number,
+				"success": [
+					"solrHostSync:8983/solr",
+					...
+				],
+				"scheduled": [
+					"solrHostAsync:8983/solr",
+					...
+				],
+				"error": [
+					"solrHostThatFailed:8983/solr": {
+						"message": "string"
+					},
+					...
+				]
+			}
+		
+

The following status codes should be expected:

+
+
200
+
OK
+
202
+
Accepted
+
400
+
The path or query parameters are invalid
+
+ ]]>
+ + + /inteligr8/asie/acs/node/{nodeId}/purge + any + + + user + + + + false + false + + +
\ No newline at end of file diff --git a/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reconcileAcsNodes.post.desc.xml b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reconcileAcsNodes.post.desc.xml new file mode 100644 index 0000000..82e3963 --- /dev/null +++ b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reconcileAcsNodes.post.desc.xml @@ -0,0 +1,73 @@ + + + + Reconcile ACS Nodes against ASIE Indexes + Inteligr8 ASIE + Reconcile the ACS node range against the ASIE indexes. + This call will loop through the specified ACS node range, reconciling each against ASIE indexes. + It will discover all unindexed nodes within the range, logging the details with logger `inteligr8.asie.reconcile`. + Optionally, an ASIE `REINDEX` command may be issued against ASIE.

+

The following path parameters are supported:

+
+
fromDbId
+
A DB ID integer for the starting point of a range, inclusive.
+
toDbId
+
A DB ID integer for the ending point of a range, exclusive.
+
+

The following response body should be expected in most cases (200, 202, and 500 status codes):

+
+			{
+				"unreconciled": [
+					number,       // node DB ID
+					...
+				],
+				"success": {
+					"number": [   // node DB ID
+						"solrHostSync:8983/solr",
+						...
+					],
+					...
+				},
+				"scheduled": {
+					"number": [   // node DB ID
+						"solrHostAsync:8983/solr",
+						...
+					],
+					...
+				},
+				"error": {
+					"number": [   // node DB ID
+						"solrHostThatFailed:8983/solr": {
+							"message": "string"
+						},
+						...
+					],
+					...
+				}
+			}
+		
+

The following status codes should be expected:

+
+
202
+
Accepted
+
400
+
The path or query parameters are invalid
+
+ ]]>
+ + + /inteligr8/asie/acs/nodes/{fromDbId}/{toDbId}/reconcile?reindex={reindex?} + any + + + user + + + + false + false + + +
\ No newline at end of file diff --git a/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reindexAcsNode.put.desc.xml b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reindexAcsNode.put.desc.xml new file mode 100644 index 0000000..1bf205d --- /dev/null +++ b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/reindexAcsNode.put.desc.xml @@ -0,0 +1,63 @@ + + + + Reindex ACS Node in ASIE Indexes + Inteligr8 ASIE + Reindex the specified ACS node in the ASIE indexes. + This call will attempt to reindex the ACS node on all applicable Solr nodes. + The reindex operation could be synchronous or asynchronous and could fail on any Solr node. + If any Solr node failed synchronously in the execution, then expect a status code of 500. + The response body will still be identical to the 200/202 status codes. + If any Solr node is executing the reindex asynchronously and there are no synchronous failures, then expect a status code of 202.

+

The following path parameters are supported:

+
+
nodeId
+
An ACS node ID.
+
+

The following response body should be expected in most cases (200, 202, and 500 status codes):

+
+			{
+				"nodeDbId": number,
+				"success": [
+					"solrHostSync:8983/solr",
+					...
+				],
+				"scheduled": [
+					"solrHostAsync:8983/solr",
+					...
+				],
+				"error": [
+					"solrHostThatFailed:8983/solr": {
+						"message": "string"
+					},
+					...
+				]
+			}
+		
+

The following status codes should be expected:

+
+
200
+
OK
+
202
+
Accepted
+
400
+
The path or query parameters are invalid
+
+ ]]>
+ + + /inteligr8/asie/acs/node/{nodeId}/reindex + any + + + user + + + + false + false + + +
\ No newline at end of file diff --git a/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/retry.post.desc.xml b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/retry.post.desc.xml new file mode 100644 index 0000000..9496fd6 --- /dev/null +++ b/shared/src/main/resources/alfresco/extension/templates/webscripts/com/inteligr8/alfresco/asie/retry.post.desc.xml @@ -0,0 +1,46 @@ + + + + Retry ASIE Indexes + Inteligr8 ASIE + Issue a 'retry' command to the ASIE indexes. + This call will attempt to retry all remembered failed ACS nodes on Solr nodes. + The retry operation is asynchronous and could fail on any Solr node without notification.

+

The following response body should be expected in most cases (202 and 500 status codes):

+
+			{
+				"scheduled": [
+					"solrHostAsync:8983/solr",
+					...
+				],
+				"error": [
+					"solrHostThatFailed:8983/solr": {
+						"message": "string"
+					},
+					...
+				]
+			}
+		
+

The following status codes should be expected:

+
+
202
+
Accepted
+
+ ]]>
+ + + /inteligr8/asie/acs/retry + any + + + user + + + + false + false + + +
\ No newline at end of file diff --git a/shared/src/main/resources/alfresco/module/com_inteligr8_alfresco_asie-shared/log4j2.properties b/shared/src/main/resources/alfresco/module/com_inteligr8_alfresco_asie-shared/log4j2.properties index 6c345f1..1dd5278 100644 --- a/shared/src/main/resources/alfresco/module/com_inteligr8_alfresco_asie-shared/log4j2.properties +++ b/shared/src/main/resources/alfresco/module/com_inteligr8_alfresco_asie-shared/log4j2.properties @@ -1,3 +1,6 @@ logger.inteligr8-asie.name=com.inteligr8.alfresco.asie logger.inteligr8-asie.level=INFO + +logger.reconcile.name=inteligr8.asie.reconcile +logger.reconcile.level=INFO diff --git a/shared/src/test/java/com/inteligr8/alfresco/asie/util/CompositeFutureUnitTest.java b/shared/src/test/java/com/inteligr8/alfresco/asie/util/CompositeFutureUnitTest.java new file mode 100644 index 0000000..926d08d --- /dev/null +++ b/shared/src/test/java/com/inteligr8/alfresco/asie/util/CompositeFutureUnitTest.java @@ -0,0 +1,92 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompositeFutureUnitTest { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Test + public void test() throws InterruptedException, ExecutionException, TimeoutException { + ExecutorService executor = Executors.newWorkStealingPool(3); + + CompositeFuture futures = new CompositeFuture(); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(100L))); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(200L))); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(300L))); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(400L))); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(500L))); + + Thread.sleep(50L); + + Assert.assertEquals(Pair.of(5, 0), futures.counts()); + + Thread.sleep(100L); + Assert.assertEquals(Pair.of(4, 1), futures.counts()); + + Thread.sleep(100L); + Assert.assertEquals(Pair.of(3, 2), futures.counts()); + + Thread.sleep(100L); + Assert.assertEquals(Pair.of(2, 3), futures.counts()); + + Thread.sleep(100L); + Assert.assertEquals(Pair.of(2, 3), futures.counts()); + + Thread.sleep(100L); + Assert.assertEquals(Pair.of(1, 4), futures.counts()); + + Thread.sleep(200L); + Assert.assertEquals(Pair.of(0, 5), futures.counts()); + + long startTime = System.currentTimeMillis(); + futures.get(); + Assert.assertTrue(System.currentTimeMillis() - startTime < 5); + + startTime = System.currentTimeMillis(); + futures.get(10L, TimeUnit.SECONDS); + Assert.assertTrue(System.currentTimeMillis() - startTime < 5); + } + + + + private class Pausable implements Callable { + + private static final MutableInt SEQUENCE = new MutableInt(1); + private final int seq; + private final long pauseInMillis; + + public Pausable(long pauseInMillis) { + synchronized (SEQUENCE) { + this.seq = SEQUENCE.getAndIncrement(); + } + this.pauseInMillis = pauseInMillis; + } + + @Override + public Void call() throws InterruptedException { + logger.trace("Thread started execution: " + Thread.currentThread().getName() + ": " + this.seq); + Thread.sleep(this.pauseInMillis); + logger.trace("Thread ending execution: " + Thread.currentThread().getName() + ": " + this.seq); + return null; + } + } + +} diff --git a/shared/src/test/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutorUnitTest.java b/shared/src/test/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutorUnitTest.java new file mode 100644 index 0000000..faa87a6 --- /dev/null +++ b/shared/src/test/java/com/inteligr8/alfresco/asie/util/ThrottledThreadPoolExecutorUnitTest.java @@ -0,0 +1,146 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThrottledThreadPoolExecutorUnitTest { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Test(expected = RejectedExecutionException.class) + public void testHardRejection() throws InterruptedException, ExecutionException, TimeoutException { + ThrottledThreadPoolExecutor executor = new ThrottledThreadPoolExecutor(1, 1, 1, 10L, TimeUnit.SECONDS, "not-throttled"); + try { + long startTime = System.currentTimeMillis(); + + CompositeFuture futures = new CompositeFuture(); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(100L))); + + // confirm there is no blocking as the pool is filled + this.assertElapsed(startTime, 0L, 50L); + Assert.assertEquals(Pair.of(1, 0), futures.counts()); + + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(200L))); + + // confirm there is no blocking as the queue is filled + this.assertElapsed(startTime, 0L, 50L); + Assert.assertEquals(Pair.of(2, 0), futures.counts()); + + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(300L))); + } finally { + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(2L, TimeUnit.SECONDS)); + } + } + + @Test + public void testBlocking() throws InterruptedException, ExecutionException, TimeoutException { + ThrottledThreadPoolExecutor executor = new ThrottledThreadPoolExecutor(2, 2, 2, 10L, TimeUnit.SECONDS, "throttled"); + try { + long startTime = System.currentTimeMillis(); + + CompositeFuture futures = new CompositeFuture(); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(100L), 1L, TimeUnit.SECONDS)); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(200L), 1L, TimeUnit.SECONDS)); + + // confirm there is no blocking as the pool is filled + this.assertElapsed(startTime, 0L, 50L); + Assert.assertEquals(Pair.of(2, 0), futures.counts()); + + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(300L), 1L, TimeUnit.SECONDS)); + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(400L), 1L, TimeUnit.SECONDS)); + + // confirm there is no blocking as the queue is filled + this.assertElapsed(startTime, 0L, 50L); + Assert.assertEquals(Pair.of(4, 0), futures.counts()); + + this.logger.trace("Execution submitting"); + futures.combine(executor.submit(new Pausable(500L), 10L, TimeUnit.SECONDS)); + + // confirm there is blocking as the pool and queue are full + this.assertElapsed(startTime, 100L, 150L); + Assert.assertEquals(Pair.of(4, 1), futures.counts()); + + executor.shutdown(); + + Thread.sleep(150L); // 250ms + Assert.assertEquals(Pair.of(3, 2), futures.counts()); + + Thread.sleep(100L); // 350ms + Assert.assertEquals(Pair.of(3, 2), futures.counts()); + + Thread.sleep(100L); // 450ms + Assert.assertEquals(Pair.of(2, 3), futures.counts()); + + Thread.sleep(100L); // 550ms + Assert.assertEquals(Pair.of(2, 3), futures.counts()); + + Thread.sleep(100L); // 650ms + Assert.assertEquals(Pair.of(1, 4), futures.counts()); + + Thread.sleep(200L); // 850ms + Assert.assertEquals(Pair.of(1, 4), futures.counts()); + + Thread.sleep(100L); // 950 ms + Assert.assertEquals(Pair.of(0, 5), futures.counts()); + + startTime = System.currentTimeMillis(); + futures.get(); + this.assertElapsed(startTime, 0L, 25L); + + startTime = System.currentTimeMillis(); + futures.get(10L, TimeUnit.SECONDS); + this.assertElapsed(startTime, 0L, 25L); + } finally { + executor.awaitTermination(2L, TimeUnit.SECONDS); + } + } + + private void assertElapsed(long startTime, long minRangeInclusive, long maxRangeExclusive) { + long elapsedTime = System.currentTimeMillis() - startTime; + Assert.assertTrue("The execution time was shorter than expected: " + elapsedTime + " ms < " + minRangeInclusive + " ms", elapsedTime >= minRangeInclusive); + Assert.assertTrue("The execution time was longer than expected: " + elapsedTime + " ms >= " + maxRangeExclusive + " ms", elapsedTime < maxRangeExclusive); + } + + + + private class Pausable implements Callable { + + private static final MutableInt SEQUENCE = new MutableInt(1); + private final int seq; + private final long pauseInMillis; + + public Pausable(long pauseInMillis) { + synchronized (SEQUENCE) { + this.seq = SEQUENCE.getAndIncrement(); + } + this.pauseInMillis = pauseInMillis; + } + + @Override + public Void call() throws InterruptedException { + logger.trace("Thread started execution: " + Thread.currentThread().getName() + ": " + this.seq); + Thread.sleep(this.pauseInMillis); + logger.trace("Thread ending execution: " + Thread.currentThread().getName() + ": " + this.seq); + return null; + } + } + +} diff --git a/shared/src/test/resources/log4j2-test.properties b/shared/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..be69a85 --- /dev/null +++ b/shared/src/test/resources/log4j2-test.properties @@ -0,0 +1,10 @@ +rootLogger.level=trace +rootLogger.appenderRef.stdout.ref=STDOUT + +logger.this.name=com.inteligr8.alfresco.asie +logger.this.level=trace + +appender.stdout.type=Console +appender.stdout.name=STDOUT +appender.stdout.layout.type=PatternLayout +appender.stdout.layout.pattern=%d{ABSOLUTE_MICROS} %level{length=1} %c{1}: %m%n