Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for ext_proc stream resets #38346

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void CodecClient::completeRequest(ActiveRequest& request) {
}

void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) {
ENVOY_CONN_LOG(debug, "request reset", *connection_);
ENVOY_CONN_LOG(debug, "Request reset. Reason {}", *connection_, static_cast<int>(reason));
if (codec_client_callbacks_) {
codec_client_callbacks_->onStreamReset(reason);
}
Expand Down
30 changes: 30 additions & 0 deletions test/common/grpc/grpc_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,41 @@ INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, GrpcClientIntegrationTest,
TEST_P(GrpcClientIntegrationTest, BasicStream) {
initialize();
auto stream = createStream(empty_metadata_);
// Send request without END_STREAM set to true. By default Envoy gRPC client will reset
// the stream upon receiving response with trailers. Google gRPC client will not reset the
// stream as it by default supports independent half-close. auto stream =
// createStream(empty_metadata_);
stream->sendRequest();
stream->sendServerInitialMetadata(empty_metadata_);
stream->sendReply();
stream->sendServerTrailers(Status::WellKnownGrpcStatus::Ok, "", empty_metadata_);
dispatcher_helper_.runDispatcher();

if (clientType() == ClientType::EnvoyGrpc) { // Envoy gRPC based AsyncGrpcClient should reset
// stream, since server half-closed before client.
EXPECT_EQ(
cm_.thread_local_cluster_.cluster_.info_->trafficStats()->upstream_rq_tx_reset_.value(), 1);
stream->waitForReset();
}
}

// Validate that a simple request-reply stream works.
TEST_P(GrpcClientIntegrationTest, BasicStreamGracefulClose) {
initialize();
auto stream = createStream(empty_metadata_);
// Send request with end_stream set to true, causing gRPC client to half close
// the stream.
RequestArgs request_args{nullptr, true};
stream->sendRequest(request_args);
stream->sendServerInitialMetadata(empty_metadata_);
stream->sendReply();
stream->sendServerTrailers(Status::WellKnownGrpcStatus::Ok, "", empty_metadata_);
dispatcher_helper_.runDispatcher();

// AsyncGrpcClient should not cause local reset, completing the stream gracefully.
EXPECT_EQ(cm_.thread_local_cluster_.cluster_.info_->trafficStats()->upstream_rq_tx_reset_.value(),
0);
stream->waitForEndStream();
}

// A simple request-reply stream, "x-envoy-internal" and `x-forward-for` headers
Expand Down
9 changes: 9 additions & 0 deletions test/common/grpc/grpc_client_integration_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,19 @@ class HelloworldStream : public MockAsyncStreamCallbacks<helloworld::HelloReply>

void closeStream() {
grpc_stream_->closeStream();
waitForEndStream();
}

void waitForEndStream() {
AssertionResult result = fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_);
RELEASE_ASSERT(result, result.message());
}

void waitForReset() {
AssertionResult result = fake_stream_->waitForReset(dispatcher_helper_.dispatcher_);
RELEASE_ASSERT(result, result.message());
}

DispatcherHelper& dispatcher_helper_;
FakeStream* fake_stream_{};
AsyncStream<helloworld::HelloRequest> grpc_stream_{};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,45 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamOnResponse) {
verifyDownstreamResponse(*response, 500);
}

TEST_P(ExtProcIntegrationTest, OnlyRequestHeaders) {
// Skip the header processing on response path.
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(absl::nullopt);

processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) {
// The response does not really matter, it just needs to be non-empty.
auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation();
auto* mut1 = response_header_mutation->add_set_headers();
mut1->mutable_header()->set_key("x-new-header");
mut1->mutable_header()->set_raw_value("new");
return true;
});
// ext_proc is configured to only send request headers. In this case, server indicates that it is
// not expecting any more messages from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

// ext_proc will immediately close side stream in this case, because by default Envoy gRPC client
// will reset the stream if the server half-closes before the client. Note that the ext_proc
// filter has not yet half-closed the sidestream, since it is doing it during its destruction.
// TODO(yanavlasov): Enable independent half-close for Envoy gRPC client and remove this check.
// TODO(yanavlasov): Reset in Google gRPC case is unexpected. Diagnose and fix.
EXPECT_TRUE(processor_stream_->waitForReset());

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new"));

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(100, true);

verifyDownstreamResponse(*response, 200);
}

// Test the filter using the default configuration by connecting to
// an ext_proc server that responds to the request_headers message
// by requesting to modify the request headers.
Expand Down Expand Up @@ -2106,6 +2145,11 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) {
hdr2->mutable_header()->set_raw_value("application/json");
});

// ext_proc will immediately close side stream in this case, which causes it to be reset,
// since side stream codec had not yet observed server trailers.
// TODO(yanavlasov): Separate lifetimes of ext_proc and sidestream.
EXPECT_TRUE(processor_stream_->waitForReset());

verifyDownstreamResponse(*response, 401);
EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-failure-reason", "testing"));
EXPECT_THAT(response->headers(), SingleHeaderValueIs("content-type", "application/json"));
Expand Down
11 changes: 11 additions & 0 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ AssertionResult FakeStream::waitForReset(milliseconds timeout) {
return AssertionSuccess();
}

AssertionResult FakeStream::waitForReset(Event::Dispatcher& client_dispatcher,
std::chrono::milliseconds timeout) {
absl::MutexLock lock(&lock_);
if (!waitForWithDispatcherRun(
time_system_, lock_, [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return saw_reset_; },
client_dispatcher, timeout)) {
return AssertionFailure() << "Timed out waiting for reset of stream.";
}
return AssertionSuccess();
}

void FakeStream::startGrpcStream(bool send_headers) {
ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once");
grpc_stream_started_ = true;
Expand Down
5 changes: 5 additions & 0 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ class FakeStream : public Http::RequestDecoder,
testing::AssertionResult
waitForReset(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
testing::AssertionResult
waitForReset(Event::Dispatcher& client_dispatcher,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

// gRPC convenience methods.
void startGrpcStream(bool send_headers = true);
void finishGrpcStream(Grpc::Status::GrpcStatus status);
Expand Down
Loading