Skip to content

Commit

Permalink
Upgrade hyper from 0.14 to 1 in tests (#221)
Browse files Browse the repository at this point in the history
* Upgrade `hyper` from `0.14` to `1` in tests

Signed-off-by: Thomas Farr <[email protected]>

* Changelog entry

Signed-off-by: Thomas Farr <[email protected]>

---------

Signed-off-by: Thomas Farr <[email protected]>
  • Loading branch information
Xtansia authored Nov 27, 2023
1 parent ec68493 commit 7548c9f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Dependencies
- Bumps `aws-*` dependencies to `1` ([#219](https://github.com/opensearch-project/opensearch-rs/pull/219))
- Bumps `itertools` from 0.11.0 to 0.12.0
- Bumps `hyper` from 0.14 to 1 in tests ([#221](https://github.com/opensearch-project/opensearch-rs/pull/221))

### Changed

Expand Down
7 changes: 4 additions & 3 deletions opensearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ aws-smithy-async = "1"
chrono = { version = "0.4", features = ["serde"] }
clap = "2"
futures = "0.3.1"
http = "0.2"
hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "server"] }
http-body-util = "0.1.0"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
regex="1.4"
sysinfo = "0.29.0"
test-case = "3"
textwrap = "0.16"
tokio = { version = "1.0", default-features = false, features = ["macros", "net", "time", "rt-multi-thread", "sync"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
xml-rs = "0.8"
Expand Down
6 changes: 3 additions & 3 deletions opensearch/tests/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn basic_auth_header() -> anyhow::Result<()> {
"authorization",
String::from_utf8(header_value).unwrap()
);
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -76,7 +76,7 @@ async fn api_key_header() -> anyhow::Result<()> {
"authorization",
String::from_utf8(header_value).unwrap()
);
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -92,7 +92,7 @@ async fn api_key_header() -> anyhow::Result<()> {
async fn bearer_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "authorization", "Bearer access_token");
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand Down
6 changes: 3 additions & 3 deletions opensearch/tests/aws_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use aws_credential_types::Credentials as AwsCredentials;
use aws_smithy_async::time::StaticTimeSource;
use aws_types::region::Region;
use common::*;
use http::header::HOST;
use opensearch::{auth::Credentials, indices::IndicesCreateParts, OpenSearch};
use regex::Regex;
use reqwest::header::HOST;
use serde_json::json;
use std::convert::TryInto;
use test_case::test_case;
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn aws_auth_get() -> anyhow::Result<()> {
"x-amz-content-sha256",
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
); // SHA of empty string
http::Response::default()
server::empty_response()
});

let client = create_aws_client(format!("http://{}", server.addr()).as_ref())?;
Expand All @@ -122,7 +122,7 @@ async fn aws_auth_post() -> anyhow::Result<()> {
"x-amz-content-sha256",
"f3a842f988a653a734ebe4e57c45f19293a002241a72f0b3abbff71e4f5297b9"
); // SHA of the JSON
http::Response::default()
server::empty_response()
});

let client = create_aws_client(format!("http://{}", server.addr()).as_ref())?;
Expand Down
16 changes: 8 additions & 8 deletions opensearch/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn default_user_agent_content_type_accept_headers() -> anyhow::Result<()>
assert_header_eq!(req, "user-agent", DEFAULT_USER_AGENT);
assert_header_eq!(req, "content-type", "application/json");
assert_header_eq!(req, "accept", "application/json");
http::Response::default()
server::empty_response()
});

let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
Expand All @@ -68,7 +68,7 @@ async fn default_user_agent_content_type_accept_headers() -> anyhow::Result<()>
async fn default_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "x-opaque-id", "foo");
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header(
Expand All @@ -86,7 +86,7 @@ async fn default_header() -> anyhow::Result<()> {
async fn override_default_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "x-opaque-id", "bar");
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header(
Expand All @@ -111,7 +111,7 @@ async fn override_default_header() -> anyhow::Result<()> {
async fn x_opaque_id_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "x-opaque-id", "foo");
http::Response::default()
server::empty_response()
});

let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
Expand All @@ -131,7 +131,7 @@ async fn x_opaque_id_header() -> anyhow::Result<()> {
async fn uses_global_request_timeout() {
let server = server::http(move |_| async move {
std::thread::sleep(Duration::from_secs(1));
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -150,7 +150,7 @@ async fn uses_global_request_timeout() {
async fn uses_call_request_timeout() {
let server = server::http(move |_| async move {
std::thread::sleep(Duration::from_secs(1));
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -173,7 +173,7 @@ async fn uses_call_request_timeout() {
async fn call_request_timeout_supersedes_global_timeout() {
let server = server::http(move |_| async move {
std::thread::sleep(Duration::from_secs(1));
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn serialize_querystring() -> anyhow::Result<()> {
req.uri().query(),
Some("filter_path=took%2C_shards&pretty=true&q=title%3AOpenSearch&track_total_hits=100000")
);
http::Response::default()
server::empty_response()
});

let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
Expand Down
109 changes: 70 additions & 39 deletions opensearch/tests/common/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,34 @@
// https://github.com/seanmonstar/reqwest/blob/master/LICENSE-APACHE

use std::{
convert::Infallible, future::Future, net, sync::mpsc as std_mpsc, thread, time::Duration,
convert::Infallible,
future::Future,
net::{self, SocketAddr},
sync::mpsc as std_mpsc,
thread,
time::Duration,
};

use http::Request;
use hyper::Body;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{
body::{Body, Incoming},
server::conn::http1,
service::service_fn,
Request, Response,
};
use hyper_util::rt::TokioIo;
use tokio::{
net::{TcpListener, TcpStream},
sync::{broadcast, mpsc},
};

pub use http::Response;
use tokio::runtime;

pub struct Server {
addr: net::SocketAddr,
panic_rx: std_mpsc::Receiver<()>,
shutdown_tx: Option<oneshot::Sender<()>>,
shutdown_tx: Option<broadcast::Sender<()>>,
}

impl Server {
Expand All @@ -61,7 +72,7 @@ impl Server {
impl Drop for Server {
fn drop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
tx.send(()).unwrap();
}

if !::std::thread::panicking() {
Expand All @@ -72,46 +83,62 @@ impl Drop for Server {
}
}

pub fn http<F, Fut>(func: F) -> Server
pub fn http<F, Fut, B>(func: F) -> Server
where
F: Fn(http::Request<hyper::Body>) -> Fut + Clone + Send + 'static,
Fut: Future<Output = http::Response<hyper::Body>> + Send + 'static,
F: Fn(Request<Incoming>) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Response<B>> + Send + 'static,
B: Body + Send + 'static,
B::Data: Send,
B::Error: std::error::Error + Send + Sync,
{
//Spawn new runtime in thread to prevent reactor execution context conflict
let thread_name = thread::current().name().unwrap_or("<unknown>").to_owned();

thread::spawn(move || {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let _ = rt.enter();

let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1);
let listener = rt
.block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))))
.unwrap();
let addr = listener.local_addr().unwrap();

let srv = async move {
loop {
let (stream, _) = tokio::select! {
res = listener.accept() => res?,
_ = shutdown_rx.recv() => break
};
let io = TokioIo::new(stream);

let srv = {
let _guard = rt.enter();
hyper::Server::bind(&([127, 0, 0, 1], 0).into()).serve(hyper::service::make_service_fn(
move |_| {
let func = func.clone();
async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |req| {
let fut = func(req);
async move { Ok::<_, Infallible>(fut.await) }
}))
let mut func = func.clone();
let mut shutdown_rx = shutdown_rx.resubscribe();

tokio::task::spawn(async move {
let conn = http1::Builder::new().serve_connection(
io,
service_fn(move |req| {
let func = func.clone();
async move { Ok::<_, Infallible>(func(req).await) }
}),
);
tokio::pin!(conn);
tokio::select! {
res = conn.as_mut() => {},
_ = shutdown_rx.recv() => conn.as_mut().graceful_shutdown()
}
},
))
});
}
Ok::<(), anyhow::Error>(())
};

let addr = srv.local_addr();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let srv = srv.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
});

let (panic_tx, panic_rx) = std_mpsc::channel();
let tname = format!(
"test({})-support-server",
thread::current().name().unwrap_or("<unknown>")
);
let thread_name = format!("test({})-support-server", thread_name);
thread::Builder::new()
.name(tname)
.name(thread_name)
.spawn(move || {
rt.block_on(srv).unwrap();
let _ = panic_tx.send(());
Expand All @@ -128,14 +155,18 @@ where
.unwrap()
}

pub fn capturing_http() -> (Server, UnboundedReceiver<Request<Body>>) {
let (tx, rx) = unbounded_channel();
pub fn capturing_http() -> (Server, mpsc::UnboundedReceiver<Request<Incoming>>) {
let (tx, rx) = mpsc::unbounded_channel();
let server = http(move |req| {
let tx = tx.clone();
async move {
tx.send(req).unwrap();
http::Response::default()
empty_response()
}
});
(server, rx)
}

pub fn empty_response() -> Response<Empty<Bytes>> {
Default::default()
}

0 comments on commit 7548c9f

Please sign in to comment.