Skip to content

Commit

Permalink
Handle text/* in response content type, so we can show message for e.…
Browse files Browse the repository at this point in the history
…g. text/html
  • Loading branch information
hmusum committed Feb 9, 2025
1 parent b7ca21b commit 4c922a8
Showing 1 changed file with 88 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.core.StreamReadConstraints;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
Expand Down Expand Up @@ -48,7 +47,9 @@ class HttpFeedClient implements FeedClient {

private static final Duration maxTimeout = Duration.ofMinutes(15);
private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder()
.streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
.streamReadConstraints(StreamReadConstraints.builder()
.maxStringLength(Integer.MAX_VALUE)
.build())
.build();

private final Map<String, Supplier<String>> requestHeaders;
Expand Down Expand Up @@ -122,7 +123,8 @@ private CompletableFuture<Result> send(String method, DocumentId documentId, Str
getQuery(params, speedTest),
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
params.timeout().orElse(maxTimeout),
params.timeout()
.orElse(maxTimeout),
nanoClock);

CompletableFuture<Result> promise = new CompletableFuture<>();
Expand All @@ -134,8 +136,7 @@ private CompletableFuture<Result> send(String method, DocumentId documentId, Str
thrown = thrown.getCause();

promise.completeExceptionally(thrown);
}
else
} else
promise.complete(result);
});
return promise;
Expand All @@ -155,61 +156,76 @@ private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clus
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
if (response.code() != 200) {
String message;
if (response.body() != null) switch (response.contentType()) {
case "application/json": message = parseMessage(response.body()); break;
case "text/plain": message = new String(response.body(), UTF_8); break;
default: message = response.toString(); break;
}
else message = response.toString();

String message = getMessageFromResponse(response);
// Old server ignores ?dryRun=true, but getting this particular error message means everything else is OK.
if (response.code() == 400 && "Could not read document, no document?".equals(message)) {
if (builder.speedTest) throw new FeedException("server does not support speed test; upgrade to a newer version");
if (builder.speedTest)
throw new FeedException("server does not support speed test; upgrade to a newer version");
return;
}
throw new FeedException("server responded non-OK to handshake: " + message);
}
}
catch (ExecutionException e) {
} catch (ExecutionException e) {
Duration duration = Duration.between(start, Instant.now());
throw new FeedException("failed handshake with server after " + duration + ": " + e.getCause(), e.getCause());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
throw new FeedException("interrupted during handshake with server", e);
}
catch (TimeoutException e) {
} catch (TimeoutException e) {
throw new FeedException("timed out during handshake with server", e);
}
}

private static String getMessageFromResponse(HttpResponse response) {
byte[] body = response.body();
if (body == null) return response.toString();

String message;
String contentType = response.contentType();
if (contentType.equals("application/json"))
message = parseMessage(body);
else if (contentType.startsWith("text/"))
message = new String(body, UTF_8);
else
message = response.toString();

return message;
}

private static String parseMessage(byte[] json) {
try {
return parse(null, json).message;
}
catch (Exception e) {
} catch (Exception e) {
return new String(json, UTF_8);
}
}

private enum Outcome { success, conditionNotMet, vespaFailure, transportFailure };
private enum Outcome {success, conditionNotMet, vespaFailure, transportFailure}

;

static Result.Type toResultType(Outcome outcome) {
switch (outcome) {
case success: return Result.Type.success;
case conditionNotMet: return Result.Type.conditionNotMet;
default: throw new IllegalArgumentException("No corresponding result type for '" + outcome + "'");
case success:
return Result.Type.success;
case conditionNotMet:
return Result.Type.conditionNotMet;
default:
throw new IllegalArgumentException("No corresponding result type for '" + outcome + "'");
}
}

private static class MessageAndTrace {

final String message;
final String trace;

MessageAndTrace(String message, String trace) {
this.message = message;
this.trace = trace;
}

}

static MessageAndTrace parse(DocumentId documentId, byte[] json) {
Expand All @@ -220,7 +236,7 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
throw new ResultParseException(
documentId,
"Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " +
new String(json, UTF_8));
new String(json, UTF_8));

String name;
while ((name = parser.nextFieldName()) != null) {
Expand All @@ -232,14 +248,20 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
if (parser.nextToken() != JsonToken.START_ARRAY)
throw new ResultParseException(documentId,
"Expected 'trace' to be an array, but got '" + parser.currentToken() + "' in: " +
new String(json, UTF_8));
int start = (int) parser.currentTokenLocation().getByteOffset();
new String(json, UTF_8));
int start = (int) parser.currentTokenLocation()
.getByteOffset();
int depth = 1;
while (depth > 0) switch (parser.nextToken()) {
case START_ARRAY: ++depth; break;
case END_ARRAY: --depth; break;
case START_ARRAY:
++depth;
break;
case END_ARRAY:
--depth;
break;
}
int end = (int) parser.currentTokenLocation().getByteOffset() + 1;
int end = (int) parser.currentTokenLocation()
.getByteOffset() + 1;
trace = new String(json, start, end - start, UTF_8);
break;
default:
Expand All @@ -252,9 +274,8 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
throw new ResultParseException(
documentId,
"Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: "
+ new String(json, UTF_8));
}
catch (IOException e) {
+ new String(json, UTF_8));
} catch (IOException e) {
throw new ResultParseException(documentId, e);
}

Expand All @@ -264,12 +285,20 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) {
Outcome outcome;
switch (response.code()) {
case 200: outcome = Outcome.success; break;
case 412: outcome = Outcome.conditionNotMet; break;
case 200:
outcome = Outcome.success;
break;
case 412:
outcome = Outcome.conditionNotMet;
break;
case 502:
case 504:
case 507: outcome = Outcome.vespaFailure; break;
default: outcome = Outcome.transportFailure; break;
case 507:
outcome = Outcome.vespaFailure;
break;
default:
outcome = Outcome.transportFailure;
break;
}

MessageAndTrace mat = parse(documentId, response.body());
Expand All @@ -292,15 +321,17 @@ static String getPath(DocumentId documentId) {
path.add("v1");
path.add(encode(documentId.namespace()));
path.add(encode(documentId.documentType()));
if (documentId.number().isPresent()) {
if (documentId.number()
.isPresent()) {
path.add("number");
path.add(Long.toUnsignedString(documentId.number().getAsLong()));
}
else if (documentId.group().isPresent()) {
path.add(Long.toUnsignedString(documentId.number()
.getAsLong()));
} else if (documentId.group()
.isPresent()) {
path.add("group");
path.add(encode(documentId.group().get()));
}
else {
path.add(encode(documentId.group()
.get()));
} else {
path.add("docid");
}
path.add(encode(documentId.userSpecific()));
Expand All @@ -311,23 +342,27 @@ else if (documentId.group().isPresent()) {
static String encode(String raw) {
try {
return URLEncoder.encode(raw, UTF_8.name());
}
catch (UnsupportedEncodingException e) {
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}

static String getQuery(OperationParameters params, boolean speedTest) {
StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue("");
if (params.createIfNonExistent()) query.add("create=true");
params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition)));
params.route().ifPresent(route -> query.add("route=" + encode(route)));
params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
params.testAndSetCondition()
.ifPresent(condition -> query.add("condition=" + encode(condition)));
params.route()
.ifPresent(route -> query.add("route=" + encode(route)));
params.tracelevel()
.ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
if (speedTest) query.add("dryRun=true");
return query.toString();
}

/** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */
/**
* Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster.
*/
interface ClusterFactory {

Cluster create() throws IOException;
Expand Down

0 comments on commit 4c922a8

Please sign in to comment.