diff --git a/config/alfresco/subsystems/Search/buildonly/common-search.properties b/config/alfresco/subsystems/Search/buildonly/common-search.properties index 6cae6bddfb..2cd4fdfd14 100644 --- a/config/alfresco/subsystems/Search/buildonly/common-search.properties +++ b/config/alfresco/subsystems/Search/buildonly/common-search.properties @@ -5,3 +5,7 @@ search.solrTrackingSupport.ignorePathsForSpecificAspects=false solr.query.fts.queryConsistency=TRANSACTIONAL_IF_POSSIBLE solr.query.cmis.queryConsistency=TRANSACTIONAL_IF_POSSIBLE solr.query.hybrid.enabled=false + +search.solrShardRegistry.purgeOnInit=true +search.solrShardRegistry.shardInstanceTimeoutInSeconds=300 +search.solrShardRegistry.maxAllowedReplicaTxCountDifference=1000 \ No newline at end of file diff --git a/config/alfresco/subsystems/Search/common-search-context.xml b/config/alfresco/subsystems/Search/common-search-context.xml index c3174036ce..4f91f55163 100644 --- a/config/alfresco/subsystems/Search/common-search-context.xml +++ b/config/alfresco/subsystems/Search/common-search-context.xml @@ -3,7 +3,6 @@ - @@ -57,10 +56,20 @@ - + + + + ${search.solrShardRegistry.purgeOnInit} + + + ${search.solrShardRegistry.shardInstanceTimeoutInSeconds} + + + ${search.solrShardRegistry.maxAllowedReplicaTxCountDifference} + diff --git a/config/alfresco/subsystems/Search/noindex/common-search.properties b/config/alfresco/subsystems/Search/noindex/common-search.properties index 8dfc2bd216..cfea085e1e 100644 --- a/config/alfresco/subsystems/Search/noindex/common-search.properties +++ b/config/alfresco/subsystems/Search/noindex/common-search.properties @@ -5,3 +5,7 @@ search.solrTrackingSupport.ignorePathsForSpecificAspects=false solr.query.fts.queryConsistency=TRANSACTIONAL_IF_POSSIBLE solr.query.cmis.queryConsistency=TRANSACTIONAL_IF_POSSIBLE solr.query.hybrid.enabled=false + +search.solrShardRegistry.purgeOnInit=true +search.solrShardRegistry.shardInstanceTimeoutInSeconds=300 +search.solrShardRegistry.maxAllowedReplicaTxCountDifference=1000 \ No newline at end of file diff --git a/config/alfresco/subsystems/Search/solr/common-search.properties b/config/alfresco/subsystems/Search/solr/common-search.properties index cab48f9606..2cd4fdfd14 100644 --- a/config/alfresco/subsystems/Search/solr/common-search.properties +++ b/config/alfresco/subsystems/Search/solr/common-search.properties @@ -4,4 +4,8 @@ search.solrTrackingSupport.ignorePathsForSpecificAspects=false solr.query.fts.queryConsistency=TRANSACTIONAL_IF_POSSIBLE solr.query.cmis.queryConsistency=TRANSACTIONAL_IF_POSSIBLE -solr.query.hybrid.enabled=false \ No newline at end of file +solr.query.hybrid.enabled=false + +search.solrShardRegistry.purgeOnInit=true +search.solrShardRegistry.shardInstanceTimeoutInSeconds=300 +search.solrShardRegistry.maxAllowedReplicaTxCountDifference=1000 \ No newline at end of file diff --git a/config/alfresco/subsystems/Search/solr4/common-search.properties b/config/alfresco/subsystems/Search/solr4/common-search.properties index aff99a34bc..f1c40e0e20 100644 --- a/config/alfresco/subsystems/Search/solr4/common-search.properties +++ b/config/alfresco/subsystems/Search/solr4/common-search.properties @@ -4,4 +4,8 @@ search.solrTrackingSupport.ignorePathsForSpecificAspects=false solr.query.fts.queryConsistency=TRANSACTIONAL_IF_POSSIBLE solr.query.cmis.queryConsistency=TRANSACTIONAL_IF_POSSIBLE -solr.query.hybrid.enabled=false \ No newline at end of file +solr.query.hybrid.enabled=false + +search.solrShardRegistry.purgeOnInit=true +search.solrShardRegistry.shardInstanceTimeoutInSeconds=300 +search.solrShardRegistry.maxAllowedReplicaTxCountDifference=1000 diff --git a/source/java/org/alfresco/repo/index/shard/ShardRegistry.java b/source/java/org/alfresco/repo/index/shard/ShardRegistry.java index 7567c6b4e7..4eea0973e5 100644 --- a/source/java/org/alfresco/repo/index/shard/ShardRegistry.java +++ b/source/java/org/alfresco/repo/index/shard/ShardRegistry.java @@ -31,4 +31,6 @@ public interface ShardRegistry public void registerShardState(ShardState shardState); public List getIndexSlice(SearchParameters searchParameters); + + public void purge(); } diff --git a/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java b/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java index 0c3706272f..50bd0b1bc3 100644 --- a/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java +++ b/source/java/org/alfresco/repo/index/shard/ShardRegistryImpl.java @@ -20,16 +20,22 @@ package org.alfresco.repo.index.shard; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import org.alfresco.repo.cache.SimpleCache; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.alfresco.service.cmr.attributes.AttributeService; import org.alfresco.service.cmr.attributes.AttributeService.AttributeQueryCallback; import org.alfresco.service.cmr.search.SearchParameters; +import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.GUID; +import org.alfresco.util.Pair; import com.hazelcast.util.ConcurrentHashSet; @@ -38,6 +44,47 @@ import com.hazelcast.util.ConcurrentHashSet; */ public class ShardRegistryImpl implements ShardRegistry { + /** + * The best shard sould be at the top; + * @author Andy + * + */ + public static class FlocComparator implements Comparator>>> + { + public FlocComparator() + { + + } + + @Override + public int compare(Pair>> left, Pair>> right) + { + double leftTxCount = 0; + for(HashSet states : left.getSecond().values()) + { + long shardMaxTxCount = 0; + for(ShardState state : states) + { + shardMaxTxCount = Math.max(shardMaxTxCount, state.getLastIndexedTxId()); + } + leftTxCount += ((double)shardMaxTxCount)/left.getFirst().getNumberOfShards(); + } + + double rightTxCount = 0; + for(HashSet states : right.getSecond().values()) + { + long shardMaxTxCount = 0; + for(ShardState state : states) + { + shardMaxTxCount = Math.max(shardMaxTxCount, state.getLastIndexedTxId()); + } + rightTxCount += ((double)shardMaxTxCount)/right.getFirst().getNumberOfShards(); + } + return (int)(rightTxCount - leftTxCount); + } + + } + private static String SHARD_STATE_KEY = ".SHARD_STATE"; private AttributeService attributeService; @@ -49,10 +96,36 @@ public class ShardRegistryImpl implements ShardRegistry private ConcurrentHashSet knownFlocks = new ConcurrentHashSet(); private Random random = new Random(123); + + private boolean purgeOnInit = false; + + TransactionService transactionService; + + private long shardInstanceTimeoutInSeconds = 300; + + private long maxAllowedReplicaTxCountDifference = 1000; public ShardRegistryImpl() { } + + public void init() + { + if(purgeOnInit && (transactionService != null)) + { + transactionService.getRetryingTransactionHelper().doInTransaction(new RetryingTransactionCallback() + { + + @Override + public Object execute() throws Throwable + { + purge(); + return null; + } + }, false, true); + + } + } public void setAttributeService(AttributeService attributeService) { @@ -68,52 +141,162 @@ public class ShardRegistryImpl implements ShardRegistry { this.shardToGuidCache = shardToGuidCache; } + + public void setPurgeOnInit(boolean purgeOnInit) + { + this.purgeOnInit = purgeOnInit; + } + public void setShardInstanceTimeoutInSeconds(int shardInstanceTimeoutInSeconds) + { + this.shardInstanceTimeoutInSeconds = shardInstanceTimeoutInSeconds; + } + + + + /** + * @param maxAllowedReplicaTxCountDifference the maxAllowedReplicaTxCountDifference to set + */ + public void setMaxAllowedReplicaTxCountDifference(long maxAllowedReplicaTxCountDifference) + { + this.maxAllowedReplicaTxCountDifference = maxAllowedReplicaTxCountDifference; + } + + /** + * @param transactionService the transactionService to set + */ + public void setTransactionService(TransactionService transactionService) + { + this.transactionService = transactionService; + } + + public void purge() + { + ShardStateCollector shardStates = getPersistedShardStates(); + for(String guid : shardStates.shardGuids.values()) + { + DeleteCallBack dcb = new DeleteCallBack(attributeService, guid); + transactionService.getRetryingTransactionHelper().doInTransaction(dcb, false, true); + } + } + + private static class DeleteCallBack implements RetryingTransactionCallback + { + AttributeService attributeService; + + String guid; + + DeleteCallBack(AttributeService attributeService, String guid) + { + this.attributeService = attributeService; + this.guid = guid; + } + + /* (non-Javadoc) + * @see org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback#execute() + */ + @Override + public Object execute() throws Throwable + { + attributeService.removeAttributes(SHARD_STATE_KEY, guid); + return null; + } + + } + /** * @param shardState */ - public void registerShardState(ShardState shardState) + public void registerShardState(final ShardState shardState) { - String guid = getPersistedShardStatusGuid(shardState.getShardInstance()); - attributeService.setAttribute(shardState, SHARD_STATE_KEY, guid); - shardStateCache.put(shardState.getShardInstance(), shardState); - knownFlocks.add(shardState.getShardInstance().getShard().getFloc()); + transactionService.getRetryingTransactionHelper().doInTransaction(new RetryingTransactionCallback() + { + + @Override + public Object execute() throws Throwable + { + String guid = getPersistedShardStatusGuid(shardState.getShardInstance()); + attributeService.setAttribute(shardState, SHARD_STATE_KEY, guid); + shardStateCache.put(shardState.getShardInstance(), shardState); + knownFlocks.add(shardState.getShardInstance().getShard().getFloc()); + return null; + } + }, false, true); + + } public List getIndexSlice(SearchParameters searchParameters) { - Floc floc = findFlocFromKnown(searchParameters); - if (floc == null) + Set flocs = findFlocsFromKnown(searchParameters); + if (flocs.size() == 0) { updateKnownFlocs(); - floc = findFlocFromKnown(searchParameters); + flocs = findFlocsFromKnown(searchParameters); } - return selectShardInstancesForFlock(floc); + return selectShardInstancesForBestFlock(flocs); } - private List selectShardInstancesForFlock(Floc floc) + private List selectShardInstancesForBestFlock(Set flocs) { - HashMap> index = new HashMap>(); - - getShardInstancesFromCache(floc, index); - if (index.size() < floc.getNumberOfShards()) + ArrayList>>> indexes = new ArrayList>>> (); + + for(Floc floc : flocs) { - updateShardStateCache(floc); + HashMap> index = new HashMap>(); + getShardStatesFromCache(floc, index); + if (index.size() < floc.getNumberOfShards()) + { + updateShardStateCache(floc); + getShardStatesFromCache(floc, index); + } + indexes.add(new Pair>>(floc, index)); } - getShardInstancesFromCache(floc, index); - ArrayList slice = new ArrayList(floc.getNumberOfShards()); - for (Shard shard : index.keySet()) + Collections.sort(indexes, new FlocComparator()); + + Pair>> best = indexes.get(0); + ArrayList slice = new ArrayList(best.getFirst().getNumberOfShards()); + for (Shard shard : best.getSecond().keySet()) { - int position = random.nextInt(index.get(shard).size()); - ShardInstance instance = index.get(shard).toArray(new ShardInstance[] {})[position]; + // Only allow replicas within some fraction of the max TxId + ShardState[] allowedInstances = getAllowedInstances(best.getSecond().get(shard)); + int position = random.nextInt(allowedInstances.length); + ShardInstance instance = allowedInstances[position].getShardInstance(); slice.add(instance); } return slice; } + + /** + * @param hashSet + * @return + */ + private ShardState[] getAllowedInstances(HashSet states) + { + HashSet allowed = new HashSet(); + + long maxTxId = 0; + for(ShardState state :states) + { + maxTxId = Math.max(maxTxId, state.getLastIndexedTxId()); + } + + for(ShardState state :states) + { + if( (maxTxId - state.getLastIndexedTxId()) <= maxAllowedReplicaTxCountDifference) + { + allowed.add(state); + } + } + + + return allowed.toArray(new ShardState[] {}); + } + /** * @param floc */ @@ -121,11 +304,14 @@ public class ShardRegistryImpl implements ShardRegistry { ShardStateCollector shardStates = getPersistedShardStates(); HashMap> shards = shardStates.getIndexes().get(floc); - for (HashMap map : shards.values()) + if(shards != null) { - for (ShardInstance instance : map.keySet()) + for (HashMap map : shards.values()) { - shardStateCache.put(instance, map.get(instance)); + for (ShardInstance instance : map.keySet()) + { + shardStateCache.put(instance, map.get(instance)); + } } } } @@ -134,19 +320,26 @@ public class ShardRegistryImpl implements ShardRegistry * @param floc * @param index */ - private void getShardInstancesFromCache(Floc floc, HashMap> index) + private void getShardStatesFromCache(Floc floc, HashMap> index) { + long now = System.currentTimeMillis(); for (ShardInstance instance : shardStateCache.getKeys()) { + ShardState state = shardStateCache.get(instance); + if( (now - state.getLastUpdated()) > (shardInstanceTimeoutInSeconds * 1000) ) + { + continue; + } + if (instance.getShard().getFloc().equals(floc)) { - HashSet replicas = index.get(instance.getShard()); + HashSet replicas = index.get(instance.getShard()); if (replicas == null) { - replicas = new HashSet(); + replicas = new HashSet(); index.put(instance.getShard(), replicas); } - replicas.add(instance); + replicas.add(state); } } } @@ -157,17 +350,17 @@ public class ShardRegistryImpl implements ShardRegistry knownFlocks.addAll(shardStates.getIndexes().keySet()); } - private Floc findFlocFromKnown(SearchParameters searchParameters) + private HashSet findFlocsFromKnown(SearchParameters searchParameters) { - Floc best = null; + HashSet flocs = new HashSet(); for (Floc floc : knownFlocks) { if (floc.getStoreRefs().containsAll(searchParameters.getStores())) { - best = getBestFloc(best, floc); + flocs.add(floc); } } - return best; + return flocs; } private Floc getBestFloc(Floc best, Floc floc) @@ -176,6 +369,8 @@ public class ShardRegistryImpl implements ShardRegistry { return floc; } + + if (best.getNumberOfShards() >= floc.getNumberOfShards()) { return best; @@ -230,6 +425,11 @@ public class ShardRegistryImpl implements ShardRegistry @Override public boolean handleAttribute(Long id, Serializable value, Serializable[] keys) { + if(value == null) + { + return true; + } + String shardInstanceGuid = (String) keys[1]; diff --git a/source/test-java/org/alfresco/repo/index/shard/ShardRegistryTest.java b/source/test-java/org/alfresco/repo/index/shard/ShardRegistryTest.java index ca3dfec108..40b9e7953a 100644 --- a/source/test-java/org/alfresco/repo/index/shard/ShardRegistryTest.java +++ b/source/test-java/org/alfresco/repo/index/shard/ShardRegistryTest.java @@ -18,8 +18,7 @@ */ package org.alfresco.repo.index.shard; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -28,6 +27,7 @@ import static org.mockito.Mockito.verify; import java.io.Serializable; import java.net.UnknownHostException; +import java.util.HashSet; import java.util.List; import org.alfresco.repo.cache.DefaultSimpleCache; @@ -36,6 +36,7 @@ import org.alfresco.service.cmr.attributes.AttributeService; import org.alfresco.service.cmr.attributes.AttributeService.AttributeQueryCallback; import org.alfresco.service.cmr.repository.StoreRef; import org.alfresco.service.cmr.search.SearchParameters; +import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.GUID; import org.junit.Before; import org.junit.Test; @@ -59,6 +60,8 @@ public class ShardRegistryTest private DefaultSimpleCache shardToGuidCache = new DefaultSimpleCache(); private @Mock AttributeService attributeService; + + private @Mock TransactionService transactionService; /** * @@ -75,13 +78,18 @@ public class ShardRegistryTest shardRegistry.setAttributeService(attributeService); shardRegistry.setShardStateCache(shardStateCache); shardRegistry.setShardToGuidCache(shardToGuidCache); + shardRegistry.setTransactionService(transactionService); + shardRegistry.setPurgeOnInit(false); + shardRegistry.setShardInstanceTimeoutInSeconds(30); + shardRegistry.setMaxAllowedReplicaTxCountDifference(10); + shardRegistry.init(); } @Test public void registerLocalShardState() { - ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true) + ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -98,7 +106,7 @@ public class ShardRegistryTest verify(attributeService).setAttribute(shardState1, ".SHARD_STATE", guid1); - ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true) + ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -128,7 +136,91 @@ public class ShardRegistryTest List slice = shardRegistry.getIndexSlice(sp); assertEquals(2, slice.size()); assertTrue(slice.contains(shardState1.getShardInstance())); - assertTrue(slice.contains(shardState2.getShardInstance())); + assertTrue(slice.contains(shardState2.getShardInstance())); + } + + @Test + public void testShardsExpire() + { + + ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) + .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + shardRegistry.registerShardState(shardState1); + + assertEquals(1, shardToGuidCache.getKeys().size()); + assertEquals(1, shardStateCache.getKeys().size()); + + assertEquals(shardState1, shardStateCache.get(shardState1.getShardInstance())); + + String guid1 = shardToGuidCache.get(shardState1.getShardInstance()); + verify(attributeService).setAttribute(shardState1, ".SHARD_STATE", guid1); + + + ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) + .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + shardRegistry.registerShardState(shardState2); + + assertEquals(2, shardToGuidCache.getKeys().size()); + assertEquals(2, shardStateCache.getKeys().size()); + + assertEquals(shardState2, shardStateCache.get(shardState2.getShardInstance())); + + String guid2 = shardToGuidCache.get(shardState2.getShardInstance()); + verify(attributeService).setAttribute(shardState2, ".SHARD_STATE", guid2); + + // and again + + shardRegistry.registerShardState(shardState2); + assertEquals(2, shardToGuidCache.getKeys().size()); + assertEquals(2, shardStateCache.getKeys().size()); + + assertEquals(shardState2, shardStateCache.get(shardState2.getShardInstance())); + verify(attributeService, times(2)).setAttribute(shardState2, ".SHARD_STATE", guid2); + + SearchParameters sp = new SearchParameters(); + sp.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + List slice = shardRegistry.getIndexSlice(sp); + assertEquals(2, slice.size()); + assertTrue(slice.contains(shardState1.getShardInstance())); + assertTrue(slice.contains(shardState2.getShardInstance())); + + + // expire check + + synchronized(this) + { + try + { + wait(40000); + } + catch (InterruptedException e) + { + fail("Failed to wait"); + } + } + + slice = shardRegistry.getIndexSlice(sp); + assertEquals(0, slice.size()); + + shardState1.setLastUpdated(System.currentTimeMillis()); + shardRegistry.registerShardState(shardState1); + + slice = shardRegistry.getIndexSlice(sp); + assertEquals(1, slice.size()); + + shardState2.setLastUpdated(System.currentTimeMillis()); + shardRegistry.registerShardState(shardState2); + + slice = shardRegistry.getIndexSlice(sp); + assertEquals(2, slice.size()); } @@ -137,7 +229,7 @@ public class ShardRegistryTest { final String guid1 = GUID.generate(); - final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -161,7 +253,7 @@ public class ShardRegistryTest } }).when(attributeService).getAttributes(any(AttributeQueryCallback.class), eq(".SHARD_STATE")); - ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true) + ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -201,13 +293,13 @@ public class ShardRegistryTest final String guid1 = GUID.generate(); final String guid2 = GUID.generate(); - final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -270,13 +362,13 @@ public class ShardRegistryTest final String guid1 = GUID.generate(); final String guid2 = GUID.generate(); - final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -320,43 +412,43 @@ public class ShardRegistryTest final String guid6 = GUID.generate(); final String guid7 = GUID.generate(); - final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState3 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState3 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/rep1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState4 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState4 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/rep2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState5 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState5 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard1").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_ARCHIVE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState6 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState6 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(2) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_ARCHIVE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") .endFloc().endShard().endShardInstance().build(); - final ShardState shardState7 = ShardStateBuilder.shardState().withMaster(true) + final ShardState shardState7 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()) .withShardInstance().withBaseUrl("/solr4/shard2").withHostName("meep").withPort(1234) .withShard().withInstance(1) .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(1).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") @@ -400,4 +492,362 @@ public class ShardRegistryTest assertTrue(slice.contains(shardState5.getShardInstance())); assertTrue(slice.contains(shardState6.getShardInstance())); } + + @Test + public void testSelectFlocBasedInTxCount1() + { + final String guid1 = GUID.generate(); + + final String guid2 = GUID.generate(); + final String guid3 = GUID.generate(); + + final String guid4 = GUID.generate(); + final String guid5 = GUID.generate(); + final String guid6 = GUID.generate(); + final String guid7 = GUID.generate(); + + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/single").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(1).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/2-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState3 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/2-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState4 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/4-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState5 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/4-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState6 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/4-3").withHostName("meep").withPort(1234) + .withShard().withInstance(3) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState7 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/4-4").withHostName("meep").withPort(1234) + .withShard().withInstance(4) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + doAnswer(new Answer() + { + long id = 0; + ShardStateCollector callback; + void handle(Serializable value, String... keys) + { + callback.handleAttribute(id++, value, keys); + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + callback = (ShardStateCollector) invocation.getArguments()[0]; + handle(shardState1, ".SHARD_STATE", guid1); + handle(shardState2, ".SHARD_STATE", guid2); + handle(shardState3, ".SHARD_STATE", guid3); + handle(shardState4, ".SHARD_STATE", guid4); + handle(shardState5, ".SHARD_STATE", guid5); + handle(shardState6, ".SHARD_STATE", guid6); + handle(shardState7, ".SHARD_STATE", guid7); + return null; + } + }).when(attributeService).getAttributes(any(AttributeQueryCallback.class), eq(".SHARD_STATE")); + + SearchParameters sp = new SearchParameters(); + sp.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + List slice = shardRegistry.getIndexSlice(sp); + assertEquals(1, slice.size()); + assertTrue(slice.contains(shardState1.getShardInstance())); + } + + @Test + public void testSelectFlocBasedInTxCount2() + { + final String guid1 = GUID.generate(); + + final String guid2 = GUID.generate(); + final String guid3 = GUID.generate(); + + final String guid4 = GUID.generate(); + final String guid5 = GUID.generate(); + final String guid6 = GUID.generate(); + final String guid7 = GUID.generate(); + + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/single").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(1).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/2-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState3 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/2-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState4 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/4-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState5 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/4-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState6 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/4-3").withHostName("meep").withPort(1234) + .withShard().withInstance(3) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState7 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/4-4").withHostName("meep").withPort(1234) + .withShard().withInstance(4) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + doAnswer(new Answer() + { + long id = 0; + ShardStateCollector callback; + void handle(Serializable value, String... keys) + { + callback.handleAttribute(id++, value, keys); + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + callback = (ShardStateCollector) invocation.getArguments()[0]; + handle(shardState1, ".SHARD_STATE", guid1); + handle(shardState2, ".SHARD_STATE", guid2); + handle(shardState3, ".SHARD_STATE", guid3); + handle(shardState4, ".SHARD_STATE", guid4); + handle(shardState5, ".SHARD_STATE", guid5); + handle(shardState6, ".SHARD_STATE", guid6); + handle(shardState7, ".SHARD_STATE", guid7); + return null; + } + }).when(attributeService).getAttributes(any(AttributeQueryCallback.class), eq(".SHARD_STATE")); + + SearchParameters sp = new SearchParameters(); + sp.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + List slice = shardRegistry.getIndexSlice(sp); + assertEquals(2, slice.size()); + assertTrue(slice.contains(shardState2.getShardInstance())); + assertTrue(slice.contains(shardState3.getShardInstance())); + } + + + @Test + public void testSelectFlocBasedInTxCount3() + { + final String guid1 = GUID.generate(); + + final String guid2 = GUID.generate(); + final String guid3 = GUID.generate(); + + final String guid4 = GUID.generate(); + final String guid5 = GUID.generate(); + final String guid6 = GUID.generate(); + final String guid7 = GUID.generate(); + + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(9) + .withShardInstance().withBaseUrl("/solr4/single").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(1).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/2-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState3 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(8) + .withShardInstance().withBaseUrl("/solr4/2-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState4 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/4-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState5 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/4-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState6 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/4-3").withHostName("meep").withPort(1234) + .withShard().withInstance(3) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState7 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(10) + .withShardInstance().withBaseUrl("/solr4/4-4").withHostName("meep").withPort(1234) + .withShard().withInstance(4) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(4).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + doAnswer(new Answer() + { + long id = 0; + ShardStateCollector callback; + void handle(Serializable value, String... keys) + { + callback.handleAttribute(id++, value, keys); + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + callback = (ShardStateCollector) invocation.getArguments()[0]; + handle(shardState1, ".SHARD_STATE", guid1); + handle(shardState2, ".SHARD_STATE", guid2); + handle(shardState3, ".SHARD_STATE", guid3); + handle(shardState4, ".SHARD_STATE", guid4); + handle(shardState5, ".SHARD_STATE", guid5); + handle(shardState6, ".SHARD_STATE", guid6); + handle(shardState7, ".SHARD_STATE", guid7); + return null; + } + }).when(attributeService).getAttributes(any(AttributeQueryCallback.class), eq(".SHARD_STATE")); + + SearchParameters sp = new SearchParameters(); + sp.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + List slice = shardRegistry.getIndexSlice(sp); + assertEquals(4, slice.size()); + assertTrue(slice.contains(shardState4.getShardInstance())); + assertTrue(slice.contains(shardState5.getShardInstance())); + assertTrue(slice.contains(shardState6.getShardInstance())); + assertTrue(slice.contains(shardState7.getShardInstance())); + } + + @Test + public void testReplicaSelection() + { + final String guid1 = GUID.generate(); + final String guid2 = GUID.generate(); + + final String guid3 = GUID.generate(); + final String guid4 = GUID.generate(); + + final String guid5 = GUID.generate(); + final String guid6 = GUID.generate(); + + final ShardState shardState1 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(100) + .withShardInstance().withBaseUrl("/solr4/1-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState2 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(100) + .withShardInstance().withBaseUrl("/solr4/1-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState3 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(90) + .withShardInstance().withBaseUrl("/solr4/2-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState4 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(90) + .withShardInstance().withBaseUrl("/solr4/2-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState5 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(80) + .withShardInstance().withBaseUrl("/solr4/3-1").withHostName("meep").withPort(1234) + .withShard().withInstance(1) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + final ShardState shardState6 = ShardStateBuilder.shardState().withMaster(true).withLastUpdated(System.currentTimeMillis()).withLastIndexedTxId(80) + .withShardInstance().withBaseUrl("/solr4/3-2").withHostName("meep").withPort(1234) + .withShard().withInstance(2) + .withFloc().withAddedStoreRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE).withHasContent(true).withNumberOfShards(2).withShardMethod(ShardMethodEnum.MOD_ACL_ID).withTemplate("default") + .endFloc().endShard().endShardInstance().build(); + + + doAnswer(new Answer() + { + long id = 0; + ShardStateCollector callback; + void handle(Serializable value, String... keys) + { + callback.handleAttribute(id++, value, keys); + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + callback = (ShardStateCollector) invocation.getArguments()[0]; + handle(shardState1, ".SHARD_STATE", guid1); + handle(shardState2, ".SHARD_STATE", guid2); + handle(shardState3, ".SHARD_STATE", guid3); + handle(shardState4, ".SHARD_STATE", guid4); + handle(shardState5, ".SHARD_STATE", guid5); + handle(shardState6, ".SHARD_STATE", guid6); + return null; + } + }).when(attributeService).getAttributes(any(AttributeQueryCallback.class), eq(".SHARD_STATE")); + + HashSet found = new HashSet(); + for(int i = 0; i < 1000; i++) + { + SearchParameters sp = new SearchParameters(); + sp.addStore(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE); + List slice = shardRegistry.getIndexSlice(sp); + assertEquals(2, slice.size()); + assertTrue(slice.contains(shardState1.getShardInstance()) || slice.contains(shardState3.getShardInstance())); + assertTrue(slice.contains(shardState2.getShardInstance()) || slice.contains(shardState4.getShardInstance())); + + found.add(slice.get(0)); + found.add(slice.get(1)); + } + + assertEquals(4, found.size()); + } }