Skip to content

Commit

Permalink
Expect:100-continue fixes for Netty
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Nesen <[email protected]>
  • Loading branch information
senivam committed Oct 12, 2023
1 parent cbda4fc commit dbbf057
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.RequestEntityProcessing;
import org.glassfish.jersey.client.internal.ConnectorExtension;

import javax.ws.rs.HttpMethod;
import java.io.IOException;
import java.net.ProtocolException;

Expand All @@ -47,8 +47,9 @@ public void invoke(ClientRequest request, HttpRequest extensionParam) {
final boolean allowStreaming = length > expectContinueSizeThreshold
|| entityProcessing == RequestEntityProcessing.CHUNKED;

if (!Boolean.TRUE.equals(expectContinueActivated)
|| !(HttpMethod.POST.equals(request.getMethod()) || HttpMethod.PUT.equals(request.getMethod()))
if (extensionParam.protocolVersion().equals(HttpVersion.HTTP_1_0)
|| !Boolean.TRUE.equals(expectContinueActivated)
|| !request.hasEntity()
|| !allowStreaming) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.netty.connector;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import org.glassfish.jersey.client.ClientRequest;

import javax.ws.rs.ProcessingException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {

private boolean isExpected;

private static final List<HttpResponseStatus> statusesToBeConsidered = Arrays.asList(HttpResponseStatus.CONTINUE,
HttpResponseStatus.UNAUTHORIZED, HttpResponseStatus.EXPECTATION_FAILED,
HttpResponseStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);

private CompletableFuture<HttpResponseStatus> expectedFuture = new CompletableFuture<>();

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (isExpected && msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
if (statusesToBeConsidered.contains(response.status())) {
expectedFuture.complete(response.status());
}
if (!HttpResponseStatus.CONTINUE.equals(response.status())) {
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
} else {
ctx.pipeline().remove(this);
}
} else {
if (!isExpected) {
ctx.pipeline().remove(this);
}
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
}
}

CompletableFuture<HttpResponseStatus> processExpect100ContinueRequest(HttpRequest nettyRequest,
ClientRequest jerseyRequest,
Channel ch,
Integer timeout)
throws InterruptedException, ExecutionException, TimeoutException {
//check for 100-Continue presence/availability
final Expect100ContinueConnectorExtension expect100ContinueExtension
= new Expect100ContinueConnectorExtension();

final DefaultFullHttpRequest nettyRequestHeaders =
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
nettyRequestHeaders.headers().setAll(nettyRequest.headers());
//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
//enriched with the 'Expect:100-continue' header.
expect100ContinueExtension.invoke(jerseyRequest, nettyRequestHeaders);

final ChannelFuture expect100ContinueFuture = (HttpUtil.is100ContinueExpected(nettyRequestHeaders))
// Send only head of the HTTP request enriched with Expect:100-continue header.
? ch.writeAndFlush(nettyRequestHeaders)
// Expect:100-Continue either is not supported or is turned off
: null;
isExpected = expect100ContinueFuture != null;
if (!isExpected) {
ch.pipeline().remove(this);
} else {
final HttpResponseStatus status = expectedFuture
.get(timeout, TimeUnit.MILLISECONDS);

processExpectationStatus(status);
}
return expectedFuture;
}

private void processExpectationStatus(HttpResponseStatus status)
throws TimeoutException {
if (!statusesToBeConsidered.contains(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
}
if (!expectedFuture.isDone() || HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
isExpected = false;
throw new TimeoutException(); // continue without expectations
}
if (!HttpResponseStatus.CONTINUE.equals(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
}
}

boolean isExpected() {
return isExpected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,21 @@ public class NettyClientProperties {
*/
public static final String PRESERVE_METHOD_ON_REDIRECT = "jersey.config.client.redirect.preserve.method";


/**
* This timeout is used for waiting for 100-Continue response when 100-Continue is sent by the client.
*
* @since 2.41
*/
public static final String
EXPECT_100_CONTINUE_TIMEOUT = "jersey.config.client.request.expect.100.continue.timeout";

/**
* The default value of EXPECT_100_CONTINUE_TIMEOUT.
*
* @since 2.41
*/
public static final Integer
DEFAULT_EXPECT_100_CONTINUE_TIMEOUT_VALUE = 500;

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import javax.net.ssl.SSLContext;
Expand All @@ -46,7 +48,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -124,6 +125,7 @@ class NettyConnector implements Connector {
private static final String PRUNE_INACTIVE_POOL = "prune_inactive_pool";
private static final String READ_TIMEOUT_HANDLER = "read_timeout_handler";
private static final String REQUEST_HANDLER = "request_handler";
private static final String EXPECT_100_CONTINUE_HANDLER = "expect_100_continue_handler";

NettyConnector(Client client) {

Expand Down Expand Up @@ -190,6 +192,9 @@ public Future<?> apply(final ClientRequest jerseyRequest, final AsyncConnectorCa
protected void execute(final ClientRequest jerseyRequest, final Set<URI> redirectUriHistory,
final CompletableFuture<ClientResponse> responseAvailable) {
Integer timeout = jerseyRequest.resolveProperty(ClientProperties.READ_TIMEOUT, 0);
final Integer expect100ContinueTimeout = jerseyRequest.resolveProperty(
NettyClientProperties.EXPECT_100_CONTINUE_TIMEOUT,
NettyClientProperties.DEFAULT_EXPECT_100_CONTINUE_TIMEOUT_VALUE);
if (timeout == null || timeout < 0) {
throw new ProcessingException(LocalizationMessages.WRONG_READ_TIMEOUT(timeout));
}
Expand Down Expand Up @@ -321,9 +326,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
final Channel ch = chan;
JerseyClientHandler clientHandler =
new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone, redirectUriHistory, this);
final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();
// read timeout makes sense really as an inactivity timeout
ch.pipeline().addLast(READ_TIMEOUT_HANDLER,
new IdleStateHandler(0, 0, timeout, TimeUnit.MILLISECONDS));
ch.pipeline().addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
ch.pipeline().addLast(REQUEST_HANDLER, clientHandler);

responseDone.whenComplete((_r, th) -> {
Expand Down Expand Up @@ -408,26 +415,22 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
// // Set later after the entity is "written"
// break;
}
try {
expect100ContinueHandler.processExpect100ContinueRequest(nettyRequest, jerseyRequest,
ch, expect100ContinueTimeout);
} catch (ExecutionException e) {
responseDone.completeExceptionally(e);
} catch (TimeoutException e) {
//Expect:100-continue allows timeouts by the spec
//just removing the pipeline from processing
if (ch.pipeline().context(JerseyExpectContinueHandler.class) != null) {
ch.pipeline().remove(EXPECT_100_CONTINUE_HANDLER);
}
}

//check for 100-Continue presence/availability
final Expect100ContinueConnectorExtension expect100ContinueExtension
= new Expect100ContinueConnectorExtension();

final DefaultFullHttpRequest rq = new DefaultFullHttpRequest(nettyRequest.protocolVersion(),
nettyRequest.method(), nettyRequest.uri());
rq.headers().setAll(nettyRequest.headers());
expect100ContinueExtension.invoke(jerseyRequest, rq);

ChannelFutureListener expect100ContinueListener = null;
ChannelFuture expect100ContinueFuture = null;

if (HttpUtil.is100ContinueExpected(rq)) {
expect100ContinueListener =
future -> ch.pipeline().writeAndFlush(nettyRequest);
expect100ContinueFuture = ch.pipeline().writeAndFlush(rq).sync().awaitUninterruptibly()
.addListener(expect100ContinueListener);
} else {
// Send the HTTP request.
if (!expect100ContinueHandler.isExpected()) {
// Send the HTTP request. Expect:100-continue processing is not applicable
// in this case.
entityWriter.writeAndFlush(nettyRequest);
}

Expand All @@ -443,9 +446,6 @@ public OutputStream getOutputStream(int contentLength) throws IOException {
} else {
entityWriter.write(entityWriter.getChunkedInput());
}
if (expect100ContinueFuture != null && expect100ContinueListener != null) {
expect100ContinueFuture.removeListener(expect100ContinueListener);
}

executorService.execute(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2016, 2022 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -22,3 +22,4 @@ redirect.no.location="Received redirect that does not contain a location or the
redirect.error.determining.location="Error determining redirect location: ({0})."
redirect.infinite.loop="Infinite loop in chained redirects detected."
redirect.limit.reached="Max chained redirect limit ({0}) exceeded."
unexpected.value.for.expect.100.continue.statuses=Unexpected value: ("{0}").
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,6 +20,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
Expand Down Expand Up @@ -55,6 +56,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr

if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
ctx.pipeline().addLast(new HttpServerCodec(),
new HttpServerExpectContinueHandler(),
new ChunkedWriteHandler(),
new JerseyServerHandler(baseUri, container, resourceConfig));
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -76,10 +76,6 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
final HttpRequest req = (HttpRequest) msg;

if (HttpUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
}

nettyInputStream.clear(); // clearing the content - possible leftover from previous request processing.
final ContainerRequest requestContext = createContainerRequest(ctx, req);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void initChannel(SocketChannel ch) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpServerCodec());
p.addLast(new HttpServerExpectContinueHandler());
p.addLast(new ChunkedWriteHandler());
p.addLast(new JerseyServerHandler(baseUri, container, resourceConfig));
}
Expand Down
14 changes: 13 additions & 1 deletion docs/src/main/docbook/appendix-properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@
<para>
Property for threshold size for content length after which Expect:100-Continue header would be applied
before the main request.
Default threshold size (64kb) after which which Expect:100-Continue header would be applied before
Default threshold size (64kb) after which Expect:100-Continue header would be applied before
the main request.
<literal>Since 2.32</literal>
</para>
Expand Down Expand Up @@ -2062,6 +2062,18 @@
</para>
</entry>
</row>
<row>
<entry>&jersey.netty.NettyClientProperties.EXPECT_100_CONTINUE_TIMEOUT;</entry>
<entry><literal>jersey.config.client.request.expect.100.continue.timeout</literal></entry>
<entry>
<para>
This timeout is used for waiting for 100-Continue response when 100-Continue
is sent by the client.
Default timeout value is 500 ms after which Expect:100-Continue feature is ignored.
<literal>Since 2.41</literal>
</para>
</entry>
</row>
</tbody>
</tgroup>
</table>
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/docbook/jersey.ent
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@
<!ENTITY jersey.netty.NettyClientProperties.MAX_CONNECTIONS_TOTAL "<link xlink:href='&jersey.javadoc.uri.prefix;/netty/connector/NettyClientProperties.html#MAX_CONNECTIONS_TOTAL'>NettyClientProperties.MAX_CONNECTIONS_TOTAL</link>" >
<!ENTITY jersey.netty.NettyClientProperties.MAX_REDIRECTS "<link xlink:href='&jersey.javadoc.uri.prefix;/netty/connector/NettyClientProperties.html#MAX_REDIRECTS'>NettyClientProperties.MAX_REDIRECTS</link>" >
<!ENTITY jersey.netty.NettyClientProperties.PRESERVE_METHOD_ON_REDIRECT "<link xlink:href='&jersey.javadoc.uri.prefix;/netty/connector/NettyClientProperties.html#PRESERVE_METHOD_ON_REDIRECT'>NettyClientProperties.PRESERVE_METHOD_ON_REDIRECT</link>" >
<!ENTITY jersey.netty.NettyClientProperties.EXPECT_100_CONTINUE_TIMEOUT "<link xlink:href='&jersey.javadoc.uri.prefix;/netty/connector/NettyClientProperties.html#EXPECT_100_CONTINUE_TIMEOUT'>NettyClientProperties.EXPECT_100_CONTINUE_TIMEOUT</link>" >
<!ENTITY jersey.netty.NettyConnectorProvider "<link xlink:href='&jersey.javadoc.uri.prefix;/netty/connector/NettyConnectorProvider.html'>NettyConnectorProvider</link>">
<!ENTITY jersey.server.ApplicationHandler "<link xlink:href='&jersey.javadoc.uri.prefix;/server/ApplicationHandler.html'>ApplicationHandler</link>">
<!ENTITY jersey.server.BackgroundScheduler "<link xlink:href='&jersey.javadoc.uri.prefix;/server/BackgroundScheduler.html'>@BackgroundScheduler</link>">
Expand Down
Loading

0 comments on commit dbbf057

Please sign in to comment.