[SEARCH-2279]

Registration to ACS is handled by a specific tracker
This commit is contained in:
Elia Porciani
2020-06-15 08:26:24 +01:00
parent 45f53ad71c
commit 3d45956f14
12 changed files with 62 additions and 71 deletions

View File

@@ -33,12 +33,12 @@ import org.alfresco.solr.adapters.IOpenBitSet;
import org.alfresco.solr.client.SOLRAPIClientFactory;
import org.alfresco.solr.config.ConfigUtil;
import org.alfresco.solr.tracker.AclTracker;
import org.alfresco.solr.tracker.CoreStatePublisher;
import org.alfresco.solr.tracker.AbstractShardInformationPublisher;
import org.alfresco.solr.tracker.DBIDRangeRouter;
import org.alfresco.solr.tracker.DocRouter;
import org.alfresco.solr.tracker.IndexHealthReport;
import org.alfresco.solr.tracker.MetadataTracker;
import org.alfresco.solr.tracker.SlaveCoreStatePublisher;
import org.alfresco.solr.tracker.NodeStatePublisher;
import org.alfresco.solr.tracker.SolrTrackerScheduler;
import org.alfresco.solr.tracker.Tracker;
import org.alfresco.solr.tracker.TrackerRegistry;
@@ -2009,11 +2009,11 @@ public class AlfrescoCoreAdminHandler extends CoreAdminHandler
* @param coreName the owning core name.
* @return the component which is in charge to publish the core state.
*/
CoreStatePublisher coreStatePublisher(String coreName)
AbstractShardInformationPublisher coreStatePublisher(String coreName)
{
return ofNullable(trackerRegistry.getTrackerForCore(coreName, MetadataTracker.class))
.map(CoreStatePublisher.class::cast)
.orElse(trackerRegistry.getTrackerForCore(coreName, SlaveCoreStatePublisher.class));
.map(AbstractShardInformationPublisher.class::cast)
.orElse(trackerRegistry.getTrackerForCore(coreName, NodeStatePublisher.class));
}
/**

View File

@@ -125,7 +125,7 @@ class HandlerReportHelper
return nr;
}
static NamedList<Object> buildNodeReport(CoreStatePublisher publisher, Long dbid) throws JSONException
static NamedList<Object> buildNodeReport(AbstractShardInformationPublisher publisher, Long dbid) throws JSONException
{
NodeReport nodeReport = publisher.checkNode(dbid);
@@ -248,7 +248,7 @@ class HandlerReportHelper
NamedList<Object> coreSummary = new SimpleOrderedMap<>();
coreSummary.addAll((SimpleOrderedMap<Object>) srv.getCoreStats());
SlaveCoreStatePublisher statePublisher = trackerRegistry.getTrackerForCore(cname, SlaveCoreStatePublisher.class);
NodeStatePublisher statePublisher = trackerRegistry.getTrackerForCore(cname, NodeStatePublisher.class);
TrackerState trackerState = statePublisher.getTrackerState();
long lastIndexTxCommitTime = trackerState.getLastIndexedTxCommitTime();

View File

@@ -51,7 +51,7 @@ import org.alfresco.solr.tracker.CommitTracker;
import org.alfresco.solr.tracker.ContentTracker;
import org.alfresco.solr.tracker.MetadataTracker;
import org.alfresco.solr.tracker.ModelTracker;
import org.alfresco.solr.tracker.SlaveCoreStatePublisher;
import org.alfresco.solr.tracker.NodeStatePublisher;
import org.alfresco.solr.tracker.SolrTrackerScheduler;
import org.alfresco.solr.tracker.Tracker;
import org.alfresco.solr.tracker.TrackerRegistry;
@@ -190,7 +190,7 @@ public class SolrCoreLoadListener extends AbstractSolrEventListener
{
LOGGER.info("SearchServices Core Trackers have been explicitly disabled on core \"{}\" through \"enable.alfresco.tracking\" configuration property.", core.getName());
SlaveCoreStatePublisher statePublisher = new SlaveCoreStatePublisher(false, coreProperties, repositoryClient, core.getName(), informationServer);
NodeStatePublisher statePublisher = new NodeStatePublisher(false, coreProperties, repositoryClient, core.getName(), informationServer);
trackerRegistry.register(core.getName(), statePublisher);
scheduler.schedule(statePublisher, core.getName(), coreProperties);
trackers.add(statePublisher);
@@ -205,7 +205,7 @@ public class SolrCoreLoadListener extends AbstractSolrEventListener
{
LOGGER.info("SearchServices Core Trackers have been disabled on core \"{}\" because it is a slave core.", core.getName());
SlaveCoreStatePublisher statePublisher = new SlaveCoreStatePublisher(false, coreProperties, repositoryClient, core.getName(), informationServer);
NodeStatePublisher statePublisher = new NodeStatePublisher(false, coreProperties, repositoryClient, core.getName(), informationServer);
trackerRegistry.register(core.getName(), statePublisher);
scheduler.schedule(statePublisher, core.getName(), coreProperties);
trackers.add(statePublisher);
@@ -258,12 +258,21 @@ public class SolrCoreLoadListener extends AbstractSolrEventListener
MetadataTracker metadataTracker =
registerAndSchedule(
new MetadataTracker(true, props, repositoryClient, core.getName(), srv, true),
new MetadataTracker(props, repositoryClient, core.getName(), srv, true),
core,
props,
trackerRegistry,
scheduler);
NodeStatePublisher coreStateTracker =
registerAndSchedule(
new NodeStatePublisher(true, props, repositoryClient, core.getName(), srv),
core,
props,
trackerRegistry,
scheduler
);
List<Tracker> trackers = new ArrayList<>();
String cascadeTrackerEnabledProp = ofNullable((String) props.get(CASCADE_TRACKER_ENABLED)).orElse("true");
@@ -283,7 +292,7 @@ public class SolrCoreLoadListener extends AbstractSolrEventListener
//The ContentTracker will likely have the longest runs so put it first to ensure the MetadataTracker is not paused while
//waiting for the ContentTracker to release its lock.
//The aclTracker will likely have the shortest runs so put it last.
trackers.addAll(asList(contentTracker, metadataTracker, aclTracker));
trackers.addAll(asList(contentTracker, metadataTracker, aclTracker, coreStateTracker));
return trackers;
}

View File

@@ -26,10 +26,6 @@
package org.alfresco.solr.tracker;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static org.alfresco.solr.tracker.DocRouterFactory.SHARD_KEY_KEY;
import org.alfresco.opencmis.dictionary.CMISStrictDictionaryService;
import org.alfresco.repo.dictionary.NamespaceDAO;
import org.alfresco.repo.index.shard.ShardMethodEnum;
@@ -53,6 +49,10 @@ import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static org.alfresco.solr.tracker.DocRouterFactory.SHARD_KEY_KEY;
/**
* Superclass for all components which are able to inform Alfresco about the hosting node state.
* This has been introduced in SEARCH-1752 for splitting the dual responsibility of the {@link org.alfresco.solr.tracker.MetadataTracker}.
@@ -63,9 +63,9 @@ import java.util.Properties;
* @since 1.5
* @see <a href="https://issues.alfresco.com/jira/browse/SEARCH-1752">SEARCH-1752</a>
*/
public abstract class CoreStatePublisher extends AbstractTracker
public abstract class AbstractShardInformationPublisher extends AbstractTracker
{
private static final Logger LOGGER = LoggerFactory.getLogger(CoreStatePublisher.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractShardInformationPublisher.class);
DocRouter docRouter;
private final boolean isMaster;
@@ -75,7 +75,7 @@ public abstract class CoreStatePublisher extends AbstractTracker
/** The property to use for determining the shard. */
protected Optional<QName> shardProperty = Optional.empty();
CoreStatePublisher(
AbstractShardInformationPublisher(
boolean isMaster,
Properties p,
SOLRAPIClient client,
@@ -93,7 +93,7 @@ public abstract class CoreStatePublisher extends AbstractTracker
docRouter = DocRouterFactory.getRouter(p, ShardMethodEnum.getShardMethod(shardMethod));
}
CoreStatePublisher(Type type)
AbstractShardInformationPublisher(Type type)
{
super(type);
this.isMaster = false;
@@ -177,18 +177,8 @@ public abstract class CoreStatePublisher extends AbstractTracker
* The {@link ShardState}, as the name suggests, encapsulates/stores the state of the shard which hosts this
* {@link MetadataTracker} instance.
*
* The {@link ShardState} is primarily used in two places:
*
* <ul>
* <li>Transaction tracking: (see {@link MetadataTracker#getSomeTransactions(BoundedDeque, Long, long, int, long}): for pulling/tracking transactions from Alfresco</li>
* <li>
DynamicSharding: the {@link MetadataTracker} is not running on slave instances; in those cases a special
"tracker" ({@link SlaveCoreStatePublisher}) will be in charge of sending the corresponding shard state to Alfresco.
* </li>
* </ul>
*
* @return the {@link ShardState} instance which stores the current state of the hosting shard.
* @see SlaveCoreStatePublisher
* @see NodeStatePublisher
*/
ShardState getShardState()
{
@@ -230,7 +220,7 @@ public abstract class CoreStatePublisher extends AbstractTracker
.endShard()
.endShardInstance()
.build();
}
/**

View File

@@ -29,7 +29,6 @@ package org.alfresco.solr.tracker;
import com.google.common.collect.Lists;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.httpclient.AuthenticationException;
import org.alfresco.repo.index.shard.ShardState;
import org.alfresco.solr.BoundedDeque;
import org.alfresco.solr.InformationServer;
import org.alfresco.solr.NodeReport;
@@ -69,7 +68,7 @@ import static org.alfresco.repo.index.shard.ShardMethodEnum.DB_ID_RANGE;
* This tracks two things: transactions and metadata nodes
* @author Ahmed Owian
*/
public class MetadataTracker extends CoreStatePublisher implements Tracker
public class MetadataTracker extends AbstractShardInformationPublisher implements Tracker
{
protected final static Logger LOGGER = LoggerFactory.getLogger(MetadataTracker.class);
@@ -142,26 +141,25 @@ public class MetadataTracker extends CoreStatePublisher implements Tracker
*/
private Pair<Long, Long> minTxnIdRange;
public MetadataTracker(final boolean isMaster, Properties p, SOLRAPIClient client, String coreName,
public MetadataTracker(Properties p, SOLRAPIClient client, String coreName,
InformationServer informationServer)
{
this(isMaster, p, client, coreName, informationServer, false);
this(p, client, coreName, informationServer, false);
}
/**
* MetadataTracker constructor
*
* @param isMaster is true if SOLR instance is master, false otherwise
*
* @param p includes SOLR core properties (from environment variables and properties file)
* @param client Alfresco Repository http client
* @param coreName Name of the SOLR Core (alfresco, archive)
* @param informationServer SOLR Information Server
* @param checkRepoServicesAvailability is true if Repo Services availability needs to be checked
*/
public MetadataTracker(final boolean isMaster, Properties p, SOLRAPIClient client, String coreName,
public MetadataTracker( Properties p, SOLRAPIClient client, String coreName,
InformationServer informationServer, boolean checkRepoServicesAvailability)
{
super(isMaster, p, client, coreName, informationServer, Tracker.Type.METADATA);
super(true, p, client, coreName, informationServer, Tracker.Type.METADATA);
transactionDocsBatchSize = Integer.parseInt(p.getProperty("alfresco.transactionDocsBatchSize",
String.valueOf(DEFAULT_TRANSACTION_DOCS_BATCH_SIZE)));
@@ -736,8 +734,6 @@ public class MetadataTracker extends CoreStatePublisher implements Tracker
int maxResults, long endTime)
throws AuthenticationException, IOException, JSONException, EncoderException, NoSuchMethodException
{
ShardState shardstate = getShardState();
Transactions transactions;
// step forward in time until we find something or hit the time bound
@@ -749,14 +745,13 @@ public class MetadataTracker extends CoreStatePublisher implements Tracker
null,
startTime + timeStep,
null,
maxResults,
shardstate);
maxResults);
}
do
{
transactions = client.getTransactions(startTime, null, startTime + timeStep,
null, maxResults, shardstate);
null, maxResults);
startTime += timeStep;
// If no transactions are found, advance the time window to the next available transaction commit time
@@ -768,7 +763,7 @@ public class MetadataTracker extends CoreStatePublisher implements Tracker
LOGGER.info("{}-[CORE {}] Advancing transactions from {} to {}",
Thread.currentThread().getId(), coreName, startTime, nextTxCommitTime);
transactions = client.getTransactions(nextTxCommitTime, null,
nextTxCommitTime + timeStep, null, maxResults, shardstate);
nextTxCommitTime + timeStep, null, maxResults);
}
}

View File

@@ -58,9 +58,9 @@ import java.util.concurrent.Semaphore;
* @author Andrea Gazzarini
* @since 1.5
*/
public class SlaveCoreStatePublisher extends CoreStatePublisher
public class NodeStatePublisher extends AbstractShardInformationPublisher
{
private static final Logger LOGGER = LoggerFactory.getLogger(SlaveCoreStatePublisher.class);
private static final Logger LOGGER = LoggerFactory.getLogger(NodeStatePublisher.class);
// Share run and write locks across all SlaveCoreStatePublisher threads
@@ -79,7 +79,7 @@ public class SlaveCoreStatePublisher extends CoreStatePublisher
return RUN_LOCK_BY_CORE.get(coreName);
}
public SlaveCoreStatePublisher(
public NodeStatePublisher(
boolean isMaster,
Properties coreProperties,
SOLRAPIClient repositoryClient,
@@ -87,7 +87,6 @@ public class SlaveCoreStatePublisher extends CoreStatePublisher
SolrInformationServer informationServer)
{
super(isMaster, coreProperties, repositoryClient, name, informationServer, NODE_STATE_PUBLISHER);
RUN_LOCK_BY_CORE.put(coreName, new Semaphore(1, true));
WRITE_LOCK_BY_CORE.put(coreName, new Semaphore(1, true));
}
@@ -114,12 +113,6 @@ public class SlaveCoreStatePublisher extends CoreStatePublisher
// Do nothing here
}
@Override
public boolean isOnMasterOrStandalone()
{
return false;
}
@Override
public boolean hasMaintenance()
{

View File

@@ -62,7 +62,7 @@ import org.alfresco.solr.tracker.DocRouter;
import org.alfresco.solr.tracker.IndexHealthReport;
import org.alfresco.solr.tracker.MetadataTracker;
import org.alfresco.solr.tracker.PropertyRouter;
import org.alfresco.solr.tracker.SlaveCoreStatePublisher;
import org.alfresco.solr.tracker.NodeStatePublisher;
import org.alfresco.solr.tracker.TrackerRegistry;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams;
@@ -232,12 +232,12 @@ public class AlfrescoCoreAdminHandlerIT
@Test
public void coreIsSlave_thenCoreStatePublisherInstanceCorrespondsToSlaveCoreStatePublisher()
{
SlaveCoreStatePublisher coreStatePublisher = mock(SlaveCoreStatePublisher.class);
NodeStatePublisher coreStateTracker = mock(NodeStatePublisher.class);
when(trackerRegistry.getTrackerForCore(anyString(), eq(MetadataTracker.class))).thenReturn(null);
when(trackerRegistry.getTrackerForCore(anyString(), eq(SlaveCoreStatePublisher.class))).thenReturn(coreStatePublisher);
when(trackerRegistry.getTrackerForCore(anyString(), eq(NodeStatePublisher.class))).thenReturn(coreStateTracker);
assertSame(coreStatePublisher, alfrescoCoreAdminHandler.coreStatePublisher("ThisIsTheCoreName"));
assertSame(coreStateTracker, alfrescoCoreAdminHandler.coreStatePublisher("ThisIsTheCoreName"));
}
@Test

View File

@@ -33,6 +33,7 @@ import static org.alfresco.solr.tracker.Tracker.Type.ACL;
import static org.alfresco.solr.tracker.Tracker.Type.CASCADE;
import static org.alfresco.solr.tracker.Tracker.Type.CONTENT;
import static org.alfresco.solr.tracker.Tracker.Type.METADATA;
import static org.alfresco.solr.tracker.Tracker.Type.NODE_STATE_PUBLISHER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -126,7 +127,7 @@ public class SolrCoreLoadListenerTest
verify(scheduler).schedule(any(CascadeTracker.class), eq(coreName), same(coreProperties));
Set<Type> trackerTypes = coreTrackers.stream().map(Tracker::getType).collect(Collectors.toSet());
assertEquals("Unexpected trackers found.", Set.of(ACL, CONTENT, METADATA, CASCADE), trackerTypes);
assertEquals("Unexpected trackers found.", Set.of(ACL, CONTENT, METADATA, NODE_STATE_PUBLISHER, CASCADE), trackerTypes);
}
@Test
@@ -147,7 +148,7 @@ public class SolrCoreLoadListenerTest
verify(scheduler, never()).schedule(any(CascadeTracker.class), eq(coreName), same(coreProperties));
Set<Type> trackerTypes = coreTrackers.stream().map(Tracker::getType).collect(Collectors.toSet());
assertEquals("Unexpected trackers found.", Set.of(ACL, CONTENT, METADATA), trackerTypes);
assertEquals("Unexpected trackers found.", Set.of(ACL, CONTENT, METADATA, NODE_STATE_PUBLISHER), trackerTypes);
}
@Test

View File

@@ -138,6 +138,7 @@ public class DistributedCascadeTrackerIT extends AbstractAlfrescoDistributedIT
indexParentFolderWithCascade();
waitForDocCount(params("qt", "/afts", "q", "PATH:" + cascadingFirstChild), 1, INDEX_TIMEOUT);
waitForDocCount(params("qt", "/afts", "q", "PATH:" + cascadingSecondChild), 1, INDEX_TIMEOUT);
// Check if the path is updated for both the nodes
assertShardCount(0, params("qt", "/afts", "q", "PATH:" + cascadingFirstChild), 1);

View File

@@ -81,7 +81,7 @@ public class MetadataTrackerTest
doReturn("workspace://SpacesStore").when(props).getProperty("alfresco.stores");
when(srv.getTrackerStats()).thenReturn(trackerStats);
String coreName = "theCoreName";
this.metadataTracker = spy(new MetadataTracker(true, props, repositoryClient, coreName, srv));
this.metadataTracker = spy(new MetadataTracker(props, repositoryClient, coreName, srv));
ModelTracker modelTracker = mock(ModelTracker.class);
when(modelTracker.hasModels()).thenReturn(true);

View File

@@ -133,7 +133,7 @@ public class SolrTrackerSchedulerTest
{
String exp = "0/4 * * * * ? *";
props.put("alfresco.metadata.tracker.cron", exp);
MetadataTracker metadataTracker = new MetadataTracker(true, props, client, exp, informationServer);
MetadataTracker metadataTracker = new MetadataTracker(props, client, exp, informationServer);
this.trackerScheduler.schedule(metadataTracker, CORE_NAME, props);
verify(spiedQuartzScheduler).scheduleJob(any(JobDetail.class), any(Trigger.class));
checkCronExpression(exp);

View File

@@ -346,10 +346,12 @@ public class SOLRAPIClient
}
}
public Transactions getTransactions(Long fromCommitTime, Long minTxnId, Long toCommitTime, Long maxTxnId, int maxResults, ShardState shardState) throws AuthenticationException, IOException, JSONException, EncoderException
public Transactions getTransactions(Long fromCommitTime, Long minTxnId, Long toCommitTime,
Long maxTxnId, int maxResults, ShardState shardState)
throws AuthenticationException, IOException, JSONException, EncoderException
{
URLCodec encoder = new URLCodec();
StringBuilder url = new StringBuilder(GET_TRANSACTIONS_URL);
StringBuilder args = new StringBuilder();
if (fromCommitTime != null)
@@ -379,7 +381,7 @@ public class SOLRAPIClient
args.append(encoder.encode("baseUrl")).append("=").append(encoder.encode(shardState.getShardInstance().getBaseUrl()));
args.append("&").append(encoder.encode("hostName")).append("=").append(encoder.encode(shardState.getShardInstance().getHostName()));
args.append("&").append(encoder.encode("template")).append("=").append(encoder.encode(shardState.getShardInstance().getShard().getFloc().getTemplate()));
for(String key : shardState.getShardInstance().getShard().getFloc().getPropertyBag().keySet())
{
String value = shardState.getShardInstance().getShard().getFloc().getPropertyBag().get(key);
@@ -388,7 +390,7 @@ public class SOLRAPIClient
args.append("&").append(encoder.encode("floc.property."+key)).append("=").append(encoder.encode(value));
}
}
for(String key : shardState.getPropertyBag().keySet())
{
String value = shardState.getPropertyBag().get(key);
@@ -397,7 +399,7 @@ public class SOLRAPIClient
args.append("&").append(encoder.encode("state.property."+key)).append("=").append(encoder.encode(value));
}
}
args.append("&").append(encoder.encode("instance")).append("=").append(encoder.encode("" + shardState.getShardInstance().getShard().getInstance()));
args.append("&").append(encoder.encode("numberOfShards")).append("=").append(encoder.encode("" + shardState.getShardInstance().getShard().getFloc().getNumberOfShards()));
args.append("&").append(encoder.encode("port")).append("=").append(encoder.encode("" + shardState.getShardInstance().getPort()));
@@ -413,13 +415,13 @@ public class SOLRAPIClient
args.append("&").append(encoder.encode("isMaster")).append("=").append(encoder.encode("" + shardState.isMaster()));
args.append("&").append(encoder.encode("hasContent")).append("=").append(encoder.encode("" + shardState.getShardInstance().getShard().getFloc().hasContent()));
args.append("&").append(encoder.encode("shardMethod")).append("=").append(encoder.encode(shardState.getShardInstance().getShard().getFloc().getShardMethod().toString()));
args.append("&").append(encoder.encode("lastUpdated")).append("=").append(encoder.encode("" + shardState.getLastUpdated()));
args.append("&").append(encoder.encode("lastIndexedChangeSetCommitTime")).append("=").append(encoder.encode("" + shardState.getLastIndexedChangeSetCommitTime()));
args.append("&").append(encoder.encode("lastIndexedChangeSetId")).append("=").append(encoder.encode("" + shardState.getLastIndexedChangeSetId()));
args.append("&").append(encoder.encode("lastIndexedTxCommitTime")).append("=").append(encoder.encode("" + shardState.getLastIndexedTxCommitTime()));
args.append("&").append(encoder.encode("lastIndexedTxId")).append("=").append(encoder.encode("" + shardState.getLastIndexedTxId()));
}
url.append(args);