diff --git a/src/error.rs b/src/error.rs index 9ad4c0e5b3..277217229a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -143,10 +143,6 @@ pub(super) enum User { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] UnexpectedHeader, - /// User tried to respond with a 1xx (not 101) response code. - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - UnsupportedStatusCode, /// User tried polling for an upgrade that doesn't exist. NoUpgrade, @@ -360,12 +356,6 @@ impl Error { Error::new(Kind::HeaderTimeout) } - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - pub(super) fn new_user_unsupported_status_code() -> Error { - Error::new_user(User::UnsupportedStatusCode) - } - pub(super) fn new_user_no_upgrade() -> Error { Error::new_user(User::NoUpgrade) } @@ -496,11 +486,6 @@ impl Error { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] Kind::User(User::UnexpectedHeader) => "user sent unexpected header", - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - Kind::User(User::UnsupportedStatusCode) => { - "response has 1xx status code, not supported by server" - } Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 1235202291..67369e5753 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -235,3 +235,13 @@ impl OriginalHeaderOrder { self.entry_order.iter() } } + +/// Request extension type for sending one or more 1xx informational responses +/// prior to the final response. +/// +/// This extension is meant to be attached to inbound `Request`s, allowing a +/// server to send informational responses immediately (i.e. without delaying +/// them until it has constructed a final, non-informational response). +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +#[derive(Clone, Debug)] +pub struct InformationalSender(pub futures_channel::mpsc::Sender>); diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 8ddf7558e1..6e1606e02e 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -642,7 +642,7 @@ where head.extensions.remove::(); } - Some(encoder) + encoder } Err(err) => { self.state.error = Some(err); diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 4d921a3b83..d852e55e6d 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -8,14 +8,22 @@ use std::{ use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; +#[cfg(feature = "server")] +use futures_channel::mpsc::{self, Receiver}; use futures_util::ready; +#[cfg(feature = "server")] +use futures_util::StreamExt; use http::Request; +#[cfg(feature = "server")] +use http::Response; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; #[cfg(feature = "client")] use crate::client::dispatch::TrySendError; use crate::common::task; +#[cfg(feature = "server")] +use crate::ext::InformationalSender; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -35,7 +43,7 @@ pub(crate) trait Dispatch { fn poll_msg( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>>; + ) -> Poll), Self::PollError>>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; @@ -46,6 +54,7 @@ cfg_server! { use crate::service::HttpService; pub(crate) struct Server, B> { + informational_rx: Option>>, in_flight: Pin>>, pub(crate) service: S, } @@ -336,17 +345,22 @@ where if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { let (head, body) = msg.map_err(crate::Error::new_user_service)?; - let body_type = if body.is_end_stream() { + let body_type = if let Some(body) = body { + if body.is_end_stream() { + self.body_rx.set(None); + None + } else { + let btype = body + .size_hint() + .exact() + .map(BodyLength::Known) + .or(Some(BodyLength::Unknown)); + self.body_rx.set(Some(body)); + btype + } + } else { self.body_rx.set(None); None - } else { - let btype = body - .size_hint() - .exact() - .map(BodyLength::Known) - .or(Some(BodyLength::Unknown)); - self.body_rx.set(Some(body)); - btype }; self.conn.write_head(head, body_type); } else { @@ -505,6 +519,7 @@ cfg_server! { { pub(crate) fn new(service: S) -> Server { Server { + informational_rx: None, in_flight: Box::pin(None), service, } @@ -532,8 +547,33 @@ cfg_server! { fn poll_msg( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll), Self::PollError>>> { let mut this = self.as_mut(); + + if let Some(informational_rx) = this.informational_rx.as_mut() { + if let Poll::Ready(informational) = informational_rx.poll_next_unpin(cx) { + if let Some(informational) = informational { + let (parts, _) = informational.into_parts(); + if parts.status.is_informational() { + let head = MessageHead { + version: parts.version, + subject: parts.status, + headers: parts.headers, + extensions: parts.extensions, + }; + return Poll::Ready(Some(Ok((head, None)))); + } else { + // TODO: We should return an error here, but we have + // no way of creating a `Self::PollError`; might + // need to change the signature of + // `Dispatch::poll_msg`. + } + } else { + this.informational_rx = None; + } + } + } + let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { let resp = ready!(fut.as_mut().poll(cx)?); let (parts, body) = resp.into_parts(); @@ -543,13 +583,14 @@ cfg_server! { headers: parts.headers, extensions: parts.extensions, }; - Poll::Ready(Some(Ok((head, body)))) + Poll::Ready(Some(Ok((head, Some(body))))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); }; // Since in_flight finished, remove it this.in_flight.set(None); + this.informational_rx = None; ret } @@ -561,7 +602,10 @@ cfg_server! { *req.headers_mut() = msg.headers; *req.version_mut() = msg.version; *req.extensions_mut() = msg.extensions; + let (informational_tx, informational_rx) = mpsc::channel(1); + assert!(req.extensions_mut().insert(InformationalSender(informational_tx)).is_none()); let fut = self.service.call(req); + self.informational_rx = Some(informational_rx); self.in_flight.set(Some(fut)); Ok(()) } @@ -607,7 +651,7 @@ cfg_client! { fn poll_msg( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll), Infallible>>> { let mut this = self.as_mut(); debug_assert!(!this.rx_closed); match this.rx.poll_recv(cx) { @@ -627,7 +671,7 @@ cfg_client! { extensions: parts.extensions, }; this.callback = Some(cb); - Poll::Ready(Some(Ok((head, body)))) + Poll::Ready(Some(Ok((head, Some(body))))) } } } diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 017b8671fb..ef07cb8cfd 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -33,7 +33,8 @@ pub(crate) trait Http1Transaction { #[cfg(feature = "tracing")] const LOG: &'static str; fn parse(bytes: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult; - fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result; + fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec) + -> crate::Result>; fn on_error(err: &crate::Error) -> Option>; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 528c2b81dd..c07a0d38ae 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -111,7 +111,7 @@ fn is_complete_fast(bytes: &[u8], prev_len: usize) -> bool { pub(super) fn encode_headers( enc: Encode<'_, T::Outgoing>, dst: &mut Vec, -) -> crate::Result +) -> crate::Result> where T: Http1Transaction, { @@ -358,7 +358,10 @@ impl Http1Transaction for Server { })) } - fn encode(mut msg: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result { + fn encode( + msg: Encode<'_, Self::Outgoing>, + dst: &mut Vec, + ) -> crate::Result> { trace!( "Server::encode status={:?}, body={:?}, req_method={:?}", msg.head.subject, @@ -368,25 +371,19 @@ impl Http1Transaction for Server { let mut wrote_len = false; - // hyper currently doesn't support returning 1xx status codes as a Response - // This is because Service only allows returning a single Response, and - // so if you try to reply with a e.g. 100 Continue, you have no way of - // replying with the latter status code response. - let (ret, is_last) = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { - (Ok(()), true) + let informational = msg.head.subject.is_informational(); + + let is_last = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { + true } else if msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success() { // Sending content-length or transfer-encoding header on 2xx response // to CONNECT is forbidden in RFC 7231. wrote_len = true; - (Ok(()), true) - } else if msg.head.subject.is_informational() { - warn!("response with 1xx status code not supported"); - *msg.head = MessageHead::default(); - msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; - msg.body = None; - (Err(crate::Error::new_user_unsupported_status_code()), true) + true + } else if informational { + false } else { - (Ok(()), !msg.keep_alive) + !msg.keep_alive }; // In some error cases, we don't know about the invalid message until already @@ -444,6 +441,7 @@ impl Http1Transaction for Server { } orig_headers => orig_headers, }; + let encoder = if let Some(orig_headers) = orig_headers { Self::encode_headers_with_original_case( msg, @@ -457,7 +455,11 @@ impl Http1Transaction for Server { Self::encode_headers_with_lower_case(msg, dst, is_last, orig_len, wrote_len)? }; - ret.map(|()| encoder) + // If we're sending a 1xx informational response, it won't have a body, + // so we'll return `None` here. Additionally, that will tell + // `Conn::write_head` to stay in the `Writing::Init` state since we + // haven't yet sent the final response. + Ok(if informational { None } else { Some(encoder) }) } fn on_error(err: &crate::Error) -> Option> { @@ -1168,7 +1170,10 @@ impl Http1Transaction for Client { } } - fn encode(msg: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result { + fn encode( + msg: Encode<'_, Self::Outgoing>, + dst: &mut Vec, + ) -> crate::Result> { trace!( "Client::encode method={:?}, body={:?}", msg.head.subject.0, @@ -1214,7 +1219,7 @@ impl Http1Transaction for Client { extend(dst, b"\r\n"); msg.head.headers.clear(); //TODO: remove when switching to drain() - Ok(body) + Ok(Some(body)) } fn on_error(_err: &crate::Error) -> Option> {