diff --git a/Cargo.lock b/Cargo.lock index b7806d1027..ecf069a373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,7 +335,6 @@ dependencies = [ "instant", "pin-project-lite", "rand", - "tokio", ] [[package]] @@ -1998,6 +1997,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" dependencies = [ "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", ] [[package]] @@ -2027,9 +2029,11 @@ version = "0.31.0" dependencies = [ "aead", "anyhow", + "atomic-waker", "axum", "backoff", "bytes", + "cfg_aliases", "clap", "concurrent-queue", "crypto_box", @@ -2037,18 +2041,11 @@ dependencies = [ "der", "derive_more", "ed25519-dalek", - "futures-buffered", - "futures-lite", - "futures-sink", - "futures-util", - "governor 0.7.0", "hickory-resolver", "http 1.2.0", - "http-body-util", - "hyper", - "hyper-util", "igd-next", "indicatif", + "instant", "iroh-base", "iroh-metrics", "iroh-net-report", @@ -2057,6 +2054,7 @@ dependencies = [ "iroh-quinn-udp", "iroh-relay", "iroh-test 0.31.0", + "n0-future", "netdev", "netwatch", "parse-size", @@ -2079,13 +2077,14 @@ dependencies = [ "swarm-discovery", "testresult", "thiserror 2.0.11", + "time", "tokio", - "tokio-rustls", "tokio-stream", "tokio-util", "tracing", "tracing-subscriber", "url", + "wasm-bindgen-futures", "webpki-roots", "x509-parser", "z32", @@ -2119,11 +2118,11 @@ dependencies = [ "anyhow", "bytes", "clap", - "futures-lite", "hdrhistogram", "iroh", "iroh-metrics", "iroh-quinn", + "n0-future", "rcgen", "rustls", "tokio", @@ -2145,7 +2144,6 @@ dependencies = [ "criterion", "derive_more", "dirs-next", - "futures-lite", "governor 0.6.3", "hickory-resolver", "hickory-server", @@ -2155,6 +2153,7 @@ dependencies = [ "iroh-metrics", "iroh-test 0.31.0", "lru", + "n0-future", "pkarr", "rand", "rand_chacha", @@ -2207,8 +2206,8 @@ version = "0.31.0" dependencies = [ "anyhow", "bytes", + "cfg_aliases", "derive_more", - "futures-buffered", "futures-lite", "hickory-resolver", "iroh-base", @@ -2216,6 +2215,7 @@ dependencies = [ "iroh-quinn", "iroh-relay", "iroh-test 0.31.0", + "n0-future", "netwatch", "portmapper", "pretty_assertions", @@ -2634,15 +2634,14 @@ dependencies = [ [[package]] name = "n0-future" -version = "0.0.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7d4800d43db24b202e42bb129710ee5f809d46345d2e31ed702d5c6ed02601e" +checksum = "399e11dc3b0e8d9d65b27170d22f5d779d52d9bed888db70d7e0c2c7ce3dfc52" dependencies = [ "cfg_aliases", "derive_more", "futures-buffered", "futures-lite", - "futures-sink", "futures-util", "js-sys", "pin-project", @@ -3720,11 +3719,13 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower 0.5.2", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "windows-registry", @@ -4557,6 +4558,7 @@ checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ "deranged", "itoa", + "js-sys", "libc", "num-conv", "num_threads", @@ -5193,6 +5195,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" diff --git a/iroh-dns-server/Cargo.toml b/iroh-dns-server/Cargo.toml index d9f05da4c8..32c796920e 100644 --- a/iroh-dns-server/Cargo.toml +++ b/iroh-dns-server/Cargo.toml @@ -24,13 +24,13 @@ derive_more = { version = "1.0.0", features = [ "from", ] } dirs-next = "2.0.0" -futures-lite = "2.5" governor = "0.6.3" #needs new release of tower_governor for 0.7.0 hickory-server = { version = "=0.25.0-alpha.4", features = ["dns-over-rustls", "dns-over-https-rustls"] } http = "1.0.0" humantime-serde = "1.1.1" iroh-metrics = { version = "0.31.0" } lru = "0.12.3" +n0-future = "0.1.2" pkarr = { version = "2.3.1", features = [ "async", "relay", "dht"], default-features = false } rcgen = "0.13" redb = "2.0.0" diff --git a/iroh-dns-server/benches/write.rs b/iroh-dns-server/benches/write.rs index e076c16c13..ea2f928efc 100644 --- a/iroh-dns-server/benches/write.rs +++ b/iroh-dns-server/benches/write.rs @@ -1,6 +1,6 @@ use anyhow::Result; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use iroh::{discovery::pkarr::PkarrRelayClient, dns::node_info::NodeInfo, SecretKey}; +use iroh::{discovery::pkarr::PkarrRelayClient, node_info::NodeInfo, SecretKey}; use iroh_dns_server::{config::Config, server::Server, ZoneStore}; use rand_chacha::rand_core::SeedableRng; use tokio::runtime::Runtime; diff --git a/iroh-dns-server/examples/publish.rs b/iroh-dns-server/examples/publish.rs index 660450f85f..83598953f2 100644 --- a/iroh-dns-server/examples/publish.rs +++ b/iroh-dns-server/examples/publish.rs @@ -7,7 +7,7 @@ use iroh::{ dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, pkarr::{PkarrRelayClient, N0_DNS_PKARR_RELAY_PROD, N0_DNS_PKARR_RELAY_STAGING}, }, - dns::node_info::{to_z32, NodeInfo, IROH_TXT_NAME}, + node_info::{to_z32, NodeInfo, IROH_TXT_NAME}, NodeId, SecretKey, }; use url::Url; diff --git a/iroh-dns-server/examples/resolve.rs b/iroh-dns-server/examples/resolve.rs index 90fdc2d5f0..0540d444a4 100644 --- a/iroh-dns-server/examples/resolve.rs +++ b/iroh-dns-server/examples/resolve.rs @@ -8,7 +8,8 @@ use hickory_resolver::{ }; use iroh::{ discovery::dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, - dns::{node_info::TxtAttrs, DnsResolver}, + dns::DnsResolver, + node_info::TxtAttrs, NodeId, }; diff --git a/iroh-dns-server/src/http/tls.rs b/iroh-dns-server/src/http/tls.rs index f1a78fe2e3..a151e9ad65 100644 --- a/iroh-dns-server/src/http/tls.rs +++ b/iroh-dns-server/src/http/tls.rs @@ -10,7 +10,7 @@ use axum_server::{ accept::Accept, tls_rustls::{RustlsAcceptor, RustlsConfig}, }; -use futures_lite::{future::Boxed as BoxFuture, FutureExt}; +use n0_future::{future::Boxed as BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig}; diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index d69b8a5ea4..efcbe761fd 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -29,7 +29,8 @@ mod tests { use hickory_server::proto::xfer::Protocol; use iroh::{ discovery::pkarr::PkarrRelayClient, - dns::{node_info::NodeInfo, DnsResolver, ResolverExt}, + dns::{DnsResolver, ResolverExt}, + node_info::NodeInfo, SecretKey, }; use pkarr::{PkarrClient, SignedPacket}; diff --git a/iroh-net-report/Cargo.toml b/iroh-net-report/Cargo.toml index 23e0bd3648..5deb3b7199 100644 --- a/iroh-net-report/Cargo.toml +++ b/iroh-net-report/Cargo.toml @@ -19,32 +19,39 @@ workspace = true anyhow = "1" bytes = "1.7" derive_more = { version = "1.0.0", features = ["display"] } -futures-buffered = "0.2.8" -futures-lite = "2.3" -hickory-resolver = "=0.25.0-alpha.4" iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["relay"] } iroh-metrics = { version = "0.31", default-features = false } -iroh-relay = { version = "0.31", path = "../iroh-relay" } -netwatch = { version = "0.3" } -portmapper = { version = "0.3", default-features = false } -quinn = { package = "iroh-quinn", version = "0.13.0" } +iroh-relay = { version = "0.31", path = "../iroh-relay", default-features = false } +n0-future = "0.1.2" +quinn = { package = "iroh-quinn", version = "0.13.0", default-features = false } rand = "0.8" -reqwest = { version = "0.12", default-features = false } +reqwest = { version = "0.12", default-features = false, features = ["stream"] } rustls = { version = "0.23", default-features = false } -surge-ping = "0.8.0" thiserror = "2" tokio = { version = "1", default-features = false, features = ["sync", "time", "macros", "rt"] } tokio-util = { version = "0.7.12", default-features = false } tracing = "0.1" url = { version = "2.4" } +# non-wasm-in-browser dependencies +[target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] +hickory-resolver = "=0.25.0-alpha.4" +netwatch = { version = "0.3" } +portmapper = { version = "0.3", default-features = false } +surge-ping = "0.8.0" + [dev-dependencies] +futures-lite = "2.3" iroh-relay = { path = "../iroh-relay", features = ["test-utils", "server"] } iroh-test = { path = "../iroh-test" } pretty_assertions = "1.4" +quinn = { package = "iroh-quinn", version = "0.13.0" } testresult = "0.4.0" tokio = { version = "1", default-features = false, features = ["test-util"] } +[build-dependencies] +cfg_aliases = { version = "0.2" } + [features] default = ["metrics"] metrics = ["iroh-metrics/metrics", "portmapper/metrics"] diff --git a/iroh-net-report/build.rs b/iroh-net-report/build.rs new file mode 100644 index 0000000000..7aae56820c --- /dev/null +++ b/iroh-net-report/build.rs @@ -0,0 +1,9 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! { + // Convenience aliases + wasm_browser: { all(target_family = "wasm", target_os = "unknown") }, + } +} diff --git a/iroh-net-report/src/defaults.rs b/iroh-net-report/src/defaults.rs index 6ac78db120..66f2709711 100644 --- a/iroh-net-report/src/defaults.rs +++ b/iroh-net-report/src/defaults.rs @@ -2,7 +2,7 @@ /// Contains all timeouts that we use in `iroh-net-report`. pub(crate) mod timeouts { - use std::time::Duration; + use n0_future::time::Duration; // Timeouts for net_report diff --git a/iroh-net-report/src/dns.rs b/iroh-net-report/src/dns.rs index 325d4bbf83..659b6d6d3f 100644 --- a/iroh-net-report/src/dns.rs +++ b/iroh-net-report/src/dns.rs @@ -1,8 +1,8 @@ use std::{fmt::Write, net::IpAddr}; use anyhow::Result; -use futures_lite::{Future, StreamExt}; use hickory_resolver::{IntoName, TokioResolver}; +use n0_future::{time, Future, StreamExt}; use crate::defaults::timeouts::DNS_TIMEOUT; @@ -66,12 +66,12 @@ pub(crate) trait ResolverExt { impl ResolverExt for TokioResolver { async fn lookup_ipv4(&self, host: N) -> Result> { - let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??; + let addrs = time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??; Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0))) } async fn lookup_ipv6(&self, host: N) -> Result> { - let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??; + let addrs = time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??; Ok(addrs.into_iter().map(|ip| IpAddr::V6(ip.0))) } @@ -150,14 +150,14 @@ async fn stagger_call Fut, Fut: Future>>( f: F, delays_ms: &[u64], ) -> Result { - let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1); + let mut calls = n0_future::FuturesUnorderedBounded::new(delays_ms.len() + 1); // NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than // using alternatives that allow futures of different types. for delay in std::iter::once(&0u64).chain(delays_ms) { - let delay = std::time::Duration::from_millis(*delay); + let delay = time::Duration::from_millis(*delay); let fut = f(); let staggered_fut = async move { - tokio::time::sleep(delay).await; + time::sleep(delay).await; fut.await }; calls.push(staggered_fut) diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index 4e309aff34..6bdcea7b00 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -18,22 +18,26 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::Bytes; +#[cfg(not(wasm_browser))] use hickory_resolver::TokioResolver as DnsResolver; use iroh_base::RelayUrl; #[cfg(feature = "metrics")] use iroh_metrics::inc; use iroh_relay::{protos::stun, RelayMap}; -use netwatch::UdpSocket; -use tokio::{ - sync::{self, mpsc, oneshot}, +use n0_future::{ + task::{self, AbortOnDropHandle}, time::{Duration, Instant}, }; -use tokio_util::task::AbortOnDropHandle; +#[cfg(not(wasm_browser))] +use netwatch::UdpSocket; +use tokio::sync::{self, mpsc, oneshot}; use tracing::{debug, error, info_span, trace, warn, Instrument}; mod defaults; +#[cfg(not(wasm_browser))] mod dns; mod metrics; +#[cfg(not(wasm_browser))] mod ping; mod reportgen; @@ -86,6 +90,7 @@ pub struct Report { /// public IP address (on IPv4). pub hair_pinning: Option, /// Probe indicating the presence of port mapping protocols on the LAN. + #[cfg(not(wasm_browser))] pub portmap_probe: Option, /// `None` for unknown pub preferred_relay: Option, @@ -224,6 +229,7 @@ pub struct Options { /// other packets from in the magicsocket (`MagicSock`). /// /// If not provided, STUN probes will not be sent over IPv4. + #[cfg(not(wasm_browser))] stun_sock_v4: Option>, /// Socket to send IPv6 STUN probes from. /// @@ -232,18 +238,22 @@ pub struct Options { /// other packets from in the magicsocket (`MagicSock`). /// /// If not provided, STUN probes will not be sent over IPv6. + #[cfg(not(wasm_browser))] stun_sock_v6: Option>, /// The configuration needed to launch QUIC address discovery probes. /// /// If not provided, will not run QUIC address discovery. + #[cfg(not(wasm_browser))] quic_config: Option, /// Enable icmp_v4 probes /// /// On by default + #[cfg(not(wasm_browser))] icmp_v4: bool, /// Enable icmp_v6 probes /// /// On by default + #[cfg(not(wasm_browser))] icmp_v6: bool, /// Enable https probes /// @@ -254,10 +264,15 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { + #[cfg(not(wasm_browser))] stun_sock_v4: None, + #[cfg(not(wasm_browser))] stun_sock_v6: None, + #[cfg(not(wasm_browser))] quic_config: None, + #[cfg(not(wasm_browser))] icmp_v4: true, + #[cfg(not(wasm_browser))] icmp_v6: true, https: true, } @@ -268,40 +283,50 @@ impl Options { /// Create an [`Options`] that disables all probes pub fn disabled() -> Self { Self { + #[cfg(not(wasm_browser))] stun_sock_v4: None, + #[cfg(not(wasm_browser))] stun_sock_v6: None, + #[cfg(not(wasm_browser))] quic_config: None, + #[cfg(not(wasm_browser))] icmp_v4: false, + #[cfg(not(wasm_browser))] icmp_v6: false, https: false, } } /// Set the ipv4 stun socket and enable ipv4 stun probes + #[cfg(not(wasm_browser))] pub fn stun_v4(mut self, sock: Option>) -> Self { self.stun_sock_v4 = sock; self } /// Set the ipv6 stun socket and enable ipv6 stun probes + #[cfg(not(wasm_browser))] pub fn stun_v6(mut self, sock: Option>) -> Self { self.stun_sock_v6 = sock; self } /// Enable quic probes + #[cfg(not(wasm_browser))] pub fn quic_config(mut self, quic_config: Option) -> Self { self.quic_config = quic_config; self } /// Enable or disable icmp_v4 probe + #[cfg(not(wasm_browser))] pub fn icmp_v4(mut self, enable: bool) -> Self { self.icmp_v4 = enable; self } /// Enable or disable icmp_v6 probe + #[cfg(not(wasm_browser))] pub fn icmp_v6(mut self, enable: bool) -> Self { self.icmp_v6 = enable; self @@ -316,12 +341,15 @@ impl Options { /// Turn the options into set of valid protocols fn to_protocols(&self) -> BTreeSet { let mut protocols = BTreeSet::new(); + #[cfg(not(wasm_browser))] if self.stun_sock_v4.is_some() { protocols.insert(ProbeProto::StunIpv4); } + #[cfg(not(wasm_browser))] if self.stun_sock_v6.is_some() { protocols.insert(ProbeProto::StunIpv6); } + #[cfg(not(wasm_browser))] if let Some(ref quic) = self.quic_config { if quic.ipv4 { protocols.insert(ProbeProto::QuicIpv4); @@ -330,9 +358,11 @@ impl Options { protocols.insert(ProbeProto::QuicIpv6); } } + #[cfg(not(wasm_browser))] if self.icmp_v4 { protocols.insert(ProbeProto::IcmpV4); } + #[cfg(not(wasm_browser))] if self.icmp_v6 { protocols.insert(ProbeProto::IcmpV6); } @@ -348,10 +378,18 @@ impl Client { /// /// This starts a connected actor in the background. Once the client is dropped it will /// stop running. - pub fn new(port_mapper: Option, dns_resolver: DnsResolver) -> Result { - let mut actor = Actor::new(port_mapper, dns_resolver)?; + pub fn new( + #[cfg(not(wasm_browser))] port_mapper: Option, + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, + ) -> Result { + let mut actor = Actor::new( + #[cfg(not(wasm_browser))] + port_mapper, + #[cfg(not(wasm_browser))] + dns_resolver, + )?; let addr = actor.addr(); - let task = tokio::spawn( + let task = task::spawn( async move { actor.run().await }.instrument(info_span!("net_report.actor")), ); let drop_guard = AbortOnDropHandle::new(task); @@ -394,14 +432,17 @@ impl Client { pub async fn get_report( &mut self, relay_map: RelayMap, - stun_sock_v4: Option>, - stun_sock_v6: Option>, - quic_config: Option, + #[cfg(not(wasm_browser))] stun_sock_v4: Option>, + #[cfg(not(wasm_browser))] stun_sock_v6: Option>, + #[cfg(not(wasm_browser))] quic_config: Option, ) -> Result> { + #[cfg(not(wasm_browser))] let opts = Options::default() .stun_v4(stun_sock_v4) .stun_v6(stun_sock_v6) .quic_config(quic_config); + #[cfg(wasm_browser)] + let opts = Options::default(); let rx = self.get_report_channel(relay_map.clone(), opts).await?; match rx.await { Ok(res) => res, @@ -554,6 +595,7 @@ struct Actor { /// /// The port mapper is responsible for talking to routers via UPnP and the like to try /// and open ports. + #[cfg(not(wasm_browser))] port_mapper: Option, // Actor state. @@ -565,6 +607,7 @@ struct Actor { current_report_run: Option, /// The DNS resolver to use for probes that need to perform DNS lookups + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, } @@ -573,16 +616,21 @@ impl Actor { /// /// This does not start the actor, see [`Actor::run`] for this. You should not /// normally create this directly but rather create a [`Client`]. - fn new(port_mapper: Option, dns_resolver: DnsResolver) -> Result { + fn new( + #[cfg(not(wasm_browser))] port_mapper: Option, + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, + ) -> Result { // TODO: consider an instrumented flume channel so we have metrics. let (sender, receiver) = mpsc::channel(32); Ok(Self { receiver, sender, reports: Default::default(), + #[cfg(not(wasm_browser))] port_mapper, in_flight_stun_requests: Default::default(), current_report_run: None, + #[cfg(not(wasm_browser))] dns_resolver, }) } @@ -638,6 +686,7 @@ impl Actor { response_tx: oneshot::Sender>>, ) { let protocols = opts.to_protocols(); + #[cfg(not(wasm_browser))] let Options { stun_sock_v4, stun_sock_v6, @@ -679,11 +728,16 @@ impl Actor { let actor = reportgen::Client::new( self.addr(), self.reports.last.clone(), + #[cfg(not(wasm_browser))] self.port_mapper.clone(), relay_map, + #[cfg(not(wasm_browser))] stun_sock_v4, + #[cfg(not(wasm_browser))] stun_sock_v6, + #[cfg(not(wasm_browser))] quic_config, + #[cfg(not(wasm_browser))] self.dns_resolver.clone(), protocols, ); @@ -861,6 +915,7 @@ struct ReportRun { } /// Test if IPv6 works at all, or if it's been hard disabled at the OS level. +#[cfg(not(wasm_browser))] pub fn os_has_ipv6() -> bool { UdpSocket::bind_local_v6(0).is_ok() } @@ -899,7 +954,7 @@ pub(crate) mod stun_utils { ); { let sock = sock.clone(); - tokio::spawn( + task::spawn( async move { debug!("udp stun socket listener started"); // TODO: Can we do better for buffers here? Probably doesn't matter much. @@ -988,7 +1043,6 @@ mod tests { use bytes::BytesMut; use netwatch::IpFamily; - use tokio::time; use tokio_util::sync::CancellationToken; use tracing::info; @@ -1388,7 +1442,7 @@ mod tests { let mut actor = Actor::new(None, resolver.clone()).unwrap(); for s in &mut tt.steps { // trigger the timer - time::advance(Duration::from_secs(s.after)).await; + tokio::time::advance(Duration::from_secs(s.after)).await; let r = Arc::try_unwrap(s.r.take().unwrap()).unwrap(); s.r = Some(actor.add_report_history_and_set_preferred_relay(r)); } diff --git a/iroh-net-report/src/ping.rs b/iroh-net-report/src/ping.rs index 2dcebe403e..0e3bde904b 100644 --- a/iroh-net-report/src/ping.rs +++ b/iroh-net-report/src/ping.rs @@ -4,10 +4,10 @@ use std::{ fmt::Debug, net::IpAddr, sync::{Arc, Mutex}, - time::Duration, }; use anyhow::{Context, Result}; +use n0_future::time::Duration; use surge_ping::{Client, Config, IcmpPacket, PingIdentifier, PingSequence, ICMP}; use tracing::debug; diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index fb144cdc25..d235493199 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -23,10 +23,10 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Duration, }; use anyhow::{anyhow, bail, Context as _, Result}; +#[cfg(not(wasm_browser))] use hickory_resolver::TokioResolver as DnsResolver; use iroh_base::RelayUrl; #[cfg(feature = "metrics")] @@ -37,26 +37,27 @@ use iroh_relay::{ protos::stun, RelayMap, RelayNode, }; +use n0_future::{ + task::{self, AbortOnDropHandle, JoinSet}, + time::{self, Duration, Instant}, + StreamExt as _, +}; +#[cfg(not(wasm_browser))] use netwatch::{interfaces, UdpSocket}; use rand::seq::IteratorRandom; -use tokio::{ - sync::{mpsc, oneshot}, - task::JoinSet, - time::{self, Instant}, -}; -use tokio_util::task::AbortOnDropHandle; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument, Span}; use url::Host; +#[cfg(not(wasm_browser))] +use crate::dns::ResolverExt; +#[cfg(not(wasm_browser))] +use crate::ping::{PingError, Pinger}; #[cfg(feature = "metrics")] use crate::Metrics; -use crate::{ - self as net_report, - dns::ResolverExt, - ping::{PingError, Pinger}, - Report, -}; +use crate::{self as net_report, Report}; +#[cfg(not(wasm_browser))] mod hairpin; mod probes; @@ -87,12 +88,12 @@ impl Client { pub(super) fn new( net_report: net_report::Addr, last_report: Option>, - port_mapper: Option, + #[cfg(not(wasm_browser))] port_mapper: Option, relay_map: RelayMap, - stun_sock4: Option>, - stun_sock6: Option>, - quic_config: Option, - dns_resolver: DnsResolver, + #[cfg(not(wasm_browser))] stun_sock4: Option>, + #[cfg(not(wasm_browser))] stun_sock6: Option>, + #[cfg(not(wasm_browser))] quic_config: Option, + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, protocols: BTreeSet, ) -> Self { let (msg_tx, msg_rx) = mpsc::channel(32); @@ -104,20 +105,25 @@ impl Client { msg_rx, net_report: net_report.clone(), last_report, + #[cfg(not(wasm_browser))] port_mapper, relay_map, + #[cfg(not(wasm_browser))] stun_sock4, + #[cfg(not(wasm_browser))] stun_sock6, + #[cfg(not(wasm_browser))] quic_config, report: Report::default(), + #[cfg(not(wasm_browser))] hairpin_actor: hairpin::Client::new(net_report, addr), outstanding_tasks: OutstandingTasks::default(), + #[cfg(not(wasm_browser))] dns_resolver, protocols, }; - let task = tokio::spawn( - async move { actor.run().await }.instrument(info_span!("reportgen.actor")), - ); + let task = + task::spawn(async move { actor.run().await }.instrument(info_span!("reportgen.actor"))); Self { _drop_guard: AbortOnDropHandle::new(task), } @@ -176,26 +182,32 @@ struct Actor { /// The previous report, if it exists. last_report: Option>, /// The portmapper client, if there is one. + #[cfg(not(wasm_browser))] port_mapper: Option, /// The relay configuration. relay_map: RelayMap, /// Socket to send IPv4 STUN requests from. + #[cfg(not(wasm_browser))] stun_sock4: Option>, /// Socket so send IPv6 STUN requests from. + #[cfg(not(wasm_browser))] stun_sock6: Option>, /// QUIC configuration to do QUIC address Discovery + #[cfg(not(wasm_browser))] quic_config: Option, // Internal state. /// The report being built. report: Report, /// The hairpin actor. + #[cfg(not(wasm_browser))] hairpin_actor: hairpin::Client, /// Which tasks the [`Actor`] is still waiting on. /// /// This is essentially the summary of all the work the [`Actor`] is doing. outstanding_tasks: OutstandingTasks, /// The DNS resolver to use for probes that need to resolve DNS records. + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, /// Protocols we should attempt to create probes for, if we have the correct /// configuration for that protocol. @@ -234,20 +246,30 @@ impl Actor { /// - Updates the report, cancels unneeded futures. /// - Sends the report to the net_report actor. async fn run_inner(&mut self) -> Result<()> { - debug!( - port_mapper = %self.port_mapper.is_some(), - "reportstate actor starting", - ); + #[cfg(not(wasm_browser))] + let port_mapper = self.port_mapper.is_some(); + #[cfg(wasm_browser)] + let port_mapper = false; + debug!(%port_mapper, "reportstate actor starting"); - self.report.os_has_ipv6 = super::os_has_ipv6(); + #[cfg(not(wasm_browser))] + { + self.report.os_has_ipv6 = super::os_has_ipv6(); + } + #[cfg(not(wasm_browser))] let mut port_mapping = self.prepare_portmapper_task(); + #[cfg(wasm_browser)] + let mut port_mapping = n0_future::future::pending(); + #[cfg(not(wasm_browser))] let mut captive_task = self.prepare_captive_portal_task(); + #[cfg(wasm_browser)] + let mut captive_task = n0_future::future::pending(); let mut probes = self.spawn_probes_task().await?; - let total_timer = tokio::time::sleep(OVERALL_REPORT_TIMEOUT); + let total_timer = time::sleep(OVERALL_REPORT_TIMEOUT); tokio::pin!(total_timer); - let probe_timer = tokio::time::sleep(PROBES_TIMEOUT); + let probe_timer = time::sleep(PROBES_TIMEOUT); tokio::pin!(probe_timer); loop { @@ -275,10 +297,14 @@ impl Actor { // Drive the portmapper. pm = &mut port_mapping, if self.outstanding_tasks.port_mapper => { - debug!(report=?pm, "tick: portmapper probe report"); - self.report.portmap_probe = pm; - port_mapping.inner = None; - self.outstanding_tasks.port_mapper = false; + // This future is completely disabled in wasm. + #[cfg(not(wasm_browser))] + { + debug!(report=?pm, "tick: portmapper probe report"); + self.report.portmap_probe = pm; + port_mapping.inner = None; + self.outstanding_tasks.port_mapper = false; + } } // Check for probes finishing. @@ -299,10 +325,14 @@ impl Actor { // Drive the captive task. found = &mut captive_task, if self.outstanding_tasks.captive_task => { - trace!("tick: captive portal task done"); - self.report.captive_portal = found; - captive_task.inner = None; - self.outstanding_tasks.captive_task = false; + // This future is completely disabled in wasm. + #[cfg(not(wasm_browser))] + { + trace!("tick: captive portal task done"); + self.report.captive_portal = found; + captive_task.inner = None; + self.outstanding_tasks.captive_task = false; + } } // Handle actor messages. @@ -361,6 +391,7 @@ impl Actor { update_report(&mut self.report, probe_report); // When we discover the first IPv4 address we want to start the hairpin actor. + #[cfg(not(wasm_browser))] if let Some(ref addr) = self.report.global_v4 { if !self.hairpin_actor.has_started() { self.hairpin_actor.start_check(*addr); @@ -385,7 +416,7 @@ impl Actor { delay=?timeout, "Have enough probe reports, aborting further probes soon", ); - tokio::spawn( + task::spawn( async move { time::sleep(timeout).await; // Because we do this after a timeout it is entirely normal that the @@ -409,6 +440,7 @@ impl Actor { } // If the probe is for IPv6 and we don't yet have an IPv6 report, that would help. + #[cfg(not(wasm_browser))] if probe.proto() == ProbeProto::StunIpv6 && self.report.relay_v6_latency.is_empty() { return true; } @@ -419,6 +451,7 @@ impl Actor { // talking to. If we don't yet have two results yet // (`mapping_varies_by_dest_ip` is blank), then another IPv4 probe // would be good. + #[cfg(not(wasm_browser))] if probe.proto() == ProbeProto::StunIpv4 && self.report.mapping_varies_by_dest_ip.is_none() { return true; @@ -445,6 +478,7 @@ impl Actor { /// Creates the future which will perform the portmapper task. /// /// The returned future will run the portmapper, if enabled, resolving to it's result. + #[cfg(not(wasm_browser))] fn prepare_portmapper_task( &mut self, ) -> MaybeFuture>>>> { @@ -469,6 +503,7 @@ impl Actor { } /// Creates the future which will perform the captive portal check. + #[cfg(not(wasm_browser))] fn prepare_captive_portal_task( &mut self, ) -> MaybeFuture>>>> { @@ -488,9 +523,9 @@ impl Actor { self.outstanding_tasks.captive_task = true; MaybeFuture { inner: Some(Box::pin(async move { - tokio::time::sleep(CAPTIVE_PORTAL_DELAY).await; + time::sleep(CAPTIVE_PORTAL_DELAY).await; debug!("Captive portal check started after {CAPTIVE_PORTAL_DELAY:?}"); - let captive_portal_check = tokio::time::timeout( + let captive_portal_check = time::timeout( CAPTIVE_PORTAL_TIMEOUT, check_captive_portal(&dns_resolver, &dm, preferred_relay) .instrument(debug_span!("captive-portal")), @@ -540,19 +575,31 @@ impl Actor { /// - Once there are [`ProbeReport`]s from enough nodes, all remaining probes are /// aborted. That is, the main actor loop stops polling them. async fn spawn_probes_task(&mut self) -> Result>> { + #[cfg(not(wasm_browser))] let if_state = interfaces::State::new().await; + #[cfg(not(wasm_browser))] debug!(%if_state, "Local interfaces"); let plan = match self.last_report { - Some(ref report) => { - ProbePlan::with_last_report(&self.relay_map, &if_state, report, &self.protocols) - } - None => ProbePlan::initial(&self.relay_map, &if_state, &self.protocols), + Some(ref report) => ProbePlan::with_last_report( + &self.relay_map, + #[cfg(not(wasm_browser))] + &if_state, + report, + &self.protocols, + ), + None => ProbePlan::initial( + &self.relay_map, + #[cfg(not(wasm_browser))] + &if_state, + &self.protocols, + ), }; trace!(%plan, "probe plan"); // The pinger is created here so that any sockets that might be bound for it are // shared between the probes that use it. It binds sockets lazily, so we can always // create it. + #[cfg(not(wasm_browser))] let pinger = Pinger::new(); // A collection of futures running probe sets. @@ -561,25 +608,35 @@ impl Actor { let mut set = JoinSet::default(); for probe in probe_set { let reportstate = self.addr(); + #[cfg(not(wasm_browser))] let stun_sock4 = self.stun_sock4.clone(); + #[cfg(not(wasm_browser))] let stun_sock6 = self.stun_sock6.clone(); + #[cfg(not(wasm_browser))] let quic_config = self.quic_config.clone(); let relay_node = probe.node().clone(); let probe = probe.clone(); let net_report = self.net_report.clone(); + #[cfg(not(wasm_browser))] let pinger = pinger.clone(); + #[cfg(not(wasm_browser))] let dns_resolver = self.dns_resolver.clone(); set.spawn( run_probe( reportstate, + #[cfg(not(wasm_browser))] stun_sock4, + #[cfg(not(wasm_browser))] stun_sock6, + #[cfg(not(wasm_browser))] quic_config, relay_node, probe.clone(), net_report, + #[cfg(not(wasm_browser))] pinger, + #[cfg(not(wasm_browser))] dns_resolver, ) .instrument(debug_span!("run_probe", %probe)), @@ -708,18 +765,18 @@ pub struct QuicConfig { #[allow(clippy::too_many_arguments)] async fn run_probe( reportstate: Addr, - stun_sock4: Option>, - stun_sock6: Option>, - quic_config: Option, + #[cfg(not(wasm_browser))] stun_sock4: Option>, + #[cfg(not(wasm_browser))] stun_sock6: Option>, + #[cfg(not(wasm_browser))] quic_config: Option, relay_node: Arc, probe: Probe, net_report: net_report::Addr, - pinger: Pinger, - dns_resolver: DnsResolver, + #[cfg(not(wasm_browser))] pinger: Pinger, + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, ) -> Result { if !probe.delay().is_zero() { trace!("delaying probe"); - tokio::time::sleep(probe.delay()).await; + time::sleep(probe.delay()).await; } debug!("starting probe"); @@ -749,6 +806,7 @@ async fn run_probe( )); } + #[cfg(not(wasm_browser))] let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto()) .await .context("no relay node addr") @@ -756,6 +814,7 @@ async fn run_probe( let mut result = ProbeReport::new(probe.clone()); match probe { + #[cfg(not(wasm_browser))] Probe::StunIpv4 { .. } | Probe::StunIpv6 { .. } => { let maybe_sock = if matches!(probe, Probe::StunIpv4 { .. }) { stun_sock4.as_ref() @@ -774,13 +833,22 @@ async fn run_probe( } } } + #[cfg(not(wasm_browser))] Probe::IcmpV4 { .. } | Probe::IcmpV6 { .. } => { result = run_icmp_probe(probe, relay_addr, pinger).await? } Probe::Https { ref node, .. } => { debug!("sending probe HTTPS"); - match measure_https_latency(&dns_resolver, node, None).await { + match measure_https_latency( + #[cfg(not(wasm_browser))] + &dns_resolver, + node, + None, + ) + .await + { Ok((latency, ip)) => { + debug!(?latency, "latency"); result.latency = Some(latency); // We set these IPv4 and IPv6 but they're not really used // and we don't necessarily set them both. If UDP is blocked @@ -798,6 +866,7 @@ async fn run_probe( } } + #[cfg(not(wasm_browser))] Probe::QuicIpv4 { ref node, .. } | Probe::QuicIpv6 { ref node, .. } => { debug!("sending QUIC address discovery probe"); let url = node.url.clone(); @@ -820,6 +889,7 @@ async fn run_probe( } /// Run a STUN IPv4 or IPv6 probe. +#[cfg(not(wasm_browser))] async fn run_stun_probe( sock: &Arc, relay_addr: SocketAddr, @@ -903,6 +973,7 @@ async fn run_stun_probe( } /// Run a QUIC address discovery probe. +#[cfg(not(wasm_browser))] async fn run_quic_probe( quic_config: QuicConfig, url: RelayUrl, @@ -945,6 +1016,7 @@ async fn run_quic_probe( /// return a "204 No Content" response and checking if that's what we get. /// /// The boolean return is whether we think we have a captive portal. +#[cfg(not(wasm_browser))] async fn check_captive_portal( dns_resolver: &DnsResolver, dm: &RelayMap, @@ -978,6 +1050,7 @@ async fn check_captive_portal( }; let mut builder = reqwest::ClientBuilder::new().redirect(reqwest::redirect::Policy::none()); + if let Some(Host::Domain(domain)) = url.host() { // Use our own resolver rather than getaddrinfo // @@ -1028,6 +1101,7 @@ async fn check_captive_portal( /// Returns the proper port based on the protocol of the probe. fn get_port(relay_node: &RelayNode, proto: &ProbeProto) -> Result { match proto { + #[cfg(not(wasm_browser))] ProbeProto::QuicIpv4 | ProbeProto::QuicIpv6 => { if let Some(ref quic) = relay_node.quic { if quic.port == 0 { @@ -1054,6 +1128,7 @@ fn get_port(relay_node: &RelayNode, proto: &ProbeProto) -> Result { /// *proto* specifies the protocol of the probe. Depending on the protocol we may return /// different results. Obviously IPv4 vs IPv6 but a [`RelayNode`] may also have disabled /// some protocols. +#[cfg(not(wasm_browser))] async fn get_relay_addr( dns_resolver: &DnsResolver, relay_node: &RelayNode, @@ -1111,6 +1186,7 @@ async fn get_relay_addr( /// /// The `pinger` is passed in so the ping sockets are only bound once /// for the probe set. +#[cfg(not(wasm_browser))] async fn run_icmp_probe( probe: Probe, relay_addr: SocketAddr, @@ -1131,6 +1207,7 @@ async fn run_icmp_probe( anyhow!("Failed to create pinger ({err:#}), aborting probeset"), probe.clone(), ), + #[cfg(not(wasm_browser))] PingError::Ping(err) => ProbeError::Error(err.into(), probe.clone()), })?; debug!(dst = %relay_addr, len = DATA.len(), ?latency, "ICMP ping done"); @@ -1155,7 +1232,7 @@ async fn run_icmp_probe( /// use of self-signed certificates for servers. Currently this is used for testing. #[allow(clippy::unused_async)] async fn measure_https_latency( - dns_resolver: &DnsResolver, + #[cfg(not(wasm_browser))] dns_resolver: &DnsResolver, node: &RelayNode, certs: Option>>, ) -> Result<(Duration, IpAddr)> { @@ -1164,7 +1241,14 @@ async fn measure_https_latency( // This should also use same connection establishment as relay client itself, which // needs to be more configurable so users can do more crazy things: // https://github.com/n0-computer/iroh/issues/2901 - let mut builder = reqwest::ClientBuilder::new().redirect(reqwest::redirect::Policy::none()); + let mut builder = reqwest::ClientBuilder::new(); + + #[cfg(not(wasm_browser))] + { + builder = builder.redirect(reqwest::redirect::Policy::none()); + } + + #[cfg(not(wasm_browser))] if let Some(Host::Domain(domain)) = url.host() { // Use our own resolver rather than getaddrinfo // @@ -1180,6 +1264,8 @@ async fn measure_https_latency( .collect(); builder = builder.resolve_to_addrs(domain, &addrs); } + + #[cfg(not(wasm_browser))] if let Some(certs) = certs { for cert in certs { let cert = reqwest::Certificate::from_der(&cert)?; @@ -1189,24 +1275,30 @@ async fn measure_https_latency( let client = builder.build()?; let start = Instant::now(); - let mut response = client.request(reqwest::Method::GET, url).send().await?; + let response = client.request(reqwest::Method::GET, url).send().await?; let latency = start.elapsed(); if response.status().is_success() { + // Only `None` if a different hyper HttpConnector in the request. + #[cfg(not(wasm_browser))] + let remote_ip = response + .remote_addr() + .context("missing HttpInfo from HttpConnector")? + .ip(); + #[cfg(wasm_browser)] + let remote_ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED); + // Drain the response body to be nice to the server, up to a limit. const MAX_BODY_SIZE: usize = 8 << 10; // 8 KiB let mut body_size = 0; - while let Some(chunk) = response.chunk().await? { + let mut stream = response.bytes_stream(); + // ignore failing frames + while let Some(Ok(chunk)) = stream.next().await { body_size += chunk.len(); if body_size >= MAX_BODY_SIZE { break; } } - // Only `None` if a different hyper HttpConnector in the request. - let remote_ip = response - .remote_addr() - .context("missing HttpInfo from HttpConnector")? - .ip(); Ok((latency, remote_ip)) } else { Err(anyhow!( @@ -1224,6 +1316,7 @@ fn update_report(report: &mut Report, probe_report: ProbeReport) { .relay_latency .update_relay(relay_node.url.clone(), latency); + #[cfg(not(wasm_browser))] if matches!( probe_report.probe.proto(), ProbeProto::StunIpv4 diff --git a/iroh-net-report/src/reportgen/hairpin.rs b/iroh-net-report/src/reportgen/hairpin.rs index 17fd49e4f5..15c67da38a 100644 --- a/iroh-net-report/src/reportgen/hairpin.rs +++ b/iroh-net-report/src/reportgen/hairpin.rs @@ -16,9 +16,12 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use anyhow::{bail, Context, Result}; use iroh_relay::protos::stun; +use n0_future::{ + task::{self, AbortOnDropHandle}, + time::{self, Instant}, +}; use netwatch::UdpSocket; -use tokio::{sync::oneshot, time::Instant}; -use tokio_util::task::AbortOnDropHandle; +use tokio::sync::oneshot; use tracing::{debug, error, info_span, trace, warn, Instrument}; use crate::{self as net_report, defaults::timeouts::HAIRPIN_CHECK_TIMEOUT, reportgen, Inflight}; @@ -43,7 +46,7 @@ impl Client { }; let task = - tokio::spawn(async move { actor.run().await }.instrument(info_span!("hairpin.actor"))); + task::spawn(async move { actor.run().await }.instrument(info_span!("hairpin.actor"))); Self { addr: Some(addr), _drop_guard: AbortOnDropHandle::new(task), @@ -127,7 +130,7 @@ impl Actor { } let now = Instant::now(); - let hairpinning_works = match tokio::time::timeout(HAIRPIN_CHECK_TIMEOUT, stun_rx).await { + let hairpinning_works = match time::timeout(HAIRPIN_CHECK_TIMEOUT, stun_rx).await { Ok(Ok(_)) => true, Ok(Err(_)) => bail!("net_report actor dropped stun response channel"), Err(_) => false, // Elapsed diff --git a/iroh-net-report/src/reportgen/probes.rs b/iroh-net-report/src/reportgen/probes.rs index 38bf51202f..ecfa8e17fc 100644 --- a/iroh-net-report/src/reportgen/probes.rs +++ b/iroh-net-report/src/reportgen/probes.rs @@ -9,8 +9,9 @@ use std::{collections::BTreeSet, fmt, sync::Arc}; use anyhow::{ensure, Result}; use iroh_base::RelayUrl; use iroh_relay::{RelayMap, RelayNode}; +use n0_future::time::Duration; +#[cfg(not(wasm_browser))] use netwatch::interfaces; -use tokio::time::Duration; use crate::Report; @@ -47,24 +48,31 @@ const NUM_INCREMENTAL_RELAYS: usize = 3; #[repr(u8)] pub enum ProbeProto { /// STUN IPv4 + #[cfg(not(wasm_browser))] StunIpv4, /// STUN IPv6 + #[cfg(not(wasm_browser))] StunIpv6, /// HTTPS Https, /// ICMP IPv4 + #[cfg(not(wasm_browser))] IcmpV4, /// ICMP IPv6 + #[cfg(not(wasm_browser))] IcmpV6, /// QUIC Address Discovery Ipv4 + #[cfg(not(wasm_browser))] QuicIpv4, /// QUIC Address Discovery Ipv6 + #[cfg(not(wasm_browser))] QuicIpv6, } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, derive_more::Display)] pub(super) enum Probe { #[display("STUN Ipv4 after {delay:?} to {node}")] + #[cfg(not(wasm_browser))] StunIpv4 { /// When the probe is started, relative to the time that `get_report` is called. /// One probe in each `ProbePlan` should have a delay of 0. Non-zero values @@ -75,6 +83,7 @@ pub(super) enum Probe { node: Arc, }, #[display("STUN Ipv6 after {delay:?} to {node}")] + #[cfg(not(wasm_browser))] StunIpv6 { delay: Duration, node: Arc, @@ -85,21 +94,25 @@ pub(super) enum Probe { node: Arc, }, #[display("ICMPv4 after {delay:?} to {node}")] + #[cfg(not(wasm_browser))] IcmpV4 { delay: Duration, node: Arc, }, #[display("ICMPv6 after {delay:?} to {node}")] + #[cfg(not(wasm_browser))] IcmpV6 { delay: Duration, node: Arc, }, #[display("QAD Ipv4 after {delay:?} to {node}")] + #[cfg(not(wasm_browser))] QuicIpv4 { delay: Duration, node: Arc, }, #[display("QAD Ipv6 after {delay:?} to {node}")] + #[cfg(not(wasm_browser))] QuicIpv6 { delay: Duration, node: Arc, @@ -109,6 +122,7 @@ pub(super) enum Probe { impl Probe { pub(super) fn delay(&self) -> Duration { match self { + #[cfg(not(wasm_browser))] Probe::StunIpv4 { delay, .. } | Probe::StunIpv6 { delay, .. } | Probe::Https { delay, .. } @@ -116,23 +130,32 @@ impl Probe { | Probe::IcmpV6 { delay, .. } | Probe::QuicIpv4 { delay, .. } | Probe::QuicIpv6 { delay, .. } => *delay, + #[cfg(wasm_browser)] + Probe::Https { delay, .. } => *delay, } } pub(super) fn proto(&self) -> ProbeProto { match self { + #[cfg(not(wasm_browser))] Probe::StunIpv4 { .. } => ProbeProto::StunIpv4, + #[cfg(not(wasm_browser))] Probe::StunIpv6 { .. } => ProbeProto::StunIpv6, Probe::Https { .. } => ProbeProto::Https, + #[cfg(not(wasm_browser))] Probe::IcmpV4 { .. } => ProbeProto::IcmpV4, + #[cfg(not(wasm_browser))] Probe::IcmpV6 { .. } => ProbeProto::IcmpV6, + #[cfg(not(wasm_browser))] Probe::QuicIpv4 { .. } => ProbeProto::QuicIpv4, + #[cfg(not(wasm_browser))] Probe::QuicIpv6 { .. } => ProbeProto::QuicIpv6, } } pub(super) fn node(&self) -> &Arc { match self { + #[cfg(not(wasm_browser))] Probe::StunIpv4 { node, .. } | Probe::StunIpv6 { node, .. } | Probe::Https { node, .. } @@ -140,6 +163,8 @@ impl Probe { | Probe::IcmpV6 { node, .. } | Probe::QuicIpv4 { node, .. } | Probe::QuicIpv6 { node, .. } => node, + #[cfg(wasm_browser)] + Probe::Https { node, .. } => node, } } } @@ -221,6 +246,7 @@ pub(super) struct ProbePlan { impl ProbePlan { /// Creates an initial probe plan. + #[cfg(not(wasm_browser))] pub(super) fn initial( relay_map: &RelayMap, if_state: &interfaces::State, @@ -323,7 +349,45 @@ impl ProbePlan { plan } + /// Creates an initial probe plan. + #[cfg(wasm_browser)] + pub(super) fn initial(relay_map: &RelayMap, protocols: &BTreeSet) -> Self { + let mut plan = Self { + set: BTreeSet::new(), + protocols: protocols.clone(), + }; + + // The first time we need add probes after the STUN we record this delay, so that + // further relay server can reuse this delay. + let mut max_high_prio_delay: Option = None; + + for relay_node in relay_map.nodes() { + let mut https_probes = ProbeSet::new(ProbeProto::Https); + + for attempt in 0..3 { + let mut start = *max_high_prio_delay.get_or_insert_with(|| plan.max_delay()); + // if there are high priority probes, ensure there is a buffer between + // the highest probe delay and the next probes we create + // if there are no high priority probes, we don't need a buffer + if plan.has_priority_probes() { + start += DEFAULT_INITIAL_RETRANSMIT; + } + let delay = start + DEFAULT_INITIAL_RETRANSMIT * attempt as u32; + https_probes + .push(Probe::Https { + delay, + node: relay_node.clone(), + }) + .expect("adding Https probe to a Https probe set"); + } + + plan.add_if_enabled(https_probes); + } + plan + } + /// Creates a follow up probe plan using a previous net_report report. + #[cfg(not(wasm_browser))] pub(super) fn with_last_report( relay_map: &RelayMap, if_state: &interfaces::State, @@ -465,6 +529,68 @@ impl ProbePlan { plan } + #[cfg(wasm_browser)] + pub(super) fn with_last_report( + relay_map: &RelayMap, + last_report: &Report, + protocols: &BTreeSet, + ) -> Self { + if last_report.relay_latency.is_empty() { + return Self::initial(relay_map, protocols); + } + let mut plan = Self { + set: Default::default(), + protocols: protocols.clone(), + }; + + // The first time we need add probes after the STUN we record this delay, so that + // further relay servers can reuse this delay. + let mut max_stun_delay: Option = None; + + let sorted_relays = sort_relays(relay_map, last_report); + for (ri, (url, relay_node)) in sorted_relays.into_iter().enumerate() { + if ri == NUM_INCREMENTAL_RELAYS { + break; + } + + // By default, each node only gets one STUN packet sent, + // except the fastest two from the previous round. + let mut attempts = 1; + let is_fastest_two = ri < 2; + + if is_fastest_two { + attempts = 2; + } + if Some(url) == last_report.preferred_relay.as_ref() { + // But if we already had a relay home, try extra hard to + // make sure it's there so we don't flip flop around. + attempts = 4; + } + let retransmit_delay = last_report + .relay_latency + .get(url) + .map(|l| l * 120 / 100) // increases latency by 20%, why? + .unwrap_or(DEFAULT_ACTIVE_RETRANSMIT_DELAY); + + let mut https_probes = ProbeSet::new(ProbeProto::Https); + let start = *max_stun_delay.get_or_insert_with(|| plan.max_delay()); + for attempt in 0..attempts { + let delay = start + + (retransmit_delay * attempt as u32) + + (ACTIVE_RETRANSMIT_EXTRA_DELAY * (attempt as u32 + 1)); + https_probes + .push(Probe::Https { + delay, + node: relay_node.clone(), + }) + .expect("Pushing Https Probe to an Https ProbeSet"); + } + + plan.add_if_enabled(https_probes); + } + plan + } + /// Returns an iterator over the [`ProbeSet`]s in this plan. pub(super) fn iter(&self) -> impl Iterator { self.set.iter() @@ -490,6 +616,7 @@ impl ProbePlan { /// Stun & Quic probes are "priority" probes fn has_priority_probes(&self) -> bool { + #[cfg(not(wasm_browser))] for probe in &self.set { if matches!( probe.proto, diff --git a/iroh-relay/Cargo.toml b/iroh-relay/Cargo.toml index ea715d5768..028e033875 100644 --- a/iroh-relay/Cargo.toml +++ b/iroh-relay/Cargo.toml @@ -29,7 +29,7 @@ hyper = { version = "1", features = ["server", "client", "http1"] } hyper-util = "0.1.1" iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["key", "relay"] } iroh-metrics = { version = "0.31", default-features = false } -n0-future = { version = "0.0.1" } +n0-future = "0.1.2" num_enum = "0.7" pin-project = "1" postcard = { version = "1", default-features = false, features = [ diff --git a/iroh-relay/src/client/conn.rs b/iroh-relay/src/client/conn.rs index 918ce38c14..dd84ce3e0b 100644 --- a/iroh-relay/src/client/conn.rs +++ b/iroh-relay/src/client/conn.rs @@ -6,13 +6,12 @@ use std::{ io, pin::Pin, task::{Context, Poll}, - time::Duration, }; use anyhow::{bail, Result}; use bytes::Bytes; use iroh_base::{NodeId, SecretKey}; -use n0_future::{Sink, Stream}; +use n0_future::{time::Duration, Sink, Stream}; use tokio_tungstenite_wasm::WebSocketStream; #[cfg(not(wasm_browser))] use tokio_util::codec::Framed; diff --git a/iroh-relay/src/client/connect_relay.rs b/iroh-relay/src/client/connect_relay.rs index 06e1df1087..329ecce121 100644 --- a/iroh-relay/src/client/connect_relay.rs +++ b/iroh-relay/src/client/connect_relay.rs @@ -25,6 +25,7 @@ use hyper::{ upgrade::Parts, Request, }; +use n0_future::{task, time}; use rustls::client::Resumption; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{error, info_span, Instrument}; @@ -117,7 +118,7 @@ impl ClientBuilder { let (mut request_sender, connection) = hyper::client::conn::http1::Builder::new() .handshake(io) .await?; - tokio::spawn( + task::spawn( // This task drives the HTTP exchange, completes once connection is upgraded. async move { debug!("HTTP upgrade driver started"); @@ -169,14 +170,13 @@ impl ClientBuilder { let addr = SocketAddr::new(dst_ip, port); debug!("connecting to {}", addr); - let tcp_stream = - tokio::time::timeout( - DIAL_NODE_TIMEOUT, - async move { TcpStream::connect(addr).await }, - ) - .await - .context("Timeout connecting")? - .context("Failed connecting")?; + let tcp_stream = time::timeout( + DIAL_NODE_TIMEOUT, + async move { TcpStream::connect(addr).await }, + ) + .await + .context("Timeout connecting")? + .context("Failed connecting")?; tcp_stream.set_nodelay(true)?; Ok(tcp_stream) @@ -203,7 +203,7 @@ impl ClientBuilder { debug!(%proxy_addr, "connecting to proxy"); - let tcp_stream = tokio::time::timeout(DIAL_NODE_TIMEOUT, async move { + let tcp_stream = time::timeout(DIAL_NODE_TIMEOUT, async move { TcpStream::connect(proxy_addr).await }) .await @@ -256,7 +256,7 @@ impl ClientBuilder { debug!("Sending proxy request: {:?}", req); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; - tokio::task::spawn(async move { + task::spawn(async move { if let Err(err) = conn.with_upgrades().await { error!("Proxy connection failed: {:?}", err); } diff --git a/iroh-relay/src/server/resolver.rs b/iroh-relay/src/server/resolver.rs index f4e9b8f2bc..eb06ccca0f 100644 --- a/iroh-relay/src/server/resolver.rs +++ b/iroh-relay/src/server/resolver.rs @@ -1,12 +1,17 @@ -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use anyhow::{anyhow, Result}; +use n0_future::{ + task::{self, AbortOnDropHandle}, + time, + time::Duration, +}; use reloadable_state::Reloadable; use rustls::{ server::{ClientHello, ResolvesServerCert}, sign::CertifiedKey, }; -use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; +use tokio_util::sync::CancellationToken; /// The default certificate reload interval. pub const DEFAULT_CERT_RELOAD_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24); @@ -38,8 +43,8 @@ where // Spawn a task to reload the certificate every interval. let _reloadable = reloadable.clone(); let _cancel_token = cancel_token.clone(); - let _handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(interval); + let _handle = task::spawn(async move { + let mut interval = time::interval(interval); loop { tokio::select! { _ = interval.tick() => { diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index fc6b9537fc..26882c721a 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -19,8 +19,7 @@ workspace = true aead = { version = "0.5.2", features = ["bytes"] } anyhow = { version = "1" } concurrent-queue = "2.5" -axum = { version = "0.7", optional = true } -backoff = { version = "0.4.0", features = ["futures", "tokio"]} +backoff = { version = "0.4.0", features = ["futures"] } bytes = "1.7" crypto_box = { version = "0.9.1", features = ["serde", "chacha20"] } data-encoding = "2.2" @@ -34,28 +33,16 @@ derive_more = { version = "1.0.0", features = [ "from_str" ] } ed25519-dalek = "2.0" -futures-buffered = "0.2.8" -futures-lite = "2.5" -futures-sink = "0.3" -futures-util = "0.3" -governor = "0.7.0" -hickory-resolver = { version = "=0.25.0-alpha.4" } -http = "1" -http-body-util = "0.1.0" -hyper = { version = "1", features = ["server", "client", "http1"] } -hyper-util = "0.1.1" -igd-next = { version = "0.15.1", features = ["aio_tokio"] } +n0-future = "0.1.2" iroh-base = { version = "0.31.0", default-features = false, features = ["key", "relay"], path = "../iroh-base" } iroh-relay = { version = "0.31", path = "../iroh-relay", default-features = false } -netdev = "0.31.0" -netwatch = { version = "0.3" } +net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.31", default-features = false } pin-project = "1" pkarr = { version = "2", default-features = false, features = [ "async", "relay", ] } -portmapper = { version = "0.3", default-features = false } -quinn = { package = "iroh-quinn", version = "0.13.0" } +quinn = { package = "iroh-quinn", version = "0.13.0", default-features = false, features = ["rustls-ring"] } quinn-proto = { package = "iroh-quinn-proto", version = "0.13.0" } quinn-udp = { package = "iroh-quinn-udp", version = "0.5.7" } rand = "0.8" @@ -75,25 +62,14 @@ tokio = { version = "1", features = [ "macros", "sync", "rt", - "net", - "fs", - "io-std", - "signal", - "process", -] } -tokio-rustls = { version = "0.26", default-features = false, features = [ - "logging", - "ring", ] } -tokio-stream = { version = "0.1.15" } -tokio-util = { version = "0.7", features = ["io-util", "io", "codec", "rt"] } +tokio-util = { version = "0.7", features = ["io-util", "io", "rt"] } tracing = "0.1" url = { version = "2.5", features = ["serde"] } webpki = { package = "rustls-webpki", version = "0.102" } webpki-roots = "0.26" x509-parser = "0.16" z32 = "1.0.3" -net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.31", default-features = false } # metrics iroh-metrics = { version = "0.31", default-features = false } @@ -101,6 +77,10 @@ iroh-metrics = { version = "0.31", default-features = false } # local-swarm-discovery swarm-discovery = { version = "0.3.0-alpha.1", optional = true } +# test_utils +axum = { version = "0.7", optional = true } +http = "1" + # Examples clap = { version = "4", features = ["derive"], optional = true } tracing-subscriber = { version = "0.3", features = [ @@ -108,6 +88,35 @@ tracing-subscriber = { version = "0.3", features = [ ], optional = true } indicatif = { version = "0.17", features = ["tokio"], optional = true } parse-size = { version = "=1.0.0", optional = true } # pinned version to avoid bumping msrv to 1.81 +atomic-waker = "1.1.2" + +# non-wasm-in-browser dependencies +[target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] +hickory-resolver = { version = "=0.25.0-alpha.4" } +igd-next = { version = "0.15.1", features = ["aio_tokio"] } +netdev = { version = "0.31.0" } +netwatch = { version = "0.3" } +portmapper = { version = "0.3", default-features = false } +quinn = { package = "iroh-quinn", version = "0.13.0", default-features = false, features = ["platform-verifier", "runtime-tokio", "rustls-ring"] } +tokio-stream = { version = "0.1.15" } +tokio = { version = "1", features = [ + "io-util", + "macros", + "sync", + "rt", + "net", + "fs", + "io-std", + "signal", + "process", +] } + +# wasm-in-browser dependencies +[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] +wasm-bindgen-futures = "0.4" +# we don't use instant nor time directly, but need to enable it because backoff and x509_parser use these in browsers and we need to enable some feature flags for that to work +instant = { version = "0.1", features = ["wasm-bindgen"] } +time = { version = "0.3", features = ["wasm-bindgen"] } [dev-dependencies] axum = { version = "0.7" } @@ -130,6 +139,9 @@ serde_json = "1" testresult = "0.4.0" iroh-relay = { path = "../iroh-relay", default-features = false, features = ["test-utils", "server"] } +[build-dependencies] +cfg_aliases = { version = "0.2" } + [features] default = ["metrics"] metrics = ["iroh-metrics/metrics", "iroh-relay/metrics", "net-report/metrics", "portmapper/metrics"] diff --git a/iroh/bench/Cargo.toml b/iroh/bench/Cargo.toml index 6b3af918f3..6be3f82c15 100644 --- a/iroh/bench/Cargo.toml +++ b/iroh/bench/Cargo.toml @@ -11,6 +11,7 @@ bytes = "1.7" hdrhistogram = { version = "7.2", default-features = false } iroh = { path = ".." } iroh-metrics = "0.31" +n0-future = "0.1.1" quinn = { package = "iroh-quinn", version = "0.13" } rcgen = "0.13" rustls = { version = "0.23", default-features = false, features = ["ring"] } @@ -24,7 +25,6 @@ tracing-subscriber = { version = "0.3.0", default-features = false, features = [ "time", "local-time", ] } -futures-lite = "2.5" [features] default = [] diff --git a/iroh/bench/src/stats.rs b/iroh/bench/src/stats.rs index 17816523f3..668ba22d40 100644 --- a/iroh/bench/src/stats.rs +++ b/iroh/bench/src/stats.rs @@ -1,6 +1,5 @@ -use std::time::Duration; - use hdrhistogram::Histogram; +use n0_future::time::Duration; #[derive(Default, Debug)] pub struct Stats { diff --git a/iroh/build.rs b/iroh/build.rs new file mode 100644 index 0000000000..7aae56820c --- /dev/null +++ b/iroh/build.rs @@ -0,0 +1,9 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! { + // Convenience aliases + wasm_browser: { all(target_family = "wasm", target_os = "unknown") }, + } +} diff --git a/iroh/examples/echo.rs b/iroh/examples/echo.rs index 89bd643a34..29548be502 100644 --- a/iroh/examples/echo.rs +++ b/iroh/examples/echo.rs @@ -7,12 +7,12 @@ //! cargo run --example echo --features=examples use anyhow::Result; -use futures_lite::future::Boxed as BoxedFuture; use iroh::{ endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr, }; +use n0_future::boxed::BoxFuture; /// Each protocol is identified by its ALPN string. /// @@ -51,6 +51,9 @@ async fn connect_side(addr: NodeAddr) -> Result<()> { let response = recv.read_to_end(1000).await?; assert_eq!(&response, b"Hello, world!"); + conn.close(0u32.into(), b"bye!"); + conn.closed().await; + Ok(()) } @@ -71,7 +74,7 @@ impl ProtocolHandler for Echo { /// /// The returned future runs on a newly spawned tokio task, so it can run as long as /// the connection lasts. - fn accept(&self, connecting: Connecting) -> BoxedFuture> { + fn accept(&self, connecting: Connecting) -> BoxFuture> { // We have to return a boxed future from the handler. Box::pin(async move { // Wait for the connection to be fully established. diff --git a/iroh/examples/search.rs b/iroh/examples/search.rs index d60b629038..1cd265dfa8 100644 --- a/iroh/examples/search.rs +++ b/iroh/examples/search.rs @@ -33,12 +33,12 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; use clap::Parser; -use futures_lite::future::Boxed as BoxedFuture; use iroh::{ endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeId, }; +use n0_future::boxed::BoxFuture; use tokio::sync::Mutex; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -127,7 +127,7 @@ impl ProtocolHandler for BlobSearch { /// /// The returned future runs on a newly spawned tokio task, so it can run as long as /// the connection lasts. - fn accept(&self, connecting: Connecting) -> BoxedFuture> { + fn accept(&self, connecting: Connecting) -> BoxFuture> { let this = self.clone(); // We have to return a boxed future from the handler. Box::pin(async move { diff --git a/iroh/src/defaults.rs b/iroh/src/defaults.rs index 9c910f003e..8acf51c0d2 100644 --- a/iroh/src/defaults.rs +++ b/iroh/src/defaults.rs @@ -139,7 +139,7 @@ pub mod staging { /// Contains all timeouts that we use in `iroh`. pub(crate) mod timeouts { - use std::time::Duration; + use n0_future::time::Duration; // Timeouts for net_report diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index a580e341cf..cc1ef84392 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -104,17 +104,22 @@ //! [`LocalSwarmDiscovery`]: local_swarm_discovery::LocalSwarmDiscovery //! [`StaticProvider`]: static_provider::StaticProvider -use std::{collections::BTreeSet, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; use anyhow::{anyhow, ensure, Result}; -use futures_lite::stream::{Boxed as BoxStream, StreamExt}; use iroh_base::{NodeAddr, NodeId, RelayUrl}; +use n0_future::{ + boxed::BoxStream, + stream::StreamExt, + task::{self, AbortOnDropHandle}, + time::{self, Duration}, +}; use tokio::sync::oneshot; -use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error_span, warn, Instrument}; use crate::Endpoint; +#[cfg(not(wasm_browser))] pub mod dns; #[cfg(feature = "discovery-local-network")] @@ -257,7 +262,7 @@ impl Discovery for ConcurrentDiscovery { .iter() .filter_map(|service| service.resolve(endpoint.clone(), node_id)); - let streams = futures_buffered::MergeBounded::from_iter(streams); + let streams = n0_future::MergeBounded::from_iter(streams); Some(Box::pin(streams)) } @@ -269,7 +274,7 @@ impl Discovery for ConcurrentDiscovery { } } - let streams = futures_buffered::MergeBounded::from_iter(streams); + let streams = n0_future::MergeBounded::from_iter(streams); Some(Box::pin(streams)) } } @@ -290,7 +295,7 @@ impl DiscoveryTask { ensure!(ep.discovery().is_some(), "No discovery services configured"); let (on_first_tx, on_first_rx) = oneshot::channel(); let me = ep.node_id(); - let task = tokio::task::spawn( + let task = task::spawn( async move { Self::run(ep, node_id, on_first_tx).await }.instrument( error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()), ), @@ -322,11 +327,11 @@ impl DiscoveryTask { let (on_first_tx, on_first_rx) = oneshot::channel(); let ep = ep.clone(); let me = ep.node_id(); - let task = tokio::task::spawn( + let task = task::spawn( async move { // If delay is set, wait and recheck if discovery is needed. If not, early-exit. if let Some(delay) = delay { - tokio::time::sleep(delay).await; + time::sleep(delay).await; if !Self::needs_discovery(&ep, node_id) { debug!("no discovery needed, abort"); on_first_tx.send(Ok(())).ok(); @@ -423,12 +428,6 @@ impl DiscoveryTask { } } -impl Drop for DiscoveryTask { - fn drop(&mut self) { - self.task.abort(); - } -} - #[cfg(test)] mod tests { use std::{ @@ -526,7 +525,7 @@ mod tests { }; let delay = self.delay; let fut = async move { - tokio::time::sleep(delay).await; + time::sleep(delay).await; tracing::debug!( "resolve on {}: {} = {item:?}", endpoint.node_id().fmt_short(), @@ -534,9 +533,9 @@ mod tests { ); Ok(item) }; - futures_lite::stream::once_future(fut).boxed() + n0_future::stream::once_future(fut).boxed() } - None => futures_lite::stream::empty().boxed(), + None => n0_future::stream::empty().boxed(), }; Some(stream) } @@ -552,7 +551,7 @@ mod tests { _endpoint: Endpoint, _node_id: NodeId, ) -> Option>> { - Some(futures_lite::stream::empty().boxed()) + Some(n0_future::stream::empty().boxed()) } } @@ -751,16 +750,16 @@ mod tests { /// publish to. The DNS and pkarr servers share their state. #[cfg(test)] mod test_dns_pkarr { - use std::time::Duration; - use anyhow::Result; use iroh_base::{NodeAddr, SecretKey}; use iroh_relay::RelayMap; + use n0_future::time::Duration; use tokio_util::task::AbortOnDropHandle; use crate::{ discovery::pkarr::PkarrPublisher, - dns::{node_info::NodeInfo, ResolverExt}, + dns::ResolverExt, + node_info::NodeInfo, test_utils::{ dns_server::{create_dns_resolver, run_dns_server}, pkarr_dns_state::State, diff --git a/iroh/src/discovery/dns.rs b/iroh/src/discovery/dns.rs index 9e45ea201d..5167caf387 100644 --- a/iroh/src/discovery/dns.rs +++ b/iroh/src/discovery/dns.rs @@ -1,8 +1,8 @@ //! DNS node discovery for iroh use anyhow::Result; -use futures_lite::stream::Boxed as BoxStream; use iroh_base::NodeId; +use n0_future::boxed::BoxStream; use crate::{ discovery::{Discovery, DiscoveryItem}, @@ -81,7 +81,7 @@ impl Discovery for DnsDiscovery { last_updated: None, }) }; - let stream = futures_lite::stream::once_future(fut); + let stream = n0_future::stream::once_future(fut); Some(Box::pin(stream)) } } diff --git a/iroh/src/discovery/local_swarm_discovery.rs b/iroh/src/discovery/local_swarm_discovery.rs index b4975807c4..d639329f0c 100644 --- a/iroh/src/discovery/local_swarm_discovery.rs +++ b/iroh/src/discovery/local_swarm_discovery.rs @@ -33,23 +33,19 @@ use std::{ collections::{BTreeSet, HashMap}, net::{IpAddr, SocketAddr}, - time::Duration, }; use anyhow::Result; use derive_more::FromStr; -use futures_lite::stream::Boxed as BoxStream; -use futures_util::FutureExt; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; -use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; -use tokio::{ - sync::mpsc::{ - error::TrySendError, - {self}, - }, - task::JoinSet, +use n0_future::{ + boxed::BoxStream, + task::{self, AbortOnDropHandle, JoinSet}, + time::{self, Duration}, + FutureExt, }; -use tokio_util::task::AbortOnDropHandle; +use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; +use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{debug, error, info_span, trace, warn, Instrument}; use crate::{ @@ -270,7 +266,7 @@ impl LocalSwarmDiscovery { } let timeout_sender = task_sender.clone(); timeouts.spawn(async move { - tokio::time::sleep(DISCOVERY_DURATION).await; + time::sleep(DISCOVERY_DURATION).await; trace!(?node_id, "discovery timeout"); timeout_sender .send(Message::Timeout(node_id, id)) @@ -294,7 +290,7 @@ impl LocalSwarmDiscovery { } } }; - let handle = tokio::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); + let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); Ok(Self { handle: AbortOnDropHandle::new(handle), sender: send, @@ -402,8 +398,8 @@ mod tests { /// This module's name signals nextest to run test in a single thread (no other concurrent /// tests) mod run_in_isolation { - use futures_lite::StreamExt; use iroh_base::SecretKey; + use n0_future::StreamExt; use testresult::TestResult; use super::super::*; diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index 80fcf0425d..0e24d05f9b 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -47,20 +47,20 @@ use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; use anyhow::{anyhow, bail, Result}; -use futures_util::stream::BoxStream; use iroh_base::{NodeId, RelayUrl, SecretKey}; -use pkarr::SignedPacket; -use tokio::{ - task::JoinHandle, - time::{Duration, Instant}, +use n0_future::{ + boxed::BoxStream, + task::{self, AbortOnDropHandle}, + time::{self, Duration, Instant}, }; +use pkarr::SignedPacket; use tracing::{debug, error_span, info, warn, Instrument}; use url::Url; use crate::{ discovery::{Discovery, DiscoveryItem}, - dns::node_info::NodeInfo, endpoint::force_staging_infra, + node_info::NodeInfo, watchable::{Disconnected, Watchable, Watcher}, Endpoint, }; @@ -116,7 +116,7 @@ pub const DEFAULT_REPUBLISH_INTERVAL: Duration = Duration::from_secs(60 * 5); pub struct PkarrPublisher { node_id: NodeId, watchable: Watchable>, - join_handle: Arc>, + _drop_guard: Arc>, } impl PkarrPublisher { @@ -145,7 +145,7 @@ impl PkarrPublisher { secret_key: SecretKey, pkarr_relay: Url, ttl: u32, - republish_interval: std::time::Duration, + republish_interval: Duration, ) -> Self { debug!("creating pkarr publisher that publishes to {pkarr_relay}"); let node_id = secret_key.public(); @@ -158,7 +158,7 @@ impl PkarrPublisher { pkarr_client, republish_interval, }; - let join_handle = tokio::task::spawn( + let join_handle = task::spawn( service .run() .instrument(error_span!("pkarr_publish", me=%node_id.fmt_short())), @@ -166,7 +166,7 @@ impl PkarrPublisher { Self { watchable, node_id, - join_handle: Arc::new(join_handle), + _drop_guard: Arc::new(AbortOnDropHandle::new(join_handle)), } } @@ -212,15 +212,6 @@ impl Discovery for PkarrPublisher { } } -impl Drop for PkarrPublisher { - fn drop(&mut self) { - // this means we're dropping the last reference - if let Some(handle) = Arc::get_mut(&mut self.join_handle) { - handle.abort(); - } - } -} - /// Publish node info to a pkarr relay. #[derive(derive_more::Debug, Clone)] struct PublisherService { @@ -236,8 +227,8 @@ struct PublisherService { impl PublisherService { async fn run(mut self) { let mut failed_attempts = 0; - let republish = tokio::time::sleep(Duration::MAX); - tokio::pin!(republish); + let republish = time::sleep(Duration::MAX); + n0_future::pin!(republish); loop { let Ok(info) = self.watcher.get() else { break; // disconnected @@ -336,11 +327,7 @@ impl PkarrResolver { } impl Discovery for PkarrResolver { - fn resolve( - &self, - _ep: Endpoint, - node_id: NodeId, - ) -> Option>> { + fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option>> { let pkarr_client = self.pkarr_client.clone(); let fut = async move { let signed_packet = pkarr_client.resolve(node_id).await?; @@ -352,7 +339,7 @@ impl Discovery for PkarrResolver { }; Ok(item) }; - let stream = futures_lite::stream::once_future(fut); + let stream = n0_future::stream::once_future(fut); Some(Box::pin(stream)) } } @@ -377,7 +364,8 @@ impl PkarrRelayClient { /// Resolves a [`SignedPacket`] for the given [`NodeId`]. pub async fn resolve(&self, node_id: NodeId) -> anyhow::Result { - let public_key = pkarr::PublicKey::try_from(node_id.as_bytes())?; + let public_key = pkarr::PublicKey::try_from(node_id.as_bytes()) + .map_err(|e| anyhow::anyhow!(e.to_string()))?; let mut url = self.pkarr_relay_url.clone(); url.path_segments_mut() .map_err(|_| anyhow!("Failed to resolve: Invalid relay URL"))? @@ -393,7 +381,8 @@ impl PkarrRelayClient { } let payload = response.bytes().await?; - Ok(SignedPacket::from_relay_payload(&public_key, &payload)?) + Ok(SignedPacket::from_relay_payload(&public_key, &payload) + .map_err(|e| anyhow::anyhow!(e.to_string()))?) } /// Publishes a [`SignedPacket`]. diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index 2729a57270..01bb41dd26 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -9,20 +9,21 @@ use std::{ collections::BTreeSet, net::SocketAddr, sync::{Arc, Mutex}, - time::Duration, }; use anyhow::Result; -use futures_lite::{ - stream::{Boxed, StreamExt}, +use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; +use n0_future::{ + boxed::BoxStream, + stream::StreamExt, + task::{self, AbortOnDropHandle}, + time::{self, Duration}, FutureExt, }; -use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; use pkarr::{ PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, RelaySettings, SignedPacket, }; -use tokio_util::task::AbortOnDropHandle; use url::Url; use crate::{ @@ -30,7 +31,7 @@ use crate::{ pkarr::{DEFAULT_PKARR_TTL, N0_DNS_PKARR_RELAY_PROD}, Discovery, DiscoveryItem, }, - dns::node_info::NodeInfo, + node_info::NodeInfo, Endpoint, }; @@ -304,7 +305,7 @@ impl DhtDiscovery { .to_z32(); // initial delay. If the task gets aborted before this delay is over, // we have not published anything to the DHT yet. - tokio::time::sleep(this.0.initial_publish_delay).await; + time::sleep(this.0.initial_publish_delay).await; loop { // publish to the DHT if enabled let dht_publish = async { @@ -345,7 +346,7 @@ impl DhtDiscovery { }; // do both at the same time tokio::join!(relay_publish, dht_publish); - tokio::time::sleep(this.0.republish_delay).await; + time::sleep(this.0.republish_delay).await; } } } @@ -371,7 +372,7 @@ impl Discovery for DhtDiscovery { return; }; let this = self.clone(); - let curr = tokio::spawn(this.publish_loop(keypair.clone(), signed_packet)); + let curr = task::spawn(this.publish_loop(keypair.clone(), signed_packet)); let mut task = self.0.task.lock().expect("poisoned"); *task = Some(AbortOnDropHandle::new(curr)); } @@ -380,7 +381,7 @@ impl Discovery for DhtDiscovery { &self, _endpoint: Endpoint, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let pkarr_public_key = pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32()); diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index 77757ef3d9..a7ea2c337e 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -14,11 +14,14 @@ use std::{ collections::{btree_map::Entry, BTreeMap, BTreeSet}, net::SocketAddr, sync::{Arc, RwLock}, - time::SystemTime, }; -use futures_lite::stream::{self, StreamExt}; use iroh_base::{NodeAddr, NodeId, RelayUrl}; +use n0_future::{ + boxed::BoxStream, + stream::{self, StreamExt}, + time::SystemTime, +}; use super::{Discovery, DiscoveryItem}; @@ -206,7 +209,7 @@ impl Discovery for StaticProvider { &self, _endpoint: crate::Endpoint, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id); match info { diff --git a/iroh/src/dns.rs b/iroh/src/dns.rs index 5aa09b1a5e..a946f16f09 100644 --- a/iroh/src/dns.rs +++ b/iroh/src/dns.rs @@ -9,15 +9,17 @@ use std::{ fmt::Write, net::{IpAddr, Ipv6Addr}, sync::OnceLock, - time::Duration, }; use anyhow::Result; -use futures_lite::{Future, StreamExt}; use hickory_resolver::{IntoName, Resolver, TokioResolver}; use iroh_base::{NodeAddr, NodeId}; +use n0_future::{ + time::{self, Duration}, + Future, StreamExt, +}; -pub mod node_info; +use crate::node_info; /// The DNS resolver type used throughout `iroh`. pub type DnsResolver = TokioResolver; @@ -187,7 +189,7 @@ impl ResolverExt for DnsResolver { host: N, timeout: Duration, ) -> Result> { - let addrs = tokio::time::timeout(timeout, self.ipv4_lookup(host)).await??; + let addrs = time::timeout(timeout, self.ipv4_lookup(host)).await??; Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0))) } @@ -196,7 +198,7 @@ impl ResolverExt for DnsResolver { host: N, timeout: Duration, ) -> Result> { - let addrs = tokio::time::timeout(timeout, self.ipv6_lookup(host)).await??; + let addrs = time::timeout(timeout, self.ipv6_lookup(host)).await??; Ok(addrs.into_iter().map(|ip| IpAddr::V6(ip.0))) } @@ -347,14 +349,14 @@ async fn stagger_call Fut, Fut: Future>>( f: F, delays_ms: &[u64], ) -> Result { - let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1); + let mut calls = n0_future::FuturesUnorderedBounded::new(delays_ms.len() + 1); // NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than // using alternatives that allow futures of different types. for delay in std::iter::once(&0u64).chain(delays_ms) { - let delay = std::time::Duration::from_millis(*delay); + let delay = Duration::from_millis(*delay); let fut = f(); let staggered_fut = async move { - tokio::time::sleep(delay).await; + time::sleep(delay).await; fut.await }; calls.push(staggered_fut) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 960859a2aa..edec1b4fd4 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -19,21 +19,26 @@ use std::{ pin::Pin, sync::Arc, task::Poll, - time::Duration, }; use anyhow::{bail, Context, Result}; use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; use iroh_relay::RelayMap; +use n0_future::time::Duration; use pin_project::pin_project; use tracing::{debug, instrument, trace, warn}; use url::Url; +#[cfg(not(wasm_browser))] +use crate::{ + discovery::dns::DnsDiscovery, + dns::{default_resolver, DnsResolver}, +}; use crate::{ discovery::{ - dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryTask, + pkarr::{PkarrPublisher, PkarrResolver}, + ConcurrentDiscovery, Discovery, DiscoveryTask, }, - dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, watchable::Watcher, @@ -102,6 +107,7 @@ pub struct Builder { proxy_url: Option, /// List of known nodes. See [`Builder::known_nodes`]. node_map: Option>, + #[cfg(not(wasm_browser))] dns_resolver: Option, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, @@ -124,6 +130,7 @@ impl Default for Builder { discovery: Default::default(), proxy_url: None, node_map: None, + #[cfg(not(wasm_browser))] dns_resolver: None, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -152,6 +159,7 @@ impl Builder { keylog: self.keylog, secret_key: secret_key.clone(), }; + #[cfg(not(wasm_browser))] let dns_resolver = self .dns_resolver .unwrap_or_else(|| default_resolver().clone()); @@ -173,6 +181,7 @@ impl Builder { node_map: self.node_map, discovery, proxy_url: self.proxy_url, + #[cfg(not(wasm_browser))] dns_resolver, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -320,8 +329,18 @@ impl Builder { self.discovery.push(Box::new(|secret_key| { Some(Box::new(PkarrPublisher::n0_dns(secret_key.clone()))) })); - self.discovery - .push(Box::new(|_| Some(Box::new(DnsDiscovery::n0_dns())))); + // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers + #[cfg(wasm_browser)] + { + self.discovery + .push(Box::new(|_| Some(Box::new(PkarrResolver::n0_dns())))); + } + // Resolve using DNS queries outside browsers. + #[cfg(not(wasm_browser))] + { + self.discovery + .push(Box::new(|_| Some(Box::new(DnsDiscovery::n0_dns())))); + } self } @@ -398,6 +417,7 @@ impl Builder { /// By default, all endpoints share a DNS resolver, which is configured to use the /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`] /// here to use a differently configured DNS resolver for this endpoint. + #[cfg(not(wasm_browser))] pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self { self.dns_resolver = Some(dns_resolver); self @@ -560,7 +580,10 @@ impl Endpoint { endpoint_config, Some(server_config), Arc::new(msock.clone()), + #[cfg(not(wasm_browser))] Arc::new(quinn::TokioRuntime), + #[cfg(wasm_browser)] + Arc::new(crate::web_runtime::WebRuntime), )?; trace!("created quinn endpoint"); debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created"); @@ -904,6 +927,7 @@ impl Endpoint { /// /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6 /// address if available. + #[cfg(not(wasm_browser))] pub fn bound_sockets(&self) -> (SocketAddr, Option) { self.msock.local_addr() } @@ -975,6 +999,7 @@ impl Endpoint { /// Returns the DNS resolver used in this [`Endpoint`]. /// /// See [`Builder::dns_resolver`]. + #[cfg(not(wasm_browser))] pub fn dns_resolver(&self) -> &DnsResolver { self.msock.dns_resolver() } @@ -1747,8 +1772,8 @@ mod tests { use std::time::Instant; - use futures_lite::StreamExt; use iroh_test::CallOnDrop; + use n0_future::StreamExt; use rand::SeedableRng; use tracing::{error_span, info, info_span, Instrument}; @@ -1862,7 +1887,7 @@ mod tests { let (server, client) = tokio::time::timeout( Duration::from_secs(30), - futures_lite::future::zip(server, client), + n0_future::future::zip(server, client), ) .await .expect("timeout"); diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index e9b0b1aecb..6fe4e0ac38 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -2,12 +2,13 @@ use std::{pin::Pin, task::Poll}; -use futures_buffered::MergeUnbounded; -use futures_lite::{Stream, StreamExt}; use iroh_base::NodeId; use iroh_metrics::inc; +use n0_future::{ + task::{self, AbortOnDropHandle}, + MergeUnbounded, Stream, StreamExt, +}; use tokio::sync::mpsc; -use tokio_util::task::AbortOnDropHandle; use tracing::{debug, info_span, Instrument}; use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watchable::WatcherStream}; @@ -25,7 +26,7 @@ impl RttHandle { connection_events: Default::default(), }; let (msg_tx, msg_rx) = mpsc::channel(16); - let handle = tokio::spawn( + let handle = task::spawn( async move { actor.run(msg_rx).await; } diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index b844020fc0..217434107e 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -229,23 +229,28 @@ //! [`Connection::accept_bi`]: crate::endpoint::Connection::accept_bi #![recursion_limit = "256"] -#![deny(missing_docs, rustdoc::broken_intra_doc_links)] +// #![deny(missing_docs, rustdoc::broken_intra_doc_links)] +#![allow(unused)] #![cfg_attr(not(test), deny(clippy::unwrap_used))] #![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))] mod disco; mod key; mod magicsock; +mod tls; pub(crate) mod util; +#[cfg(wasm_browser)] +pub(crate) mod web_runtime; pub mod defaults; pub mod discovery; +#[cfg(not(wasm_browser))] pub mod dns; pub mod endpoint; pub mod metrics; +pub mod node_info; pub mod protocol; -mod tls; pub mod watchable; pub use endpoint::{Endpoint, RelayMode}; diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index a0af9b25f7..6d94d8f81a 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -26,28 +26,29 @@ use std::{ Arc, RwLock, }, task::{Context, Poll, Waker}, - time::{Duration, Instant}, }; use anyhow::{anyhow, Context as _, Result}; +use atomic_waker::AtomicWaker; use bytes::Bytes; use concurrent_queue::ConcurrentQueue; use data_encoding::HEXLOWER; -use futures_lite::{FutureExt, StreamExt}; -use futures_util::{stream::BoxStream, task::AtomicWaker}; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; use iroh_metrics::{inc, inc_by}; use iroh_relay::{protos::stun, RelayMap}; +use n0_future::{ + boxed::BoxStream, + task::{self, JoinSet}, + time::{self, Duration, Instant}, + FutureExt, StreamExt, +}; +#[cfg(not(wasm_browser))] use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket}; use quinn::AsyncUdpSocket; use rand::{seq::SliceRandom, Rng, SeedableRng}; use relay_actor::RelaySendItem; use smallvec::{smallvec, SmallVec}; -use tokio::{ - sync::{self, mpsc, Mutex}, - task::JoinSet, - time, -}; +use tokio::sync::{self, mpsc, Mutex}; use tokio_util::sync::CancellationToken; use tracing::{ debug, error, error_span, event, info, info_span, instrument, trace, trace_span, warn, @@ -55,19 +56,21 @@ use tracing::{ }; use url::Url; +#[cfg(not(wasm_browser))] +use self::udp_conn::UdpConn; use self::{ metrics::Metrics as MagicsockMetrics, node_map::{NodeMap, PingAction, PingRole, SendPing}, relay_actor::{RelayActor, RelayActorMessage, RelayRecvDatagram}, - udp_conn::UdpConn, }; +#[cfg(not(wasm_browser))] +use crate::dns::DnsResolver; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, CallMeMaybe, SendAddr}, discovery::{Discovery, DiscoveryItem}, - dns::DnsResolver, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, watchable::{Watchable, Watcher}, }; @@ -75,6 +78,7 @@ use crate::{ mod metrics; mod node_map; mod relay_actor; +#[cfg(not(wasm_browser))] mod udp_conn; pub use node_map::Source; @@ -118,6 +122,7 @@ pub(crate) struct Options { /// /// You can use [`crate::dns::default_resolver`] for a resolver that uses the system's DNS /// configuration. + #[cfg(not(wasm_browser))] pub(crate) dns_resolver: DnsResolver, /// Proxy configuration. @@ -144,6 +149,7 @@ impl Default for Options { node_map: None, discovery: None, proxy_url: None, + #[cfg(not(wasm_browser))] dns_resolver: crate::dns::default_resolver().clone(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -197,6 +203,7 @@ pub(crate) struct MagicSock { poll_recv_counter: AtomicUsize, /// The DNS resolver to be used in this magicsock. + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, /// Key for this node. @@ -205,6 +212,7 @@ pub(crate) struct MagicSock { secret_encryption_key: crypto_box::SecretKey, /// Cached version of the Ipv4 and Ipv6 addrs of the current connection. + #[cfg(not(wasm_browser))] local_addrs: std::sync::RwLock<(SocketAddr, Option)>, /// Preferred port from `Options::port`; 0 means auto. @@ -224,8 +232,10 @@ pub(crate) struct MagicSock { /// Tracks the networkmap node entity for each node discovery key. node_map: NodeMap, /// UDP IPv4 socket + #[cfg(not(wasm_browser))] pconn4: UdpConn, /// UDP IPv6 socket + #[cfg(not(wasm_browser))] pconn6: Option, /// NetReport client net_reporter: net_report::Addr, @@ -293,6 +303,7 @@ impl MagicSock { } /// Get the cached version of the Ipv4 and Ipv6 addrs of the current connection. + #[cfg(not(wasm_browser))] pub(crate) fn local_addr(&self) -> (SocketAddr, Option) { *self.local_addrs.read().expect("not poisoned") } @@ -393,6 +404,7 @@ impl MagicSock { } /// Get a reference to the DNS resolver used in this [`MagicSock`]. + #[cfg(not(wasm_browser))] pub(crate) fn dns_resolver(&self) -> &DnsResolver { &self.dns_resolver } @@ -418,6 +430,7 @@ impl MagicSock { .ok(); } + #[cfg(not(wasm_browser))] #[cfg_attr(windows, allow(dead_code))] fn normalized_local_addr(&self) -> io::Result { let (v4, v6) = self.local_addr(); @@ -440,11 +453,15 @@ impl MagicSock { // Right now however we have one single poller behaving the same for each // connection. It checks all paths and returns Poll::Ready as soon as any path is // ready. + #[cfg(not(wasm_browser))] let ipv4_poller = self.pconn4.create_io_poller(); + #[cfg(not(wasm_browser))] let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); let relay_sender = self.relay_datagram_send_channel.clone(); Box::pin(IoPoller { + #[cfg(not(wasm_browser))] ipv4_poller, + #[cfg(not(wasm_browser))] ipv6_poller, relay_sender, }) @@ -493,11 +510,12 @@ impl MagicSock { } let mut udp_sent = false; - let mut udp_error = None; + let mut udp_error: Option = None; let mut relay_sent = false; let mut relay_error = None; // send udp + #[cfg(not(wasm_browser))] if let Some(addr) = udp_addr { // rewrite target address transmit.destination = addr; @@ -533,10 +551,13 @@ impl MagicSock { } } + #[cfg(not(wasm_browser))] let udp_pending = udp_error .as_ref() .map(|err| err.kind() == io::ErrorKind::WouldBlock) .unwrap_or_default(); + #[cfg(wasm_browser)] + let udp_pending = false; let relay_pending = relay_error .as_ref() .map(|err| err.kind() == io::ErrorKind::WouldBlock) @@ -633,6 +654,7 @@ impl MagicSock { } } + #[cfg(not(wasm_browser))] fn try_send_udp(&self, addr: SocketAddr, transmit: &quinn_udp::Transmit) -> io::Result<()> { let conn = self.conn_for_addr(addr)?; conn.try_send(transmit)?; @@ -645,6 +667,7 @@ impl MagicSock { Ok(()) } + #[cfg(not(wasm_browser))] fn conn_for_addr(&self, addr: SocketAddr) -> io::Result<&UdpConn> { let sock = match addr { SocketAddr::V4(_) => &self.pconn4, @@ -672,6 +695,7 @@ impl MagicSock { // Three macros to help polling: they return if they get a result, execution // continues if they were Pending and we need to poll others (or finally return // Pending). + #[cfg(not(wasm_browser))] macro_rules! poll_ipv4 { () => { match self.pconn4.poll_recv(cx, bufs, metas)? { @@ -683,6 +707,7 @@ impl MagicSock { } }; } + #[cfg(not(wasm_browser))] macro_rules! poll_ipv6 { () => { if let Some(ref pconn) = self.pconn6 { @@ -705,7 +730,9 @@ impl MagicSock { }; } + #[cfg(not(wasm_browser))] let counter = self.poll_recv_counter.fetch_add(1, Ordering::Relaxed); + #[cfg(not(wasm_browser))] match counter % 3 { 0 => { // order of polling: UDPv4, UDPv6, relay @@ -729,6 +756,11 @@ impl MagicSock { Poll::Pending } } + #[cfg(wasm_browser)] + { + poll_relay!(); + Poll::Pending + } } /// Process datagrams received from UDP sockets. @@ -737,6 +769,7 @@ impl MagicSock { /// /// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO /// packets, processing them inside the magic socket. + #[cfg(not(wasm_browser))] fn process_udp_datagrams( &self, from_ipv4: bool, @@ -940,11 +973,11 @@ impl MagicSock { let quic_mapped_addr = self.node_map.receive_relay(&dm.url, dm.src); // Normalize local_ip - #[cfg(not(windows))] + #[cfg(not(any(windows, wasm_browser)))] let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip()); // Reasoning for this here: // https://github.com/n0-computer/iroh/pull/2595#issuecomment-2290947319 - #[cfg(windows)] + #[cfg(any(windows, wasm_browser))] let dst_ip = None; let meta = quinn_udp::RecvMeta { @@ -1139,10 +1172,16 @@ impl MagicSock { node_key: self.public_key(), }); let sent = match dst { + #[cfg(not(wasm_browser))] SendAddr::Udp(addr) => self .udp_disco_sender .try_send((addr, dst_node, msg)) .is_ok(), + #[cfg(wasm_browser)] + SendAddr::Udp(_) => { + // Ignoring sending pings over UDP. We don't have a UDP socket. + return; + } SendAddr::Relay(ref url) => self.send_disco_message_relay(url, dst_node, msg), }; if sent { @@ -1208,9 +1247,14 @@ impl MagicSock { msg: disco::Message, ) -> io::Result<()> { match dst { + #[cfg(not(wasm_browser))] SendAddr::Udp(addr) => { self.try_send_disco_message_udp(addr, dst_key, &msg)?; } + #[cfg(wasm_browser)] + SendAddr::Udp(addr) => { + error!(?addr, "Asked to send on UDP in browser code"); + } SendAddr::Relay(ref url) => { if !self.send_disco_message_relay(url, dst_key, msg) { return Err(io::Error::new(io::ErrorKind::Other, "Relay channel full")); @@ -1243,13 +1287,14 @@ impl MagicSock { } } + #[cfg(not(wasm_browser))] async fn send_disco_message_udp( &self, dst: SocketAddr, dst_node: NodeId, msg: &disco::Message, ) -> io::Result<()> { - futures_lite::future::poll_fn(move |cx| { + n0_future::future::poll_fn(move |cx| { loop { match self.try_send_disco_message_udp(dst, dst_node, msg) { Ok(()) => return Poll::Ready(Ok(())), @@ -1269,6 +1314,7 @@ impl MagicSock { .await } + #[cfg(not(wasm_browser))] fn try_send_disco_message_udp( &self, dst: SocketAddr, @@ -1508,6 +1554,7 @@ impl Handle { } async fn with_name(me: String, opts: Options) -> Result { + #[cfg(not(wasm_browser))] let port_mapper = portmapper::Client::default(); let Options { @@ -1517,6 +1564,7 @@ impl Handle { relay_map, node_map, discovery, + #[cfg(not(wasm_browser))] dns_resolver, proxy_url, #[cfg(any(test, feature = "test-utils"))] @@ -1525,30 +1573,42 @@ impl Handle { path_selection, } = opts; - let relay_datagram_recv_queue = Arc::new(RelayDatagramRecvQueue::new()); - + #[cfg(not(wasm_browser))] let (pconn4, pconn6) = bind(addr_v4, addr_v6)?; + #[cfg(not(wasm_browser))] let port = pconn4.port(); + #[cfg(wasm_browser)] + let port = 0; // NOTE: we can end up with a zero port if `std::net::UdpSocket::socket_addr` fails + #[cfg(not(wasm_browser))] match port.try_into() { Ok(non_zero_port) => { port_mapper.update_local_port(non_zero_port); } Err(_zero_port) => debug!("Skipping port mapping with zero local port"), } + #[cfg(not(wasm_browser))] let ipv4_addr = pconn4.local_addr()?; + #[cfg(not(wasm_browser))] let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok()); - let net_reporter = - net_report::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?; + let net_reporter = net_report::Client::new( + #[cfg(not(wasm_browser))] + Some(port_mapper.clone()), + #[cfg(not(wasm_browser))] + dns_resolver.clone(), + )?; + #[cfg(not(wasm_browser))] let pconn4_sock = pconn4.as_socket(); + #[cfg(not(wasm_browser))] let pconn6_sock = pconn6.as_ref().map(|p| p.as_socket()); let (actor_sender, actor_receiver) = mpsc::channel(256); let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256); let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel(); + let relay_datagram_recv_queue = Arc::new(RelayDatagramRecvQueue::new()); let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256); // load the node data @@ -1566,6 +1626,7 @@ impl Handle { secret_key, secret_encryption_key, proxy_url, + #[cfg(not(wasm_browser))] local_addrs: std::sync::RwLock::new((ipv4_addr, ipv6_addr)), closing: AtomicBool::new(false), closed: AtomicBool::new(false), @@ -1577,7 +1638,9 @@ impl Handle { relay_map, my_relay: Default::default(), net_reporter: net_reporter.addr(), + #[cfg(not(wasm_browser))] pconn4, + #[cfg(not(wasm_browser))] pconn6, disco_secrets: DiscoSecrets::default(), node_map, @@ -1586,6 +1649,7 @@ impl Handle { direct_addrs: Default::default(), pending_call_me_maybes: Default::default(), direct_addr_update_state: DirectAddrUpdateState::new(), + #[cfg(not(wasm_browser))] dns_resolver, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, @@ -1604,16 +1668,22 @@ impl Handle { .instrument(info_span!("relay-actor")), ); - let inner2 = inner.clone(); - actor_tasks.spawn(async move { - while let Some((dst, dst_key, msg)) = udp_disco_receiver.recv().await { - if let Err(err) = inner2.send_disco_message_udp(dst, dst_key, &msg).await { - warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)"); + #[cfg(not(wasm_browser))] + { + actor_tasks.spawn({ + let inner = inner.clone(); + async move { + while let Some((dst, dst_key, msg)) = udp_disco_receiver.recv().await { + if let Err(err) = inner.send_disco_message_udp(dst, dst_key, &msg).await { + warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)"); + } + } } - } - }); + }); + } let inner2 = inner.clone(); + #[cfg(not(wasm_browser))] let network_monitor = netmon::Monitor::new().await?; actor_tasks.spawn( async move { @@ -1625,11 +1695,15 @@ impl Handle { msock: inner2, periodic_re_stun_timer: new_re_stun_timer(false), net_info_last: None, + #[cfg(not(wasm_browser))] port_mapper, + #[cfg(not(wasm_browser))] pconn4: pconn4_sock, + #[cfg(not(wasm_browser))] pconn6: pconn6_sock, no_v4_send: false, net_reporter, + #[cfg(not(wasm_browser))] network_monitor, }; @@ -1907,6 +1981,7 @@ impl AsyncUdpSocket for Handle { } fn local_addr(&self) -> io::Result { + #[cfg(not(wasm_browser))] match &*self.msock.local_addrs.read().expect("not poisoned") { (ipv4, None) => { // Pretend to be IPv6, because our QuinnMappedAddrs @@ -1919,8 +1994,12 @@ impl AsyncUdpSocket for Handle { } (_, Some(ipv6)) => Ok(*ipv6), } + // Again, we need to pretend we're IPv6, because of our QuinnMappedAddrs. + #[cfg(wasm_browser)] + return Ok(SocketAddr::new(std::net::Ipv6Addr::LOCALHOST.into(), 0)); } + #[cfg(not(wasm_browser))] fn max_transmit_segments(&self) -> usize { if let Some(pconn6) = self.pconn6.as_ref() { std::cmp::min( @@ -1932,6 +2011,12 @@ impl AsyncUdpSocket for Handle { } } + #[cfg(wasm_browser)] + fn max_transmit_segments(&self) -> usize { + 1 + } + + #[cfg(not(wasm_browser))] fn max_receive_segments(&self) -> usize { if let Some(pconn6) = self.pconn6.as_ref() { // `max_receive_segments` controls the size of the `RecvMeta` buffer @@ -1949,6 +2034,12 @@ impl AsyncUdpSocket for Handle { } } + #[cfg(wasm_browser)] + fn max_receive_segments(&self) -> usize { + 1 + } + + #[cfg(not(wasm_browser))] fn may_fragment(&self) -> bool { if let Some(pconn6) = self.pconn6.as_ref() { pconn6.may_fragment() || self.pconn4.may_fragment() @@ -1956,11 +2047,18 @@ impl AsyncUdpSocket for Handle { self.pconn4.may_fragment() } } + + #[cfg(wasm_browser)] + fn may_fragment(&self) -> bool { + false + } } #[derive(Debug)] struct IoPoller { + #[cfg(not(wasm_browser))] ipv4_poller: Pin>, + #[cfg(not(wasm_browser))] ipv6_poller: Option>>, relay_sender: RelayDatagramSendChannelSender, } @@ -1969,10 +2067,12 @@ impl quinn::UdpPoller for IoPoller { fn poll_writable(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { // This version returns Ready as soon as any of them are ready. let this = &mut *self; + #[cfg(not(wasm_browser))] match this.ipv4_poller.as_mut().poll_writable(cx) { Poll::Ready(_) => return Poll::Ready(Ok(())), Poll::Pending => (), } + #[cfg(not(wasm_browser))] if let Some(ref mut ipv6_poller) = this.ipv6_poller { match ipv6_poller.as_mut().poll_writable(cx) { Poll::Ready(_) => return Poll::Ready(Ok(())), @@ -2005,10 +2105,13 @@ struct Actor { net_info_last: Option, // The underlying UDP sockets used to send/rcv packets. + #[cfg(not(wasm_browser))] pconn4: Arc, + #[cfg(not(wasm_browser))] pconn6: Option>, /// The NAT-PMP/PCP/UPnP prober/client, for requesting port mappings from NAT devices. + #[cfg(not(wasm_browser))] port_mapper: portmapper::Client, /// Whether IPv4 UDP is known to be unable to transmit @@ -2019,13 +2122,16 @@ struct Actor { /// The prober that discovers local network conditions, including the closest relay relay and NAT mappings. net_reporter: net_report::Client, + #[cfg(not(wasm_browser))] network_monitor: netmon::Monitor, } impl Actor { async fn run(mut self) -> Result<()> { // Setup network monitoring + #[cfg(not(wasm_browser))] let (link_change_s, mut link_change_r) = mpsc::channel(8); + #[cfg(not(wasm_browser))] let _token = self .network_monitor .subscribe(move |is_major| { @@ -2038,16 +2144,17 @@ impl Actor { .await?; // Let the the heartbeat only start a couple seconds later + #[cfg(not(wasm_browser))] let mut direct_addr_heartbeat_timer = time::interval_at( time::Instant::now() + HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, ); let mut direct_addr_update_receiver = self.msock.direct_addr_update_state.running.subscribe(); + #[cfg(not(wasm_browser))] let mut portmap_watcher = self.port_mapper.watch_external_address(); - let mut discovery_events: BoxStream = - Box::pin(futures_lite::stream::empty()); + let mut discovery_events: BoxStream = Box::pin(n0_future::stream::empty()); if let Some(d) = self.msock.discovery() { if let Some(events) = d.subscribe() { discovery_events = events; @@ -2055,10 +2162,26 @@ impl Actor { } let mut receiver_closed = false; + #[cfg_attr(wasm_browser, allow(unused_mut))] let mut portmap_watcher_closed = false; let mut link_change_closed = false; loop { inc!(Metrics, actor_tick_main); + #[cfg(not(wasm_browser))] + let portmap_watcher_changed = portmap_watcher.changed(); + #[cfg(wasm_browser)] + let portmap_watcher_changed = n0_future::future::pending(); + + #[cfg(not(wasm_browser))] + let direct_addr_heartbeat_timer_tick = direct_addr_heartbeat_timer.tick(); + #[cfg(wasm_browser)] + let direct_addr_heartbeat_timer_tick = n0_future::future::pending(); + + #[cfg(not(wasm_browser))] + let link_change_r_recv = link_change_r.recv(); + #[cfg(wasm_browser)] + let link_change_r_recv = n0_future::future::pending(); + tokio::select! { msg = self.msg_receiver.recv(), if !receiver_closed => { let Some(msg) = msg else { @@ -2080,32 +2203,40 @@ impl Actor { inc!(Metrics, actor_tick_re_stun); self.msock.re_stun("periodic"); } - change = portmap_watcher.changed(), if !portmap_watcher_closed => { - if change.is_err() { - trace!("tick: portmap watcher closed"); - inc!(Metrics, actor_tick_other); + change = portmap_watcher_changed, if !portmap_watcher_closed => { + #[cfg(not(wasm_browser))] + { + if change.is_err() { + trace!("tick: portmap watcher closed"); + inc!(Metrics, actor_tick_other); + + portmap_watcher_closed = true; + continue; + } - portmap_watcher_closed = true; - continue; + trace!("tick: portmap changed"); + inc!(Metrics, actor_tick_portmap_changed); + let new_external_address = *portmap_watcher.borrow(); + debug!("external address updated: {new_external_address:?}"); + self.msock.re_stun("portmap_updated"); } - - trace!("tick: portmap changed"); - inc!(Metrics, actor_tick_portmap_changed); - let new_external_address = *portmap_watcher.borrow(); - debug!("external address updated: {new_external_address:?}"); - self.msock.re_stun("portmap_updated"); + #[cfg(wasm_browser)] + let _unused_in_browsers = change; }, - _ = direct_addr_heartbeat_timer.tick() => { - trace!( - "tick: direct addr heartbeat {} direct addrs", - self.msock.node_map.node_count(), - ); - inc!(Metrics, actor_tick_direct_addr_heartbeat); - // TODO: this might trigger too many packets at once, pace this + _ = direct_addr_heartbeat_timer_tick => { + #[cfg(not(wasm_browser))] + { + trace!( + "tick: direct addr heartbeat {} direct addrs", + self.msock.node_map.node_count(), + ); + inc!(Metrics, actor_tick_direct_addr_heartbeat); + // TODO: this might trigger too many packets at once, pace this - self.msock.node_map.prune_inactive(); - let msgs = self.msock.node_map.nodes_stayin_alive(); - self.handle_ping_actions(msgs).await; + self.msock.node_map.prune_inactive(); + let msgs = self.msock.node_map.nodes_stayin_alive(); + self.handle_ping_actions(msgs).await; + } } _ = direct_addr_update_receiver.changed() => { let reason = *direct_addr_update_receiver.borrow(); @@ -2115,18 +2246,23 @@ impl Actor { self.refresh_direct_addrs(reason).await; } } - is_major = link_change_r.recv(), if !link_change_closed => { - let Some(is_major) = is_major else { - trace!("tick: link change receiver closed"); - inc!(Metrics, actor_tick_other); - - link_change_closed = true; - continue; - }; + is_major = link_change_r_recv, if !link_change_closed => { + #[cfg(not(wasm_browser))] + { + let Some(is_major) = is_major else { + trace!("tick: link change receiver closed"); + inc!(Metrics, actor_tick_other); + + link_change_closed = true; + continue; + }; - trace!("tick: link change {}", is_major); - inc!(Metrics, actor_link_change); - self.handle_network_change(is_major).await; + trace!("tick: link change {}", is_major); + inc!(Metrics, actor_link_change); + self.handle_network_change(is_major).await; + } + #[cfg(wasm_browser)] + let _unused_in_browsers = is_major; } // Even if `discovery_events` yields `None`, it could begin to yield // `Some` again in the future, so we don't want to disable this branch @@ -2145,6 +2281,7 @@ impl Actor { } } + #[cfg(not(wasm_browser))] async fn handle_network_change(&mut self, is_major: bool) { debug!("link change detected: major? {}", is_major); @@ -2186,6 +2323,7 @@ impl Actor { debug!("shutting down"); self.msock.node_map.notify_shutdown(); + #[cfg(not(wasm_browser))] self.port_mapper.deactivate(); self.relay_actor_cancel_token.cancel(); @@ -2210,6 +2348,7 @@ impl Actor { self.finalize_direct_addrs_update(why); } ActorMessage::NetworkChange => { + #[cfg(not(wasm_browser))] self.network_monitor.network_change().await.ok(); } #[cfg(test)] @@ -2233,6 +2372,7 @@ impl Actor { inc!(MagicsockMetrics, update_direct_addrs); debug!("starting direct addr update ({})", why); + #[cfg(not(wasm_browser))] self.port_mapper.procure_mapping(); self.update_net_info(why).await; } @@ -2245,6 +2385,7 @@ impl Actor { /// - The portmapper. /// - A net_report report. /// - The local interfaces IP addresses. + #[cfg(not(wasm_browser))] fn update_direct_addresses(&mut self, net_report_report: Option>) { let portmap_watcher = self.port_mapper.watch_external_address(); @@ -2307,7 +2448,7 @@ impl Actor { // The following code can be slow, we do not want to block the caller since it would // block the actor loop. - tokio::spawn( + task::spawn( async move { // If a socket is bound to the unspecified address, create SocketAddrs for // each local IP address by pairing it with the port the socket is bound on. @@ -2380,7 +2521,10 @@ impl Actor { self.msock.direct_addr_update_state.run(new_why); return; } - self.periodic_re_stun_timer = new_re_stun_timer(true); + #[cfg(not(wasm_browser))] + { + self.periodic_re_stun_timer = new_re_stun_timer(true); + } } self.msock.direct_addr_update_state.finish_run(); @@ -2428,15 +2572,18 @@ impl Actor { } let relay_map = self.msock.relay_map.clone(); + #[cfg(not(wasm_browser))] let opts = net_report::Options::default() .stun_v4(Some(self.pconn4.clone())) .stun_v6(self.pconn6.clone()); + #[cfg(wasm_browser)] + let opts = net_report::Options::default(); debug!("requesting net_report report"); match self.net_reporter.get_report_channel(relay_map, opts).await { Ok(rx) => { let msg_sender = self.msg_sender.clone(); - tokio::task::spawn(async move { + task::spawn(async move { let report = time::timeout(NET_REPORT_TIMEOUT, rx).await; let report: anyhow::Result<_> = match report { Ok(Ok(Ok(report))) => Ok(Some(report)), @@ -2472,11 +2619,16 @@ impl Actor { ); self.no_v4_send = !r.ipv4_can_send; + #[cfg(not(wasm_browser))] let have_port_map = self.port_mapper.watch_external_address().borrow().is_some(); + #[cfg(wasm_browser)] + let have_port_map = false; + let mut ni = NetInfo { relay_latency: Default::default(), mapping_varies_by_dest_ip: r.mapping_varies_by_dest_ip, hair_pinning: r.hair_pinning, + #[cfg(not(wasm_browser))] portmap_probe: r.portmap_probe.clone(), have_port_map, working_ipv6: Some(r.ipv6), @@ -2507,6 +2659,7 @@ impl Actor { // TODO: set link type self.call_net_info_callback(ni).await; } + #[cfg(not(wasm_browser))] self.update_direct_addresses(report); } @@ -2572,6 +2725,7 @@ impl Actor { /// The relay connections who's local endpoints no longer exist after a network change /// will error out soon enough. Closing them eagerly speeds this up however and allows /// re-establishing a relay connection faster. + #[cfg(not(wasm_browser))] async fn close_stale_relay_connections(&self) { let ifs = interfaces::State::new().await; let local_ips = ifs @@ -2614,6 +2768,7 @@ fn new_re_stun_timer(initial_delay: bool) -> time::Interval { } /// Initial connection setup. +#[cfg(not(wasm_browser))] fn bind( addr_v4: Option, addr_v6: Option, @@ -2876,6 +3031,7 @@ struct NetInfo { have_port_map: bool, /// Probe indicating the presence of port mapping protocols on the LAN. + #[cfg(not(wasm_browser))] portmap_probe: Option, /// This node's preferred relay server for incoming traffic. @@ -2906,6 +3062,12 @@ impl NetInfo { (Some(slf), Some(other)) => slf == other, _ => true, // ignore for comparison if only one report had this info }; + + #[cfg(not(wasm_browser))] + let probe_eq = self.portmap_probe == other.portmap_probe; + #[cfg(wasm_browser)] + let probe_eq = true; + self.mapping_varies_by_dest_ip == other.mapping_varies_by_dest_ip && self.hair_pinning == other.hair_pinning && self.working_ipv6 == other.working_ipv6 @@ -2914,7 +3076,7 @@ impl NetInfo { && eq_icmp_v4 && eq_icmp_v6 && self.have_port_map == other.have_port_map - && self.portmap_probe == other.portmap_probe + && probe_eq && self.preferred_relay == other.preferred_relay } } @@ -3056,7 +3218,7 @@ mod tests { if ready.iter().all(|meshed| *meshed) { break; } - tokio::time::sleep(Duration::from_millis(200)).await; + time::sleep(Duration::from_millis(200)).await; } }) .await @@ -3792,7 +3954,7 @@ mod tests { // no relay, nothing to report assert_eq!( - futures_lite::future::poll_once(relay_stream.next()).await, + n0_future::future::poll_once(relay_stream.next()).await, None ); @@ -3805,7 +3967,7 @@ mod tests { let mut relay_stream = msock.home_relay().stream().filter_map(|r| r); assert_eq!( - futures_lite::future::poll_once(relay_stream.next()).await, + n0_future::future::poll_once(relay_stream.next()).await, Some(Some(url)) ); } @@ -4103,7 +4265,7 @@ mod tests { async move { let mut expected_msgs: BTreeSet = (0..capacity).collect(); while !expected_msgs.is_empty() { - let datagram = futures_lite::future::poll_fn(|cx| { + let datagram = n0_future::future::poll_fn(|cx| { queue.poll_recv(cx).map(|result| result.unwrap()) }) .await; diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index c466de7ba3..b9ac4ff1a5 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -3,11 +3,11 @@ use std::{ hash::Hash, net::{IpAddr, SocketAddr}, sync::Mutex, - time::Instant, }; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; use iroh_metrics::inc; +use n0_future::time::Instant; use serde::{Deserialize, Serialize}; use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; @@ -158,6 +158,7 @@ impl NodeMap { self.inner.lock().expect("poisoned").node_count() } + #[cfg(not(wasm_browser))] pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { self.inner.lock().expect("poisoned").receive_udp(udp_addr) } @@ -435,6 +436,7 @@ impl NodeMapInner { } /// Marks the node we believe to be at `ipp` as recently used. + #[cfg(not(wasm_browser))] fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> { let ip_port: IpPort = udp_addr.into(); let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else { diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs index d5550b7c49..18d9ef960b 100644 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ b/iroh/src/magicsock/node_map/best_addr.rs @@ -1,10 +1,8 @@ //! The [`BestAddr`] is the currently active best address for UDP sends. -use std::{ - net::SocketAddr, - time::{Duration, Instant}, -}; +use std::net::SocketAddr; +use n0_future::time::{Duration, Instant}; use tracing::{debug, info}; /// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. @@ -135,6 +133,7 @@ impl BestAddr { } /// Reset the expiry, if the passed in addr matches the currently used one. + #[cfg(not(wasm_browser))] pub fn reconfirm_if_used(&mut self, addr: SocketAddr, source: Source, confirmed_at: Instant) { if let Some(state) = self.0.as_mut() { if state.addr.addr == addr { diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 3a2f76f610..e7f48df7dc 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -2,17 +2,18 @@ use std::{ collections::{btree_map::Entry, BTreeSet, HashMap}, hash::Hash, net::{IpAddr, SocketAddr}, - time::{Duration, Instant}, }; use data_encoding::HEXLOWER; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; use iroh_metrics::inc; use iroh_relay::protos::stun; -use netwatch::ip::is_unicast_link_local; +use n0_future::{ + task::{self, AbortOnDropHandle}, + time::{self, Duration, Instant}, +}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use tokio_util::task::AbortOnDropHandle; use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ @@ -527,8 +528,8 @@ impl NodeState { } let id = self.id; - let _expiry_task = AbortOnDropHandle::new(tokio::spawn(async move { - tokio::time::sleep(PING_TIMEOUT_DURATION).await; + let _expiry_task = AbortOnDropHandle::new(task::spawn(async move { + time::sleep(PING_TIMEOUT_DURATION).await; sender .send(ActorMessage::EndpointPingExpired(id, tx_id)) .await @@ -1039,6 +1040,7 @@ impl NodeState { } /// Marks this node as having received a UDP payload message. + #[cfg(not(wasm_browser))] pub(super) fn receive_udp(&mut self, addr: IpPort, now: Instant) { let Some(state) = self.udp_paths.paths.get_mut(&addr) else { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); @@ -1432,6 +1434,12 @@ pub enum ConnectionType { None, } +/// Returns true if the address is a unicast address with link-local scope, as defined in RFC 4291. +// Copied from std lib, not stable yet +pub const fn is_unicast_link_local(addr: std::net::Ipv6Addr) -> bool { + (addr.segments()[0] & 0xffc0) == 0xfe80 +} + #[cfg(test)] mod tests { use std::{collections::BTreeMap, net::Ipv4Addr}; diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 8279bf7cbd..7241121722 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -3,11 +3,11 @@ use std::{ collections::{BTreeMap, HashMap}, net::SocketAddr, - time::{Duration, Instant}, }; use iroh_base::NodeId; use iroh_relay::protos::stun; +use n0_future::time::{Duration, Instant}; use tracing::{debug, event, Level}; use super::{ diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 6952a5dd9d..0d96c3bba0 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -5,12 +5,9 @@ //! the inside and instead only notifies this struct of state changes to each path. //! //! [`NodeState`]: super::node_state::NodeState -use std::{ - collections::BTreeMap, - net::SocketAddr, - time::{Duration, Instant}, -}; +use std::{collections::BTreeMap, net::SocketAddr}; +use n0_future::time::{Duration, Instant}; use rand::seq::IteratorRandom; use tracing::warn; diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index c8cacd1b85..7c3eb4108e 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -40,9 +40,6 @@ use std::{ use anyhow::{anyhow, Result}; use backoff::exponential::{ExponentialBackoff, ExponentialBackoffBuilder}; use bytes::{Bytes, BytesMut}; -use futures_buffered::FuturesUnorderedBounded; -use futures_lite::StreamExt; -use futures_util::SinkExt; use iroh_base::{NodeId, PublicKey, RelayUrl, SecretKey}; use iroh_metrics::{inc, inc_by}; use iroh_relay::{ @@ -50,18 +47,20 @@ use iroh_relay::{ client::{Client, ReceivedMessage, SendMessage}, PingTracker, MAX_PACKET_SIZE, }; -use tokio::{ - sync::{mpsc, oneshot}, +use n0_future::{ task::JoinSet, - time::{Duration, Instant, MissedTickBehavior}, + time::{self, Duration, Instant, MissedTickBehavior}, + FuturesUnorderedBounded, SinkExt, StreamExt, }; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, event, info_span, instrument, trace, warn, Instrument, Level}; use url::Url; use super::RelayDatagramSendChannelReceiver; +#[cfg(not(wasm_browser))] +use crate::dns::DnsResolver; use crate::{ - dns::DnsResolver, magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayDatagramRecvQueue}, util::MaybeFuture, }; @@ -155,7 +154,7 @@ struct ActiveRelayActor { /// Unless it is managing the home relay connection. Inactivity is only tracked on the /// last datagram sent to the relay, received datagrams will trigger QUIC ACKs which is /// sufficient to keep active connections open. - inactive_timeout: Pin>, + inactive_timeout: Pin>, /// Token indicating the [`ActiveRelayActor`] should stop. stop_token: CancellationToken, } @@ -206,6 +205,7 @@ struct ActiveRelayActorOptions { #[derive(Debug, Clone)] struct RelayConnectionOptions { secret_key: SecretKey, + #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, proxy_url: Option, prefer_ipv6: Arc, @@ -233,7 +233,7 @@ impl ActiveRelayActor { url, relay_client_builder, is_home_relay: false, - inactive_timeout: Box::pin(tokio::time::sleep(RELAY_INACTIVE_CLEANUP_TIME)), + inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)), stop_token, } } @@ -244,14 +244,21 @@ impl ActiveRelayActor { ) -> relay::client::ClientBuilder { let RelayConnectionOptions { secret_key, + #[cfg(not(wasm_browser))] dns_resolver, proxy_url, prefer_ipv6, #[cfg(any(test, feature = "test-utils"))] insecure_skip_cert_verify, } = opts; - let mut builder = relay::client::ClientBuilder::new(url, secret_key, dns_resolver) - .address_family_selector(move || prefer_ipv6.load(Ordering::Relaxed)); + + let mut builder = relay::client::ClientBuilder::new( + url, + secret_key, + #[cfg(not(wasm_browser))] + dns_resolver, + ) + .address_family_selector(move || prefer_ipv6.load(Ordering::Relaxed)); if let Some(proxy_url) = proxy_url { builder = builder.proxy_url(proxy_url); } @@ -318,7 +325,7 @@ impl ActiveRelayActor { // is not an ideal mechanism, an alternative approach would be to use // e.g. ConcurrentQueue with force_push, though now you might still send very stale // packets when eventually connected. So perhaps this is a reasonable compromise. - let mut send_datagram_flush = tokio::time::interval(UNDELIVERABLE_DATAGRAM_TIMEOUT); + let mut send_datagram_flush = time::interval(UNDELIVERABLE_DATAGRAM_TIMEOUT); send_datagram_flush.set_missed_tick_behavior(MissedTickBehavior::Delay); send_datagram_flush.reset(); // Skip the immediate interval @@ -395,7 +402,7 @@ impl ActiveRelayActor { /// The future only completes once the connection is established and retries /// connections. It currently does not ever return `Err` as the retries continue /// forever. - fn dial_relay(&self) -> Pin> + Send>> { + fn dial_relay(&self) -> BoxedFut> { let backoff: ExponentialBackoff = ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(10)) .with_max_interval(Duration::from_secs(5)) @@ -405,7 +412,7 @@ impl ActiveRelayActor { move || { let client_builder = client_builder.clone(); async move { - match tokio::time::timeout(CONNECT_TIMEOUT, client_builder.connect()).await { + match time::timeout(CONNECT_TIMEOUT, client_builder.connect()).await { Ok(Ok(client)) => Ok(client), Ok(Err(err)) => { warn!("Relay connection failed: {err:#}"); @@ -419,7 +426,24 @@ impl ActiveRelayActor { } } }; - let retry_fut = backoff::future::retry(backoff, connect_fn); + + struct Sleeper; + + impl backoff::future::Sleeper for Sleeper { + type Sleep = time::Sleep; + + fn sleep(&self, dur: Duration) -> Self::Sleep { + time::sleep(dur) + } + } + + struct NoopNotify; + + impl backoff::Notify for NoopNotify { + fn notify(&mut self, _: E, _: Duration) {} + } + + let retry_fut = backoff::future::Retry::new(Sleeper, backoff, NoopNotify, connect_fn); Box::pin(retry_fut) } @@ -451,7 +475,7 @@ impl ActiveRelayActor { let mut send_datagrams_buf = Vec::with_capacity(SEND_DATAGRAM_BATCH_SIZE); // Regularly send pings so we know the connection is healthy. - let mut ping_interval = tokio::time::interval(PING_INTERVAL); + let mut ping_interval = time::interval(PING_INTERVAL); ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); ping_interval.reset(); // skip the ping at current time. @@ -547,7 +571,7 @@ impl ActiveRelayActor { }) .map(Ok) }); - let mut packet_stream = futures_util::stream::iter(packet_iter); + let mut packet_stream = n0_future::stream::iter(packet_iter); let fut = client_sink.send_all(&mut packet_stream); self.run_sending(fut, &mut state, &mut client_stream).await?; } @@ -647,7 +671,7 @@ impl ActiveRelayActor { // we use the same time as for our ping interval let send_timeout = PING_INTERVAL; - let mut timeout = pin!(tokio::time::sleep(send_timeout)); + let mut timeout = pin!(time::sleep(send_timeout)); let mut sending_fut = pin!(sending_fut); loop { tokio::select! { @@ -825,7 +849,7 @@ impl RelayActor { } // try shutdown - if tokio::time::timeout(Duration::from_secs(3), self.close_all_active_relays()) + if time::timeout(Duration::from_secs(3), self.close_all_active_relays()) .await .is_err() { @@ -874,7 +898,7 @@ impl RelayActor { async fn set_home_relay(&mut self, home_url: RelayUrl) { let home_url_ref = &home_url; - futures_buffered::join_all(self.active_relays.iter().map(|(url, handle)| async move { + n0_future::join_all(self.active_relays.iter().map(|(url, handle)| async move { let is_preferred = url == home_url_ref; handle .inbox_addr @@ -956,6 +980,7 @@ impl RelayActor { let connection_opts = RelayConnectionOptions { secret_key: self.msock.secret_key.clone(), + #[cfg(not(wasm_browser))] dns_resolver: self.msock.dns_resolver.clone(), proxy_url: self.msock.proxy_url().cloned(), prefer_ipv6: self.msock.ipv6_reported.clone(), @@ -1009,7 +1034,7 @@ impl RelayActor { .await .ok(); }); - futures_buffered::join_all(send_futs).await; + n0_future::join_all(send_futs).await; self.log_active_relay(); } @@ -1193,11 +1218,17 @@ impl Iterator for PacketSplitIter { } } +#[cfg(not(wasm_browser))] +type BoxedFut = Pin + Send>>; + +#[cfg(wasm_browser)] +type BoxedFut = Pin>>; + #[cfg(test)] mod tests { use anyhow::Context; - use futures_lite::future; use iroh_base::SecretKey; + use n0_future::future; use smallvec::smallvec; use testresult::TestResult; use tokio_util::task::AbortOnDropHandle; diff --git a/iroh/src/metrics.rs b/iroh/src/metrics.rs index 1d9906b3bf..2799d842c5 100644 --- a/iroh/src/metrics.rs +++ b/iroh/src/metrics.rs @@ -2,6 +2,7 @@ #[cfg(feature = "test-utils")] pub use iroh_relay::server::Metrics as RelayMetrics; pub use net_report::Metrics as NetReportMetrics; +#[cfg(not(wasm_browser))] pub use portmapper::Metrics as PortmapMetrics; pub use crate::magicsock::Metrics as MagicsockMetrics; diff --git a/iroh/src/dns/node_info.rs b/iroh/src/node_info.rs similarity index 97% rename from iroh/src/dns/node_info.rs rename to iroh/src/node_info.rs index 1d7c1f471f..468df314d2 100644 --- a/iroh/src/dns/node_info.rs +++ b/iroh/src/node_info.rs @@ -41,6 +41,7 @@ use std::{ }; use anyhow::{anyhow, Result}; +#[cfg(not(wasm_browser))] use hickory_resolver::{proto::ProtoError, Name, TokioResolver}; use iroh_base::{NodeAddr, NodeId, SecretKey}; use tracing::warn; @@ -163,6 +164,7 @@ impl NodeInfo { self.into() } + #[cfg(not(wasm_browser))] /// Parses a [`NodeInfo`] from a TXT records lookup. pub fn from_hickory_lookup(lookup: hickory_resolver::lookup::TxtLookup) -> Result { let attrs = TxtAttrs::from_hickory_lookup(lookup)?; @@ -187,6 +189,7 @@ impl NodeInfo { } /// Converts into a [`hickory_resolver::proto::rr::Record`] DNS record. + #[cfg(not(wasm_browser))] pub fn to_hickory_records( &self, origin: &str, @@ -203,6 +206,7 @@ impl NodeInfo { /// Takes a [`hickory_resolver::proto::rr::Name`] DNS name and expects the first label to be /// [`IROH_TXT_NAME`] and the second label to be a z32 encoded [`NodeId`]. Ignores /// subsequent labels. +#[cfg(not(wasm_browser))] pub(crate) fn node_id_from_hickory_name( name: &hickory_resolver::proto::rr::Name, ) -> Option { @@ -256,6 +260,7 @@ impl TxtAttrs { Ok(Self { attrs, node_id }) } + #[cfg(not(wasm_browser))] async fn lookup(resolver: &TokioResolver, name: Name) -> Result { let name = ensure_iroh_txt_label(name)?; let lookup = resolver.txt_lookup(name).await?; @@ -264,6 +269,7 @@ impl TxtAttrs { } /// Looks up attributes by [`NodeId`] and origin domain. + #[cfg(not(wasm_browser))] pub async fn lookup_by_id( resolver: &TokioResolver, node_id: &NodeId, @@ -274,6 +280,7 @@ impl TxtAttrs { } /// Looks up attributes by DNS name. + #[cfg(not(wasm_browser))] pub async fn lookup_by_name(resolver: &TokioResolver, name: &str) -> Result { let name = Name::from_str(name)?; TxtAttrs::lookup(resolver, name).await @@ -313,6 +320,7 @@ impl TxtAttrs { } /// Parses a TXT records lookup. + #[cfg(not(wasm_browser))] pub fn from_hickory_lookup(lookup: hickory_resolver::lookup::TxtLookup) -> Result { let queried_node_id = node_id_from_hickory_name(lookup.query().name()) .ok_or_else(|| anyhow!("invalid DNS answer: not a query for _iroh.z32encodedpubkey"))?; @@ -360,6 +368,7 @@ impl TxtAttrs { } /// Converts to a list of [`hickory_resolver::proto::rr::Record`] resource records. + #[cfg(not(wasm_browser))] pub fn to_hickory_records( &self, origin: &str, @@ -386,7 +395,8 @@ impl TxtAttrs { ) -> Result { let packet = self.to_pkarr_dns_packet(ttl)?; let keypair = pkarr::Keypair::from_secret_key(&secret_key.to_bytes()); - let signed_packet = pkarr::SignedPacket::from_packet(&keypair, &packet)?; + let signed_packet = pkarr::SignedPacket::from_packet(&keypair, &packet) + .map_err(|e| anyhow::anyhow!(e.to_string()))?; Ok(signed_packet) } @@ -410,6 +420,7 @@ impl TxtAttrs { } } +#[cfg(not(wasm_browser))] fn ensure_iroh_txt_label(name: Name) -> Result { if name.iter().next() == Some(IROH_TXT_NAME.as_bytes()) { Ok(name) @@ -418,6 +429,7 @@ fn ensure_iroh_txt_label(name: Name) -> Result { } } +#[cfg(not(wasm_browser))] fn node_domain(node_id: &NodeId, origin: &str) -> Result { let domain = format!("{}.{}", to_z32(node_id), origin); let domain = Name::from_str(&domain)?; @@ -443,7 +455,7 @@ mod tests { use testresult::TestResult; use super::NodeInfo; - use crate::dns::node_info::to_z32; + use crate::node_info::to_z32; #[test] fn txt_attr_roundtrip() { diff --git a/iroh/src/protocol.rs b/iroh/src/protocol.rs index 4aa22d34bf..574ec68918 100644 --- a/iroh/src/protocol.rs +++ b/iroh/src/protocol.rs @@ -41,10 +41,13 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Result; -use futures_buffered::join_all; -use futures_lite::future::Boxed as BoxedFuture; -use tokio::{sync::Mutex, task::JoinSet}; -use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; +use n0_future::{ + boxed::BoxFuture, + join_all, + task::{self, AbortOnDropHandle, JoinSet}, +}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use tracing::{error, info_span, trace, warn, Instrument}; use crate::{endpoint::Connecting, Endpoint}; @@ -109,30 +112,30 @@ pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { /// Handle an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept(&self, conn: Connecting) -> BoxedFuture>; + fn accept(&self, conn: Connecting) -> BoxFuture>; /// Called when the node shuts down. - fn shutdown(&self) -> BoxedFuture<()> { + fn shutdown(&self) -> BoxFuture<()> { Box::pin(async move {}) } } impl ProtocolHandler for Arc { - fn accept(&self, conn: Connecting) -> BoxedFuture> { + fn accept(&self, conn: Connecting) -> BoxFuture> { self.as_ref().accept(conn) } - fn shutdown(&self) -> BoxedFuture<()> { + fn shutdown(&self) -> BoxFuture<()> { self.as_ref().shutdown() } } impl ProtocolHandler for Box { - fn accept(&self, conn: Connecting) -> BoxedFuture> { + fn accept(&self, conn: Connecting) -> BoxFuture> { self.as_ref().accept(conn) } - fn shutdown(&self) -> BoxedFuture<()> { + fn shutdown(&self) -> BoxFuture<()> { self.as_ref().shutdown() } } @@ -306,7 +309,7 @@ impl RouterBuilder { tracing::info!("Shutting down remaining tasks"); join_set.shutdown().await; }; - let task = tokio::task::spawn(run_loop_fut); + let task = task::spawn(run_loop_fut); let task = AbortOnDropHandle::new(task); Ok(Router { diff --git a/iroh/src/test_utils.rs b/iroh/src/test_utils.rs index 52a1879c92..26a44addf3 100644 --- a/iroh/src/test_utils.rs +++ b/iroh/src/test_utils.rs @@ -196,7 +196,6 @@ pub(crate) mod dns_server { }; use anyhow::{ensure, Result}; - use futures_lite::future::Boxed as BoxFuture; use hickory_resolver::{ config::NameServerConfig, proto::{ @@ -205,6 +204,7 @@ pub(crate) mod dns_server { }, TokioResolver, }; + use n0_future::future::Boxed as BoxFuture; use tokio::{net::UdpSocket, sync::oneshot}; use tracing::{debug, error, warn}; @@ -388,7 +388,7 @@ pub(crate) mod pkarr_dns_state { use pkarr::SignedPacket; use crate::{ - dns::node_info::{node_id_from_hickory_name, NodeInfo}, + node_info::{node_id_from_hickory_name, NodeInfo}, test_utils::dns_server::QueryHandler, }; diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index 9ed6cd9056..c94ee49642 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -18,9 +18,9 @@ use std::{ task::{self, Poll, Waker}, }; -use futures_lite::stream::Stream; #[cfg(iroh_loom)] use loom::sync; +use n0_future::stream::Stream; use sync::{Mutex, RwLock}; /// A wrapper around a value that notifies [`Watcher`]s when the value is modified. @@ -236,7 +236,7 @@ impl Future for WatchInitializedFut<'_, T> { let Some(shared) = self.watcher.shared.upgrade() else { return Poll::Ready(Err(Disconnected)); }; - let (epoch, value) = futures_lite::ready!(shared.poll_next(cx, self.watcher.epoch)); + let (epoch, value) = n0_future::ready!(shared.poll_next(cx, self.watcher.epoch)); self.watcher.epoch = epoch; if let Some(value) = value { @@ -354,7 +354,7 @@ impl Shared { mod tests { use std::time::{Duration, Instant}; - use futures_lite::StreamExt; + use n0_future::StreamExt; use rand::{thread_rng, Rng}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -458,12 +458,12 @@ mod tests { let mut watcher = watchable.watch(); let mut initialized = watcher.initialized(); - let poll = futures_lite::future::poll_once(&mut initialized).await; + let poll = n0_future::future::poll_once(&mut initialized).await; assert!(poll.is_none()); watchable.set(Some(1u8)).ok(); - let poll = futures_lite::future::poll_once(&mut initialized).await; + let poll = n0_future::future::poll_once(&mut initialized).await; assert_eq!(poll.unwrap().unwrap(), 1u8); } @@ -474,7 +474,7 @@ mod tests { let mut watcher = watchable.watch(); let mut initialized = watcher.initialized(); - let poll = futures_lite::future::poll_once(&mut initialized).await; + let poll = n0_future::future::poll_once(&mut initialized).await; assert_eq!(poll.unwrap().unwrap(), 1u8); } @@ -490,7 +490,7 @@ mod tests { let watchable = Watchable::>::new(None); let mut watch = watchable.watch(); - let thread = thread::spawn(move || futures_lite::future::block_on(watch.initialized())); + let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized())); watchable.set(Some(42)).ok(); diff --git a/iroh/src/web_runtime.rs b/iroh/src/web_runtime.rs new file mode 100644 index 0000000000..4e69ea5d06 --- /dev/null +++ b/iroh/src/web_runtime.rs @@ -0,0 +1,33 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use n0_future::time; + +#[derive(Debug, Clone)] +pub struct WebRuntime; + +#[derive(Debug)] +struct Timer(time::Sleep); + +impl quinn::Runtime for WebRuntime { + fn new_timer(&self, deadline: time::Instant) -> Pin> { + Box::pin(Timer(time::sleep_until(deadline))) + } + + fn spawn(&self, future: Pin + Send>>) { + wasm_bindgen_futures::spawn_local(future); + } +} + +impl quinn::AsyncTimer for Timer { + fn reset(mut self: Pin<&mut Self>, deadline: time::Instant) { + Pin::new(&mut self.0).reset(deadline) + } + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + Pin::new(&mut self.0).poll(cx) + } +}