Skip to content

Commit

Permalink
finish of tower-async-hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Nov 19, 2023
1 parent c652572 commit 3acf170
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 148 deletions.
2 changes: 2 additions & 0 deletions tower-async-hyper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ categories = ["asynchronous", "network-programming"]
edition = "2021"

[dependencies]
http-body = "1"
hyper = { version = "1.0", features = ["http1", "http2", "server"] }
pin-project-lite = "0.2"
tower-async-service = { version = "0.2", path = "../tower-async-service" }

[dev-dependencies]
Expand Down
58 changes: 0 additions & 58 deletions tower-async-hyper/examples/hello_hyper.rs

This file was deleted.

82 changes: 82 additions & 0 deletions tower-async-hyper/src/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use http_body::{Body as HttpBody, Frame, SizeHint};
use hyper::body::Incoming;

pin_project_lite::pin_project! {
/// A wrapper around `hyper::body::Incoming` that implements `http_body::Body`.
///
/// This type is used to bridge the `hyper` and `tower-async` ecosystems.
/// Reason is that a lot of middlewares in `tower-async-http` that
/// operate on `http_body::Body` which also have to implement `Default`.
#[derive(Debug, Default)]
pub struct Body {
#[pin]
inner: Option<Incoming>,
}
}

impl From<Incoming> for Body {
fn from(inner: Incoming) -> Self {
Self { inner: Some(inner) }
}
}

impl Body {
/// Return a reference to the inner [`hyper::body::Incoming`] value.
///
/// This is normally not needed,
/// but in case you do ever need it, it's here.
pub fn as_ref(&self) -> Option<&Incoming> {
self.inner.as_ref()
}

/// Return a mutable reference to the inner [`hyper::body::Incoming`] value.
///
/// This is normally not needed,
/// but in case you do ever need it, it's here.
pub fn as_mut(&mut self) -> Option<&mut Incoming> {
self.inner.as_mut()
}

/// Turn this [`Body`] into the inner [`hyper::body::Incoming`] value.
///
/// This is normally not needed,
/// but in case you do ever need it, it's here.
pub fn into_inner(self) -> Option<Incoming> {
self.inner
}
}

impl HttpBody for Body {
type Data = <Incoming as HttpBody>::Data;
type Error = <Incoming as HttpBody>::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project()
.inner
.as_pin_mut()
.map(|incoming| incoming.poll_frame(cx))
.unwrap_or_else(|| Poll::Ready(None))
}

fn is_end_stream(&self) -> bool {
self.inner
.as_ref()
.map(|incoming| incoming.is_end_stream())
.unwrap_or(true)
}

fn size_hint(&self) -> SizeHint {
self.inner
.as_ref()
.map(|incoming| incoming.size_hint())
.unwrap_or_default()
}
}
159 changes: 69 additions & 90 deletions tower-async-hyper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,79 @@
//! Bridges a `tower-async` `Service` to be used within a `hyper` (1.x) environment.
//!
//! In case you also make use of `tower-async-http`,
//! you can use its [`tower_async_http::map_request_body::MapRequestBodyLayer`] middleware
//! to convert the normal [`hyper::body::Incoming`] [`http_body::Body`] into a [`HyperBody`]
//! as it can be used with middlewares that require the [`http_body::Body`] to be [`Default`].
//!
//! [`tower_async_http::map_request_body::MapRequestBodyLayer`]: https://docs.rs/tower-async-http/latest/tower_async_http/map_request_body/struct.MapRequestBodyLayer.html
//!
//! # Example
//!
//! ```text
//! ```rust,no_run
//! use std::net::SocketAddr;
//!
//! use http::{Request, Response, StatusCode};
//! use hyper_util::rt::{TokioExecutor, TokioIo};
//! use hyper_util::server::conn::auto::Builder;
//! use tokio::net::TcpListener;
//! use tracing_subscriber::filter::LevelFilter;
//! use tracing_subscriber::layer::SubscriberExt;
//! use tracing_subscriber::util::SubscriberInitExt;
//! use tracing_subscriber::{fmt, EnvFilter};
//!
//! use tower_async::ServiceBuilder;
//! use tower_async_http::ServiceBuilderExt;
//! use tower_async_hyper::{HyperBody, TowerHyperServiceExt};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
//! tracing_subscriber::registry()
//! .with(fmt::layer())
//! .with(
//! EnvFilter::builder()
//! .with_default_directive(LevelFilter::DEBUG.into())
//! .from_env_lossy(),
//! )
//! .init();
//!
//! let service = ServiceBuilder::new()
//! .map_request_body(HyperBody::from)
//! .timeout(std::time::Duration::from_secs(5))
//! .decompression()
//! .compression()
//! .follow_redirects()
//! .trace_for_http()
//! .service_fn(|_req: Request<HyperBody>| async move {
//! Response::builder()
//! .status(StatusCode::OK)
//! .header("content-type", "text/plain")
//! .body(String::from("hello"))
//! });
//!
//! let addr: SocketAddr = ([127, 0, 0, 1], 8080).into();
//! let listener = TcpListener::bind(addr).await?;
//!
//! loop {
//! let (stream, _) = listener.accept().await?;
//! let service = service.clone().into_hyper_service();
//! tokio::spawn(async move {
//! let stream = TokioIo::new(stream);
//! let result = Builder::new(TokioExecutor::new())
//! .serve_connection(stream, service)
//! .await;
//! if let Err(e) = result {
//! eprintln!("server connection error: {}", e);
//! }
//! });
//! }
//! }
//! ```

#![feature(return_type_notation)]
#![allow(incomplete_features)]

use std::pin::Pin;
use std::sync::Arc;

use hyper::service::Service as HyperService;

use tower_async_service::Service;

pub trait TowerHyperServiceExt<S, Request> {
fn into_hyper_service(self) -> HyperServiceWrapper<S>;
}

impl<S, Request> TowerHyperServiceExt<S, Request> for S
where
S: Service<Request>,
{
fn into_hyper_service(self) -> HyperServiceWrapper<S> {
HyperServiceWrapper {
service: Arc::new(self),
}
}
}

pub struct HyperServiceWrapper<S> {
service: Arc<S>,
}

impl<S, Request> HyperService<Request> for HyperServiceWrapper<S>
where
S: Service<Request, call(): Send> + Send + Sync + 'static,
Request: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn call(&self, req: Request) -> Self::Future {
let service = self.service.clone();
let fut = async move { service.call(req).await };
Box::pin(fut)
}
}

pub type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

#[cfg(test)]
mod test {
use std::convert::Infallible;

use super::*;

fn require_send<T: Send>(t: T) -> T {
t
}

fn require_service<Request, S: Service<Request>>(s: S) -> S {
s
}

#[tokio::test]
async fn test_into_hyper_service() {
let service =
tower_async::service_fn(|req: &'static str| async move { Ok::<_, Infallible>(req) });
let service = require_service(service);
let hyper_service = service.into_hyper_service();
inner_test_hyper_service(hyper_service).await;
}

#[tokio::test]
async fn test_into_layered_hyper_service() {
let service = tower_async::ServiceBuilder::new()
.timeout(std::time::Duration::from_secs(5))
.service_fn(|req: &'static str| async move { Ok::<_, Infallible>(req) });
let service = require_service(service);
let hyper_service = service.into_hyper_service();
inner_test_hyper_service(hyper_service).await;
}

async fn inner_test_hyper_service<H>(hyper_service: H)
where
H: HyperService<&'static str, Response = &'static str>,
H::Error: std::fmt::Debug,
H::Future: Send,
{
let fut = hyper_service.call("hello");
let fut = require_send(fut);
mod service;
pub use service::{BoxFuture, HyperServiceWrapper, TowerHyperServiceExt};

let res = fut.await.expect("call hyper service");
assert_eq!(res, "hello");
}
}
mod body;
pub use body::Body as HyperBody;
Loading

0 comments on commit 3acf170

Please sign in to comment.