Skip to content

Commit

Permalink
Use payloadBodyAndTrailers() to drain payload body (#941)
Browse files Browse the repository at this point in the history
Motivation:

In some cases we need to consume payload body and trailers.
We usually use `payloadBody().ignoreElements()` pattern.
However, since the request or response comes from the network,
it's more efficient to do `payloadBodyAndTrailers().ignoreElements()`
to avoid splitting trailers from payload body.

Modifications:

- Replace all `payloadBody().ignoreElements()` usages with
`payloadBodyAndTrailers().ignoreElements()`;
- Do the same in tests for consistency;
- Update `performance.adoc` to demonstrate `payloadBodyAndTrailers`
for the draining use-case;

Result:

Less overhead when we need to drain payload body.
  • Loading branch information
idelpivnitskiy authored Feb 18, 2020
1 parent 75f1754 commit f93880e
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 20 deletions.
4 changes: 2 additions & 2 deletions docs/modules/ROOT/pages/performance.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ control
To avoid these issues, ServiceTalk HTTP servers will automatically drain the request payload content after the response
is sent. However this adds some additional complexity to the HTTP service control flow in ServiceTalk and adds some
overhead. If you know for sure that the payload is always consumed footnote:[typically compose the response with
`request.payloadBody().ignoreElements()`], or you are not using the streaming APIs, this mechanism can be disabled to
save some CPU and memory as follows:
`request.payloadBodyAndTrailers().ignoreElements()`], or you are not using the streaming APIs, this mechanism can be
disabled to save some CPU and memory as follows:

[source, java]
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void onComplete() {

if (drainRequestPayloadBody) {
responsePublisher = responsePublisher.concat(defer(() -> payloadSubscribed.get() ?
completed() : request.payloadBody().ignoreElements()
completed() : request.payloadBodyAndTrailers().ignoreElements()
// Discarding the request payload body is an operation which should not impact the state of
// request/response processing. It's appropriate to recover from any error here.
// ST may introduce RejectedSubscribeError if user already consumed the request payload body
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt

DeferSslHandler deferSslHandler = channel.pipeline().get(DeferSslHandler.class);
if (deferSslHandler == null) {
return response.payloadBody().ignoreElements().concat(failed(
return response.payloadBodyAndTrailers().ignoreElements().concat(failed(
new IllegalStateException("Failed to find a handler of type " +
DeferSslHandler.class + " in channel pipeline.")));
}
Expand All @@ -112,9 +112,9 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt
// There is no need to apply offloading explicitly (despite completing `processor` on the
// EventLoop) because `payloadBody()` will be offloaded according to the strategy for the
// request.
return response.payloadBody().ignoreElements().concat(fromSource(processor));
return response.payloadBodyAndTrailers().ignoreElements().concat(fromSource(processor));
} else {
return response.payloadBody().ignoreElements().concat(
return response.payloadBodyAndTrailers().ignoreElements().concat(
failed(new ProxyResponseException("Bad response from proxy CONNECT " + connectAddress,
response.status())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void throwMaxConcurrencyExceededOnOversubscribedConnection() throws Excep
try (ServerContext serverContext = HttpServers.forAddress(localAddress(0))
.listenStreamingAndAwait((ctx, request, responseFactory) -> {
Publisher<Buffer> deferredPayload = fromSource(lastRequestFinished).concat(empty());
return request.payloadBody().ignoreElements()
return request.payloadBodyAndTrailers().ignoreElements()
.concat(Single.succeeded(responseFactory.ok().payloadBody(deferredPayload)));
});

Expand Down Expand Up @@ -197,7 +197,7 @@ public void throwConnectionClosedOnConnectionClose() throws Exception {

try (ServerContext serverContext = HttpServers.forAddress(localAddress(0))
.listenStreamingAndAwait((ctx, request, responseFactory) ->
request.payloadBody().ignoreElements().concat(
request.payloadBodyAndTrailers().ignoreElements().concat(
Single.succeeded(responseFactory.ok()
.setHeader(HttpHeaderNames.CONNECTION, "close"))));

Expand Down Expand Up @@ -231,7 +231,7 @@ public void throwConnectionClosedWithCauseOnUnexpectedConnectionClose() throws E
try (ServerContext serverContext = HttpServers.forAddress(localAddress(0))
.socketOption(StandardSocketOptions.SO_LINGER, 0) // Force connection reset on close
.listenStreamingAndAwait((ctx, request, responseFactory) ->
request.payloadBody().ignoreElements()
request.payloadBodyAndTrailers().ignoreElements()
.concat(ctx.closeAsync()) // trigger reset after client is done writing
.concat(Single.never()));
HttpClient client = forResolvedAddress(serverHostAndPort(serverContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Single<FilterableStreamingHttpConnection> newConnection(
if (OK.equals(response.status())) {
// In this test we have not enabled pipelining so we drain this response before
// indicating the connection is usable.
return response.payloadBody().ignoreElements().concat(succeeded(cnx));
return response.payloadBodyAndTrailers().ignoreElements().concat(succeeded(cnx));
}
cnx.closeAsync().subscribe();
return failed(new IllegalStateException("failed auth"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private static void makeClientRequestWithId(StreamingHttpClient connection, Stri
request.headers().set(REQUEST_ID_HEADER, requestId);
StreamingHttpResponse response = connection.request(request).toFuture().get();
assertEquals(OK, response.status());
response.payloadBody().ignoreElements().toFuture().get();
response.payloadBodyAndTrailers().ignoreElements().toFuture().get();
}

private static void assertAsyncContext(@Nullable CharSequence requestId, Queue<Throwable> errorQueue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void headRequestContentEmpty() throws Exception {
assertNotNull(contentLength);
assertEquals(expectedContentLength, parseInt(contentLength.toString()));
// Drain the current response content so we will be able to read the next response.
response.payloadBody().ignoreElements().toFuture().get();
response.payloadBodyAndTrailers().ignoreElements().toFuture().get();

response = awaitIndefinitelyNonNull(response2Single);
assertEquals(OK, response.status());
Expand All @@ -117,7 +117,7 @@ public void headRequestContentEmpty() throws Exception {
contentLength = response.headers().get(CONTENT_LENGTH);
assertNotNull(contentLength);
assertEquals(expectedContentLength, parseInt(contentLength.toString()));
response.payloadBody().ignoreElements().toFuture().get();
response.payloadBodyAndTrailers().ignoreElements().toFuture().get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class HttpServerMultipleRequestsTest {
@Test
public void consumeOfRequestBodyDoesNotCloseConnection() throws Exception {
StreamingHttpService service = (ctx, request, responseFactory) -> {
request.payloadBody().ignoreElements().subscribe();
request.payloadBodyAndTrailers().ignoreElements().subscribe();

CharSequence requestId = request.headers().get(REQUEST_ID_HEADER);
if (requestId != null) {
Expand Down Expand Up @@ -130,6 +130,6 @@ private static void makeClientRequestWithId(StreamingHttpConnection connection,
StreamingHttpResponse response = connection.request(request).toFuture().get();
assertEquals(OK, response.status());
assertTrue(request.headers().contains(REQUEST_ID_HEADER, requestId));
response.payloadBody().ignoreElements().subscribe();
response.payloadBodyAndTrailers().ignoreElements().subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void requestIsAutoDrainedWhenUserFailsToConsume() throws Exception {
@Test
public void requestIsDrainedByUserWithDrainingDisabled() throws Exception {
try (ServerContext serverContext = server(false, (ctx, request, responseFactory) ->
request.payloadBody().ignoreElements() // User consumes payload (ignoring)
request.payloadBodyAndTrailers().ignoreElements() // User consumes payload (ignoring)
.concat(succeeded(responseFactory.ok().payloadBody(from("OK"), textSerializer()))));
BlockingHttpClient client = HttpClients.forSingleAddress(serverHostAndPort(serverContext))
.buildBlocking()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected ServerContext serverWithEmptyAsyncContextService(HttpServerBuilder ser

private static StreamingHttpService newEmptyAsyncContextService() {
return (ctx, request, factory) -> {
request.payloadBody().ignoreElements().subscribe();
request.payloadBodyAndTrailers().ignoreElements().subscribe();

if (!AsyncContext.isEmpty()) {
return succeeded(factory.internalServerError());
Expand Down Expand Up @@ -126,7 +126,7 @@ private Single<StreamingHttpResponse> doHandle(final StreamingHttpRequest reques
// The test doesn't wait until the request body is consumed and only cares when the request is received
// from the client. So we force the server to consume the entire request here which will make sure the
// AsyncContext is as expected while processing the request data in the filter.
return request.payloadBody().ignoreElements()
return request.payloadBodyAndTrailers().ignoreElements()
.concat(defer(() -> {
if (useImmediate && !currentThread().getName().startsWith(IO_THREAD_PREFIX)) {
// verify that if we expect to be offloaded, that we actually are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public void onSuccess(@Nullable final StreamingHttpResponse result) {
errorRef.compareAndSet(null, new NullPointerException("result == null not expected."));
cancelledLatch.countDown();
} else {
result.payloadBody().ignoreElements().afterFinally(cancelledLatch::countDown).subscribe();
result.payloadBodyAndTrailers().ignoreElements().afterFinally(cancelledLatch::countDown)
.subscribe();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse result) {
result.headers().get(LOCATION), redirectSingle.originalRequest);
}
// Consume any payload of the redirect response
toSource(result.payloadBody().ignoreElements().concat(
toSource(result.payloadBodyAndTrailers().ignoreElements().concat(
redirectSingle.requester.request(redirectSingle.strategy, newRequest)))
.subscribe(new RedirectSubscriber(
target, redirectSingle, newRequest, redirectCount + 1, sequentialCancellable));
Expand Down

0 comments on commit f93880e

Please sign in to comment.