mirror of
https://github.com/Alfresco/SearchServices.git
synced 2025-09-17 14:21:20 +00:00
Merge branch 'feature/SEARCH_2233_additional_params_in_fix_tool' into 'master'
Feature/search 2233 additional params in fix tool See merge request search_discovery/insightengine!535
This commit is contained in:
@@ -29,6 +29,7 @@ package org.alfresco.test.search.functional.searchServices.solr.admin;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.alfresco.rest.core.RestResponse;
|
||||
import org.alfresco.search.TestGroup;
|
||||
@@ -603,56 +604,48 @@ public class SolrE2eAdminTest extends AbstractE2EFunctionalTest
|
||||
Assert.assertEquals(actionStatus, "scheduled");
|
||||
}
|
||||
|
||||
/**
|
||||
* FIX for every core.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(priority = 27, dependsOnMethods = "testPurge")
|
||||
public void testFix() throws Exception
|
||||
{
|
||||
RestResponse response = restClient.withSolrAdminAPI().getAction("fix");
|
||||
|
||||
checkResponseStatusOk(response);
|
||||
|
||||
DEFAULT_CORE_NAMES.forEach(core -> {
|
||||
List<String> txToReindex = response.getResponse().body().jsonPath().get("action." + core +".txToReindex");
|
||||
Assert.assertTrue(txToReindex.size() >= 0, "Expected a list of transactions (or empty list) to be reindexed,");
|
||||
List<String> aclToReindex = response.getResponse().body().jsonPath().get("action." + core + ".aclChangeSetToReindex");
|
||||
Assert.assertTrue(aclToReindex.size() >= 0, "Expected a list of ACLs (or empty list) to be reindexed,");
|
||||
});
|
||||
|
||||
String actionStatus = response.getResponse().body().jsonPath().get("action.status");
|
||||
Assert.assertEquals(actionStatus, "scheduled");
|
||||
}
|
||||
|
||||
/**
|
||||
* FIX for specific core.
|
||||
* @throws Exception
|
||||
* The test checks the response structure in order to make sure the expected sections are present.
|
||||
*
|
||||
* We are not testing the content of each section because due to the underlying E2E infrastructure, we cannot know
|
||||
* in advance the transactions that will be scheduled for reindexing.
|
||||
*/
|
||||
@Test(priority = 28)
|
||||
public void testFixCore() throws Exception
|
||||
public void testFixCore()
|
||||
{
|
||||
DEFAULT_CORE_NAMES.forEach(core -> {
|
||||
|
||||
try
|
||||
{
|
||||
RestResponse response = restClient.withParams("core=" + core).withSolrAdminAPI().getAction("fix");
|
||||
|
||||
checkResponseStatusOk(response);
|
||||
|
||||
List<String> txToReindex = response.getResponse().body().jsonPath().get("action." + core +".txToReindex");
|
||||
Assert.assertTrue(txToReindex.size() >= 0, "Expected a list of transactions (or empty list) to be reindexed,");
|
||||
List<String> aclToReindex = response.getResponse().body().jsonPath().get("action." + core + ".aclChangeSetToReindex");
|
||||
Assert.assertTrue(aclToReindex.size() >= 0, "Expected a list of ACLs (or empty list) to be reindexed,");
|
||||
|
||||
Map<String, Object> txInIndexNotInDb = response.getResponse().body().jsonPath().get("action." + core +".txToReindex.txInIndexNotInDb");
|
||||
Assert.assertNotNull(txInIndexNotInDb, "Expected a list of transactions (even empty) that are in index but not in the database to be reindexed,");
|
||||
|
||||
Map<String, Object> duplicatedTx = response.getResponse().body().jsonPath().get("action." + core +".txToReindex.duplicatedTxInIndex");
|
||||
Assert.assertNotNull(duplicatedTx, "Expected a list of duplicated transactions (even empty) to be reindexed,");
|
||||
|
||||
Map<String, Object> missingTx = response.getResponse().body().jsonPath().get("action." + core +".txToReindex.missingTxInIndex");
|
||||
Assert.assertNotNull(missingTx, "Expected a list of missing transactions (or empty list) to be reindexed,");
|
||||
|
||||
Map<String, Object> aclTxInIndexNotInDb = response.getResponse().body().jsonPath().get("action." + core + ".aclChangeSetToReindex.aclTxInIndexNotInDb");
|
||||
Assert.assertNotNull(aclTxInIndexNotInDb, "Expected a list of ACLs (or empty list) to be reindexed,");
|
||||
|
||||
Map<String, Object> duplicatedAclTxInIndex = response.getResponse().body().jsonPath().get("action." + core + ".aclChangeSetToReindex.duplicatedAclTxInIndex");
|
||||
Assert.assertNotNull(duplicatedAclTxInIndex, "Expected a list of ACLs (or empty list) to be reindexed,");
|
||||
|
||||
Map<String, Object> missingAclTxInIndex = response.getResponse().body().jsonPath().get("action." + core + ".aclChangeSetToReindex.missingAclTxInIndex");
|
||||
Assert.assertNotNull(missingAclTxInIndex, "Expected a list of ACLs (or empty list) to be reindexed,");
|
||||
|
||||
String actionStatus = response.getResponse().body().jsonPath().get("action.status");
|
||||
Assert.assertEquals(actionStatus, "scheduled");
|
||||
Assert.assertEquals(actionStatus, "notScheduled");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
|
@@ -54,6 +54,7 @@ import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.handler.admin.CoreAdminHandler;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
@@ -76,16 +77,19 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongToIntFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Arrays.stream;
|
||||
import static java.util.Optional.of;
|
||||
import static java.util.Optional.ofNullable;
|
||||
import static org.alfresco.repo.search.adaptor.lucene.QueryConstants.FIELD_INACLTXID;
|
||||
import static org.alfresco.repo.search.adaptor.lucene.QueryConstants.FIELD_INTXID;
|
||||
import static org.alfresco.solr.HandlerOfResources.extractCustomProperties;
|
||||
import static org.alfresco.solr.HandlerOfResources.getSafeBoolean;
|
||||
import static org.alfresco.solr.HandlerOfResources.getSafeLong;
|
||||
@@ -99,6 +103,7 @@ import static org.alfresco.solr.HandlerReportHelper.buildAclTxReport;
|
||||
import static org.alfresco.solr.HandlerReportHelper.buildNodeReport;
|
||||
import static org.alfresco.solr.HandlerReportHelper.buildTrackerReport;
|
||||
import static org.alfresco.solr.HandlerReportHelper.buildTxReport;
|
||||
import static org.alfresco.solr.utils.Utils.isNullOrEmpty;
|
||||
import static org.alfresco.solr.utils.Utils.notNullOrEmpty;
|
||||
|
||||
/**
|
||||
@@ -160,22 +165,39 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
*/
|
||||
private static final String ACTION_STATUS_SUCCESS = "success";
|
||||
private static final String ACTION_STATUS_ERROR = "error";
|
||||
private static final String ACTION_STATUS_SCHEDULED = "scheduled";
|
||||
static final String ACTION_STATUS_SCHEDULED = "scheduled";
|
||||
static final String ACTION_STATUS_NOT_SCHEDULED = "notScheduled";
|
||||
|
||||
static final String DRY_RUN_PARAMETER_NAME = "dryRun";
|
||||
static final String FROM_TX_COMMIT_TIME_PARAMETER_NAME = "fromTxCommitTime";
|
||||
static final String TO_TX_COMMIT_TIME_PARAMETER_NAME = "toTxCommitTime";
|
||||
static final String MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME = "maxScheduledTransactions";
|
||||
static final String MAX_TRANSACTIONS_TO_SCHEDULE_CONF_PROPERTY_NAME = "alfresco.admin.fix.maxScheduledTransactions";
|
||||
static final String TX_IN_INDEX_NOT_IN_DB = "txInIndexNotInDb";
|
||||
static final String DUPLICATED_TX_IN_INDEX = "duplicatedTxInIndex";
|
||||
static final String MISSING_TX_IN_INDEX = "missingTxInIndex";
|
||||
static final String ACL_TX_IN_INDEX_NOT_IN_DB = "aclTxInIndexNotInDb";
|
||||
static final String DUPLICATED_ACL_TX_IN_INDEX = "duplicatedAclTxInIndex";
|
||||
static final String MISSING_ACL_TX_IN_INDEX = "missingAclTxInIndex";
|
||||
|
||||
/**
|
||||
* JSON/XML labels for the Action response
|
||||
*/
|
||||
private static final String ACTION_LABEL = "action";
|
||||
private static final String ACTION_STATUS_LABEL = "status";
|
||||
private static final String ACTION_ERROR_MESSAGE_LABEL = "errorMessage";
|
||||
static final String ACTION_STATUS_LABEL = "status";
|
||||
|
||||
static final String ACTION_ERROR_MESSAGE_LABEL = "errorMessage";
|
||||
static final String UNKNOWN_CORE_MESSAGE = "Unknown core:";
|
||||
static final String UNPROCESSABLE_REQUEST_ON_SLAVE_NODES = "Requested action cannot be performed on slave nodes.";
|
||||
|
||||
private static final String ACTION_TX_TO_REINDEX = "txToReindex";
|
||||
private static final String ACTION_ACL_CHANGE_SET_TO_REINDEX = "aclChangeSetToReindex";
|
||||
|
||||
private SolrTrackerScheduler scheduler;
|
||||
private TrackerRegistry trackerRegistry;
|
||||
private ConcurrentHashMap<String, InformationServer> informationServers;
|
||||
TrackerRegistry trackerRegistry;
|
||||
ConcurrentHashMap<String, InformationServer> informationServers;
|
||||
|
||||
private static List<String> CORE_PARAMETER_NAMES = asList(CoreAdminParams.CORE, "coreName", "index");
|
||||
private final static List<String> CORE_PARAMETER_NAMES = asList(CoreAdminParams.CORE, "coreName", "index");
|
||||
|
||||
public AlfrescoCoreAdminHandler()
|
||||
{
|
||||
@@ -938,8 +960,7 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
*
|
||||
* Synchronous execution
|
||||
*
|
||||
* @param req Query Request without parameters
|
||||
* - coreName, optional, the name of the core to be checked
|
||||
* @param cname, optional, the name of the core to be checked
|
||||
*
|
||||
* @return Response including the action result:
|
||||
* - status: success, when the core has been created
|
||||
@@ -1366,7 +1387,6 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
* - fromAclTx, optional: from ACL transaction Id to filter report results
|
||||
* - toCalTx, optional: to ACL transaction Id to filter report results
|
||||
*
|
||||
* @param Response including the action result:
|
||||
* - report.core: multiple Objects with the details of the report ("core" is the name of the Core)
|
||||
*
|
||||
* @throws JSONException
|
||||
@@ -1497,7 +1517,6 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
*
|
||||
* @param params Query Request with following parameters:
|
||||
* - core, optional: The name of the SOLR Core
|
||||
* @param rsp Query Response including the action result:
|
||||
* - action.status: scheduled, as it will be executed by Trackers on the next maintenance operation
|
||||
* - core: list of Document Ids with error that are going to reindexed
|
||||
*/
|
||||
@@ -1594,66 +1613,140 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
* - action.status: scheduled, as it will be executed by Trackers on the next maintenance operation
|
||||
* - txToReindex: list of Transaction Ids that are going to be reindexed
|
||||
* - aclChangeSetToReindex: list of ACL Change Set Ids that are going to be reindexed
|
||||
* @throws JSONException
|
||||
*/
|
||||
private NamedList<Object> actionFIX(SolrParams params) throws JSONException
|
||||
NamedList<Object> actionFIX(SolrParams params) throws JSONException
|
||||
{
|
||||
String requestedCoreName = coreName(params);
|
||||
|
||||
var wrapper = new Object()
|
||||
{
|
||||
NamedList<Object> response = new SimpleOrderedMap<>();;
|
||||
final NamedList<Object> response = new SimpleOrderedMap<>();
|
||||
};
|
||||
|
||||
if (isNullOrEmpty(requestedCoreName))
|
||||
{
|
||||
return wrapper.response;
|
||||
}
|
||||
|
||||
if (!coreNames().contains(requestedCoreName))
|
||||
{
|
||||
wrapper.response.add(ACTION_ERROR_MESSAGE_LABEL, UNKNOWN_CORE_MESSAGE + requestedCoreName);
|
||||
return wrapper.response;
|
||||
}
|
||||
|
||||
if (!isMasterOrStandalone(requestedCoreName)) {
|
||||
wrapper.response.add(ACTION_ERROR_MESSAGE_LABEL, UNPROCESSABLE_REQUEST_ON_SLAVE_NODES);
|
||||
return wrapper.response;
|
||||
}
|
||||
|
||||
Long fromTxCommitTime = params.getLong(FROM_TX_COMMIT_TIME_PARAMETER_NAME);
|
||||
Long toTxCommitTime = params.getLong(TO_TX_COMMIT_TIME_PARAMETER_NAME);
|
||||
boolean dryRun = params.getBool(DRY_RUN_PARAMETER_NAME, true);
|
||||
int maxTransactionsToSchedule = getMaxTransactionToSchedule(params);
|
||||
|
||||
LOGGER.debug("FIX Admin request on core {}, parameters: " +
|
||||
FROM_TX_COMMIT_TIME_PARAMETER_NAME + " = {}, " +
|
||||
TO_TX_COMMIT_TIME_PARAMETER_NAME + " = {}, " +
|
||||
DRY_RUN_PARAMETER_NAME + " = {}, " +
|
||||
MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME + " = {}",
|
||||
requestedCoreName,
|
||||
ofNullable(fromTxCommitTime).map(Object::toString).orElse("N.A."),
|
||||
ofNullable(toTxCommitTime).map(Object::toString).orElse("N.A."),
|
||||
dryRun,
|
||||
maxTransactionsToSchedule);
|
||||
|
||||
coreNames().stream()
|
||||
.filter(coreName -> requestedCoreName == null || coreName.equals(requestedCoreName))
|
||||
.filter(this::isMasterOrStandalone)
|
||||
.forEach(coreName -> {
|
||||
wrapper.response.add(coreName, fixOnSpecificCore(coreName));
|
||||
});
|
||||
.forEach(coreName ->
|
||||
wrapper.response.add(
|
||||
coreName,
|
||||
fixOnSpecificCore(coreName, fromTxCommitTime, toTxCommitTime, dryRun, maxTransactionsToSchedule)));
|
||||
|
||||
if (wrapper.response.size() > 0)
|
||||
{
|
||||
wrapper.response.add(DRY_RUN_PARAMETER_NAME, dryRun);
|
||||
|
||||
ofNullable(fromTxCommitTime).ifPresent(value -> wrapper.response.add(FROM_TX_COMMIT_TIME_PARAMETER_NAME, value));
|
||||
ofNullable(toTxCommitTime).ifPresent(value -> wrapper.response.add(TO_TX_COMMIT_TIME_PARAMETER_NAME, value));
|
||||
|
||||
wrapper.response.add(MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME, maxTransactionsToSchedule);
|
||||
wrapper.response.add(ACTION_STATUS_LABEL, dryRun ? ACTION_STATUS_NOT_SCHEDULED : ACTION_STATUS_SCHEDULED);
|
||||
}
|
||||
|
||||
wrapper.response.add(ACTION_STATUS_LABEL, ACTION_STATUS_SCHEDULED);
|
||||
return wrapper.response;
|
||||
}
|
||||
|
||||
private NamedList<Object> fixOnSpecificCore(String coreName)
|
||||
/**
|
||||
* Detects the transactions that need a FIX (i.e. reindexing) because the following reasons:
|
||||
*
|
||||
* <ul>
|
||||
* <li>A transaction is in the index but not in repository</li>
|
||||
* <li>A transaction is duplicated in the index</li>
|
||||
* <li>A transaction is missing in the index</li>
|
||||
* </ul>
|
||||
*
|
||||
* Depending on the dryRun parameter, other than collecting, this method could also schedule the transactions for
|
||||
* reindexing.
|
||||
*
|
||||
* @param coreName the target core name.
|
||||
* @param fromTxCommitTime the start commit time we consider for collecting transaction.
|
||||
* @param toTxCommitTime the end commit time we consider for collecting transaction.
|
||||
* @param dryRun a flag indicating if the collected transactions must be actually scheduled for reindexing.
|
||||
* @param maxTransactionsToSchedule the maximum number of transactions to be scheduled for reindexing.
|
||||
* @return a report about transactions that need to be fixed.
|
||||
*/
|
||||
NamedList<Object> fixOnSpecificCore(String coreName, Long fromTxCommitTime, Long toTxCommitTime, boolean dryRun, int maxTransactionsToSchedule)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Gets Metadata health and fixes any problems
|
||||
MetadataTracker metadataTracker = trackerRegistry.getTrackerForCore(coreName, MetadataTracker.class);
|
||||
IndexHealthReport indexHealthReport = metadataTracker.checkIndex(null, null, null);
|
||||
IOpenBitSet toReindex = indexHealthReport.getTxInIndexButNotInDb();
|
||||
toReindex.or(indexHealthReport.getDuplicatedTxInIndex());
|
||||
toReindex.or(indexHealthReport.getMissingTxFromIndex());
|
||||
long current = -1;
|
||||
// Goes through problems in the index
|
||||
Set<Long> txToReindex = new TreeSet<>();
|
||||
while ((current = toReindex.nextSetBit(current + 1)) != -1)
|
||||
{
|
||||
metadataTracker.addTransactionToReindex(current);
|
||||
txToReindex.add(current);
|
||||
}
|
||||
final IndexHealthReport metadataTrackerIndexHealthReport =
|
||||
metadataTracker.checkIndex(null, fromTxCommitTime, toTxCommitTime);
|
||||
|
||||
LOGGER.debug("FIX Admin action built the MetadataTracker Index Health Report on core {}, parameters: " +
|
||||
FROM_TX_COMMIT_TIME_PARAMETER_NAME + " = {}, " +
|
||||
TO_TX_COMMIT_TIME_PARAMETER_NAME + " = {}, " +
|
||||
DRY_RUN_PARAMETER_NAME + " = {}, " +
|
||||
MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME + " = {}",
|
||||
coreName,
|
||||
ofNullable(fromTxCommitTime).map(Object::toString).orElse("N.A."),
|
||||
ofNullable(toTxCommitTime).map(Object::toString).orElse("N.A."),
|
||||
dryRun,
|
||||
maxTransactionsToSchedule);
|
||||
|
||||
// Gets the Acl health and fixes any problems
|
||||
AclTracker aclTracker = trackerRegistry.getTrackerForCore(coreName, AclTracker.class);
|
||||
indexHealthReport = aclTracker.checkIndex(null, null, null);
|
||||
toReindex = indexHealthReport.getAclTxInIndexButNotInDb();
|
||||
toReindex.or(indexHealthReport.getDuplicatedAclTxInIndex());
|
||||
toReindex.or(indexHealthReport.getMissingAclTxFromIndex());
|
||||
current = -1;
|
||||
// Goes through the problems in the index
|
||||
Set<Long> aclChangeSetToReindex = new TreeSet<>();
|
||||
while ((current = toReindex.nextSetBit(current + 1)) != -1)
|
||||
{
|
||||
aclTracker.addAclChangeSetToReindex(current);
|
||||
aclChangeSetToReindex.add(current);
|
||||
}
|
||||
final IndexHealthReport aclTrackerIndexHealthReport =
|
||||
aclTracker.checkIndex(null, fromTxCommitTime, toTxCommitTime);
|
||||
|
||||
LOGGER.debug("FIX Admin action built the AclTracker Index Health Report on core {}, parameters: " +
|
||||
FROM_TX_COMMIT_TIME_PARAMETER_NAME + " = {}, " +
|
||||
TO_TX_COMMIT_TIME_PARAMETER_NAME + " = {}, " +
|
||||
DRY_RUN_PARAMETER_NAME + " = {}, " +
|
||||
MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME + " = {}",
|
||||
coreName,
|
||||
ofNullable(fromTxCommitTime).map(Object::toString).orElse("N.A."),
|
||||
ofNullable(toTxCommitTime).map(Object::toString).orElse("N.A."),
|
||||
dryRun,
|
||||
maxTransactionsToSchedule);
|
||||
|
||||
NamedList<Object> response = new SimpleOrderedMap<>();
|
||||
response.add(ACTION_TX_TO_REINDEX, txToReindex);
|
||||
response.add(ACTION_ACL_CHANGE_SET_TO_REINDEX, aclChangeSetToReindex);
|
||||
return response;
|
||||
response.add(ACTION_TX_TO_REINDEX,
|
||||
txToReindex(
|
||||
coreName,
|
||||
metadataTracker,
|
||||
metadataTrackerIndexHealthReport,
|
||||
dryRun ? txid -> {} : metadataTracker::addTransactionToReindex,
|
||||
maxTransactionsToSchedule));
|
||||
|
||||
response.add(ACTION_ACL_CHANGE_SET_TO_REINDEX,
|
||||
aclTxToReindex(
|
||||
coreName,
|
||||
aclTracker,
|
||||
aclTrackerIndexHealthReport,
|
||||
dryRun ? txid -> {} : aclTracker::addAclChangeSetToReindex,
|
||||
maxTransactionsToSchedule));
|
||||
return response;
|
||||
}
|
||||
catch(Exception exception)
|
||||
{
|
||||
@@ -1661,6 +1754,153 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects the transactions that need a FIX (i.e. reindexing) because the following reasons:
|
||||
*
|
||||
* <ul>
|
||||
* <li>A transaction is in the index but not in repository</li>
|
||||
* <li>A transaction is duplicated in the index</li>
|
||||
* <li>A transaction is missing in the index</li>
|
||||
* </ul>
|
||||
*
|
||||
* Note: the method, as a side effect, could also schedule the detected transactions for reindexing.
|
||||
* That is controlled by the scheduler input param (which is directly connected with the FIX tool "dryRun" parameter).
|
||||
*
|
||||
* @param coreName the target core name.
|
||||
* @param tracker the {@link MetadataTracker} instance associated with the target core.
|
||||
* @param report the index healt report produced by the tracker.
|
||||
* @param scheduler the controller which manages the actual transaction scheduling.
|
||||
* @param maxTransactionsToSchedule the maximum number of transactions to schedule for reindexing.
|
||||
* @return a report which includes the transactions that need a reindexing.
|
||||
* @see <a href="https://issues.alfresco.com/jira/browse/SEARCH-2233">SEARCH-2233</a>
|
||||
* @see <a href="https://issues.alfresco.com/jira/browse/SEARCH-2248">SEARCH-2248</a>
|
||||
*/
|
||||
NamedList<Object> txToReindex(
|
||||
String coreName,
|
||||
MetadataTracker tracker,
|
||||
final IndexHealthReport report,
|
||||
Consumer<Long> scheduler,
|
||||
int maxTransactionsToSchedule)
|
||||
{
|
||||
final AtomicInteger globalLimit = new AtomicInteger(maxTransactionsToSchedule);
|
||||
|
||||
final LongToIntFunction retrieveTransactionRelatedNodesCountFromRepository =
|
||||
txid -> notNullOrEmpty(tracker.getFullNodesForDbTransaction(txid)).size();
|
||||
|
||||
final LongToIntFunction retrieveTransactionRelatedNodesCountFromIndex =
|
||||
txid -> of(getInformationServers().get(coreName))
|
||||
.map(SolrInformationServer.class::cast)
|
||||
.map(server -> server.getDocListSize(FIELD_INTXID + ":" + txid))
|
||||
.orElse(0);
|
||||
|
||||
NamedList<Object> txToReindex = new SimpleOrderedMap<>();
|
||||
txToReindex.add(TX_IN_INDEX_NOT_IN_DB,
|
||||
manageTransactionsToBeFixed(
|
||||
report.getTxInIndexButNotInDb(),
|
||||
retrieveTransactionRelatedNodesCountFromIndex,
|
||||
scheduler,
|
||||
globalLimit));
|
||||
|
||||
txToReindex.add(DUPLICATED_TX_IN_INDEX,
|
||||
manageTransactionsToBeFixed(
|
||||
report.getDuplicatedTxInIndex(),
|
||||
retrieveTransactionRelatedNodesCountFromIndex,
|
||||
scheduler,
|
||||
globalLimit));
|
||||
|
||||
txToReindex.add(MISSING_TX_IN_INDEX,
|
||||
manageTransactionsToBeFixed(
|
||||
report.getMissingTxFromIndex(),
|
||||
retrieveTransactionRelatedNodesCountFromRepository,
|
||||
scheduler,
|
||||
globalLimit));
|
||||
return txToReindex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects the ACL transactions that need a FIX (i.e. reindexing) because the following reasons:
|
||||
*
|
||||
* <ul>
|
||||
* <li>A transaction is in the index but not in repository</li>
|
||||
* <li>A transaction is duplicated in the index</li>
|
||||
* <li>A transaction is missing in the index</li>
|
||||
* </ul>
|
||||
*
|
||||
* This method is almost the same as {@link #txToReindex(String, MetadataTracker, IndexHealthReport, Consumer, int)}.
|
||||
* The main difference is the target tracker ({@link AclTracker} in this case, instead of {@link MetadataTracker}).
|
||||
*
|
||||
* Note: the method, as a side effect, could also schedule the detected transactions for reindexing.
|
||||
* That is controlled by the scheduler input param (which is directly connected with the FIX tool "dryRun" parameter).
|
||||
*
|
||||
* @param coreName the target core name.
|
||||
* @param tracker the {@link AclTracker} instance associated with the target core.
|
||||
* @param report the index healt report produced by the tracker.
|
||||
* @param scheduler the controller which manages the actual transaction scheduling.
|
||||
* @return a report which includes the transactions that need a reindexing.
|
||||
* @see <a href="https://issues.alfresco.com/jira/browse/SEARCH-2233">SEARCH-2233</a>
|
||||
* @see <a href="https://issues.alfresco.com/jira/browse/SEARCH-2248">SEARCH-2248</a>
|
||||
*/
|
||||
NamedList<Object> aclTxToReindex(
|
||||
String coreName,
|
||||
AclTracker tracker,
|
||||
final IndexHealthReport report,
|
||||
Consumer<Long> scheduler,
|
||||
int maxTransactionsToSchedule)
|
||||
{
|
||||
final AtomicInteger globalLimit = new AtomicInteger(maxTransactionsToSchedule);
|
||||
|
||||
final LongToIntFunction retrieveAclTransactionRelatedNodesCountFromRepository =
|
||||
txid -> notNullOrEmpty(tracker.getAclsForDbAclTransaction(txid)).size();
|
||||
|
||||
final LongToIntFunction retrieveAclTransactionRelatedNodesCountFromIndex =
|
||||
txid -> of(getInformationServers().get(coreName))
|
||||
.map(SolrInformationServer.class::cast)
|
||||
.map(server -> server.getDocListSize(FIELD_INACLTXID + ":" + txid))
|
||||
.orElse(0);
|
||||
|
||||
NamedList<Object> aclTxToReindex = new SimpleOrderedMap<>();
|
||||
aclTxToReindex.add(ACL_TX_IN_INDEX_NOT_IN_DB,
|
||||
manageTransactionsToBeFixed(
|
||||
report.getAclTxInIndexButNotInDb(),
|
||||
retrieveAclTransactionRelatedNodesCountFromIndex,
|
||||
scheduler,
|
||||
globalLimit));
|
||||
|
||||
aclTxToReindex.add(DUPLICATED_ACL_TX_IN_INDEX,
|
||||
manageTransactionsToBeFixed(
|
||||
report.getDuplicatedAclTxInIndex(),
|
||||
retrieveAclTransactionRelatedNodesCountFromIndex,
|
||||
scheduler,
|
||||
globalLimit));
|
||||
|
||||
aclTxToReindex.add(MISSING_ACL_TX_IN_INDEX,
|
||||
manageTransactionsToBeFixed(
|
||||
report.getMissingAclTxFromIndex(),
|
||||
retrieveAclTransactionRelatedNodesCountFromRepository,
|
||||
scheduler,
|
||||
globalLimit));
|
||||
|
||||
return aclTxToReindex;
|
||||
}
|
||||
|
||||
NamedList<Object> manageTransactionsToBeFixed(
|
||||
IOpenBitSet transactions,
|
||||
LongToIntFunction nodesCounter,
|
||||
Consumer<Long> scheduler,
|
||||
AtomicInteger limit)
|
||||
{
|
||||
final NamedList<Object> transactionsList = new SimpleOrderedMap<>();
|
||||
|
||||
long txid = -1;
|
||||
while ((txid = transactions.nextSetBit(txid + 1)) != -1 && limit.decrementAndGet() >= 0)
|
||||
{
|
||||
transactionsList.add(String.valueOf(txid), nodesCounter.applyAsInt(txid));
|
||||
scheduler.accept(txid);
|
||||
}
|
||||
|
||||
return transactionsList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get detailed report for a core or for every core including information
|
||||
* related with handlers and trackers.
|
||||
@@ -1830,4 +2070,18 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
int getMaxTransactionToSchedule(SolrParams params)
|
||||
{
|
||||
String requestedCoreName = coreName(params);
|
||||
return ofNullable(params.getInt(MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME))
|
||||
.orElseGet(() ->
|
||||
ofNullable(coreContainer)
|
||||
.map(container -> container.getCore(requestedCoreName))
|
||||
.map(SolrCore::getResourceLoader)
|
||||
.map(SolrResourceLoader::getCoreProperties)
|
||||
.map(conf -> conf.getProperty(MAX_TRANSACTIONS_TO_SCHEDULE_CONF_PROPERTY_NAME))
|
||||
.map(Integer::parseInt)
|
||||
.orElse(Integer.MAX_VALUE)); // Last fallback if we don't have a request param and a value in configuration
|
||||
}
|
||||
}
|
@@ -3632,7 +3632,7 @@ public class SolrInformationServer implements InformationServer
|
||||
return (NamedList) facetFields.get(field);
|
||||
}
|
||||
|
||||
private int getDocListSize(String query)
|
||||
public int getDocListSize(String query)
|
||||
{
|
||||
try (SolrQueryRequest request = this.newSolrQueryRequest())
|
||||
{
|
||||
|
@@ -11,6 +11,11 @@ enable.alfresco.tracking=true
|
||||
#data.dir.store=workspace/SpacesStore
|
||||
#alfresco.stores=workspace://SpacesStore
|
||||
|
||||
# Max transaction to schedule for reindexing in the admin FIX tool.
|
||||
# If the admin fix action is called with a "maxScheduledTransactions" request parameter
|
||||
# then that is used (i.e. the value configured in this file is ignored).
|
||||
alfresco.admin.fix.maxScheduledTransactions=500
|
||||
|
||||
#
|
||||
# Properties loaded during alfresco tracking
|
||||
#
|
||||
|
@@ -0,0 +1,512 @@
|
||||
/*
|
||||
* #%L
|
||||
* Alfresco Search Services
|
||||
* %%
|
||||
* Copyright (C) 2005 - 2020 Alfresco Software Limited
|
||||
* %%
|
||||
* This file is part of the Alfresco software.
|
||||
* If the software was purchased under a paid Alfresco license, the terms of
|
||||
* the paid license agreement will prevail. Otherwise, the software is
|
||||
* provided under the following open source license terms:
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
* #L%
|
||||
*/
|
||||
|
||||
package org.alfresco.solr;
|
||||
|
||||
import org.alfresco.solr.adapters.IOpenBitSet;
|
||||
import org.alfresco.solr.adapters.SolrOpenBitSetAdapter;
|
||||
import org.alfresco.solr.tracker.AclTracker;
|
||||
import org.alfresco.solr.tracker.IndexHealthReport;
|
||||
import org.alfresco.solr.tracker.MetadataTracker;
|
||||
import org.alfresco.solr.tracker.TrackerRegistry;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static java.util.Optional.of;
|
||||
import static java.util.stream.IntStream.range;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.ACL_TX_IN_INDEX_NOT_IN_DB;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.ACTION_ERROR_MESSAGE_LABEL;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.ACTION_STATUS_LABEL;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.ACTION_STATUS_NOT_SCHEDULED;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.ALFRESCO_CORE_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.ARCHIVE_CORE_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.DRY_RUN_PARAMETER_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.DUPLICATED_ACL_TX_IN_INDEX;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.DUPLICATED_TX_IN_INDEX;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.FROM_TX_COMMIT_TIME_PARAMETER_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.MAX_TRANSACTIONS_TO_SCHEDULE_CONF_PROPERTY_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.MISSING_ACL_TX_IN_INDEX;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.MISSING_TX_IN_INDEX;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.TO_TX_COMMIT_TIME_PARAMETER_NAME;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.TX_IN_INDEX_NOT_IN_DB;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.UNKNOWN_CORE_MESSAGE;
|
||||
import static org.alfresco.solr.AlfrescoCoreAdminHandler.UNPROCESSABLE_REQUEST_ON_SLAVE_NODES;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CORE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class AlfrescoCoreAdminHandlerTest
|
||||
{
|
||||
private AlfrescoCoreAdminHandler admin;
|
||||
|
||||
@Mock
|
||||
TrackerRegistry registry;
|
||||
|
||||
private ModifiableSolrParams params;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
admin = new AlfrescoCoreAdminHandler();
|
||||
admin.trackerRegistry = registry;
|
||||
when(registry.getCoreNames()).thenReturn(Set.of(ALFRESCO_CORE_NAME, ARCHIVE_CORE_NAME));
|
||||
|
||||
params = new ModifiableSolrParams();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noTargetCoreInParams()
|
||||
{
|
||||
assertEquals(0, params.size());
|
||||
|
||||
NamedList<Object> actionResponse = admin.actionFIX(params);
|
||||
assertEquals(0, actionResponse.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unknownTargetCoreInParams()
|
||||
{
|
||||
String invalidCoreName = "thisIsAnInvalidOrAtLeastUnknownCoreName";
|
||||
params.set(CORE, invalidCoreName);
|
||||
|
||||
NamedList<Object> actionResponse = admin.actionFIX(params);
|
||||
assertEquals( 1, actionResponse.size());
|
||||
assertEquals(UNKNOWN_CORE_MESSAGE + invalidCoreName, actionResponse.get(ACTION_ERROR_MESSAGE_LABEL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fixOnSlaveNodeHasNoEffect()
|
||||
{
|
||||
params.set(CORE, ALFRESCO_CORE_NAME);
|
||||
|
||||
assertFalse(admin.isMasterOrStandalone(ALFRESCO_CORE_NAME));
|
||||
|
||||
NamedList<Object> actionResponse = admin.actionFIX(params);
|
||||
assertEquals( 1, actionResponse.size());
|
||||
assertEquals(UNPROCESSABLE_REQUEST_ON_SLAVE_NODES, actionResponse.get(ACTION_ERROR_MESSAGE_LABEL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxTransactionScheduledParameterIsNotNull()
|
||||
{
|
||||
int expectedMaxTransactionToSchedule = 12876;
|
||||
|
||||
params.set(CORE, ALFRESCO_CORE_NAME);
|
||||
params.set(MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME, expectedMaxTransactionToSchedule);
|
||||
|
||||
admin = new AlfrescoCoreAdminHandler();
|
||||
|
||||
assertEquals(expectedMaxTransactionToSchedule, admin.getMaxTransactionToSchedule(params));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxTransactionScheduledIsNull_shouldBeGatheredFromCoreProperties()
|
||||
{
|
||||
params.set(CORE, ALFRESCO_CORE_NAME);
|
||||
|
||||
int expectedMaxTransactionToSchedule = 17892;
|
||||
Properties coreProperties = new Properties();
|
||||
coreProperties.setProperty(
|
||||
MAX_TRANSACTIONS_TO_SCHEDULE_CONF_PROPERTY_NAME,
|
||||
String.valueOf(expectedMaxTransactionToSchedule));
|
||||
|
||||
CoreContainer coreContainer = mock(CoreContainer.class);
|
||||
SolrCore core = mock(SolrCore.class);
|
||||
SolrResourceLoader resourceLoader = mock(SolrResourceLoader.class);
|
||||
when(coreContainer.getCore(ALFRESCO_CORE_NAME)).thenReturn(core);
|
||||
when(core.getResourceLoader()).thenReturn(resourceLoader);
|
||||
when(resourceLoader.getCoreProperties()).thenReturn(coreProperties);
|
||||
|
||||
admin = new AlfrescoCoreAdminHandler(coreContainer);
|
||||
|
||||
assertEquals(expectedMaxTransactionToSchedule, admin.getMaxTransactionToSchedule(params));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxTransactionScheduledParameterAndConfigurationIsNull_shouldGetTheHardCodedDefault()
|
||||
{
|
||||
params.set(CORE, ALFRESCO_CORE_NAME);
|
||||
|
||||
Properties coreProperties = new Properties();
|
||||
|
||||
CoreContainer coreContainer = mock(CoreContainer.class);
|
||||
SolrCore core = mock(SolrCore.class);
|
||||
SolrResourceLoader resourceLoader = mock(SolrResourceLoader.class);
|
||||
when(coreContainer.getCore(ALFRESCO_CORE_NAME)).thenReturn(core);
|
||||
when(core.getResourceLoader()).thenReturn(resourceLoader);
|
||||
when(resourceLoader.getCoreProperties()).thenReturn(coreProperties);
|
||||
|
||||
admin = new AlfrescoCoreAdminHandler(coreContainer);
|
||||
|
||||
assertEquals(Integer.MAX_VALUE, admin.getMaxTransactionToSchedule(params));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void masterOrStandaloneNode_implicitDryRunParameterIsEchoed()
|
||||
{
|
||||
admin = new AlfrescoCoreAdminHandler() {
|
||||
@Override
|
||||
NamedList<Object> fixOnSpecificCore(
|
||||
String coreName,
|
||||
Long fromTxCommitTime,
|
||||
Long toTxCommitTime,
|
||||
boolean dryRun,
|
||||
int maxTransactionsToSchedule) {
|
||||
return new NamedList<>(); // dummy entry
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isMasterOrStandalone(String coreName)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
admin.trackerRegistry = registry;
|
||||
|
||||
params.set(CORE, ALFRESCO_CORE_NAME);
|
||||
|
||||
NamedList<Object> actionResponse = admin.actionFIX(params);
|
||||
assertEquals(true, actionResponse.get(DRY_RUN_PARAMETER_NAME));
|
||||
assertEquals(ACTION_STATUS_NOT_SCHEDULED, actionResponse.get(ACTION_STATUS_LABEL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void masterOrStandaloneNode_explicitDryRunParameterIsEchoed()
|
||||
{
|
||||
assertThatExplicitParameterIsEchoed(
|
||||
DRY_RUN_PARAMETER_NAME,
|
||||
true);
|
||||
|
||||
assertThatExplicitParameterIsEchoed(
|
||||
DRY_RUN_PARAMETER_NAME,
|
||||
false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void masterOrStandaloneNode_explicitFromCommitTimeParameterIsEchoed()
|
||||
{
|
||||
assertThatExplicitParameterIsEchoed(
|
||||
FROM_TX_COMMIT_TIME_PARAMETER_NAME,
|
||||
System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void masterOrStandaloneNode_explicitToCommitTimeParameterIsEchoed()
|
||||
{
|
||||
assertThatExplicitParameterIsEchoed(
|
||||
TO_TX_COMMIT_TIME_PARAMETER_NAME,
|
||||
System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void masterOrStandaloneNode_explicitMaxTransactionsToScheduleParameterIsEchoed()
|
||||
{
|
||||
assertThatExplicitParameterIsEchoed(
|
||||
MAX_TRANSACTIONS_TO_SCHEDULE_PARAMETER_NAME,
|
||||
Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void manageTransactionsToBeFixed_shouldRespectTheInputGlobalLimit()
|
||||
{
|
||||
AtomicInteger limit = new AtomicInteger(4);
|
||||
AtomicInteger transactionCount = new AtomicInteger();
|
||||
|
||||
IOpenBitSet transactions = new SolrOpenBitSetAdapter();
|
||||
range(1, 7).forEach(transactions::set);
|
||||
|
||||
Consumer<Long> counter = tx -> transactionCount.incrementAndGet();
|
||||
admin.manageTransactionsToBeFixed(transactions, tx -> 0, counter, limit);
|
||||
|
||||
assertEquals(-1, limit.get());
|
||||
assertEquals(4, transactionCount.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subsequentInvocationsToManageTransactionsToBeFixed_shouldRespectTheInputGlobalLimit()
|
||||
{
|
||||
// Limit is set to max 13 transactions
|
||||
AtomicInteger limit = new AtomicInteger(13);
|
||||
AtomicInteger transactionCount = new AtomicInteger();
|
||||
|
||||
// First transaction set contains 6 transactions
|
||||
IOpenBitSet firstTransactionSet = new SolrOpenBitSetAdapter();
|
||||
range(1, 7).forEach(firstTransactionSet::set);
|
||||
|
||||
Consumer<Long> counter = tx -> transactionCount.incrementAndGet();
|
||||
admin.manageTransactionsToBeFixed(firstTransactionSet, tx -> 0, counter, limit);
|
||||
|
||||
assertEquals(7, limit.get());
|
||||
assertEquals(6, transactionCount.get());
|
||||
|
||||
// Second transaction set contains 9 transactions (more than the remaining transactions to process)
|
||||
IOpenBitSet secondTransactionSet = new SolrOpenBitSetAdapter();
|
||||
range(10, 21).forEach(secondTransactionSet::set);
|
||||
|
||||
admin.manageTransactionsToBeFixed(secondTransactionSet, tx -> 0, counter, limit);
|
||||
|
||||
assertEquals("Global transaction limit should have been exceeded", -1, limit.get());
|
||||
assertEquals(13, transactionCount.get());
|
||||
|
||||
// Third transaction set contains 10 transactions, it should be completely ignored as we already exceeded the
|
||||
// global limit above
|
||||
IOpenBitSet thirdTransactionSet = new SolrOpenBitSetAdapter();
|
||||
range(31, 42).forEach(thirdTransactionSet::set);
|
||||
|
||||
Consumer<Long> thisShoulndtBeInvoked = tx -> { throw new RuntimeException("We should never be here, as the global limit has been already exceeded."); };
|
||||
|
||||
admin.manageTransactionsToBeFixed(thirdTransactionSet, tx -> 0, thisShoulndtBeInvoked, limit);
|
||||
|
||||
assertEquals( -2, limit.get());
|
||||
assertEquals(13, transactionCount.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noAclTransactionToReindex_shouldReturnAnEmptyResponse()
|
||||
{
|
||||
IndexHealthReport emptyReport = mock(IndexHealthReport.class);
|
||||
when(emptyReport.getAclTxInIndexButNotInDb()).thenReturn(new SolrOpenBitSetAdapter());
|
||||
when(emptyReport.getDuplicatedAclTxInIndex()).thenReturn(new SolrOpenBitSetAdapter());
|
||||
when(emptyReport.getMissingAclTxFromIndex()).thenReturn(new SolrOpenBitSetAdapter());
|
||||
|
||||
NamedList<Object> subReport = admin.aclTxToReindex(ALFRESCO_CORE_NAME, mock(AclTracker.class), emptyReport, tx -> {}, Integer.MAX_VALUE);
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(0L),
|
||||
of(subReport.get(ACL_TX_IN_INDEX_NOT_IN_DB))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(ACL_TX_IN_INDEX_NOT_IN_DB + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(0L),
|
||||
of(subReport.get(DUPLICATED_ACL_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(DUPLICATED_ACL_TX_IN_INDEX + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(0L),
|
||||
of(subReport.get(MISSING_ACL_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(MISSING_ACL_TX_IN_INDEX + " section not found in response.")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noTransactionToReindex_shouldReturnAnEmptyResponse()
|
||||
{
|
||||
IndexHealthReport emptyReport = mock(IndexHealthReport.class);
|
||||
when(emptyReport.getTxInIndexButNotInDb()).thenReturn(new SolrOpenBitSetAdapter());
|
||||
when(emptyReport.getDuplicatedTxInIndex()).thenReturn(new SolrOpenBitSetAdapter());
|
||||
when(emptyReport.getMissingTxFromIndex()).thenReturn(new SolrOpenBitSetAdapter());
|
||||
|
||||
NamedList<Object> subReport = admin.txToReindex(ALFRESCO_CORE_NAME, mock(MetadataTracker.class), emptyReport, tx -> {}, Integer.MAX_VALUE);
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(0L),
|
||||
of(subReport.get(TX_IN_INDEX_NOT_IN_DB))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(TX_IN_INDEX_NOT_IN_DB + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(0L),
|
||||
of(subReport.get(DUPLICATED_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(DUPLICATED_TX_IN_INDEX + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(0L),
|
||||
of(subReport.get(MISSING_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(MISSING_TX_IN_INDEX + " section not found in response.")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxTransactionsGlobalLimitShouldBeAppliedInCascade()
|
||||
{
|
||||
SolrInformationServer server = mock(SolrInformationServer.class);
|
||||
when(server.getDocListSize(anyString())).thenReturn(0);
|
||||
|
||||
ConcurrentHashMap<String, InformationServer> informationServers = new ConcurrentHashMap<>();
|
||||
informationServers.put(ALFRESCO_CORE_NAME, server);
|
||||
admin.informationServers = informationServers;
|
||||
|
||||
IOpenBitSet txInIndexButNotInDb = new SolrOpenBitSetAdapter();
|
||||
IOpenBitSet duplicatedTxInIndex = new SolrOpenBitSetAdapter();
|
||||
IOpenBitSet missingTxFromIndex = new SolrOpenBitSetAdapter();
|
||||
range(1, 10).forEach(txInIndexButNotInDb::set);
|
||||
range(21, 32).forEach(duplicatedTxInIndex::set);
|
||||
range(50, 61).forEach(missingTxFromIndex::set);
|
||||
|
||||
int maxTransactionToSchedule = (int) (txInIndexButNotInDb.cardinality() +
|
||||
duplicatedTxInIndex.cardinality() +
|
||||
missingTxFromIndex.cardinality() -
|
||||
5);
|
||||
|
||||
IndexHealthReport emptyReport = mock(IndexHealthReport.class);
|
||||
when(emptyReport.getTxInIndexButNotInDb()).thenReturn(txInIndexButNotInDb);
|
||||
when(emptyReport.getDuplicatedTxInIndex()).thenReturn(duplicatedTxInIndex);
|
||||
when(emptyReport.getMissingTxFromIndex()).thenReturn(missingTxFromIndex);
|
||||
|
||||
NamedList<Object> subReport = admin.txToReindex(ALFRESCO_CORE_NAME, mock(MetadataTracker.class), emptyReport, tx -> {}, maxTransactionToSchedule);
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(txInIndexButNotInDb.cardinality()),
|
||||
of(subReport.get(TX_IN_INDEX_NOT_IN_DB))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(TX_IN_INDEX_NOT_IN_DB + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(duplicatedTxInIndex.cardinality()),
|
||||
of(subReport.get(DUPLICATED_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(DUPLICATED_TX_IN_INDEX + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(missingTxFromIndex.cardinality() - 5),
|
||||
of(subReport.get(MISSING_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(MISSING_TX_IN_INDEX + " section not found in response.")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxAclTransactionsGlobalLimitShouldBeAppliedInCascade()
|
||||
{
|
||||
SolrInformationServer server = mock(SolrInformationServer.class);
|
||||
when(server.getDocListSize(anyString())).thenReturn(0);
|
||||
|
||||
ConcurrentHashMap<String, InformationServer> informationServers = new ConcurrentHashMap<>();
|
||||
informationServers.put(ALFRESCO_CORE_NAME, server);
|
||||
admin.informationServers = informationServers;
|
||||
|
||||
IOpenBitSet txInIndexButNotInDb = new SolrOpenBitSetAdapter();
|
||||
IOpenBitSet duplicatedTxInIndex = new SolrOpenBitSetAdapter();
|
||||
IOpenBitSet missingTxFromIndex = new SolrOpenBitSetAdapter();
|
||||
range(1, 10).forEach(txInIndexButNotInDb::set);
|
||||
range(21, 32).forEach(duplicatedTxInIndex::set);
|
||||
range(50, 61).forEach(missingTxFromIndex::set);
|
||||
|
||||
int maxTransactionToSchedule = (int) (txInIndexButNotInDb.cardinality() +
|
||||
duplicatedTxInIndex.cardinality() +
|
||||
missingTxFromIndex.cardinality() -
|
||||
5);
|
||||
|
||||
IndexHealthReport emptyReport = mock(IndexHealthReport.class);
|
||||
when(emptyReport.getAclTxInIndexButNotInDb()).thenReturn(txInIndexButNotInDb);
|
||||
when(emptyReport.getDuplicatedAclTxInIndex()).thenReturn(duplicatedTxInIndex);
|
||||
when(emptyReport.getMissingAclTxFromIndex()).thenReturn(missingTxFromIndex);
|
||||
|
||||
NamedList<Object> subReport = admin.aclTxToReindex(ALFRESCO_CORE_NAME, mock(AclTracker.class), emptyReport, tx -> {}, maxTransactionToSchedule);
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(txInIndexButNotInDb.cardinality()),
|
||||
of(subReport.get(ACL_TX_IN_INDEX_NOT_IN_DB))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(ACL_TX_IN_INDEX_NOT_IN_DB + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(duplicatedTxInIndex.cardinality()),
|
||||
of(subReport.get(DUPLICATED_ACL_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(DUPLICATED_ACL_TX_IN_INDEX + " section not found in response.")));
|
||||
|
||||
assertEquals(
|
||||
Long.valueOf(missingTxFromIndex.cardinality() - 5),
|
||||
of(subReport.get(MISSING_ACL_TX_IN_INDEX))
|
||||
.map(NamedList.class::cast)
|
||||
.map(NamedList::size)
|
||||
.map(Number::longValue)
|
||||
.orElseThrow(() -> new RuntimeException(MISSING_ACL_TX_IN_INDEX + " section not found in response.")));
|
||||
}
|
||||
|
||||
private <T> void assertThatExplicitParameterIsEchoed(String parameterName, T parameterValue)
|
||||
{
|
||||
admin = new AlfrescoCoreAdminHandler() {
|
||||
@Override
|
||||
NamedList<Object> fixOnSpecificCore(
|
||||
String coreName,
|
||||
Long fromTxCommitTime,
|
||||
Long toTxCommitTime,
|
||||
boolean dryRun,
|
||||
int maxTransactionsToSchedule) {
|
||||
return new NamedList<>(); // dummy entry
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isMasterOrStandalone(String coreName) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
admin.trackerRegistry = registry;
|
||||
|
||||
params.set(CORE, ALFRESCO_CORE_NAME);
|
||||
params.set(parameterName, parameterValue.toString());
|
||||
|
||||
NamedList<Object> actionResponse = admin.actionFIX(params);
|
||||
assertEquals(parameterValue, actionResponse.get(parameterName));
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user