Skip to content

Commit

Permalink
fix(http): ensure HTTP response body streams are not prematurely clos…
Browse files Browse the repository at this point in the history
…ed (#482)
  • Loading branch information
andrewazores authored Sep 25, 2024
1 parent ef87143 commit b3d11ce
Showing 1 changed file with 32 additions and 22 deletions.
54 changes: 32 additions & 22 deletions src/main/java/io/cryostat/agent/CryostatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public CompletableFuture<Boolean> checkRegistration(PluginInfo pluginInfo) {
+ "?token="
+ pluginInfo.getToken()));
log.trace("{}", req);
return supply(req, (res) -> logResponse(req, res)).thenApply(this::isOkStatus);
return supply(req, (res) -> logResponse(req, res))
.thenApply(this::isOkStatus)
.whenComplete((v, t) -> req.reset());
}

public CompletableFuture<PluginInfo> register(
Expand Down Expand Up @@ -174,7 +176,8 @@ public CompletableFuture<PluginInfo> register(
log.error("Unable to parse response as JSON", e);
throw new RegistrationException(e);
}
});
})
.whenComplete((v, t) -> req.reset());
} catch (JsonProcessingException e) {
return CompletableFuture.failedFuture(e);
}
Expand Down Expand Up @@ -209,7 +212,8 @@ public CompletableFuture<Integer> submitCredentialsIfRequired(
return CompletableFuture.completedFuture(prevId);
}
return submitCredentials(prevId, credentials, callback);
});
})
.whenComplete((v, t) -> req.reset());
}

private CompletableFuture<Integer> queryExistingCredentials(URI callback) {
Expand Down Expand Up @@ -254,7 +258,8 @@ private CompletableFuture<Integer> queryExistingCredentials(URI callback) {
selfMatchExpression(callback)))
.map(sc -> sc.id)
.findFirst()
.orElse(-1));
.orElse(-1))
.whenComplete((v, t) -> req.reset());
}

private CompletableFuture<Integer> submitCredentials(
Expand Down Expand Up @@ -317,7 +322,8 @@ private CompletableFuture<Integer> submitCredentials(
location.substring(
location.lastIndexOf('/') + 1, location.length());
return Integer.valueOf(id);
});
})
.whenComplete((v, t) -> req.reset());
}

public CompletableFuture<Void> deleteCredentials(int id) {
Expand All @@ -326,7 +332,9 @@ public CompletableFuture<Void> deleteCredentials(int id) {
}
HttpDelete req = new HttpDelete(baseUri.resolve(CREDENTIALS_API_PATH + "/" + id));
log.trace("{}", req);
return supply(req, (res) -> logResponse(req, res)).thenApply(res -> null);
return supply(req, (res) -> logResponse(req, res))
.whenComplete((v, t) -> req.reset())
.thenApply(res -> null);
}

public CompletableFuture<Void> deregister(PluginInfo pluginInfo) {
Expand All @@ -341,6 +349,7 @@ public CompletableFuture<Void> deregister(PluginInfo pluginInfo) {
log.trace("{}", req);
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.whenComplete((v, t) -> req.reset())
.thenApply(res -> null);
}

Expand All @@ -362,6 +371,7 @@ public CompletableFuture<Void> update(
log.trace("{}", req);
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.whenComplete((v, t) -> req.reset())
.thenApply(res -> null);
} catch (JsonProcessingException e) {
return CompletableFuture.failedFuture(e);
Expand Down Expand Up @@ -432,20 +442,21 @@ public CompletableFuture<Void> upload(
.build());
req.setEntity(entityBuilder.build());
return supply(
req,
(res) -> {
Instant finish = Instant.now();
log.trace(
"{} {} ({} -> {}): {}/{}",
req.getMethod(),
res.getStatusLine().getStatusCode(),
fileName,
req.getURI(),
FileUtils.byteCountToDisplaySize(is.getByteCount()),
Duration.between(start, finish));
assertOkStatus(req, res);
return (Void) null;
});
req,
(res) -> {
Instant finish = Instant.now();
log.trace(
"{} {} ({} -> {}): {}/{}",
req.getMethod(),
res.getStatusLine().getStatusCode(),
fileName,
req.getURI(),
FileUtils.byteCountToDisplaySize(is.getByteCount()),
Duration.between(start, finish));
assertOkStatus(req, res);
return (Void) null;
})
.whenComplete((v, t) -> req.reset());
}

private HttpResponse logResponse(HttpRequestBase req, HttpResponse res) {
Expand All @@ -460,8 +471,7 @@ private <T> CompletableFuture<T> supply(HttpRequestBase req, Function<HttpRespon
// it responds with an auth challenge, and then send the auth information we have, and use
// the client auth cache. This flow is supported for Bearer tokens in httpclient 5.
authorizationSupplier.get().ifPresent(v -> req.addHeader(HttpHeaders.AUTHORIZATION, v));
return CompletableFuture.supplyAsync(() -> fn.apply(executeQuiet(req)), executor)
.whenComplete((v, t) -> req.reset());
return CompletableFuture.supplyAsync(() -> fn.apply(executeQuiet(req)), executor);
}

private HttpResponse executeQuiet(HttpUriRequest req) {
Expand Down

0 comments on commit b3d11ce

Please sign in to comment.