diff --git a/config/alfresco/caches.properties b/config/alfresco/caches.properties index f21587a88f..89f42a117c 100644 --- a/config/alfresco/caches.properties +++ b/config/alfresco/caches.properties @@ -656,7 +656,7 @@ cache.solrFacetNodeRefSharedCache.readBackupData=false cache.shardStateSharedCache.tx.maxItems=100 cache.shardStateSharedCache.tx.statsEnabled=${caches.tx.statsEnabled} cache.shardStateSharedCache.maxItems=500 -cache.shardStateSharedCache.timeToLiveSeconds=300 +cache.shardStateSharedCache.timeToLiveSeconds=1800 cache.shardStateSharedCache.maxIdleSeconds=0 cache.shardStateSharedCache.cluster.type=fully-distributed cache.shardStateSharedCache.backup-count=1 diff --git a/source/java/org/alfresco/repo/index/shard/ShardRegistry.java b/source/java/org/alfresco/repo/index/shard/ShardRegistry.java index 4eea0973e5..0822fce266 100644 --- a/source/java/org/alfresco/repo/index/shard/ShardRegistry.java +++ b/source/java/org/alfresco/repo/index/shard/ShardRegistry.java @@ -18,9 +18,14 @@ */ package org.alfresco.repo.index.shard; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.alfresco.service.cmr.search.SearchParameters; +import org.alfresco.util.Pair; /** * @author Andy @@ -33,4 +38,8 @@ public interface ShardRegistry public List getIndexSlice(SearchParameters searchParameters); public void purge(); + + public HashMap>> getFlocs(); + + public void purgeAgedOutShards(); } diff --git a/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java b/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java index 50bd0b1bc3..76ee440b7d 100644 --- a/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java +++ b/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java @@ -44,6 +44,13 @@ import com.hazelcast.util.ConcurrentHashSet; */ public class ShardRegistryImpl implements ShardRegistry { + /** + * + */ + public static final String INSTANCE_STATE = "instance.state"; + + public enum ReplicaState {ACTIVE, LAGGING, SILENT }; + /** * The best shard sould be at the top; * @author Andy @@ -239,21 +246,28 @@ public class ShardRegistryImpl implements ShardRegistry } - private List selectShardInstancesForBestFlock(Set flocs) + private ArrayList>>> buildIndexes(Set flocs, boolean excludeTimedOut) { ArrayList>>> indexes = new ArrayList>>> (); for(Floc floc : flocs) { HashMap> index = new HashMap>(); - getShardStatesFromCache(floc, index); + getShardStatesFromCache(floc, index, excludeTimedOut); if (index.size() < floc.getNumberOfShards()) { updateShardStateCache(floc); - getShardStatesFromCache(floc, index); + getShardStatesFromCache(floc, index, excludeTimedOut); } indexes.add(new Pair>>(floc, index)); } + return indexes; + } + + + private List selectShardInstancesForBestFlock(Set flocs) + { + ArrayList>>> indexes = buildIndexes(flocs, true); Collections.sort(indexes, new FlocComparator()); @@ -287,7 +301,7 @@ public class ShardRegistryImpl implements ShardRegistry for(ShardState state :states) { - if( (maxTxId - state.getLastIndexedTxId()) <= maxAllowedReplicaTxCountDifference) + if( isShardAllowed(maxTxId, state)) { allowed.add(state); } @@ -297,20 +311,30 @@ public class ShardRegistryImpl implements ShardRegistry return allowed.toArray(new ShardState[] {}); } + /** + * @param maxTxId + * @param state + * @return + */ + private boolean isShardAllowed(long maxTxId, ShardState state) + { + return (maxTxId - state.getLastIndexedTxId()) <= maxAllowedReplicaTxCountDifference; + } + /** * @param floc */ private void updateShardStateCache(Floc floc) { ShardStateCollector shardStates = getPersistedShardStates(); - HashMap> shards = shardStates.getIndexes().get(floc); + HashMap> shards = shardStates.getIndexes().get(floc); if(shards != null) { - for (HashMap map : shards.values()) + for (HashSet set : shards.values()) { - for (ShardInstance instance : map.keySet()) + for (ShardState instance : set) { - shardStateCache.put(instance, map.get(instance)); + shardStateCache.put(instance.getShardInstance(), instance); } } } @@ -320,13 +344,13 @@ public class ShardRegistryImpl implements ShardRegistry * @param floc * @param index */ - private void getShardStatesFromCache(Floc floc, HashMap> index) + private void getShardStatesFromCache(Floc floc, HashMap> index, boolean excludeTimedOut) { long now = System.currentTimeMillis(); for (ShardInstance instance : shardStateCache.getKeys()) { ShardState state = shardStateCache.get(instance); - if( (now - state.getLastUpdated()) > (shardInstanceTimeoutInSeconds * 1000) ) + if(excludeTimedOut && isShardTimedOut(now, state) ) { continue; } @@ -344,6 +368,16 @@ public class ShardRegistryImpl implements ShardRegistry } } + /** + * @param now + * @param state + * @return + */ + private boolean isShardTimedOut(long now, ShardState state) + { + return (now - state.getLastUpdated()) > (shardInstanceTimeoutInSeconds * 1000); + } + private void updateKnownFlocs() { ShardStateCollector shardStates = getPersistedShardStates(); @@ -416,7 +450,7 @@ public class ShardRegistryImpl implements ShardRegistry { HashMap shardGuids = new HashMap(); - HashMap>> indexes = new HashMap>>(); + HashMap>> indexes = new HashMap>>(); public ShardStateCollector() { @@ -437,21 +471,19 @@ public class ShardRegistryImpl implements ShardRegistry shardGuids.put(shardState.getShardInstance(), shardInstanceGuid); - HashMap> shards = indexes.get(shardState.getShardInstance().getShard().getFloc()); + HashMap> shards = indexes.get(shardState.getShardInstance().getShard().getFloc()); if (shards == null) { - shards = new HashMap>(); + shards = new HashMap>(); indexes.put(shardState.getShardInstance().getShard().getFloc(), shards); } - HashMap shardInstances = shards.get(shardState.getShardInstance().getShard()); + HashSet shardInstances = shards.get(shardState.getShardInstance().getShard()); if (shardInstances == null) { - shardInstances = new HashMap(); + shardInstances = new HashSet(); shards.put(shardState.getShardInstance().getShard(), shardInstances); } - ShardState currentState = shardInstances.get(shardState.getShardInstance()); - - shardInstances.put(shardState.getShardInstance(), shardState); + shardInstances.add(shardState); return true; } @@ -467,10 +499,85 @@ public class ShardRegistryImpl implements ShardRegistry /** * @return the indexes */ - public HashMap>> getIndexes() + public HashMap>> getIndexes() { return indexes; } } + + /* (non-Javadoc) + * @see org.alfresco.repo.index.shard.ShardRegistry#getFlocs() + */ + @Override + public HashMap>> getFlocs() + { + ShardStateCollector shardStatesCollector = getPersistedShardStates(); + HashMap>> flocs = shardStatesCollector.getIndexes(); + + long now = System.currentTimeMillis(); + for (Floc floc : flocs.keySet()) + { + HashMap> shards = flocs.get(floc); + for(Shard shard : shards.keySet()) + { + HashSet instances = shards.get(shard); + + + long minTxId = Long.MAX_VALUE; + long maxTxId = 0; + for(ShardState state : instances) + { + minTxId = Math.min(minTxId, state.getLastIndexedTxId()); + maxTxId = Math.max(maxTxId, state.getLastIndexedTxId()); + } + + for(ShardState state : instances) + { + if(isShardTimedOut(now, state)) + { + state.getPropertyBag().put(INSTANCE_STATE, ReplicaState.SILENT.toString()); + } + else if(isShardAllowed(maxTxId, state)) + { + state.getPropertyBag().put(INSTANCE_STATE, ReplicaState.ACTIVE.toString()); + } + else + { + state.getPropertyBag().put(INSTANCE_STATE, ReplicaState.LAGGING.toString()); + } + } + + } + } + return flocs; + } + + /* (non-Javadoc) + * @see org.alfresco.repo.index.shard.ShardRegistry#purgeAgedOutShards() + */ + @Override + public void purgeAgedOutShards() + { + long now = System.currentTimeMillis(); + ShardStateCollector shardStates = getPersistedShardStates(); + + for(Floc floc : shardStates.indexes.keySet()) + { + HashMap> shards = shardStates.indexes.get(floc); + for(Shard shard : shards.keySet()) + { + HashSet states = shards.get(shard); + for(ShardState state : states) + { + if(isShardTimedOut(now, state)) + { + String guid = shardStates.shardGuids.get(state.getShardInstance()); + DeleteCallBack dcb = new DeleteCallBack(attributeService, guid); + transactionService.getRetryingTransactionHelper().doInTransaction(dcb, false, true); + } + } + } + } + } }