Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Upgrade hyper from 0.14 to 1 in tests #222

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Dependencies
- Bumps `aws-*` dependencies to `1` ([#219](https://github.com/opensearch-project/opensearch-rs/pull/219))
- Bumps `itertools` from 0.11.0 to 0.12.0
- Bumps `hyper` from 0.14 to 1 in tests ([#221](https://github.com/opensearch-project/opensearch-rs/pull/221))

### Changed

Expand Down
7 changes: 4 additions & 3 deletions opensearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ aws-smithy-async = "1"
chrono = { version = "0.4", features = ["serde"] }
clap = "2"
futures = "0.3.1"
http = "0.2"
hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "server"] }
http-body-util = "0.1.0"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
regex="1.4"
sysinfo = "0.29.0"
test-case = "3"
textwrap = "0.16"
tokio = { version = "1.0", default-features = false, features = ["macros", "net", "time", "rt-multi-thread", "sync"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
xml-rs = "0.8"
Expand Down
6 changes: 3 additions & 3 deletions opensearch/tests/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn basic_auth_header() -> anyhow::Result<()> {
"authorization",
String::from_utf8(header_value).unwrap()
);
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -76,7 +76,7 @@ async fn api_key_header() -> anyhow::Result<()> {
"authorization",
String::from_utf8(header_value).unwrap()
);
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -92,7 +92,7 @@ async fn api_key_header() -> anyhow::Result<()> {
async fn bearer_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "authorization", "Bearer access_token");
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand Down
6 changes: 3 additions & 3 deletions opensearch/tests/aws_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use aws_credential_types::Credentials as AwsCredentials;
use aws_smithy_async::time::StaticTimeSource;
use aws_types::region::Region;
use common::*;
use http::header::HOST;
use opensearch::{auth::Credentials, indices::IndicesCreateParts, OpenSearch};
use regex::Regex;
use reqwest::header::HOST;
use serde_json::json;
use std::convert::TryInto;
use test_case::test_case;
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn aws_auth_get() -> anyhow::Result<()> {
"x-amz-content-sha256",
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
); // SHA of empty string
http::Response::default()
server::empty_response()
});

let client = create_aws_client(format!("http://{}", server.addr()).as_ref())?;
Expand All @@ -122,7 +122,7 @@ async fn aws_auth_post() -> anyhow::Result<()> {
"x-amz-content-sha256",
"f3a842f988a653a734ebe4e57c45f19293a002241a72f0b3abbff71e4f5297b9"
); // SHA of the JSON
http::Response::default()
server::empty_response()
});

let client = create_aws_client(format!("http://{}", server.addr()).as_ref())?;
Expand Down
16 changes: 8 additions & 8 deletions opensearch/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn default_user_agent_content_type_accept_headers() -> anyhow::Result<()>
assert_header_eq!(req, "user-agent", DEFAULT_USER_AGENT);
assert_header_eq!(req, "content-type", "application/json");
assert_header_eq!(req, "accept", "application/json");
http::Response::default()
server::empty_response()
});

let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
Expand All @@ -68,7 +68,7 @@ async fn default_user_agent_content_type_accept_headers() -> anyhow::Result<()>
async fn default_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "x-opaque-id", "foo");
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header(
Expand All @@ -86,7 +86,7 @@ async fn default_header() -> anyhow::Result<()> {
async fn override_default_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "x-opaque-id", "bar");
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header(
Expand All @@ -111,7 +111,7 @@ async fn override_default_header() -> anyhow::Result<()> {
async fn x_opaque_id_header() -> anyhow::Result<()> {
let server = server::http(move |req| async move {
assert_header_eq!(req, "x-opaque-id", "foo");
http::Response::default()
server::empty_response()
});

let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
Expand All @@ -131,7 +131,7 @@ async fn x_opaque_id_header() -> anyhow::Result<()> {
async fn uses_global_request_timeout() {
let server = server::http(move |_| async move {
std::thread::sleep(Duration::from_secs(1));
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -150,7 +150,7 @@ async fn uses_global_request_timeout() {
async fn uses_call_request_timeout() {
let server = server::http(move |_| async move {
std::thread::sleep(Duration::from_secs(1));
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand All @@ -173,7 +173,7 @@ async fn uses_call_request_timeout() {
async fn call_request_timeout_supersedes_global_timeout() {
let server = server::http(move |_| async move {
std::thread::sleep(Duration::from_secs(1));
http::Response::default()
server::empty_response()
});

let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn serialize_querystring() -> anyhow::Result<()> {
req.uri().query(),
Some("filter_path=took%2C_shards&pretty=true&q=title%3AOpenSearch&track_total_hits=100000")
);
http::Response::default()
server::empty_response()
});

let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
Expand Down
109 changes: 70 additions & 39 deletions opensearch/tests/common/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,34 @@
// https://github.com/seanmonstar/reqwest/blob/master/LICENSE-APACHE

use std::{
convert::Infallible, future::Future, net, sync::mpsc as std_mpsc, thread, time::Duration,
convert::Infallible,
future::Future,
net::{self, SocketAddr},
sync::mpsc as std_mpsc,
thread,
time::Duration,
};

use http::Request;
use hyper::Body;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{
body::{Body, Incoming},
server::conn::http1,
service::service_fn,
Request, Response,
};
use hyper_util::rt::TokioIo;
use tokio::{
net::{TcpListener, TcpStream},
sync::{broadcast, mpsc},
};

pub use http::Response;
use tokio::runtime;

pub struct Server {
addr: net::SocketAddr,
panic_rx: std_mpsc::Receiver<()>,
shutdown_tx: Option<oneshot::Sender<()>>,
shutdown_tx: Option<broadcast::Sender<()>>,
}

impl Server {
Expand All @@ -61,7 +72,7 @@ impl Server {
impl Drop for Server {
fn drop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
tx.send(()).unwrap();
}

if !::std::thread::panicking() {
Expand All @@ -72,46 +83,62 @@ impl Drop for Server {
}
}

pub fn http<F, Fut>(func: F) -> Server
pub fn http<F, Fut, B>(func: F) -> Server
where
F: Fn(http::Request<hyper::Body>) -> Fut + Clone + Send + 'static,
Fut: Future<Output = http::Response<hyper::Body>> + Send + 'static,
F: Fn(Request<Incoming>) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Response<B>> + Send + 'static,
B: Body + Send + 'static,
B::Data: Send,
B::Error: std::error::Error + Send + Sync,
{
//Spawn new runtime in thread to prevent reactor execution context conflict
let thread_name = thread::current().name().unwrap_or("<unknown>").to_owned();

thread::spawn(move || {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let _ = rt.enter();

let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1);
let listener = rt
.block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))))
.unwrap();
let addr = listener.local_addr().unwrap();

let srv = async move {
loop {
let (stream, _) = tokio::select! {
res = listener.accept() => res?,
_ = shutdown_rx.recv() => break
};
let io = TokioIo::new(stream);

let srv = {
let _guard = rt.enter();
hyper::Server::bind(&([127, 0, 0, 1], 0).into()).serve(hyper::service::make_service_fn(
move |_| {
let func = func.clone();
async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |req| {
let fut = func(req);
async move { Ok::<_, Infallible>(fut.await) }
}))
let mut func = func.clone();
let mut shutdown_rx = shutdown_rx.resubscribe();

tokio::task::spawn(async move {
let conn = http1::Builder::new().serve_connection(
io,
service_fn(move |req| {
let func = func.clone();
async move { Ok::<_, Infallible>(func(req).await) }
}),
);
tokio::pin!(conn);
tokio::select! {
res = conn.as_mut() => {},
_ = shutdown_rx.recv() => conn.as_mut().graceful_shutdown()
}
},
))
});
}
Ok::<(), anyhow::Error>(())
};

let addr = srv.local_addr();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let srv = srv.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
});

let (panic_tx, panic_rx) = std_mpsc::channel();
let tname = format!(
"test({})-support-server",
thread::current().name().unwrap_or("<unknown>")
);
let thread_name = format!("test({})-support-server", thread_name);
thread::Builder::new()
.name(tname)
.name(thread_name)
.spawn(move || {
rt.block_on(srv).unwrap();
let _ = panic_tx.send(());
Expand All @@ -128,14 +155,18 @@ where
.unwrap()
}

pub fn capturing_http() -> (Server, UnboundedReceiver<Request<Body>>) {
let (tx, rx) = unbounded_channel();
pub fn capturing_http() -> (Server, mpsc::UnboundedReceiver<Request<Incoming>>) {
let (tx, rx) = mpsc::unbounded_channel();
let server = http(move |req| {
let tx = tx.clone();
async move {
tx.send(req).unwrap();
http::Response::default()
empty_response()
}
});
(server, rx)
}

pub fn empty_response() -> Response<Empty<Bytes>> {
Default::default()
}
Loading