From a4d896681952726fcb6c73a04c2a7b4d94ee91bd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 16:28:08 +1100 Subject: [PATCH 01/18] Seal `Provider` trait --- protocols/mdns/src/behaviour.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index bc102f832df..9d1f9e3a17d 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -23,6 +23,7 @@ mod socket; mod timer; use self::iface::InterfaceState; +use crate::behaviour::sealed::Sealed; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; use futures::Stream; @@ -39,7 +40,8 @@ use std::collections::hash_map::{Entry, HashMap}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; /// An abstraction to allow for compatibility with various async runtimes. -pub trait Provider: 'static { +#[doc(hidden)] +pub trait Provider: 'static + Sealed { /// The Async Socket type. type Socket: AsyncSocket; /// The Async Timer type. @@ -51,6 +53,12 @@ pub trait Provider: 'static { fn new_watcher() -> Result; } +mod sealed { + pub trait Sealed {} + impl Sealed for super::async_io::AsyncIo {} + impl Sealed for super::tokio::Tokio {} +} + /// The type of a [`Behaviour`] using the `async-io` implementation. #[cfg(feature = "async-io")] pub mod async_io { From 1438a493505823f9a41f4acbf910eea74f3ea8a7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 16:31:20 +1100 Subject: [PATCH 02/18] Bump mdns version --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/mdns/CHANGELOG.md | 6 ++++++ protocols/mdns/Cargo.toml | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96f5b8e73c3..5b928b144b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2707,7 +2707,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.44.0" +version = "0.44.1" dependencies = [ "async-io", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 5334d279963..4bde5ea0e62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.5" } libp2p-kad = { version = "0.44.6", path = "protocols/kad" } -libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" } +libp2p-mdns = { version = "0.44.1", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 3a287e9031a..825649c5f12 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.44.1 - unreleased + +- Seal `Provider` trait. + Whilst technically a breaking change, this was never considered public API. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). + ## 0.44.0 - Change `mdns::Event` to hold `Vec` and remove `DiscoveredAddrsIter` and `ExpiredAddrsIter`. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 74530d41e8b..bba485aa478 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mdns" edition = "2021" rust-version = { workspace = true } -version = "0.44.0" +version = "0.44.1" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" From f4673ea33cca55c7d7acd2e258d3250faaab7b42 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 16:40:22 +1100 Subject: [PATCH 03/18] Use return value from `on_swarm_event` handler to fire timer --- protocols/mdns/src/behaviour.rs | 24 +++++------------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 9d1f9e3a17d..de4ad9aceb7 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -233,27 +233,13 @@ where } fn on_swarm_event(&mut self, event: FromSwarm) { - self.listen_addresses.on_swarm_event(&event); + let listen_addresses_changed = self.listen_addresses.on_swarm_event(&event); - match event { - FromSwarm::NewListener(_) => { - log::trace!("waking interface state because listening address changed"); - for iface in self.iface_states.values_mut() { - iface.fire_timer(); - } + if listen_addresses_changed { + log::trace!("waking interface state because listening address changed"); + for iface in self.iface_states.values_mut() { + iface.fire_timer(); } - FromSwarm::ConnectionClosed(_) - | FromSwarm::ConnectionEstablished(_) - | FromSwarm::DialFailure(_) - | FromSwarm::AddressChange(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => {} } } From a9035b9d3d2c5e925705cf5327d00113e6f4b231 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 16:46:52 +1100 Subject: [PATCH 04/18] Deprecated unused APIs --- protocols/mdns/src/behaviour.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index de4ad9aceb7..49061eebf97 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -155,6 +155,7 @@ where } /// Returns true if the given `PeerId` is in the list of nodes discovered through mDNS. + #[deprecated(note = "Use `discovered_nodes` iterator instead.")] pub fn has_node(&self, peer_id: &PeerId) -> bool { self.discovered_nodes().any(|p| p == peer_id) } @@ -165,6 +166,7 @@ where } /// Expires a node before the ttl. + #[deprecated(note = "Unused API. Will be removed in the next release.")] pub fn expire_node(&mut self, peer_id: &PeerId) { let now = Instant::now(); for (peer, _addr, expires) in &mut self.discovered_nodes { From 6e35466207ca27f33eff464b73a4a8546e57fabe Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 16:58:22 +1100 Subject: [PATCH 05/18] Don't fire timers on interfaces changes We create and destroy the interface states as the come up. Waking the interface states only _sends_ queries but does not influence what we send in response. We always respond with the up-to-date listen addresses anyway. --- protocols/mdns/src/behaviour.rs | 9 +-------- protocols/mdns/src/behaviour/iface.rs | 4 ---- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 49061eebf97..b0c6377f072 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -235,14 +235,7 @@ where } fn on_swarm_event(&mut self, event: FromSwarm) { - let listen_addresses_changed = self.listen_addresses.on_swarm_event(&event); - - if listen_addresses_changed { - log::trace!("waking interface state because listening address changed"); - for iface in self.iface_states.values_mut() { - iface.fire_timer(); - } - } + self.listen_addresses.on_swarm_event(&event); } fn poll( diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 54d6c657380..8eca13c7f29 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -172,10 +172,6 @@ where self.timeout = T::interval(interval); } - pub(crate) fn fire_timer(&mut self) { - self.timeout = T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL); - } - pub(crate) fn poll( &mut self, cx: &mut Context, From 9ba036b7b78d490cd78720caad061ef5661b1a82 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:21:08 +1100 Subject: [PATCH 06/18] Perform IO on separate task --- protocols/mdns/Cargo.toml | 3 +- protocols/mdns/src/behaviour.rs | 93 +++++++++++++++++++-------- protocols/mdns/src/behaviour/iface.rs | 51 +++++++++++---- 3 files changed, 108 insertions(+), 39 deletions(-) diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index bba485aa478..d294c096f20 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +async-std = { version = "1.12.0", optional = true } async-io = { version = "1.13.0", optional = true } data-encoding = "2.4.0" futures = "0.3.28" @@ -28,7 +29,7 @@ void = "1.0.2" [features] tokio = ["dep:tokio", "if-watch/tokio"] -async-io = ["dep:async-io", "if-watch/smol"] +async-io = ["dep:async-io", "dep:async-std", "if-watch/smol"] [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index b0c6377f072..fce07f55634 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -26,7 +26,8 @@ use self::iface::InterfaceState; use crate::behaviour::sealed::Sealed; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; -use futures::Stream; +use futures::channel::mpsc; +use futures::{Stream, StreamExt}; use if_watch::IfEvent; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; @@ -37,6 +38,8 @@ use libp2p_swarm::{ }; use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; +use std::future::Future; +use std::sync::{Arc, RwLock}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; /// An abstraction to allow for compatibility with various async runtimes. @@ -49,13 +52,19 @@ pub trait Provider: 'static + Sealed { /// The IfWatcher type. type Watcher: Stream> + fmt::Debug + Unpin; + type TaskHandle; + /// Create a new instance of the `IfWatcher` type. fn new_watcher() -> Result; + + fn spawn(task: impl Future + Send + 'static) -> Self::TaskHandle; } mod sealed { pub trait Sealed {} + #[cfg(feature = "async-io")] impl Sealed for super::async_io::AsyncIo {} + #[cfg(feature = "tokio")] impl Sealed for super::tokio::Tokio {} } @@ -64,7 +73,9 @@ mod sealed { pub mod async_io { use super::Provider; use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer}; + use async_std::task::JoinHandle; use if_watch::smol::IfWatcher; + use std::future::Future; #[doc(hidden)] pub enum AsyncIo {} @@ -73,10 +84,15 @@ pub mod async_io { type Socket = AsyncUdpSocket; type Timer = AsyncTimer; type Watcher = IfWatcher; + type TaskHandle = JoinHandle<()>; fn new_watcher() -> Result { IfWatcher::new() } + + fn spawn(task: impl Future + Send + 'static) -> JoinHandle<()> { + async_std::task::spawn(task) + } } pub type Behaviour = super::Behaviour; @@ -88,6 +104,8 @@ pub mod tokio { use super::Provider; use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer}; use if_watch::tokio::IfWatcher; + use std::future::Future; + use tokio::task::JoinHandle; #[doc(hidden)] pub enum Tokio {} @@ -96,10 +114,15 @@ pub mod tokio { type Socket = TokioUdpSocket; type Timer = TokioTimer; type Watcher = IfWatcher; + type TaskHandle = JoinHandle<()>; fn new_watcher() -> Result { IfWatcher::new() } + + fn spawn(task: impl Future + Send + 'static) -> Self::TaskHandle { + tokio::spawn(task) + } } pub type Behaviour = super::Behaviour; @@ -118,8 +141,11 @@ where /// Iface watcher. if_watch: P::Watcher, - /// Mdns interface states. - iface_states: HashMap>, + /// Handles to tasks running the mDNS queries. + if_tasks: HashMap, + + query_response_receiver: mpsc::Receiver<(PeerId, Multiaddr, Instant)>, + query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>, /// List of nodes that we have discovered, the address, and when their TTL expires. /// @@ -132,7 +158,7 @@ where /// `None` if `discovered_nodes` is empty. closest_expiration: Option, - listen_addresses: ListenAddresses, + listen_addresses: Arc>, local_peer_id: PeerId, } @@ -143,10 +169,14 @@ where { /// Builds a new `Mdns` behaviour. pub fn new(config: Config, local_peer_id: PeerId) -> io::Result { + let (tx, rx) = mpsc::channel(10); // Chosen arbitrarily. + Ok(Self { config, if_watch: P::new_watcher()?, - iface_states: Default::default(), + if_tasks: Default::default(), + query_response_receiver: rx, + query_response_sender: tx, discovered_nodes: Default::default(), closest_expiration: Default::default(), listen_addresses: Default::default(), @@ -235,7 +265,10 @@ where } fn on_swarm_event(&mut self, event: FromSwarm) { - self.listen_addresses.on_swarm_event(&event); + self.listen_addresses + .write() + .unwrap_or_else(|e| e.into_inner()) + .on_swarm_event(&event); } fn poll( @@ -256,19 +289,27 @@ where { continue; } - if let Entry::Vacant(e) = self.iface_states.entry(addr) { - match InterfaceState::new(addr, self.config.clone(), self.local_peer_id) { - Ok(iface_state) => { - e.insert(iface_state); + if let Entry::Vacant(e) = self.if_tasks.entry(addr) { + match InterfaceState::::new( + addr, + self.config.clone(), + self.local_peer_id, + self.listen_addresses.clone(), + self.query_response_sender.clone(), + ) { + Ok(mut iface_state) => { + e.insert(P::spawn(async move { + futures::future::poll_fn(move |cx| iface_state.poll(cx)).await; + })); } Err(err) => log::error!("failed to create `InterfaceState`: {}", err), } } } Ok(IfEvent::Down(inet)) => { - if self.iface_states.contains_key(&inet.addr()) { + if self.if_tasks.contains_key(&inet.addr()) { log::info!("dropping instance {}", inet.addr()); - self.iface_states.remove(&inet.addr()); + self.if_tasks.remove(&inet.addr()); } } Err(err) => log::error!("if watch returned an error: {}", err), @@ -276,23 +317,23 @@ where } // Emit discovered event. let mut discovered = Vec::new(); - for iface_state in self.iface_states.values_mut() { - while let Poll::Ready((peer, addr, expiration)) = - iface_state.poll(cx, &self.listen_addresses) + + while let Poll::Ready(Some((peer, addr, expiration))) = + self.query_response_receiver.poll_next_unpin(cx) + { + if let Some((_, _, cur_expires)) = self + .discovered_nodes + .iter_mut() + .find(|(p, a, _)| *p == peer && *a == addr) { - if let Some((_, _, cur_expires)) = self - .discovered_nodes - .iter_mut() - .find(|(p, a, _)| *p == peer && *a == addr) - { - *cur_expires = cmp::max(*cur_expires, expiration); - } else { - log::info!("discovered: {} {}", peer, addr); - self.discovered_nodes.push((peer, addr.clone(), expiration)); - discovered.push((peer, addr)); - } + *cur_expires = cmp::max(*cur_expires, expiration); + } else { + log::info!("discovered: {} {}", peer, addr); + self.discovered_nodes.push((peer, addr.clone(), expiration)); + discovered.push((peer, addr)); } } + if !discovered.is_empty() { let event = Event::Discovered(discovered); return Poll::Ready(ToSwarm::GenerateEvent(event)); diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 8eca13c7f29..60fdb908955 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -18,17 +18,17 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod dns; -mod query; - use self::dns::{build_query, build_query_response, build_service_discovery_response}; use self::query::MdnsPacket; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; +use futures::channel::mpsc; +use futures::SinkExt; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::ListenAddresses; use socket2::{Domain, Socket, Type}; +use std::sync::{Arc, RwLock}; use std::{ collections::VecDeque, io, @@ -38,6 +38,9 @@ use std::{ time::{Duration, Instant}, }; +mod dns; +mod query; + /// Initial interval for starting probe const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500); @@ -72,6 +75,11 @@ pub(crate) struct InterfaceState { recv_socket: U, /// Send socket. send_socket: U, + + listen_addresses: Arc>, + + query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>, + /// Buffer used for receiving data from the main socket. /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 /// bytes, if it can be ensured that all participating devices can handle such large packets. @@ -101,7 +109,13 @@ where T: Builder + futures::Stream, { /// Builds a new [`InterfaceState`]. - pub(crate) fn new(addr: IpAddr, config: Config, local_peer_id: PeerId) -> io::Result { + pub(crate) fn new( + addr: IpAddr, + config: Config, + local_peer_id: PeerId, + listen_addresses: Arc>, + query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>, + ) -> io::Result { log::info!("creating instance on iface {}", addr); let recv_socket = match addr { IpAddr::V4(addr) => { @@ -154,6 +168,8 @@ where addr, recv_socket, send_socket, + listen_addresses, + query_response_sender, recv_buffer: [0; 4096], send_buffer: Default::default(), discovered: Default::default(), @@ -172,11 +188,7 @@ where self.timeout = T::interval(interval); } - pub(crate) fn poll( - &mut self, - cx: &mut Context, - listen_addresses: &ListenAddresses, - ) -> Poll<(PeerId, Multiaddr, Instant)> { + pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll<()> { loop { // 1st priority: Low latency: Create packet ASAP after timeout. if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { @@ -219,8 +231,20 @@ where } // 3rd priority: Keep local buffers small: Return discovered addresses. - if let Some(discovered) = self.discovered.pop_front() { - return Poll::Ready(discovered); + if self.query_response_sender.poll_ready_unpin(cx).is_ready() { + if let Some(discovered) = self.discovered.pop_front() { + match self.query_response_sender.try_send(discovered) { + Ok(()) => {} + Err(e) if e.is_disconnected() => { + return Poll::Ready(()); + } + Err(e) => { + self.discovered.push_front(e.into_inner()); + } + } + + continue; + } } // 4th priority: Remote work: Answer incoming requests. @@ -238,7 +262,10 @@ where self.send_buffer.extend(build_query_response( query.query_id(), self.local_peer_id, - listen_addresses.iter(), + self.listen_addresses + .read() + .unwrap_or_else(|e| e.into_inner()) + .iter(), self.ttl, )); continue; From 0d009090c93054aa2c10aebee41be771d86ebfea Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:22:12 +1100 Subject: [PATCH 07/18] Update changelog --- protocols/mdns/CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 825649c5f12..f5a22538f79 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,7 +1,6 @@ ## 0.44.1 - unreleased -- Seal `Provider` trait. - Whilst technically a breaking change, this was never considered public API. +- Move IO off main behaviour task. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). ## 0.44.0 From 71ae247996bb183557b537ff43a7044f07841833 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:26:32 +1100 Subject: [PATCH 08/18] Update docs --- protocols/mdns/src/behaviour.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index fce07f55634..a329924aab5 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -158,6 +158,10 @@ where /// `None` if `discovered_nodes` is empty. closest_expiration: Option, + /// The current set of listen addresses. + /// + /// This is shared across all interface tasks using an [`RwLock`]. + /// The [`Behaviour`] updates this upon new [`FromSwarm`] events where as [`InterfaceState`]s read from it to answer inbound mDNS queries. listen_addresses: Arc>, local_peer_id: PeerId, From 831af15672ff97ab68be3d6d0d623032316467b1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:28:37 +1100 Subject: [PATCH 09/18] Undo sealing --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/mdns/Cargo.toml | 2 +- protocols/mdns/src/behaviour.rs | 10 +--------- 4 files changed, 4 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b928b144b7..69e7c5dbe8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2707,7 +2707,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.44.1" +version = "0.45.0" dependencies = [ "async-io", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 4bde5ea0e62..fbeeb5f7578 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.5" } libp2p-kad = { version = "0.44.6", path = "protocols/kad" } -libp2p-mdns = { version = "0.44.1", path = "protocols/mdns" } +libp2p-mdns = { version = "0.45.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index d294c096f20..d568333a0d4 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mdns" edition = "2021" rust-version = { workspace = true } -version = "0.44.1" +version = "0.45.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index a329924aab5..8538ca97ebb 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -44,7 +44,7 @@ use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time:: /// An abstraction to allow for compatibility with various async runtimes. #[doc(hidden)] -pub trait Provider: 'static + Sealed { +pub trait Provider: 'static { /// The Async Socket type. type Socket: AsyncSocket; /// The Async Timer type. @@ -60,14 +60,6 @@ pub trait Provider: 'static + Sealed { fn spawn(task: impl Future + Send + 'static) -> Self::TaskHandle; } -mod sealed { - pub trait Sealed {} - #[cfg(feature = "async-io")] - impl Sealed for super::async_io::AsyncIo {} - #[cfg(feature = "tokio")] - impl Sealed for super::tokio::Tokio {} -} - /// The type of a [`Behaviour`] using the `async-io` implementation. #[cfg(feature = "async-io")] pub mod async_io { From 83e2352c0c93e44fd50d3cc45318ab2ac7da5ad5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:29:28 +1100 Subject: [PATCH 10/18] Update changelog --- protocols/mdns/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index f5a22538f79..2836cd1de0b 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.44.1 - unreleased +## 0.45.0 - unreleased - Move IO off main behaviour task. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). From 3d36593fedd3775eb8474585efe3ffa0881baa3b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:29:39 +1100 Subject: [PATCH 11/18] Fill in PR number --- protocols/mdns/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 2836cd1de0b..3b25ac6c9ee 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,7 +1,7 @@ ## 0.45.0 - unreleased - Move IO off main behaviour task. - See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). + See [PR 4623](https://github.com/libp2p/rust-libp2p/pull/4623). ## 0.44.0 From 6e5aa122c7d51d757df14d7b49b67af5c53c826a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:39:49 +1100 Subject: [PATCH 12/18] Abort tasks for shut-down interfaces --- protocols/mdns/src/behaviour.rs | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 8538ca97ebb..7146f812a77 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -23,7 +23,6 @@ mod socket; mod timer; use self::iface::InterfaceState; -use crate::behaviour::sealed::Sealed; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; use futures::channel::mpsc; @@ -43,7 +42,6 @@ use std::sync::{Arc, RwLock}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; /// An abstraction to allow for compatibility with various async runtimes. -#[doc(hidden)] pub trait Provider: 'static { /// The Async Socket type. type Socket: AsyncSocket; @@ -52,7 +50,7 @@ pub trait Provider: 'static { /// The IfWatcher type. type Watcher: Stream> + fmt::Debug + Unpin; - type TaskHandle; + type TaskHandle: Abort; /// Create a new instance of the `IfWatcher` type. fn new_watcher() -> Result; @@ -60,11 +58,16 @@ pub trait Provider: 'static { fn spawn(task: impl Future + Send + 'static) -> Self::TaskHandle; } +#[allow(unreachable_pub)] // Not re-exported. +pub trait Abort { + fn abort(self); +} + /// The type of a [`Behaviour`] using the `async-io` implementation. #[cfg(feature = "async-io")] pub mod async_io { use super::Provider; - use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer}; + use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer, Abort}; use async_std::task::JoinHandle; use if_watch::smol::IfWatcher; use std::future::Future; @@ -87,6 +90,12 @@ pub mod async_io { } } + impl Abort for JoinHandle<()> { + fn abort(self) { + async_std::task::spawn(self.cancel()); + } + } + pub type Behaviour = super::Behaviour; } @@ -94,7 +103,7 @@ pub mod async_io { #[cfg(feature = "tokio")] pub mod tokio { use super::Provider; - use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer}; + use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer, Abort}; use if_watch::tokio::IfWatcher; use std::future::Future; use tokio::task::JoinHandle; @@ -117,6 +126,12 @@ pub mod tokio { } } + impl Abort for JoinHandle<()> { + fn abort(self) { + JoinHandle::abort(&self) + } + } + pub type Behaviour = super::Behaviour; } @@ -303,9 +318,10 @@ where } } Ok(IfEvent::Down(inet)) => { - if self.if_tasks.contains_key(&inet.addr()) { + if let Some(handle) = self.if_tasks.remove(&inet.addr()) { log::info!("dropping instance {}", inet.addr()); - self.if_tasks.remove(&inet.addr()); + + handle.abort(); } } Err(err) => log::error!("if watch returned an error: {}", err), From d904116e14f913d25e845c928fac665a1d3b27a0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:40:27 +1100 Subject: [PATCH 13/18] It is not a breaking change! --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/mdns/CHANGELOG.md | 2 +- protocols/mdns/Cargo.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69e7c5dbe8f..5b928b144b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2707,7 +2707,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.45.0" +version = "0.44.1" dependencies = [ "async-io", "async-std", diff --git a/Cargo.toml b/Cargo.toml index fbeeb5f7578..4bde5ea0e62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.5" } libp2p-kad = { version = "0.44.6", path = "protocols/kad" } -libp2p-mdns = { version = "0.45.0", path = "protocols/mdns" } +libp2p-mdns = { version = "0.44.1", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 3b25ac6c9ee..8cdaf54d817 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.45.0 - unreleased +## 0.44.1 - unreleased - Move IO off main behaviour task. See [PR 4623](https://github.com/libp2p/rust-libp2p/pull/4623). diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index d568333a0d4..d294c096f20 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mdns" edition = "2021" rust-version = { workspace = true } -version = "0.45.0" +version = "0.44.1" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" From a631449fbcaa46a178038f461429df5552f9cc50 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:41:24 +1100 Subject: [PATCH 14/18] Update protocols/mdns/CHANGELOG.md --- protocols/mdns/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 8cdaf54d817..454e43b9ae4 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,6 +1,6 @@ ## 0.44.1 - unreleased -- Move IO off main behaviour task. +- Don't perform IO in `Behaviour::poll`. See [PR 4623](https://github.com/libp2p/rust-libp2p/pull/4623). ## 0.44.0 From 5efd0f7b39f81081672ca066c1b72d21d7741e4e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 11 Oct 2023 17:42:40 +1100 Subject: [PATCH 15/18] Reduce diff --- protocols/mdns/src/behaviour/iface.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 60fdb908955..803ea0e6ff1 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -18,6 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +mod dns; +mod query; + use self::dns::{build_query, build_query_response, build_service_discovery_response}; use self::query::MdnsPacket; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; @@ -38,9 +41,6 @@ use std::{ time::{Duration, Instant}, }; -mod dns; -mod query; - /// Initial interval for starting probe const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500); From 35e294650ee5b67530b251f8c71c52abc2b55767 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 15 Oct 2023 08:53:05 +1100 Subject: [PATCH 16/18] Implement `Future` --- misc/quick-protobuf-codec/src/lib.rs | 4 +- protocols/mdns/src/behaviour.rs | 6 +- protocols/mdns/src/behaviour/iface.rs | 92 +++++++++++++++------------ 3 files changed, 56 insertions(+), 46 deletions(-) diff --git a/misc/quick-protobuf-codec/src/lib.rs b/misc/quick-protobuf-codec/src/lib.rs index 04ee4980d3a..2d1fda99a70 100644 --- a/misc/quick-protobuf-codec/src/lib.rs +++ b/misc/quick-protobuf-codec/src/lib.rs @@ -31,10 +31,10 @@ impl Codec { } impl Encoder for Codec { - type Item = In; + type Item<'a> = In; type Error = Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> { let mut encoded_msg = Vec::new(); let mut writer = Writer::new(&mut encoded_msg); item.write_message(&mut writer) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 7146f812a77..9e937272e8c 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -308,10 +308,8 @@ where self.listen_addresses.clone(), self.query_response_sender.clone(), ) { - Ok(mut iface_state) => { - e.insert(P::spawn(async move { - futures::future::poll_fn(move |cx| iface_state.poll(cx)).await; - })); + Ok(iface_state) => { + e.insert(P::spawn(iface_state)); } Err(err) => log::error!("failed to create `InterfaceState`: {}", err), } diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 803ea0e6ff1..47601088fdc 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -26,11 +26,12 @@ use self::query::MdnsPacket; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; use futures::channel::mpsc; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::ListenAddresses; use socket2::{Domain, Socket, Type}; +use std::future::Future; use std::sync::{Arc, RwLock}; use std::{ collections::VecDeque, @@ -188,58 +189,68 @@ where self.timeout = T::interval(interval); } - pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll<()> { + fn mdns_socket(&self) -> SocketAddr { + SocketAddr::new(self.multicast_addr, 5353) + } +} + +impl Future for InterfaceState +where + U: AsyncSocket, + T: Builder + futures::Stream, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + loop { // 1st priority: Low latency: Create packet ASAP after timeout. - if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { - log::trace!("sending query on iface {}", self.addr); - self.send_buffer.push_back(build_query()); - log::trace!("tick on {:#?} {:#?}", self.addr, self.probe_state); + if this.timeout.poll_next_unpin(cx).is_ready() { + log::trace!("sending query on iface {}", this.addr); + this.send_buffer.push_back(build_query()); + log::trace!("tick on {:#?} {:#?}", this.addr, this.probe_state); // Stop to probe when the initial interval reach the query interval - if let ProbeState::Probing(interval) = self.probe_state { + if let ProbeState::Probing(interval) = this.probe_state { let interval = interval * 2; - self.probe_state = if interval >= self.query_interval { - ProbeState::Finished(self.query_interval) + this.probe_state = if interval >= this.query_interval { + ProbeState::Finished(this.query_interval) } else { ProbeState::Probing(interval) }; } - self.reset_timer(); + this.reset_timer(); } // 2nd priority: Keep local buffers small: Send packets to remote. - if let Some(packet) = self.send_buffer.pop_front() { - match Pin::new(&mut self.send_socket).poll_write( - cx, - &packet, - SocketAddr::new(self.multicast_addr, 5353), - ) { + if let Some(packet) = this.send_buffer.pop_front() { + match this.send_socket.poll_write(cx, &packet, this.mdns_socket()) { Poll::Ready(Ok(_)) => { - log::trace!("sent packet on iface {}", self.addr); + log::trace!("sent packet on iface {}", this.addr); continue; } Poll::Ready(Err(err)) => { - log::error!("error sending packet on iface {} {}", self.addr, err); + log::error!("error sending packet on iface {} {}", this.addr, err); continue; } Poll::Pending => { - self.send_buffer.push_front(packet); + this.send_buffer.push_front(packet); } } } // 3rd priority: Keep local buffers small: Return discovered addresses. - if self.query_response_sender.poll_ready_unpin(cx).is_ready() { - if let Some(discovered) = self.discovered.pop_front() { - match self.query_response_sender.try_send(discovered) { + if this.query_response_sender.poll_ready_unpin(cx).is_ready() { + if let Some(discovered) = this.discovered.pop_front() { + match this.query_response_sender.try_send(discovered) { Ok(()) => {} Err(e) if e.is_disconnected() => { return Poll::Ready(()); } Err(e) => { - self.discovered.push_front(e.into_inner()); + this.discovered.push_front(e.into_inner()); } } @@ -248,25 +259,26 @@ where } // 4th priority: Remote work: Answer incoming requests. - match Pin::new(&mut self.recv_socket) - .poll_read(cx, &mut self.recv_buffer) - .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)) + match this + .recv_socket + .poll_read(cx, &mut this.recv_buffer) + .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&this.recv_buffer[..len], from)) { Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => { log::trace!( "received query from {} on {}", query.remote_addr(), - self.addr + this.addr ); - self.send_buffer.extend(build_query_response( + this.send_buffer.extend(build_query_response( query.query_id(), - self.local_peer_id, - self.listen_addresses + this.local_peer_id, + this.listen_addresses .read() .unwrap_or_else(|e| e.into_inner()) .iter(), - self.ttl, + this.ttl, )); continue; } @@ -274,16 +286,16 @@ where log::trace!( "received response from {} on {}", response.remote_addr(), - self.addr + this.addr ); - self.discovered - .extend(response.extract_discovered(Instant::now(), self.local_peer_id)); + this.discovered + .extend(response.extract_discovered(Instant::now(), this.local_peer_id)); // Stop probing when we have a valid response - if !self.discovered.is_empty() { - self.probe_state = ProbeState::Finished(self.query_interval); - self.reset_timer(); + if !this.discovered.is_empty() { + this.probe_state = ProbeState::Finished(this.query_interval); + this.reset_timer(); } continue; } @@ -291,11 +303,11 @@ where log::trace!( "received service discovery from {} on {}", disc.remote_addr(), - self.addr + this.addr ); - self.send_buffer - .push_back(build_service_discovery_response(disc.query_id(), self.ttl)); + this.send_buffer + .push_back(build_service_discovery_response(disc.query_id(), this.ttl)); continue; } Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { From 9dde2b4c3ffdab22e08f171915b0ef94f14f7f99 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 15 Oct 2023 09:14:25 +1100 Subject: [PATCH 17/18] Undo unrelated changes --- misc/quick-protobuf-codec/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misc/quick-protobuf-codec/src/lib.rs b/misc/quick-protobuf-codec/src/lib.rs index 2d1fda99a70..04ee4980d3a 100644 --- a/misc/quick-protobuf-codec/src/lib.rs +++ b/misc/quick-protobuf-codec/src/lib.rs @@ -31,10 +31,10 @@ impl Codec { } impl Encoder for Codec { - type Item<'a> = In; + type Item = In; type Error = Error; - fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let mut encoded_msg = Vec::new(); let mut writer = Writer::new(&mut encoded_msg); item.write_message(&mut writer) From 029c38fad4318af7ed804234cc5423a0f6d6116c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 15 Oct 2023 09:20:34 +1100 Subject: [PATCH 18/18] Bump to breaking change --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/mdns/CHANGELOG.md | 2 +- protocols/mdns/Cargo.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3577d85aeae..4802fe6cdc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2707,7 +2707,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.44.1" +version = "0.45.0" dependencies = [ "async-io", "async-std", diff --git a/Cargo.toml b/Cargo.toml index fd5c7d99729..92639920ba9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.5" } libp2p-kad = { version = "0.44.6", path = "protocols/kad" } -libp2p-mdns = { version = "0.44.1", path = "protocols/mdns" } +libp2p-mdns = { version = "0.45.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 454e43b9ae4..060fac8c51c 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.44.1 - unreleased +## 0.45.0 - unreleased - Don't perform IO in `Behaviour::poll`. See [PR 4623](https://github.com/libp2p/rust-libp2p/pull/4623). diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index ddc4e5903f4..a8b6a7da1b9 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mdns" edition = "2021" rust-version = { workspace = true } -version = "0.44.1" +version = "0.45.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT"