From c8bd32e24075c5f49537294c8349dbb69dc50da6 Mon Sep 17 00:00:00 2001 From: Sattva Date: Wed, 28 Feb 2024 15:50:31 +0530 Subject: [PATCH] hyper-1.0 client-server example (#639) - Implement hyper's runtime traits for glommio - Update hyper server example for hyper 1.0 - Add hyper client example for hyper 1.0 --- examples/Cargo.toml | 17 ++- examples/hyper.rs | 142 ------------------- examples/hyper_client.rs | 287 +++++++++++++++++++++++++++++++++++++++ examples/hyper_server.rs | 233 +++++++++++++++++++++++++++++++ 4 files changed, 531 insertions(+), 148 deletions(-) delete mode 100644 examples/hyper.rs create mode 100644 examples/hyper_client.rs create mode 100644 examples/hyper_server.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index df3fa8444..6f716b3f1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -14,11 +14,12 @@ futures-lite = "1.11.1" glommio = { path = "../glommio" } # hyper and tokio for the hyper example. We just need the traits from Tokio -hyper = { version = "0.14", features = ["full"] } +hyper = { version = "1.2.0", features = ["full"] } num_cpus = "1.13.0" pretty-bytes = "~0.2.2" sys-info = "~0.8.0" -tokio = { version = "1" } +http-body-util = "0.1.0" +serde_json = "1.0.114" [[example]] name = "echo" @@ -52,10 +53,14 @@ path = "storage.rs" name = "channel_mesh" path = "sharding.rs" -[[example]] -name = "hyper" -path = "hyper.rs" - [[example]] name = "gate" path = "gate.rs" + +[[example]] +name = "hyper_server" +path = "hyper_server.rs" + +[[example]] +name = "hyper_client" +path = "hyper_client.rs" diff --git a/examples/hyper.rs b/examples/hyper.rs deleted file mode 100644 index 1851c8900..000000000 --- a/examples/hyper.rs +++ /dev/null @@ -1,142 +0,0 @@ -/// Example on how to use the Hyper server in !Send mode. -/// The clients are harder, see https://github.com/hyperium/hyper/issues/2341 for details -/// -/// Essentially what we do is we wrap our types around the Tokio traits. The -/// `!Send` limitation makes it harder to deal with high level hyper primitives, -/// but it works in the end. -mod hyper_compat { - use futures_lite::{AsyncRead, AsyncWrite, Future}; - use hyper::service::service_fn; - use std::{ - net::SocketAddr, - pin::Pin, - task::{Context, Poll}, - }; - - use glommio::{ - enclose, - net::{TcpListener, TcpStream}, - sync::Semaphore, - }; - use hyper::{server::conn::Http, Body, Request, Response}; - use std::{io, rc::Rc}; - use tokio::io::ReadBuf; - - #[derive(Clone)] - struct HyperExecutor; - - impl hyper::rt::Executor for HyperExecutor - where - F: Future + 'static, - F::Output: 'static, - { - fn execute(&self, fut: F) { - glommio::spawn_local(fut).detach(); - } - } - - struct HyperStream(pub TcpStream); - impl tokio::io::AsyncRead for HyperStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0) - .poll_read(cx, buf.initialize_unfilled()) - .map(|n| { - if let Ok(n) = n { - buf.advance(n); - } - Ok(()) - }) - } - } - - impl tokio::io::AsyncWrite for HyperStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.0).poll_close(cx) - } - } - - pub(crate) async fn serve_http( - addr: A, - service: S, - max_connections: usize, - ) -> io::Result<()> - where - S: FnMut(Request) -> F + 'static + Copy, - F: Future, R>> + 'static, - R: std::error::Error + 'static + Send + Sync, - A: Into, - { - let listener = TcpListener::bind(addr.into())?; - let conn_control = Rc::new(Semaphore::new(max_connections as _)); - loop { - match listener.accept().await { - Err(x) => { - return Err(x.into()); - } - Ok(stream) => { - let addr = stream.local_addr().unwrap(); - glommio::spawn_local(enclose!{(conn_control) async move { - let _permit = conn_control.acquire_permit(1).await; - if let Err(x) = Http::new().with_executor(HyperExecutor).serve_connection(HyperStream(stream), service_fn(service)).await { - if !x.is_incomplete_message() { - eprintln!("Stream from {addr:?} failed with error {x:?}"); - } - } - }}).detach(); - } - } - } - } -} - -use glommio::{CpuSet, LocalExecutorPoolBuilder, PoolPlacement}; -use hyper::{Body, Method, Request, Response, StatusCode}; -use std::convert::Infallible; - -async fn hyper_demo(req: Request) -> Result, Infallible> { - match (req.method(), req.uri().path()) { - (&Method::GET, "/hello") => Ok(Response::new(Body::from("world"))), - (&Method::GET, "/world") => Ok(Response::new(Body::from("hello"))), - _ => Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("notfound")) - .unwrap()), - } -} - -fn main() { - // Issue curl -X GET http://127.0.0.1:8000/hello or curl -X GET http://127.0.0.1:8000/world to - // see it in action - - println!("Starting server on port 8000"); - - LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread( - num_cpus::get(), - CpuSet::online().ok(), - )) - .on_all_shards(|| async move { - let id = glommio::executor().id(); - println!("Starting executor {id}"); - hyper_compat::serve_http(([0, 0, 0, 0], 8000), hyper_demo, 1024) - .await - .unwrap(); - }) - .unwrap() - .join_all(); -} diff --git a/examples/hyper_client.rs b/examples/hyper_client.rs new file mode 100644 index 000000000..260d7c8dc --- /dev/null +++ b/examples/hyper_client.rs @@ -0,0 +1,287 @@ +// Provide --http1 or --http2 arg in run command +// cargo run --example hyper_client -- --http1 +mod hyper_compat { + use futures_lite::{AsyncRead, AsyncWrite, Future}; + use std::{ + io::Write, + pin::Pin, + slice, + task::{Context, Poll}, + vec, + }; + + use glommio::net::TcpStream; + + use http_body_util::BodyExt; + use hyper::body::{Body as HttpBody, Bytes, Frame}; + use hyper::Error; + use hyper::Request; + use std::io; + use std::marker::PhantomData; + + #[derive(Clone)] + struct HyperExecutor; + + impl hyper::rt::Executor for HyperExecutor + where + F: Future + 'static, + F::Output: 'static, + { + fn execute(&self, fut: F) { + glommio::spawn_local(fut).detach(); + } + } + + struct HyperStream(pub TcpStream); + + impl hyper::rt::Write for HyperStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_close(cx) + } + } + + impl hyper::rt::Read for HyperStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + unsafe { + let read_slice = { + let buffer = buf.as_mut(); + buffer.as_mut_ptr().write_bytes(0, buffer.len()); + slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut u8, buffer.len()) + }; + Pin::new(&mut self.0).poll_read(cx, read_slice).map(|n| { + if let Ok(n) = n { + buf.advance(n); + } + Ok(()) + }) + } + } + } + + struct GlommioSleep(glommio::timer::Timer); + + impl Future for GlommioSleep { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + match Pin::new(&mut self.0).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } + } + + impl hyper::rt::Sleep for GlommioSleep {} + unsafe impl Send for GlommioSleep {} + unsafe impl Sync for GlommioSleep {} + + #[derive(Clone, Copy, Debug)] + pub struct GlommioTimer; + + impl hyper::rt::Timer for GlommioTimer { + fn sleep(&self, duration: std::time::Duration) -> Pin> { + Box::pin(GlommioSleep(glommio::timer::Timer::new(duration))) + } + + fn sleep_until(&self, deadline: std::time::Instant) -> Pin> { + Box::pin(GlommioSleep(glommio::timer::Timer::new( + deadline - std::time::Instant::now(), + ))) + } + } + + struct Body { + // Our Body type is !Send and !Sync: + _marker: PhantomData<*const ()>, + data: Option, + } + + impl From<&[u8]> for Body { + fn from(data: &[u8]) -> Self { + Body { + _marker: PhantomData, + data: Some(Bytes::copy_from_slice(data)), + } + } + } + + impl HttpBody for Body { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d)))) + } + } + + pub async fn http1_client( + executor_id: usize, + url: hyper::Uri, + ) -> Result<(), Box> { + let host = url.host().expect("uri has no host"); + let port = url.port_u16().unwrap_or(80); + let addr = format!("{}:{}", host, port); + let stream = TcpStream::connect(addr).await?; + + let io = HyperStream(stream); + + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + + glommio::spawn_local(async move { + if let Err(err) = conn.await { + println!("{executor_id}: Connection failed: {:?}", err); + } + }) + .detach(); + + let body = serde_json::json!({"test": {}}); + let body_bytes = serde_json::to_vec(&body).unwrap(); + + let authority = url.authority().unwrap().clone(); + for request_id in 0..4 { + let request = Request::builder() + .uri(url.clone()) + .header(hyper::header::HOST, authority.as_str()) + .body(Body::from(body_bytes.as_slice()))?; + + let mut response = sender.send_request(request).await.unwrap(); + + // Print the response body + let mut res_buff = vec![]; + while let Some(next) = response.frame().await { + let frame = next.unwrap(); + if let Some(chunk) = frame.data_ref() { + res_buff.write_all(&chunk).unwrap(); + } + } + + println!( + "{executor_id}: request_id = {request_id} | response_status = {} | response_body = {}", + response.status(), + String::from_utf8_lossy(&res_buff) + ); + } + + Ok(()) + } + + pub async fn http2_client( + executor_id: usize, + url: hyper::Uri, + ) -> Result<(), Box> { + let host = url.host().expect("uri has no host"); + let port = url.port_u16().unwrap_or(80); + let addr = format!("{}:{}", host, port); + let stream = TcpStream::connect(addr).await?; + + let io = HyperStream(stream); + + let (mut sender, conn) = hyper::client::conn::http2::handshake(HyperExecutor, io).await?; + + glommio::spawn_local(async move { + if let Err(err) = conn.await { + println!("{executor_id}: Connection failed: {:?}", err); + } + }) + .detach(); + + let body = serde_json::json!({"test": {}}); + let body_bytes = serde_json::to_vec(&body).unwrap(); + + let authority = url.authority().unwrap().clone(); + for request_id in 0..4 { + let request = Request::builder() + .uri(url.clone()) + .header(hyper::header::HOST, authority.as_str()) + .body(Body::from(body_bytes.as_slice()))?; + + let mut response = sender.send_request(request).await.unwrap(); + + // Print the response body + let mut res_buff = vec![]; + while let Some(next) = response.frame().await { + let frame = next.unwrap(); + if let Some(chunk) = frame.data_ref() { + res_buff.write_all(&chunk).unwrap(); + } + } + + println!( + "{executor_id}: request_id = {request_id} | response_status = {} | response_body = {}", + response.status(), + String::from_utf8_lossy(&res_buff) + ); + } + + Ok(()) + } +} + +use glommio::{CpuSet, LocalExecutorPoolBuilder, PoolPlacement}; + +fn main() { + let args: Vec = std::env::args().collect(); + if args.len() != 2 { + println!("Provide args --http1 or --http2"); + return; + } + + match args[1].as_str() { + "--http1" => { + LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread( + num_cpus::get(), + CpuSet::online().ok(), + )) + .on_all_shards(|| async move { + let executor_id = glommio::executor().id(); + println!("Starting executor {executor_id}"); + hyper_compat::http1_client( + executor_id, + "http://0.0.0.0:8000/hello".parse::().unwrap(), + ) + .await + .unwrap() + }) + .unwrap() + .join_all(); + } + "--http2" => { + LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread( + num_cpus::get(), + CpuSet::online().ok(), + )) + .on_all_shards(|| async move { + let executor_id = glommio::executor().id(); + println!("Starting executor {executor_id}"); + hyper_compat::http2_client( + executor_id, + "http://0.0.0.0:8000/hello".parse::().unwrap(), + ) + .await + .unwrap() + }) + .unwrap() + .join_all(); + } + _ => println!("Provide args --http1 or --http2"), + } +} diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs new file mode 100644 index 000000000..f20cd4708 --- /dev/null +++ b/examples/hyper_server.rs @@ -0,0 +1,233 @@ +// Provide --http1 or --http2 arg in run command +// cargo run --example hyper_server -- --http1 +mod hyper_compat { + use futures_lite::{AsyncRead, AsyncWrite, Future}; + use glommio::{ + enclose, + net::{TcpListener, TcpStream}, + sync::Semaphore, + }; + use hyper::{ + body::{Body as HttpBody, Bytes, Frame, Incoming}, + service::service_fn, + Error, Request, Response, + }; + + use std::{ + io, + marker::PhantomData, + net::SocketAddr, + pin::Pin, + rc::Rc, + slice, + task::{Context, Poll}, + }; + + #[derive(Clone)] + struct HyperExecutor; + impl hyper::rt::Executor for HyperExecutor + where + F: Future + 'static, + F::Output: 'static, + { + fn execute(&self, fut: F) { + glommio::spawn_local(fut).detach(); + } + } + + struct HyperStream(pub TcpStream); + + impl hyper::rt::Write for HyperStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_close(cx) + } + } + + impl hyper::rt::Read for HyperStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + unsafe { + let read_slice = { + let buffer = buf.as_mut(); + buffer.as_mut_ptr().write_bytes(0, buffer.len()); + slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut u8, buffer.len()) + }; + Pin::new(&mut self.0).poll_read(cx, read_slice).map(|n| { + if let Ok(n) = n { + buf.advance(n); + } + Ok(()) + }) + } + } + } + + pub struct ResponseBody { + // Our ResponseBody type is !Send and !Sync + _marker: PhantomData<*const ()>, + data: Option, + } + + impl From<&'static str> for ResponseBody { + fn from(data: &'static str) -> Self { + ResponseBody { + _marker: PhantomData, + data: Some(Bytes::from(data)), + } + } + } + + impl HttpBody for ResponseBody { + type Data = Bytes; + type Error = Error; + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d)))) + } + } + + pub(crate) async fn serve_http1( + addr: A, + service: S, + max_connections: usize, + ) -> io::Result<()> + where + S: Fn(Request) -> F + 'static + Copy, + F: Future, R>> + 'static, + R: std::error::Error + 'static + Send + Sync, + A: Into, + { + let listener = TcpListener::bind(addr.into())?; + let conn_control = Rc::new(Semaphore::new(max_connections as _)); + loop { + match listener.accept().await { + Err(x) => { + return Err(x.into()); + } + Ok(stream) => { + let addr = stream.local_addr().unwrap(); + let io = HyperStream(stream); + glommio::spawn_local(enclose! {(conn_control) async move { + let _permit = conn_control.acquire_permit(1).await; + if let Err(err) = hyper::server::conn::http1::Builder::new().serve_connection(io, service_fn(service)).await { + if !err.is_incomplete_message() { + eprintln!("Stream from {addr:?} failed with error {err:?}"); + } + } + }}).detach(); + } + } + } + } + + pub(crate) async fn serve_http2( + addr: A, + service: S, + max_connections: usize, + ) -> io::Result<()> + where + S: Fn(Request) -> F + 'static + Copy, + F: Future, R>> + 'static, + R: std::error::Error + 'static + Send + Sync, + A: Into, + { + let listener = TcpListener::bind(addr.into())?; + let conn_control = Rc::new(Semaphore::new(max_connections as _)); + loop { + match listener.accept().await { + Err(x) => { + return Err(x.into()); + } + Ok(stream) => { + let addr = stream.local_addr().unwrap(); + let io = HyperStream(stream); + glommio::spawn_local(enclose! {(conn_control) async move { + let _permit = conn_control.acquire_permit(1).await; + if let Err(err) = hyper::server::conn::http2::Builder::new(HyperExecutor).serve_connection(io, service_fn(service)).await { + if !err.is_incomplete_message() { + eprintln!("Stream from {addr:?} failed with error {err:?}"); + } + } + }}).detach(); + } + } + } + } +} + +use glommio::{CpuSet, LocalExecutorPoolBuilder, PoolPlacement}; +use hyper::{body::Incoming, Method, Request, Response, StatusCode}; +use hyper_compat::ResponseBody; +use std::convert::Infallible; + +async fn hyper_demo(req: Request) -> Result, Infallible> { + match (req.method(), req.uri().path()) { + (&Method::GET, "/hello") => Ok(Response::new(ResponseBody::from("world"))), + _ => Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(ResponseBody::from("notfound")) + .unwrap()), + } +} + +fn main() { + let args: Vec = std::env::args().collect(); + if args.len() != 2 { + println!("Provide args --http1 or --http2"); + return; + } + + match args[1].as_str() { + "--http1" => { + // Issue curl -X GET http://127.0.0.1:8000/hello to see it in action + LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread( + num_cpus::get(), + CpuSet::online().ok(), + )) + .on_all_shards(|| async move { + let id = glommio::executor().id(); + println!("Starting executor {id}"); + hyper_compat::serve_http1(([0, 0, 0, 0], 8000), hyper_demo, 1024) + .await + .unwrap(); + }) + .unwrap() + .join_all(); + } + "--http2" => { + // Issue curl --http2-prior-knowledge -X GET http://127.0.0.1:8000/hello to see it in action + println!("Starting http2 server on port 8000"); + LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread( + num_cpus::get(), + CpuSet::online().ok(), + )) + .on_all_shards(|| async move { + let id = glommio::executor().id(); + println!("Starting executor {id}"); + hyper_compat::serve_http2(([0, 0, 0, 0], 8000), hyper_demo, 1024) + .await + .unwrap(); + }) + .unwrap() + .join_all(); + } + _ => println!("Provide args --http1 or --http2"), + } +}