From 766164e6abf9503d48186aee99148053d7533639 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 24 Jan 2025 12:10:39 +0100 Subject: [PATCH] Use `n0_future` and remove `task` module in favor for it --- Cargo.lock | 3 +- iroh-net-report/Cargo.toml | 1 + iroh-net-report/src/defaults.rs | 2 +- iroh-net-report/src/dns.rs | 2 +- iroh-net-report/src/lib.rs | 9 +- iroh-net-report/src/ping.rs | 2 +- iroh-net-report/src/reportgen.rs | 6 +- iroh-net-report/src/reportgen/hairpin.rs | 5 +- iroh-net-report/src/reportgen/probes.rs | 2 +- iroh-net-report/src/task.rs | 322 ----------------------- 10 files changed, 17 insertions(+), 337 deletions(-) delete mode 100644 iroh-net-report/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 63b1dd1f0d..1f79f7cb1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2030,7 +2030,6 @@ dependencies = [ "axum", "backoff", "bytes", - "cfg_aliases", "clap", "concurrent-queue", "crypto_box", @@ -2083,6 +2082,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", + "tokio-tungstenite", "tokio-tungstenite-wasm", "tokio-util", "tracing", @@ -2221,6 +2221,7 @@ dependencies = [ "iroh-relay", "iroh-test 0.31.0", "js-sys", + "n0-future", "netwatch", "pin-project", "portmapper", diff --git a/iroh-net-report/Cargo.toml b/iroh-net-report/Cargo.toml index ca6dd979c2..942e68d727 100644 --- a/iroh-net-report/Cargo.toml +++ b/iroh-net-report/Cargo.toml @@ -24,6 +24,7 @@ futures-lite = "2.3" iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["relay"] } iroh-metrics = { version = "0.31", default-features = false } iroh-relay = { version = "0.31", path = "../iroh-relay", default-features = false } +n0-future = "0.0.1" quinn = { package = "iroh-quinn", version = "0.12.0", default-features = false } rand = "0.8" reqwest = { version = "0.12", default-features = false, features = ["stream"] } diff --git a/iroh-net-report/src/defaults.rs b/iroh-net-report/src/defaults.rs index ae3b27fac8..66f2709711 100644 --- a/iroh-net-report/src/defaults.rs +++ b/iroh-net-report/src/defaults.rs @@ -2,7 +2,7 @@ /// Contains all timeouts that we use in `iroh-net-report`. pub(crate) mod timeouts { - use iroh_relay::time::Duration; + use n0_future::time::Duration; // Timeouts for net_report diff --git a/iroh-net-report/src/dns.rs b/iroh-net-report/src/dns.rs index 1dcf665166..6c776e5304 100644 --- a/iroh-net-report/src/dns.rs +++ b/iroh-net-report/src/dns.rs @@ -3,7 +3,7 @@ use std::{fmt::Write, net::IpAddr}; use anyhow::Result; use futures_lite::{Future, StreamExt}; use hickory_resolver::{IntoName, TokioResolver}; -use iroh_relay::time; +use n0_future::time; use crate::defaults::timeouts::DNS_TIMEOUT; diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index bd32182e14..9d485f5d66 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -16,7 +16,6 @@ use std::{ sync::Arc, }; -use crate::task::AbortOnDropHandle; use anyhow::{anyhow, Result}; use bytes::Bytes; #[cfg(not(wasm_browser))] @@ -24,10 +23,10 @@ use hickory_resolver::TokioResolver as DnsResolver; use iroh_base::RelayUrl; #[cfg(feature = "metrics")] use iroh_metrics::inc; -use iroh_relay::{ - protos::stun, +use iroh_relay::{protos::stun, RelayMap}; +use n0_future::{ + task::{self, AbortOnDropHandle}, time::{self, Duration, Instant}, - RelayMap, }; #[cfg(not(wasm_browser))] use netwatch::UdpSocket; @@ -42,8 +41,6 @@ mod metrics; mod ping; mod reportgen; -pub mod task; - pub use metrics::Metrics; use reportgen::ProbeProto; pub use reportgen::QuicConfig; diff --git a/iroh-net-report/src/ping.rs b/iroh-net-report/src/ping.rs index 50ba3ebfe3..0e3bde904b 100644 --- a/iroh-net-report/src/ping.rs +++ b/iroh-net-report/src/ping.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::{Context, Result}; -use iroh_relay::time::Duration; +use n0_future::time::Duration; use surge_ping::{Client, Config, IcmpPacket, PingIdentifier, PingSequence, ICMP}; use tracing::debug; diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index c4165fcd39..729ec6c4bd 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -25,7 +25,6 @@ use std::{ task::{Context, Poll}, }; -use crate::task::{self, AbortOnDropHandle, JoinSet}; use anyhow::{anyhow, bail, Context as _, Result}; use futures_lite::StreamExt as _; #[cfg(not(wasm_browser))] @@ -37,9 +36,12 @@ use iroh_relay::{ defaults::{DEFAULT_RELAY_QUIC_PORT, DEFAULT_STUN_PORT}, http::RELAY_PROBE_PATH, protos::stun, - time::{self, Duration, Instant}, RelayMap, RelayNode, }; +use n0_future::{ + task::{self, AbortOnDropHandle, JoinSet}, + time::{self, Duration, Instant}, +}; #[cfg(not(wasm_browser))] use netwatch::{interfaces, UdpSocket}; use rand::seq::IteratorRandom; diff --git a/iroh-net-report/src/reportgen/hairpin.rs b/iroh-net-report/src/reportgen/hairpin.rs index 466f06e6cc..feb1cb3909 100644 --- a/iroh-net-report/src/reportgen/hairpin.rs +++ b/iroh-net-report/src/reportgen/hairpin.rs @@ -16,6 +16,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use anyhow::{bail, Context, Result}; use iroh_relay::protos::stun; +use n0_future::task; use netwatch::UdpSocket; use tokio::sync::oneshot; use tokio_util::task::AbortOnDropHandle; @@ -24,7 +25,7 @@ use tracing::{debug, error, info_span, trace, warn, Instrument}; use crate::{ self as net_report, defaults::timeouts::HAIRPIN_CHECK_TIMEOUT, - reportgen, task, + reportgen, time::{self, Instant}, Inflight, }; @@ -180,7 +181,7 @@ impl Actor { #[cfg(test)] mod tests { use bytes::BytesMut; - use iroh_relay::time::Duration; + use n0_future::time::Duration; use tokio::sync::mpsc; use tracing::info; diff --git a/iroh-net-report/src/reportgen/probes.rs b/iroh-net-report/src/reportgen/probes.rs index c7facb03d1..ee8415453c 100644 --- a/iroh-net-report/src/reportgen/probes.rs +++ b/iroh-net-report/src/reportgen/probes.rs @@ -9,11 +9,11 @@ use std::{collections::BTreeSet, fmt, sync::Arc}; use anyhow::{ensure, Result}; use iroh_base::RelayUrl; use iroh_relay::{RelayMap, RelayNode}; +use n0_future::time::Duration; #[cfg(not(wasm_browser))] use netwatch::interfaces; use crate::Report; -use iroh_relay::time::Duration; /// The retransmit interval used when net_report first runs. /// diff --git a/iroh-net-report/src/task.rs b/iroh-net-report/src/task.rs deleted file mode 100644 index 4f6dae5a7c..0000000000 --- a/iroh-net-report/src/task.rs +++ /dev/null @@ -1,322 +0,0 @@ -//! Async rust task spawning and utilities that work natively (using tokio) and in browsers -//! (using wasm-bindgen-futures). - -#[cfg(not(wasm_browser))] -pub use tokio::spawn; -#[cfg(not(wasm_browser))] -pub use tokio::task::{JoinError, JoinHandle, JoinSet}; -#[cfg(not(wasm_browser))] -pub use tokio_util::task::AbortOnDropHandle; - -#[cfg(wasm_browser)] -pub use wasm::*; - -#[cfg(wasm_browser)] -mod wasm { - use std::{ - cell::RefCell, - fmt::Debug, - future::{Future, IntoFuture}, - pin::Pin, - rc::Rc, - task::{Context, Poll, Waker}, - }; - - use futures_lite::stream::StreamExt; - use wasm_bindgen::prelude::wasm_bindgen; - - /// Wasm shim for tokio's `JoinSet`. - /// - /// Uses a `futures_buffered::FuturesUnordered` queue of - /// `JoinHandle`s inside. - pub struct JoinSet { - handles: futures_buffered::FuturesUnordered>, - // We need to keep a second list of JoinHandles so we can access them for cancellation - to_cancel: Vec>, - } - - impl Default for JoinSet { - fn default() -> Self { - Self::new() - } - } - - impl JoinSet { - /// Creates a new, empty `JoinSet` - pub fn new() -> Self { - Self { - handles: futures_buffered::FuturesUnordered::new(), - to_cancel: Vec::new(), - } - } - - /// Spawns a task into this `JoinSet`. - /// - /// (Doesn't return an `AbortHandle` unlike the original `tokio::task::JoinSet` yet.) - pub fn spawn(&mut self, fut: impl IntoFuture + 'static) - where - T: 'static, - { - let handle = JoinHandle::new(); - let handle_for_spawn = JoinHandle { - task: handle.task.clone(), - }; - let handle_for_cancel = JoinHandle { - task: handle.task.clone(), - }; - - wasm_bindgen_futures::spawn_local(SpawnFuture { - handle: handle_for_spawn, - fut: fut.into_future(), - }); - - self.handles.push(handle); - self.to_cancel.push(handle_for_cancel); - } - - /// Aborts all tasks inside this `JoinSet` - pub fn abort_all(&self) { - self.to_cancel.iter().for_each(JoinHandle::abort); - } - - /// Awaits the next `JoinSet`'s completion. - /// - /// If you `.spawn` a new task onto this `JoinSet` while the future - /// returned from this is currently pending, then this future will - /// continue to be pending, even if the newly spawned future is already - /// finished. - /// - /// TODO(matheus23): Fix this limitation. - /// - /// Current work around is to recreate the `join_next` future when - /// you newly spawned a task onto it. This seems to be the usual way - /// the `JoinSet` is used *most of the time* in the iroh codebase anyways. - pub async fn join_next(&mut self) -> Option> { - futures_lite::future::poll_fn(|cx| { - let ret = self.handles.poll_next(cx); - match &ret { - Poll::Pending => tracing::info!("polled JoinSet::join_next (pending)"), - Poll::Ready(None) => tracing::info!("polled JoinSet::join_next: None"), - Poll::Ready(Some(Ok(_))) => { - tracing::info!("polled JoinSet::join_next: Some(Ok(_))") - } - Poll::Ready(Some(Err(e))) => { - tracing::info!("polled JoinSet::join_next: Some(Err({e:?}))") - } - } - // clean up handles that are either cancelled or have finished - self.to_cancel.retain(JoinHandle::is_running); - ret - }) - .await - } - - /// Returns whether there's any tasks that are either still running or - /// have pending results in this `JoinSet`. - pub fn is_empty(&self) -> bool { - self.handles.is_empty() - } - - /// Returns the amount of tasks that are either still running or have - /// pending results in this `JoinSet`. - pub fn len(&self) -> usize { - self.handles.len() - } - } - - impl Drop for JoinSet { - fn drop(&mut self) { - self.abort_all() - } - } - - /// A handle to a spawned task. - pub struct JoinHandle { - task: Rc>>, - } - - struct Task { - cancelled: bool, - completed: bool, - waker_handler: Option, - waker_spawn_fn: Option, - result: Option, - } - - impl Task { - fn cancel(&mut self) { - if !self.cancelled { - self.cancelled = true; - self.wake(); - } - } - - fn complete(&mut self, value: T) { - self.result = Some(value); - self.completed = true; - self.wake(); - } - - fn wake(&mut self) { - if let Some(waker) = self.waker_handler.take() { - waker.wake(); - } - if let Some(waker) = self.waker_spawn_fn.take() { - waker.wake(); - } - } - - fn register_handler(&mut self, cx: &mut Context<'_>) { - match self.waker_handler { - // clone_from can be marginally faster in some cases - Some(ref mut waker) => waker.clone_from(cx.waker()), - None => self.waker_handler = Some(cx.waker().clone()), - } - } - - fn register_spawn_fn(&mut self, cx: &mut Context<'_>) { - match self.waker_spawn_fn { - // clone_from can be marginally faster in some cases - Some(ref mut waker) => waker.clone_from(cx.waker()), - None => self.waker_spawn_fn = Some(cx.waker().clone()), - } - } - } - - impl JoinHandle { - fn new() -> Self { - Self { - task: Rc::new(RefCell::new(Task { - cancelled: false, - completed: false, - waker_handler: None, - waker_spawn_fn: None, - result: None, - })), - } - } - - /// Aborts this task. - pub fn abort(&self) { - self.task.borrow_mut().cancel(); - } - - fn is_running(&self) -> bool { - let task = self.task.borrow(); - !task.cancelled && !task.completed - } - } - - /// An error that can occur when waiting for the completion of a task. - #[derive(derive_more::Display, Debug, Clone, Copy)] - pub enum JoinError { - /// The error that's returned when the task that's being waited on - /// has been cancelled. - #[display("task was cancelled")] - Cancelled, - } - - impl Future for JoinHandle { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - tracing::info!("JoinHandle::poll"); - let mut task = self.task.borrow_mut(); - if task.cancelled { - tracing::info!("JoinHandle::poll: cancelled"); - return Poll::Ready(Err(JoinError::Cancelled)); - } - - if let Some(result) = task.result.take() { - tracing::info!("JoinHandle::poll: Ready(Ok(_))"); - return Poll::Ready(Ok(result)); - } - - tracing::info!("JoinHandle::poll: Pending"); - task.register_handler(cx); - Poll::Pending - } - } - - #[pin_project::pin_project] - struct SpawnFuture, T> { - handle: JoinHandle, - #[pin] - fut: Fut, - } - - impl, T> Future for SpawnFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let mut task = this.handle.task.borrow_mut(); - - if task.cancelled { - return Poll::Ready(()); - } - - match this.fut.poll(cx) { - Poll::Ready(value) => { - tracing::info!("waking up"); - task.complete(value); - Poll::Ready(()) - } - Poll::Pending => { - task.register_spawn_fn(cx); - Poll::Pending - } - } - } - } - - /// Similar to a `JoinHandle`, except it automatically aborts - /// the task when it's dropped. - #[pin_project::pin_project(PinnedDrop)] - #[derive(derive_more::Debug)] - #[debug("AbortOnDropHandle")] - pub struct AbortOnDropHandle(#[pin] JoinHandle); - - #[pin_project::pinned_drop] - impl PinnedDrop for AbortOnDropHandle { - fn drop(self: Pin<&mut Self>) { - self.0.abort(); - } - } - - impl Future for AbortOnDropHandle { - type Output = as Future>::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().0.poll(cx) - } - } - - impl AbortOnDropHandle { - /// Converts a `JoinHandle` into one that aborts on drop. - pub fn new(task: JoinHandle) -> Self { - Self(task) - } - } - - /// Spawns a future as a task in the browser runtime. - /// - /// This is powered by `wasm_bidngen_futures`. - pub fn spawn(fut: impl IntoFuture + 'static) -> JoinHandle { - let handle = JoinHandle::new(); - - wasm_bindgen_futures::spawn_local(SpawnFuture { - handle: JoinHandle { - task: handle.task.clone(), - }, - fut: fut.into_future(), - }); - - handle - } -} - -#[cfg(test)] -mod test { - // TODO(matheus23): Test wasm shims using wasm-bindgen-test -}