From de8e0bf2d73af4c8fa4395bcf6d4efac2214c15c Mon Sep 17 00:00:00 2001 From: "Brian M. Long" Date: Wed, 13 Nov 2024 18:03:22 -0500 Subject: [PATCH] update from refactoring (incomplete) --- community-module/pom.xml | 5 + .../alfresco/asie/CommunityConstants.java | 23 + .../alfresco/asie/cache/MultiValueCache.java | 67 -- .../asie/service/ShardStateService.java | 63 +- .../asie/service/SolrShardRegistry.java | 607 +++++++++++------- .../asie/util/ShardSetSearchComparator.java | 79 +++ 6 files changed, 490 insertions(+), 354 deletions(-) create mode 100644 community-module/src/main/java/com/inteligr8/alfresco/asie/CommunityConstants.java delete mode 100644 community-module/src/main/java/com/inteligr8/alfresco/asie/cache/MultiValueCache.java create mode 100644 community-module/src/main/java/com/inteligr8/alfresco/asie/util/ShardSetSearchComparator.java diff --git a/community-module/pom.xml b/community-module/pom.xml index 359c4f3..9ee7607 100644 --- a/community-module/pom.xml +++ b/community-module/pom.xml @@ -33,6 +33,11 @@ + + com.inteligr8.alfresco + cachext-platform-module + 1.0-SNAPSHOT + com.inteligr8.alfresco asie-shared diff --git a/community-module/src/main/java/com/inteligr8/alfresco/asie/CommunityConstants.java b/community-module/src/main/java/com/inteligr8/alfresco/asie/CommunityConstants.java new file mode 100644 index 0000000..ce16419 --- /dev/null +++ b/community-module/src/main/java/com/inteligr8/alfresco/asie/CommunityConstants.java @@ -0,0 +1,23 @@ +package com.inteligr8.alfresco.asie; + +public interface CommunityConstants extends Constants { + + static final String BEAN_SHARDSETS_CACHE = "asieShardsetsCache"; + static final String BEAN_NODES_CACHE = "asieNodesCache"; + static final String BEAN_SHARD_NODES_CACHE = "asieShardNodesCache"; + static final String BEAN_SHARDINST_STATE_CACHE = "asieShardInstanceStateCache"; + static final String BEAN_NODE_DISABLE_CACHE = "asieNodeDisabledCache"; + static final String BEAN_NODE_UNAVAIL_CACHE = "asieNodeUnavailableCache"; + static final String BEAN_SHARDINST_DISABLE_CACHE = "asieShardInstanceDisabledCache"; + static final String BEAN_SHARDINST_UNAVAIL_CACHE = "asieShardInstanceUnavailableCache"; + static final String BEAN_CORE_EXPLICIT_CACHE = "asieCoreExplicitCache"; + + static final String ATTR_ASIE_SHARDSET = "inteligr8.asie.shardSet"; + static final String ATTR_ASIE_NODE = "inteligr8.asie.node"; + static final String ATTR_ASIE_SHARD_NODES = "inteligr8.asie.shard.nodes"; + static final String ATTR_ASIE_SHARD_NODE = "inteligr8.asie.shard.node"; + static final String ATTR_OBJECT = "object"; + static final String ATTR_DISABLE = "disabled"; + static final String ATTR_NODES = "nodes"; + +} diff --git a/community-module/src/main/java/com/inteligr8/alfresco/asie/cache/MultiValueCache.java b/community-module/src/main/java/com/inteligr8/alfresco/asie/cache/MultiValueCache.java deleted file mode 100644 index d6d820c..0000000 --- a/community-module/src/main/java/com/inteligr8/alfresco/asie/cache/MultiValueCache.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.inteligr8.alfresco.asie.cache; - -import java.io.Serializable; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Collection; - -import org.alfresco.repo.cache.SimpleCache; - -public class MultiValueCache> implements SimpleCache { - - private SimpleCache cache; - private Class collectionType; - - public MultiValueCache(SimpleCache cache, Class collectionType) { - this.cache = cache; - this.collectionType = collectionType; - } - - @SuppressWarnings("unchecked") - public boolean add(K key, V value) { - C c = this.cache.get(key); - if (c != null) - return c.add(value); - - try { - Constructor constructor = this.collectionType.getConstructor(); - c = (C) constructor.newInstance(); - this.cache.put(key, c); - return c.add(value); - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) { - throw new UnsupportedOperationException(e.getMessage(), e); - } - } - - @Override - public void clear() { - this.cache.clear(); - } - - @Override - public C get(K key) { - return this.cache.get(key); - } - - @Override - public boolean contains(K key) { - C c = this.cache.get(key); - return c == null ? false : !c.isEmpty(); - } - - @Override - public Collection getKeys() { - return this.cache.getKeys(); - } - - @Override - public void put(K key, C value) { - this.cache.put(key, value); - } - - @Override - public void remove(K key) { - this.cache.remove(key); - } - -} diff --git a/community-module/src/main/java/com/inteligr8/alfresco/asie/service/ShardStateService.java b/community-module/src/main/java/com/inteligr8/alfresco/asie/service/ShardStateService.java index c93c5ef..5a43bf5 100644 --- a/community-module/src/main/java/com/inteligr8/alfresco/asie/service/ShardStateService.java +++ b/community-module/src/main/java/com/inteligr8/alfresco/asie/service/ShardStateService.java @@ -1,80 +1,25 @@ package com.inteligr8.alfresco.asie.service; -import java.io.Serializable; -import java.util.Arrays; - -import org.alfresco.repo.cache.SimpleCache; -import org.alfresco.repo.index.shard.ShardInstance; -import org.alfresco.repo.index.shard.ShardState; import org.alfresco.service.cmr.attributes.AttributeService; -import org.alfresco.service.cmr.attributes.AttributeService.AttributeQueryCallback; -import org.apache.commons.lang3.ArrayUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import com.inteligr8.alfresco.asie.Constants; -import com.inteligr8.alfresco.asie.enterprise.EnterpriseConstants; @Component public class ShardStateService implements com.inteligr8.alfresco.asie.spi.ShardStateService { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - @Autowired @Qualifier(Constants.QUALIFIER_ASIE) private AttributeService attrService; - + @Autowired - @Qualifier(Constants.BEAN_SHARD_STATE_CACHE) - private SimpleCache shardStateCache; + private SolrShardRegistry shardRegistry; + @Override public void clear() { - this.logger.info("Removing all nodes/shards from the shard registry"); - - // this clears the state from the backend database - this.attrService.removeAttributes(EnterpriseConstants.ATTR_SHARD_STATE); - this.attrService.removeAttributes(EnterpriseConstants.ATTR_SHARD_SUBSCRIPTION); - this.attrService.removeAttributes(Constants.ATTR_ASIE); - - // this clears the state from Hazelcast - this.shardStateCache.clear(); - this.shardToGuidCache.clear(); - } - - public void remove(Serializable... keys) { - if (keys.length == 0) - throw new IllegalArgumentException(); - - this.logger.info("Removing from the shard registry: {}", Arrays.toString(keys)); - - Serializable[] shardStateKeys = keys; - Serializable[] shardSubKeys; - if (EnterpriseConstants.ATTR_SHARD_STATE.equals(keys[0])) { - shardSubKeys = ArrayUtils.clone(keys); - shardSubKeys[0] = EnterpriseConstants.ATTR_SHARD_SUBSCRIPTION; - } else { - shardStateKeys = ArrayUtils.addFirst(keys, EnterpriseConstants.ATTR_SHARD_STATE); - shardSubKeys = ArrayUtils.addFirst(keys, EnterpriseConstants.ATTR_SHARD_SUBSCRIPTION); - } - - ShardState shardState = (ShardState) this.attrService.getAttribute(shardStateKeys); - - // this clears the state from the backend database - this.attrService.removeAttribute(shardStateKeys); - this.attrService.removeAttribute(shardSubKeys); - - // this clears the state from Hazelcast - if (shardState != null) { - this.shardStateCache.remove(shardState.getShardInstance()); - this.shardToGuidCache.remove(shardState.getShardInstance()); - } - } - - public void iterate(AttributeQueryCallback callback) { - this.attrService.getAttributes(callback, EnterpriseConstants.ATTR_SHARD_STATE); + this.shardRegistry.purge(); } } diff --git a/community-module/src/main/java/com/inteligr8/alfresco/asie/service/SolrShardRegistry.java b/community-module/src/main/java/com/inteligr8/alfresco/asie/service/SolrShardRegistry.java index c982712..52381ae 100644 --- a/community-module/src/main/java/com/inteligr8/alfresco/asie/service/SolrShardRegistry.java +++ b/community-module/src/main/java/com/inteligr8/alfresco/asie/service/SolrShardRegistry.java @@ -1,6 +1,7 @@ package com.inteligr8.alfresco.asie.service; import java.io.Serializable; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -8,18 +9,12 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.OptionalInt; +import java.util.Random; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.alfresco.repo.cache.SimpleCache; import org.alfresco.repo.index.shard.Floc; -import org.alfresco.repo.index.shard.Shard; -import org.alfresco.repo.index.shard.ShardInstance; -import org.alfresco.repo.index.shard.ShardMethodEnum; -import org.alfresco.repo.index.shard.ShardRegistry; import org.alfresco.repo.index.shard.ShardState; import org.alfresco.repo.lock.JobLockService; import org.alfresco.service.cmr.attributes.AttributeService; @@ -29,7 +24,6 @@ import org.alfresco.service.cmr.dictionary.DictionaryService; import org.alfresco.service.cmr.search.SearchParameters; import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.QName; -import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -39,27 +33,29 @@ import org.springframework.context.ApplicationEvent; import org.springframework.extensions.surf.util.AbstractLifecycleBean; import org.springframework.stereotype.Component; +import com.inteligr8.alfresco.asie.CommunityConstants; import com.inteligr8.alfresco.asie.Constants; -import com.inteligr8.alfresco.asie.cache.MultiValueCache; import com.inteligr8.alfresco.asie.compute.QueryInspector; import com.inteligr8.alfresco.asie.compute.QueryInspector.QueryRangeValue; import com.inteligr8.alfresco.asie.compute.QueryInspector.QuerySingleValue; import com.inteligr8.alfresco.asie.compute.QueryInspector.QueryValue; import com.inteligr8.alfresco.asie.compute.QueryInspectorFactory; -import com.inteligr8.alfresco.asie.model.Node; +import com.inteligr8.alfresco.asie.model.Shard; +import com.inteligr8.alfresco.asie.model.ShardInstance; +import com.inteligr8.alfresco.asie.model.ShardInstanceState; import com.inteligr8.alfresco.asie.model.ShardSet; +import com.inteligr8.alfresco.asie.model.SolrHost; +import com.inteligr8.alfresco.asie.spi.ShardRegistry; +import com.inteligr8.alfresco.cachext.CollectionCache; +import com.inteligr8.alfresco.cachext.MultiValueCache; @Component public class SolrShardRegistry extends AbstractLifecycleBean implements ShardRegistry { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Pattern coreShardPattern = Pattern.compile("(.+)-[0-9]+"); - + private final Random random = new Random(); private final QName shardLock = QName.createQName(Constants.NAMESPACE_ASIE, "shardLock"); - @Autowired - private ShardStateService sss; - @Autowired @Qualifier(Constants.QUALIFIER_ASIE) private AttributeService attrService; @@ -77,19 +73,39 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg private JobLockService jobLockService; @Autowired - @Qualifier(Constants.BEAN_FLOC_SHARD_NODE_CACHE) - private SimpleCache>> flocShardNodeCache; + @Qualifier(CommunityConstants.BEAN_SHARDSETS_CACHE) + private SimpleCache shardsetsCache; @Autowired - @Qualifier(Constants.BEAN_ONLINE_SHARD_STATE_CACHE) - private SimpleCache onlineNodeShardStateCache; + @Qualifier(CommunityConstants.BEAN_NODES_CACHE) + private SimpleCache nodesCache; @Autowired - @Qualifier(Constants.BEAN_OFFILINE_SHARD_STATE_CACHE) - private SimpleCache offlineNodeShardStateCache; + @Qualifier(CommunityConstants.BEAN_SHARD_NODES_CACHE) + private MultiValueCache shardNodesCache; @Autowired - @Qualifier(Constants.BEAN_CORE_EXPLICIT_CACHE) + @Qualifier(CommunityConstants.BEAN_SHARDINST_STATE_CACHE) + private SimpleCache shardInstanceStatesCache; + + @Autowired + @Qualifier(CommunityConstants.BEAN_NODE_UNAVAIL_CACHE) + private CollectionCache> nodeUnavailableCache; + + @Autowired + @Qualifier(CommunityConstants.BEAN_NODE_DISABLE_CACHE) + private CollectionCache> nodeDisableCache; + + @Autowired + @Qualifier(CommunityConstants.BEAN_SHARDINST_UNAVAIL_CACHE) + private CollectionCache> shardInstanceUnavailableCache; + + @Autowired + @Qualifier(CommunityConstants.BEAN_SHARDINST_DISABLE_CACHE) + private CollectionCache> shardInstanceDisableCache; + + @Autowired + @Qualifier(CommunityConstants.BEAN_CORE_EXPLICIT_CACHE) private SimpleCache coreExplicitIdCache; @Value("${inteligr8.asie.registerUnknownShardOffline}") @@ -103,204 +119,343 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg @Override protected void onBootstrap(ApplicationEvent event) { - String lock = this.jobLockService.getLock(this.shardLock, 2500L, 500L, 10); - try { - this.attrService.getAttributes(new AttributeQueryCallback() { - @Override - public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { - switch ((String) keys[2]) { - case Constants.ATTR_STATE: - ShardState shardNodeState = (ShardState) value; - ShardInstance shardNode = shardNodeState.getShardInstance(); - cacheShard(shardNode, shardNodeState, (String) keys[1]); - return true; - default: - return true; - - } - } - }, Constants.ATTR_ASIE_NODE_SHARD); - } finally { - this.jobLockService.releaseLock(lock, this.shardLock); - } + this.loadPersistedToCache(); } @Override protected void onShutdown(ApplicationEvent event) { } - /** - * This is private because it must be wrapped in a cluster-safe lock - */ - private void cacheShard(ShardInstance shardNode, ShardState shardNodeState, String nodeShardId) { - ShardInstance detachedShardNode = this.detach(shardNode); - - SimpleCache shardCache = this.onlineNodeShardStateCache; - ShardState cachedShardNodeState = this.onlineNodeShardStateCache.get(detachedShardNode); - if (cachedShardNodeState == null) { - cachedShardNodeState = this.offlineNodeShardStateCache.get(detachedShardNode); - shardCache = this.offlineNodeShardStateCache; - } + protected void loadPersistedToCache() { + String lockId = this.jobLockService.getLock(this.shardLock, 2500L, 500L, 10); + try { + this.attrService.getAttributes(new AttributeQueryCallback() { + @Override + public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + String core = (String) keys[1]; + if (!shardsetsCache.contains(core)) { + ShardSet shardSet = (ShardSet) value; + shardsetsCache.put(core, shardSet); + + switch (shardSet.getMethod()) { + case EXPLICIT_ID: + cacheExplicitShard(shardSet, false); + break; + default: + } + } + return true; + } + }, CommunityConstants.ATTR_ASIE_SHARDSET); - Shard shard = shardNode.getShard(); - this.putPutAdd(this.flocShardNodeCache, shard.getFloc(), shard.getInstance(), detachedShardNode); - - if (cachedShardNodeState == null) { - Boolean online = (Boolean) this.attrService.getAttribute(Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_ONLINE); - if (online != null) { - if (online.booleanValue()) { - this.onlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState); - } else { - this.offlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState); + this.attrService.getAttributes(new AttributeQueryCallback() { + @Override + public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + String nodeSpec = (String) keys[2]; + SolrHost node = (SolrHost) value; + if (!nodesCache.contains(nodeSpec)) + nodesCache.put(nodeSpec, node); + if (Boolean.TRUE.equals(attrService.getAttribute(CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_DISABLE, nodeSpec))) { + if (!nodeDisableCache.contains(node)) + nodeDisableCache.add(node); + } else if (nodeDisableCache.contains(node)) { + nodeDisableCache.remove(node); + } + return true; } - } else { - if (this.registerOffline) { - this.offlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState); - } else { - this.onlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState); + }, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT); + + this.attrService.getAttributes(new AttributeQueryCallback() { + @Override + public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + Shard shard = (Shard) keys[1]; + SolrHost node = (SolrHost) keys[2]; + if (!shardNodesCache.contains(shard, node)) + shardNodesCache.add(shard, node); + return true; } - } - } else if (cachedShardNodeState.getLastIndexedTxId() < shardNodeState.getLastIndexedTxId()) { - // update the cached state if the state's last indexes transaction is later - shardCache.put(shardNode, this.detach(shardNodeState)); - } - - switch (shardNode.getShard().getFloc().getShardMethod()) { - case EXPLICIT_ID: - cacheExplicitShard(shardNode, shardNodeState); - break; - default: + }, CommunityConstants.ATTR_ASIE_SHARD_NODES); + + this.attrService.getAttributes(new AttributeQueryCallback() { + @Override + public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + ShardInstance shardNode = (ShardInstance) keys[2]; + ShardInstanceState state = (ShardInstanceState) value; + if (!shardInstanceStatesCache.contains(shardNode)) + shardInstanceStatesCache.put(shardNode, state); + if (Boolean.TRUE.equals(attrService.getAttribute(CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_DISABLE, shardNode))) { + if (!shardInstanceDisableCache.contains(shardNode)) + shardInstanceDisableCache.add(shardNode); + } else if (shardInstanceDisableCache.contains(shardNode)) { + shardInstanceDisableCache.remove(shardNode); + } + return true; + } + }, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT); + } finally { + this.jobLockService.releaseLock(lockId, this.shardLock); } } - private void cacheExplicitShard(ShardInstance shardNode, ShardState shardNodeState) { - String coreName = shardNode.getShard().getFloc().getPropertyBag().get("coreName"); - if (coreName != null && !this.coreExplicitIdCache.contains(coreName)) { - String property = shardNodeState.getPropertyBag().get("shard.key"); - QName propertyQName = QName.createQName(property, this.namespaceService); - - this.logger.debug("Mapping core to explicit ID: {} => {}", coreName, propertyQName); - this.coreExplicitIdCache.put(coreName, propertyQName); + private void cacheExplicitShard(ShardSet shardSet, boolean overwrite) { + if (overwrite || !this.coreExplicitIdCache.contains(shardSet.getCore())) { + String property = shardSet.getPrefixedProperty(); + QName propertyQName = QName.createQName(property, namespaceService); + + this.logger.debug("Mapping core to explicit ID: {} => {}", shardSet.getCore(), propertyQName); + this.coreExplicitIdCache.put(shardSet.getCore(), propertyQName); } } + protected void persistCache() { + String lockId = this.jobLockService.getLock(this.shardLock, 2500L, 100L, 50); + try { + this.persistShardSetCache(); + this.persistNodeCache(); + this.persistShardNodeCache(); + this.persistShardInstanceCache(); + } finally { + this.jobLockService.releaseLock(lockId, this.shardLock); + } + } + + private void persistShardSetCache() { + // add anything missing + // update anything changed + for (String core : this.shardsetsCache.getKeys()) { + ShardSet shardSet = this.shardsetsCache.get(core); + this.checkSetAttribute(shardSet, CommunityConstants.ATTR_ASIE_SHARDSET, core); + } + + // we are not removing anything removed from the cache, as it might have expired + // it will just recache on the next load + } + + private void persistNodeCache() { + // add anything missing + // update anything changed + for (String nodeSpec : this.nodesCache.getKeys()) { + SolrHost node = this.nodesCache.get(nodeSpec); + this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT, nodeSpec); + } + + // we are not removing anything removed from the cache, as it might have expired + // it will just recache on the next load + + // add anything disabled + for (SolrHost node : this.nodeDisableCache.values()) + this.checkSetAttribute(Boolean.TRUE, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_DISABLE, node.getSpec()); + + // remove anything not disabled + this.attrService.getAttributes(new AttributeQueryCallback() { + @Override + public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + SolrHost node = SolrHost.from((String) keys[2]); + if (!nodeDisableCache.contains(node)) + attrService.removeAttribute(keys); + return true; + } + }, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_DISABLE); + } + + private void persistShardNodeCache() { + // add anything missing + // update anything changed + for (Shard shard : this.shardNodesCache.getKeys()) { + Collection nodes = this.shardNodesCache.get(shard); + for (SolrHost node : nodes) { + this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_SHARD_NODES, shard, node.getSpec()); + } + } + + // we are not removing anything removed from the cache, as it might have expired + // it will just recache on the next load + } + + private void persistShardInstanceCache() { + // add anything missing + // update anything changed + for (ShardInstance shardNode : this.shardInstanceStatesCache.getKeys()) { + ShardInstanceState state = this.shardInstanceStatesCache.get(shardNode); + ShardInstanceState currentState = (ShardInstanceState) this.attrService.getAttribute(CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode); + if (currentState != null) { + if (currentState.compareTo(state) >= 0) { + // do nothing + } else { + this.logger.debug("The persisted state was old; updating: {}: {} => {}", shardNode, currentState, state); + this.attrService.setAttribute(state, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode); + } + } else { + this.attrService.setAttribute(state, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode); + } + } + + // we are not removing anything removed from the cache, as it might have expired + // it will just recache on the next load + + // add anything disabled + for (ShardInstance shardNode : this.shardInstanceDisableCache.values()) + this.checkSetAttribute(Boolean.TRUE, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_DISABLE, shardNode); + + // remove anything not disabled + this.attrService.getAttributes(new AttributeQueryCallback() { + @Override + public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + ShardInstance shardNode = (ShardInstance) keys[2]; + if (!shardInstanceDisableCache.contains(shardNode)) + attrService.removeAttribute(keys); + return true; + } + }, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_DISABLE); + } + + protected void persistCache(ShardSet shardSet, SolrHost node, Shard shard, ShardInstance shardNode, ShardInstanceState state) { + String lockId = this.jobLockService.getLock(this.shardLock, 2000L, 100L, 50); + try { + this.checkSetAttribute(shardSet, CommunityConstants.ATTR_ASIE_SHARDSET, shardSet.getCore()); + this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT, node.getSpec()); + this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_SHARD_NODES, shard, node.getSpec()); + this.checkSetAttribute(state, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode); + } finally { + this.jobLockService.releaseLock(lockId, this.shardLock); + } + } + + private void checkSetAttribute(Serializable value, Serializable... keys) { + Serializable currentValue = this.attrService.getAttribute(keys); + if (currentValue != null) { + if (currentValue.equals(value)) + return; + this.logger.warn("The attribute value unexpectedly changed: {}: {} => {}", keys, currentValue, value); + } + + this.attrService.setAttribute(value, keys); + } + @Override public void registerShardState(ShardState shardNodeState) { - ShardInstance shardNode = shardNodeState.getShardInstance(); - Node node = new Node(shardNode); - this.fixFlocPropertyBag(shardNodeState); + ShardSet shardSet = ShardSet.from(shardNodeState.getShardInstance().getShard().getFloc(), shardNodeState); + Shard shard = Shard.from(shardSet, shardNodeState.getShardInstance().getShard().getInstance()); + SolrHost node = SolrHost.from(shardNodeState.getShardInstance()); + ShardInstance shardNode = ShardInstance.from(shard, node); + ShardInstanceState state = ShardInstanceState.from(shardNodeState); - String lock = this.jobLockService.getLock(this.shardLock, 2500L, 500L, 10); - try { - this.cacheShard(shardNode, shardNodeState, node.getId()); - this.persistShards(); - } finally { - this.jobLockService.releaseLock(lock, this.shardLock); - } - } - - protected void fixFlocPropertyBag(ShardState shardNodeState) { - Floc floc = shardNodeState.getShardInstance().getShard().getFloc(); - if (floc.getPropertyBag().isEmpty()) { - for (Entry prop : shardNodeState.getPropertyBag().entrySet()) { - if (prop.getKey().startsWith("shard.")) { - floc.getPropertyBag().put(prop.getKey(), prop.getValue()); - } else if (prop.getKey().equals("coreName")) { - String coreName = this.extractCoreName(prop.getValue()); - if (coreName != null) - floc.getPropertyBag().put(prop.getKey(), coreName); - } - } - } - } - - protected String extractCoreName(String coreShardName) { - Matcher matcher = this.coreShardPattern.matcher(coreShardName); - if (!matcher.matches()) - return null; - return matcher.group(1); - } - - /** - * This is private because it must be wrapped in a cluster-safe lock - */ - private void persistShards() { - long onlineExpired = System.currentTimeMillis() - this.offlineIdleShardInSeconds * 1000L; - long offlineExpired = System.currentTimeMillis() - this.forgetOfflineShardInSeconds * 1000L; - - for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) { - String nodeShardId = new Node(shardNode).getId() + ";" + shardNode.getShard().getInstance(); - ShardState shardNodeState = this.onlineNodeShardStateCache.get(shardNode); - if (shardNodeState.getLastUpdated() < onlineExpired) { - this.logger.warn("Taking shard offline: {}", shardNode); - this.onlineNodeShardStateCache.remove(shardNode); - this.offlineNodeShardStateCache.put(shardNode, shardNodeState); - } else { - this.attrService.setAttribute(shardNodeState, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_STATE); - this.attrService.setAttribute(Boolean.TRUE, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_ONLINE); - } - } - - for (ShardInstance shardNode : this.offlineNodeShardStateCache.getKeys()) { - String nodeShardId = new Node(shardNode).getId() + ";" + shardNode.getShard().getInstance(); - ShardState shardNodeState = this.offlineNodeShardStateCache.get(shardNode); - if (shardNodeState.getLastUpdated() < offlineExpired) { - this.logger.info("Forgetting about already offline shard: {}", shardNode); - this.offlineNodeShardStateCache.remove(shardNode); - } else { - this.attrService.setAttribute(shardNodeState, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_STATE); - this.attrService.setAttribute(Boolean.FALSE, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_ONLINE); - } - } + this.persistCache(shardSet, node, shard, shardNode, state); } @Override - public Map>> getFlocs() { - Map>> flocs = new HashMap<>(); + public void unregisterShardInstance(org.alfresco.repo.index.shard.ShardInstance shardInstance) { + ShardSet shardSet = ShardSet.from(shardInstance.getShard().getFloc(), null); + Shard shard = Shard.from(shardSet, shardInstance.getShard().getInstance()); + SolrHost node = SolrHost.from(shardInstance); + ShardInstance shardNode = ShardInstance.from(shard, node); + + this.shardInstanceStatesCache.remove(shardNode); + this.shardInstanceDisableCache.remove(shardNode); + this.shardInstanceUnavailableCache.remove(shardNode); + this.nodeDisableCache.remove(node); + this.nodeUnavailableCache.remove(node); + this.attrService.removeAttribute(CommunityConstants.ATTR_ASIE_SHARD_NODES, shard, node.getSpec()); + } + + @Override + public Map>> getFlocs() { + Map flocs = new HashMap<>(); + Map>> response = new HashMap<>(); - for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) { - Floc floc = shardNode.getShard().getFloc(); + for (Shard shard : this.shardNodesCache.getKeys()) { + String core = shard.extractShardSetCore(); + ShardSet shardSet = this.shardsetsCache.get(core); + + Map> shards; + Floc floc = flocs.get(core); + if (floc != null) { + floc = shardSet.toAlfrescoModel(); + shards = new HashMap<>(); + } else { + shards = response.get(floc); + } - Map> shards = flocs.get(floc); - if (shards == null) - flocs.put(floc, shards = new HashMap<>()); + org.alfresco.repo.index.shard.Shard shard_ = shard.toAlfrescoModel(floc); + Set states = shards.get(shard_); + if (states == null) + states = new HashSet<>(); - Set shardNodeStates = shards.get(shardNode.getShard()); - if (shardNodeStates == null) - shards.put(shardNode.getShard(), shardNodeStates = new HashSet<>()); + for (SolrHost node : this.shardNodesCache.get(shard)) { + if (this.nodeDisableCache.contains(node) || this.nodeUnavailableCache.contains(node)) { + this.logger.debug("Excluding node as it is disabled or considered unavailable: {}", node); + continue; + } + + ShardInstance shardNode = ShardInstance.from(shard, node); + if (this.shardInstanceDisableCache.contains(shardNode) || this.shardInstanceUnavailableCache.contains(shardNode)) { + this.logger.debug("Excluding shard node as it is disabled or considered unavailable: {}", shardNode); + continue; + } + + ShardInstanceState shardNodeState = this.shardInstanceStatesCache.get(shardNode); + states.add(shardNodeState.toAlfrescoModel(shardNode.toAlfrescoModel(shard_))); + } - ShardState shardNodeState = this.onlineNodeShardStateCache.get(shardNode); - if (shardNodeState != null) // in case it was removed during the looping (very rare) - shardNodeStates.add(shardNodeState); + if (!states.isEmpty()) + shards.put(shard_, states); + if (!shards.isEmpty()) + response.put(floc, shards); } - return flocs; + return response; } @Override public void purge() { - this.sss.clear(); + String lockId = this.jobLockService.getLock(this.shardLock, 2500L, 100L, 50); + try { + this.logger.info("Removing all nodes/shards from the shard registry"); + this.shardsetsCache.clear(); + this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_SHARDSET); + + this.nodesCache.clear(); + this.nodeDisableCache.clear(); + this.nodeUnavailableCache.clear(); + this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_NODE); + + this.shardNodesCache.clear(); + this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_SHARD_NODES); + + this.shardInstanceStatesCache.clear(); + this.shardInstanceDisableCache.clear(); + this.shardInstanceUnavailableCache.clear(); + this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_SHARD_NODE); + } finally { + this.jobLockService.releaseLock(lockId, this.shardLock); + } } @Override public void purgeAgedOutShards() { - long onlineExpired = System.currentTimeMillis() - this.offlineIdleShardInSeconds * 1000L; - long offlineExpired = System.currentTimeMillis() - this.forgetOfflineShardInSeconds * 1000L; + OffsetDateTime onlineExpired = OffsetDateTime.now().minusSeconds(this.offlineIdleShardInSeconds); + OffsetDateTime offlineExpired = OffsetDateTime.now().minusSeconds(this.forgetOfflineShardInSeconds); - for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) { - ShardState shardNodeState = this.onlineNodeShardStateCache.get(shardNode); - if (shardNodeState.getLastUpdated() < onlineExpired) { + for (ShardInstance shardNode : this.shardInstanceStatesCache.getKeys()) { + ShardInstanceState state = this.shardInstanceStatesCache.get(shardNode); + SolrHost node = shardNode.extractNode(); + + if (this.shardInstanceDisableCache.contains(shardNode)) { + this.logger.debug("Ignoring disabled shard instance during purgeAgedOutShards()"); + } else if (this.nodeDisableCache.contains(node)) { + this.logger.debug("Ignoring disabled node during purgeAgedOutShards()"); + } else if (state.getLastUpdated().isBefore(offlineExpired)) { + this.shardInstanceStatesCache.remove(shardNode); + if (this.shardInstanceUnavailableCache.remove(shardNode)) { + this.logger.info("Forgetting about already offline shard: {}", shardNode); + } else if (this.nodeUnavailableCache.remove(node)) { + this.logger.info("Forgetting about already offline shard: {}", shardNode); + } else { + this.logger.warn("Forgetting about online shard: {}", shardNode); + } + } else if (state.getLastUpdated().isBefore(onlineExpired)) { this.logger.warn("Taking shard offline: {}", shardNode); - this.onlineNodeShardStateCache.remove(shardNode); - this.offlineNodeShardStateCache.put(shardNode, shardNodeState); - } - } - - for (ShardInstance shardNode : this.offlineNodeShardStateCache.getKeys()) { - ShardState shardNodeState = this.offlineNodeShardStateCache.get(shardNode); - if (shardNodeState.getLastUpdated() < offlineExpired) { - this.logger.info("Forgetting about already offline shard: {}", shardNode); - this.offlineNodeShardStateCache.remove(shardNode); + this.shardInstanceUnavailableCache.add(shardNode); } } } @@ -314,8 +469,15 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg public Set getShardInstanceList(String coreName) { Set shardIds = new HashSet<>(); - for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) { - shardIds.add(shardNode.getShard().getInstance()); + ShardSet shardSet = this.shardsetsCache.get(coreName); + if (shardSet == null) + return Collections.emptySet(); + + + for (Shard shard : this.shardNodesCache.getKeys()) { + if (shardSet.getCore().equals(shard.extractShardSetCore())) { + shardIds.add(shard.extractShardId()); + } } return shardIds; @@ -327,18 +489,19 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg } @Override - public List getIndexSlice(SearchParameters searchParameters) { + public List getIndexSlice(SearchParameters searchParameters) { if (searchParameters.getQuery() == null) return Collections.emptyList(); - List bestShards = null; + List bestShards = null; - for (Floc floc : this.flocShardMultiCache.getKeys()) { - List shards = new LinkedList<>(); + for (String shardSetSpec : this.shardsetsCache.getKeys()) { + ShardSet shardSet = this.shardsetsCache.get(shardSetSpec); + List shards = new LinkedList<>(); - switch (floc.getShardMethod()) { + switch (shardSet.getMethod()) { case EXPLICIT_ID: - String property = floc.getPropertyBag().get("shard.key"); + String property = shardSet.getPrefixedProperty(); QName propertyQName = QName.createQName(property, this.namespaceService); DataTypeDefinition dtdef = this.dictionaryService.getProperty(propertyQName).getDataType(); @@ -367,8 +530,7 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg } } - // shardIds to shardInstances - break; + shards.addAll(this.getIndexSlice(shardSet, shardIds)); default: // make no determination } @@ -380,42 +542,31 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg return bestShards; } - protected List getIndexSlice() { + protected List getIndexSlice(ShardSet shardSet, Collection shardIds) { + List shardNodes = new LinkedList<>(); - } - - private ShardInstance detach(ShardInstance shardNode) { - ShardInstance detachedShardNode = new ShardInstance(); - detachedShardNode.setHostName(shardNode.getHostName()); - detachedShardNode.setPort(shardNode.getPort()); - detachedShardNode.setBaseUrl(shardNode.getBaseUrl()); - return detachedShardNode; - } - - private ShardState detach(ShardState shardState) { - ShardState detachedShardState = new ShardState(); - detachedShardState.setLastIndexedChangeSetCommitTime(shardState.getLastIndexedChangeSetCommitTime()); - detachedShardState.setLastIndexedChangeSetId(shardState.getLastIndexedChangeSetId()); - detachedShardState.setLastIndexedTxCommitTime(shardState.getLastIndexedTxCommitTime()); - detachedShardState.setLastIndexedTxId(shardState.getLastIndexedTxId()); - detachedShardState.setLastUpdated(shardState.getLastUpdated()); - detachedShardState.setMaster(shardState.isMaster()); - detachedShardState.setPropertyBag(shardState.getPropertyBag()); - return detachedShardState; - } - - private boolean putPutAdd(SimpleCache>> cache, K1 cacheKey, K2 mapKey, V mapValue) { - Map> map = cache.get(cacheKey); - if (map == null) - map = new HashMap<>(); - return this.putAdd(map, mapKey, mapValue); - } - - private boolean putAdd(Map> map, K key, V value) { - Set set = map.get(key); - if (set == null) - set = new HashSet<>(); - return set.add(value); + for (Integer shardId : shardIds) { + Shard shard = Shard.from(shardSet, shardId); + + Collection nodes = this.shardNodesCache.get(shard); + List availableNodes = new LinkedList<>(); + for (SolrHost node : nodes) { + if (this.nodeDisableCache.contains(node) || this.nodeUnavailableCache.contains(node)) + continue; + + ShardInstance shardNode = ShardInstance.from(shard, node); + if (this.shardInstanceDisableCache.contains(shardNode) || this.shardInstanceUnavailableCache.contains(shardNode)) + continue; + + availableNodes.add(node); + } + + SolrHost randomNode = availableNodes.get(this.random.nextInt(availableNodes.size())); + + shardNodes.add(ShardInstance.from(shard, randomNode).toAlfrescoModel(shard.toAlfrescoModel(shardSet.toAlfrescoModel()))); + } + + return shardNodes; } } diff --git a/community-module/src/main/java/com/inteligr8/alfresco/asie/util/ShardSetSearchComparator.java b/community-module/src/main/java/com/inteligr8/alfresco/asie/util/ShardSetSearchComparator.java new file mode 100644 index 0000000..78905c5 --- /dev/null +++ b/community-module/src/main/java/com/inteligr8/alfresco/asie/util/ShardSetSearchComparator.java @@ -0,0 +1,79 @@ +package com.inteligr8.alfresco.asie.util; + +import java.util.Comparator; + +import org.alfresco.repo.index.shard.ShardMethodEnum; + +import com.inteligr8.alfresco.asie.model.ShardSet; + +public class ShardSetSearchComparator implements Comparator { + + @Override + public int compare(ShardSet ss1, ShardSet ss2) { + int compare = this.compare(ss1.getMethod(), ss2.getMethod()); + if (compare != 0) + return compare; + + return this.compare(ss1.getShards(), ss2.getShards()); + } + + private int compare(ShardMethodEnum method1, ShardMethodEnum method2) { + if (method1.equals(method2)) + return 0; + + switch (method1) { + case EXPLICIT_ID: + case EXPLICIT_ID_FALLBACK_LRIS: + return -1; + case PROPERTY: + case DATE: + switch (method2) { + case EXPLICIT_ID: + case EXPLICIT_ID_FALLBACK_LRIS: + return 1; + default: + return -1; + } + case ACL_ID: + case MOD_ACL_ID: + switch (method2) { + case EXPLICIT_ID: + case EXPLICIT_ID_FALLBACK_LRIS: + case PROPERTY: + case DATE: + return 1; + default: + return -1; + } + default: + switch (method2) { + case EXPLICIT_ID: + case EXPLICIT_ID_FALLBACK_LRIS: + case PROPERTY: + case DATE: + case ACL_ID: + case MOD_ACL_ID: + return 1; + default: + } + } + + return 0; + } + + private int compare(Short shards1, Short shards2) { + // the larger the shard count, the more shards that may need to be queried + // so prefer smaller shard counts + // no shard count (DB_ID_RANGE) should be treated as the worst (unlimited) + if (shards1 == null && shards2 == null) { + return 0; + } else if (shards1 == null) { + return 1; + } else if (shards2 == null) { + return -1; + } else { + return shards1.compareTo(shards2); + } + } + +}