Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use n0-future in favor of futures-* libraries and tokio::{spawn,task,time} #3156

Merged
merged 11 commits into from
Jan 29, 2025
16 changes: 7 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-dns-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ derive_more = { version = "1.0.0", features = [
"from",
] }
dirs-next = "2.0.0"
futures-lite = "2.5"
governor = "0.6.3" #needs new release of tower_governor for 0.7.0
hickory-server = { version = "=0.25.0-alpha.4", features = ["dns-over-rustls", "dns-over-https-rustls"] }
http = "1.0.0"
humantime-serde = "1.1.1"
iroh-metrics = { version = "0.31.0" }
lru = "0.12.3"
n0-future = "0.1.2"
pkarr = { version = "2.3.1", features = [ "async", "relay", "dht"], default-features = false }
rcgen = "0.13"
redb = "2.0.0"
Expand Down
2 changes: 1 addition & 1 deletion iroh-dns-server/src/http/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum_server::{
accept::Accept,
tls_rustls::{RustlsAcceptor, RustlsConfig},
};
use futures_lite::{future::Boxed as BoxFuture, FutureExt};
use n0_future::{future::Boxed as BoxFuture, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig};
Expand Down
3 changes: 1 addition & 2 deletions iroh-net-report/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ workspace = true
anyhow = "1"
bytes = "1.7"
derive_more = { version = "1.0.0", features = ["display"] }
futures-buffered = "0.2.8"
futures-lite = "2.3"
hickory-resolver = "=0.25.0-alpha.4"
iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["relay"] }
iroh-metrics = { version = "0.31", default-features = false }
iroh-relay = { version = "0.31", path = "../iroh-relay" }
n0-future = "0.1.2"
netwatch = { version = "0.3" }
portmapper = { version = "0.3", default-features = false }
quinn = { package = "iroh-quinn", version = "0.13.0" }
Expand Down
2 changes: 1 addition & 1 deletion iroh-net-report/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/// Contains all timeouts that we use in `iroh-net-report`.
pub(crate) mod timeouts {
use std::time::Duration;
use n0_future::time::Duration;

// Timeouts for net_report

Expand Down
12 changes: 6 additions & 6 deletions iroh-net-report/src/dns.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{fmt::Write, net::IpAddr};

use anyhow::Result;
use futures_lite::{Future, StreamExt};
use hickory_resolver::{IntoName, TokioResolver};
use n0_future::{time, Future, StreamExt};

use crate::defaults::timeouts::DNS_TIMEOUT;

Expand Down Expand Up @@ -66,12 +66,12 @@ pub(crate) trait ResolverExt {

impl ResolverExt for TokioResolver {
async fn lookup_ipv4<N: IntoName>(&self, host: N) -> Result<impl Iterator<Item = IpAddr>> {
let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??;
let addrs = time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??;
Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0)))
}

async fn lookup_ipv6<N: IntoName>(&self, host: N) -> Result<impl Iterator<Item = IpAddr>> {
let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??;
let addrs = time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??;
Ok(addrs.into_iter().map(|ip| IpAddr::V6(ip.0)))
}

Expand Down Expand Up @@ -150,14 +150,14 @@ async fn stagger_call<T, F: Fn() -> Fut, Fut: Future<Output = Result<T>>>(
f: F,
delays_ms: &[u64],
) -> Result<T> {
let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1);
let mut calls = n0_future::FuturesUnorderedBounded::new(delays_ms.len() + 1);
// NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than
// using alternatives that allow futures of different types.
for delay in std::iter::once(&0u64).chain(delays_ms) {
let delay = std::time::Duration::from_millis(*delay);
let delay = time::Duration::from_millis(*delay);
let fut = f();
let staggered_fut = async move {
tokio::time::sleep(delay).await;
time::sleep(delay).await;
fut.await
};
calls.push(staggered_fut)
Expand Down
15 changes: 7 additions & 8 deletions iroh-net-report/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use iroh_base::RelayUrl;
#[cfg(feature = "metrics")]
use iroh_metrics::inc;
use iroh_relay::{protos::stun, RelayMap};
use netwatch::UdpSocket;
use tokio::{
sync::{self, mpsc, oneshot},
use n0_future::{
task::{self, AbortOnDropHandle},
time::{Duration, Instant},
};
use tokio_util::task::AbortOnDropHandle;
use netwatch::UdpSocket;
use tokio::sync::{self, mpsc, oneshot};
use tracing::{debug, error, info_span, trace, warn, Instrument};

mod defaults;
Expand Down Expand Up @@ -351,7 +351,7 @@ impl Client {
pub fn new(port_mapper: Option<portmapper::Client>, dns_resolver: DnsResolver) -> Result<Self> {
let mut actor = Actor::new(port_mapper, dns_resolver)?;
let addr = actor.addr();
let task = tokio::spawn(
let task = task::spawn(
async move { actor.run().await }.instrument(info_span!("net_report.actor")),
);
let drop_guard = AbortOnDropHandle::new(task);
Expand Down Expand Up @@ -899,7 +899,7 @@ pub(crate) mod stun_utils {
);
{
let sock = sock.clone();
tokio::spawn(
task::spawn(
async move {
debug!("udp stun socket listener started");
// TODO: Can we do better for buffers here? Probably doesn't matter much.
Expand Down Expand Up @@ -988,7 +988,6 @@ mod tests {

use bytes::BytesMut;
use netwatch::IpFamily;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand Down Expand Up @@ -1388,7 +1387,7 @@ mod tests {
let mut actor = Actor::new(None, resolver.clone()).unwrap();
for s in &mut tt.steps {
// trigger the timer
time::advance(Duration::from_secs(s.after)).await;
tokio::time::advance(Duration::from_secs(s.after)).await;
let r = Arc::try_unwrap(s.r.take().unwrap()).unwrap();
s.r = Some(actor.add_report_history_and_set_preferred_relay(r));
}
Expand Down
29 changes: 13 additions & 16 deletions iroh-net-report/src/reportgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use anyhow::{anyhow, bail, Context as _, Result};
Expand All @@ -37,14 +36,13 @@ use iroh_relay::{
protos::stun,
RelayMap, RelayNode,
};
use n0_future::{
task::{self, AbortOnDropHandle, JoinSet},
time::{self, Duration, Instant},
};
use netwatch::{interfaces, UdpSocket};
use rand::seq::IteratorRandom;
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
time::{self, Instant},
};
use tokio_util::task::AbortOnDropHandle;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument, Span};
use url::Host;

Expand Down Expand Up @@ -115,9 +113,8 @@ impl Client {
dns_resolver,
protocols,
};
let task = tokio::spawn(
async move { actor.run().await }.instrument(info_span!("reportgen.actor")),
);
let task =
task::spawn(async move { actor.run().await }.instrument(info_span!("reportgen.actor")));
Self {
_drop_guard: AbortOnDropHandle::new(task),
}
Expand Down Expand Up @@ -245,9 +242,9 @@ impl Actor {
let mut captive_task = self.prepare_captive_portal_task();
let mut probes = self.spawn_probes_task().await?;

let total_timer = tokio::time::sleep(OVERALL_REPORT_TIMEOUT);
let total_timer = time::sleep(OVERALL_REPORT_TIMEOUT);
tokio::pin!(total_timer);
let probe_timer = tokio::time::sleep(PROBES_TIMEOUT);
let probe_timer = time::sleep(PROBES_TIMEOUT);
tokio::pin!(probe_timer);

loop {
Expand Down Expand Up @@ -385,7 +382,7 @@ impl Actor {
delay=?timeout,
"Have enough probe reports, aborting further probes soon",
);
tokio::spawn(
task::spawn(
async move {
time::sleep(timeout).await;
// Because we do this after a timeout it is entirely normal that the
Expand Down Expand Up @@ -488,9 +485,9 @@ impl Actor {
self.outstanding_tasks.captive_task = true;
MaybeFuture {
inner: Some(Box::pin(async move {
tokio::time::sleep(CAPTIVE_PORTAL_DELAY).await;
time::sleep(CAPTIVE_PORTAL_DELAY).await;
debug!("Captive portal check started after {CAPTIVE_PORTAL_DELAY:?}");
let captive_portal_check = tokio::time::timeout(
let captive_portal_check = time::timeout(
CAPTIVE_PORTAL_TIMEOUT,
check_captive_portal(&dns_resolver, &dm, preferred_relay)
.instrument(debug_span!("captive-portal")),
Expand Down Expand Up @@ -719,7 +716,7 @@ async fn run_probe(
) -> Result<ProbeReport, ProbeError> {
if !probe.delay().is_zero() {
trace!("delaying probe");
tokio::time::sleep(probe.delay()).await;
time::sleep(probe.delay()).await;
}
debug!("starting probe");

Expand Down
11 changes: 7 additions & 4 deletions iroh-net-report/src/reportgen/hairpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

use anyhow::{bail, Context, Result};
use iroh_relay::protos::stun;
use n0_future::{
task::{self, AbortOnDropHandle},
time::{self, Instant},
};
use netwatch::UdpSocket;
use tokio::{sync::oneshot, time::Instant};
use tokio_util::task::AbortOnDropHandle;
use tokio::sync::oneshot;
use tracing::{debug, error, info_span, trace, warn, Instrument};

use crate::{self as net_report, defaults::timeouts::HAIRPIN_CHECK_TIMEOUT, reportgen, Inflight};
Expand All @@ -43,7 +46,7 @@ impl Client {
};

let task =
tokio::spawn(async move { actor.run().await }.instrument(info_span!("hairpin.actor")));
task::spawn(async move { actor.run().await }.instrument(info_span!("hairpin.actor")));
Self {
addr: Some(addr),
_drop_guard: AbortOnDropHandle::new(task),
Expand Down Expand Up @@ -127,7 +130,7 @@ impl Actor {
}

let now = Instant::now();
let hairpinning_works = match tokio::time::timeout(HAIRPIN_CHECK_TIMEOUT, stun_rx).await {
let hairpinning_works = match time::timeout(HAIRPIN_CHECK_TIMEOUT, stun_rx).await {
Ok(Ok(_)) => true,
Ok(Err(_)) => bail!("net_report actor dropped stun response channel"),
Err(_) => false, // Elapsed
Expand Down
2 changes: 1 addition & 1 deletion iroh-net-report/src/reportgen/probes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::{collections::BTreeSet, fmt, sync::Arc};
use anyhow::{ensure, Result};
use iroh_base::RelayUrl;
use iroh_relay::{RelayMap, RelayNode};
use n0_future::time::Duration;
use netwatch::interfaces;
use tokio::time::Duration;

use crate::Report;

Expand Down
2 changes: 1 addition & 1 deletion iroh-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ hyper = { version = "1", features = ["server", "client", "http1"] }
hyper-util = "0.1.1"
iroh-base = { version = "0.31.0", path = "../iroh-base", default-features = false, features = ["key", "relay"] }
iroh-metrics = { version = "0.31", default-features = false }
n0-future = { version = "0.0.1" }
n0-future = "0.1.2"
num_enum = "0.7"
pin-project = "1"
postcard = { version = "1", default-features = false, features = [
Expand Down
3 changes: 1 addition & 2 deletions iroh-relay/src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::{
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use anyhow::{bail, Result};
use bytes::Bytes;
use iroh_base::{NodeId, SecretKey};
use n0_future::{Sink, Stream};
use n0_future::{time::Duration, Sink, Stream};
use tokio_tungstenite_wasm::WebSocketStream;
#[cfg(not(wasm_browser))]
use tokio_util::codec::Framed;
Expand Down
Loading
Loading