From 23c836f0ce4658df155f2667107331210064b3bd Mon Sep 17 00:00:00 2001 From: eliaporciani Date: Thu, 3 Oct 2019 22:09:45 +0200 Subject: [PATCH 1/3] [contentStoreReplication] code formatting --- .../solr/content/SolrContentStore.java | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java index 7ae49bc96..e8a9a1d44 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java @@ -79,6 +79,7 @@ public class SolrContentStore implements ContentStore, Closeable { protected final static Logger log = LoggerFactory.getLogger(SolrContentStore.class); static final long NO_VERSION_AVAILABLE = -1L; + static final long NO_CONTENT_STORE_REPLICATION_REQUIRED = -2L; static final String CONTENT_STORE = "contentstore"; static final String SOLR_CONTENT_DIR = "solr.content.dir"; @@ -389,7 +390,8 @@ public class SolrContentStore implements ContentStore, Closeable } @Override - public void close() throws IOException { + public void close() throws IOException + { changeSet.close(); } @@ -397,8 +399,10 @@ public class SolrContentStore implements ContentStore, Closeable * Set a new version of content store version * @param contentStoreVersion */ - public void setContentStoreVersion(Long contentStoreVersion) { - try { + public void setContentStoreVersion(Long contentStoreVersion) + { + try + { File tmpFile = new File(root, ".version-" + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date())); FileWriter wr = new FileWriter(tmpFile); @@ -407,29 +411,35 @@ public class SolrContentStore implements ContentStore, Closeable tmpFile.renameTo(new File(root, ".version")); - } catch (IOException e) { + } + catch (IOException e) + { e.printStackTrace(); } - } - public Long getContentStoreVersion() { + public Long getContentStoreVersion() + { return getPersistedContentStoreVersion(); } - - private Long getPersistedContentStoreVersion(){ - try { + private Long getPersistedContentStoreVersion() + { + try + { return Files.lines(Paths.get(root, VERSION_FILE)) .map(Long::parseLong) - .findFirst().orElseThrow(); - } catch (IOException e) { - return 0l; + .findFirst().orElse(NO_VERSION_AVAILABLE); + } + catch (IOException e) + { + return NO_VERSION_AVAILABLE; } } - public void flushChangeSet() throws IOException { + public void flushChangeSet() throws IOException + { changeSet.flush(); } From 67bb3b919a7bde8bfb0de943e5c3253e46f2fe2d Mon Sep 17 00:00:00 2001 From: eliaporciani Date: Thu, 3 Oct 2019 22:10:51 +0200 Subject: [PATCH 2/3] [contentStoreReplication] modified mechanism for getting content store replication task. --- .../alfresco/solr/handler/IndexFetcher.java | 24 +++++----- .../solr/handler/ReplicationHandler.java | 46 +++++++++++-------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java index b02660cce..51b742e85 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java @@ -355,8 +355,8 @@ public class IndexFetcher { } } - IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean replicateContentStore) throws IOException, InterruptedException { - return fetchLatestIndex(forceReplication, false, replicateContentStore); + IndexFetchResult fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException { + return fetchLatestIndex(forceReplication, false); } /** @@ -365,11 +365,11 @@ public class IndexFetcher { * * @param forceReplication force a replication in all cases * @param forceCoreReload force a core reload in all cases - * @param contentStoreReplication replicate contentStore + * @param replicateContentStore replicate contentStore * @return true on success, false if slave is already in sync * @throws IOException if an exception occurs */ - IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload, boolean contentStoreReplication) + IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException { boolean cleanupDone = false; @@ -405,15 +405,10 @@ public class IndexFetcher { } } - - - long slaveContentStoreVersion = contentStore.getContentStoreVersion(); long latestVersion = (Long) response.get(CMD_INDEX_VERSION); - long masterContentStoreVersion = contentStoreReplication? (Long) response.get(CONTENT_STORE_VERSION) : -1; + long masterContentStoreVersion = (Long) response.get(CONTENT_STORE_VERSION); long latestGeneration = (Long) response.get(GENERATION); - boolean contentStoreReplicationNeeded = contentStoreReplication && (masterContentStoreVersion != slaveContentStoreVersion); - LOG.info("Master's generation: " + latestGeneration); LOG.info("Master's version: " + latestVersion); @@ -457,6 +452,13 @@ public class IndexFetcher { return IndexFetchResult.MASTER_VERSION_ZERO; } + // The following session should make sure that if replication is happening in more cores at the same time, + // the contentStore replication is done only once. + boolean replicateContentStore = replicationHandler.acquireContentStoreReplicationTask(); + long slaveContentStoreVersion = replicateContentStore? contentStore.getContentStoreVersion() : -1; + boolean contentStoreReplicationNeeded = replicateContentStore && (masterContentStoreVersion != slaveContentStoreVersion); + + // TODO: Should we be comparing timestamps (across machines) here? if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) { //master and slave are already in sync just return @@ -668,7 +670,7 @@ public class IndexFetcher { LOG.warn( "Replication attempt was not successful - trying a full index replication reloadCore={}", reloadCore); - successfulInstall = fetchLatestIndex(true, reloadCore, contentStoreReplication).getSuccessful(); + successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful(); } markReplicationStop(); diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java index 397acd62d..a78732fef 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java @@ -139,6 +139,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private SolrContentStore contentStore; private volatile boolean closed = false; + private boolean contentStoreReplication = false; public static final class CommitVersionInfo { public final long version; @@ -406,24 +407,37 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private volatile IndexFetcher currentIndexFetcher; + public boolean acquireContentStoreReplicationTask(){ + contentStoreReplicationLock.lock(); + if (!isContentStoreReplicating) + { + contentStoreReplication = true; + isContentStoreReplicating = true; + } + + contentStoreReplicationLock.unlock(); + return contentStoreReplication; + } + + public void releaseContentStoreReplicationTask() + { + if (contentStoreReplication) + { + contentStoreReplicationLock.lock(); + isContentStoreReplicating = false; + contentStoreReplication = false; + contentStoreReplicationLock.unlock(); + } + } + + public IndexFetcher.IndexFetchResult doFetch(SolrParams solrParams, boolean forceReplication) { - boolean contentStoreReplication = false; String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL); if (!indexFetchLock.tryLock()) return IndexFetcher.IndexFetchResult.LOCK_OBTAIN_FAILED; try { - contentStoreReplicationLock.lock(); - if (!isContentStoreReplicating) - { - contentStoreReplication = true; - isContentStoreReplicating = true; - } - - contentStoreReplicationLock.unlock(); - - if (masterUrl != null) { if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { currentIndexFetcher.destroy(); @@ -432,7 +446,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } else { currentIndexFetcher = pollingIndexFetcher; } - return currentIndexFetcher.fetchLatestIndex(forceReplication, contentStoreReplication); + return currentIndexFetcher.fetchLatestIndex(forceReplication); } catch (Exception e) { SolrException.log(LOG, "Index fetch failed ", e); return new IndexFetcher.IndexFetchResult(IndexFetcher.IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e); @@ -441,13 +455,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw currentIndexFetcher = pollingIndexFetcher; } - if (contentStoreReplication) - { - contentStoreReplicationLock.lock(); - isContentStoreReplicating = false; - contentStoreReplicationLock.unlock(); - } - + releaseContentStoreReplicationTask(); indexFetchLock.unlock(); } } From bca077f3bd514c7901acc1dd9964d03d61959ecb Mon Sep 17 00:00:00 2001 From: eliaporciani Date: Fri, 4 Oct 2019 09:40:44 +0200 Subject: [PATCH 3/3] [contentStoreReplication] code refactoring --- .../solr/content/SolrContentStore.java | 4 +- .../alfresco/solr/handler/IndexFetcher.java | 38 +++++++------------ .../solr/handler/ReplicationHandler.java | 2 +- 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java index e8a9a1d44..ddb5bfce4 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java @@ -78,8 +78,8 @@ import java.util.zip.GZIPOutputStream; public class SolrContentStore implements ContentStore, Closeable { protected final static Logger log = LoggerFactory.getLogger(SolrContentStore.class); - static final long NO_VERSION_AVAILABLE = -1L; - static final long NO_CONTENT_STORE_REPLICATION_REQUIRED = -2L; + public static final long NO_VERSION_AVAILABLE = -1L; + public static final long NO_CONTENT_STORE_REPLICATION_REQUIRED = -2L; static final String CONTENT_STORE = "contentstore"; static final String SOLR_CONTENT_DIR = "solr.content.dir"; diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java index 51b742e85..168c6e336 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/IndexFetcher.java @@ -365,7 +365,6 @@ public class IndexFetcher { * * @param forceReplication force a replication in all cases * @param forceCoreReload force a core reload in all cases - * @param replicateContentStore replicate contentStore * @return true on success, false if slave is already in sync * @throws IOException if an exception occurs */ @@ -455,7 +454,7 @@ public class IndexFetcher { // The following session should make sure that if replication is happening in more cores at the same time, // the contentStore replication is done only once. boolean replicateContentStore = replicationHandler.acquireContentStoreReplicationTask(); - long slaveContentStoreVersion = replicateContentStore? contentStore.getContentStoreVersion() : -1; + long slaveContentStoreVersion = replicateContentStore? contentStore.getContentStoreVersion() : SolrContentStore.NO_CONTENT_STORE_REPLICATION_REQUIRED; boolean contentStoreReplicationNeeded = replicateContentStore && (masterContentStoreVersion != slaveContentStoreVersion); @@ -1957,32 +1956,21 @@ public class IndexFetcher { @Override protected void fetch() throws Exception { - try { - while (true) { - final FastInputStream is = getStream(); - int result; - try { - //fetch packets one by one in a single request - result = fetchPackets(is); - if (result == 0 || result == NO_CONTENT) { - return; - } - //if there is an error continue. But continue from the point where it got broken - } finally { - IOUtils.closeQuietly(is); + while (true) { + final FastInputStream is = getStream(); + int result; + try { + //fetch packets one by one in a single request + result = fetchPackets(is); + if (result == 0 || result == NO_CONTENT) { + return; } + //if there is an error continue. But continue from the point where it got broken + } finally { + IOUtils.closeQuietly(is); } - } finally { -// cleanup(); -//// if cleanup succeeds . The file is downloaded fully. do an fsync -// fsyncService.submit(() -> { -// try { -// file.sync(); -// } catch (IOException e) { -// fsyncException = e; -// } -// }); } + } diff --git a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java index a78732fef..e3b13c76d 100644 --- a/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java +++ b/search-services/alfresco-search/src/main/java/org/alfresco/solr/handler/ReplicationHandler.java @@ -757,7 +757,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw //if configuration files need to be included get their details rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache)); - if (contentStoreGeneration != -1) + if (contentStoreGeneration != SolrContentStore.NO_CONTENT_STORE_REPLICATION_REQUIRED) { Map>> changes = contentStore.getChanges(contentStoreGeneration); rsp.add(CONTENT_STORE_FILES, changes);