mirror of
https://github.com/Alfresco/SearchServices.git
synced 2025-09-17 14:21:20 +00:00
Merge branch 'fix/SEARCH-2180' into 'fix/SEARCH_2175_CascadeTracker'
fix/SEARCH_2180 LookAheadBufferedReader in fix/SEARCH_2175 CascadeTracker See merge request search_discovery/insightengine!452
This commit is contained in:
@@ -0,0 +1,245 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2020 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.alfresco.solr.client;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import static java.util.stream.Collector.of;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Reader;
|
||||
|
||||
/**
|
||||
* This is an enhanced {@link BufferedReader} used for transparently collect data from the incoming stream before
|
||||
* it gets consumed by the usual buffered reader logic. The data is not actually consumed, it is just buffered beside and
|
||||
* it can be printed in case we want to debug the underlying stream content.
|
||||
*
|
||||
* The name refers to the usage pattern: this reader is used for wrapping a character stream coming from a remote call and, in case
|
||||
* of issues, it collects the remaining (i.e. unread) part of the stream so it will be available for debugging purposes.
|
||||
*
|
||||
* It provides two different collecting modes:
|
||||
*
|
||||
* <ul>
|
||||
* <li>
|
||||
* windowing: the collected data is a window of the original stream (about 500 chars), and the character that
|
||||
* caused a stop in the reading is more or less in the middle of that window.
|
||||
* </li>
|
||||
* <li>
|
||||
* everything: the collected data is the whole character stream
|
||||
* </li>
|
||||
* </ul>
|
||||
*
|
||||
* The two modes are activated on each instance depending on the level of the {@link Logger} passed in input.
|
||||
* Specifically:
|
||||
*
|
||||
* <ul>
|
||||
* <li>DEBUG: enables the windowing mode</li>
|
||||
* <li>TRACE: enables the "collect everything" mode</li>
|
||||
* <li>other levels simply disables the buffering behaviour (i.e. nothing is collected)</li>
|
||||
* </ul>
|
||||
*
|
||||
*/
|
||||
public class LookAheadBufferedReader extends BufferedReader
|
||||
{
|
||||
final static String BUFFERING_DISABLED_INFO_MESSAGE = "Not available: please set the logging LEVEL to DEBUG or TRACE.";
|
||||
|
||||
private interface BufferingMode
|
||||
{
|
||||
void append(char ch);
|
||||
|
||||
void forceAppend(char ch);
|
||||
|
||||
boolean canAccept(boolean force);
|
||||
}
|
||||
|
||||
private static class Windowing implements BufferingMode
|
||||
{
|
||||
private final LinkedList<Character> window = new LinkedList<>();
|
||||
private final int maxSize;
|
||||
|
||||
private Windowing(int maxSize)
|
||||
{
|
||||
this.maxSize = maxSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(char ch)
|
||||
{
|
||||
window.add(ch);
|
||||
if (window.size() == maxSize)
|
||||
{
|
||||
window.removeFirst();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceAppend(char ch)
|
||||
{
|
||||
window.add(ch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canAccept(boolean force)
|
||||
{
|
||||
return window.size() < (maxSize * 2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return window.stream()
|
||||
.collect(
|
||||
of(
|
||||
StringBuilder::new,
|
||||
StringBuilder::append,
|
||||
StringBuilder::append,
|
||||
StringBuilder::toString));
|
||||
}
|
||||
}
|
||||
|
||||
private static class WholeValue implements BufferingMode
|
||||
{
|
||||
private final StringBuilder content = new StringBuilder();
|
||||
|
||||
@Override
|
||||
public void append(char ch)
|
||||
{
|
||||
content.append(ch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceAppend(char ch)
|
||||
{
|
||||
content.append(ch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return content.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canAccept(boolean force)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static class NoOp implements BufferingMode
|
||||
{
|
||||
@Override
|
||||
public void append(char ch)
|
||||
{
|
||||
// Nothing to be done here
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceAppend(char ch)
|
||||
{
|
||||
// Nothing to be done here
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canAccept(boolean force)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return BUFFERING_DISABLED_INFO_MESSAGE;
|
||||
}
|
||||
}
|
||||
|
||||
private final BufferingMode bufferingMode;
|
||||
|
||||
LookAheadBufferedReader(Reader in, final int windowSize, boolean isDebugEnabled, boolean isTraceEnabled)
|
||||
{
|
||||
super(in);
|
||||
if (isTraceEnabled)
|
||||
{
|
||||
bufferingMode = new WholeValue();
|
||||
}
|
||||
else if(isDebugEnabled)
|
||||
{
|
||||
bufferingMode = new Windowing(windowSize);
|
||||
}
|
||||
else {
|
||||
bufferingMode = new NoOp();
|
||||
}
|
||||
}
|
||||
|
||||
public LookAheadBufferedReader(Reader in, final int windowSize, Logger logger)
|
||||
{
|
||||
this(in, windowSize, logger.isDebugEnabled(), logger.isTraceEnabled());
|
||||
}
|
||||
|
||||
public LookAheadBufferedReader(Reader in, Logger logger)
|
||||
{
|
||||
this(in, 250, logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException
|
||||
{
|
||||
int ch = super.read();
|
||||
|
||||
if (ch != -1) bufferingMode.append((char)ch);
|
||||
|
||||
return ch;
|
||||
}
|
||||
|
||||
public String lookAheadAndGetBufferedContent()
|
||||
{
|
||||
try
|
||||
{
|
||||
int ch;
|
||||
while ((ch = super.read()) != -1 && bufferingMode.canAccept(true))
|
||||
{
|
||||
bufferingMode.forceAppend((char) ch);
|
||||
}
|
||||
}
|
||||
catch (Exception ignore)
|
||||
{
|
||||
// Ignore any I/O exception causing further reading on the underlying stream
|
||||
// Just return the collected data
|
||||
}
|
||||
return bufferingMode.toString();
|
||||
}
|
||||
|
||||
boolean isInWindowingMode()
|
||||
{
|
||||
return bufferingMode instanceof Windowing;
|
||||
}
|
||||
|
||||
boolean isInCollectEverythingMode()
|
||||
{
|
||||
return bufferingMode instanceof WholeValue;
|
||||
}
|
||||
|
||||
boolean isBufferingDisabled()
|
||||
{
|
||||
return bufferingMode instanceof NoOp;
|
||||
}
|
||||
}
|
@@ -25,11 +25,13 @@
|
||||
*/
|
||||
package org.alfresco.solr.client;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import static java.util.Optional.ofNullable;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
@@ -46,6 +48,7 @@ import org.alfresco.httpclient.AlfrescoHttpClient;
|
||||
import org.alfresco.httpclient.AuthenticationException;
|
||||
import org.alfresco.httpclient.GetRequest;
|
||||
import org.alfresco.httpclient.PostRequest;
|
||||
import org.alfresco.httpclient.Request;
|
||||
import org.alfresco.httpclient.Response;
|
||||
import org.alfresco.repo.dictionary.M2Model;
|
||||
import org.alfresco.repo.dictionary.NamespaceDAO;
|
||||
@@ -94,7 +97,7 @@ import com.fasterxml.jackson.core.JsonToken;
|
||||
*/
|
||||
public class SOLRAPIClient
|
||||
{
|
||||
protected final static Logger log = LoggerFactory.getLogger(SOLRAPIClient.class);
|
||||
protected final static Logger LOGGER = LoggerFactory.getLogger(SOLRAPIClient.class);
|
||||
private static final String GET_ACL_CHANGESETS_URL = "api/solr/aclchangesets";
|
||||
private static final String GET_ACLS = "api/solr/acls";
|
||||
private static final String GET_ACLS_READERS = "api/solr/aclsReaders";
|
||||
@@ -142,7 +145,7 @@ public class SOLRAPIClient
|
||||
this.jsonFactory = new JsonFactory();
|
||||
this.compression = compression;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the ACL ChangeSets
|
||||
*
|
||||
@@ -179,32 +182,7 @@ public class SOLRAPIClient
|
||||
url.append(args);
|
||||
|
||||
GetRequest req = new GetRequest(url.toString());
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
|
||||
if (response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(GET_ACL_CHANGESETS_URL + " return status:" + response.getStatus());
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString(3));
|
||||
}
|
||||
JSONObject json = callRepository(GET_ACL_CHANGESETS_URL, req);
|
||||
|
||||
JSONArray aclChangeSetsJSON = json.getJSONArray("aclChangeSets");
|
||||
List<AclChangeSet> aclChangeSets = new ArrayList<AclChangeSet>(aclChangeSetsJSON.length());
|
||||
@@ -217,8 +195,7 @@ public class SOLRAPIClient
|
||||
AclChangeSet aclChangeSet = new AclChangeSet(aclChangeSetId, commitTimeMs, aclCount);
|
||||
aclChangeSets.add(aclChangeSet);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Long maxChangeSetCommitTime = null;
|
||||
if(json.has("maxChangeSetCommitTime"))
|
||||
{
|
||||
@@ -273,32 +250,7 @@ public class SOLRAPIClient
|
||||
jsonReq.put("aclChangeSetIds", aclChangeSetIdsJSON);
|
||||
|
||||
PostRequest req = new PostRequest(url.toString(), jsonReq.toString(), "application/json");
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
|
||||
if (response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(GET_ACL_CHANGESETS_URL + " return status:" + response.getStatus());
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString(3));
|
||||
}
|
||||
JSONObject json = callRepository(GET_ACL_CHANGESETS_URL, req);
|
||||
|
||||
JSONArray aclsJSON = json.getJSONArray("acls");
|
||||
List<Acl> acls = new ArrayList<Acl>(aclsJSON.length());
|
||||
@@ -336,32 +288,7 @@ public class SOLRAPIClient
|
||||
jsonReq.put("aclIds", aclIdsJSON);
|
||||
|
||||
PostRequest req = new PostRequest(url.toString(), jsonReq.toString(), "application/json");
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
|
||||
if (response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(GET_ACLS_READERS + " return status:" + response.getStatus());
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString(3));
|
||||
}
|
||||
JSONObject json = callRepository(GET_ACLS_READERS, req);
|
||||
|
||||
JSONArray aclsReadersJSON = json.getJSONArray("aclsReaders");
|
||||
List<AclReaders> aclsReaders = new ArrayList<AclReaders>(aclsReadersJSON.length());
|
||||
@@ -421,7 +348,6 @@ public class SOLRAPIClient
|
||||
|
||||
public Transactions getTransactions(Long fromCommitTime, Long minTxnId, Long toCommitTime, Long maxTxnId, int maxResults, ShardState shardState) throws AuthenticationException, IOException, JSONException, EncoderException
|
||||
{
|
||||
log.debug("### get transactions ###");
|
||||
URLCodec encoder = new URLCodec();
|
||||
|
||||
StringBuilder url = new StringBuilder(GET_TRANSACTIONS_URL);
|
||||
@@ -448,7 +374,7 @@ public class SOLRAPIClient
|
||||
}
|
||||
if(shardState != null)
|
||||
{
|
||||
log.debug("### Shard state exists ###");
|
||||
LOGGER.debug("### Shard state exists ###");
|
||||
args.append(args.length() == 0 ? "?" : "&");
|
||||
args.append(encoder.encode("baseUrl")).append("=").append(encoder.encode(shardState.getShardInstance().getBaseUrl()));
|
||||
args.append("&").append(encoder.encode("hostName")).append("=").append(encoder.encode(shardState.getShardInstance().getHostName()));
|
||||
@@ -497,12 +423,14 @@ public class SOLRAPIClient
|
||||
}
|
||||
|
||||
url.append(args);
|
||||
log.debug("### GetRequest: " + url.toString());
|
||||
LOGGER.debug("### GetRequest: " + url.toString());
|
||||
GetRequest req = new GetRequest(url.toString());
|
||||
Response response = null;
|
||||
List<Transaction> transactions = new ArrayList<Transaction>();
|
||||
Long maxTxnCommitTime = null;
|
||||
Long maxTxnIdOnServer = null;
|
||||
|
||||
LookAheadBufferedReader reader = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
@@ -510,7 +438,8 @@ public class SOLRAPIClient
|
||||
{
|
||||
throw new AlfrescoRuntimeException("GetTransactions return status is " + response.getStatus());
|
||||
}
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
|
||||
reader = new LookAheadBufferedReader(new InputStreamReader(response.getContentAsStream(), StandardCharsets.UTF_8), LOGGER);
|
||||
JsonParser parser = jsonFactory.createParser(reader);
|
||||
|
||||
JsonToken token = parser.nextValue();
|
||||
@@ -556,18 +485,25 @@ public class SOLRAPIClient
|
||||
token = parser.nextValue();
|
||||
}
|
||||
parser.close();
|
||||
reader.close();
|
||||
|
||||
}
|
||||
catch (JSONException exception)
|
||||
{
|
||||
String message = "Received a malformed JSON payload. Request was \"" +
|
||||
req.getFullUri() +
|
||||
"Data: "
|
||||
+ ofNullable(reader)
|
||||
.map(LookAheadBufferedReader::lookAheadAndGetBufferedContent)
|
||||
.orElse("Not available");
|
||||
LOGGER.error(message);
|
||||
throw exception;
|
||||
}
|
||||
finally
|
||||
{
|
||||
log.debug("## end getTransactions");
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
ofNullable(response).ifPresent(Response::release);
|
||||
ofNullable(reader).ifPresent(this::silentlyClose);
|
||||
}
|
||||
log.debug("### Transactions found maxTxnCommitTime: " + maxTxnCommitTime );
|
||||
|
||||
LOGGER.debug("### Transactions found maxTxnCommitTime: " + maxTxnCommitTime );
|
||||
return new Transactions(transactions, maxTxnCommitTime, maxTxnIdOnServer);
|
||||
}
|
||||
|
||||
@@ -637,35 +573,10 @@ public class SOLRAPIClient
|
||||
|
||||
|
||||
PostRequest req = new PostRequest(url.toString(), body.toString(), "application/json");
|
||||
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
if(response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException("GetNodes return status is " + response.getStatus());
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
}
|
||||
|
||||
if(log.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString());
|
||||
}
|
||||
JSONObject json = callRepository(GET_NODES_URL, req);
|
||||
|
||||
JSONArray jsonNodes = json.getJSONArray("nodes");
|
||||
List<Node> nodes = new ArrayList<Node>(jsonNodes.length());
|
||||
List<Node> nodes = new ArrayList<>(jsonNodes.length());
|
||||
for(int i = 0; i < jsonNodes.length(); i++)
|
||||
{
|
||||
JSONObject jsonNodeInfo = jsonNodes.getJSONObject(i);
|
||||
@@ -893,34 +804,9 @@ public class SOLRAPIClient
|
||||
body.put("maxResults", maxResults);
|
||||
|
||||
PostRequest req = new PostRequest(url.toString(), body.toString(), "application/json");
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
if(response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException("GetNodeMetaData return status is " + response.getStatus());
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
}
|
||||
JSONObject json = callRepository(GET_METADATA_URL, req);
|
||||
|
||||
if (log.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString(3));
|
||||
}
|
||||
|
||||
JSONArray jsonNodes = json.getJSONArray("nodes");
|
||||
|
||||
List<NodeMetaData> nodes = new ArrayList<NodeMetaData>(jsonNodes.length());
|
||||
for(int i = 0; i < jsonNodes.length(); i++)
|
||||
{
|
||||
@@ -1159,7 +1045,7 @@ public class SOLRAPIClient
|
||||
return new GetTextContentResponse(response);
|
||||
}
|
||||
|
||||
public AlfrescoModel getModel(String coreName, QName modelName) throws AuthenticationException, IOException, JSONException
|
||||
public AlfrescoModel getModel(String coreName, QName modelName) throws AuthenticationException, IOException
|
||||
{
|
||||
// If the model is new to the SOLR side the prefix will be unknown so we can not generate prefixes for the request!
|
||||
// Always use the full QName with explicit URI
|
||||
@@ -1209,31 +1095,8 @@ public class SOLRAPIClient
|
||||
body.put("models", jsonModels);
|
||||
|
||||
PostRequest req = new PostRequest(url.toString(), body.toString(), "application/json");
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
if(response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(coreName + " GetModelsDiff return status is " + response.getStatus());
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
}
|
||||
|
||||
if(log.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString());
|
||||
}
|
||||
JSONObject json = callRepository(GET_MODELS_DIFF, req);
|
||||
|
||||
JSONArray jsonDiffs = json.getJSONArray("diffs");
|
||||
if(jsonDiffs == null)
|
||||
{
|
||||
@@ -1271,6 +1134,7 @@ public class SOLRAPIClient
|
||||
GetRequest get = new GetRequest(url.toString());
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
LookAheadBufferedReader reader = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(get);
|
||||
@@ -1280,19 +1144,29 @@ public class SOLRAPIClient
|
||||
+ response.getStatus() + " when invoking " + url);
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
reader = new LookAheadBufferedReader(new InputStreamReader(response.getContentAsStream(), StandardCharsets.UTF_8), LOGGER);
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
catch (JSONException exception)
|
||||
{
|
||||
String message = "Received a malformed JSON payload. Request was \"" +
|
||||
get.getFullUri() +
|
||||
"Data: "
|
||||
+ ofNullable(reader)
|
||||
.map(LookAheadBufferedReader::lookAheadAndGetBufferedContent)
|
||||
.orElse("Not available");
|
||||
LOGGER.error(message);
|
||||
throw exception;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
ofNullable(response).ifPresent(Response::release);
|
||||
ofNullable(reader).ifPresent(this::silentlyClose);
|
||||
}
|
||||
if (log.isDebugEnabled())
|
||||
|
||||
if (LOGGER.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString());
|
||||
LOGGER.debug(json.toString());
|
||||
}
|
||||
|
||||
return Long.parseLong(json.get("nextTransactionCommitTimeMs").toString());
|
||||
@@ -1318,6 +1192,7 @@ public class SOLRAPIClient
|
||||
GetRequest get = new GetRequest(url.toString());
|
||||
Response response = null;
|
||||
JSONObject json = null;
|
||||
LookAheadBufferedReader reader = null;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(get);
|
||||
@@ -1327,19 +1202,29 @@ public class SOLRAPIClient
|
||||
+ response.getStatus() + " when invoking " + url);
|
||||
}
|
||||
|
||||
Reader reader = new BufferedReader(new InputStreamReader(response.getContentAsStream(), "UTF-8"));
|
||||
reader = new LookAheadBufferedReader(new InputStreamReader(response.getContentAsStream(), StandardCharsets.UTF_8), LOGGER);
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
}
|
||||
catch(JSONException exception)
|
||||
{
|
||||
String message = "Received a malformed JSON payload. Request was \"" +
|
||||
get.getFullUri() +
|
||||
"Data: "
|
||||
+ ofNullable(reader)
|
||||
.map(LookAheadBufferedReader::lookAheadAndGetBufferedContent)
|
||||
.orElse("Not available");
|
||||
LOGGER.error(message);
|
||||
throw exception;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (response != null)
|
||||
{
|
||||
response.release();
|
||||
}
|
||||
ofNullable(response).ifPresent(Response::release);
|
||||
ofNullable(reader).ifPresent(this::silentlyClose);
|
||||
}
|
||||
if (log.isDebugEnabled())
|
||||
|
||||
if (LOGGER.isDebugEnabled())
|
||||
{
|
||||
log.debug(json.toString());
|
||||
LOGGER.debug(json.toString());
|
||||
}
|
||||
|
||||
return new Pair<Long, Long>(Long.parseLong(json.get("minTransactionCommitTimeMs").toString()),
|
||||
@@ -1692,4 +1577,56 @@ public class SOLRAPIClient
|
||||
{
|
||||
repositoryHttpClient.close();
|
||||
}
|
||||
|
||||
private JSONObject callRepository(String msgId, Request req) throws IOException, AuthenticationException
|
||||
{
|
||||
Response response = null;
|
||||
LookAheadBufferedReader reader = null;
|
||||
JSONObject json;
|
||||
try
|
||||
{
|
||||
response = repositoryHttpClient.sendRequest(req);
|
||||
if (response.getStatus() != HttpStatus.SC_OK)
|
||||
{
|
||||
throw new AlfrescoRuntimeException(msgId + " return status:" + response.getStatus());
|
||||
}
|
||||
|
||||
reader = new LookAheadBufferedReader(new InputStreamReader(response.getContentAsStream(), StandardCharsets.UTF_8), LOGGER);
|
||||
json = new JSONObject(new JSONTokener(reader));
|
||||
|
||||
if (LOGGER.isDebugEnabled())
|
||||
{
|
||||
LOGGER.debug(json.toString(3));
|
||||
}
|
||||
return json;
|
||||
}
|
||||
catch (JSONException exception)
|
||||
{
|
||||
String message = "Received a malformed JSON payload. Request was \"" +
|
||||
req.getFullUri() +
|
||||
"Data: "
|
||||
+ ofNullable(reader)
|
||||
.map(LookAheadBufferedReader::lookAheadAndGetBufferedContent)
|
||||
.orElse("Not available");
|
||||
LOGGER.error(message);
|
||||
throw exception;
|
||||
}
|
||||
finally
|
||||
{
|
||||
ofNullable(response).ifPresent(Response::release);
|
||||
ofNullable(reader).ifPresent(this::silentlyClose);
|
||||
}
|
||||
}
|
||||
|
||||
private void silentlyClose(Closeable closeable)
|
||||
{
|
||||
try
|
||||
{
|
||||
closeable.close();
|
||||
}
|
||||
catch (Exception ignore)
|
||||
{
|
||||
// Nothing to be done here
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -0,0 +1,210 @@
|
||||
/*-
|
||||
* #%L
|
||||
* Alfresco Remote API
|
||||
* %%
|
||||
* Copyright (C) 2005 - 2020 Alfresco Software Limited
|
||||
* %%
|
||||
* This file is part of the Alfresco software.
|
||||
* If the software was purchased under a paid Alfresco license, the terms of
|
||||
* the paid license agreement will prevail. Otherwise, the software is
|
||||
* provided under the following open source license terms:
|
||||
*
|
||||
* Alfresco is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Alfresco is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
|
||||
* #L%
|
||||
*/
|
||||
package org.alfresco.solr.client;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.io.Reader;
|
||||
import java.io.StringReader;
|
||||
|
||||
import static java.util.stream.IntStream.range;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LookAheadBufferedReaderTest
|
||||
{
|
||||
@Mock
|
||||
Reader reader;
|
||||
|
||||
private final String data = "1234567890ABCDEFGHILMNOPQRSTUVYXZ";
|
||||
|
||||
@Test
|
||||
public void windowingModeEnabled()
|
||||
{
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, data.length(), true, false);
|
||||
assertTrue(classUnderTest.isInWindowingMode());
|
||||
assertFalse(classUnderTest.isInCollectEverythingMode());
|
||||
assertFalse(classUnderTest.isBufferingDisabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void collectEverythingModeEnabled()
|
||||
{
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, data.length(), false, true);
|
||||
assertTrue(classUnderTest.isInCollectEverythingMode());
|
||||
assertFalse(classUnderTest.isInWindowingMode());
|
||||
assertFalse(classUnderTest.isBufferingDisabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bufferingDisabled()
|
||||
{
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, data.length(), false, false);
|
||||
assertTrue(classUnderTest.isBufferingDisabled());
|
||||
assertFalse(classUnderTest.isInCollectEverythingMode());
|
||||
assertFalse(classUnderTest.isInWindowingMode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void collectEverythingWinsOverWindowing()
|
||||
{
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, data.length(), true, true);
|
||||
assertTrue(classUnderTest.isInCollectEverythingMode());
|
||||
assertFalse(classUnderTest.isInWindowingMode());
|
||||
assertFalse(classUnderTest.isBufferingDisabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void windowingModeShouldCollectPartialWindowsOfData()
|
||||
{
|
||||
int windowSize = 10;
|
||||
Reader reader = new StringReader(data);
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, windowSize, true, false);
|
||||
|
||||
// Read only 12 chars from the underlying stream
|
||||
range(0, 12).forEach(index -> consume(classUnderTest));
|
||||
|
||||
String collectedWindow = classUnderTest.lookAheadAndGetBufferedContent();
|
||||
|
||||
assertEquals(windowSize * 2, collectedWindow.length());
|
||||
assertEquals("4567890ABCDEFGHILMNO", collectedWindow);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void notEnoughCharsForTheWindow()
|
||||
{
|
||||
int windowSize = 10;
|
||||
Reader reader = new StringReader(data);
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, windowSize, true, false);
|
||||
|
||||
range(0, data.length() - 3).forEach(index -> consume(classUnderTest));
|
||||
|
||||
String collectedWindow = classUnderTest.lookAheadAndGetBufferedContent();
|
||||
|
||||
assertEquals(windowSize + 2, collectedWindow.length());
|
||||
assertEquals("NOPQRSTUVYXZ", collectedWindow);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyDataShouldCollectEmptyWindow()
|
||||
{
|
||||
Reader reader = new StringReader("");
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, 10, true, false);
|
||||
|
||||
range(0, data.length() - 3).forEach(index -> consume(classUnderTest));
|
||||
|
||||
String collectedWindow = classUnderTest.lookAheadAndGetBufferedContent();
|
||||
|
||||
assertEquals(0, collectedWindow.length());
|
||||
assertEquals("", collectedWindow);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void collectEverythingModeShouldCollectTheWholeStream()
|
||||
{
|
||||
Reader reader = new StringReader(data);
|
||||
LookAheadBufferedReader classUnderTest =
|
||||
new LookAheadBufferedReader(
|
||||
reader,
|
||||
10, // this has no effect
|
||||
false,
|
||||
true);
|
||||
|
||||
// Read only 12 chars from the underlying stream
|
||||
range(0, 12).forEach(index -> consume(classUnderTest));
|
||||
|
||||
String collectedData = classUnderTest.lookAheadAndGetBufferedContent();
|
||||
|
||||
assertEquals(data.length(), collectedData.length());
|
||||
assertEquals(data, collectedData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyDataShouldCollectEmptyStringInCollectEverythingMode()
|
||||
{
|
||||
Reader reader = new StringReader("");
|
||||
LookAheadBufferedReader classUnderTest =
|
||||
new LookAheadBufferedReader(
|
||||
reader,
|
||||
10, // this has no effect
|
||||
false,
|
||||
true);
|
||||
|
||||
range(0, data.length() - 3).forEach(index -> consume(classUnderTest));
|
||||
|
||||
String collectedData = classUnderTest.lookAheadAndGetBufferedContent();
|
||||
|
||||
assertEquals(0, collectedData.length());
|
||||
assertEquals("", collectedData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bufferingModeDisableShouldCollectNoData()
|
||||
{
|
||||
int windowSize = 10;
|
||||
Reader reader = new StringReader(data);
|
||||
LookAheadBufferedReader classUnderTest = new LookAheadBufferedReader(reader, windowSize, false, false);
|
||||
|
||||
// Read only 12 chars from the underlying stream
|
||||
range(0, 12).forEach(index -> consume(classUnderTest));
|
||||
assertEquals(LookAheadBufferedReader.BUFFERING_DISABLED_INFO_MESSAGE, classUnderTest.lookAheadAndGetBufferedContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyDataShouldCollectEmptyDataWhenBufferingIsDisabled()
|
||||
{
|
||||
Reader reader = new StringReader("");
|
||||
LookAheadBufferedReader classUnderTest =
|
||||
new LookAheadBufferedReader(
|
||||
reader,
|
||||
10, // this has no effect
|
||||
false,
|
||||
false);
|
||||
|
||||
range(0, data.length() - 3).forEach(index -> consume(classUnderTest));
|
||||
|
||||
String collectedData = classUnderTest.lookAheadAndGetBufferedContent();
|
||||
|
||||
assertEquals(LookAheadBufferedReader.BUFFERING_DISABLED_INFO_MESSAGE, collectedData);
|
||||
}
|
||||
|
||||
private void consume(Reader reader)
|
||||
{
|
||||
try
|
||||
{
|
||||
reader.read();
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
throw new RuntimeException(exception);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user