Skip to content

Commit

Permalink
refactor: Use n0-future in favor of futures-* libraries and `toki…
Browse files Browse the repository at this point in the history
…o::{spawn,task,time}` (#3156)

## Description

- replaces all imports of `futures_lite`, `futures_util`, `futures_sink`
and `futures_buffered` with re-exports from `n0_future`.
- replaces all imports of `std::time` with `n0_future::time`
- replaces all occurrences of `tokio::{spawn,task,time}` with
`n0_future::{spawn,task,time}`
- **does not affect tests**: we use a bunch of special things there,
like `tokio::time::advance` that we don't have n0-future yet. It's also
not strictly necessary for shipping e.g. browser stuff.

The goal of this PR is
- to get us closer to browser compatibility (I've verified that I've
replaced enough of all task spawning, etc. such that mostly cfg-ing out
and other unrelated changes remain browser support)
- to make the diffs of the browser-support PRs easier to follow

Essentially this is work extracted out of the browser PRs.

## Breaking Changes

I don't think there are any breaking changes from this.
It's possible that `JoinSet` or `JoinHandle` types leak from the
interfaces somewhere, but they're identical to the old ones (just
re-exports) outside of `wasm*-*-unknown` targets.

## Notes & open questions

I've tried setting up semgrep, but (1) IIUC, setting it up for CI
requires that you have an account with them, only running locally is
free and (2) I haven't found a way to ignore e.g. `std::time` imports
when you're inside a `*::tests` module, which means we'd either need to
convert all our tests to `n0-future` or ignore warnings and I think both
of these options might suck.

## Change checklist

- [x] Self-review.
- [x] All breaking changes documented.
  • Loading branch information
matheus23 authored Jan 29, 2025
1 parent 03e3e3c commit 617fa50
Show file tree
Hide file tree
Showing 39 changed files with 220 additions and 229 deletions.
16 changes: 7 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-dns-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ derive_more = { version = "1.0.0", features = [
"from",
] }
dirs-next = "2.0.0"
futures-lite = "2.5"
governor = "0.6.3" #needs new release of tower_governor for 0.7.0
hickory-server = { version = "=0.25.0-alpha.4", features = ["dns-over-rustls", "dns-over-https-rustls"] }
http = "1.0.0"
humantime-serde = "1.1.1"
iroh-metrics = { version = "0.31.0" }
lru = "0.12.3"
n0-future = "0.1.2"
pkarr = { version = "2.3.1", features = [ "async", "relay", "dht"], default-features = false }
rcgen = "0.13"
redb = "2.0.0"
Expand Down
2 changes: 1 addition & 1 deletion iroh-dns-server/src/http/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum_server::{
accept::Accept,
tls_rustls::{RustlsAcceptor, RustlsConfig},
};
use futures_lite::{future::Boxed as BoxFuture, FutureExt};
use n0_future::{future::Boxed as BoxFuture, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig};
Expand Down
3 changes: 1 addition & 2 deletions iroh-net-report/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ workspace = true
anyhow = "1"
bytes = "1.7"
derive_more = { version = "1.0.0", features = ["display"] }
futures-buffered = "0.2.8"
futures-lite = "2.3"
hickory-resolver = "=0.25.0-alpha.4"
iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["relay"] }
iroh-metrics = { version = "0.31", default-features = false }
iroh-relay = { version = "0.31", path = "../iroh-relay" }
n0-future = "0.1.2"
netwatch = { version = "0.3" }
portmapper = { version = "0.3", default-features = false }
quinn = { package = "iroh-quinn", version = "0.13.0" }
Expand Down
2 changes: 1 addition & 1 deletion iroh-net-report/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/// Contains all timeouts that we use in `iroh-net-report`.
pub(crate) mod timeouts {
use std::time::Duration;
use n0_future::time::Duration;

// Timeouts for net_report

Expand Down
12 changes: 6 additions & 6 deletions iroh-net-report/src/dns.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{fmt::Write, net::IpAddr};

use anyhow::Result;
use futures_lite::{Future, StreamExt};
use hickory_resolver::{IntoName, TokioResolver};
use n0_future::{time, Future, StreamExt};

use crate::defaults::timeouts::DNS_TIMEOUT;

Expand Down Expand Up @@ -66,12 +66,12 @@ pub(crate) trait ResolverExt {

impl ResolverExt for TokioResolver {
async fn lookup_ipv4<N: IntoName>(&self, host: N) -> Result<impl Iterator<Item = IpAddr>> {
let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??;
let addrs = time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??;
Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0)))
}

async fn lookup_ipv6<N: IntoName>(&self, host: N) -> Result<impl Iterator<Item = IpAddr>> {
let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??;
let addrs = time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??;
Ok(addrs.into_iter().map(|ip| IpAddr::V6(ip.0)))
}

Expand Down Expand Up @@ -150,14 +150,14 @@ async fn stagger_call<T, F: Fn() -> Fut, Fut: Future<Output = Result<T>>>(
f: F,
delays_ms: &[u64],
) -> Result<T> {
let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1);
let mut calls = n0_future::FuturesUnorderedBounded::new(delays_ms.len() + 1);
// NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than
// using alternatives that allow futures of different types.
for delay in std::iter::once(&0u64).chain(delays_ms) {
let delay = std::time::Duration::from_millis(*delay);
let delay = time::Duration::from_millis(*delay);
let fut = f();
let staggered_fut = async move {
tokio::time::sleep(delay).await;
time::sleep(delay).await;
fut.await
};
calls.push(staggered_fut)
Expand Down
15 changes: 7 additions & 8 deletions iroh-net-report/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use iroh_base::RelayUrl;
#[cfg(feature = "metrics")]
use iroh_metrics::inc;
use iroh_relay::{protos::stun, RelayMap};
use netwatch::UdpSocket;
use tokio::{
sync::{self, mpsc, oneshot},
use n0_future::{
task::{self, AbortOnDropHandle},
time::{Duration, Instant},
};
use tokio_util::task::AbortOnDropHandle;
use netwatch::UdpSocket;
use tokio::sync::{self, mpsc, oneshot};
use tracing::{debug, error, info_span, trace, warn, Instrument};

mod defaults;
Expand Down Expand Up @@ -351,7 +351,7 @@ impl Client {
pub fn new(port_mapper: Option<portmapper::Client>, dns_resolver: DnsResolver) -> Result<Self> {
let mut actor = Actor::new(port_mapper, dns_resolver)?;
let addr = actor.addr();
let task = tokio::spawn(
let task = task::spawn(
async move { actor.run().await }.instrument(info_span!("net_report.actor")),
);
let drop_guard = AbortOnDropHandle::new(task);
Expand Down Expand Up @@ -899,7 +899,7 @@ pub(crate) mod stun_utils {
);
{
let sock = sock.clone();
tokio::spawn(
task::spawn(
async move {
debug!("udp stun socket listener started");
// TODO: Can we do better for buffers here? Probably doesn't matter much.
Expand Down Expand Up @@ -988,7 +988,6 @@ mod tests {

use bytes::BytesMut;
use netwatch::IpFamily;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand Down Expand Up @@ -1388,7 +1387,7 @@ mod tests {
let mut actor = Actor::new(None, resolver.clone()).unwrap();
for s in &mut tt.steps {
// trigger the timer
time::advance(Duration::from_secs(s.after)).await;
tokio::time::advance(Duration::from_secs(s.after)).await;
let r = Arc::try_unwrap(s.r.take().unwrap()).unwrap();
s.r = Some(actor.add_report_history_and_set_preferred_relay(r));
}
Expand Down
29 changes: 13 additions & 16 deletions iroh-net-report/src/reportgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use anyhow::{anyhow, bail, Context as _, Result};
Expand All @@ -37,14 +36,13 @@ use iroh_relay::{
protos::stun,
RelayMap, RelayNode,
};
use n0_future::{
task::{self, AbortOnDropHandle, JoinSet},
time::{self, Duration, Instant},
};
use netwatch::{interfaces, UdpSocket};
use rand::seq::IteratorRandom;
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
time::{self, Instant},
};
use tokio_util::task::AbortOnDropHandle;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument, Span};
use url::Host;

Expand Down Expand Up @@ -115,9 +113,8 @@ impl Client {
dns_resolver,
protocols,
};
let task = tokio::spawn(
async move { actor.run().await }.instrument(info_span!("reportgen.actor")),
);
let task =
task::spawn(async move { actor.run().await }.instrument(info_span!("reportgen.actor")));
Self {
_drop_guard: AbortOnDropHandle::new(task),
}
Expand Down Expand Up @@ -245,9 +242,9 @@ impl Actor {
let mut captive_task = self.prepare_captive_portal_task();
let mut probes = self.spawn_probes_task().await?;

let total_timer = tokio::time::sleep(OVERALL_REPORT_TIMEOUT);
let total_timer = time::sleep(OVERALL_REPORT_TIMEOUT);
tokio::pin!(total_timer);
let probe_timer = tokio::time::sleep(PROBES_TIMEOUT);
let probe_timer = time::sleep(PROBES_TIMEOUT);
tokio::pin!(probe_timer);

loop {
Expand Down Expand Up @@ -385,7 +382,7 @@ impl Actor {
delay=?timeout,
"Have enough probe reports, aborting further probes soon",
);
tokio::spawn(
task::spawn(
async move {
time::sleep(timeout).await;
// Because we do this after a timeout it is entirely normal that the
Expand Down Expand Up @@ -488,9 +485,9 @@ impl Actor {
self.outstanding_tasks.captive_task = true;
MaybeFuture {
inner: Some(Box::pin(async move {
tokio::time::sleep(CAPTIVE_PORTAL_DELAY).await;
time::sleep(CAPTIVE_PORTAL_DELAY).await;
debug!("Captive portal check started after {CAPTIVE_PORTAL_DELAY:?}");
let captive_portal_check = tokio::time::timeout(
let captive_portal_check = time::timeout(
CAPTIVE_PORTAL_TIMEOUT,
check_captive_portal(&dns_resolver, &dm, preferred_relay)
.instrument(debug_span!("captive-portal")),
Expand Down Expand Up @@ -719,7 +716,7 @@ async fn run_probe(
) -> Result<ProbeReport, ProbeError> {
if !probe.delay().is_zero() {
trace!("delaying probe");
tokio::time::sleep(probe.delay()).await;
time::sleep(probe.delay()).await;
}
debug!("starting probe");

Expand Down
11 changes: 7 additions & 4 deletions iroh-net-report/src/reportgen/hairpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

use anyhow::{bail, Context, Result};
use iroh_relay::protos::stun;
use n0_future::{
task::{self, AbortOnDropHandle},
time::{self, Instant},
};
use netwatch::UdpSocket;
use tokio::{sync::oneshot, time::Instant};
use tokio_util::task::AbortOnDropHandle;
use tokio::sync::oneshot;
use tracing::{debug, error, info_span, trace, warn, Instrument};

use crate::{self as net_report, defaults::timeouts::HAIRPIN_CHECK_TIMEOUT, reportgen, Inflight};
Expand All @@ -43,7 +46,7 @@ impl Client {
};

let task =
tokio::spawn(async move { actor.run().await }.instrument(info_span!("hairpin.actor")));
task::spawn(async move { actor.run().await }.instrument(info_span!("hairpin.actor")));
Self {
addr: Some(addr),
_drop_guard: AbortOnDropHandle::new(task),
Expand Down Expand Up @@ -127,7 +130,7 @@ impl Actor {
}

let now = Instant::now();
let hairpinning_works = match tokio::time::timeout(HAIRPIN_CHECK_TIMEOUT, stun_rx).await {
let hairpinning_works = match time::timeout(HAIRPIN_CHECK_TIMEOUT, stun_rx).await {
Ok(Ok(_)) => true,
Ok(Err(_)) => bail!("net_report actor dropped stun response channel"),
Err(_) => false, // Elapsed
Expand Down
2 changes: 1 addition & 1 deletion iroh-net-report/src/reportgen/probes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::{collections::BTreeSet, fmt, sync::Arc};
use anyhow::{ensure, Result};
use iroh_base::RelayUrl;
use iroh_relay::{RelayMap, RelayNode};
use n0_future::time::Duration;
use netwatch::interfaces;
use tokio::time::Duration;

use crate::Report;

Expand Down
2 changes: 1 addition & 1 deletion iroh-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ hyper = { version = "1", features = ["server", "client", "http1"] }
hyper-util = "0.1.1"
iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["key", "relay"] }
iroh-metrics = { version = "0.31", default-features = false }
n0-future = { version = "0.0.1" }
n0-future = "0.1.2"
num_enum = "0.7"
pin-project = "1"
postcard = { version = "1", default-features = false, features = [
Expand Down
3 changes: 1 addition & 2 deletions iroh-relay/src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::{
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use anyhow::{bail, Result};
use bytes::Bytes;
use iroh_base::{NodeId, SecretKey};
use n0_future::{Sink, Stream};
use n0_future::{time::Duration, Sink, Stream};
use tokio_tungstenite_wasm::WebSocketStream;
#[cfg(not(wasm_browser))]
use tokio_util::codec::Framed;
Expand Down
Loading

0 comments on commit 617fa50

Please sign in to comment.