From d326beea450321e2a4027190a6f7db5fbce55f49 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 23 Sep 2022 08:42:33 +0200 Subject: [PATCH] fix(multistream-select): remove parallel dialing optimization This is to avoid the usage of the now optional `ls` command, and stay compatible with go-multistream. Closes #2925 --- misc/multistream-select/CHANGELOG.md | 4 + misc/multistream-select/src/dialer_select.rs | 211 +------------------ misc/multistream-select/src/tests.rs | 35 +-- 3 files changed, 10 insertions(+), 240 deletions(-) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index 5b3b336775b..7f6ec7bfbc0 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,3 +1,7 @@ +# Unreleased + +- Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. + # 0.11.0 [2022-01-27] - Migrate to Rust edition 2021 (see [PR 2339]). diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 7a8c75daa6f..ed6dcfe6e96 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -23,7 +23,7 @@ use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError}; use crate::{Negotiated, NegotiationError, Version}; -use futures::{future::Either, prelude::*}; +use futures::prelude::*; use std::{ convert::TryFrom as _, iter, mem, @@ -39,12 +39,6 @@ use std::{ /// returned `Future` resolves with the name of the negotiated protocol and /// a [`Negotiated`] I/O stream. /// -/// The chosen message flow for protocol negotiation depends on the numbers of -/// supported protocols given. That is, this function delegates to serial or -/// parallel variant based on the number of protocols given. The number of -/// protocols is determined through the `size_hint` of the given iterator and -/// thus an inaccurate size estimate may result in a suboptimal choice. -/// /// Within the scope of this library, a dialer always commits to a specific /// multistream-select [`Version`], whereas a listener always supports /// all versions supported by this library. Frictionless multistream-select @@ -55,44 +49,13 @@ pub fn dialer_select_proto( protocols: I, version: Version, ) -> DialerSelectFuture -where - R: AsyncRead + AsyncWrite, - I: IntoIterator, - I::Item: AsRef<[u8]>, -{ - let iter = protocols.into_iter(); - // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::Left(dialer_select_proto_serial(inner, iter, version)) - } else { - Either::Right(dialer_select_proto_parallel(inner, iter, version)) - } -} - -/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer -/// either trying protocols in-order, or by requesting all protocols supported -/// by the remote upfront, from which the first protocol found in the dialer's -/// list of protocols is selected. -pub type DialerSelectFuture = Either, DialerSelectPar>; - -/// Returns a `Future` that negotiates a protocol on the given I/O stream. -/// -/// Just like [`dialer_select_proto`] but always using an iterative message flow, -/// trying the given list of supported protocols one-by-one. -/// -/// This strategy is preferable if the dialer only supports a few protocols. -pub(crate) fn dialer_select_proto_serial( - inner: R, - protocols: I, - version: Version, -) -> DialerSelectSeq where R: AsyncRead + AsyncWrite, I: IntoIterator, I::Item: AsRef<[u8]>, { let protocols = protocols.into_iter().peekable(); - DialerSelectSeq { + DialerSelectFuture { version, protocols, state: SeqState::SendHeader { @@ -101,39 +64,10 @@ where } } -/// Returns a `Future` that negotiates a protocol on the given I/O stream. -/// -/// Just like [`dialer_select_proto`] but always using a message flow that first -/// requests all supported protocols from the remote, selecting the first -/// protocol from the given list of supported protocols that is supported -/// by the remote. -/// -/// This strategy may be beneficial if the dialer supports many protocols -/// and it is unclear whether the remote supports one of the first few. -pub(crate) fn dialer_select_proto_parallel( - inner: R, - protocols: I, - version: Version, -) -> DialerSelectPar -where - R: AsyncRead + AsyncWrite, - I: IntoIterator, - I::Item: AsRef<[u8]>, -{ - let protocols = protocols.into_iter(); - DialerSelectPar { - version, - protocols, - state: ParState::SendHeader { - io: MessageIO::new(inner), - }, - } -} - -/// A `Future` returned by [`dialer_select_proto_serial`] which negotiates +/// A `Future` returned by [`dialer_select_proto`] which negotiates /// a protocol iteratively by considering one protocol after the other. #[pin_project::pin_project] -pub struct DialerSelectSeq { +pub struct DialerSelectFuture { // TODO: It would be nice if eventually N = I::Item = Protocol. protocols: iter::Peekable, state: SeqState, @@ -148,7 +82,7 @@ enum SeqState { Done, } -impl Future for DialerSelectSeq +impl Future for DialerSelectFuture where // The Unpin bound here is required because we produce a `Negotiated` as the output. // It also makes the implementation considerably easier to write. @@ -267,138 +201,3 @@ where } } } - -/// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates -/// a protocol selectively by considering all supported protocols of the remote -/// "in parallel". -#[pin_project::pin_project] -pub struct DialerSelectPar { - protocols: I, - state: ParState, - version: Version, -} - -enum ParState { - SendHeader { io: MessageIO }, - SendProtocolsRequest { io: MessageIO }, - Flush { io: MessageIO }, - RecvProtocols { io: MessageIO }, - SendProtocol { io: MessageIO, protocol: N }, - Done, -} - -impl Future for DialerSelectPar -where - // The Unpin bound here is required because we produce a `Negotiated` as the output. - // It also makes the implementation considerably easier to write. - R: AsyncRead + AsyncWrite + Unpin, - I: Iterator, - I::Item: AsRef<[u8]>, -{ - type Output = Result<(I::Item, Negotiated), NegotiationError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - loop { - match mem::replace(this.state, ParState::Done) { - ParState::SendHeader { mut io } => { - match Pin::new(&mut io).poll_ready(cx)? { - Poll::Ready(()) => {} - Poll::Pending => { - *this.state = ParState::SendHeader { io }; - return Poll::Pending; - } - } - - let msg = Message::Header(HeaderLine::from(*this.version)); - if let Err(err) = Pin::new(&mut io).start_send(msg) { - return Poll::Ready(Err(From::from(err))); - } - - *this.state = ParState::SendProtocolsRequest { io }; - } - - ParState::SendProtocolsRequest { mut io } => { - match Pin::new(&mut io).poll_ready(cx)? { - Poll::Ready(()) => {} - Poll::Pending => { - *this.state = ParState::SendProtocolsRequest { io }; - return Poll::Pending; - } - } - - if let Err(err) = Pin::new(&mut io).start_send(Message::ListProtocols) { - return Poll::Ready(Err(From::from(err))); - } - - log::debug!("Dialer: Requested supported protocols."); - *this.state = ParState::Flush { io } - } - - ParState::Flush { mut io } => match Pin::new(&mut io).poll_flush(cx)? { - Poll::Ready(()) => *this.state = ParState::RecvProtocols { io }, - Poll::Pending => { - *this.state = ParState::Flush { io }; - return Poll::Pending; - } - }, - - ParState::RecvProtocols { mut io } => { - let msg = match Pin::new(&mut io).poll_next(cx)? { - Poll::Ready(Some(msg)) => msg, - Poll::Pending => { - *this.state = ParState::RecvProtocols { io }; - return Poll::Pending; - } - // Treat EOF error as [`NegotiationError::Failed`], not as - // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O - // stream as a permissible way to "gracefully" fail a negotiation. - Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), - }; - - match &msg { - Message::Header(h) if h == &HeaderLine::from(*this.version) => { - *this.state = ParState::RecvProtocols { io } - } - Message::Protocols(supported) => { - let protocol = this - .protocols - .by_ref() - .find(|p| supported.iter().any(|s| s.as_ref() == p.as_ref())) - .ok_or(NegotiationError::Failed)?; - log::debug!( - "Dialer: Found supported protocol: {}", - String::from_utf8_lossy(protocol.as_ref()) - ); - *this.state = ParState::SendProtocol { io, protocol }; - } - _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), - } - } - - ParState::SendProtocol { mut io, protocol } => { - match Pin::new(&mut io).poll_ready(cx)? { - Poll::Ready(()) => {} - Poll::Pending => { - *this.state = ParState::SendProtocol { io, protocol }; - return Poll::Pending; - } - } - - let p = Protocol::try_from(protocol.as_ref())?; - if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) { - return Poll::Ready(Err(From::from(err))); - } - - log::debug!("Dialer: Expecting proposed protocol: {}", p); - let io = Negotiated::expecting(io.into_reader(), p, None); - - return Poll::Ready(Ok((protocol, io))); - } - - ParState::Done => panic!("ParState::poll called after completion"), - } - } - } -} diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index ca627d24fcf..4a1592b741f 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -22,7 +22,6 @@ #![cfg(test)] -use crate::dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial}; use crate::{dialer_select_proto, listener_select_proto}; use crate::{NegotiationError, Version}; @@ -182,38 +181,6 @@ fn negotiation_failed() { } } -#[test] -fn select_proto_parallel() { - async fn run(version: Version) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = async_std::task::spawn(async move { - let connec = listener.accept().await.unwrap().0; - let protos = vec![b"/proto1", b"/proto2"]; - let (proto, io) = listener_select_proto(connec, protos).await.unwrap(); - assert_eq!(proto, b"/proto2"); - io.complete().await.unwrap(); - }); - - let client = async_std::task::spawn(async move { - let connec = TcpStream::connect(&listener_addr).await.unwrap(); - let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) - .await - .unwrap(); - assert_eq!(proto, b"/proto2"); - io.complete().await.unwrap(); - }); - - server.await; - client.await; - } - - async_std::task::block_on(run(Version::V1)); - async_std::task::block_on(run(Version::V1Lazy)); -} - #[test] fn select_proto_serial() { async fn run(version: Version) { @@ -231,7 +198,7 @@ fn select_proto_serial() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + let (proto, io) = dialer_select_proto(connec, protos.into_iter(), version) .await .unwrap(); assert_eq!(proto, b"/proto2");