Fix for ACE-4210 SOLR 4 - sharded - Dynamic shard information is not exposed via JMX

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@113664 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Andrew Hind
2015-10-05 09:04:41 +00:00
parent 17bcd50769
commit faf109d9af
3 changed files with 136 additions and 20 deletions

View File

@@ -656,7 +656,7 @@ cache.solrFacetNodeRefSharedCache.readBackupData=false
cache.shardStateSharedCache.tx.maxItems=100 cache.shardStateSharedCache.tx.maxItems=100
cache.shardStateSharedCache.tx.statsEnabled=${caches.tx.statsEnabled} cache.shardStateSharedCache.tx.statsEnabled=${caches.tx.statsEnabled}
cache.shardStateSharedCache.maxItems=500 cache.shardStateSharedCache.maxItems=500
cache.shardStateSharedCache.timeToLiveSeconds=300 cache.shardStateSharedCache.timeToLiveSeconds=1800
cache.shardStateSharedCache.maxIdleSeconds=0 cache.shardStateSharedCache.maxIdleSeconds=0
cache.shardStateSharedCache.cluster.type=fully-distributed cache.shardStateSharedCache.cluster.type=fully-distributed
cache.shardStateSharedCache.backup-count=1 cache.shardStateSharedCache.backup-count=1

View File

@@ -18,9 +18,14 @@
*/ */
package org.alfresco.repo.index.shard; package org.alfresco.repo.index.shard;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import org.alfresco.service.cmr.search.SearchParameters; import org.alfresco.service.cmr.search.SearchParameters;
import org.alfresco.util.Pair;
/** /**
* @author Andy * @author Andy
@@ -33,4 +38,8 @@ public interface ShardRegistry
public List<ShardInstance> getIndexSlice(SearchParameters searchParameters); public List<ShardInstance> getIndexSlice(SearchParameters searchParameters);
public void purge(); public void purge();
public HashMap<Floc, HashMap<Shard, HashSet<ShardState>>> getFlocs();
public void purgeAgedOutShards();
} }

View File

@@ -44,6 +44,13 @@ import com.hazelcast.util.ConcurrentHashSet;
*/ */
public class ShardRegistryImpl implements ShardRegistry public class ShardRegistryImpl implements ShardRegistry
{ {
/**
*
*/
public static final String INSTANCE_STATE = "instance.state";
public enum ReplicaState {ACTIVE, LAGGING, SILENT };
/** /**
* The best shard sould be at the top; * The best shard sould be at the top;
* @author Andy * @author Andy
@@ -239,21 +246,28 @@ public class ShardRegistryImpl implements ShardRegistry
} }
private List<ShardInstance> selectShardInstancesForBestFlock(Set<Floc> flocs) private ArrayList<Pair<Floc, HashMap<Shard, HashSet<ShardState>>>> buildIndexes(Set<Floc> flocs, boolean excludeTimedOut)
{ {
ArrayList<Pair<Floc, HashMap<Shard, HashSet<ShardState>>>> indexes = new ArrayList<Pair<Floc, HashMap<Shard, HashSet<ShardState>>>> (); ArrayList<Pair<Floc, HashMap<Shard, HashSet<ShardState>>>> indexes = new ArrayList<Pair<Floc, HashMap<Shard, HashSet<ShardState>>>> ();
for(Floc floc : flocs) for(Floc floc : flocs)
{ {
HashMap<Shard, HashSet<ShardState>> index = new HashMap<Shard, HashSet<ShardState>>(); HashMap<Shard, HashSet<ShardState>> index = new HashMap<Shard, HashSet<ShardState>>();
getShardStatesFromCache(floc, index); getShardStatesFromCache(floc, index, excludeTimedOut);
if (index.size() < floc.getNumberOfShards()) if (index.size() < floc.getNumberOfShards())
{ {
updateShardStateCache(floc); updateShardStateCache(floc);
getShardStatesFromCache(floc, index); getShardStatesFromCache(floc, index, excludeTimedOut);
} }
indexes.add(new Pair<Floc, HashMap<Shard, HashSet<ShardState>>>(floc, index)); indexes.add(new Pair<Floc, HashMap<Shard, HashSet<ShardState>>>(floc, index));
} }
return indexes;
}
private List<ShardInstance> selectShardInstancesForBestFlock(Set<Floc> flocs)
{
ArrayList<Pair<Floc, HashMap<Shard, HashSet<ShardState>>>> indexes = buildIndexes(flocs, true);
Collections.sort(indexes, new FlocComparator()); Collections.sort(indexes, new FlocComparator());
@@ -287,7 +301,7 @@ public class ShardRegistryImpl implements ShardRegistry
for(ShardState state :states) for(ShardState state :states)
{ {
if( (maxTxId - state.getLastIndexedTxId()) <= maxAllowedReplicaTxCountDifference) if( isShardAllowed(maxTxId, state))
{ {
allowed.add(state); allowed.add(state);
} }
@@ -297,20 +311,30 @@ public class ShardRegistryImpl implements ShardRegistry
return allowed.toArray(new ShardState[] {}); return allowed.toArray(new ShardState[] {});
} }
/**
* @param maxTxId
* @param state
* @return
*/
private boolean isShardAllowed(long maxTxId, ShardState state)
{
return (maxTxId - state.getLastIndexedTxId()) <= maxAllowedReplicaTxCountDifference;
}
/** /**
* @param floc * @param floc
*/ */
private void updateShardStateCache(Floc floc) private void updateShardStateCache(Floc floc)
{ {
ShardStateCollector shardStates = getPersistedShardStates(); ShardStateCollector shardStates = getPersistedShardStates();
HashMap<Shard, HashMap<ShardInstance, ShardState>> shards = shardStates.getIndexes().get(floc); HashMap<Shard, HashSet<ShardState>> shards = shardStates.getIndexes().get(floc);
if(shards != null) if(shards != null)
{ {
for (HashMap<ShardInstance, ShardState> map : shards.values()) for (HashSet<ShardState> set : shards.values())
{ {
for (ShardInstance instance : map.keySet()) for (ShardState instance : set)
{ {
shardStateCache.put(instance, map.get(instance)); shardStateCache.put(instance.getShardInstance(), instance);
} }
} }
} }
@@ -320,13 +344,13 @@ public class ShardRegistryImpl implements ShardRegistry
* @param floc * @param floc
* @param index * @param index
*/ */
private void getShardStatesFromCache(Floc floc, HashMap<Shard, HashSet<ShardState>> index) private void getShardStatesFromCache(Floc floc, HashMap<Shard, HashSet<ShardState>> index, boolean excludeTimedOut)
{ {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
for (ShardInstance instance : shardStateCache.getKeys()) for (ShardInstance instance : shardStateCache.getKeys())
{ {
ShardState state = shardStateCache.get(instance); ShardState state = shardStateCache.get(instance);
if( (now - state.getLastUpdated()) > (shardInstanceTimeoutInSeconds * 1000) ) if(excludeTimedOut && isShardTimedOut(now, state) )
{ {
continue; continue;
} }
@@ -344,6 +368,16 @@ public class ShardRegistryImpl implements ShardRegistry
} }
} }
/**
* @param now
* @param state
* @return
*/
private boolean isShardTimedOut(long now, ShardState state)
{
return (now - state.getLastUpdated()) > (shardInstanceTimeoutInSeconds * 1000);
}
private void updateKnownFlocs() private void updateKnownFlocs()
{ {
ShardStateCollector shardStates = getPersistedShardStates(); ShardStateCollector shardStates = getPersistedShardStates();
@@ -416,7 +450,7 @@ public class ShardRegistryImpl implements ShardRegistry
{ {
HashMap<ShardInstance, String> shardGuids = new HashMap<ShardInstance, String>(); HashMap<ShardInstance, String> shardGuids = new HashMap<ShardInstance, String>();
HashMap<Floc, HashMap<Shard, HashMap<ShardInstance, ShardState>>> indexes = new HashMap<Floc, HashMap<Shard, HashMap<ShardInstance, ShardState>>>(); HashMap<Floc, HashMap<Shard, HashSet<ShardState>>> indexes = new HashMap<Floc, HashMap<Shard, HashSet<ShardState>>>();
public ShardStateCollector() public ShardStateCollector()
{ {
@@ -437,21 +471,19 @@ public class ShardRegistryImpl implements ShardRegistry
shardGuids.put(shardState.getShardInstance(), shardInstanceGuid); shardGuids.put(shardState.getShardInstance(), shardInstanceGuid);
HashMap<Shard, HashMap<ShardInstance, ShardState>> shards = indexes.get(shardState.getShardInstance().getShard().getFloc()); HashMap<Shard, HashSet<ShardState>> shards = indexes.get(shardState.getShardInstance().getShard().getFloc());
if (shards == null) if (shards == null)
{ {
shards = new HashMap<Shard, HashMap<ShardInstance, ShardState>>(); shards = new HashMap<Shard, HashSet<ShardState>>();
indexes.put(shardState.getShardInstance().getShard().getFloc(), shards); indexes.put(shardState.getShardInstance().getShard().getFloc(), shards);
} }
HashMap<ShardInstance, ShardState> shardInstances = shards.get(shardState.getShardInstance().getShard()); HashSet<ShardState> shardInstances = shards.get(shardState.getShardInstance().getShard());
if (shardInstances == null) if (shardInstances == null)
{ {
shardInstances = new HashMap<ShardInstance, ShardState>(); shardInstances = new HashSet<ShardState>();
shards.put(shardState.getShardInstance().getShard(), shardInstances); shards.put(shardState.getShardInstance().getShard(), shardInstances);
} }
ShardState currentState = shardInstances.get(shardState.getShardInstance()); shardInstances.add(shardState);
shardInstances.put(shardState.getShardInstance(), shardState);
return true; return true;
} }
@@ -467,10 +499,85 @@ public class ShardRegistryImpl implements ShardRegistry
/** /**
* @return the indexes * @return the indexes
*/ */
public HashMap<Floc, HashMap<Shard, HashMap<ShardInstance, ShardState>>> getIndexes() public HashMap<Floc, HashMap<Shard, HashSet<ShardState>>> getIndexes()
{ {
return indexes; return indexes;
} }
} }
/* (non-Javadoc)
* @see org.alfresco.repo.index.shard.ShardRegistry#getFlocs()
*/
@Override
public HashMap<Floc, HashMap<Shard, HashSet<ShardState>>> getFlocs()
{
ShardStateCollector shardStatesCollector = getPersistedShardStates();
HashMap<Floc, HashMap<Shard, HashSet<ShardState>>> flocs = shardStatesCollector.getIndexes();
long now = System.currentTimeMillis();
for (Floc floc : flocs.keySet())
{
HashMap<Shard, HashSet<ShardState>> shards = flocs.get(floc);
for(Shard shard : shards.keySet())
{
HashSet<ShardState> instances = shards.get(shard);
long minTxId = Long.MAX_VALUE;
long maxTxId = 0;
for(ShardState state : instances)
{
minTxId = Math.min(minTxId, state.getLastIndexedTxId());
maxTxId = Math.max(maxTxId, state.getLastIndexedTxId());
}
for(ShardState state : instances)
{
if(isShardTimedOut(now, state))
{
state.getPropertyBag().put(INSTANCE_STATE, ReplicaState.SILENT.toString());
}
else if(isShardAllowed(maxTxId, state))
{
state.getPropertyBag().put(INSTANCE_STATE, ReplicaState.ACTIVE.toString());
}
else
{
state.getPropertyBag().put(INSTANCE_STATE, ReplicaState.LAGGING.toString());
}
}
}
}
return flocs;
}
/* (non-Javadoc)
* @see org.alfresco.repo.index.shard.ShardRegistry#purgeAgedOutShards()
*/
@Override
public void purgeAgedOutShards()
{
long now = System.currentTimeMillis();
ShardStateCollector shardStates = getPersistedShardStates();
for(Floc floc : shardStates.indexes.keySet())
{
HashMap<Shard, HashSet<ShardState>> shards = shardStates.indexes.get(floc);
for(Shard shard : shards.keySet())
{
HashSet<ShardState> states = shards.get(shard);
for(ShardState state : states)
{
if(isShardTimedOut(now, state))
{
String guid = shardStates.shardGuids.get(state.getShardInstance());
DeleteCallBack dcb = new DeleteCallBack(attributeService, guid);
transactionService.getRetryingTransactionHelper().doInTransaction(dcb, false, true);
}
}
}
}
}
} }