diff --git a/src/server.rs b/src/server.rs index 3c093f7ee..7f1cf84d8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -116,7 +116,9 @@ //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html use crate::codec::{Codec, RecvError, UserError}; -use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; +use crate::frame::{ + self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId, +}; use crate::proto::{self, Config, Prioritized}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; @@ -396,7 +398,8 @@ where /// Accept the next incoming request on this connection. pub async fn accept( &mut self, - ) -> Option, SendResponse), crate::Error>> { + ) -> Option, SendResponse), crate::Error>> + { futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await } @@ -404,7 +407,9 @@ where pub fn poll_accept( &mut self, cx: &mut Context<'_>, - ) -> Poll, SendResponse), crate::Error>>> { + ) -> Poll< + Option, SendResponse), crate::Error>>, + > { // Always try to advance the internal state. Getting Pending also is // needed to allow this function to return Pending. if let Poll::Ready(_) = self.poll_closed(cx)? { @@ -416,7 +421,8 @@ where if let Some(inner) = self.connection.next_incoming() { tracing::trace!("received incoming"); let (head, _) = inner.take_request().into_parts(); - let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque())); + let body = + RecvStream::new(FlowControl::new(inner.clone_to_opaque())); let request = Request::from_parts(head, body); let respond = SendResponse { inner }; @@ -462,7 +468,10 @@ where /// /// Returns an error if a previous call is still pending acknowledgement /// from the remote endpoint. - pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { + pub fn set_initial_window_size( + &mut self, + size: u32, + ) -> Result<(), crate::Error> { assert!(size <= proto::MAX_WINDOW_SIZE); self.connection.set_initial_window_size(size)?; Ok(()) @@ -481,13 +490,19 @@ where /// [`poll_accept`]: struct.Connection.html#method.poll_accept /// [`RecvStream`]: ../struct.RecvStream.html /// [`SendStream`]: ../struct.SendStream.html - pub fn poll_closed(&mut self, cx: &mut Context) -> Poll> { + pub fn poll_closed( + &mut self, + cx: &mut Context, + ) -> Poll> { self.connection.poll(cx).map_err(Into::into) } #[doc(hidden)] #[deprecated(note = "renamed to poll_closed")] - pub fn poll_close(&mut self, cx: &mut Context) -> Poll> { + pub fn poll_close( + &mut self, + cx: &mut Context, + ) -> Poll> { self.poll_closed(cx) } @@ -539,7 +554,10 @@ where { type Item = Result<(Request, SendResponse), crate::Error>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.poll_accept(cx) } } @@ -586,7 +604,9 @@ impl Builder { /// ``` pub fn new() -> Builder { Builder { - reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), + reset_stream_duration: Duration::from_secs( + proto::DEFAULT_RESET_STREAM_SECS, + ), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, @@ -1023,7 +1043,10 @@ impl SendResponse { /// /// Calling this method after having called `send_response` will return /// a user error. - pub fn poll_reset(&mut self, cx: &mut Context) -> Poll> { + pub fn poll_reset( + &mut self, + cx: &mut Context, + ) -> Poll> { self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders) } @@ -1095,7 +1118,10 @@ impl SendPushedResponse { /// /// Calling this method after having called `send_response` will return /// a user error. - pub fn poll_reset(&mut self, cx: &mut Context) -> Poll> { + pub fn poll_reset( + &mut self, + cx: &mut Context, + ) -> Poll> { self.inner.poll_reset(cx) } @@ -1124,9 +1150,13 @@ where { type Output = Result, crate::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { // Flush the codec - ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?; + ready!(self.codec.as_mut().unwrap().flush(cx)) + .map_err(crate::Error::from_io)?; // Return the codec Poll::Ready(Ok(self.codec.take().unwrap())) @@ -1153,7 +1183,10 @@ where { type Output = Result, crate::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { let mut buf = [0; 24]; let mut rem = PREFACE.len() - self.pos; @@ -1161,10 +1194,12 @@ where let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem])) .map_err(crate::Error::from_io)?; if n == 0 { - return Poll::Ready(Err(crate::Error::from_io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "connection closed before reading preface", - )))); + return Poll::Ready(Err(crate::Error::from_io( + io::Error::new( + io::ErrorKind::UnexpectedEof, + "connection closed before reading preface", + ), + ))); } if PREFACE[self.pos..self.pos + n] != buf[..n] { @@ -1190,7 +1225,10 @@ where { type Output = Result, crate::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { let span = self.span.clone(); // XXX(eliza): T_T let _e = span.enter(); tracing::trace!(state = ?self.state); @@ -1245,7 +1283,8 @@ where tracing::trace!("connection established!"); let mut c = Connection { connection }; - if let Some(sz) = self.builder.initial_target_connection_window_size { + if let Some(sz) = self.builder.initial_target_connection_window_size + { c.set_target_window_size(sz); } Ok(c) @@ -1383,7 +1422,9 @@ impl proto::Peer for Peer { // Specifying :status for a request is a protocol error if pseudo.status.is_some() { - tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR"); + tracing::trace!( + "malformed headers: :status field on request; PROTOCOL_ERROR" + ); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); } @@ -1393,14 +1434,21 @@ impl proto::Peer for Peer { // A request translated from HTTP/1 must not include the :authority // header if let Some(authority) = pseudo.authority { - let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner()); - parts.authority = Some(maybe_authority.or_else(|why| { - malformed!( - "malformed headers: malformed authority ({:?}): {}", - authority, - why, - ) - })?); + // When connecting to a UNIX Domain Socket (UDS), then we might get a path for the + // authority field. If it's a local path and exists, then we do not error in that case + // and assume an UDS. + if !authority.as_str().ends_with(".sock") { + let maybe_authority = uri::Authority::from_maybe_shared( + authority.clone().into_inner(), + ); + parts.authority = Some(maybe_authority.or_else(|why| { + malformed!( + "malformed headers: malformed authority ({:?}): {}", + authority, + why, + ) + })?); + } } // A :scheme is required, except CONNECT. @@ -1437,9 +1485,14 @@ impl proto::Peer for Peer { malformed!("malformed headers: missing path"); } - let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner()); + let maybe_path = + uri::PathAndQuery::from_maybe_shared(path.clone().into_inner()); parts.path_and_query = Some(maybe_path.or_else(|why| { - malformed!("malformed headers: malformed path ({:?}): {}", path, why,) + malformed!( + "malformed headers: malformed path ({:?}): {}", + path, + why, + ) })?); } @@ -1474,7 +1527,9 @@ where fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"), - Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"), + Handshaking::ReadingPreface(_) => { + write!(f, "Handshaking::ReadingPreface(_)") + } Handshaking::Empty => write!(f, "Handshaking::Empty"), } } @@ -1498,7 +1553,9 @@ where { #[inline] fn from(read: ReadPreface>) -> Self { - Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface"))) + Handshaking::ReadingPreface( + read.instrument(tracing::trace_span!("read_preface")), + ) } }