update from refactoring (incomplete)

This commit is contained in:
2024-11-13 18:03:22 -05:00
parent 006597f6fb
commit de8e0bf2d7
6 changed files with 490 additions and 354 deletions

View File

@@ -33,6 +33,11 @@
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.inteligr8.alfresco</groupId>
<artifactId>cachext-platform-module</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.inteligr8.alfresco</groupId>
<artifactId>asie-shared</artifactId>

View File

@@ -0,0 +1,23 @@
package com.inteligr8.alfresco.asie;
public interface CommunityConstants extends Constants {
static final String BEAN_SHARDSETS_CACHE = "asieShardsetsCache";
static final String BEAN_NODES_CACHE = "asieNodesCache";
static final String BEAN_SHARD_NODES_CACHE = "asieShardNodesCache";
static final String BEAN_SHARDINST_STATE_CACHE = "asieShardInstanceStateCache";
static final String BEAN_NODE_DISABLE_CACHE = "asieNodeDisabledCache";
static final String BEAN_NODE_UNAVAIL_CACHE = "asieNodeUnavailableCache";
static final String BEAN_SHARDINST_DISABLE_CACHE = "asieShardInstanceDisabledCache";
static final String BEAN_SHARDINST_UNAVAIL_CACHE = "asieShardInstanceUnavailableCache";
static final String BEAN_CORE_EXPLICIT_CACHE = "asieCoreExplicitCache";
static final String ATTR_ASIE_SHARDSET = "inteligr8.asie.shardSet";
static final String ATTR_ASIE_NODE = "inteligr8.asie.node";
static final String ATTR_ASIE_SHARD_NODES = "inteligr8.asie.shard.nodes";
static final String ATTR_ASIE_SHARD_NODE = "inteligr8.asie.shard.node";
static final String ATTR_OBJECT = "object";
static final String ATTR_DISABLE = "disabled";
static final String ATTR_NODES = "nodes";
}

View File

@@ -1,67 +0,0 @@
package com.inteligr8.alfresco.asie.cache;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import org.alfresco.repo.cache.SimpleCache;
public class MultiValueCache<K extends Serializable, V, C extends Collection<V>> implements SimpleCache<K, C> {
private SimpleCache<K, C> cache;
private Class<?> collectionType;
public MultiValueCache(SimpleCache<K, C> cache, Class<?> collectionType) {
this.cache = cache;
this.collectionType = collectionType;
}
@SuppressWarnings("unchecked")
public boolean add(K key, V value) {
C c = this.cache.get(key);
if (c != null)
return c.add(value);
try {
Constructor<?> constructor = this.collectionType.getConstructor();
c = (C) constructor.newInstance();
this.cache.put(key, c);
return c.add(value);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) {
throw new UnsupportedOperationException(e.getMessage(), e);
}
}
@Override
public void clear() {
this.cache.clear();
}
@Override
public C get(K key) {
return this.cache.get(key);
}
@Override
public boolean contains(K key) {
C c = this.cache.get(key);
return c == null ? false : !c.isEmpty();
}
@Override
public Collection<K> getKeys() {
return this.cache.getKeys();
}
@Override
public void put(K key, C value) {
this.cache.put(key, value);
}
@Override
public void remove(K key) {
this.cache.remove(key);
}
}

View File

@@ -1,80 +1,25 @@
package com.inteligr8.alfresco.asie.service;
import java.io.Serializable;
import java.util.Arrays;
import org.alfresco.repo.cache.SimpleCache;
import org.alfresco.repo.index.shard.ShardInstance;
import org.alfresco.repo.index.shard.ShardState;
import org.alfresco.service.cmr.attributes.AttributeService;
import org.alfresco.service.cmr.attributes.AttributeService.AttributeQueryCallback;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.Constants;
import com.inteligr8.alfresco.asie.enterprise.EnterpriseConstants;
@Component
public class ShardStateService implements com.inteligr8.alfresco.asie.spi.ShardStateService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
@Qualifier(Constants.QUALIFIER_ASIE)
private AttributeService attrService;
@Autowired
@Qualifier(Constants.BEAN_SHARD_STATE_CACHE)
private SimpleCache<ShardInstance, ShardState> shardStateCache;
private SolrShardRegistry shardRegistry;
@Override
public void clear() {
this.logger.info("Removing all nodes/shards from the shard registry");
// this clears the state from the backend database
this.attrService.removeAttributes(EnterpriseConstants.ATTR_SHARD_STATE);
this.attrService.removeAttributes(EnterpriseConstants.ATTR_SHARD_SUBSCRIPTION);
this.attrService.removeAttributes(Constants.ATTR_ASIE);
// this clears the state from Hazelcast
this.shardStateCache.clear();
this.shardToGuidCache.clear();
}
public void remove(Serializable... keys) {
if (keys.length == 0)
throw new IllegalArgumentException();
this.logger.info("Removing from the shard registry: {}", Arrays.toString(keys));
Serializable[] shardStateKeys = keys;
Serializable[] shardSubKeys;
if (EnterpriseConstants.ATTR_SHARD_STATE.equals(keys[0])) {
shardSubKeys = ArrayUtils.clone(keys);
shardSubKeys[0] = EnterpriseConstants.ATTR_SHARD_SUBSCRIPTION;
} else {
shardStateKeys = ArrayUtils.addFirst(keys, EnterpriseConstants.ATTR_SHARD_STATE);
shardSubKeys = ArrayUtils.addFirst(keys, EnterpriseConstants.ATTR_SHARD_SUBSCRIPTION);
}
ShardState shardState = (ShardState) this.attrService.getAttribute(shardStateKeys);
// this clears the state from the backend database
this.attrService.removeAttribute(shardStateKeys);
this.attrService.removeAttribute(shardSubKeys);
// this clears the state from Hazelcast
if (shardState != null) {
this.shardStateCache.remove(shardState.getShardInstance());
this.shardToGuidCache.remove(shardState.getShardInstance());
}
}
public void iterate(AttributeQueryCallback callback) {
this.attrService.getAttributes(callback, EnterpriseConstants.ATTR_SHARD_STATE);
this.shardRegistry.purge();
}
}

View File

@@ -1,6 +1,7 @@
package com.inteligr8.alfresco.asie.service;
import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -8,18 +9,12 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.alfresco.repo.cache.SimpleCache;
import org.alfresco.repo.index.shard.Floc;
import org.alfresco.repo.index.shard.Shard;
import org.alfresco.repo.index.shard.ShardInstance;
import org.alfresco.repo.index.shard.ShardMethodEnum;
import org.alfresco.repo.index.shard.ShardRegistry;
import org.alfresco.repo.index.shard.ShardState;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.service.cmr.attributes.AttributeService;
@@ -29,7 +24,6 @@ import org.alfresco.service.cmr.dictionary.DictionaryService;
import org.alfresco.service.cmr.search.SearchParameters;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,27 +33,29 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.extensions.surf.util.AbstractLifecycleBean;
import org.springframework.stereotype.Component;
import com.inteligr8.alfresco.asie.CommunityConstants;
import com.inteligr8.alfresco.asie.Constants;
import com.inteligr8.alfresco.asie.cache.MultiValueCache;
import com.inteligr8.alfresco.asie.compute.QueryInspector;
import com.inteligr8.alfresco.asie.compute.QueryInspector.QueryRangeValue;
import com.inteligr8.alfresco.asie.compute.QueryInspector.QuerySingleValue;
import com.inteligr8.alfresco.asie.compute.QueryInspector.QueryValue;
import com.inteligr8.alfresco.asie.compute.QueryInspectorFactory;
import com.inteligr8.alfresco.asie.model.Node;
import com.inteligr8.alfresco.asie.model.Shard;
import com.inteligr8.alfresco.asie.model.ShardInstance;
import com.inteligr8.alfresco.asie.model.ShardInstanceState;
import com.inteligr8.alfresco.asie.model.ShardSet;
import com.inteligr8.alfresco.asie.model.SolrHost;
import com.inteligr8.alfresco.asie.spi.ShardRegistry;
import com.inteligr8.alfresco.cachext.CollectionCache;
import com.inteligr8.alfresco.cachext.MultiValueCache;
@Component
public class SolrShardRegistry extends AbstractLifecycleBean implements ShardRegistry {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Pattern coreShardPattern = Pattern.compile("(.+)-[0-9]+");
private final Random random = new Random();
private final QName shardLock = QName.createQName(Constants.NAMESPACE_ASIE, "shardLock");
@Autowired
private ShardStateService sss;
@Autowired
@Qualifier(Constants.QUALIFIER_ASIE)
private AttributeService attrService;
@@ -77,19 +73,39 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg
private JobLockService jobLockService;
@Autowired
@Qualifier(Constants.BEAN_FLOC_SHARD_NODE_CACHE)
private SimpleCache<Floc, Map<Integer, Set<ShardInstance>>> flocShardNodeCache;
@Qualifier(CommunityConstants.BEAN_SHARDSETS_CACHE)
private SimpleCache<String, ShardSet> shardsetsCache;
@Autowired
@Qualifier(Constants.BEAN_ONLINE_SHARD_STATE_CACHE)
private SimpleCache<ShardInstance, ShardState> onlineNodeShardStateCache;
@Qualifier(CommunityConstants.BEAN_NODES_CACHE)
private SimpleCache<String, SolrHost> nodesCache;
@Autowired
@Qualifier(Constants.BEAN_OFFILINE_SHARD_STATE_CACHE)
private SimpleCache<ShardInstance, ShardState> offlineNodeShardStateCache;
@Qualifier(CommunityConstants.BEAN_SHARD_NODES_CACHE)
private MultiValueCache<Shard, SolrHost> shardNodesCache;
@Autowired
@Qualifier(Constants.BEAN_CORE_EXPLICIT_CACHE)
@Qualifier(CommunityConstants.BEAN_SHARDINST_STATE_CACHE)
private SimpleCache<ShardInstance, ShardInstanceState> shardInstanceStatesCache;
@Autowired
@Qualifier(CommunityConstants.BEAN_NODE_UNAVAIL_CACHE)
private CollectionCache<SolrHost, HashSet<SolrHost>> nodeUnavailableCache;
@Autowired
@Qualifier(CommunityConstants.BEAN_NODE_DISABLE_CACHE)
private CollectionCache<SolrHost, HashSet<SolrHost>> nodeDisableCache;
@Autowired
@Qualifier(CommunityConstants.BEAN_SHARDINST_UNAVAIL_CACHE)
private CollectionCache<ShardInstance, HashSet<ShardInstance>> shardInstanceUnavailableCache;
@Autowired
@Qualifier(CommunityConstants.BEAN_SHARDINST_DISABLE_CACHE)
private CollectionCache<ShardInstance, HashSet<ShardInstance>> shardInstanceDisableCache;
@Autowired
@Qualifier(CommunityConstants.BEAN_CORE_EXPLICIT_CACHE)
private SimpleCache<String, QName> coreExplicitIdCache;
@Value("${inteligr8.asie.registerUnknownShardOffline}")
@@ -103,204 +119,343 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg
@Override
protected void onBootstrap(ApplicationEvent event) {
String lock = this.jobLockService.getLock(this.shardLock, 2500L, 500L, 10);
try {
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
switch ((String) keys[2]) {
case Constants.ATTR_STATE:
ShardState shardNodeState = (ShardState) value;
ShardInstance shardNode = shardNodeState.getShardInstance();
cacheShard(shardNode, shardNodeState, (String) keys[1]);
return true;
default:
return true;
}
}
}, Constants.ATTR_ASIE_NODE_SHARD);
} finally {
this.jobLockService.releaseLock(lock, this.shardLock);
}
this.loadPersistedToCache();
}
@Override
protected void onShutdown(ApplicationEvent event) {
}
/**
* This is private because it must be wrapped in a cluster-safe lock
*/
private void cacheShard(ShardInstance shardNode, ShardState shardNodeState, String nodeShardId) {
ShardInstance detachedShardNode = this.detach(shardNode);
protected void loadPersistedToCache() {
String lockId = this.jobLockService.getLock(this.shardLock, 2500L, 500L, 10);
try {
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
String core = (String) keys[1];
if (!shardsetsCache.contains(core)) {
ShardSet shardSet = (ShardSet) value;
shardsetsCache.put(core, shardSet);
SimpleCache<ShardInstance, ShardState> shardCache = this.onlineNodeShardStateCache;
ShardState cachedShardNodeState = this.onlineNodeShardStateCache.get(detachedShardNode);
if (cachedShardNodeState == null) {
cachedShardNodeState = this.offlineNodeShardStateCache.get(detachedShardNode);
shardCache = this.offlineNodeShardStateCache;
}
Shard shard = shardNode.getShard();
this.putPutAdd(this.flocShardNodeCache, shard.getFloc(), shard.getInstance(), detachedShardNode);
if (cachedShardNodeState == null) {
Boolean online = (Boolean) this.attrService.getAttribute(Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_ONLINE);
if (online != null) {
if (online.booleanValue()) {
this.onlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState);
} else {
this.offlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState);
}
} else {
if (this.registerOffline) {
this.offlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState);
} else {
this.onlineNodeShardStateCache.put(detachedShardNode, cachedShardNodeState);
}
}
} else if (cachedShardNodeState.getLastIndexedTxId() < shardNodeState.getLastIndexedTxId()) {
// update the cached state if the state's last indexes transaction is later
shardCache.put(shardNode, this.detach(shardNodeState));
}
switch (shardNode.getShard().getFloc().getShardMethod()) {
switch (shardSet.getMethod()) {
case EXPLICIT_ID:
cacheExplicitShard(shardNode, shardNodeState);
cacheExplicitShard(shardSet, false);
break;
default:
}
}
private void cacheExplicitShard(ShardInstance shardNode, ShardState shardNodeState) {
String coreName = shardNode.getShard().getFloc().getPropertyBag().get("coreName");
if (coreName != null && !this.coreExplicitIdCache.contains(coreName)) {
String property = shardNodeState.getPropertyBag().get("shard.key");
QName propertyQName = QName.createQName(property, this.namespaceService);
this.logger.debug("Mapping core to explicit ID: {} => {}", coreName, propertyQName);
this.coreExplicitIdCache.put(coreName, propertyQName);
return true;
}
}, CommunityConstants.ATTR_ASIE_SHARDSET);
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
String nodeSpec = (String) keys[2];
SolrHost node = (SolrHost) value;
if (!nodesCache.contains(nodeSpec))
nodesCache.put(nodeSpec, node);
if (Boolean.TRUE.equals(attrService.getAttribute(CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_DISABLE, nodeSpec))) {
if (!nodeDisableCache.contains(node))
nodeDisableCache.add(node);
} else if (nodeDisableCache.contains(node)) {
nodeDisableCache.remove(node);
}
return true;
}
}, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT);
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
Shard shard = (Shard) keys[1];
SolrHost node = (SolrHost) keys[2];
if (!shardNodesCache.contains(shard, node))
shardNodesCache.add(shard, node);
return true;
}
}, CommunityConstants.ATTR_ASIE_SHARD_NODES);
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
ShardInstance shardNode = (ShardInstance) keys[2];
ShardInstanceState state = (ShardInstanceState) value;
if (!shardInstanceStatesCache.contains(shardNode))
shardInstanceStatesCache.put(shardNode, state);
if (Boolean.TRUE.equals(attrService.getAttribute(CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_DISABLE, shardNode))) {
if (!shardInstanceDisableCache.contains(shardNode))
shardInstanceDisableCache.add(shardNode);
} else if (shardInstanceDisableCache.contains(shardNode)) {
shardInstanceDisableCache.remove(shardNode);
}
return true;
}
}, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT);
} finally {
this.jobLockService.releaseLock(lockId, this.shardLock);
}
}
private void cacheExplicitShard(ShardSet shardSet, boolean overwrite) {
if (overwrite || !this.coreExplicitIdCache.contains(shardSet.getCore())) {
String property = shardSet.getPrefixedProperty();
QName propertyQName = QName.createQName(property, namespaceService);
this.logger.debug("Mapping core to explicit ID: {} => {}", shardSet.getCore(), propertyQName);
this.coreExplicitIdCache.put(shardSet.getCore(), propertyQName);
}
}
protected void persistCache() {
String lockId = this.jobLockService.getLock(this.shardLock, 2500L, 100L, 50);
try {
this.persistShardSetCache();
this.persistNodeCache();
this.persistShardNodeCache();
this.persistShardInstanceCache();
} finally {
this.jobLockService.releaseLock(lockId, this.shardLock);
}
}
private void persistShardSetCache() {
// add anything missing
// update anything changed
for (String core : this.shardsetsCache.getKeys()) {
ShardSet shardSet = this.shardsetsCache.get(core);
this.checkSetAttribute(shardSet, CommunityConstants.ATTR_ASIE_SHARDSET, core);
}
// we are not removing anything removed from the cache, as it might have expired
// it will just recache on the next load
}
private void persistNodeCache() {
// add anything missing
// update anything changed
for (String nodeSpec : this.nodesCache.getKeys()) {
SolrHost node = this.nodesCache.get(nodeSpec);
this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT, nodeSpec);
}
// we are not removing anything removed from the cache, as it might have expired
// it will just recache on the next load
// add anything disabled
for (SolrHost node : this.nodeDisableCache.values())
this.checkSetAttribute(Boolean.TRUE, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_DISABLE, node.getSpec());
// remove anything not disabled
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
SolrHost node = SolrHost.from((String) keys[2]);
if (!nodeDisableCache.contains(node))
attrService.removeAttribute(keys);
return true;
}
}, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_DISABLE);
}
private void persistShardNodeCache() {
// add anything missing
// update anything changed
for (Shard shard : this.shardNodesCache.getKeys()) {
Collection<SolrHost> nodes = this.shardNodesCache.get(shard);
for (SolrHost node : nodes) {
this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_SHARD_NODES, shard, node.getSpec());
}
}
// we are not removing anything removed from the cache, as it might have expired
// it will just recache on the next load
}
private void persistShardInstanceCache() {
// add anything missing
// update anything changed
for (ShardInstance shardNode : this.shardInstanceStatesCache.getKeys()) {
ShardInstanceState state = this.shardInstanceStatesCache.get(shardNode);
ShardInstanceState currentState = (ShardInstanceState) this.attrService.getAttribute(CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode);
if (currentState != null) {
if (currentState.compareTo(state) >= 0) {
// do nothing
} else {
this.logger.debug("The persisted state was old; updating: {}: {} => {}", shardNode, currentState, state);
this.attrService.setAttribute(state, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode);
}
} else {
this.attrService.setAttribute(state, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode);
}
}
// we are not removing anything removed from the cache, as it might have expired
// it will just recache on the next load
// add anything disabled
for (ShardInstance shardNode : this.shardInstanceDisableCache.values())
this.checkSetAttribute(Boolean.TRUE, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_DISABLE, shardNode);
// remove anything not disabled
this.attrService.getAttributes(new AttributeQueryCallback() {
@Override
public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) {
ShardInstance shardNode = (ShardInstance) keys[2];
if (!shardInstanceDisableCache.contains(shardNode))
attrService.removeAttribute(keys);
return true;
}
}, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_DISABLE);
}
protected void persistCache(ShardSet shardSet, SolrHost node, Shard shard, ShardInstance shardNode, ShardInstanceState state) {
String lockId = this.jobLockService.getLock(this.shardLock, 2000L, 100L, 50);
try {
this.checkSetAttribute(shardSet, CommunityConstants.ATTR_ASIE_SHARDSET, shardSet.getCore());
this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_NODE, CommunityConstants.ATTR_OBJECT, node.getSpec());
this.checkSetAttribute(node, CommunityConstants.ATTR_ASIE_SHARD_NODES, shard, node.getSpec());
this.checkSetAttribute(state, CommunityConstants.ATTR_ASIE_SHARD_NODE, CommunityConstants.ATTR_OBJECT, shardNode);
} finally {
this.jobLockService.releaseLock(lockId, this.shardLock);
}
}
private void checkSetAttribute(Serializable value, Serializable... keys) {
Serializable currentValue = this.attrService.getAttribute(keys);
if (currentValue != null) {
if (currentValue.equals(value))
return;
this.logger.warn("The attribute value unexpectedly changed: {}: {} => {}", keys, currentValue, value);
}
this.attrService.setAttribute(value, keys);
}
@Override
public void registerShardState(ShardState shardNodeState) {
ShardInstance shardNode = shardNodeState.getShardInstance();
Node node = new Node(shardNode);
this.fixFlocPropertyBag(shardNodeState);
ShardSet shardSet = ShardSet.from(shardNodeState.getShardInstance().getShard().getFloc(), shardNodeState);
Shard shard = Shard.from(shardSet, shardNodeState.getShardInstance().getShard().getInstance());
SolrHost node = SolrHost.from(shardNodeState.getShardInstance());
ShardInstance shardNode = ShardInstance.from(shard, node);
ShardInstanceState state = ShardInstanceState.from(shardNodeState);
String lock = this.jobLockService.getLock(this.shardLock, 2500L, 500L, 10);
try {
this.cacheShard(shardNode, shardNodeState, node.getId());
this.persistShards();
} finally {
this.jobLockService.releaseLock(lock, this.shardLock);
}
}
protected void fixFlocPropertyBag(ShardState shardNodeState) {
Floc floc = shardNodeState.getShardInstance().getShard().getFloc();
if (floc.getPropertyBag().isEmpty()) {
for (Entry<String, String> prop : shardNodeState.getPropertyBag().entrySet()) {
if (prop.getKey().startsWith("shard.")) {
floc.getPropertyBag().put(prop.getKey(), prop.getValue());
} else if (prop.getKey().equals("coreName")) {
String coreName = this.extractCoreName(prop.getValue());
if (coreName != null)
floc.getPropertyBag().put(prop.getKey(), coreName);
}
}
}
}
protected String extractCoreName(String coreShardName) {
Matcher matcher = this.coreShardPattern.matcher(coreShardName);
if (!matcher.matches())
return null;
return matcher.group(1);
}
/**
* This is private because it must be wrapped in a cluster-safe lock
*/
private void persistShards() {
long onlineExpired = System.currentTimeMillis() - this.offlineIdleShardInSeconds * 1000L;
long offlineExpired = System.currentTimeMillis() - this.forgetOfflineShardInSeconds * 1000L;
for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) {
String nodeShardId = new Node(shardNode).getId() + ";" + shardNode.getShard().getInstance();
ShardState shardNodeState = this.onlineNodeShardStateCache.get(shardNode);
if (shardNodeState.getLastUpdated() < onlineExpired) {
this.logger.warn("Taking shard offline: {}", shardNode);
this.onlineNodeShardStateCache.remove(shardNode);
this.offlineNodeShardStateCache.put(shardNode, shardNodeState);
} else {
this.attrService.setAttribute(shardNodeState, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_STATE);
this.attrService.setAttribute(Boolean.TRUE, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_ONLINE);
}
}
for (ShardInstance shardNode : this.offlineNodeShardStateCache.getKeys()) {
String nodeShardId = new Node(shardNode).getId() + ";" + shardNode.getShard().getInstance();
ShardState shardNodeState = this.offlineNodeShardStateCache.get(shardNode);
if (shardNodeState.getLastUpdated() < offlineExpired) {
this.logger.info("Forgetting about already offline shard: {}", shardNode);
this.offlineNodeShardStateCache.remove(shardNode);
} else {
this.attrService.setAttribute(shardNodeState, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_STATE);
this.attrService.setAttribute(Boolean.FALSE, Constants.ATTR_ASIE_NODE_SHARD, nodeShardId, Constants.ATTR_ONLINE);
}
}
this.persistCache(shardSet, node, shard, shardNode, state);
}
@Override
public Map<Floc, Map<Shard, Set<ShardState>>> getFlocs() {
Map<Floc, Map<Shard, Set<ShardState>>> flocs = new HashMap<>();
public void unregisterShardInstance(org.alfresco.repo.index.shard.ShardInstance shardInstance) {
ShardSet shardSet = ShardSet.from(shardInstance.getShard().getFloc(), null);
Shard shard = Shard.from(shardSet, shardInstance.getShard().getInstance());
SolrHost node = SolrHost.from(shardInstance);
ShardInstance shardNode = ShardInstance.from(shard, node);
for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) {
Floc floc = shardNode.getShard().getFloc();
Map<Shard, Set<ShardState>> shards = flocs.get(floc);
if (shards == null)
flocs.put(floc, shards = new HashMap<>());
Set<ShardState> shardNodeStates = shards.get(shardNode.getShard());
if (shardNodeStates == null)
shards.put(shardNode.getShard(), shardNodeStates = new HashSet<>());
ShardState shardNodeState = this.onlineNodeShardStateCache.get(shardNode);
if (shardNodeState != null) // in case it was removed during the looping (very rare)
shardNodeStates.add(shardNodeState);
this.shardInstanceStatesCache.remove(shardNode);
this.shardInstanceDisableCache.remove(shardNode);
this.shardInstanceUnavailableCache.remove(shardNode);
this.nodeDisableCache.remove(node);
this.nodeUnavailableCache.remove(node);
this.attrService.removeAttribute(CommunityConstants.ATTR_ASIE_SHARD_NODES, shard, node.getSpec());
}
return flocs;
@Override
public Map<Floc, Map<org.alfresco.repo.index.shard.Shard, Set<ShardState>>> getFlocs() {
Map<String, Floc> flocs = new HashMap<>();
Map<Floc, Map<org.alfresco.repo.index.shard.Shard, Set<ShardState>>> response = new HashMap<>();
for (Shard shard : this.shardNodesCache.getKeys()) {
String core = shard.extractShardSetCore();
ShardSet shardSet = this.shardsetsCache.get(core);
Map<org.alfresco.repo.index.shard.Shard, Set<ShardState>> shards;
Floc floc = flocs.get(core);
if (floc != null) {
floc = shardSet.toAlfrescoModel();
shards = new HashMap<>();
} else {
shards = response.get(floc);
}
org.alfresco.repo.index.shard.Shard shard_ = shard.toAlfrescoModel(floc);
Set<ShardState> states = shards.get(shard_);
if (states == null)
states = new HashSet<>();
for (SolrHost node : this.shardNodesCache.get(shard)) {
if (this.nodeDisableCache.contains(node) || this.nodeUnavailableCache.contains(node)) {
this.logger.debug("Excluding node as it is disabled or considered unavailable: {}", node);
continue;
}
ShardInstance shardNode = ShardInstance.from(shard, node);
if (this.shardInstanceDisableCache.contains(shardNode) || this.shardInstanceUnavailableCache.contains(shardNode)) {
this.logger.debug("Excluding shard node as it is disabled or considered unavailable: {}", shardNode);
continue;
}
ShardInstanceState shardNodeState = this.shardInstanceStatesCache.get(shardNode);
states.add(shardNodeState.toAlfrescoModel(shardNode.toAlfrescoModel(shard_)));
}
if (!states.isEmpty())
shards.put(shard_, states);
if (!shards.isEmpty())
response.put(floc, shards);
}
return response;
}
@Override
public void purge() {
this.sss.clear();
String lockId = this.jobLockService.getLock(this.shardLock, 2500L, 100L, 50);
try {
this.logger.info("Removing all nodes/shards from the shard registry");
this.shardsetsCache.clear();
this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_SHARDSET);
this.nodesCache.clear();
this.nodeDisableCache.clear();
this.nodeUnavailableCache.clear();
this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_NODE);
this.shardNodesCache.clear();
this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_SHARD_NODES);
this.shardInstanceStatesCache.clear();
this.shardInstanceDisableCache.clear();
this.shardInstanceUnavailableCache.clear();
this.attrService.removeAttributes(CommunityConstants.ATTR_ASIE_SHARD_NODE);
} finally {
this.jobLockService.releaseLock(lockId, this.shardLock);
}
}
@Override
public void purgeAgedOutShards() {
long onlineExpired = System.currentTimeMillis() - this.offlineIdleShardInSeconds * 1000L;
long offlineExpired = System.currentTimeMillis() - this.forgetOfflineShardInSeconds * 1000L;
OffsetDateTime onlineExpired = OffsetDateTime.now().minusSeconds(this.offlineIdleShardInSeconds);
OffsetDateTime offlineExpired = OffsetDateTime.now().minusSeconds(this.forgetOfflineShardInSeconds);
for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) {
ShardState shardNodeState = this.onlineNodeShardStateCache.get(shardNode);
if (shardNodeState.getLastUpdated() < onlineExpired) {
this.logger.warn("Taking shard offline: {}", shardNode);
this.onlineNodeShardStateCache.remove(shardNode);
this.offlineNodeShardStateCache.put(shardNode, shardNodeState);
}
}
for (ShardInstance shardNode : this.shardInstanceStatesCache.getKeys()) {
ShardInstanceState state = this.shardInstanceStatesCache.get(shardNode);
SolrHost node = shardNode.extractNode();
for (ShardInstance shardNode : this.offlineNodeShardStateCache.getKeys()) {
ShardState shardNodeState = this.offlineNodeShardStateCache.get(shardNode);
if (shardNodeState.getLastUpdated() < offlineExpired) {
if (this.shardInstanceDisableCache.contains(shardNode)) {
this.logger.debug("Ignoring disabled shard instance during purgeAgedOutShards()");
} else if (this.nodeDisableCache.contains(node)) {
this.logger.debug("Ignoring disabled node during purgeAgedOutShards()");
} else if (state.getLastUpdated().isBefore(offlineExpired)) {
this.shardInstanceStatesCache.remove(shardNode);
if (this.shardInstanceUnavailableCache.remove(shardNode)) {
this.logger.info("Forgetting about already offline shard: {}", shardNode);
this.offlineNodeShardStateCache.remove(shardNode);
} else if (this.nodeUnavailableCache.remove(node)) {
this.logger.info("Forgetting about already offline shard: {}", shardNode);
} else {
this.logger.warn("Forgetting about online shard: {}", shardNode);
}
} else if (state.getLastUpdated().isBefore(onlineExpired)) {
this.logger.warn("Taking shard offline: {}", shardNode);
this.shardInstanceUnavailableCache.add(shardNode);
}
}
}
@@ -314,8 +469,15 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg
public Set<Integer> getShardInstanceList(String coreName) {
Set<Integer> shardIds = new HashSet<>();
for (ShardInstance shardNode : this.onlineNodeShardStateCache.getKeys()) {
shardIds.add(shardNode.getShard().getInstance());
ShardSet shardSet = this.shardsetsCache.get(coreName);
if (shardSet == null)
return Collections.emptySet();
for (Shard shard : this.shardNodesCache.getKeys()) {
if (shardSet.getCore().equals(shard.extractShardSetCore())) {
shardIds.add(shard.extractShardId());
}
}
return shardIds;
@@ -327,18 +489,19 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg
}
@Override
public List<ShardInstance> getIndexSlice(SearchParameters searchParameters) {
public List<org.alfresco.repo.index.shard.ShardInstance> getIndexSlice(SearchParameters searchParameters) {
if (searchParameters.getQuery() == null)
return Collections.emptyList();
List<ShardInstance> bestShards = null;
List<org.alfresco.repo.index.shard.ShardInstance> bestShards = null;
for (Floc floc : this.flocShardMultiCache.getKeys()) {
List<ShardInstance> shards = new LinkedList<>();
for (String shardSetSpec : this.shardsetsCache.getKeys()) {
ShardSet shardSet = this.shardsetsCache.get(shardSetSpec);
List<org.alfresco.repo.index.shard.ShardInstance> shards = new LinkedList<>();
switch (floc.getShardMethod()) {
switch (shardSet.getMethod()) {
case EXPLICIT_ID:
String property = floc.getPropertyBag().get("shard.key");
String property = shardSet.getPrefixedProperty();
QName propertyQName = QName.createQName(property, this.namespaceService);
DataTypeDefinition dtdef = this.dictionaryService.getProperty(propertyQName).getDataType();
@@ -367,8 +530,7 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg
}
}
// shardIds to shardInstances
break;
shards.addAll(this.getIndexSlice(shardSet, shardIds));
default:
// make no determination
}
@@ -380,42 +542,31 @@ public class SolrShardRegistry extends AbstractLifecycleBean implements ShardReg
return bestShards;
}
protected List<ShardInstance> getIndexSlice() {
protected List<org.alfresco.repo.index.shard.ShardInstance> getIndexSlice(ShardSet shardSet, Collection<Integer> shardIds) {
List<org.alfresco.repo.index.shard.ShardInstance> shardNodes = new LinkedList<>();
for (Integer shardId : shardIds) {
Shard shard = Shard.from(shardSet, shardId);
Collection<SolrHost> nodes = this.shardNodesCache.get(shard);
List<SolrHost> availableNodes = new LinkedList<>();
for (SolrHost node : nodes) {
if (this.nodeDisableCache.contains(node) || this.nodeUnavailableCache.contains(node))
continue;
ShardInstance shardNode = ShardInstance.from(shard, node);
if (this.shardInstanceDisableCache.contains(shardNode) || this.shardInstanceUnavailableCache.contains(shardNode))
continue;
availableNodes.add(node);
}
private ShardInstance detach(ShardInstance shardNode) {
ShardInstance detachedShardNode = new ShardInstance();
detachedShardNode.setHostName(shardNode.getHostName());
detachedShardNode.setPort(shardNode.getPort());
detachedShardNode.setBaseUrl(shardNode.getBaseUrl());
return detachedShardNode;
SolrHost randomNode = availableNodes.get(this.random.nextInt(availableNodes.size()));
shardNodes.add(ShardInstance.from(shard, randomNode).toAlfrescoModel(shard.toAlfrescoModel(shardSet.toAlfrescoModel())));
}
private ShardState detach(ShardState shardState) {
ShardState detachedShardState = new ShardState();
detachedShardState.setLastIndexedChangeSetCommitTime(shardState.getLastIndexedChangeSetCommitTime());
detachedShardState.setLastIndexedChangeSetId(shardState.getLastIndexedChangeSetId());
detachedShardState.setLastIndexedTxCommitTime(shardState.getLastIndexedTxCommitTime());
detachedShardState.setLastIndexedTxId(shardState.getLastIndexedTxId());
detachedShardState.setLastUpdated(shardState.getLastUpdated());
detachedShardState.setMaster(shardState.isMaster());
detachedShardState.setPropertyBag(shardState.getPropertyBag());
return detachedShardState;
}
private <K1 extends Serializable, K2, V> boolean putPutAdd(SimpleCache<K1, Map<K2, Set<V>>> cache, K1 cacheKey, K2 mapKey, V mapValue) {
Map<K2, Set<V>> map = cache.get(cacheKey);
if (map == null)
map = new HashMap<>();
return this.putAdd(map, mapKey, mapValue);
}
private <K, V> boolean putAdd(Map<K, Set<V>> map, K key, V value) {
Set<V> set = map.get(key);
if (set == null)
set = new HashSet<>();
return set.add(value);
return shardNodes;
}
}

View File

@@ -0,0 +1,79 @@
package com.inteligr8.alfresco.asie.util;
import java.util.Comparator;
import org.alfresco.repo.index.shard.ShardMethodEnum;
import com.inteligr8.alfresco.asie.model.ShardSet;
public class ShardSetSearchComparator implements Comparator<ShardSet> {
@Override
public int compare(ShardSet ss1, ShardSet ss2) {
int compare = this.compare(ss1.getMethod(), ss2.getMethod());
if (compare != 0)
return compare;
return this.compare(ss1.getShards(), ss2.getShards());
}
private int compare(ShardMethodEnum method1, ShardMethodEnum method2) {
if (method1.equals(method2))
return 0;
switch (method1) {
case EXPLICIT_ID:
case EXPLICIT_ID_FALLBACK_LRIS:
return -1;
case PROPERTY:
case DATE:
switch (method2) {
case EXPLICIT_ID:
case EXPLICIT_ID_FALLBACK_LRIS:
return 1;
default:
return -1;
}
case ACL_ID:
case MOD_ACL_ID:
switch (method2) {
case EXPLICIT_ID:
case EXPLICIT_ID_FALLBACK_LRIS:
case PROPERTY:
case DATE:
return 1;
default:
return -1;
}
default:
switch (method2) {
case EXPLICIT_ID:
case EXPLICIT_ID_FALLBACK_LRIS:
case PROPERTY:
case DATE:
case ACL_ID:
case MOD_ACL_ID:
return 1;
default:
}
}
return 0;
}
private int compare(Short shards1, Short shards2) {
// the larger the shard count, the more shards that may need to be queried
// so prefer smaller shard counts
// no shard count (DB_ID_RANGE) should be treated as the worst (unlimited)
if (shards1 == null && shards2 == null) {
return 0;
} else if (shards1 == null) {
return 1;
} else if (shards2 == null) {
return -1;
} else {
return shards1.compareTo(shards2);
}
}
}