diff --git a/CHANGELOG.md b/CHANGELOG.md index e325c2fc..818e7558 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Dependencies - Bumps `aws-*` dependencies to `1` ([#219](https://github.com/opensearch-project/opensearch-rs/pull/219)) - Bumps `itertools` from 0.11.0 to 0.12.0 +- Bumps `hyper` from 0.14 to 1 in tests ([#221](https://github.com/opensearch-project/opensearch-rs/pull/221)) ### Changed diff --git a/opensearch/Cargo.toml b/opensearch/Cargo.toml index 7f9c4526..28bb3ba3 100644 --- a/opensearch/Cargo.toml +++ b/opensearch/Cargo.toml @@ -52,13 +52,14 @@ aws-smithy-async = "1" chrono = { version = "0.4", features = ["serde"] } clap = "2" futures = "0.3.1" -http = "0.2" -hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "server"] } +http-body-util = "0.1.0" +hyper = { version = "1", features = ["full"] } +hyper-util = { version = "0.1", features = ["full"] } regex="1.4" sysinfo = "0.29.0" test-case = "3" textwrap = "0.16" -tokio = { version = "1.0", default-features = false, features = ["macros", "net", "time", "rt-multi-thread", "sync"] } +tokio = { version = "1", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } xml-rs = "0.8" diff --git a/opensearch/tests/auth.rs b/opensearch/tests/auth.rs index 4967fed7..96719305 100644 --- a/opensearch/tests/auth.rs +++ b/opensearch/tests/auth.rs @@ -50,7 +50,7 @@ async fn basic_auth_header() -> anyhow::Result<()> { "authorization", String::from_utf8(header_value).unwrap() ); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()) @@ -76,7 +76,7 @@ async fn api_key_header() -> anyhow::Result<()> { "authorization", String::from_utf8(header_value).unwrap() ); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()) @@ -92,7 +92,7 @@ async fn api_key_header() -> anyhow::Result<()> { async fn bearer_header() -> anyhow::Result<()> { let server = server::http(move |req| async move { assert_header_eq!(req, "authorization", "Bearer access_token"); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()) diff --git a/opensearch/tests/aws_auth.rs b/opensearch/tests/aws_auth.rs index cfa6b4b9..200d421f 100644 --- a/opensearch/tests/aws_auth.rs +++ b/opensearch/tests/aws_auth.rs @@ -18,9 +18,9 @@ use aws_credential_types::Credentials as AwsCredentials; use aws_smithy_async::time::StaticTimeSource; use aws_types::region::Region; use common::*; -use http::header::HOST; use opensearch::{auth::Credentials, indices::IndicesCreateParts, OpenSearch}; use regex::Regex; +use reqwest::header::HOST; use serde_json::json; use std::convert::TryInto; use test_case::test_case; @@ -105,7 +105,7 @@ async fn aws_auth_get() -> anyhow::Result<()> { "x-amz-content-sha256", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" ); // SHA of empty string - http::Response::default() + server::empty_response() }); let client = create_aws_client(format!("http://{}", server.addr()).as_ref())?; @@ -122,7 +122,7 @@ async fn aws_auth_post() -> anyhow::Result<()> { "x-amz-content-sha256", "f3a842f988a653a734ebe4e57c45f19293a002241a72f0b3abbff71e4f5297b9" ); // SHA of the JSON - http::Response::default() + server::empty_response() }); let client = create_aws_client(format!("http://{}", server.addr()).as_ref())?; diff --git a/opensearch/tests/client.rs b/opensearch/tests/client.rs index 48fa6e7f..f4358f67 100644 --- a/opensearch/tests/client.rs +++ b/opensearch/tests/client.rs @@ -55,7 +55,7 @@ async fn default_user_agent_content_type_accept_headers() -> anyhow::Result<()> assert_header_eq!(req, "user-agent", DEFAULT_USER_AGENT); assert_header_eq!(req, "content-type", "application/json"); assert_header_eq!(req, "accept", "application/json"); - http::Response::default() + server::empty_response() }); let client = client::create_for_url(format!("http://{}", server.addr()).as_ref()); @@ -68,7 +68,7 @@ async fn default_user_agent_content_type_accept_headers() -> anyhow::Result<()> async fn default_header() -> anyhow::Result<()> { let server = server::http(move |req| async move { assert_header_eq!(req, "x-opaque-id", "foo"); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header( @@ -86,7 +86,7 @@ async fn default_header() -> anyhow::Result<()> { async fn override_default_header() -> anyhow::Result<()> { let server = server::http(move |req| async move { assert_header_eq!(req, "x-opaque-id", "bar"); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header( @@ -111,7 +111,7 @@ async fn override_default_header() -> anyhow::Result<()> { async fn x_opaque_id_header() -> anyhow::Result<()> { let server = server::http(move |req| async move { assert_header_eq!(req, "x-opaque-id", "foo"); - http::Response::default() + server::empty_response() }); let client = client::create_for_url(format!("http://{}", server.addr()).as_ref()); @@ -131,7 +131,7 @@ async fn x_opaque_id_header() -> anyhow::Result<()> { async fn uses_global_request_timeout() { let server = server::http(move |_| async move { std::thread::sleep(Duration::from_secs(1)); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()) @@ -150,7 +150,7 @@ async fn uses_global_request_timeout() { async fn uses_call_request_timeout() { let server = server::http(move |_| async move { std::thread::sleep(Duration::from_secs(1)); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()) @@ -173,7 +173,7 @@ async fn uses_call_request_timeout() { async fn call_request_timeout_supersedes_global_timeout() { let server = server::http(move |_| async move { std::thread::sleep(Duration::from_secs(1)); - http::Response::default() + server::empty_response() }); let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()) @@ -246,7 +246,7 @@ async fn serialize_querystring() -> anyhow::Result<()> { req.uri().query(), Some("filter_path=took%2C_shards&pretty=true&q=title%3AOpenSearch&track_total_hits=100000") ); - http::Response::default() + server::empty_response() }); let client = client::create_for_url(format!("http://{}", server.addr()).as_ref()); diff --git a/opensearch/tests/common/server.rs b/opensearch/tests/common/server.rs index ec9e798a..07069105 100644 --- a/opensearch/tests/common/server.rs +++ b/opensearch/tests/common/server.rs @@ -33,23 +33,34 @@ // https://github.com/seanmonstar/reqwest/blob/master/LICENSE-APACHE use std::{ - convert::Infallible, future::Future, net, sync::mpsc as std_mpsc, thread, time::Duration, + convert::Infallible, + future::Future, + net::{self, SocketAddr}, + sync::mpsc as std_mpsc, + thread, + time::Duration, }; -use http::Request; -use hyper::Body; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, - oneshot, +use bytes::Bytes; +use http_body_util::Empty; +use hyper::{ + body::{Body, Incoming}, + server::conn::http1, + service::service_fn, + Request, Response, +}; +use hyper_util::rt::TokioIo; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::{broadcast, mpsc}, }; -pub use http::Response; use tokio::runtime; pub struct Server { addr: net::SocketAddr, panic_rx: std_mpsc::Receiver<()>, - shutdown_tx: Option>, + shutdown_tx: Option>, } impl Server { @@ -61,7 +72,7 @@ impl Server { impl Drop for Server { fn drop(&mut self) { if let Some(tx) = self.shutdown_tx.take() { - let _ = tx.send(()); + tx.send(()).unwrap(); } if !::std::thread::panicking() { @@ -72,46 +83,62 @@ impl Drop for Server { } } -pub fn http(func: F) -> Server +pub fn http(func: F) -> Server where - F: Fn(http::Request) -> Fut + Clone + Send + 'static, - Fut: Future> + Send + 'static, + F: Fn(Request) -> Fut + Clone + Send + 'static, + Fut: Future> + Send + 'static, + B: Body + Send + 'static, + B::Data: Send, + B::Error: std::error::Error + Send + Sync, { - //Spawn new runtime in thread to prevent reactor execution context conflict + let thread_name = thread::current().name().unwrap_or("").to_owned(); + thread::spawn(move || { let rt = runtime::Builder::new_current_thread() .enable_all() .build() .expect("new rt"); + let _ = rt.enter(); + + let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1); + let listener = rt + .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))) + .unwrap(); + let addr = listener.local_addr().unwrap(); + + let srv = async move { + loop { + let (stream, _) = tokio::select! { + res = listener.accept() => res?, + _ = shutdown_rx.recv() => break + }; + let io = TokioIo::new(stream); - let srv = { - let _guard = rt.enter(); - hyper::Server::bind(&([127, 0, 0, 1], 0).into()).serve(hyper::service::make_service_fn( - move |_| { - let func = func.clone(); - async move { - Ok::<_, Infallible>(hyper::service::service_fn(move |req| { - let fut = func(req); - async move { Ok::<_, Infallible>(fut.await) } - })) + let mut func = func.clone(); + let mut shutdown_rx = shutdown_rx.resubscribe(); + + tokio::task::spawn(async move { + let conn = http1::Builder::new().serve_connection( + io, + service_fn(move |req| { + let func = func.clone(); + async move { Ok::<_, Infallible>(func(req).await) } + }), + ); + tokio::pin!(conn); + tokio::select! { + res = conn.as_mut() => {}, + _ = shutdown_rx.recv() => conn.as_mut().graceful_shutdown() } - }, - )) + }); + } + Ok::<(), anyhow::Error>(()) }; - let addr = srv.local_addr(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let srv = srv.with_graceful_shutdown(async move { - let _ = shutdown_rx.await; - }); - let (panic_tx, panic_rx) = std_mpsc::channel(); - let tname = format!( - "test({})-support-server", - thread::current().name().unwrap_or("") - ); + let thread_name = format!("test({})-support-server", thread_name); thread::Builder::new() - .name(tname) + .name(thread_name) .spawn(move || { rt.block_on(srv).unwrap(); let _ = panic_tx.send(()); @@ -128,14 +155,18 @@ where .unwrap() } -pub fn capturing_http() -> (Server, UnboundedReceiver>) { - let (tx, rx) = unbounded_channel(); +pub fn capturing_http() -> (Server, mpsc::UnboundedReceiver>) { + let (tx, rx) = mpsc::unbounded_channel(); let server = http(move |req| { let tx = tx.clone(); async move { tx.send(req).unwrap(); - http::Response::default() + empty_response() } }); (server, rx) } + +pub fn empty_response() -> Response> { + Default::default() +}