diff --git a/config/alfresco/cache-context.xml b/config/alfresco/cache-context.xml index 86cd2ab74a..efc888887b 100644 --- a/config/alfresco/cache-context.xml +++ b/config/alfresco/cache-context.xml @@ -470,10 +470,20 @@ - + + + + + + + + + + + diff --git a/config/alfresco/caches.properties b/config/alfresco/caches.properties index 4c62e2e706..f21587a88f 100644 --- a/config/alfresco/caches.properties +++ b/config/alfresco/caches.properties @@ -650,3 +650,33 @@ cache.solrFacetNodeRefSharedCache.eviction-percentage=25 cache.solrFacetNodeRefSharedCache.merge-policy=com.hazelcast.map.merge.LatestUpdateMapMergePolicy cache.solrFacetNodeRefSharedCache.readBackupData=false +# +#Shard states cache +# +cache.shardStateSharedCache.tx.maxItems=100 +cache.shardStateSharedCache.tx.statsEnabled=${caches.tx.statsEnabled} +cache.shardStateSharedCache.maxItems=500 +cache.shardStateSharedCache.timeToLiveSeconds=300 +cache.shardStateSharedCache.maxIdleSeconds=0 +cache.shardStateSharedCache.cluster.type=fully-distributed +cache.shardStateSharedCache.backup-count=1 +cache.shardStateSharedCache.eviction-policy=LRU +cache.shardStateSharedCache.eviction-percentage=25 +cache.shardStateSharedCache.merge-policy=hz.ADD_NEW_ENTRY +cache.shardStateSharedCache.readBackupData=false + +# +#Shard instance to guid cache +# +cache.shardToGuidSharedCache.tx.maxItems=100 +cache.shardToGuidSharedCache.tx.statsEnabled=${caches.tx.statsEnabled} +cache.shardToGuidSharedCache.maxItems=500 +cache.shardToGuidSharedCache.timeToLiveSeconds=0 +cache.shardToGuidSharedCache.maxIdleSeconds=0 +cache.shardToGuidSharedCache.cluster.type=fully-distributed +cache.shardToGuidSharedCache.backup-count=1 +cache.shardToGuidSharedCache.eviction-policy=LRU +cache.shardToGuidSharedCache.eviction-percentage=25 +cache.shardToGuidSharedCache.merge-policy=hz.ADD_NEW_ENTRY +cache.shardToGuidSharedCache.readBackupData=false + diff --git a/config/alfresco/subsystems/Search/common-search-context.xml b/config/alfresco/subsystems/Search/common-search-context.xml index 0f17724e98..c3174036ce 100644 --- a/config/alfresco/subsystems/Search/common-search-context.xml +++ b/config/alfresco/subsystems/Search/common-search-context.xml @@ -21,6 +21,46 @@ + + + + + + + + org.alfresco.repo.index.shard.ShardRegistry + + + + + + + + + + + + + + + + + + + + + + ${server.transaction.mode.readOnly} + ${server.transaction.mode.readOnly} + ${server.transaction.mode.default} + + + + + + + + diff --git a/config/alfresco/subsystems/Search/solr4/solr-search-context.xml b/config/alfresco/subsystems/Search/solr4/solr-search-context.xml index 9b3b8cf755..e7201c271b 100644 --- a/config/alfresco/subsystems/Search/solr4/solr-search-context.xml +++ b/config/alfresco/subsystems/Search/solr4/solr-search-context.xml @@ -43,6 +43,9 @@ + + + @@ -69,6 +72,7 @@ + diff --git a/config/alfresco/subsystems/Search/solr4/solr-search.properties b/config/alfresco/subsystems/Search/solr4/solr-search.properties index 2c092fd8eb..6bca0a1506 100644 --- a/config/alfresco/subsystems/Search/solr4/solr-search.properties +++ b/config/alfresco/subsystems/Search/solr4/solr-search.properties @@ -8,6 +8,8 @@ solr.baseUrl=/solr4 solr.defaultUnshardedFacetLimit=100 solr.defaultShardedFacetLimit=20 +solr.useDynamicShardRegistration=false + # # Solr Suggester properties # diff --git a/config/alfresco/tx-cache-context.xml b/config/alfresco/tx-cache-context.xml index 37f9cd65cf..cb7ed3a24c 100644 --- a/config/alfresco/tx-cache-context.xml +++ b/config/alfresco/tx-cache-context.xml @@ -690,5 +690,37 @@ + + + + + + + + org.alfresco.cache.shardStateTransactionalCache + + + + + + + + + + + + + + + + org.alfresco.cache.shardToGuidTransactionalCache + + + + + + + + diff --git a/source/java/org/alfresco/repo/search/impl/solr/ExplicitSolrStoreMappingWrapper.java b/source/java/org/alfresco/repo/search/impl/solr/ExplicitSolrStoreMappingWrapper.java new file mode 100644 index 0000000000..ee5ca4fee0 --- /dev/null +++ b/source/java/org/alfresco/repo/search/impl/solr/ExplicitSolrStoreMappingWrapper.java @@ -0,0 +1,404 @@ +/* + * Copyright (C) 2005-2015 Alfresco Software Limited. + * + * This file is part of Alfresco + * + * Alfresco is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Alfresco is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Alfresco. If not, see . + */ +package org.alfresco.repo.search.impl.solr; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.httpclient.HttpClientFactory; +import org.alfresco.repo.search.impl.lucene.LuceneQueryParserException; +import org.alfresco.service.cmr.repository.StoreRef; +import org.alfresco.util.Pair; +import org.alfresco.util.shard.ExplicitShardingPolicy; +import org.apache.commons.codec.net.URLCodec; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.params.HttpClientParams; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.BeanNameAware; + +/** + * @author Andy + */ +public class ExplicitSolrStoreMappingWrapper implements SolrStoreMappingWrapper +{ + + private HttpClientFactory httpClientFactory; + + private LinkedHashSet httpClientsAndBaseURLs = new LinkedHashSet(); + + private ExplicitShardingPolicy policy; + + private Random random; + + private BeanFactory beanFactory; + + private SolrStoreMapping wrapped; + + public ExplicitSolrStoreMappingWrapper(SolrStoreMapping wrapped, BeanFactory beanFactory) + { + this.wrapped = wrapped; + this.beanFactory = beanFactory; + init(); + } + + public void init() + { + httpClientFactory = (HttpClientFactory)beanFactory.getBean(wrapped.getHttpClientFactory()); + random = new Random(123); + + if ((wrapped.getNodes() == null) || (wrapped.getNodes().length == 0)) + { + HttpClient httpClient = httpClientFactory.getHttpClient(); + HttpClientParams params = httpClient.getParams(); + //params.setBooleanParameter(HttpClientParams.PREEMPTIVE_AUTHENTICATION, true); + //httpClient.getState().setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT), new UsernamePasswordCredentials("admin", "admin")); + httpClientsAndBaseURLs.add(new HttpClientAndBaseUrl(httpClient, wrapped.getBaseUrl())); + } + else + { + for (String node : wrapped.getNodes()) + { + String nodeHost = httpClientFactory.getHost(); + String nodePort = "" + httpClientFactory.getPort(); + String nodeBaseUrl = wrapped.getBaseUrl(); + + if (node.length() > 0) + { + int colon = node.indexOf(':'); + int forward = (colon > -1) ? node.indexOf('/', colon) : node.indexOf('/'); + + if (colon == -1) + { + if (forward == -1) + { + // single value + if (node.startsWith("/")) + { + nodeBaseUrl = node; + } + try + { + int port = Integer.parseInt(node); + nodePort = "" + port; + } + catch (NumberFormatException nfe) + { + nodeHost = node; + } + } + else + { + try + { + String potentialPort = node.substring(0, forward); + if (potentialPort.length() > 0) + { + int port = Integer.parseInt(potentialPort); + nodePort = "" + port; + } + } + catch (NumberFormatException nfe) + { + nodeHost = node.substring(0, forward); + } + nodeBaseUrl = node.substring(forward); + } + } + else + { + if (forward == -1) + { + if (colon > 0) + { + nodeHost = node.substring(0, colon); + } + if (colon + 1 < node.length()) + { + String port = node.substring(colon + 1); + if (port.length() > 0) + { + nodePort = port; + } + } + } + else + { + if (colon > 0) + { + nodeHost = node.substring(0, colon); + } + + String port = node.substring(colon + 1, forward); + if (port.length() > 0) + { + nodePort = port; + } + nodeBaseUrl = node.substring(forward); + + } + } + } + + try + { + int realPort = Integer.parseInt(nodePort); + httpClientsAndBaseURLs.add(new HttpClientAndBaseUrl(httpClientFactory.getHttpClient(nodeHost, realPort), nodeBaseUrl)); + } + catch (NumberFormatException nfe) + { + httpClientsAndBaseURLs.add(new HttpClientAndBaseUrl(httpClientFactory.getHttpClient(nodeHost, httpClientFactory.getPort()), nodeBaseUrl)); + } + } + } + + policy = new ExplicitShardingPolicy(wrapped.getNumShards(), wrapped.getReplicationFactor(), httpClientsAndBaseURLs.size()); + + } + + + + + public boolean isSharded() + { + return wrapped.getNumShards() > 1; + } + + public String getShards() + { + + if (!policy.configurationIsValid()) + { + throw new AlfrescoRuntimeException("Invalid shard configuration: shard = " + + wrapped.getNumShards() + " reoplicationFactor = " + wrapped.getReplicationFactor() + " with node count = " + httpClientsAndBaseURLs.size()); + } + + return getShards2(); + } + + private String getShards1() + { + try + { + URLCodec encoder = new URLCodec(); + StringBuilder builder = new StringBuilder(); + + Set shards = new HashSet(); + for (int i = 0; i < httpClientsAndBaseURLs.size(); i += wrapped.getReplicationFactor()) + { + for (Integer shardId : policy.getShardIdsForNode(i + 1)) + { + if (!shards.contains(shardId % wrapped.getNumShards())) + { + if (shards.size() > 0) + { + builder.append(','); + } + HttpClientAndBaseUrl httpClientAndBaseUrl = httpClientsAndBaseURLs.toArray(new HttpClientAndBaseUrl[0])[i]; + builder.append(encoder.encode(httpClientAndBaseUrl.getHost(), "UTF-8")); + builder.append(':'); + builder.append(encoder.encode("" + httpClientAndBaseUrl.getPort(), "UTF-8")); + if (httpClientAndBaseUrl.getBaseUrl().startsWith("/")) + { + builder.append(encoder.encode(httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); + } + else + { + builder.append(encoder.encode("/" + httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); + } + + builder.append('-').append(shardId); + + shards.add(shardId % wrapped.getNumShards()); + } + + } + } + return builder.toString(); + } + catch (UnsupportedEncodingException e) + { + throw new LuceneQueryParserException("", e); + } + } + + private String getShards2() + { + try + { + URLCodec encoder = new URLCodec(); + StringBuilder builder = new StringBuilder(); + + for (int shard = 0; shard < wrapped.getNumShards(); shard++) + { + int position = random.nextInt(wrapped.getReplicationFactor()); + List nodeInstances = policy.getNodeInstancesForShardId(shard); + Integer nodeId = nodeInstances.get(position); + + if (builder.length() > 0) + { + builder.append(','); + } + HttpClientAndBaseUrl httpClientAndBaseUrl = httpClientsAndBaseURLs.toArray(new HttpClientAndBaseUrl[0])[nodeId-1]; + builder.append(encoder.encode(httpClientAndBaseUrl.getHost(), "UTF-8")); + builder.append(':'); + builder.append(encoder.encode("" + httpClientAndBaseUrl.getPort(), "UTF-8")); + if (httpClientAndBaseUrl.getBaseUrl().startsWith("/")) + { + builder.append(encoder.encode(httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); + } + else + { + builder.append(encoder.encode("/" + httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); + } + + builder.append('-').append(shard); + + } + return builder.toString(); + } + catch (UnsupportedEncodingException e) + { + throw new LuceneQueryParserException("", e); + } + } + + /** + * @return + */ + public int getNodeCount() + { + return httpClientsAndBaseURLs.size(); + } + + private static class HttpClientAndBaseUrl + { + + private HttpClient httpClient; + + private String baseUrl; + + HttpClientAndBaseUrl(HttpClient httpClient, String baseUrl) + { + this.httpClient = httpClient; + this.baseUrl = baseUrl; + } + + public String getBaseUrl() + { + return baseUrl; + } + + public String getHost() + { + return httpClient.getHostConfiguration().getHost(); + } + + public int getPort() + { + return httpClient.getHostConfiguration().getPort(); + } + + /* + * (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode()); + result = prime * result + ((getHost() == null) ? 0 : getHost().hashCode()); + result = prime * result + getPort(); + return result; + } + + /* + * (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HttpClientAndBaseUrl other = (HttpClientAndBaseUrl) obj; + if (baseUrl == null) + { + if (other.baseUrl != null) + return false; + } + else if (!baseUrl.equals(other.baseUrl)) + return false; + if (httpClient == null) + { + if (other.httpClient != null) + return false; + } + else if (!getHost().equals(other.getHost())) + return false; + else if (getPort() != other.getPort()) + return false; + return true; + } + + /* + * (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() + { + return "HttpClientAndBaseUrl [getBaseUrl()=" + getBaseUrl() + ", getHost()=" + getHost() + ", getPort()=" + getPort() + "]"; + } + + } + + /** + * @return + */ + public Pair getHttpClientAndBaseUrl() + { + + if (!policy.configurationIsValid()) + { + throw new AlfrescoRuntimeException("Invalid shard configuration: shard = " + + wrapped.getNumShards() + " reoplicationFactor = " + wrapped.getReplicationFactor() + " with node count = " + httpClientsAndBaseURLs.size()); + } + + int shard = random.nextInt(wrapped.getNumShards()); + int position = random.nextInt(wrapped.getReplicationFactor()); + List nodeInstances = policy.getNodeInstancesForShardId(shard); + Integer nodeId = nodeInstances.get(position); + HttpClientAndBaseUrl httpClientAndBaseUrl = httpClientsAndBaseURLs.toArray(new HttpClientAndBaseUrl[0])[nodeId-1]; + return new Pair<>(httpClientAndBaseUrl.httpClient, isSharded() ? httpClientAndBaseUrl.baseUrl+"-"+shard : httpClientAndBaseUrl.baseUrl); + } + +} diff --git a/source/java/org/alfresco/repo/search/impl/solr/SolrQueryHTTPClient.java b/source/java/org/alfresco/repo/search/impl/solr/SolrQueryHTTPClient.java index 8287de9291..9315da015b 100644 --- a/source/java/org/alfresco/repo/search/impl/solr/SolrQueryHTTPClient.java +++ b/source/java/org/alfresco/repo/search/impl/solr/SolrQueryHTTPClient.java @@ -38,6 +38,8 @@ import org.alfresco.httpclient.HttpClientFactory; import org.alfresco.opencmis.dictionary.CMISStrictDictionaryService; import org.alfresco.repo.admin.RepositoryState; import org.alfresco.repo.domain.node.NodeDAO; +import org.alfresco.repo.index.shard.ShardInstance; +import org.alfresco.repo.index.shard.ShardRegistry; import org.alfresco.repo.search.impl.lucene.JSONResult; import org.alfresco.repo.search.impl.lucene.LuceneQueryParserException; import org.alfresco.repo.search.impl.lucene.SolrJSONResultSet; @@ -100,6 +102,8 @@ public class SolrQueryHTTPClient implements BeanFactoryAware private NodeDAO nodeDAO; private TenantService tenantService; + + private ShardRegistry shardRegistry; private Map languageMappings; @@ -118,6 +122,8 @@ public class SolrQueryHTTPClient implements BeanFactoryAware private int maximumResultsFromUnlimitedQuery = Integer.MAX_VALUE; private boolean anyDenyDenies; + + private boolean useDynamicShardRegistration = false; public static final int DEFAULT_SAVEPOST_BUFFER = 4096; @@ -140,7 +146,7 @@ public class SolrQueryHTTPClient implements BeanFactoryAware for(SolrStoreMapping mapping : storeMappings) { - mappingLookup.put(mapping.getStoreRef(), new SolrStoreMappingWrapper(mapping, beanFactory)); + mappingLookup.put(mapping.getStoreRef(), new ExplicitSolrStoreMappingWrapper(mapping, beanFactory)); } } @@ -180,6 +186,16 @@ public class SolrQueryHTTPClient implements BeanFactoryAware this.tenantService = tenantService; } + public void setShardRegistry(ShardRegistry shardRegistry) + { + this.shardRegistry = shardRegistry; + } + + public void setUseDynamicShardRegistration(boolean useDynamicShardRegistration) + { + this.useDynamicShardRegistration = useDynamicShardRegistration; + } + public void setLanguageMappings(Map languageMappings) { this.languageMappings = languageMappings; @@ -765,13 +781,23 @@ public class SolrQueryHTTPClient implements BeanFactoryAware private SolrStoreMappingWrapper extractMapping(StoreRef store) { - SolrStoreMappingWrapper mappings = mappingLookup.get(store); - - if (mappings == null) + if(useDynamicShardRegistration) { - throw new AlfrescoRuntimeException("No solr query support for store " + store); + SearchParameters sp = new SearchParameters(); + sp.addStore(store); + List slice = shardRegistry.getIndexSlice(sp); + return DynamicSolrStoreMappingWrapperFactory.wrap(slice, beanFactory); + } + else + { + SolrStoreMappingWrapper mappings = mappingLookup.get(store); + + if (mappings == null) + { + throw new AlfrescoRuntimeException("No solr query support for store " + store); + } + return mappings; } - return mappings; } private StoreRef extractStoreRef(BasicSearchParameters searchParameters) diff --git a/source/java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapper.java b/source/java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapper.java index ea71b69a0a..8787b1a4f2 100644 --- a/source/java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapper.java +++ b/source/java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapper.java @@ -18,387 +18,29 @@ */ package org.alfresco.repo.search.impl.solr; -import java.io.UnsupportedEncodingException; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; - -import org.alfresco.error.AlfrescoRuntimeException; -import org.alfresco.httpclient.HttpClientFactory; -import org.alfresco.repo.search.impl.lucene.LuceneQueryParserException; -import org.alfresco.service.cmr.repository.StoreRef; import org.alfresco.util.Pair; -import org.alfresco.util.shard.ExplicitShardingPolicy; -import org.apache.commons.codec.net.URLCodec; import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.params.HttpClientParams; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.beans.factory.BeanNameAware; /** * @author Andy + * */ -public class SolrStoreMappingWrapper +public interface SolrStoreMappingWrapper { - - private HttpClientFactory httpClientFactory; - - private LinkedHashSet httpClientsAndBaseURLs = new LinkedHashSet(); - - private ExplicitShardingPolicy policy; - - private Random random; - - private BeanFactory beanFactory; - - private SolrStoreMapping wrapped; - - public SolrStoreMappingWrapper(SolrStoreMapping wrapped, BeanFactory beanFactory) - { - this.wrapped = wrapped; - this.beanFactory = beanFactory; - init(); - } - - public void init() - { - httpClientFactory = (HttpClientFactory)beanFactory.getBean(wrapped.getHttpClientFactory()); - random = new Random(123); - - if ((wrapped.getNodes() == null) || (wrapped.getNodes().length == 0)) - { - HttpClient httpClient = httpClientFactory.getHttpClient(); - HttpClientParams params = httpClient.getParams(); - //params.setBooleanParameter(HttpClientParams.PREEMPTIVE_AUTHENTICATION, true); - //httpClient.getState().setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT), new UsernamePasswordCredentials("admin", "admin")); - httpClientsAndBaseURLs.add(new HttpClientAndBaseUrl(httpClient, wrapped.getBaseUrl())); - } - else - { - for (String node : wrapped.getNodes()) - { - String nodeHost = httpClientFactory.getHost(); - String nodePort = "" + httpClientFactory.getPort(); - String nodeBaseUrl = wrapped.getBaseUrl(); - - if (node.length() > 0) - { - int colon = node.indexOf(':'); - int forward = (colon > -1) ? node.indexOf('/', colon) : node.indexOf('/'); - - if (colon == -1) - { - if (forward == -1) - { - // single value - if (node.startsWith("/")) - { - nodeBaseUrl = node; - } - try - { - int port = Integer.parseInt(node); - nodePort = "" + port; - } - catch (NumberFormatException nfe) - { - nodeHost = node; - } - } - else - { - try - { - String potentialPort = node.substring(0, forward); - if (potentialPort.length() > 0) - { - int port = Integer.parseInt(potentialPort); - nodePort = "" + port; - } - } - catch (NumberFormatException nfe) - { - nodeHost = node.substring(0, forward); - } - nodeBaseUrl = node.substring(forward); - } - } - else - { - if (forward == -1) - { - if (colon > 0) - { - nodeHost = node.substring(0, colon); - } - if (colon + 1 < node.length()) - { - String port = node.substring(colon + 1); - if (port.length() > 0) - { - nodePort = port; - } - } - } - else - { - if (colon > 0) - { - nodeHost = node.substring(0, colon); - } - - String port = node.substring(colon + 1, forward); - if (port.length() > 0) - { - nodePort = port; - } - nodeBaseUrl = node.substring(forward); - - } - } - } - - try - { - int realPort = Integer.parseInt(nodePort); - httpClientsAndBaseURLs.add(new HttpClientAndBaseUrl(httpClientFactory.getHttpClient(nodeHost, realPort), nodeBaseUrl)); - } - catch (NumberFormatException nfe) - { - httpClientsAndBaseURLs.add(new HttpClientAndBaseUrl(httpClientFactory.getHttpClient(nodeHost, httpClientFactory.getPort()), nodeBaseUrl)); - } - } - } - - policy = new ExplicitShardingPolicy(wrapped.getNumShards(), wrapped.getReplicationFactor(), httpClientsAndBaseURLs.size()); - - } - - - - - public boolean isSharded() - { - return wrapped.getNumShards() > 1; - } - - public String getShards() - { - - if (!policy.configurationIsValid()) - { - throw new AlfrescoRuntimeException("Invalid shard configuration: shard = " - + wrapped.getNumShards() + " reoplicationFactor = " + wrapped.getReplicationFactor() + " with node count = " + httpClientsAndBaseURLs.size()); - } - - return getShards2(); - } - - private String getShards1() - { - try - { - URLCodec encoder = new URLCodec(); - StringBuilder builder = new StringBuilder(); - - Set shards = new HashSet(); - for (int i = 0; i < httpClientsAndBaseURLs.size(); i += wrapped.getReplicationFactor()) - { - for (Integer shardId : policy.getShardIdsForNode(i + 1)) - { - if (!shards.contains(shardId % wrapped.getNumShards())) - { - if (shards.size() > 0) - { - builder.append(','); - } - HttpClientAndBaseUrl httpClientAndBaseUrl = httpClientsAndBaseURLs.toArray(new HttpClientAndBaseUrl[0])[i]; - builder.append(encoder.encode(httpClientAndBaseUrl.getHost(), "UTF-8")); - builder.append(':'); - builder.append(encoder.encode("" + httpClientAndBaseUrl.getPort(), "UTF-8")); - if (httpClientAndBaseUrl.getBaseUrl().startsWith("/")) - { - builder.append(encoder.encode(httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); - } - else - { - builder.append(encoder.encode("/" + httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); - } - - builder.append('-').append(shardId); - - shards.add(shardId % wrapped.getNumShards()); - } - - } - } - return builder.toString(); - } - catch (UnsupportedEncodingException e) - { - throw new LuceneQueryParserException("", e); - } - } - - private String getShards2() - { - try - { - URLCodec encoder = new URLCodec(); - StringBuilder builder = new StringBuilder(); - - for (int shard = 0; shard < wrapped.getNumShards(); shard++) - { - int position = random.nextInt(wrapped.getReplicationFactor()); - List nodeInstances = policy.getNodeInstancesForShardId(shard); - Integer nodeId = nodeInstances.get(position); - - if (builder.length() > 0) - { - builder.append(','); - } - HttpClientAndBaseUrl httpClientAndBaseUrl = httpClientsAndBaseURLs.toArray(new HttpClientAndBaseUrl[0])[nodeId-1]; - builder.append(encoder.encode(httpClientAndBaseUrl.getHost(), "UTF-8")); - builder.append(':'); - builder.append(encoder.encode("" + httpClientAndBaseUrl.getPort(), "UTF-8")); - if (httpClientAndBaseUrl.getBaseUrl().startsWith("/")) - { - builder.append(encoder.encode(httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); - } - else - { - builder.append(encoder.encode("/" + httpClientAndBaseUrl.getBaseUrl(), "UTF-8")); - } - - builder.append('-').append(shard); - - } - return builder.toString(); - } - catch (UnsupportedEncodingException e) - { - throw new LuceneQueryParserException("", e); - } - } /** * @return */ - public int getNodeCount() - { - return httpClientsAndBaseURLs.size(); - } - - private static class HttpClientAndBaseUrl - { - - private HttpClient httpClient; - - private String baseUrl; - - HttpClientAndBaseUrl(HttpClient httpClient, String baseUrl) - { - this.httpClient = httpClient; - this.baseUrl = baseUrl; - } - - public String getBaseUrl() - { - return baseUrl; - } - - public String getHost() - { - return httpClient.getHostConfiguration().getHost(); - } - - public int getPort() - { - return httpClient.getHostConfiguration().getPort(); - } - - /* - * (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode()); - result = prime * result + ((getHost() == null) ? 0 : getHost().hashCode()); - result = prime * result + getPort(); - return result; - } - - /* - * (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) - { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HttpClientAndBaseUrl other = (HttpClientAndBaseUrl) obj; - if (baseUrl == null) - { - if (other.baseUrl != null) - return false; - } - else if (!baseUrl.equals(other.baseUrl)) - return false; - if (httpClient == null) - { - if (other.httpClient != null) - return false; - } - else if (!getHost().equals(other.getHost())) - return false; - else if (getPort() != other.getPort()) - return false; - return true; - } - - /* - * (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() - { - return "HttpClientAndBaseUrl [getBaseUrl()=" + getBaseUrl() + ", getHost()=" + getHost() + ", getPort()=" + getPort() + "]"; - } - - } + Pair getHttpClientAndBaseUrl(); /** * @return */ - public Pair getHttpClientAndBaseUrl() - { + boolean isSharded(); + + /** + * @return + */ + String getShards(); - if (!policy.configurationIsValid()) - { - throw new AlfrescoRuntimeException("Invalid shard configuration: shard = " - + wrapped.getNumShards() + " reoplicationFactor = " + wrapped.getReplicationFactor() + " with node count = " + httpClientsAndBaseURLs.size()); - } - - int shard = random.nextInt(wrapped.getNumShards()); - int position = random.nextInt(wrapped.getReplicationFactor()); - List nodeInstances = policy.getNodeInstancesForShardId(shard); - Integer nodeId = nodeInstances.get(position); - HttpClientAndBaseUrl httpClientAndBaseUrl = httpClientsAndBaseURLs.toArray(new HttpClientAndBaseUrl[0])[nodeId-1]; - return new Pair<>(httpClientAndBaseUrl.httpClient, isSharded() ? httpClientAndBaseUrl.baseUrl+"-"+shard : httpClientAndBaseUrl.baseUrl); - } - } diff --git a/source/java/org/alfresco/repo/solr/SOLRTrackingComponent.java b/source/java/org/alfresco/repo/solr/SOLRTrackingComponent.java index 4b40a48f38..8376457e70 100644 --- a/source/java/org/alfresco/repo/solr/SOLRTrackingComponent.java +++ b/source/java/org/alfresco/repo/solr/SOLRTrackingComponent.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import org.alfresco.repo.domain.node.Node; +import org.alfresco.repo.index.shard.ShardRegistry; +import org.alfresco.repo.index.shard.ShardState; import org.alfresco.service.namespace.QName; /** @@ -166,4 +168,17 @@ public interface SOLRTrackingComponent * @return Long */ public Long getMaxChangeSetId(); + + /** + * Register and update a shard state + * @param shardState + */ + public void registerShardState(ShardState shardState); + + /** + * Get the shard registry + * @return the shard registry or null if one is not registered. + * This is an optional feature. + */ + public ShardRegistry getShardRegistry(); } diff --git a/source/java/org/alfresco/repo/solr/SOLRTrackingComponentImpl.java b/source/java/org/alfresco/repo/solr/SOLRTrackingComponentImpl.java index 6dcc574f4d..ae19ee2ed0 100644 --- a/source/java/org/alfresco/repo/solr/SOLRTrackingComponentImpl.java +++ b/source/java/org/alfresco/repo/solr/SOLRTrackingComponentImpl.java @@ -42,6 +42,9 @@ import org.alfresco.repo.domain.node.NodeDAO.ChildAssocRefQueryCallback; import org.alfresco.repo.domain.permissions.AclDAO; import org.alfresco.repo.domain.qname.QNameDAO; import org.alfresco.repo.domain.solr.SOLRDAO; +import org.alfresco.repo.index.shard.ShardRegistry; +import org.alfresco.repo.index.shard.ShardRegistryImpl; +import org.alfresco.repo.index.shard.ShardState; import org.alfresco.repo.search.AspectIndexFilter; import org.alfresco.repo.search.TypeIndexFilter; import org.alfresco.repo.security.authentication.AuthenticationUtil; @@ -85,6 +88,7 @@ public class SOLRTrackingComponentImpl implements SOLRTrackingComponent private boolean cacheAncestors =true; private TypeIndexFilter typeIndexFilter; private AspectIndexFilter aspectIndexFilter; + private ShardRegistry shardRegistry; @Override @@ -162,6 +166,11 @@ public class SOLRTrackingComponentImpl implements SOLRTrackingComponent this.aspectIndexFilter = aspectIndexFilter; } + public void setShardRegistry(ShardRegistry shardRegistry) + { + this.shardRegistry = shardRegistry; + } + /** * Initialize */ @@ -1225,4 +1234,25 @@ public class SOLRTrackingComponentImpl implements SOLRTrackingComponent long maxCommitTime = System.currentTimeMillis()+1L; return aclDAO.getMaxChangeSetIdByCommitTime(maxCommitTime); } + + /* (non-Javadoc) + * @see org.alfresco.repo.solr.SOLRTrackingComponent#registerShardState(org.alfresco.repo.index.ShardState) + */ + @Override + public void registerShardState(ShardState shardState) + { + if(shardRegistry != null) + { + shardRegistry.registerShardState(shardState); + } + } + + /* (non-Javadoc) + * @see org.alfresco.repo.solr.SOLRTrackingComponent#getShardRegistry() + */ + @Override + public ShardRegistry getShardRegistry() + { + return this.shardRegistry; + } } diff --git a/source/test-java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapperTest.java b/source/test-java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapperTest.java index ed600c7a15..c387972276 100644 --- a/source/test-java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapperTest.java +++ b/source/test-java/org/alfresco/repo/search/impl/solr/SolrStoreMappingWrapperTest.java @@ -46,7 +46,7 @@ public class SolrStoreMappingWrapperTest { SolrStoreMapping mapping; - SolrStoreMappingWrapper wrapper; + ExplicitSolrStoreMappingWrapper wrapper; @Mock HttpClientFactory httpClientFactory; @@ -113,7 +113,7 @@ public class SolrStoreMappingWrapperTest private SolrStoreMapping unsharded; - private SolrStoreMappingWrapper unshardedWrapper; + private ExplicitSolrStoreMappingWrapper unshardedWrapper; @Mock private BeanFactory beanFactory; @@ -190,7 +190,7 @@ public class SolrStoreMappingWrapperTest mapping.setNumShards(24); mapping.setProtocol(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE.getProtocol()); mapping.setReplicationFactor(3); - wrapper = new SolrStoreMappingWrapper(mapping, beanFactory); + wrapper = new ExplicitSolrStoreMappingWrapper(mapping, beanFactory); @@ -199,7 +199,7 @@ public class SolrStoreMappingWrapperTest unsharded.setHttpClientFactory("httpClientFactory"); unsharded.setIdentifier(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE.getIdentifier()); unsharded.setProtocol(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE.getProtocol()); - unshardedWrapper = new SolrStoreMappingWrapper(unsharded, beanFactory); + unshardedWrapper = new ExplicitSolrStoreMappingWrapper(unsharded, beanFactory); }