Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle text/* in response content type, so we can show message for e.… #33284

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.core.StreamReadConstraints;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
Expand Down Expand Up @@ -48,7 +47,9 @@ class HttpFeedClient implements FeedClient {

private static final Duration maxTimeout = Duration.ofMinutes(15);
private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder()
.streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
.streamReadConstraints(StreamReadConstraints.builder()
.maxStringLength(Integer.MAX_VALUE)
.build())
.build();

private final Map<String, Supplier<String>> requestHeaders;
Expand Down Expand Up @@ -122,7 +123,8 @@ private CompletableFuture<Result> send(String method, DocumentId documentId, Str
getQuery(params, speedTest),
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
params.timeout().orElse(maxTimeout),
params.timeout()
.orElse(maxTimeout),
nanoClock);

CompletableFuture<Result> promise = new CompletableFuture<>();
Expand All @@ -134,8 +136,7 @@ private CompletableFuture<Result> send(String method, DocumentId documentId, Str
thrown = thrown.getCause();

promise.completeExceptionally(thrown);
}
else
} else
promise.complete(result);
});
return promise;
Expand All @@ -155,61 +156,76 @@ private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clus
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
if (response.code() != 200) {
String message;
if (response.body() != null) switch (response.contentType()) {
case "application/json": message = parseMessage(response.body()); break;
case "text/plain": message = new String(response.body(), UTF_8); break;
default: message = response.toString(); break;
}
else message = response.toString();

String message = getMessageFromResponse(response);
// Old server ignores ?dryRun=true, but getting this particular error message means everything else is OK.
if (response.code() == 400 && "Could not read document, no document?".equals(message)) {
if (builder.speedTest) throw new FeedException("server does not support speed test; upgrade to a newer version");
if (builder.speedTest)
throw new FeedException("server does not support speed test; upgrade to a newer version");
return;
}
throw new FeedException("server responded non-OK to handshake: " + message);
}
}
catch (ExecutionException e) {
} catch (ExecutionException e) {
Duration duration = Duration.between(start, Instant.now());
throw new FeedException("failed handshake with server after " + duration + ": " + e.getCause(), e.getCause());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
throw new FeedException("interrupted during handshake with server", e);
}
catch (TimeoutException e) {
} catch (TimeoutException e) {
throw new FeedException("timed out during handshake with server", e);
}
}

private static String getMessageFromResponse(HttpResponse response) {
byte[] body = response.body();
if (body == null) return response.toString();

String message;
String contentType = response.contentType();
if (contentType.equals("application/json"))
message = parseMessage(body);
else if (contentType.startsWith("text/"))
message = new String(body, UTF_8);
else
message = response.toString();

return message;
}

private static String parseMessage(byte[] json) {
try {
return parse(null, json).message;
}
catch (Exception e) {
} catch (Exception e) {
return new String(json, UTF_8);
}
}

private enum Outcome { success, conditionNotMet, vespaFailure, transportFailure };
private enum Outcome {success, conditionNotMet, vespaFailure, transportFailure}

;

static Result.Type toResultType(Outcome outcome) {
switch (outcome) {
case success: return Result.Type.success;
case conditionNotMet: return Result.Type.conditionNotMet;
default: throw new IllegalArgumentException("No corresponding result type for '" + outcome + "'");
case success:
return Result.Type.success;
case conditionNotMet:
return Result.Type.conditionNotMet;
default:
throw new IllegalArgumentException("No corresponding result type for '" + outcome + "'");
}
}

private static class MessageAndTrace {

final String message;
final String trace;

MessageAndTrace(String message, String trace) {
this.message = message;
this.trace = trace;
}

}

static MessageAndTrace parse(DocumentId documentId, byte[] json) {
Expand All @@ -220,7 +236,7 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
throw new ResultParseException(
documentId,
"Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " +
new String(json, UTF_8));
new String(json, UTF_8));

String name;
while ((name = parser.nextFieldName()) != null) {
Expand All @@ -232,14 +248,20 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
if (parser.nextToken() != JsonToken.START_ARRAY)
throw new ResultParseException(documentId,
"Expected 'trace' to be an array, but got '" + parser.currentToken() + "' in: " +
new String(json, UTF_8));
int start = (int) parser.currentTokenLocation().getByteOffset();
new String(json, UTF_8));
int start = (int) parser.currentTokenLocation()
.getByteOffset();
int depth = 1;
while (depth > 0) switch (parser.nextToken()) {
case START_ARRAY: ++depth; break;
case END_ARRAY: --depth; break;
case START_ARRAY:
++depth;
break;
case END_ARRAY:
--depth;
break;
}
int end = (int) parser.currentTokenLocation().getByteOffset() + 1;
int end = (int) parser.currentTokenLocation()
.getByteOffset() + 1;
trace = new String(json, start, end - start, UTF_8);
break;
default:
Expand All @@ -252,9 +274,8 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
throw new ResultParseException(
documentId,
"Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: "
+ new String(json, UTF_8));
}
catch (IOException e) {
+ new String(json, UTF_8));
} catch (IOException e) {
throw new ResultParseException(documentId, e);
}

Expand All @@ -264,12 +285,20 @@ static MessageAndTrace parse(DocumentId documentId, byte[] json) {
static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) {
Outcome outcome;
switch (response.code()) {
case 200: outcome = Outcome.success; break;
case 412: outcome = Outcome.conditionNotMet; break;
case 200:
outcome = Outcome.success;
break;
case 412:
outcome = Outcome.conditionNotMet;
break;
case 502:
case 504:
case 507: outcome = Outcome.vespaFailure; break;
default: outcome = Outcome.transportFailure; break;
case 507:
outcome = Outcome.vespaFailure;
break;
default:
outcome = Outcome.transportFailure;
break;
}

MessageAndTrace mat = parse(documentId, response.body());
Expand All @@ -292,15 +321,17 @@ static String getPath(DocumentId documentId) {
path.add("v1");
path.add(encode(documentId.namespace()));
path.add(encode(documentId.documentType()));
if (documentId.number().isPresent()) {
if (documentId.number()
.isPresent()) {
path.add("number");
path.add(Long.toUnsignedString(documentId.number().getAsLong()));
}
else if (documentId.group().isPresent()) {
path.add(Long.toUnsignedString(documentId.number()
.getAsLong()));
} else if (documentId.group()
.isPresent()) {
path.add("group");
path.add(encode(documentId.group().get()));
}
else {
path.add(encode(documentId.group()
.get()));
} else {
path.add("docid");
}
path.add(encode(documentId.userSpecific()));
Expand All @@ -311,23 +342,27 @@ else if (documentId.group().isPresent()) {
static String encode(String raw) {
try {
return URLEncoder.encode(raw, UTF_8.name());
}
catch (UnsupportedEncodingException e) {
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}

static String getQuery(OperationParameters params, boolean speedTest) {
StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue("");
if (params.createIfNonExistent()) query.add("create=true");
params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition)));
params.route().ifPresent(route -> query.add("route=" + encode(route)));
params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
params.testAndSetCondition()
.ifPresent(condition -> query.add("condition=" + encode(condition)));
params.route()
.ifPresent(route -> query.add("route=" + encode(route)));
params.tracelevel()
.ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
if (speedTest) query.add("dryRun=true");
return query.toString();
}

/** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */
/**
* Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster.
*/
interface ClusterFactory {

Cluster create() throws IOException;
Expand Down
Loading