From b7bda03ceff933206ed3052ff03e24519f00cead Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 00:13:18 +0000 Subject: [PATCH 01/12] dynamic_modules: adds integration tests Signed-off-by: Takeshi Yoneda --- .../filters/http/dynamic_modules/filter.cc | 2 + test/extensions/dynamic_modules/http/BUILD | 2 +- .../dynamic_modules/http/integration_test.cc | 73 ++++++++++++++++--- .../dynamic_modules/test_data/rust/BUILD | 2 + .../dynamic_modules/test_data/rust/Cargo.toml | 6 ++ .../dynamic_modules/test_data/rust/http.rs | 18 ----- 6 files changed, 73 insertions(+), 30 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index 3f93bcb4a9c2..c6246f2fabfa 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -56,6 +56,7 @@ Filter1xxHeadersStatus DynamicModuleHttpFilter::encode1xxHeaders(ResponseHeaderM FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) { + printf("DynamicModuleHttpFilter::encodeHeaders: %d\n", end_of_stream); response_headers_ = &headers; const envoy_dynamic_module_type_on_http_filter_response_headers_status status = config_->on_http_filter_response_headers_(thisAsVoidPtr(), in_module_filter_, end_of_stream); @@ -63,6 +64,7 @@ FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& he }; FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance&, bool end_of_stream) { + printf("DynamicModuleHttpFilter::encodeData: %d\n", end_of_stream); const envoy_dynamic_module_type_on_http_filter_response_body_status status = config_->on_http_filter_response_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); return static_cast(status); diff --git a/test/extensions/dynamic_modules/http/BUILD b/test/extensions/dynamic_modules/http/BUILD index 27d3b2729794..1282c20ed0dc 100644 --- a/test/extensions/dynamic_modules/http/BUILD +++ b/test/extensions/dynamic_modules/http/BUILD @@ -66,7 +66,7 @@ envoy_cc_test( name = "integration_test", srcs = ["integration_test.cc"], data = [ - "//test/extensions/dynamic_modules/test_data/rust:http", + "//test/extensions/dynamic_modules/test_data/rust:http_integration_test", ], rbe_pool = "6gig", deps = [ diff --git a/test/extensions/dynamic_modules/http/integration_test.cc b/test/extensions/dynamic_modules/http/integration_test.cc index 41eab4a09fea..f3043ef85d77 100644 --- a/test/extensions/dynamic_modules/http/integration_test.cc +++ b/test/extensions/dynamic_modules/http/integration_test.cc @@ -6,19 +6,26 @@ class DynamicModulesIntegrationTest : public testing::TestWithParambody().size()); } +TEST_P(DynamicModulesIntegrationTest, HeaderCallbacks) { + initializeFilter("header_callbacks", "dog:cat"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + Http::TestRequestHeaderMapImpl request_headers{{"foo", "bar"}, + {":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}; + Http::TestRequestTrailerMapImpl request_trailers{{"foo", "bar"}}; + Http::TestResponseHeaderMapImpl response_headers{{":status", "200"}, {"foo", "bar"}}; + Http::TestResponseTrailerMapImpl response_trailers{{"foo", "bar"}}; + + auto encoder_decoder = codec_client_->startRequest(request_headers); + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(encoder_decoder.first, 10, false); + codec_client_->sendTrailers(encoder_decoder.first, request_trailers); + + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(response_headers, false); + upstream_request_->encodeData(10, false); + upstream_request_->encodeTrailers(response_trailers); + + ASSERT_TRUE(response->waitForEndStream()); + + // Verify the proxied request was received upstream, as expected. + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(10U, upstream_request_->bodyLength()); + // Verify that the headers/trailers are added as expected. + EXPECT_EQ( + "cat", + upstream_request_->headers().get(Http::LowerCaseString("dog"))[0]->value().getStringView()); + EXPECT_EQ("cat", upstream_request_->trailers() + .get() + ->get(Http::LowerCaseString("dog"))[0] + ->value() + .getStringView()); + // Verify the proxied response was received downstream, as expected. + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ(10U, response->body().size()); + // Verify that the headers/trailers are added as expected. + EXPECT_EQ("cat", + response->headers().get(Http::LowerCaseString("dog"))[0]->value().getStringView()); + EXPECT_EQ( + "cat", + response->trailers().get()->get(Http::LowerCaseString("dog"))[0]->value().getStringView()); +} + } // namespace Envoy diff --git a/test/extensions/dynamic_modules/test_data/rust/BUILD b/test/extensions/dynamic_modules/test_data/rust/BUILD index 09ba65ee820b..79e9d82a9d67 100644 --- a/test/extensions/dynamic_modules/test_data/rust/BUILD +++ b/test/extensions/dynamic_modules/test_data/rust/BUILD @@ -16,3 +16,5 @@ test_program(name = "program_init_fail") test_program(name = "abi_version_mismatch") test_program(name = "http") + +test_program(name = "http_integration_test") diff --git a/test/extensions/dynamic_modules/test_data/rust/Cargo.toml b/test/extensions/dynamic_modules/test_data/rust/Cargo.toml index 838f95273f69..e7c0ec4ef36e 100644 --- a/test/extensions/dynamic_modules/test_data/rust/Cargo.toml +++ b/test/extensions/dynamic_modules/test_data/rust/Cargo.toml @@ -37,3 +37,9 @@ name = "http" path = "http.rs" crate-type = ["cdylib"] test = true + +[[example]] +name = "http_integration_test" +path = "http_integration_test.rs" +crate-type = ["cdylib"] +test = true diff --git a/test/extensions/dynamic_modules/test_data/rust/http.rs b/test/extensions/dynamic_modules/test_data/rust/http.rs index 48410b28362d..375b64d0c2e2 100644 --- a/test/extensions/dynamic_modules/test_data/rust/http.rs +++ b/test/extensions/dynamic_modules/test_data/rust/http.rs @@ -21,10 +21,8 @@ fn new_http_filter_config_fn( match name { "header_callbacks" => Some(Box::new(HeaderCallbacksFilterConfig {})), "send_response" => Some(Box::new(SendResponseFilterConfig {})), - "passthrough" => Some(Box::new(PassthroughHttpFilterConfig {})), "dynamic_metadata_callbacks" => Some(Box::new(DynamicMetadataCallbacksFilterConfig {})), "body_callbacks" => Some(Box::new(BodyCallbacksFilterConfig {})), - // TODO: add various configs for body, etc. _ => panic!("Unknown filter name: {}", name), } } @@ -268,22 +266,6 @@ impl HttpFilter for SendResponseFilter { } } -/// [`envoy_proxy_dynamic_modules_rust_sdk::HttpFilterConfig`], but simply passes through to -/// the default implementation. -struct PassthroughHttpFilterConfig {} - -impl HttpFilterConfig - for PassthroughHttpFilterConfig -{ - fn new_http_filter(&self, _envoy: &mut EC) -> Box> { - Box::new(PassthroughHttpFilter {}) - } -} - -struct PassthroughHttpFilter {} - -impl HttpFilter for PassthroughHttpFilter {} - /// A HTTP filter configuration that implements /// [`envoy_proxy_dynamic_modules_rust_sdk::HttpFilterConfig`] to test the dynamic metadata related /// callbacks. From 5eac7bc11daef20b670114c4adb351fdcc3141ea Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 00:14:07 +0000 Subject: [PATCH 02/12] more Signed-off-by: Takeshi Yoneda --- source/extensions/filters/http/dynamic_modules/filter.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index c6246f2fabfa..3f93bcb4a9c2 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -56,7 +56,6 @@ Filter1xxHeadersStatus DynamicModuleHttpFilter::encode1xxHeaders(ResponseHeaderM FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) { - printf("DynamicModuleHttpFilter::encodeHeaders: %d\n", end_of_stream); response_headers_ = &headers; const envoy_dynamic_module_type_on_http_filter_response_headers_status status = config_->on_http_filter_response_headers_(thisAsVoidPtr(), in_module_filter_, end_of_stream); @@ -64,7 +63,6 @@ FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& he }; FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance&, bool end_of_stream) { - printf("DynamicModuleHttpFilter::encodeData: %d\n", end_of_stream); const envoy_dynamic_module_type_on_http_filter_response_body_status status = config_->on_http_filter_response_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); return static_cast(status); From 8c7250c5f1090781fae0e47515d957341bdbbe55 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 00:14:30 +0000 Subject: [PATCH 03/12] add Signed-off-by: Takeshi Yoneda --- .../test_data/rust/http_integration_test.rs | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs diff --git a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs new file mode 100644 index 000000000000..fb662cdfe12a --- /dev/null +++ b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs @@ -0,0 +1,151 @@ +use envoy_proxy_dynamic_modules_rust_sdk::*; + +declare_init_functions!(init, new_http_filter_config_fn); + +fn init() -> bool { + true +} + +fn new_http_filter_config_fn( + _envoy_filter_config: &mut EC, + name: &str, + config: &str, +) -> Option>> { + match name { + "passthrough" => Some(Box::new(PassthroughHttpFilterConfig {})), + "header_callbacks" => Some(Box::new(HeadersHttpFilterConfig { + headers_to_add: config.to_string(), + })), + _ => panic!("Unknown filter name: {}", name), + } +} + +struct PassthroughHttpFilterConfig {} + +impl HttpFilterConfig + for PassthroughHttpFilterConfig +{ + fn new_http_filter(&self, _envoy: &mut EC) -> Box> { + Box::new(PassthroughHttpFilter {}) + } +} + +struct PassthroughHttpFilter {} + +impl HttpFilter for PassthroughHttpFilter {} + +struct HeadersHttpFilterConfig { + headers_to_add: String, +} + +impl HttpFilterConfig + for HeadersHttpFilterConfig +{ + fn new_http_filter(&self, _envoy: &mut EC) -> Box> { + let headers_to_add: Vec<(String, String)> = self + .headers_to_add + .split(',') + .map(|header| { + let parts: Vec<&str> = header.split(':').collect(); + (parts[0].to_string(), parts[1].to_string()) + }) + .collect(); + Box::new(HeadersHttpFilter { + headers_to_add, + request_headers_called: false, + request_trailers_called: false, + response_headers_called: false, + response_trailers_called: false, + }) + } +} + +struct HeadersHttpFilter { + headers_to_add: Vec<(String, String)>, + request_headers_called: bool, + request_trailers_called: bool, + response_headers_called: bool, + response_trailers_called: bool, +} + +impl HttpFilter for HeadersHttpFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> abi::envoy_dynamic_module_type_on_http_filter_request_headers_status { + self.request_headers_called = true; + let path_header = envoy_filter + .get_request_header_value(":path") + .expect(":path header"); + assert_eq!(path_header.as_slice(), b"/test/long/url"); + let method_header = envoy_filter + .get_request_header_value(":method") + .expect(":method header"); + assert_eq!(method_header.as_slice(), b"POST"); + + let foo_header = envoy_filter + .get_request_header_value("foo") + .expect("foo header"); + assert_eq!(foo_header.as_slice(), b"bar"); + for (name, value) in &self.headers_to_add { + envoy_filter.set_request_header(name, value.as_bytes()); + } + abi::envoy_dynamic_module_type_on_http_filter_request_headers_status::Continue + } + + fn on_request_trailers( + &mut self, + envoy_filter: &mut EHF, + ) -> abi::envoy_dynamic_module_type_on_http_filter_request_trailers_status { + self.request_trailers_called = true; + let foo_trailer = envoy_filter + .get_request_trailer_value("foo") + .expect("foo trailer"); + assert_eq!(foo_trailer.as_slice(), b"bar"); + for (name, value) in &self.headers_to_add { + envoy_filter.set_request_trailer(name, value.as_bytes()); + } + abi::envoy_dynamic_module_type_on_http_filter_request_trailers_status::Continue + } + + fn on_response_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> abi::envoy_dynamic_module_type_on_http_filter_response_headers_status { + self.response_headers_called = true; + let foo_header = envoy_filter + .get_response_header_value("foo") + .expect("foo header"); + assert_eq!(foo_header.as_slice(), b"bar"); + for (name, value) in &self.headers_to_add { + envoy_filter.set_response_header(name, value.as_bytes()); + } + abi::envoy_dynamic_module_type_on_http_filter_response_headers_status::Continue + } + + fn on_response_trailers( + &mut self, + envoy_filter: &mut EHF, + ) -> abi::envoy_dynamic_module_type_on_http_filter_response_trailers_status { + self.response_trailers_called = true; + let foo_trailer = envoy_filter + .get_response_trailer_value("foo") + .expect("foo trailer"); + assert_eq!(foo_trailer.as_slice(), b"bar"); + for (name, value) in &self.headers_to_add { + envoy_filter.set_response_trailer(name, value.as_bytes()); + } + abi::envoy_dynamic_module_type_on_http_filter_response_trailers_status::Continue + } +} + +impl Drop for HeadersHttpFilter { + fn drop(&mut self) { + assert!(self.request_headers_called); + assert!(self.request_trailers_called); + assert!(self.response_headers_called); + assert!(self.response_trailers_called); + } +} From 819dc458d5f5345bd9d6c3dd8ae7cc74c5767f1f Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 01:37:40 +0000 Subject: [PATCH 04/12] adds more test Signed-off-by: Takeshi Yoneda --- .../dynamic_modules/http/integration_test.cc | 37 ++++++++++ .../test_data/rust/http_integration_test.rs | 72 ++++++++++++++++--- 2 files changed, 101 insertions(+), 8 deletions(-) diff --git a/test/extensions/dynamic_modules/http/integration_test.cc b/test/extensions/dynamic_modules/http/integration_test.cc index f3043ef85d77..f0441ba08b49 100644 --- a/test/extensions/dynamic_modules/http/integration_test.cc +++ b/test/extensions/dynamic_modules/http/integration_test.cc @@ -107,4 +107,41 @@ TEST_P(DynamicModulesIntegrationTest, HeaderCallbacks) { response->trailers().get()->get(Http::LowerCaseString("dog"))[0]->value().getStringView()); } +TEST_P(DynamicModulesIntegrationTest, SendResponseFromOnRequestHeaders) { + initializeFilter("send_response", "on_request_headers"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + + ASSERT_TRUE(response->waitForEndStream()); + + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + auto body = response->body(); + EXPECT_EQ("local_response_body_from_on_request_headers", body); + EXPECT_EQ( + "some_value", + response->headers().get(Http::LowerCaseString("some_header"))[0]->value().getStringView()); +} + +TEST_P(DynamicModulesIntegrationTest, SendResponseFromOnRequestBody) { + initializeFilter("send_response", "on_request_body"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(encoder_decoder.first, 10, true); + + ASSERT_TRUE(response->waitForEndStream()); + + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + auto body = response->body(); + EXPECT_EQ("local_response_body_from_on_request_body", body); + EXPECT_EQ( + "some_value", + response->headers().get(Http::LowerCaseString("some_header"))[0]->value().getStringView()); +} + } // namespace Envoy diff --git a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs index fb662cdfe12a..5bff27c9dd50 100644 --- a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs +++ b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs @@ -1,3 +1,4 @@ +use abi::*; use envoy_proxy_dynamic_modules_rust_sdk::*; declare_init_functions!(init, new_http_filter_config_fn); @@ -16,6 +17,10 @@ fn new_http_filter_config_fn( "header_callbacks" => Some(Box::new(HeadersHttpFilterConfig { headers_to_add: config.to_string(), })), + "body_callbacks" => None, + "send_response" => Some(Box::new(SendResponseHttpFilterConfig { + on_request_headers: config == "on_request_headers", + })), _ => panic!("Unknown filter name: {}", name), } } @@ -47,6 +52,7 @@ impl HttpFilterConfig .split(',') .map(|header| { let parts: Vec<&str> = header.split(':').collect(); + assert_eq!(parts.len(), 2); (parts[0].to_string(), parts[1].to_string()) }) .collect(); @@ -73,7 +79,7 @@ impl HttpFilter for HeadersHttpFilter { &mut self, envoy_filter: &mut EHF, _end_of_stream: bool, - ) -> abi::envoy_dynamic_module_type_on_http_filter_request_headers_status { + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { self.request_headers_called = true; let path_header = envoy_filter .get_request_header_value(":path") @@ -91,13 +97,13 @@ impl HttpFilter for HeadersHttpFilter { for (name, value) in &self.headers_to_add { envoy_filter.set_request_header(name, value.as_bytes()); } - abi::envoy_dynamic_module_type_on_http_filter_request_headers_status::Continue + envoy_dynamic_module_type_on_http_filter_request_headers_status::Continue } fn on_request_trailers( &mut self, envoy_filter: &mut EHF, - ) -> abi::envoy_dynamic_module_type_on_http_filter_request_trailers_status { + ) -> envoy_dynamic_module_type_on_http_filter_request_trailers_status { self.request_trailers_called = true; let foo_trailer = envoy_filter .get_request_trailer_value("foo") @@ -106,14 +112,14 @@ impl HttpFilter for HeadersHttpFilter { for (name, value) in &self.headers_to_add { envoy_filter.set_request_trailer(name, value.as_bytes()); } - abi::envoy_dynamic_module_type_on_http_filter_request_trailers_status::Continue + envoy_dynamic_module_type_on_http_filter_request_trailers_status::Continue } fn on_response_headers( &mut self, envoy_filter: &mut EHF, _end_of_stream: bool, - ) -> abi::envoy_dynamic_module_type_on_http_filter_response_headers_status { + ) -> envoy_dynamic_module_type_on_http_filter_response_headers_status { self.response_headers_called = true; let foo_header = envoy_filter .get_response_header_value("foo") @@ -122,13 +128,13 @@ impl HttpFilter for HeadersHttpFilter { for (name, value) in &self.headers_to_add { envoy_filter.set_response_header(name, value.as_bytes()); } - abi::envoy_dynamic_module_type_on_http_filter_response_headers_status::Continue + envoy_dynamic_module_type_on_http_filter_response_headers_status::Continue } fn on_response_trailers( &mut self, envoy_filter: &mut EHF, - ) -> abi::envoy_dynamic_module_type_on_http_filter_response_trailers_status { + ) -> envoy_dynamic_module_type_on_http_filter_response_trailers_status { self.response_trailers_called = true; let foo_trailer = envoy_filter .get_response_trailer_value("foo") @@ -137,7 +143,7 @@ impl HttpFilter for HeadersHttpFilter { for (name, value) in &self.headers_to_add { envoy_filter.set_response_trailer(name, value.as_bytes()); } - abi::envoy_dynamic_module_type_on_http_filter_response_trailers_status::Continue + envoy_dynamic_module_type_on_http_filter_response_trailers_status::Continue } } @@ -149,3 +155,53 @@ impl Drop for HeadersHttpFilter { assert!(self.response_trailers_called); } } + +struct SendResponseHttpFilterConfig { + on_request_headers: bool, +} + +impl HttpFilterConfig + for SendResponseHttpFilterConfig +{ + fn new_http_filter(&self, _envoy: &mut EC) -> Box> { + Box::new(SendResponseHttpFilter { + on_request_headers: self.on_request_headers, + }) + } +} + +struct SendResponseHttpFilter { + on_request_headers: bool, +} + +impl HttpFilter for SendResponseHttpFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + if self.on_request_headers { + envoy_filter.send_response( + 200, + vec![("some_header", b"some_value")], + Some(b"local_response_body_from_on_request_headers"), + ); + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } else { + envoy_dynamic_module_type_on_http_filter_request_headers_status::Continue + } + } + + fn on_request_body( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_body_status { + envoy_filter.send_response( + 200, + vec![("some_header", b"some_value")], + Some(b"local_response_body_from_on_request_body"), + ); + envoy_dynamic_module_type_on_http_filter_request_body_status::StopIterationAndBuffer + } +} From 512dd94bfd1436190705b43e2a15ee6a6c8a9eec Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 02:50:20 +0000 Subject: [PATCH 05/12] more tests Signed-off-by: Takeshi Yoneda --- .../filters/http/dynamic_modules/filter.cc | 10 ++- .../filters/http/dynamic_modules/filter.h | 7 ++ .../dynamic_modules/http/integration_test.cc | 31 +++++++ .../test_data/rust/http_integration_test.rs | 86 ++++++++++++++++++- 4 files changed, 131 insertions(+), 3 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index 3f93bcb4a9c2..1a5a22042acf 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -31,10 +31,16 @@ FilterHeadersStatus DynamicModuleHttpFilter::decodeHeaders(RequestHeaderMap& hea return static_cast(status); }; -FilterDataStatus DynamicModuleHttpFilter::decodeData(Buffer::Instance&, bool end_of_stream) { +FilterDataStatus DynamicModuleHttpFilter::decodeData(Buffer::Instance& b, bool end_of_stream) { + if (request_body_buffering_ && end_of_stream) { + decoder_callbacks_->addDecodedData(b, false); + } const envoy_dynamic_module_type_on_http_filter_request_body_status status = config_->on_http_filter_request_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); - return static_cast(status); + auto ret = static_cast(status); + request_body_buffering_ = ret == FilterDataStatus::StopIterationAndBuffer || + ret == FilterDataStatus::StopIterationAndWatermark; + return ret; }; FilterTrailersStatus DynamicModuleHttpFilter::decodeTrailers(RequestTrailerMap& trailers) { diff --git a/source/extensions/filters/http/dynamic_modules/filter.h b/source/extensions/filters/http/dynamic_modules/filter.h index e7b7465e3513..3b56b4996e8f 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.h +++ b/source/extensions/filters/http/dynamic_modules/filter.h @@ -64,6 +64,13 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, ResponseHeaderMap* response_headers_ = nullptr; ResponseTrailerMap* response_trailers_ = nullptr; + // This is necessary to make it possible to see the last chunk of data in the buffer. + // Note(mathetake): this seems a bit hacky, but is used everywhere in the codebase. + // Maybe we should fix the buffering logic in the core? + // + // TODO: add an integration test where end_of_stream is true at the first chunk of data. + bool request_body_buffering_ = false; + /** * Helper to get the downstream information of the stream. */ diff --git a/test/extensions/dynamic_modules/http/integration_test.cc b/test/extensions/dynamic_modules/http/integration_test.cc index f0441ba08b49..065d8048ba1b 100644 --- a/test/extensions/dynamic_modules/http/integration_test.cc +++ b/test/extensions/dynamic_modules/http/integration_test.cc @@ -107,6 +107,37 @@ TEST_P(DynamicModulesIntegrationTest, HeaderCallbacks) { response->trailers().get()->get(Http::LowerCaseString("dog"))[0]->value().getStringView()); } +TEST_P(DynamicModulesIntegrationTest, BodyCallbacks) { + initializeFilter("body_callbacks"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "test.com"}}; + auto encoder_decoder = codec_client_->startRequest(request_headers, false); + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(encoder_decoder.first, "request", false); + codec_client_->sendData(encoder_decoder.first, "_b", false); + codec_client_->sendData(encoder_decoder.first, "ody", true); + + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData("res", false); + upstream_request_->encodeData("ponse", false); + upstream_request_->encodeData("_body", true); + + ASSERT_TRUE(response->waitForEndStream()); + + // Verify the proxied request was received upstream, as expected. + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ("new_request_body", upstream_request_->body().toString()); + // Verify the proxied response was received downstream, as expected. + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ("new_response_body", response->body()); +} + TEST_P(DynamicModulesIntegrationTest, SendResponseFromOnRequestHeaders) { initializeFilter("send_response", "on_request_headers"); codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); diff --git a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs index 5bff27c9dd50..45c327b49c1d 100644 --- a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs +++ b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs @@ -17,7 +17,7 @@ fn new_http_filter_config_fn( "header_callbacks" => Some(Box::new(HeadersHttpFilterConfig { headers_to_add: config.to_string(), })), - "body_callbacks" => None, + "body_callbacks" => Some(Box::new(BodyCallbacksFilterConfig {})), "send_response" => Some(Box::new(SendResponseHttpFilterConfig { on_request_headers: config == "on_request_headers", })), @@ -156,6 +156,90 @@ impl Drop for HeadersHttpFilter { } } +struct BodyCallbacksFilterConfig {} + +impl HttpFilterConfig + for BodyCallbacksFilterConfig +{ + fn new_http_filter(&self, _envoy: &mut EC) -> Box> { + Box::new(BodyCallbacksFilter { + seen_request_body: false, + seen_response_body: false, + }) + } +} + +struct BodyCallbacksFilter { + seen_request_body: bool, + seen_response_body: bool, +} + +impl HttpFilter for BodyCallbacksFilter { + fn on_request_body( + &mut self, + envoy_filter: &mut EHF, + end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_body_status { + if !end_of_stream { + // Buffer the request body until the end of stream. + return envoy_dynamic_module_type_on_http_filter_request_body_status::StopIterationAndBuffer; + } + self.seen_request_body = true; + + let request_body = envoy_filter.get_request_body().expect("request body"); + let mut body = String::new(); + for chunk in request_body { + println!("chunk: {:?}", chunk); + body.push_str(std::str::from_utf8(chunk.as_slice()).unwrap()); + } + assert_eq!(body, "request_body"); + + // Drain the request body. + envoy_filter.drain_request_body(body.len()); + // Append the new request body. + envoy_filter.append_request_body(b"new_request_body"); + // Plus we need to set the content length. + envoy_filter.set_request_header("content-length", b"16"); + + envoy_dynamic_module_type_on_http_filter_request_body_status::Continue + } + + fn on_response_body( + &mut self, + envoy_filter: &mut EHF, + end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_response_body_status { + if !end_of_stream { + // Buffer the response body until the end of stream. + return envoy_dynamic_module_type_on_http_filter_response_body_status::StopIterationAndBuffer; + } + self.seen_response_body = true; + + let response_body = envoy_filter.get_response_body().expect("response body"); + let mut body = String::new(); + for chunk in response_body { + body.push_str(std::str::from_utf8(chunk.as_slice()).unwrap()); + } + assert_eq!(body, "response_body"); + + // Drain the response body. + envoy_filter.drain_response_body(body.len()); + // Append the new response body. + envoy_filter.append_response_body(b"new_response_body"); + // Plus we need to set the content length. + envoy_filter.set_response_header("content-length", b"17"); + + envoy_dynamic_module_type_on_http_filter_response_body_status::Continue + } +} + +impl Drop for BodyCallbacksFilter { + fn drop(&mut self) { + assert!(self.seen_request_body); + assert!(self.seen_response_body); + } +} + struct SendResponseHttpFilterConfig { on_request_headers: bool, } From 86aad04ad18bfd278f2d4fee6d723be571c8b8e4 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 07:22:05 +0000 Subject: [PATCH 06/12] more tests Signed-off-by: Takeshi Yoneda --- .../filters/http/dynamic_modules/abi_impl.cc | 21 +++++++++++++++++-- .../filters/http/dynamic_modules/filter.cc | 2 ++ .../filters/http/dynamic_modules/filter.h | 20 ++++++++++++++---- .../dynamic_modules/http/integration_test.cc | 21 +++++++++++++++++++ .../test_data/rust/http_integration_test.rs | 21 ++++++++++++++----- 5 files changed, 74 insertions(+), 11 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/abi_impl.cc b/source/extensions/filters/http/dynamic_modules/abi_impl.cc index 5fb88d807fa0..b1efe2067df9 100644 --- a/source/extensions/filters/http/dynamic_modules/abi_impl.cc +++ b/source/extensions/filters/http/dynamic_modules/abi_impl.cc @@ -375,7 +375,11 @@ bool envoy_dynamic_module_callback_http_get_request_body_vector( auto filter = static_cast(filter_envoy_ptr); auto buffer = filter->decoder_callbacks_->decodingBuffer(); if (!buffer) { - return false; + buffer = filter->current_chunk_; + if (!buffer) { + return false; + } + // See the comment on current_chunk_ for when we reach this line when we use the current_chunk_. } auto raw_slices = buffer->getRawSlices(std::nullopt); auto counter = 0; @@ -392,7 +396,11 @@ bool envoy_dynamic_module_callback_http_get_request_body_vector_size( auto filter = static_cast(filter_envoy_ptr); auto buffer = filter->decoder_callbacks_->decodingBuffer(); if (!buffer) { - return false; + buffer = filter->current_chunk_; + if (!buffer) { + return false; + } + // See the comment on current_chunk_ for when we reach this line when we use the current_chunk_. } *size = buffer->getRawSlices(std::nullopt).size(); return true; @@ -403,6 +411,10 @@ bool envoy_dynamic_module_callback_http_append_request_body( envoy_dynamic_module_type_buffer_module_ptr data, size_t length) { auto filter = static_cast(filter_envoy_ptr); if (!filter->decoder_callbacks_->decodingBuffer()) { + if (filter->current_chunk_) { // See the comment on current_chunk_ for when we enter this block. + filter->current_chunk_->add(absl::string_view(static_cast(data), length)); + return true; + } return false; } filter->decoder_callbacks_->modifyDecodingBuffer([data, length](Buffer::Instance& buffer) { @@ -415,6 +427,11 @@ bool envoy_dynamic_module_callback_http_drain_request_body( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, size_t number_of_bytes) { auto filter = static_cast(filter_envoy_ptr); if (!filter->decoder_callbacks_->decodingBuffer()) { + if (filter->current_chunk_) { // See the comment on current_chunk_ for when we enter this block. + auto size = std::min(filter->current_chunk_->length(), number_of_bytes); + filter->current_chunk_->drain(size); + return true; + } return false; } diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index 1a5a22042acf..866267ceab22 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -35,11 +35,13 @@ FilterDataStatus DynamicModuleHttpFilter::decodeData(Buffer::Instance& b, bool e if (request_body_buffering_ && end_of_stream) { decoder_callbacks_->addDecodedData(b, false); } + current_chunk_ = &b; const envoy_dynamic_module_type_on_http_filter_request_body_status status = config_->on_http_filter_request_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); auto ret = static_cast(status); request_body_buffering_ = ret == FilterDataStatus::StopIterationAndBuffer || ret == FilterDataStatus::StopIterationAndWatermark; + current_chunk_ = nullptr; return ret; }; diff --git a/source/extensions/filters/http/dynamic_modules/filter.h b/source/extensions/filters/http/dynamic_modules/filter.h index 3b56b4996e8f..90124dd236e9 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.h +++ b/source/extensions/filters/http/dynamic_modules/filter.h @@ -64,11 +64,23 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, ResponseHeaderMap* response_headers_ = nullptr; ResponseTrailerMap* response_trailers_ = nullptr; - // This is necessary to make it possible to see the last chunk of data in the buffer. - // Note(mathetake): this seems a bit hacky, but is used everywhere in the codebase. - // Maybe we should fix the buffering logic in the core? + // These are necessary to currectly deal with the "last chunk" of data in decodeData. // - // TODO: add an integration test where end_of_stream is true at the first chunk of data. + // current_chunk_ is used to track the current chunk of data being processed in decodeData. + // When the very first chunk of data is proceed while end_stream is true, we need to use the + // current_chunk_ as a buffer to handle the body related callbacks since decoder_callbacks_ will + // not see the first chunk of data yet. That might make you feel like why not just add the first + // chunk to the decoder_callbacks_ directly. In the case, *if* the callback doesn't return + // StopBuffer* kind of status, the data will be lost when end_stream is true. + // + // request_body_buffering_ is used to judge whether or not we should add the last chunk of data to + // the buffer returned by decoder_callbacks_->decodingBuffer(). That is the case only when the + // last chunk is not the very first chunk of data. As per the code comment on decoder callback's + // addDecodedData method, we specially handle that case. + // + // Note(mathatke): this asymmetry between decdeData and encodeData feels a bit odd but I see this + // in a lot of places across the extension codebase. Maybe refactor the core code? + Buffer::Instance* current_chunk_ = nullptr; bool request_body_buffering_ = false; /** diff --git a/test/extensions/dynamic_modules/http/integration_test.cc b/test/extensions/dynamic_modules/http/integration_test.cc index 065d8048ba1b..62640e4d7b3b 100644 --- a/test/extensions/dynamic_modules/http/integration_test.cc +++ b/test/extensions/dynamic_modules/http/integration_test.cc @@ -138,6 +138,27 @@ TEST_P(DynamicModulesIntegrationTest, BodyCallbacks) { EXPECT_EQ("new_response_body", response->body()); } +TEST_P(DynamicModulesIntegrationTest, BodyCallbacks_WithoutBuffering) { + initializeFilter("body_callbacks", "immediate_end_of_stream"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + auto response = codec_client_->makeRequestWithBody(default_request_headers_, "request_body"); + + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData("response_body", true); + + ASSERT_TRUE(response->waitForEndStream()); + + // Verify the proxied request was received upstream, as expected. + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ("new_request_body", upstream_request_->body().toString()); + // Verify the proxied response was received downstream, as expected. + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ("new_response_body", response->body()); +} + TEST_P(DynamicModulesIntegrationTest, SendResponseFromOnRequestHeaders) { initializeFilter("send_response", "on_request_headers"); codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); diff --git a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs index 45c327b49c1d..eaeee88f7dc1 100644 --- a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs +++ b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs @@ -17,7 +17,9 @@ fn new_http_filter_config_fn( "header_callbacks" => Some(Box::new(HeadersHttpFilterConfig { headers_to_add: config.to_string(), })), - "body_callbacks" => Some(Box::new(BodyCallbacksFilterConfig {})), + "body_callbacks" => Some(Box::new(BodyCallbacksFilterConfig { + immediate_end_of_stream: config == "immediate_end_of_stream", + })), "send_response" => Some(Box::new(SendResponseHttpFilterConfig { on_request_headers: config == "on_request_headers", })), @@ -156,13 +158,16 @@ impl Drop for HeadersHttpFilter { } } -struct BodyCallbacksFilterConfig {} +struct BodyCallbacksFilterConfig { + immediate_end_of_stream: bool, +} impl HttpFilterConfig for BodyCallbacksFilterConfig { fn new_http_filter(&self, _envoy: &mut EC) -> Box> { Box::new(BodyCallbacksFilter { + immediate_end_of_stream: self.immediate_end_of_stream, seen_request_body: false, seen_response_body: false, }) @@ -170,6 +175,8 @@ impl HttpFilterConfig } struct BodyCallbacksFilter { + /// This is true when we should not see end_of_stream=false, configured by the filter config. + immediate_end_of_stream: bool, seen_request_body: bool, seen_response_body: bool, } @@ -181,15 +188,17 @@ impl HttpFilter for BodyCallbacksFilter { end_of_stream: bool, ) -> envoy_dynamic_module_type_on_http_filter_request_body_status { if !end_of_stream { + assert!(!self.immediate_end_of_stream); // Buffer the request body until the end of stream. return envoy_dynamic_module_type_on_http_filter_request_body_status::StopIterationAndBuffer; } self.seen_request_body = true; - let request_body = envoy_filter.get_request_body().expect("request body"); + let request_body = envoy_filter + .get_request_body() + .expect("request body not available"); let mut body = String::new(); for chunk in request_body { - println!("chunk: {:?}", chunk); body.push_str(std::str::from_utf8(chunk.as_slice()).unwrap()); } assert_eq!(body, "request_body"); @@ -215,7 +224,9 @@ impl HttpFilter for BodyCallbacksFilter { } self.seen_response_body = true; - let response_body = envoy_filter.get_response_body().expect("response body"); + let response_body = envoy_filter + .get_response_body() + .expect("response body not available"); let mut body = String::new(); for chunk in response_body { body.push_str(std::str::from_utf8(chunk.as_slice()).unwrap()); From 95847bd4f985b32937b03687a7dad664322d70e0 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 07:40:16 +0000 Subject: [PATCH 07/12] more Signed-off-by: Takeshi Yoneda --- .../filters/http/dynamic_modules/abi_impl.cc | 48 ++++++++++++++----- .../filters/http/dynamic_modules/filter.cc | 15 ++++-- .../filters/http/dynamic_modules/filter.h | 21 ++++---- 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/abi_impl.cc b/source/extensions/filters/http/dynamic_modules/abi_impl.cc index b1efe2067df9..57a5d40a419a 100644 --- a/source/extensions/filters/http/dynamic_modules/abi_impl.cc +++ b/source/extensions/filters/http/dynamic_modules/abi_impl.cc @@ -375,11 +375,12 @@ bool envoy_dynamic_module_callback_http_get_request_body_vector( auto filter = static_cast(filter_envoy_ptr); auto buffer = filter->decoder_callbacks_->decodingBuffer(); if (!buffer) { - buffer = filter->current_chunk_; + buffer = filter->current_request_body_; if (!buffer) { return false; } - // See the comment on current_chunk_ for when we reach this line when we use the current_chunk_. + // See the comment on current_request_body_ for when we reach this line when we use the + // current_request_body_chunk_. } auto raw_slices = buffer->getRawSlices(std::nullopt); auto counter = 0; @@ -396,11 +397,12 @@ bool envoy_dynamic_module_callback_http_get_request_body_vector_size( auto filter = static_cast(filter_envoy_ptr); auto buffer = filter->decoder_callbacks_->decodingBuffer(); if (!buffer) { - buffer = filter->current_chunk_; + buffer = filter->current_request_body_; if (!buffer) { return false; } - // See the comment on current_chunk_ for when we reach this line when we use the current_chunk_. + // See the comment on current_request_body_ for when we reach this line when we use the + // current_request_body_chunk_. } *size = buffer->getRawSlices(std::nullopt).size(); return true; @@ -411,8 +413,9 @@ bool envoy_dynamic_module_callback_http_append_request_body( envoy_dynamic_module_type_buffer_module_ptr data, size_t length) { auto filter = static_cast(filter_envoy_ptr); if (!filter->decoder_callbacks_->decodingBuffer()) { - if (filter->current_chunk_) { // See the comment on current_chunk_ for when we enter this block. - filter->current_chunk_->add(absl::string_view(static_cast(data), length)); + if (filter->current_request_body_) { // See the comment on current_request_body_ for when we + // enter this block. + filter->current_request_body_->add(absl::string_view(static_cast(data), length)); return true; } return false; @@ -427,9 +430,10 @@ bool envoy_dynamic_module_callback_http_drain_request_body( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, size_t number_of_bytes) { auto filter = static_cast(filter_envoy_ptr); if (!filter->decoder_callbacks_->decodingBuffer()) { - if (filter->current_chunk_) { // See the comment on current_chunk_ for when we enter this block. - auto size = std::min(filter->current_chunk_->length(), number_of_bytes); - filter->current_chunk_->drain(size); + if (filter->current_request_body_) { // See the comment on current_request_body_ for when we + // enter this block. + auto size = std::min(filter->current_request_body_->length(), number_of_bytes); + filter->current_request_body_->drain(size); return true; } return false; @@ -448,7 +452,12 @@ bool envoy_dynamic_module_callback_http_get_response_body_vector( auto filter = static_cast(filter_envoy_ptr); auto buffer = filter->encoder_callbacks_->encodingBuffer(); if (!buffer) { - return false; + buffer = filter->current_response_body_; + if (!buffer) { + return false; + } + // See the comment on current_response_body_ for when we reach this line when we use the + // current_response_body_chunk_. } auto raw_slices = buffer->getRawSlices(std::nullopt); auto counter = 0; @@ -465,7 +474,12 @@ bool envoy_dynamic_module_callback_http_get_response_body_vector_size( auto filter = static_cast(filter_envoy_ptr); auto buffer = filter->encoder_callbacks_->encodingBuffer(); if (!buffer) { - return false; + buffer = filter->current_response_body_; + if (!buffer) { + return false; + } + // See the comment on current_response_body_ for when we reach this line when we use the + // current_response_body_chunk_. } *size = buffer->getRawSlices(std::nullopt).size(); return true; @@ -476,6 +490,12 @@ bool envoy_dynamic_module_callback_http_append_response_body( envoy_dynamic_module_type_buffer_module_ptr data, size_t length) { auto filter = static_cast(filter_envoy_ptr); if (!filter->encoder_callbacks_->encodingBuffer()) { + if (filter->current_response_body_) { // See the comment on current_response_body_ for when we + // enter this block. + filter->current_response_body_->add( + absl::string_view(static_cast(data), length)); + return true; + } return false; } filter->encoder_callbacks_->modifyEncodingBuffer([data, length](Buffer::Instance& buffer) { @@ -488,6 +508,12 @@ bool envoy_dynamic_module_callback_http_drain_response_body( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, size_t number_of_bytes) { auto filter = static_cast(filter_envoy_ptr); if (!filter->encoder_callbacks_->encodingBuffer()) { + if (filter->current_response_body_) { // See the comment on current_response_body_ for when we + // enter this block. + auto size = std::min(filter->current_response_body_->length(), number_of_bytes); + filter->current_response_body_->drain(size); + return true; + } return false; } diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index 866267ceab22..0732d4eb52ca 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -33,15 +33,17 @@ FilterHeadersStatus DynamicModuleHttpFilter::decodeHeaders(RequestHeaderMap& hea FilterDataStatus DynamicModuleHttpFilter::decodeData(Buffer::Instance& b, bool end_of_stream) { if (request_body_buffering_ && end_of_stream) { + // To make the very last chunk of the body available to the filter when buffering is enabled, + // we need to call addDecodedData. See the code comment there for more details. decoder_callbacks_->addDecodedData(b, false); } - current_chunk_ = &b; + current_request_body_ = &b; const envoy_dynamic_module_type_on_http_filter_request_body_status status = config_->on_http_filter_request_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); auto ret = static_cast(status); request_body_buffering_ = ret == FilterDataStatus::StopIterationAndBuffer || ret == FilterDataStatus::StopIterationAndWatermark; - current_chunk_ = nullptr; + current_request_body_ = nullptr; return ret; }; @@ -70,9 +72,16 @@ FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& he return static_cast(status); }; -FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance&, bool end_of_stream) { +FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance& b, bool end_of_stream) { + if (response_body_buffering_ && end_of_stream) { + // To make the very last chunk of the body available to the filter when buffering is enabled, + // we need to call addDecodedData. See the code comment there for more details. + encoder_callbacks_->addEncodedData(b, false); + } + current_response_body_ = &b; const envoy_dynamic_module_type_on_http_filter_response_body_status status = config_->on_http_filter_response_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); + current_response_body_ = nullptr; return static_cast(status); }; diff --git a/source/extensions/filters/http/dynamic_modules/filter.h b/source/extensions/filters/http/dynamic_modules/filter.h index 90124dd236e9..c2e901cbe51e 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.h +++ b/source/extensions/filters/http/dynamic_modules/filter.h @@ -64,24 +64,19 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, ResponseHeaderMap* response_headers_ = nullptr; ResponseTrailerMap* response_trailers_ = nullptr; - // These are necessary to currectly deal with the "last chunk" of data in decodeData. - // - // current_chunk_ is used to track the current chunk of data being processed in decodeData. - // When the very first chunk of data is proceed while end_stream is true, we need to use the - // current_chunk_ as a buffer to handle the body related callbacks since decoder_callbacks_ will - // not see the first chunk of data yet. That might make you feel like why not just add the first - // chunk to the decoder_callbacks_ directly. In the case, *if* the callback doesn't return - // StopBuffer* kind of status, the data will be lost when end_stream is true. - // + // current_request_body_chunk_ is used to store the current chunk of data being processed by the + // decodeData callback. This is used when the filter is not buffering the request body where + // decoder_callbacks_->decodingBuffer() is nullptr. + Buffer::Instance* current_request_body_ = nullptr; + // This is the same as current_response_body_ but for the response body. + Buffer::Instance* current_response_body_ = nullptr; // request_body_buffering_ is used to judge whether or not we should add the last chunk of data to // the buffer returned by decoder_callbacks_->decodingBuffer(). That is the case only when the // last chunk is not the very first chunk of data. As per the code comment on decoder callback's // addDecodedData method, we specially handle that case. - // - // Note(mathatke): this asymmetry between decdeData and encodeData feels a bit odd but I see this - // in a lot of places across the extension codebase. Maybe refactor the core code? - Buffer::Instance* current_chunk_ = nullptr; bool request_body_buffering_ = false; + // This is the same as request_body_buffering_ but for the response body. + bool response_body_buffering_ = false; /** * Helper to get the downstream information of the stream. From c0b8a3ca6765e67735ca47e6f066e36fe9de3c9a Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 07:43:29 +0000 Subject: [PATCH 08/12] more Signed-off-by: Takeshi Yoneda --- .../filters/http/dynamic_modules/filter.cc | 21 ++++++++----------- .../filters/http/dynamic_modules/filter.h | 7 ------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index 0732d4eb52ca..a3f84eb59d71 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -31,20 +31,17 @@ FilterHeadersStatus DynamicModuleHttpFilter::decodeHeaders(RequestHeaderMap& hea return static_cast(status); }; -FilterDataStatus DynamicModuleHttpFilter::decodeData(Buffer::Instance& b, bool end_of_stream) { - if (request_body_buffering_ && end_of_stream) { +FilterDataStatus DynamicModuleHttpFilter::decodeData(Buffer::Instance& chunk, bool end_of_stream) { + if (end_of_stream && decoder_callbacks_->decodingBuffer()) { // To make the very last chunk of the body available to the filter when buffering is enabled, // we need to call addDecodedData. See the code comment there for more details. - decoder_callbacks_->addDecodedData(b, false); + decoder_callbacks_->addDecodedData(chunk, false); } - current_request_body_ = &b; + current_request_body_ = &chunk; const envoy_dynamic_module_type_on_http_filter_request_body_status status = config_->on_http_filter_request_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); - auto ret = static_cast(status); - request_body_buffering_ = ret == FilterDataStatus::StopIterationAndBuffer || - ret == FilterDataStatus::StopIterationAndWatermark; current_request_body_ = nullptr; - return ret; + return static_cast(status); }; FilterTrailersStatus DynamicModuleHttpFilter::decodeTrailers(RequestTrailerMap& trailers) { @@ -72,13 +69,13 @@ FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& he return static_cast(status); }; -FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance& b, bool end_of_stream) { - if (response_body_buffering_ && end_of_stream) { +FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance& chunk, bool end_of_stream) { + if (end_of_stream && encoder_callbacks_->encodingBuffer()) { // To make the very last chunk of the body available to the filter when buffering is enabled, // we need to call addDecodedData. See the code comment there for more details. - encoder_callbacks_->addEncodedData(b, false); + encoder_callbacks_->addEncodedData(chunk, false); } - current_response_body_ = &b; + current_response_body_ = &chunk; const envoy_dynamic_module_type_on_http_filter_response_body_status status = config_->on_http_filter_response_body_(thisAsVoidPtr(), in_module_filter_, end_of_stream); current_response_body_ = nullptr; diff --git a/source/extensions/filters/http/dynamic_modules/filter.h b/source/extensions/filters/http/dynamic_modules/filter.h index c2e901cbe51e..d3ce6f95978e 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.h +++ b/source/extensions/filters/http/dynamic_modules/filter.h @@ -70,13 +70,6 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, Buffer::Instance* current_request_body_ = nullptr; // This is the same as current_response_body_ but for the response body. Buffer::Instance* current_response_body_ = nullptr; - // request_body_buffering_ is used to judge whether or not we should add the last chunk of data to - // the buffer returned by decoder_callbacks_->decodingBuffer(). That is the case only when the - // last chunk is not the very first chunk of data. As per the code comment on decoder callback's - // addDecodedData method, we specially handle that case. - bool request_body_buffering_ = false; - // This is the same as request_body_buffering_ but for the response body. - bool response_body_buffering_ = false; /** * Helper to get the downstream information of the stream. From 5325902d41ac535a7e1c140d465c9b348a552c38 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 07:45:21 +0000 Subject: [PATCH 09/12] more Signed-off-by: Takeshi Yoneda --- .../filters/http/dynamic_modules/abi_impl.cc | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/abi_impl.cc b/source/extensions/filters/http/dynamic_modules/abi_impl.cc index 57a5d40a419a..d7706b8622dc 100644 --- a/source/extensions/filters/http/dynamic_modules/abi_impl.cc +++ b/source/extensions/filters/http/dynamic_modules/abi_impl.cc @@ -379,8 +379,7 @@ bool envoy_dynamic_module_callback_http_get_request_body_vector( if (!buffer) { return false; } - // See the comment on current_request_body_ for when we reach this line when we use the - // current_request_body_chunk_. + // See the comment on current_request_body_ for when we reach this. } auto raw_slices = buffer->getRawSlices(std::nullopt); auto counter = 0; @@ -401,8 +400,7 @@ bool envoy_dynamic_module_callback_http_get_request_body_vector_size( if (!buffer) { return false; } - // See the comment on current_request_body_ for when we reach this line when we use the - // current_request_body_chunk_. + // See the comment on current_request_body_ for when we reach this line. } *size = buffer->getRawSlices(std::nullopt).size(); return true; @@ -456,8 +454,7 @@ bool envoy_dynamic_module_callback_http_get_response_body_vector( if (!buffer) { return false; } - // See the comment on current_response_body_ for when we reach this line when we use the - // current_response_body_chunk_. + // See the comment on current_response_body_ for when we reach this line. } auto raw_slices = buffer->getRawSlices(std::nullopt); auto counter = 0; @@ -478,8 +475,7 @@ bool envoy_dynamic_module_callback_http_get_response_body_vector_size( if (!buffer) { return false; } - // See the comment on current_response_body_ for when we reach this line when we use the - // current_response_body_chunk_. + // See the comment on current_response_body_ for when we reach this line. } *size = buffer->getRawSlices(std::nullopt).size(); return true; From f093d3a8bfbcc034f813a850a1d0c0aa37e95fa0 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 07:46:07 +0000 Subject: [PATCH 10/12] more Signed-off-by: Takeshi Yoneda --- source/extensions/filters/http/dynamic_modules/filter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index a3f84eb59d71..4f201535006b 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -72,7 +72,7 @@ FilterHeadersStatus DynamicModuleHttpFilter::encodeHeaders(ResponseHeaderMap& he FilterDataStatus DynamicModuleHttpFilter::encodeData(Buffer::Instance& chunk, bool end_of_stream) { if (end_of_stream && encoder_callbacks_->encodingBuffer()) { // To make the very last chunk of the body available to the filter when buffering is enabled, - // we need to call addDecodedData. See the code comment there for more details. + // we need to call addEncodedData. See the code comment there for more details. encoder_callbacks_->addEncodedData(chunk, false); } current_response_body_ = &chunk; From 84a3296a821779498d418f58d7d433e21f77ec6f Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 07:55:27 +0000 Subject: [PATCH 11/12] test Signed-off-by: Takeshi Yoneda --- test/extensions/dynamic_modules/http/filter_test.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/extensions/dynamic_modules/http/filter_test.cc b/test/extensions/dynamic_modules/http/filter_test.cc index 0849b32a88c3..d39e1d0acdcd 100644 --- a/test/extensions/dynamic_modules/http/filter_test.cc +++ b/test/extensions/dynamic_modules/http/filter_test.cc @@ -168,8 +168,12 @@ TEST(DynamiModulesTest, BodyCallbacks) { filter->setEncoderFilterCallbacks(encoder_callbacks); Buffer::OwnedImpl request_body; EXPECT_CALL(decoder_callbacks, decodingBuffer()).WillRepeatedly(testing::Return(&request_body)); + EXPECT_CALL(decoder_callbacks, addDecodedData(_, _)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> void {})); Buffer::OwnedImpl response_body; EXPECT_CALL(encoder_callbacks, encodingBuffer()).WillRepeatedly(testing::Return(&response_body)); + EXPECT_CALL(encoder_callbacks, addEncodedData(_, _)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> void {})); EXPECT_CALL(decoder_callbacks, modifyDecodingBuffer(_)) .WillRepeatedly(Invoke([&](std::function callback) -> void { callback(request_body); From 48440873ede062c3359e4b7c5e03bd5ee688ce28 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Sat, 1 Feb 2025 17:33:01 +0000 Subject: [PATCH 12/12] fix comment Signed-off-by: Takeshi Yoneda --- source/extensions/filters/http/dynamic_modules/filter.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/extensions/filters/http/dynamic_modules/filter.h b/source/extensions/filters/http/dynamic_modules/filter.h index d3ce6f95978e..7c99890d3b65 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.h +++ b/source/extensions/filters/http/dynamic_modules/filter.h @@ -64,11 +64,10 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, ResponseHeaderMap* response_headers_ = nullptr; ResponseTrailerMap* response_trailers_ = nullptr; - // current_request_body_chunk_ is used to store the current chunk of data being processed by the - // decodeData callback. This is used when the filter is not buffering the request body where - // decoder_callbacks_->decodingBuffer() is nullptr. + // These are used to hold the current chunk of the request/response body during the decodeData and + // encodeData callbacks. It is only valid during the call and should not be used outside of the + // call. Buffer::Instance* current_request_body_ = nullptr; - // This is the same as current_response_body_ but for the response body. Buffer::Instance* current_response_body_ = nullptr; /**