diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java index 591b7e532..9e7df88e8 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java @@ -81,6 +81,7 @@ public class SolrContentStore implements Closeable, ReplicationRole protected final static Logger log = LoggerFactory.getLogger(SolrContentStore.class); static final long NO_VERSION_AVAILABLE = -1L; + public static final long NO_CONTENT_STORE_REPLICATION_REQUIRED = -2L; static final String CONTENT_STORE = "contentstore"; static final String SOLR_CONTENT_DIR = "solr.content.dir"; diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java index b02660cce..4396dedac 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java @@ -355,8 +355,8 @@ public class IndexFetcher { } } - IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean replicateContentStore) throws IOException, InterruptedException { - return fetchLatestIndex(forceReplication, false, replicateContentStore); + IndexFetchResult fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException { + return fetchLatestIndex(forceReplication, false); } /** @@ -365,11 +365,10 @@ public class IndexFetcher { * * @param forceReplication force a replication in all cases * @param forceCoreReload force a core reload in all cases - * @param contentStoreReplication replicate contentStore * @return true on success, false if slave is already in sync * @throws IOException if an exception occurs */ - IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload, boolean contentStoreReplication) + IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException { boolean cleanupDone = false; @@ -405,15 +404,10 @@ public class IndexFetcher { } } - - - long slaveContentStoreVersion = contentStore.getContentStoreVersion(); long latestVersion = (Long) response.get(CMD_INDEX_VERSION); - long masterContentStoreVersion = contentStoreReplication? (Long) response.get(CONTENT_STORE_VERSION) : -1; + long masterContentStoreVersion = (Long) response.get(CONTENT_STORE_VERSION); long latestGeneration = (Long) response.get(GENERATION); - boolean contentStoreReplicationNeeded = contentStoreReplication && (masterContentStoreVersion != slaveContentStoreVersion); - LOG.info("Master's generation: " + latestGeneration); LOG.info("Master's version: " + latestVersion); @@ -457,6 +451,13 @@ public class IndexFetcher { return IndexFetchResult.MASTER_VERSION_ZERO; } + // The following session should make sure that if replication is happening in more cores at the same time, + // the contentStore replication is done only once. + boolean replicateContentStore = replicationHandler.acquireContentStoreReplicationTask(); + long slaveContentStoreVersion = replicateContentStore? contentStore.getLastCommittedVersion() : SolrContentStore.NO_CONTENT_STORE_REPLICATION_REQUIRED; + boolean contentStoreReplicationNeeded = replicateContentStore && (masterContentStoreVersion != slaveContentStoreVersion); + + // TODO: Should we be comparing timestamps (across machines) here? if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) { //master and slave are already in sync just return @@ -572,7 +573,7 @@ public class IndexFetcher { deleteContentStoreFiles(contentStore.getRootLocation(), contentStoreFilesToDelete); } - contentStore.setContentStoreVersion(masterContentStoreVersion); + contentStore.setLastCommittedVersion(masterContentStoreVersion); } @@ -668,7 +669,7 @@ public class IndexFetcher { LOG.warn( "Replication attempt was not successful - trying a full index replication reloadCore={}", reloadCore); - successfulInstall = fetchLatestIndex(true, reloadCore, contentStoreReplication).getSuccessful(); + successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful(); } markReplicationStop(); @@ -1955,32 +1956,21 @@ public class IndexFetcher { @Override protected void fetch() throws Exception { - try { - while (true) { - final FastInputStream is = getStream(); - int result; - try { - //fetch packets one by one in a single request - result = fetchPackets(is); - if (result == 0 || result == NO_CONTENT) { - return; - } - //if there is an error continue. But continue from the point where it got broken - } finally { - IOUtils.closeQuietly(is); + while (true) { + final FastInputStream is = getStream(); + int result; + try { + //fetch packets one by one in a single request + result = fetchPackets(is); + if (result == 0 || result == NO_CONTENT) { + return; } + //if there is an error continue. But continue from the point where it got broken + } finally { + IOUtils.closeQuietly(is); } - } finally { -// cleanup(); -//// if cleanup succeeds . The file is downloaded fully. do an fsync -// fsyncService.submit(() -> { -// try { -// file.sync(); -// } catch (IOException e) { -// fsyncException = e; -// } -// }); } + } diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java index 397acd62d..e3b13c76d 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java @@ -139,6 +139,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private SolrContentStore contentStore; private volatile boolean closed = false; + private boolean contentStoreReplication = false; public static final class CommitVersionInfo { public final long version; @@ -406,24 +407,37 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private volatile IndexFetcher currentIndexFetcher; + public boolean acquireContentStoreReplicationTask(){ + contentStoreReplicationLock.lock(); + if (!isContentStoreReplicating) + { + contentStoreReplication = true; + isContentStoreReplicating = true; + } + + contentStoreReplicationLock.unlock(); + return contentStoreReplication; + } + + public void releaseContentStoreReplicationTask() + { + if (contentStoreReplication) + { + contentStoreReplicationLock.lock(); + isContentStoreReplicating = false; + contentStoreReplication = false; + contentStoreReplicationLock.unlock(); + } + } + + public IndexFetcher.IndexFetchResult doFetch(SolrParams solrParams, boolean forceReplication) { - boolean contentStoreReplication = false; String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL); if (!indexFetchLock.tryLock()) return IndexFetcher.IndexFetchResult.LOCK_OBTAIN_FAILED; try { - contentStoreReplicationLock.lock(); - if (!isContentStoreReplicating) - { - contentStoreReplication = true; - isContentStoreReplicating = true; - } - - contentStoreReplicationLock.unlock(); - - if (masterUrl != null) { if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { currentIndexFetcher.destroy(); @@ -432,7 +446,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } else { currentIndexFetcher = pollingIndexFetcher; } - return currentIndexFetcher.fetchLatestIndex(forceReplication, contentStoreReplication); + return currentIndexFetcher.fetchLatestIndex(forceReplication); } catch (Exception e) { SolrException.log(LOG, "Index fetch failed ", e); return new IndexFetcher.IndexFetchResult(IndexFetcher.IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e); @@ -441,13 +455,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw currentIndexFetcher = pollingIndexFetcher; } - if (contentStoreReplication) - { - contentStoreReplicationLock.lock(); - isContentStoreReplicating = false; - contentStoreReplicationLock.unlock(); - } - + releaseContentStoreReplicationTask(); indexFetchLock.unlock(); } } @@ -749,7 +757,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw //if configuration files need to be included get their details rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache)); - if (contentStoreGeneration != -1) + if (contentStoreGeneration != SolrContentStore.NO_CONTENT_STORE_REPLICATION_REQUIRED) { Map>> changes = contentStore.getChanges(contentStoreGeneration); rsp.add(CONTENT_STORE_FILES, changes);