From 2f5e921c615189d768499b268f8d8110dc96dedb Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 7 Jan 2024 21:21:49 +0600 Subject: [PATCH] Async fn in trait (#24) --- CHANGELOG.md | 4 + Cargo.toml | 20 +-- src/client/simple.rs | 2 +- src/client/stream.rs | 9 +- src/connection.rs | 3 +- src/default.rs | 16 +- src/dispatcher.rs | 346 +++++++++++++----------------------------- src/server/service.rs | 35 ++--- src/stream.rs | 7 +- 9 files changed, 161 insertions(+), 281 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ddd7aef..09c7510 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changes +## [0.5.0-b.0] - 2024-01-07 + +* Use "async fn" in trait for Service definition + ## [0.4.4] - 2023-11-11 * Update ntex-io diff --git a/Cargo.toml b/Cargo.toml index 775228d..99e93a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-h2" -version = "0.4.4" +version = "0.5.0-b.0" license = "MIT" authors = ["Nikolay Kim "] description = "An HTTP/2 client and server" @@ -19,14 +19,14 @@ default = [] unstable = [] [dependencies] -ntex-connect = "0.3.2" -ntex-io = "0.3.6" +ntex-connect = "1.0.0-b.0" +ntex-io = "1.0.0-b.0" ntex-http = "0.1.10" -ntex-bytes = "0.1.20" +ntex-bytes = "0.1.21" ntex-codec = "0.6.2" -ntex-service = "1.2.7" -ntex-util = "0.3.2" -ntex-rt = "0.4.7" +ntex-service = "2.0.0-b.0" +ntex-util = "1.0.0-b.0" +ntex-rt = "0.4.10" bitflags = "2.4" fxhash = "0.2.1" @@ -46,13 +46,13 @@ walkdir = "2.3.2" serde = "1.0.0" serde_json = "1.0.0" -ntex = { version = "0.7.6", features = ["openssl", "tokio"] } -ntex-tls = { version = "0.3.0", features = ["openssl"] } +ntex = { version = "1.0.0-b.0", features = ["openssl", "tokio"] } +ntex-tls = { version = "1.0.0-b.0", features = ["openssl"] } openssl = "0.10" # Examples env_logger = { version = "0.10", default-features = false } -ntex-connect = { version = "0.3.2", features = ["openssl", "tokio"] } +ntex-connect = { version = "1.0.0-b.0", features = ["openssl", "tokio"] } [patch.crates-io] ntex-h2 = { path = "." } diff --git a/src/client/simple.rs b/src/client/simple.rs index f56f0c8..ba3580b 100644 --- a/src/client/simple.rs +++ b/src/client/simple.rs @@ -54,7 +54,7 @@ impl SimpleClient { HandleService::new(storage.clone()), ); - let fut = IoDispatcher::with_config( + let fut = IoDispatcher::new( io, con.codec().clone(), disp, diff --git a/src/client/stream.rs b/src/client/stream.rs index c3082bf..fe9d809 100644 --- a/src/client/stream.rs +++ b/src/client/stream.rs @@ -1,10 +1,10 @@ use std::task::{Context, Poll}; -use std::{cell::RefCell, collections::VecDeque, fmt, pin::Pin, rc::Rc}; +use std::{cell::RefCell, collections::VecDeque, fmt, future::poll_fn, pin::Pin, rc::Rc}; use ntex_bytes::Bytes; use ntex_http::HeaderMap; use ntex_service::{Service, ServiceCtx}; -use ntex_util::future::{poll_fn, Either, Ready}; +use ntex_util::future::Either; use ntex_util::{task::LocalWaker, HashMap, Stream as FutStream}; use crate::error::OperationError; @@ -198,9 +198,8 @@ impl HandleService { impl Service for HandleService { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()>; - fn call<'a>(&'a self, msg: Message, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call(&self, msg: Message, _: ServiceCtx<'_, Self>) -> Result<(), ()> { let id = msg.id(); if let Some(inflight) = self.0 .0.inflight.borrow_mut().get_mut(&id) { let eof = match msg.kind() { @@ -214,7 +213,7 @@ impl Service for HandleService { } inflight.push(msg); } - Ready::Ok(()) + Ok(()) } } diff --git a/src/connection.rs b/src/connection.rs index 8ad56b7..59614e6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -27,6 +27,7 @@ bitflags::bitflags! { const DISCONNECT_WHEN_READY = 0b0000_1000; const SECURE = 0b0001_0000; const STREAM_REFUSED = 0b0010_0000; + const KA_TIMER = 0b0100_0000; } } @@ -673,7 +674,7 @@ impl RecvHalfConnection { } pub(crate) fn recv_pong(&self, _: frame::Ping) { - self.0.io.stop_keepalive_timer(); + self.0.io.stop_timer(); } pub(crate) fn recv_go_away( diff --git a/src/default.rs b/src/default.rs index bb54d58..3adf837 100644 --- a/src/default.rs +++ b/src/default.rs @@ -1,7 +1,6 @@ use std::fmt; use ntex_service::{Service, ServiceCtx, ServiceFactory}; -use ntex_util::future::Ready; use super::control::{ControlMessage, ControlResult}; @@ -14,21 +13,22 @@ impl ServiceFactory> for DefaultContr type Error = E; type InitError = E; type Service = DefaultControlService; - type Future<'f> = Ready; - fn create(&self, _: ()) -> Self::Future<'_> { - Ready::Ok(DefaultControlService) + async fn create(&self, _: ()) -> Result { + Ok(DefaultControlService) } } impl Service> for DefaultControlService { type Response = ControlResult; type Error = E; - type Future<'f> = Ready; - #[inline] - fn call<'a>(&'a self, msg: ControlMessage, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call( + &self, + msg: ControlMessage, + _: ServiceCtx<'_, Self>, + ) -> Result { log::trace!("Default control service is used: {:?}", msg); - Ready::Ok(msg.ack()) + Ok(msg.ack()) } } diff --git a/src/dispatcher.rs b/src/dispatcher.rs index f85b406..d31135b 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -1,10 +1,10 @@ -use std::{cell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; +use std::{cell, fmt, future::poll_fn, future::Future, rc::Rc, task::Context, task::Poll}; use ntex_io::DispatchItem; use ntex_rt::spawn; -use ntex_service::{Pipeline, Service, ServiceCall, ServiceCtx}; -use ntex_util::future::{join_all, BoxFuture, Either, Ready}; -use ntex_util::{ready, HashMap}; +use ntex_service::{Pipeline, Service, ServiceCtx}; +use ntex_util::future::{join_all, BoxFuture, Either}; +use ntex_util::HashMap; use crate::connection::{Connection, RecvHalfConnection}; use crate::control::{ControlMessage, ControlResult}; @@ -40,14 +40,6 @@ where last_stream_id: StreamId, } -type ServiceFut<'f, Pub, Ctl> = Either< - PublishResponse<'f, Pub, Ctl>, - Either< - ControlResponse<'f, Ctl, Pub>, - Either, ()>, BoxFuture<'f, Result, ()>>>, - >, ->; - impl Dispatcher where Ctl: Service, Response = ControlResult> + 'static, @@ -68,23 +60,17 @@ where } } - fn handle_message<'f>( + async fn handle_message<'f>( &'f self, result: Result, Either>, ctx: ServiceCtx<'f, Self>, - ) -> ServiceFut<'f, Pub, Ctl> { + ) -> Result, ()> { match result { - Ok(Some((stream, msg))) => { - Either::Left(PublishResponse::new(msg, stream, &self.inner, ctx)) - } - Ok(None) => Either::Right(Either::Right(Either::Left(Ready::Ok(None)))), + Ok(Some((stream, msg))) => publish(msg, stream, &self.inner, ctx).await, + Ok(None) => Ok(None), Err(Either::Left(err)) => { self.connection.proto_error(&err); - Either::Right(Either::Left(ControlResponse::new( - ControlMessage::proto_error(err), - &self.inner, - ctx, - ))) + control(ControlMessage::proto_error(err), &self.inner, ctx).await } Err(Either::Right(err)) => { let (stream, kind) = err.into_inner(); @@ -92,12 +78,7 @@ where self.connection .encode(Reset::new(stream.id(), kind.reason())); - Either::Left(PublishResponse::new( - Message::error(kind, &stream), - stream, - &self.inner, - ctx, - )) + publish(Message::error(kind, &stream), stream, &self.inner, ctx).await } } } @@ -125,7 +106,6 @@ where { type Response = Option; type Error = (); - type Future<'f> = ServiceFut<'f, Pub, Ctl>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { // check publish service readiness @@ -180,66 +160,66 @@ where } } - fn call<'a>( - &'a self, + async fn call( + &self, request: DispatchItem, - ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ctx: ServiceCtx<'_, Self>, + ) -> Result { log::debug!("Handle h2 message: {:?}", request); match request { DispatchItem::Item(frame) => match frame { Frame::Headers(hdrs) => { self.handle_message(self.connection.recv_headers(hdrs), ctx) + .await + } + Frame::Data(data) => { + self.handle_message(self.connection.recv_data(data), ctx) + .await } - Frame::Data(data) => self.handle_message(self.connection.recv_data(data), ctx), Frame::Settings(settings) => match self.connection.recv_settings(settings) { Err(Either::Left(err)) => { self.connection.proto_error(&err); - Either::Right(Either::Left(ControlResponse::new( - ControlMessage::proto_error(err), - &self.inner, - ctx, - ))) + control(ControlMessage::proto_error(err), &self.inner, ctx).await } Err(Either::Right(errs)) => { - Either::Right(Either::Right(Either::Right(Box::pin(async move { - // handle stream errors - for err in errs { - let (stream, kind) = err.into_inner(); - stream.set_failed_stream(kind.into()); - - self.connection - .encode(Reset::new(stream.id(), kind.reason())); - let _ = PublishResponse::::new( - Message::error(kind, &stream), - stream, - &self.inner, - ctx, - ) - .await; - } - Ok(None) - })))) + // handle stream errors + for err in errs { + let (stream, kind) = err.into_inner(); + stream.set_failed_stream(kind.into()); + + self.connection + .encode(Reset::new(stream.id(), kind.reason())); + let _ = publish::( + Message::error(kind, &stream), + stream, + &self.inner, + ctx, + ) + .await; + } + Ok(None) } - Ok(_) => Either::Right(Either::Right(Either::Left(Ready::Ok(None)))), + Ok(_) => Ok(None), }, - Frame::WindowUpdate(update) => self.handle_message( - self.connection.recv_window_update(update).map(|_| None), - ctx, - ), + Frame::WindowUpdate(update) => { + self.handle_message( + self.connection.recv_window_update(update).map(|_| None), + ctx, + ) + .await + } Frame::Reset(reset) => { self.handle_message(self.connection.recv_rst_stream(reset).map(|_| None), ctx) + .await } Frame::Ping(ping) => { log::trace!("processing PING: {:#?}", ping); if ping.is_ack() { self.connection.recv_pong(ping); - Either::Right(Either::Right(Either::Left(Ready::Ok(None)))) + Ok(None) } else { - Either::Right(Either::Right(Either::Left(Ready::Ok(Some( - Ping::pong(ping.into_payload()).into(), - ))))) + Ok(Some(Ping::pong(ping.into_payload()).into())) } } Frame::GoAway(frm) => { @@ -247,237 +227,129 @@ where let reason = frm.reason(); let streams = self.connection.recv_go_away(reason, frm.data()); self.handle_connection_error(streams, ConnectionError::GoAway(reason).into()); - Either::Right(Either::Left(ControlResponse::new( - ControlMessage::go_away(frm), - &self.inner, - ctx, - ))) + control(ControlMessage::go_away(frm), &self.inner, ctx).await } Frame::Priority(prio) => { log::debug!("PRIORITY frame is not supported: {:#?}", prio); - Either::Right(Either::Right(Either::Left(Ready::Ok(None)))) + Ok(None) } }, DispatchItem::EncoderError(err) => { let err = ConnectionError::from(err); let streams = self.connection.proto_error(&err); self.handle_connection_error(streams, err.into()); - Either::Right(Either::Left(ControlResponse::new( - ControlMessage::proto_error(err), - &self.inner, - ctx, - ))) + control(ControlMessage::proto_error(err), &self.inner, ctx).await } DispatchItem::DecoderError(err) => { let err = ConnectionError::from(err); let streams = self.connection.proto_error(&err); self.handle_connection_error(streams, err.into()); - Either::Right(Either::Left(ControlResponse::new( - ControlMessage::proto_error(err), - &self.inner, - ctx, - ))) + control(ControlMessage::proto_error(err), &self.inner, ctx).await } DispatchItem::KeepAliveTimeout => { log::warn!("did not receive pong response in time, closing connection"); let streams = self.connection.ping_timeout(); self.handle_connection_error(streams, ConnectionError::KeepaliveTimeout.into()); - Either::Right(Either::Left(ControlResponse::new( + control( ControlMessage::proto_error(ConnectionError::KeepaliveTimeout), &self.inner, ctx, - ))) + ) + .await } DispatchItem::ReadTimeout => { log::warn!("did not receive complete frame in time, closing connection"); let streams = self.connection.read_timeout(); self.handle_connection_error(streams, ConnectionError::ReadTimeout.into()); - Either::Right(Either::Left(ControlResponse::new( + control( ControlMessage::proto_error(ConnectionError::ReadTimeout), &self.inner, ctx, - ))) + ) + .await } DispatchItem::Disconnect(err) => { let streams = self.connection.disconnect(); self.handle_connection_error(streams, OperationError::Disconnected); - Either::Right(Either::Left(ControlResponse::new( - ControlMessage::peer_gone(err), - &self.inner, - ctx, - ))) - } - DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => { - Either::Right(Either::Right(Either::Left(Ready::Ok(None)))) + control(ControlMessage::peer_gone(err), &self.inner, ctx).await } + DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => Ok(None), } } } -pin_project_lite::pin_project! { - /// Publish service response future - pub(crate) struct PublishResponse<'f, P: Service, C: Service>> { - stream: StreamRef, - #[pin] - state: PublishResponseState<'f, P, C>, - ctx: ServiceCtx<'f, Dispatcher>, - inner: &'f Inner, - } -} - -pin_project_lite::pin_project! { - #[project = PublishResponseStateProject] - enum PublishResponseState<'f, P: Service, C: Service>> - where P: 'f - { - Publish { #[pin] fut: ServiceCall<'f, P, Message> }, - Control { #[pin] fut: ControlResponse<'f, C, P> }, - } -} - -impl<'f, P, C> PublishResponse<'f, P, C> -where - P: Service, - P::Error: fmt::Debug, - C: Service, Response = ControlResult>, - C::Error: fmt::Debug, -{ - fn new( - msg: Message, - stream: StreamRef, - inner: &'f Inner, - ctx: ServiceCtx<'f, Dispatcher>, - ) -> Self { - let state = PublishResponseState::Publish { - fut: ctx.call(&inner.publish, msg), - }; - Self { - ctx, - state, - stream, - inner, - } - } -} - -impl<'f, P, C> Future for PublishResponse<'f, P, C> +async fn publish<'f, P, C>( + msg: Message, + stream: StreamRef, + inner: &'f Inner, + ctx: ServiceCtx<'f, Dispatcher>, +) -> Result, ()> where P: Service, P::Error: fmt::Debug, C: Service, Response = ControlResult>, C::Error: fmt::Debug, { - type Output = Result, ()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - loop { - return match this.state.as_mut().project() { - PublishResponseStateProject::Publish { fut } => { - if this.stream.is_remote() { - match this.stream.poll_send_reset(cx) { - Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => { - log::trace!("Stream is closed {:?}", this.stream.id()); - return Poll::Ready(Ok(None)); - } - Poll::Pending => (), - } - } - - match fut.poll(cx) { - Poll::Ready(Ok(_)) => Poll::Ready(Ok(None)), - Poll::Ready(Err(e)) => { - this.state.set(PublishResponseState::Control { - fut: ControlResponse::new( - ControlMessage::app_error(e, this.stream.clone()), - this.inner, - *this.ctx, - ), - }); - continue; - } - Poll::Pending => Poll::Pending, - } + let fut = ctx.call(&inner.publish, msg); + let mut pinned = std::pin::pin!(fut); + let result = poll_fn(|cx| { + if stream.is_remote() { + match stream.poll_send_reset(cx) { + Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => { + log::trace!("Stream is closed {:?}", stream.id()); + return Poll::Ready(Ok(None)); } - PublishResponseStateProject::Control { fut } => fut.poll(cx), - }; + Poll::Pending => (), + } } - } -} - -pin_project_lite::pin_project! { - /// Control service response future - pub(crate) struct ControlResponse<'f, Ctl: Service>, Pub> - where - Ctl: Service>, - Ctl: 'f, - Pub: Service, - Pub: 'f, - { - #[pin] - fut: ServiceCall<'f, Ctl, ControlMessage>, - inner: &'f Inner, - } -} -impl<'f, Ctl, Pub> ControlResponse<'f, Ctl, Pub> -where - Ctl: Service, Response = ControlResult>, - Ctl::Error: fmt::Debug, - Pub: Service, - Pub::Error: fmt::Debug, -{ - fn new( - pkt: ControlMessage, - inner: &'f Inner, - ctx: ServiceCtx<'f, Dispatcher>, - ) -> Self { - Self { - fut: ctx.call(&inner.control, pkt), - inner, + match pinned.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Ok(None)), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, } + }) + .await; + + match result { + Ok(v) => Ok(v), + Err(e) => control(ControlMessage::app_error(e, stream), inner, ctx).await, } } -impl<'f, Ctl, Pub> Future for ControlResponse<'f, Ctl, Pub> +async fn control<'f, Ctl, Pub>( + pkt: ControlMessage, + inner: &'f Inner, + ctx: ServiceCtx<'f, Dispatcher>, +) -> Result, ()> where Ctl: Service, Response = ControlResult>, Ctl::Error: fmt::Debug, Pub: Service, Pub::Error: fmt::Debug, { - type Output = Result, ()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - match ready!(this.fut.poll(cx)) { - Ok(res) => { - if let Some(Frame::Reset(ref rst)) = res.frame { - if !rst.stream_id().is_zero() { - this.inner - .connection - .rst_stream(rst.stream_id(), rst.reason()); - } - } - if let Some(frm) = res.frame { - this.inner.connection.encode(frm); - } - if res.disconnect { - this.inner.connection.close(); + match ctx.call(&inner.control, pkt).await { + Ok(res) => { + if let Some(Frame::Reset(ref rst)) = res.frame { + if !rst.stream_id().is_zero() { + inner.connection.rst_stream(rst.stream_id(), rst.reason()); } } - Err(err) => { - log::error!("control service has failed with {:?}", err); - // we cannot handle control service errors, close connection - this.inner.connection.encode( - GoAway::new(Reason::INTERNAL_ERROR) - .set_last_stream_id(this.inner.last_stream_id), - ); - this.inner.connection.close(); + if let Some(frm) = res.frame { + inner.connection.encode(frm); } + if res.disconnect { + inner.connection.close(); + } + } + Err(err) => { + log::error!("control service has failed with {:?}", err); + // we cannot handle control service errors, close connection + inner.connection.encode( + GoAway::new(Reason::INTERNAL_ERROR).set_last_stream_id(inner.last_stream_id), + ); + inner.connection.close(); } - Poll::Ready(Ok(None)) } + Ok(None) } diff --git a/src/server/service.rs b/src/server/service.rs index 92e5568..10a3d55 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -2,7 +2,6 @@ use std::{fmt, rc::Rc}; use ntex_io::{Dispatcher as IoDispatcher, Filter, Io, IoBoxed}; use ntex_service::{Service, ServiceCtx, ServiceFactory}; -use ntex_util::future::{BoxFuture, Ready}; use ntex_util::time::{sleep, timeout_checked}; use crate::connection::{Connection, ConnectionFlags}; @@ -69,10 +68,9 @@ where type Error = ServerError<()>; type Service = ServerHandler; type InitError = (); - type Future<'f> = Ready; - fn create(&self, _: ()) -> Self::Future<'_> { - Ready::Ok(ServerHandler(self.0.clone())) + async fn create(&self, _: ()) -> Result { + Ok(ServerHandler(self.0.clone())) } } @@ -90,10 +88,9 @@ where type Error = ServerError<()>; type Service = ServerHandler; type InitError = (); - type Future<'f> = Ready; - fn create(&self, _: ()) -> Self::Future<'_> { - Ready::Ok(ServerHandler(self.0.clone())) + async fn create(&self, _: ()) -> Result { + Ok(ServerHandler(self.0.clone())) } } @@ -143,7 +140,7 @@ where let (codec, con) = create_connection(&io, &inner.config); // start protocol dispatcher - IoDispatcher::with_config( + IoDispatcher::new( io, codec, Dispatcher::new(con, ctl_srv, pub_srv), @@ -165,11 +162,13 @@ where { type Response = (); type Error = ServerError<()>; - type Future<'f> = BoxFuture<'f, Result>; - fn call<'a>(&'a self, io: IoBoxed, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - let slf = ServerHandler(self.0.clone()); - Box::pin(async move { slf.run(io).await }) + async fn call( + &self, + io: IoBoxed, + _: ServiceCtx<'_, Self>, + ) -> Result { + self.run(io).await } } @@ -185,11 +184,13 @@ where { type Response = (); type Error = ServerError<()>; - type Future<'f> = BoxFuture<'f, Result>; - fn call<'a>(&'a self, req: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - let slf = ServerHandler(self.0.clone()); - Box::pin(async move { slf.run(req.into()).await }) + async fn call( + &self, + req: Io, + _: ServiceCtx<'_, Self>, + ) -> Result { + self.run(req.into()).await } } @@ -270,7 +271,7 @@ where let (codec, con) = create_connection(&io, &config); // start protocol dispatcher - IoDispatcher::with_config( + IoDispatcher::new( io, codec, Dispatcher::new(con, ctl_svc, pub_svc), diff --git a/src/stream.rs b/src/stream.rs index 00e7ed1..1af63b7 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,8 +1,11 @@ -use std::{cell::Cell, cmp, cmp::Ordering, fmt, mem, ops, rc::Rc, task::Context, task::Poll}; +use std::{ + cell::Cell, cmp, cmp::Ordering, fmt, future::poll_fn, mem, ops, rc::Rc, task::Context, + task::Poll, +}; use ntex_bytes::Bytes; use ntex_http::{header::CONTENT_LENGTH, HeaderMap, StatusCode}; -use ntex_util::{future::poll_fn, task::LocalWaker}; +use ntex_util::task::LocalWaker; use crate::error::{OperationError, StreamError}; use crate::frame::{