Skip to content

Commit

Permalink
Fix a regression where response exceptions are logged unintentionally (
Browse files Browse the repository at this point in the history
…#6035)

Motivation:

We've received reports that the following log message is printed, and also exposed when completing the response exceptionally.
```
Unexpected exception while closing a request:
com.linecorp.armeria.client.UnprocessedRequestException: com.linecorp.armeria.client.GoAwayReceivedException
    at com.linecorp.armeria.client.UnprocessedRequestException.of(UnprocessedRequestException.java:45)
    at com.linecorp.armeria.client.Http2ResponseDecoder.onStreamClosed(Http2ResponseDecoder.java:146)
```

It seems like this is a regression of the refactoring done at: https://github.com/line/armeria/pull/5800/files#diff-d82983b1c20c5f80257da8c39bb74a80d49e81efceb7a1ce63847776de9b40a9R255

I propose that if an exception completes the corresponding `HttpResponse` and `RequestLog`, then we don't log the exception in `HttpResponseWrapper`

Modifications:

- `boolean DefaultStreamMessage#tryClose` has been added which signals whether the call closed the `StreamMessage`
- the `CancelledSubscriptionException` check has been moved to `tryClose` for more consistent behavior
- If `HttpResponseWrapper#closeAction` actually closed the response, then return early so that a log isn't printed
- Added a test as a separate module to avoid flakiness due to parallel test runs

Result:

- Unnecessary logs aren't left from `HttpResponseWrapper`

<!--
Visit this URL to learn more about how to write a pull request description:
https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->
  • Loading branch information
jrhee17 authored Feb 11, 2025
1 parent 806800f commit e60e623
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpObject;
Expand Down Expand Up @@ -58,6 +60,8 @@ class HttpResponseWrapper implements StreamWriter<HttpObject> {
private final EventLoop eventLoop;
private final ClientRequestContext ctx;
private final long maxContentLength;
@VisibleForTesting
static final String UNEXPECTED_EXCEPTION_MSG = "Unexpected exception while closing a request";

private boolean responseStarted;
private long contentLengthHeaderValue = -1;
Expand Down Expand Up @@ -232,14 +236,16 @@ void close(@Nullable Throwable cause, boolean cancel) {
requestAutoAbortDelayMillis, TimeUnit.MILLISECONDS);
}

private void closeAction(@Nullable Throwable cause) {
private boolean closeAction(@Nullable Throwable cause) {
final boolean closed;
if (cause != null) {
delegate.close(cause);
closed = delegate.tryClose(cause);
ctx.logBuilder().endResponse(cause);
} else {
delegate.close();
closed = delegate.tryClose();
ctx.logBuilder().endResponse();
}
return closed;
}

private void cancelAction(@Nullable Throwable cause) {
Expand All @@ -262,8 +268,10 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
cancelAction(cause);
return;
}
if (delegate.isOpen()) {
closeAction(cause);

// don't log if the cause will be exposed via the response/log
if (delegate.isOpen() && closeAction(cause)) {
return;
}

// the context has been cancelled either by timeout or by user invocation
Expand All @@ -275,7 +283,7 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
return;
}

final StringBuilder logMsg = new StringBuilder("Unexpected exception while closing a request");
final StringBuilder logMsg = new StringBuilder(UNEXPECTED_EXCEPTION_MSG);
final HttpRequest request = ctx.request();
assert request != null;
final String authority = request.authority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,28 +441,41 @@ private void handleCloseEvent(SubscriptionImpl subscription, CloseEvent o) {

@Override
public void close() {
if (setState(State.OPEN, State.CLOSED)) {
addObjectOrEvent(SUCCESSFUL_CLOSE);
}
tryClose();
}

@Override
public final void close(Throwable cause) {
requireNonNull(cause, "cause");
if (cause instanceof CancelledSubscriptionException) {
throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
}

tryClose(cause);
}

/**
* Tries to close the stream.
*
* @return {@code true} if the stream has been closed by this method call.
* {@code false} if the stream has been closed already by another party.
*/
@UnstableApi
public final boolean tryClose() {
if (setState(State.OPEN, State.CLOSED)) {
addObjectOrEvent(SUCCESSFUL_CLOSE);
return true;
}
return false;
}

/**
* Tries to close the stream with the specified {@code cause}.
*
* @return {@code true} if the stream has been closed by this method call.
* {@code false} if the stream has been closed already by other party.
* {@code false} if the stream has been closed already by another party.
*/
public final boolean tryClose(Throwable cause) {
if (cause instanceof CancelledSubscriptionException) {
throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
}
if (setState(State.OPEN, State.CLOSED)) {
addObjectOrEvent(new CloseEvent(cause));
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package com.linecorp.armeria.client;

import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.handleInitialExchange;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.newClientFactory;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.readFrame;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
Expand All @@ -31,14 +32,9 @@
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.google.common.io.ByteStreams;

import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.testing.junit5.common.EventLoopExtension;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameTypes;

@Timeout(10)
Expand All @@ -53,7 +49,7 @@ class Http2GoAwayTest {
@Test
void streamEndsBeforeGoAway() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory()) {
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

Expand Down Expand Up @@ -101,7 +97,7 @@ void streamEndsBeforeGoAway() throws Exception {
@Test
void streamEndsAfterGoAway() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory()) {
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

Expand Down Expand Up @@ -150,7 +146,7 @@ void streamEndsAfterGoAway() throws Exception {
@Test
void streamGreaterThanLastStreamId() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory()) {
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

Expand Down Expand Up @@ -208,55 +204,4 @@ void streamGreaterThanLastStreamId() throws Exception {
}
}
}

private static ClientFactory newClientFactory() {
return ClientFactory.builder()
.useHttp2Preface(true)
// Set the window size to the HTTP/2 default values to simplify the traffic.
.http2InitialConnectionWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.http2InitialStreamWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.workerGroup(eventLoop.get(), false)
.build();
}

private static void handleInitialExchange(InputStream in, BufferedOutputStream out) throws IOException {
// Read the connection preface and discard it.
readBytes(in, connectionPrefaceBuf().readableBytes());

// Read a SETTINGS frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS);

// Send a SETTINGS frame and the ack for the received SETTINGS frame.
sendEmptySettingsAndAckFrame(out);

// Read a SETTINGS ack frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS);
}

private static byte[] readBytes(InputStream in, int length) throws IOException {
final byte[] buf = new byte[length];
ByteStreams.readFully(in, buf);
return buf;
}

private static void sendEmptySettingsAndAckFrame(BufferedOutputStream bos) throws IOException {
// Send an empty SETTINGS frame.
bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00 });
// Send a SETTINGS_ACK frame.
bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 });
bos.flush();
}

private static int payloadLength(byte[] buf) {
return (buf[0] & 0xff) << 16 | (buf[1] & 0xff) << 8 | (buf[2] & 0xff);
}

private static ByteBuf readFrame(InputStream in) throws IOException {
final byte[] frameBuf = readBytes(in, 9);
final int payloadLength = payloadLength(frameBuf);
final ByteBuf buffer = Unpooled.buffer(9 + payloadLength);
buffer.writeBytes(frameBuf);
buffer.writeBytes(in, payloadLength);
return buffer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client;

import static com.linecorp.armeria.internal.testing.Http2ByteUtil.handleInitialExchange;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.newClientFactory;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.readFrame;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.testing.junit5.common.EventLoopExtension;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import io.netty.handler.codec.http2.Http2FrameTypes;

class HttpResponseWrapperLogTest {

@RegisterExtension
static final EventLoopExtension eventLoop = new EventLoopExtension();

private static final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
private static final Logger logger =
(Logger) LoggerFactory.getLogger(HttpResponseWrapper.class);
private static final ListAppender<ILoggingEvent> appender = new ListAppender<>();

@BeforeEach
void beforeEach() {
appender.setContext(context);
appender.start();
logger.addAppender(appender);
}

@AfterEach
void afterEach() {
appender.stop();
logger.detachAppender(appender);
}

@Test
void goAwayNotLogged() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

final WebClient client = WebClient.builder("h2c://127.0.0.1:" + port)
.factory(clientFactory)
.build();
final HttpRequest req = HttpRequest.streaming(HttpMethod.GET, "/");
final CompletableFuture<AggregatedHttpResponse> resFuture = client.execute(req).aggregate();
try (Socket s = ss.accept()) {

final InputStream in = s.getInputStream();
final BufferedOutputStream bos = new BufferedOutputStream(s.getOutputStream());
handleInitialExchange(in, bos);

// Read a HEADERS frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.HEADERS);

// Send a GOAWAY frame.
bos.write(new byte[] {
0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x03, // lastStreamId = 3
0x00, 0x00, 0x00, 0x00 // errorCode = 0
});
bos.flush();

// The second request should fail with UnprocessedRequestException
// which has a cause of GoAwayReceivedException.
await().untilAsserted(resFuture::isCompletedExceptionally);
assertThatThrownBy(resFuture::join).isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(ClosedSessionException.class);

// Read a GOAWAY frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.GO_AWAY);

assertThat(in.read()).isEqualTo(-1);
}
}
assertThat(appender.list).allSatisfy(event -> {
assertThat(event.getMessage())
.doesNotContain(HttpResponseWrapper.UNEXPECTED_EXCEPTION_MSG);
});
}
}
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,15 @@ includeWithFlags ':it:builders', 'java'
includeWithFlags ':it:context-storage', 'java'
includeWithFlags ':it:dgs', 'java17'
includeWithFlags ':it:flags-cyclic-dep', 'java'
includeWithFlags ':it:flags-provider', 'java', 'relocate'
includeWithFlags ':it:graphql-multipart', 'java17'
includeWithFlags ':it:grpcweb', 'java', 'akka-grpc_2.13'
includeWithFlags ':it:grpc:java', 'java'
includeWithFlags ':it:grpc:kotlin', 'java', 'relocate', 'kotlin-grpc', 'kotlin'
includeWithFlags ':it:grpc:kotlin-coroutine-context-provider', 'java', 'relocate', 'kotlin-grpc', 'kotlin'
includeWithFlags ':it:grpc:scala', 'java', 'relocate', 'scala-grpc_2.13', 'scala_2.13'
includeWithFlags ':it:grpc:reactor', 'java', 'relocate', 'reactor-grpc'
includeWithFlags ':it:flags-provider', 'java', 'relocate'
includeWithFlags ':it:internal-logging', 'java', 'relocate'
includeWithFlags ':it:jackson-provider', 'java', 'relocate'
includeWithFlags ':it:kotlin', 'java', 'relocate', 'kotlin'
includeWithFlags ':it:kubernetes-chaos-tests', 'java', 'relocate'
Expand Down
Loading

0 comments on commit e60e623

Please sign in to comment.