Merge branch 'feature/contentStoreReplication' of https://git.alfresco.com/search_discovery/insightengine into feature/contentStoreReplication

 Conflicts:
	search-services/alfresco-search/src/main/java/org/alfresco/solr/content/SolrContentStore.java
This commit is contained in:
agazzarini
2019-10-04 09:49:14 +02:00
3 changed files with 54 additions and 55 deletions

View File

@@ -81,6 +81,7 @@ public class SolrContentStore implements Closeable, ReplicationRole
protected final static Logger log = LoggerFactory.getLogger(SolrContentStore.class);
static final long NO_VERSION_AVAILABLE = -1L;
public static final long NO_CONTENT_STORE_REPLICATION_REQUIRED = -2L;
static final String CONTENT_STORE = "contentstore";
static final String SOLR_CONTENT_DIR = "solr.content.dir";

View File

@@ -355,8 +355,8 @@ public class IndexFetcher {
}
}
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean replicateContentStore) throws IOException, InterruptedException {
return fetchLatestIndex(forceReplication, false, replicateContentStore);
IndexFetchResult fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException {
return fetchLatestIndex(forceReplication, false);
}
/**
@@ -365,11 +365,10 @@ public class IndexFetcher {
*
* @param forceReplication force a replication in all cases
* @param forceCoreReload force a core reload in all cases
* @param contentStoreReplication replicate contentStore
* @return true on success, false if slave is already in sync
* @throws IOException if an exception occurs
*/
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload, boolean contentStoreReplication)
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload)
throws IOException, InterruptedException {
boolean cleanupDone = false;
@@ -405,15 +404,10 @@ public class IndexFetcher {
}
}
long slaveContentStoreVersion = contentStore.getContentStoreVersion();
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long masterContentStoreVersion = contentStoreReplication? (Long) response.get(CONTENT_STORE_VERSION) : -1;
long masterContentStoreVersion = (Long) response.get(CONTENT_STORE_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
boolean contentStoreReplicationNeeded = contentStoreReplication && (masterContentStoreVersion != slaveContentStoreVersion);
LOG.info("Master's generation: " + latestGeneration);
LOG.info("Master's version: " + latestVersion);
@@ -457,6 +451,13 @@ public class IndexFetcher {
return IndexFetchResult.MASTER_VERSION_ZERO;
}
// The following session should make sure that if replication is happening in more cores at the same time,
// the contentStore replication is done only once.
boolean replicateContentStore = replicationHandler.acquireContentStoreReplicationTask();
long slaveContentStoreVersion = replicateContentStore? contentStore.getLastCommittedVersion() : SolrContentStore.NO_CONTENT_STORE_REPLICATION_REQUIRED;
boolean contentStoreReplicationNeeded = replicateContentStore && (masterContentStoreVersion != slaveContentStoreVersion);
// TODO: Should we be comparing timestamps (across machines) here?
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
//master and slave are already in sync just return
@@ -572,7 +573,7 @@ public class IndexFetcher {
deleteContentStoreFiles(contentStore.getRootLocation(), contentStoreFilesToDelete);
}
contentStore.setContentStoreVersion(masterContentStoreVersion);
contentStore.setLastCommittedVersion(masterContentStoreVersion);
}
@@ -668,7 +669,7 @@ public class IndexFetcher {
LOG.warn(
"Replication attempt was not successful - trying a full index replication reloadCore={}",
reloadCore);
successfulInstall = fetchLatestIndex(true, reloadCore, contentStoreReplication).getSuccessful();
successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful();
}
markReplicationStop();
@@ -1955,32 +1956,21 @@ public class IndexFetcher {
@Override
protected void fetch() throws Exception {
try {
while (true) {
final FastInputStream is = getStream();
int result;
try {
//fetch packets one by one in a single request
result = fetchPackets(is);
if (result == 0 || result == NO_CONTENT) {
return;
}
//if there is an error continue. But continue from the point where it got broken
} finally {
IOUtils.closeQuietly(is);
while (true) {
final FastInputStream is = getStream();
int result;
try {
//fetch packets one by one in a single request
result = fetchPackets(is);
if (result == 0 || result == NO_CONTENT) {
return;
}
//if there is an error continue. But continue from the point where it got broken
} finally {
IOUtils.closeQuietly(is);
}
} finally {
// cleanup();
//// if cleanup succeeds . The file is downloaded fully. do an fsync
// fsyncService.submit(() -> {
// try {
// file.sync();
// } catch (IOException e) {
// fsyncException = e;
// }
// });
}
}

View File

@@ -139,6 +139,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private SolrContentStore contentStore;
private volatile boolean closed = false;
private boolean contentStoreReplication = false;
public static final class CommitVersionInfo {
public final long version;
@@ -406,24 +407,37 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private volatile IndexFetcher currentIndexFetcher;
public boolean acquireContentStoreReplicationTask(){
contentStoreReplicationLock.lock();
if (!isContentStoreReplicating)
{
contentStoreReplication = true;
isContentStoreReplicating = true;
}
contentStoreReplicationLock.unlock();
return contentStoreReplication;
}
public void releaseContentStoreReplicationTask()
{
if (contentStoreReplication)
{
contentStoreReplicationLock.lock();
isContentStoreReplicating = false;
contentStoreReplication = false;
contentStoreReplicationLock.unlock();
}
}
public IndexFetcher.IndexFetchResult doFetch(SolrParams solrParams, boolean forceReplication) {
boolean contentStoreReplication = false;
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!indexFetchLock.tryLock())
return IndexFetcher.IndexFetchResult.LOCK_OBTAIN_FAILED;
try {
contentStoreReplicationLock.lock();
if (!isContentStoreReplicating)
{
contentStoreReplication = true;
isContentStoreReplicating = true;
}
contentStoreReplicationLock.unlock();
if (masterUrl != null) {
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
currentIndexFetcher.destroy();
@@ -432,7 +446,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} else {
currentIndexFetcher = pollingIndexFetcher;
}
return currentIndexFetcher.fetchLatestIndex(forceReplication, contentStoreReplication);
return currentIndexFetcher.fetchLatestIndex(forceReplication);
} catch (Exception e) {
SolrException.log(LOG, "Index fetch failed ", e);
return new IndexFetcher.IndexFetchResult(IndexFetcher.IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
@@ -441,13 +455,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
currentIndexFetcher = pollingIndexFetcher;
}
if (contentStoreReplication)
{
contentStoreReplicationLock.lock();
isContentStoreReplicating = false;
contentStoreReplicationLock.unlock();
}
releaseContentStoreReplicationTask();
indexFetchLock.unlock();
}
}
@@ -749,7 +757,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
//if configuration files need to be included get their details
rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
if (contentStoreGeneration != -1)
if (contentStoreGeneration != SolrContentStore.NO_CONTENT_STORE_REPLICATION_REQUIRED)
{
Map<String, List<Map<String, Object>>> changes = contentStore.getChanges(contentStoreGeneration);
rsp.add(CONTENT_STORE_FILES, changes);