diff --git a/Cargo.lock b/Cargo.lock index c86df05c32..1524adf5dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -253,7 +253,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -274,6 +274,40 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -291,6 +325,27 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1760,6 +1815,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee2dd2e4f378392eeff5d51618cd9a63166a2513846bbc55f21cfacd9199d4" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.1.0", + "indexmap 2.2.5", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.0" @@ -2054,7 +2128,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.25", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -2077,6 +2151,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.3", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -2543,6 +2618,7 @@ version = "0.13.0" dependencies = [ "aead", "anyhow", + "axum 0.7.4", "backoff", "bytes", "clap", @@ -2558,6 +2634,7 @@ dependencies = [ "futures", "governor", "hex", + "hickory-proto", "hickory-resolver", "hostname", "http 1.1.0", @@ -2576,6 +2653,7 @@ dependencies = [ "num_enum", "once_cell", "parking_lot", + "pkarr", "postcard", "pretty_assertions", "proptest", @@ -2619,6 +2697,7 @@ dependencies = [ "windows 0.51.1", "wmi", "x509-parser", + "z32", ] [[package]] @@ -3477,6 +3556,23 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkarr" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d4025a211a70a716314d4ea6464aed150f696deb81651bebf62f874cee5aac7" +dependencies = [ + "bytes", + "ed25519-dalek", + "rand", + "reqwest", + "self_cell", + "simple-dns", + "thiserror", + "url", + "z32", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -4171,7 +4267,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.25", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", @@ -4580,6 +4676,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.5" @@ -4725,6 +4831,15 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simple-dns" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01607fe2e61894468c6dc0b26103abb073fb08b79a3d9e4b6d76a1a341549958" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "slab" version = "0.4.9" @@ -5397,10 +5512,10 @@ checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.25", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", @@ -6163,6 +6278,12 @@ dependencies = [ "time", ] +[[package]] +name = "z32" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edb37266251c28b03d08162174a91c3a092e3bd4f476f8205ee1c507b78b7bdc" + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/iroh-base/src/key.rs b/iroh-base/src/key.rs index a08e52f629..d00e358cd7 100644 --- a/iroh-base/src/key.rs +++ b/iroh-base/src/key.rs @@ -229,6 +229,12 @@ impl From for PublicKey { } } +impl From for VerifyingKey { + fn from(value: PublicKey) -> Self { + value.public() + } +} + impl Debug for PublicKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "PublicKey({})", base32::fmt_short(self.as_bytes())) @@ -391,6 +397,12 @@ impl From for SecretKey { } } +impl From for SigningKey { + fn from(secret: SecretKey) -> Self { + secret.secret + } +} + impl From<[u8; 32]> for SecretKey { fn from(value: [u8; 32]) -> Self { Self::from_bytes(&value) diff --git a/iroh-base/src/node_addr.rs b/iroh-base/src/node_addr.rs index bf54e1293e..5a29122382 100644 --- a/iroh-base/src/node_addr.rs +++ b/iroh-base/src/node_addr.rs @@ -4,7 +4,7 @@ use anyhow::Context; use serde::{Deserialize, Serialize}; use url::Url; -use crate::key::PublicKey; +use crate::key::{NodeId, PublicKey}; /// A peer and it's addressing information. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -63,6 +63,12 @@ impl From<(PublicKey, Option, &[SocketAddr])> for NodeAddr { } } +impl From for NodeAddr { + fn from(node_id: NodeId) -> Self { + NodeAddr::new(node_id) + } +} + /// Addressing information to connect to a peer. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct AddrInfo { @@ -142,6 +148,12 @@ impl FromStr for RelayUrl { } } +impl From for Url { + fn from(value: RelayUrl) -> Self { + value.0 + } +} + /// Dereference to the wrapped [`Url`]. /// /// Note that [`DerefMut`] is not implemented on purpose, so this type has more flexibility diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 6030dbd1cb..621d6c6b98 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -236,11 +236,6 @@ impl BlobCommands { return Err(anyhow::anyhow!("The input arguments refer to a collection of blobs and output is set to STDOUT. Only single blobs may be passed in this case.")); } - if node_addr.info.is_empty() { - return Err(anyhow::anyhow!( - "no relay url provided and no direct addresses provided" - )); - } let tag = match tag { Some(tag) => SetTagOption::Named(Tag::from(tag)), None => SetTagOption::Auto, diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 417e7e17a3..e024c48bf2 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -29,6 +29,8 @@ flume = "0.11" futures = "0.3.25" governor = "0.6.0" hex = "0.4.3" +hickory-proto = "0.24.0" +hickory-resolver = "0.24.0" hostname = "0.3.1" http = "1" http-body-util = "0.1.0" @@ -40,6 +42,7 @@ libc = "0.2.139" num_enum = "0.7" once_cell = "1.18.0" parking_lot = "0.12.1" +pkarr = { version = "1.1.3", default-features = false, features = ["async", "relay"] } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } quinn = "0.10" quinn-proto = "0.10.5" @@ -65,12 +68,12 @@ tokio-rustls = { version = "0.24" } tokio-rustls-acme = { version = "0.2" } tokio-util = { version = "0.7", features = ["io-util", "io", "codec"] } tracing = "0.1" -hickory-resolver = "0.24.0" url = { version = "2.4", features = ["serde"] } watchable = "1.1.2" webpki = { package = "rustls-webpki", version = "0.101.4", features = ["std"] } webpki-roots = "0.25" x509-parser = "0.15" +z32 = "1.0.3" # iroh-relay clap = { version = "4", features = ["derive"], optional = true } @@ -106,6 +109,7 @@ tokio = { version = "1", features = ["io-util", "sync", "rt", "net", "fs", "macr tracing-subscriber = { version = "0.3", features = ["env-filter"] } iroh-test = { path = "../iroh-test" } serde_json = "1.0.107" +axum = "0.7.4" [[bench]] name = "key" diff --git a/iroh-net/src/bin/iroh-relay.rs b/iroh-net/src/bin/iroh-relay.rs index e7b435cb8c..4efc97b812 100644 --- a/iroh-net/src/bin/iroh-relay.rs +++ b/iroh-net/src/bin/iroh-relay.rs @@ -32,6 +32,7 @@ use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument use tracing_subscriber::{prelude::*, EnvFilter}; use metrics::StunMetrics; +use url::Url; type BytesBody = http_body_util::Full; type HyperError = Box; @@ -199,6 +200,8 @@ struct Config { #[cfg(feature = "metrics")] /// Metrics serve address. If not set, metrics are not served. metrics_addr: Option, + /// Pkarr relay to publish node announces to + pkarr_relay: Option, } #[derive(Serialize, Deserialize)] @@ -249,6 +252,7 @@ impl Default for Config { limits: None, #[cfg(feature = "metrics")] metrics_addr: None, + pkarr_relay: None, } } } @@ -451,6 +455,10 @@ async fn run( Box::new(serve_no_content_handler), ); } + + if let Some(pkarr_relay) = cfg.pkarr_relay { + builder = builder.pkarr_relay(pkarr_relay); + } let relay_server = builder.spawn().await?; // captive portal detections must be served over HTTP diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index 0bb3791fad..155dc442ec 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -10,6 +10,9 @@ use tracing::{debug, error_span, warn, Instrument}; use crate::{AddrInfo, MagicEndpoint, NodeId}; +pub mod dns; +pub mod pkarr_publish; + /// Node discovery for [`super::MagicEndpoint`]. /// /// The purpose of this trait is to hook up a node discovery mechanism that @@ -67,11 +70,16 @@ pub struct ConcurrentDiscovery { } impl ConcurrentDiscovery { - /// Create a new [`ConcurrentDiscovery`]. - pub fn new() -> Self { + /// Create a empty [`ConcurrentDiscovery`]. + pub fn empty() -> Self { Self::default() } + /// Create a new [`ConcurrentDiscovery`]. + pub fn new(services: Vec>) -> Self { + Self { services } + } + /// Add a [`Discovery`] service. pub fn add(&mut self, service: impl Discovery + 'static) { self.services.push(Box::new(service)); @@ -418,7 +426,7 @@ mod tests { let secret = SecretKey::generate(); let disco1 = EmptyDiscovery; let disco2 = disco_shared.create_discovery(secret.public()); - let mut disco = ConcurrentDiscovery::new(); + let mut disco = ConcurrentDiscovery::empty(); disco.add(disco1); disco.add(disco2); new_endpoint(secret, disco).await @@ -447,7 +455,7 @@ mod tests { let disco1 = EmptyDiscovery; let disco2 = disco_shared.create_lying_discovery(secret.public()); let disco3 = disco_shared.create_discovery(secret.public()); - let mut disco = ConcurrentDiscovery::new(); + let mut disco = ConcurrentDiscovery::empty(); disco.add(disco1); disco.add(disco2); disco.add(disco3); @@ -473,8 +481,7 @@ mod tests { let ep2 = { let secret = SecretKey::generate(); let disco1 = disco_shared.create_lying_discovery(secret.public()); - let mut disco = ConcurrentDiscovery::new(); - disco.add(disco1); + let disco = ConcurrentDiscovery::new(vec![Box::new(disco1)]); new_endpoint(secret, disco).await }; let ep1_addr = NodeAddr::new(ep1.node_id()); @@ -532,3 +539,401 @@ mod tests { .as_micros() as u64 } } + +/// This module contains end-to-end tests for DNS node discovery. +/// +/// The tests run a minimal test DNS server to resolve against, and a minimal pkarr relay to +/// publish to. The relay and DNS servers share their state. +#[cfg(test)] +mod test_dns_pkarr { + use std::net::SocketAddr; + use std::{future::Future, time::Duration}; + + use anyhow::Result; + use hickory_resolver::{config::NameServerConfig, AsyncResolver, TokioAsyncResolver}; + use iroh_base::key::SecretKey; + use pkarr::SignedPacket; + use tokio::task::JoinHandle; + use tokio_util::sync::CancellationToken; + use url::Url; + + use crate::{ + discovery::pkarr_publish, + dns::node_info::{lookup_by_id, parse_hickory_node_info_name, NodeInfo}, + relay::{RelayMap, RelayMode}, + test_utils::{ + dns_server::{run_dns_server, Resolver}, + run_relay_server, run_relay_server_with_pkarr, + }, + AddrInfo, MagicEndpoint, NodeAddr, + }; + + use self::{pkarr_relay::run_pkarr_relay, state::State}; + + use super::{dns::DnsDiscovery, ConcurrentDiscovery}; + + #[tokio::test] + async fn dns_resolve() -> Result<()> { + let _logging_guard = iroh_test::logging::setup(); + let cancel = CancellationToken::new(); + let origin = "testdns.example".to_string(); + let state = State::new(origin.clone()); + let (nameserver, dns_task) = run_dns_server(state.clone(), cancel.clone()).await?; + + let node_secret = SecretKey::generate(); + let (node_info, signed_packet) = generate_node_info(&node_secret); + state.upsert(signed_packet)?; + + let resolver = dns_resolver(nameserver)?; + let resolved = lookup_by_id(&resolver, &node_info.node_id, &origin).await?; + + assert_eq!(resolved, node_info.into()); + + cancel.cancel(); + dns_task.await??; + Ok(()) + } + + #[tokio::test] + async fn pkarr_publish_dns_resolve() -> Result<()> { + let _logging_guard = iroh_test::logging::setup(); + + let cancel = CancellationToken::new(); + let origin = "testdns.example".to_string(); + let (nameserver, pkarr_url, _state, task) = + run_dns_and_pkarr_servers(origin.clone(), cancel.clone()).await?; + + let secret_key = SecretKey::generate(); + let node_id = secret_key.public(); + let publisher = pkarr_publish::Publisher::new(secret_key, pkarr_url); + + let addr_info = AddrInfo { + relay_url: Some("https://relay.example".parse().unwrap()), + ..Default::default() + }; + publisher.publish_addr_info(&addr_info).await?; + + let resolver = dns_resolver(nameserver)?; + let resolved = lookup_by_id(&resolver, &node_id, &origin).await?; + + let expected = NodeAddr { + info: addr_info, + node_id, + }; + + assert_eq!(resolved, expected); + + cancel.cancel(); + task.await??; + Ok(()) + } + + const TEST_ALPN: &[u8] = b"TEST"; + + #[tokio::test] + async fn pkarr_publish_dns_discover() -> Result<()> { + let _logging_guard = iroh_test::logging::setup(); + + let cancel = CancellationToken::new(); + let origin = "testdns.example".to_string(); + let timeout = Duration::from_secs(2); + + let (nameserver, pkarr_url, state, task) = + run_dns_and_pkarr_servers(&origin, cancel.clone()).await?; + let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?; + + let ep1 = ep_with_discovery(relay_map.clone(), nameserver, &origin, &pkarr_url).await?; + let ep2 = ep_with_discovery(relay_map, nameserver, &origin, &pkarr_url).await?; + + // wait until our shared state received the update from pkarr publishing + state.on_node(&ep1.node_id(), timeout).await?; + + // we connect only by node id! + let res = ep2.connect(ep1.node_id().into(), TEST_ALPN).await; + assert!(res.is_ok(), "connection established"); + cancel.cancel(); + task.await??; + Ok(()) + } + + #[tokio::test] + async fn relay_pkarr_publish_dns_discover() -> Result<()> { + let _logging_guard = iroh_test::logging::setup(); + + let cancel = CancellationToken::new(); + let origin = "testdns.example".to_string(); + let timeout = Duration::from_secs(2); + + let (nameserver, pkarr_url, state, task) = + run_dns_and_pkarr_servers(origin.clone(), cancel.clone()).await?; + let (relay_map, _relay_url, _relay_guard) = + run_relay_server_with_pkarr(Some(pkarr_url)).await?; + + let ep1 = ep_with_discovery_publish_relay(relay_map.clone(), nameserver, &origin).await?; + let ep2 = ep_with_discovery_publish_relay(relay_map, nameserver, &origin).await?; + + // wait until our shared state received the update from pkarr publishing + state.on_node(&ep1.node_id(), timeout).await?; + + // we connect only by node id! + let res = ep2.connect(ep1.node_id().into(), TEST_ALPN).await; + assert!(res.is_ok(), "connection established"); + + cancel.cancel(); + task.await??; + Ok(()) + } + + async fn ep_with_discovery_publish_relay( + relay_map: RelayMap, + nameserver: SocketAddr, + node_origin: &str, + ) -> Result { + let secret_key = SecretKey::generate(); + let resolver = dns_resolver(nameserver)?; + let discovery = DnsDiscovery::new(node_origin.to_string()); + MagicEndpoint::builder() + .relay_mode(RelayMode::Custom(relay_map)) + .secret_key(secret_key) + .dns_resolver(resolver) + .pkarr_announce() + .alpns(vec![TEST_ALPN.to_vec()]) + .discovery(Box::new(discovery)) + .bind(0) + .await + } + + async fn ep_with_discovery( + relay_map: RelayMap, + nameserver: SocketAddr, + node_origin: &str, + pkarr_relay: &Url, + ) -> Result { + let secret_key = SecretKey::generate(); + let resolver = dns_resolver(nameserver)?; + let discovery = ConcurrentDiscovery::new(vec![ + Box::new(DnsDiscovery::new(node_origin.to_string())), + Box::new(pkarr_publish::Publisher::new( + secret_key.clone(), + pkarr_relay.clone(), + )), + ]); + MagicEndpoint::builder() + .relay_mode(RelayMode::Custom(relay_map)) + .secret_key(secret_key) + .dns_resolver(resolver) + .alpns(vec![TEST_ALPN.to_vec()]) + .discovery(Box::new(discovery)) + .bind(0) + .await + } + + fn dns_resolver(nameserver: SocketAddr) -> Result { + let mut config = hickory_resolver::config::ResolverConfig::new(); + let nameserver_config = + NameServerConfig::new(nameserver, hickory_resolver::config::Protocol::Udp); + config.add_name_server(nameserver_config); + let resolver = AsyncResolver::tokio(config, Default::default()); + Ok(resolver) + } + + fn generate_node_info(secret: &SecretKey) -> (NodeInfo, SignedPacket) { + let node_id = secret.public(); + let relay_url: Url = "https://relay.example".parse().expect("valid url"); + let node_info = NodeInfo { + node_id, + relay_url: Some(relay_url.clone()), + }; + let signed_packet = node_info + .to_pkarr_signed_packet(secret, 30) + .expect("valid packet"); + (node_info, signed_packet) + } + + async fn run_dns_and_pkarr_servers( + origin: impl ToString, + cancel: CancellationToken, + ) -> Result<(SocketAddr, Url, State, JoinHandle>)> { + let state = State::new(origin.to_string()); + let (nameserver, dns_task) = run_dns_server(state.clone(), cancel.clone()).await?; + let (pkarr_url, pkarr_task) = run_pkarr_relay(state.clone(), cancel.clone()).await?; + let join_handle = tokio::task::spawn(async move { + dns_task.await??; + pkarr_task.await??; + Ok(()) + }); + Ok((nameserver, pkarr_url, state, join_handle)) + } + + mod state { + use crate::NodeId; + use anyhow::{anyhow, Result}; + use parking_lot::{Mutex, MutexGuard}; + use pkarr::SignedPacket; + use std::{ + collections::{hash_map, HashMap}, + ops::Deref, + sync::Arc, + time::Duration, + }; + + #[derive(Debug, Clone)] + pub struct State { + packets: Arc>>, + pub origin: String, + notify: Arc, + } + + impl State { + pub fn new(origin: String) -> Self { + Self { + packets: Default::default(), + origin, + notify: Arc::new(tokio::sync::Notify::new()), + } + } + + pub fn on_update(&self) -> tokio::sync::futures::Notified<'_> { + self.notify.notified() + } + + pub async fn on_node(&self, node: &NodeId, timeout: Duration) -> Result<()> { + let timeout = tokio::time::sleep(timeout); + tokio::pin!(timeout); + loop { + if self.get(node).is_some() { + return Ok(()); + } + tokio::select! { + _ = &mut timeout => return Err(anyhow!("timeout")), + _ = self.on_update() => {} + } + } + } + + pub fn upsert(&self, signed_packet: SignedPacket) -> anyhow::Result { + let node_id = NodeId::from_bytes(&signed_packet.public_key().to_bytes())?; + let mut map = self.packets.lock(); + let updated = match map.entry(node_id) { + hash_map::Entry::Vacant(e) => { + e.insert(signed_packet); + true + } + hash_map::Entry::Occupied(mut e) => { + if signed_packet.more_recent_than(e.get()) { + e.insert(signed_packet); + true + } else { + false + } + } + }; + if updated { + self.notify.notify_waiters(); + } + Ok(updated) + } + pub fn get(&self, node_id: &NodeId) -> Option + '_> { + let map = self.packets.lock(); + if map.contains_key(node_id) { + let guard = MutexGuard::map(map, |state| state.get_mut(node_id).unwrap()); + Some(guard) + } else { + None + } + } + } + } + + impl Resolver for State { + fn resolve( + &self, + query: &hickory_proto::op::Message, + reply: &mut hickory_proto::op::Message, + ) -> impl Future> + Send { + const TTL: u32 = 30; + let this = self.clone(); + async move { + for query in query.queries() { + let Some(node_id) = parse_hickory_node_info_name(query.name()) else { + continue; + }; + let packet = this.get(&node_id); + let Some(packet) = packet.as_ref() else { + continue; + }; + let node_info = NodeInfo::from_pkarr_signed_packet(packet)?; + let record = node_info.to_hickory_record(&this.origin, TTL)?; + reply.add_answer(record); + } + Ok(()) + } + } + } + + mod pkarr_relay { + use std::net::{Ipv4Addr, SocketAddr}; + + use anyhow::Result; + use axum::{ + extract::{Path, State}, + response::IntoResponse, + routing::put, + Router, + }; + use bytes::Bytes; + use tokio::task::JoinHandle; + use tokio_util::sync::CancellationToken; + use tracing::warn; + use url::Url; + + use super::State as AppState; + + pub async fn run_pkarr_relay( + state: AppState, + cancel: CancellationToken, + ) -> Result<(Url, JoinHandle>)> { + let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + let app = Router::new() + .route("/pkarr/:key", put(pkarr_put)) + .with_state(state); + let listener = tokio::net::TcpListener::bind(bind_addr).await?; + let bound_addr = listener.local_addr()?; + let url: Url = format!("http://{bound_addr}/pkarr") + .parse() + .expect("valid url"); + let join_handle = tokio::task::spawn(async move { + let serve = axum::serve(listener, app); + let serve = serve.with_graceful_shutdown(cancel.cancelled_owned()); + serve.await?; + Ok(()) + }); + Ok((url, join_handle)) + } + + async fn pkarr_put( + State(state): State, + Path(key): Path, + body: Bytes, + ) -> Result { + let key = pkarr::PublicKey::try_from(key.as_str())?; + let signed_packet = pkarr::SignedPacket::from_relay_response(key, body)?; + let _updated = state.upsert(signed_packet)?; + Ok(http::StatusCode::NO_CONTENT) + } + + #[derive(Debug)] + struct AppError(anyhow::Error); + impl> From for AppError { + fn from(value: T) -> Self { + Self(value.into()) + } + } + impl IntoResponse for AppError { + fn into_response(self) -> axum::response::Response { + warn!(err = ?self, "request failed"); + (http::StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response() + } + } + } +} diff --git a/iroh-net/src/discovery/dns.rs b/iroh-net/src/discovery/dns.rs new file mode 100644 index 0000000000..8aa37ca71e --- /dev/null +++ b/iroh-net/src/discovery/dns.rs @@ -0,0 +1,63 @@ +//! DNS node discovery for iroh-net + +use crate::{ + discovery::{Discovery, DiscoveryItem}, + MagicEndpoint, NodeId, +}; +use anyhow::Result; +use futures::{future::FutureExt, stream::BoxStream, StreamExt}; + +use crate::dns; + +/// The n0 testing DNS node origin +pub const N0_TESTDNS_NODE_ORIGIN: &str = "testdns.iroh.link"; + +/// DNS node discovery. +/// +/// The DNS discovery looks up node addressing information over the Domain Name System. +/// Node information is resolved via an _iroh_node.z32encodednodeid TXT record. +/// +/// The content of this record is expected to be a DNS attribute string, with a required +/// `node=` attribute containing the base32 encoded node id and a `relay=` attribute containing the +/// node's home iroh-relay server. +/// +/// The discovery has to be configured with a `node_origin`, which is the domain name under which +/// lookups for nodes will be made. +/// With a origin of mydns.example, a node info record would be searched at +/// _iroh_node.z32encodednodeid.mydns.example TXT +#[derive(Debug)] +pub struct DnsDiscovery { + node_origin: String, +} + +impl DnsDiscovery { + /// Create a new DNS discovery with `node_origin` appended to all lookups. + pub fn new(node_origin: String) -> Self { + Self { node_origin } + } + + /// Create a new DNS discovery which uses the n0 testdns origin. + pub fn n0_testdns() -> Self { + Self::new(N0_TESTDNS_NODE_ORIGIN.to_string()) + } +} + +impl Discovery for DnsDiscovery { + fn resolve( + &self, + ep: MagicEndpoint, + node_id: NodeId, + ) -> Option>> { + let resolver = ep.dns_resolver().clone(); + let fut = async move { + let node_addr = + dns::node_info::lookup_by_id(&resolver, &node_id, &self.node_origin).await?; + Ok(DiscoveryItem { + provenance: "iroh-dns", + last_updated: None, + addr_info: node_addr.info, + }) + }; + Some(fut.into_stream().boxed()) + } +} diff --git a/iroh-net/src/discovery/pkarr_publish.rs b/iroh-net/src/discovery/pkarr_publish.rs new file mode 100644 index 0000000000..18a14fbee4 --- /dev/null +++ b/iroh-net/src/discovery/pkarr_publish.rs @@ -0,0 +1,116 @@ +//! A discovery service which publishes node information to a [Pkarr] relay. +//! +//! This service only implements the [`Discovery::publish`] method and does not provide discovery. +//! It encodes the node information into a DNS packet in the format resolvable by the +//! [`super::dns::DnsDiscovery`], which means a single _iroh_node TXT record, under the z32 encoded +//! node id as origin domain. +//! +//! [pkarr]: https://pkarr.org + +// TODO: Decide what to do with this module once publishing over relays land. Either remove, or +// leave in the repo but do not enable it by default in the iroh node. + +use std::sync::Arc; + +use anyhow::Result; +use parking_lot::RwLock; +use pkarr::SignedPacket; +use tracing::warn; +use url::Url; + +use crate::{discovery::Discovery, dns::node_info::NodeInfo, key::SecretKey, AddrInfo}; + +/// The n0 testing pkarr relay +pub const N0_TESTDNS_PKARR_RELAY: &str = "https://testdns.iroh.link/pkarr"; + +/// Default TTL for the _iroh_node TXT record in the pkarr signed packet +pub const DEFAULT_PKARR_TTL: u32 = 30; + +/// Publish node info to a pkarr relay. +#[derive(derive_more::Debug, Clone)] +pub struct Publisher { + #[debug("SecretKey")] + secret_key: SecretKey, + #[debug("PkarrClient")] + pkarr_client: PkarrRelayClient, + last_published: Arc>>, + ttl: u32, +} + +impl Publisher { + /// Create a new config with a secret key and a pkarr relay URL. + pub fn new(secret_key: SecretKey, pkarr_relay: Url) -> Self { + let pkarr_client = PkarrRelayClient::new(pkarr_relay); + Self { + secret_key, + pkarr_client, + ttl: DEFAULT_PKARR_TTL, + last_published: Default::default(), + } + } + + /// Create a config that publishes to the n0 testdns server. + pub fn n0_testdns(secret_key: SecretKey) -> Self { + let pkarr_relay: Url = N0_TESTDNS_PKARR_RELAY.parse().expect("url is valid"); + Self::new(secret_key, pkarr_relay) + } + + /// Set the TTL for pkarr packets, in seconds. + /// + /// Default value is 30 seconds. + pub fn set_ttl(&mut self, ttl: u32) { + self.ttl = ttl; + } + + /// Publish [`AddrInfo`] about this node to a pkarr relay. + pub async fn publish_addr_info(&self, info: &AddrInfo) -> Result<()> { + let info = NodeInfo::new( + self.secret_key.public(), + info.relay_url.clone().map(Url::from), + ); + if self.last_published.read().as_ref() == Some(&info) { + return Ok(()); + } + let _ = self.last_published.write().insert(info.clone()); + let signed_packet = info.to_pkarr_signed_packet(&self.secret_key, self.ttl)?; + self.pkarr_client.publish(&signed_packet).await?; + Ok(()) + } +} + +impl Discovery for Publisher { + fn publish(&self, info: &AddrInfo) { + let this = self.clone(); + let info = info.clone(); + tokio::task::spawn(async move { + if let Err(err) = this.publish_addr_info(&info).await { + warn!("failed to publish address update: {err:?}"); + } + }); + } +} + +/// A pkarr client to publish [`pkarr::SignedPacket`]s to a pkarr relay. +#[derive(Debug, Clone)] +pub(crate) struct PkarrRelayClient { + inner: pkarr::PkarrClient, + pkarr_relay: Url, +} + +impl PkarrRelayClient { + /// Create a new client. + pub fn new(pkarr_relay: Url) -> Self { + Self { + inner: pkarr::PkarrClient::builder().build(), + pkarr_relay, + } + } + + /// Publish a [`SignedPacket`] + pub async fn publish(&self, signed_packet: &SignedPacket) -> anyhow::Result<()> { + self.inner + .relay_put(&self.pkarr_relay, signed_packet) + .await?; + Ok(()) + } +} diff --git a/iroh-net/src/dns.rs b/iroh-net/src/dns.rs index 60c1d6eabd..7cfb3117fb 100644 --- a/iroh-net/src/dns.rs +++ b/iroh-net/src/dns.rs @@ -8,6 +8,8 @@ use anyhow::Result; use hickory_resolver::{AsyncResolver, IntoName, TokioAsyncResolver, TryParseIp}; use once_cell::sync::Lazy; +pub mod node_info; + /// The DNS resolver type used throughout `iroh-net`. pub type DnsResolver = TokioAsyncResolver; @@ -22,6 +24,11 @@ pub fn default_resolver() -> &'static DnsResolver { &DNS_RESOLVER } +/// Get the DNS resolver used within iroh-net. +pub fn resolver() -> &'static TokioAsyncResolver { + Lazy::force(&DNS_RESOLVER) +} + /// Deprecated IPv6 site-local anycast addresses still configured by windows. /// /// Windows still configures these site-local addresses as soon even as an IPv6 loopback @@ -142,7 +149,7 @@ pub async fn lookup_ipv4_ipv6( } #[cfg(test)] -mod tests { +pub(crate) mod tests { use crate::defaults::NA_RELAY_HOSTNAME; use super::*; diff --git a/iroh-net/src/dns/node_info.rs b/iroh-net/src/dns/node_info.rs new file mode 100644 index 0000000000..c823919d2f --- /dev/null +++ b/iroh-net/src/dns/node_info.rs @@ -0,0 +1,265 @@ +//! This module contains functions and structs to lookup node information from DNS +//! and to encode node information in Pkarr signed packets. + +use std::{collections::HashMap, fmt, str::FromStr}; + +use anyhow::{anyhow, bail, Result}; +use hickory_proto::error::ProtoError; +use hickory_resolver::{Name, TokioAsyncResolver}; +use url::Url; + +use crate::{key::SecretKey, AddrInfo, NodeAddr, NodeId}; + +const ATTR_RELAY: &str = "relay"; +const ATTR_NODE_ID: &str = "node"; + +/// The label for the node info TXT record +pub const IROH_NODE_TXT_LABEL: &str = "_iroh_node"; + +/// Lookup node info by domain name +/// +/// The domain name must either contain an _iroh_node TXT record or be a CNAME record that leads to +/// an _iroh_node TXT record. +pub async fn lookup_by_domain(resolver: &TokioAsyncResolver, domain: &str) -> Result { + let name = Name::from_str(domain)?; + let info = lookup_node_info(resolver, name).await?; + Ok(info.into()) +} + +/// Lookup node info by node id and origin domain name. +pub async fn lookup_by_id( + resolver: &TokioAsyncResolver, + node_id: &NodeId, + origin: &str, +) -> Result { + let domain = format!("{}.{}", to_z32(node_id), origin); + lookup_by_domain(resolver, &domain).await +} + +pub(crate) async fn lookup_node_info( + resolver: &TokioAsyncResolver, + name: Name, +) -> Result { + let name = ensure_iroh_node_txt_label(name)?; + let lookup = resolver.txt_lookup(name).await?; + NodeInfo::from_hickory_records(lookup.as_lookup().records()) +} + +fn ensure_iroh_node_txt_label(name: Name) -> Result { + if name.iter().next() == Some(IROH_NODE_TXT_LABEL.as_bytes()) { + Ok(name) + } else { + Name::parse(IROH_NODE_TXT_LABEL, Some(&name)) + } +} + +/// Encode a [`NodeId`] in [`z-base-32`] encoding. +/// +/// [z-base-32]: https://philzimmermann.com/docs/human-oriented-base-32-encoding.txt +pub fn to_z32(node_id: &NodeId) -> String { + z32::encode(node_id.as_bytes()) +} + +/// Parse a [`NodeId`] from [`z-base-32`] encoding. +/// +/// [z-base-32]: https://philzimmermann.com/docs/human-oriented-base-32-encoding.txt +pub fn from_z32(s: &str) -> Result { + let bytes = z32::decode(s.as_bytes()).map_err(|_| anyhow!("invalid z32"))?; + let bytes: &[u8; 32] = &bytes.try_into().map_err(|_| anyhow!("not 32 bytes long"))?; + let node_id = NodeId::from_bytes(bytes)?; + Ok(node_id) +} + +/// Node info contained in a DNS _iroh_node TXT record. +#[derive(derive_more::Debug, Clone, Eq, PartialEq)] +pub struct NodeInfo { + /// The node id + pub node_id: NodeId, + /// Home relay server for this node + #[debug("{:?}", self.relay_url.as_ref().map(|s| s.to_string()))] + pub relay_url: Option, +} + +impl From for NodeAddr { + fn from(value: NodeInfo) -> Self { + NodeAddr { + node_id: value.node_id, + info: value.into(), + } + } +} + +impl From for AddrInfo { + fn from(value: NodeInfo) -> Self { + AddrInfo { + relay_url: value.relay_url.map(|u| u.into()), + direct_addresses: Default::default(), + } + } +} + +impl NodeInfo { + /// Create a new [`NodeInfo`] from its parts. + pub fn new(node_id: NodeId, relay_url: Option>) -> Self { + Self { + node_id, + relay_url: relay_url.map(Into::into), + } + } + + /// Convert this node info into a DNS attribute string. + /// + /// It will look like this: + /// `node=b32encodednodeid relay=https://myrelay.example` + pub fn to_attribute_string(&self) -> String { + let mut attrs = vec![]; + attrs.push(fmt_attr(ATTR_NODE_ID, self.node_id)); + if let Some(relay) = &self.relay_url { + attrs.push(fmt_attr(ATTR_RELAY, relay)); + } + attrs.join(" ") + } + + /// Try to parse a [`NodeInfo`] from a set of DNS records. + pub fn from_hickory_records(records: &[hickory_proto::rr::Record]) -> Result { + use hickory_proto::rr; + let (node_id, txt) = records + .iter() + .find_map(|rr| match rr.data() { + Some(rr::RData::TXT(txt)) => { + parse_hickory_node_info_name(rr.name()).map(|node_id| (node_id, txt)) + } + _ => None, + }) + .ok_or_else(|| anyhow!("no TXT record with name _iroh_node.b32encodedpubkey found"))?; + let node_info = Self::parse_from_attributes(&txt.to_string())?; + if node_info.node_id != node_id { + bail!("node id mismatch between record name and TXT value"); + } + Ok(node_info) + } + + /// Parse the [`NodeInfo`] from an attribute string. + /// + /// See [Self::to_attribute_string] for the expected format. + pub fn parse_from_attributes(attrs: &str) -> Result { + let attrs = parse_attrs(attrs); + let Some(node) = attrs.get(ATTR_NODE_ID) else { + bail!("missing required node attribute"); + }; + if node.len() != 1 { + bail!("more than one node attribute is not allowed"); + } + let node_id = NodeId::from_str(node[0])?; + let relay_url: Option = attrs + .get(ATTR_RELAY) + .into_iter() + .flatten() + .find_map(|x| Url::parse(x).ok()); + Ok(Self { node_id, relay_url }) + } + + /// Create a [`pkarr::SignedPacket`] by constructing a DNS packet and + /// signing it with a [`SecretKey`]. + pub fn to_pkarr_signed_packet( + &self, + secret_key: &SecretKey, + ttl: u32, + ) -> Result { + let packet = self.to_pkarr_dns_packet(ttl)?; + let keypair = pkarr::Keypair::from_secret_key(&secret_key.to_bytes()); + let signed_packet = pkarr::SignedPacket::from_packet(&keypair, &packet)?; + Ok(signed_packet) + } + + fn to_pkarr_dns_packet(&self, ttl: u32) -> Result> { + use pkarr::dns::{self, rdata}; + let name = dns::Name::new(IROH_NODE_TXT_LABEL)?.into_owned(); + let rdata = { + let value = self.to_attribute_string(); + let txt = rdata::TXT::new().with_string(&value)?.into_owned(); + rdata::RData::TXT(txt) + }; + + let mut packet = dns::Packet::new_reply(0); + packet + .answers + .push(dns::ResourceRecord::new(name, dns::CLASS::IN, ttl, rdata)); + Ok(packet) + } + + /// Convert into a [`hickory_proto::rr::Record`] DNS record. + pub fn to_hickory_record(&self, origin: &str, ttl: u32) -> Result { + use hickory_proto::rr; + let name = format!( + "{}.{}.{}", + IROH_NODE_TXT_LABEL, + to_z32(&self.node_id), + origin + ); + let name = rr::Name::from_utf8(name)?; + let value = self.to_attribute_string(); + let txt = rr::rdata::TXT::new(vec![value]); + let rdata = rr::RData::TXT(txt); + let record = rr::Record::from_rdata(name, ttl, rdata); + Ok(record) + } + + /// Try to parse a [`NodeInfo`] from a [`pkarr::SignedPacket`]. + pub fn from_pkarr_signed_packet(packet: &pkarr::SignedPacket) -> Result { + use pkarr::dns::{self, rdata::RData}; + let pubkey = packet.public_key(); + let pubkey_z32 = pubkey.to_z32(); + let node_id = NodeId::from(*pubkey.verifying_key()); + let zone = dns::Name::new(&pubkey_z32)?; + let inner = packet.packet(); + let txt_record = inner + .answers + .iter() + .find_map(|rr| match &rr.rdata { + RData::TXT(txt) => match rr.name.without(&zone) { + Some(name) if name.to_string() == IROH_NODE_TXT_LABEL => Some(txt), + Some(_) | None => None, + }, + _ => None, + }) + .ok_or_else(|| anyhow!("missing _iroh_node txt record"))?; + + let txt_record = txt_record.to_owned(); + let txt = String::try_from(txt_record)?; + let info = Self::parse_from_attributes(&txt)?; + if info.node_id != node_id { + bail!("node id mismatch between record name and TXT value"); + } + Ok(info) + } +} + +pub(crate) fn parse_hickory_node_info_name(name: &hickory_proto::rr::Name) -> Option { + if name.num_labels() < 2 { + return None; + } + let mut labels = name.iter(); + let label = std::str::from_utf8(labels.next().expect("num_labels checked")).ok()?; + if label != IROH_NODE_TXT_LABEL { + return None; + } + let label = std::str::from_utf8(labels.next().expect("num_labels checked")).ok()?; + let node_id = from_z32(label).ok()?; + Some(node_id) +} + +fn fmt_attr(label: &str, value: impl fmt::Display) -> String { + format!("{label}={value}") +} + +fn parse_attrs<'a>(s: &'a str) -> HashMap<&'a str, Vec<&'a str>> { + let mut map: HashMap<&'a str, Vec<&'a str>> = HashMap::new(); + let parts = s.split(' '); + for part in parts { + if let Some((name, value)) = part.split_once('=') { + map.entry(name).or_default().push(value); + } + } + map +} diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 318ce5da92..5880ed874f 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -41,6 +41,7 @@ pub struct MagicEndpointBuilder { /// Path for known peers. See [`MagicEndpointBuilder::peers_data_path`]. peers_path: Option, dns_resolver: Option, + pkarr_announce: bool, } impl Default for MagicEndpointBuilder { @@ -55,6 +56,7 @@ impl Default for MagicEndpointBuilder { discovery: Default::default(), peers_path: None, dns_resolver: None, + pkarr_announce: false, } } } @@ -156,6 +158,21 @@ impl MagicEndpointBuilder { self } + /// Announce the endpoint through the home relay. + /// + /// If enabled, and connected to a relay server, the node will publish its basic node + /// information to the relay server as a [`pkarr::SignedPacket`]. The node information contains + /// only our [`NodeId`] and the URL of our home relay. This is the minimal information needed + /// for other nodes to be able to connect to us if they only know our [`NodeId`]. + /// + /// The default relay servers run by number0 will republish this information as a resolvable TXT record + /// in the Domain Name System (DNS), which makes the assocation of a [`NodeId`] to its home + /// relay globally resolvable. + pub fn pkarr_announce(mut self) -> Self { + self.pkarr_announce = true; + self + } + /// Bind the magic endpoint on the specified socket address. /// /// The *bind_port* is the port that should be bound locally. @@ -192,6 +209,7 @@ impl MagicEndpointBuilder { nodes_path: self.peers_path, discovery: self.discovery, dns_resolver, + pkarr_announce: self.pkarr_announce, }; MagicEndpoint::bind(Some(server_config), msock_opts, self.keylog).await } @@ -520,6 +538,11 @@ impl MagicEndpoint { Ok(()) } + /// Get a reference to the DNS resolver used in this [`MagicEndpoint`]. + pub fn dns_resolver(&self) -> &DnsResolver { + self.msock.dns_resolver() + } + /// Close the QUIC endpoint and the magic socket. /// /// This will close all open QUIC connections with the provided error_code and reason. See diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 05a250eac7..57532253f1 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -119,6 +119,11 @@ pub struct Options { /// You can use [`crate::dns::default_resolver`] for a resolver that uses the system's DNS /// configuration. pub dns_resolver: DnsResolver, + + /// Whether to announce ourselves to the relay with a pkarr signed packet. + /// + /// If set to false no self-announces will be published. + pub pkarr_announce: bool, } impl Default for Options { @@ -130,6 +135,7 @@ impl Default for Options { nodes_path: None, discovery: None, dns_resolver: crate::dns::default_resolver().clone(), + pkarr_announce: false, } } } @@ -220,6 +226,9 @@ struct Inner { /// Indicates the update endpoint state. endpoints_update_state: EndpointUpdateState, + + /// Whether to announce ourselves to the relay with a pkarr signed packet. + pkarr_announce: bool, } impl Inner { @@ -1147,6 +1156,7 @@ impl MagicSock { discovery, nodes_path, dns_resolver, + pkarr_announce, } = opts; let nodes_path = match nodes_path { @@ -1227,6 +1237,7 @@ impl MagicSock { pending_call_me_maybes: Default::default(), endpoints_update_state: EndpointUpdateState::new(), dns_resolver, + pkarr_announce, }); let mut actor_tasks = JoinSet::default(); @@ -1363,6 +1374,11 @@ impl MagicSock { self.inner.node_map.add_node_addr(addr); } + /// Get a reference to the DNS resolver used in this [`MagicSock`]. + pub fn dns_resolver(&self) -> &DnsResolver { + &self.inner.dns_resolver + } + /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. @@ -2198,10 +2214,10 @@ impl Actor { info!("home is now relay {}", relay_url); self.inner.publish_my_addr(); - self.send_relay_actor(RelayActorMessage::NotePreferred(relay_url.clone())); - self.send_relay_actor(RelayActorMessage::Connect { + // This will also send a NotePreferred message to the relay, + // and, if configured, a [`pkarr::SignedPacket`] with info about ourselves. + self.send_relay_actor(RelayActorMessage::ConnectAsHomeRelay { url: relay_url.clone(), - peer: None, }); } diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index eb25b66644..c69c1301b3 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -19,6 +19,8 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, trace, warn, Instrument}; use crate::{ + discovery::pkarr_publish::DEFAULT_PKARR_TTL, + dns::node_info::NodeInfo, key::{PublicKey, PUBLIC_KEY_LENGTH}, relay::{self, http::ClientError, ReceivedMessage, RelayUrl, MAX_PACKET_SIZE}, }; @@ -38,11 +40,9 @@ pub(super) enum RelayActorMessage { contents: RelayContents, peer: PublicKey, }, - Connect { + ConnectAsHomeRelay { url: RelayUrl, - peer: Option, }, - NotePreferred(RelayUrl), MaybeCloseRelaysOnRebind(Vec), } @@ -79,6 +79,7 @@ enum ActiveRelayMessage { GetPeerRoute(PublicKey, oneshot::Sender>), GetClient(oneshot::Sender), NotePreferred(bool), + PkarrPublish(pkarr::SignedPacket), Shutdown, } @@ -133,6 +134,9 @@ impl ActiveRelay { ActiveRelayMessage::NotePreferred(is_preferred) => { self.relay_client.note_preferred(is_preferred).await; } + ActiveRelayMessage::PkarrPublish(packet) => { + self.relay_client.pkarr_publish(packet).await; + } ActiveRelayMessage::GetPeerRoute(peer, r) => { let res = if self.relay_routes.contains(&peer) { Some(self.relay_client.clone()) @@ -349,11 +353,8 @@ impl RelayActor { } => { self.send_relay(&url, contents, peer).await; } - RelayActorMessage::Connect { url, peer } => { - self.connect_relay(&url, peer.as_ref()).await; - } - RelayActorMessage::NotePreferred(my_relay) => { - self.note_preferred(&my_relay).await; + RelayActorMessage::ConnectAsHomeRelay { url } => { + self.connect_relay_as_home(&url).await; } RelayActorMessage::MaybeCloseRelaysOnRebind(ifs) => { self.maybe_close_relays_on_rebind(&ifs).await; @@ -420,6 +421,29 @@ impl RelayActor { } /// Connect to the given relay node. + async fn connect_relay_as_home(&mut self, url: &RelayUrl) { + self.connect_relay(url, None).await; + self.note_preferred(url).await; + if let Err(err) = self.pkarr_announce_to_relay(url).await { + warn!(?err, %url, "failed to send pkarr self-announce to home derper"); + } + } + + async fn pkarr_announce_to_relay(&self, my_relay: &RelayUrl) -> anyhow::Result<()> { + if self.conn.pkarr_announce { + let s = self + .active_relay + .iter() + .find_map(|(relay_url, (s, _))| (relay_url == my_relay).then_some(s)) + .context("home derp not in list of active derps")?; + let info = NodeInfo::new(self.conn.secret_key.public(), Some(my_relay.clone())); + let packet = info.to_pkarr_signed_packet(&self.conn.secret_key, DEFAULT_PKARR_TTL)?; + s.send(ActiveRelayMessage::PkarrPublish(packet)).await?; + } + Ok(()) + } + + /// Connect to the given derp node. async fn connect_relay( &mut self, url: &RelayUrl, @@ -583,7 +607,7 @@ impl RelayActor { async fn close_or_reconnect_relay(&mut self, url: &RelayUrl, why: &'static str) { self.close_relay(url, why).await; if self.conn.my_relay().as_ref() == Some(url) { - self.connect_relay(url, None).await; + self.connect_relay_as_home(url).await; } } diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index d9f44b6832..81a7f48928 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -22,6 +22,7 @@ use super::{ }; use crate::key::{PublicKey, SecretKey}; +use crate::relay::codec::PkarrWirePacket; use crate::util::AbortingJoinHandle; const CLIENT_RECV_TIMEOUT: Duration = Duration::from_secs(120); @@ -122,6 +123,17 @@ impl Client { Ok(()) } + /// Send a pkarr packet to the derper to publish for us. + /// + /// Must be signed by our secret key, otherwise the derper will reject it. + pub async fn pkarr_publish_packet(&self, packet: pkarr::SignedPacket) -> Result<()> { + self.inner + .writer_channel + .send(ClientWriterMessage::PkarrPublish(packet)) + .await?; + Ok(()) + } + /// The local address that the [`Client`] is listening on. pub fn local_addr(&self) -> Result { Ok(self.inner.local_addr) @@ -205,6 +217,8 @@ enum ClientWriterMessage { Ping([u8; 8]), /// Tell the server whether or not this client is the user's preferred client NotePreferred(bool), + /// Publish a pkarr signed packet about ourselves + PkarrPublish(pkarr::SignedPacket), /// Shutdown the writer Shutdown, } @@ -239,6 +253,11 @@ impl ClientWriter { write_frame(&mut self.writer, Frame::NotePreferred { preferred }, None).await?; self.writer.flush().await?; } + ClientWriterMessage::PkarrPublish(packet) => { + let packet = PkarrWirePacket::V0(packet.as_bytes()); + write_frame(&mut self.writer, Frame::PkarrPublish { packet }, None).await?; + self.writer.flush().await?; + } ClientWriterMessage::Shutdown => { return Ok(()); } @@ -331,6 +350,7 @@ impl ClientBuilder { }; let mut buf = encrypted_message.to_vec(); shared_secret.open(&mut buf)?; + let info: ServerInfo = postcard::from_bytes(&buf)?; if info.version != PROTOCOL_VERSION { bail!( diff --git a/iroh-net/src/relay/client_conn.rs b/iroh-net/src/relay/client_conn.rs index 47c4e65400..62faaa9e2e 100644 --- a/iroh-net/src/relay/client_conn.rs +++ b/iroh-net/src/relay/client_conn.rs @@ -15,7 +15,7 @@ use crate::{disco::looks_like_disco_wrapper, key::PublicKey}; use iroh_metrics::{inc, inc_by}; -use super::codec::{DerpCodec, Frame}; +use super::codec::{DerpCodec, Frame, PkarrWirePacket}; use super::server::MaybeTlsStream; use super::{ codec::{write_frame, KEEP_ALIVE}, @@ -76,6 +76,7 @@ pub struct ClientConnBuilder { pub(crate) write_timeout: Option, pub(crate) channel_capacity: usize, pub(crate) server_channel: mpsc::Sender, + pub(crate) can_pkarr_publish: bool, } impl ClientConnBuilder { @@ -89,6 +90,7 @@ impl ClientConnBuilder { self.write_timeout, self.channel_capacity, self.server_channel, + self.can_pkarr_publish, ) } } @@ -105,6 +107,7 @@ impl ClientConnManager { write_timeout: Option, channel_capacity: usize, server_channel: mpsc::Sender, + can_pkarr_publish: bool, ) -> ClientConnManager { let done = CancellationToken::new(); let client_id = (key, conn_num); @@ -124,6 +127,7 @@ impl ClientConnManager { key, preferred: Arc::clone(&preferred), server_channel: server_channel.clone(), + can_pkarr_publish, }; // start io loop @@ -226,6 +230,8 @@ pub(crate) struct ClientConnIo { // might find that the alternative is better, once I have a better idea of how this is supposed // to be read. preferred: Arc, + /// Whether this server support publishing pkarr packets. + can_pkarr_publish: bool, } impl ClientConnIo { @@ -359,6 +365,13 @@ impl ClientConnIo { Frame::Health { .. } => { inc!(Metrics, other_packets_recv); } + Frame::PkarrPublish { packet } => { + if self.can_pkarr_publish { + self.handle_pkarr_publish(packet).await?; + } else { + trace!("dropping incoming pkarr packet (no pkarr relay configured)"); + } + } _ => { inc!(Metrics, unknown_frames); } @@ -418,6 +431,14 @@ impl ClientConnIo { Ok(()) } + async fn handle_pkarr_publish(&self, frame: PkarrWirePacket) -> Result<()> { + let res = frame.verify_and_decode(&self.key); + let packet = res?; + self.send_server(ServerMessage::PkarrPublish(packet)) + .await?; + Ok(()) + } + /// Parse the SEND_PACKET frame, getting the destination and packet content /// Then sends the packet to the server, who directs it to the destination. /// @@ -482,6 +503,7 @@ mod tests { key, server_channel: server_channel_s, preferred: Arc::clone(&preferred), + can_pkarr_publish: false, }; let done = CancellationToken::new(); @@ -617,6 +639,7 @@ mod tests { key, server_channel: server_channel_s, preferred: Arc::clone(&preferred), + can_pkarr_publish: false, }; let done = CancellationToken::new(); diff --git a/iroh-net/src/relay/clients.rs b/iroh-net/src/relay/clients.rs index b91f4a066e..5f0fd5eefe 100644 --- a/iroh-net/src/relay/clients.rs +++ b/iroh-net/src/relay/clients.rs @@ -283,6 +283,7 @@ mod tests { write_timeout: None, channel_capacity: 10, server_channel, + can_pkarr_publish: false, }, FramedRead::new(test_io, DerpCodec), ) diff --git a/iroh-net/src/relay/codec.rs b/iroh-net/src/relay/codec.rs index 670c709651..8493d68069 100644 --- a/iroh-net/src/relay/codec.rs +++ b/iroh-net/src/relay/codec.rs @@ -105,6 +105,9 @@ pub(crate) enum FrameType { Restarting = 15, /// 32B src pub key + 32B dst pub key + packet bytes ForwardPacket = 16, + /// Sent from the client to the server, contains a [`pkarr::SignedPacket`], which the server should + /// publish on behalf to the client. + PkarrPublish = 17, #[num_enum(default)] Unknown = 255, } @@ -234,6 +237,54 @@ pub(crate) enum Frame { reconnect_in: u32, try_for: u32, }, + PkarrPublish { + packet: PkarrWirePacket, + }, +} + +/// A pkarr signed packet. +/// +/// This is wrapped in a wire encoding to allow changing the package format in the future without +/// breaking the protocol. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum PkarrWirePacket { + V0(Bytes), +} +impl PkarrWirePacket { + pub fn len(&self) -> usize { + match self { + PkarrWirePacket::V0(b) => b.len() + 1, + } + } + pub fn encode(&self, dst: &mut BytesMut) { + match self { + PkarrWirePacket::V0(b) => { + dst.put_u8(0u8); + dst.put(b.as_ref()); + } + } + } + pub fn from_bytes(mut src: Bytes) -> anyhow::Result { + ensure!(src.len() > 1, "packet too short"); + let version = src.split_to(1); + match version[0] { + 0 => Ok(Self::V0(src)), + _ => bail!("unsupported pkarr wire packet version"), + } + } + pub fn verify_and_decode(self, public_key: &PublicKey) -> anyhow::Result { + match self { + PkarrWirePacket::V0(bytes) => { + ensure!(bytes.len() > 104, "invalid pkarr packet"); + ensure!( + &bytes[..32] == public_key.as_bytes(), + "pkarr packet does not match client public key" + ); + let packet = pkarr::SignedPacket::from_bytes(bytes, true)?; + Ok(packet) + } + } + } } impl Frame { @@ -251,6 +302,7 @@ impl Frame { Frame::Pong { .. } => FrameType::Pong, Frame::Health { .. } => FrameType::Health, Frame::Restarting { .. } => FrameType::Restarting, + Frame::PkarrPublish { .. } => FrameType::PkarrPublish, } } @@ -275,6 +327,7 @@ impl Frame { Frame::Pong { .. } => 8, Frame::Health { problem } => problem.len(), Frame::Restarting { .. } => 4 + 4, + Frame::PkarrPublish { packet } => packet.len(), } } @@ -330,6 +383,7 @@ impl Frame { dst.put_u32(*reconnect_in); dst.put_u32(*try_for); } + Frame::PkarrPublish { packet } => packet.encode(dst), } } @@ -440,6 +494,10 @@ impl Frame { try_for, } } + FrameType::PkarrPublish => { + let packet = PkarrWirePacket::from_bytes(content)?; + Self::PkarrPublish { packet } + } _ => { anyhow::bail!("invalid frame type: {:?}", frame_type); } diff --git a/iroh-net/src/relay/http/client.rs b/iroh-net/src/relay/http/client.rs index e7c3346b72..f64b6c37b7 100644 --- a/iroh-net/src/relay/http/client.rs +++ b/iroh-net/src/relay/http/client.rs @@ -19,7 +19,7 @@ use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinSet; use tokio::time::Instant; -use tracing::{debug, error, info_span, trace, warn, Instrument}; +use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use url::Url; use crate::dns::{lookup_ipv4_ipv6, DnsResolver}; @@ -135,6 +135,7 @@ enum ActorMessage { Close(oneshot::Sender>), CloseForReconnect(oneshot::Sender>), IsConnected(oneshot::Sender>), + PkarrPublish(pkarr::SignedPacket), } /// Receiving end of a [`Client`]. @@ -377,6 +378,15 @@ impl Client { .ok(); } + /// Announce ourselves by sending a [`pkarr::SignedPacket`] to our relay, containing + /// information about ourselves, which the relay will publish to a pkarr relay. + pub async fn pkarr_publish(&self, packet: pkarr::SignedPacket) { + self.inner + .send(ActorMessage::PkarrPublish(packet)) + .await + .ok(); + } + /// Get the local addr of the connection. If there is no current underlying relay connection /// or the [`Client`] is closed, returns `None`. pub async fn local_addr(&self) -> Option { @@ -451,6 +461,11 @@ impl Actor { ActorMessage::NotePreferred(is_preferred) => { self.note_preferred(is_preferred).await; }, + ActorMessage::PkarrPublish(packet) => { + if let Err(err) = self.pkarr_publish(packet).await { + warn!("pkarr publish to relay failed: {err:?}"); + } + }, ActorMessage::LocalAddr(s) => { let res = self.local_addr(); s.send(Ok(res)).ok(); @@ -636,6 +651,16 @@ impl Actor { } } + async fn pkarr_publish(&mut self, signed_packet: pkarr::SignedPacket) -> anyhow::Result<()> { + if let Some((ref client, _)) = self.relay_client { + info!("publish pkarr packet to derper"); + client.pkarr_publish_packet(signed_packet).await?; + Ok(()) + } else { + bail!("not connected") + } + } + fn local_addr(&self) -> Option { if self.is_closed { return None; diff --git a/iroh-net/src/relay/http/server.rs b/iroh-net/src/relay/http/server.rs index b151212a1b..1d6eaf55d1 100644 --- a/iroh-net/src/relay/http/server.rs +++ b/iroh-net/src/relay/http/server.rs @@ -18,6 +18,7 @@ use tokio::task::JoinHandle; use tokio_rustls_acme::AcmeAcceptor; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, warn, Instrument}; +use url::Url; use crate::key::SecretKey; use crate::relay::http::HTTP_UPGRADE_PROTOCOL; @@ -143,6 +144,10 @@ pub struct ServerBuilder { /// When `None`, a default is provided. #[debug("{}", not_found_fn.as_ref().map_or("None", |_| "Some(Box Result> + Send + Sync + 'static>)"))] not_found_fn: Option, + /// Pkarr relay to publish node announces to. + /// + /// When `None`, publishing to pkarr is disabled. + pkarr_relay: Option, } impl ServerBuilder { @@ -157,6 +162,7 @@ impl ServerBuilder { relay_override: None, headers: HeaderMap::new(), not_found_fn: None, + pkarr_relay: None, } } @@ -211,11 +217,17 @@ impl ServerBuilder { self } + /// Set a pkarr relay. This enables node announce publishing. + pub fn pkarr_relay(mut self, pkarr_relay: Url) -> Self { + self.pkarr_relay = Some(pkarr_relay); + self + } + /// Build and spawn an HTTP(S) relay Server pub async fn spawn(self) -> Result { ensure!(self.secret_key.is_some() || self.relay_override.is_some(), "Must provide a `SecretKey` for the relay server OR pass in an override function for the 'relay' endpoint"); let (relay_handler, relay_server) = if let Some(secret_key) = self.secret_key { - let server = crate::relay::server::Server::new(secret_key.clone()); + let server = crate::relay::server::Server::new(secret_key.clone(), self.pkarr_relay); ( RelayHandler::ConnHandler(server.client_conn_handler(self.headers.clone())), Some(server), diff --git a/iroh-net/src/relay/server.rs b/iroh-net/src/relay/server.rs index a354fca028..fa83325359 100644 --- a/iroh-net/src/relay/server.rs +++ b/iroh-net/src/relay/server.rs @@ -8,6 +8,7 @@ use std::time::Duration; use anyhow::{Context as _, Result}; use futures::SinkExt; use hyper::HeaderMap; +use iroh_base::base32; use iroh_metrics::core::UsageStatsReport; use iroh_metrics::{inc, report_usage_stats}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -15,8 +16,10 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::codec::Framed; use tokio_util::sync::CancellationToken; -use tracing::{info_span, trace, Instrument}; +use tracing::{debug, info_span, trace, warn, Instrument}; +use url::Url; +use crate::discovery::pkarr_publish::PkarrRelayClient as PkarrClient; use crate::key::{PublicKey, SecretKey, SharedSecret}; use super::{ @@ -63,14 +66,18 @@ pub struct Server { loop_handler: JoinHandle>, /// Done token, forces a hard shutdown. To gracefully shutdown, use [`Server::close`] cancel: CancellationToken, + /// Whether this server has a pkarr relay configured to publish packets to. + can_pkarr_publish: bool, // TODO: stats collection } impl Server { /// TODO: replace with builder - pub fn new(key: SecretKey) -> Self { + pub fn new(key: SecretKey, pkarr_relay: Option) -> Self { let (server_channel_s, server_channel_r) = mpsc::channel(SERVER_CHANNEL_SIZE); - let server_actor = ServerActor::new(key.public(), server_channel_r); + let pkarr_client = pkarr_relay.map(PkarrClient::new); + let can_pkarr_publish = pkarr_client.is_some(); + let server_actor = ServerActor::new(key.public(), server_channel_r, pkarr_client); let cancel_token = CancellationToken::new(); let done = cancel_token.clone(); let server_task = tokio::spawn( @@ -88,6 +95,7 @@ impl Server { server_info: ServerInfo::no_rate_limit(), loop_handler: server_task, cancel: cancel_token, + can_pkarr_publish, } } @@ -134,6 +142,7 @@ impl Server { write_timeout: self.write_timeout, server_info: self.server_info.clone(), default_headers: Arc::new(default_headers), + can_pkarr_publish: self.can_pkarr_publish, } } @@ -156,6 +165,7 @@ pub struct ClientConnHandler { write_timeout: Option, server_info: ServerInfo, pub(super) default_headers: Arc, + can_pkarr_publish: bool, } impl Clone for ClientConnHandler { @@ -166,6 +176,7 @@ impl Clone for ClientConnHandler { write_timeout: self.write_timeout, server_info: self.server_info.clone(), default_headers: Arc::clone(&self.default_headers), + can_pkarr_publish: self.can_pkarr_publish, } } } @@ -200,6 +211,7 @@ impl ClientConnHandler { write_timeout: self.write_timeout, channel_capacity: PER_CLIENT_SEND_QUEUE_DEPTH, server_channel: self.server_channel.clone(), + can_pkarr_publish: self.can_pkarr_publish, }; trace!("accept: create client"); self.server_channel @@ -255,14 +267,21 @@ pub(crate) struct ServerActor { receiver: mpsc::Receiver, /// All clients connected to this server clients: Clients, + /// Pkarr client to publish client announces + pkarr_client: Option, } impl ServerActor { - pub(crate) fn new(key: PublicKey, receiver: mpsc::Receiver) -> Self { + pub(crate) fn new( + key: PublicKey, + receiver: mpsc::Receiver, + pkarr_client: Option, + ) -> Self { Self { key, receiver, clients: Clients::new(), + pkarr_client, } } @@ -345,6 +364,9 @@ impl ServerActor { self.clients.unregister(&key); } } + ServerMessage::PkarrPublish(packet) => { + self.pkarr_publish(packet); + } ServerMessage::Shutdown => { tracing::info!("server gracefully shutting down..."); // close all client connections and client read/write loops @@ -356,6 +378,25 @@ impl ServerActor { } } } + + pub(crate) fn pkarr_publish(&self, packet: pkarr::SignedPacket) { + if let Some(client) = &self.pkarr_client { + debug!( + "publish pkarr packet for {:?}", + base32::fmt_short(packet.public_key().to_bytes()) + ); + // TODO: Add a queue for publishing to the pkarr relay and not spawn one-off tasks for + // each packet. + let client = client.clone(); + tokio::task::spawn(async move { + if let Err(err) = client.publish(&packet).await { + warn!(?err, "failed to publish packet to pkarr relay") + } + }); + } else { + debug!("drop pkarr packet, no pkarr relay configured") + } + } } /// Initializes the [`Server`] with a self-signed x509 cert @@ -498,6 +539,7 @@ mod tests { write_timeout: None, channel_capacity: 10, server_channel, + can_pkarr_publish: false, }, Framed::new(test_io, DerpCodec), ) @@ -509,7 +551,7 @@ mod tests { // make server actor let (server_channel, server_channel_r) = mpsc::channel(20); - let server_actor: ServerActor = ServerActor::new(server_key, server_channel_r); + let server_actor: ServerActor = ServerActor::new(server_key, server_channel_r, None); let done = CancellationToken::new(); let server_done = done.clone(); @@ -581,6 +623,7 @@ mod tests { server_info: ServerInfo::no_rate_limit(), server_channel: server_channel_s, default_headers: Default::default(), + can_pkarr_publish: false, }; // create the parts needed for a client @@ -659,7 +702,7 @@ mod tests { // create the server! let server_key = SecretKey::generate(); - let server: Server = Server::new(server_key); + let server: Server = Server::new(server_key, None); // create client a and connect it to the server let key_a = SecretKey::generate(); @@ -729,7 +772,7 @@ mod tests { // create the server! let server_key = SecretKey::generate(); - let server: Server = Server::new(server_key); + let server: Server = Server::new(server_key, None); // create client a and connect it to the server let key_a = SecretKey::generate(); diff --git a/iroh-net/src/relay/types.rs b/iroh-net/src/relay/types.rs index 736b585eb6..d1fe50cca8 100644 --- a/iroh-net/src/relay/types.rs +++ b/iroh-net/src/relay/types.rs @@ -94,5 +94,6 @@ pub(crate) enum ServerMessage { #[debug("CreateClient")] CreateClient(ClientConnBuilder), RemoveClient((PublicKey, usize)), + PkarrPublish(pkarr::SignedPacket), Shutdown, } diff --git a/iroh-net/src/test_utils.rs b/iroh-net/src/test_utils.rs index 6292349a2b..b383ef016a 100644 --- a/iroh-net/src/test_utils.rs +++ b/iroh-net/src/test_utils.rs @@ -24,12 +24,30 @@ pub(crate) struct CleanupDropGuard(pub(crate) oneshot::Sender<()>); /// /// [`MagicEndpoint::connect`]: crate::magic_endpoint::MagicEndpoint pub(crate) async fn run_relay_server() -> Result<(RelayMap, RelayUrl, CleanupDropGuard)> { + run_relay_server_with_pkarr(None).await +} + +/// Runs a relay server with STUN enabled suitable for tests. +/// +/// The returned `Url` is the url of the relay server in the returned [`RelayMap`], it +/// is always `Some` as that is how the [`MagicEndpoint::connect`] API expects it. +/// +/// [`MagicEndpoint::connect`]: crate::magic_endpoint::MagicEndpoint +pub(crate) async fn run_relay_server_with_pkarr( + pkarr_relay: Option, +) -> Result<(RelayMap, RelayUrl, CleanupDropGuard)> { let server_key = SecretKey::generate(); let me = server_key.public().fmt_short(); let tls_config = crate::relay::http::make_tls_config(); let server = crate::relay::http::ServerBuilder::new("127.0.0.1:0".parse().unwrap()) .secret_key(Some(server_key)) - .tls_config(Some(tls_config)) + .tls_config(Some(tls_config)); + let server = if let Some(pkarr_relay) = pkarr_relay { + server.pkarr_relay(pkarr_relay) + } else { + server + }; + let server = server .spawn() .instrument(error_span!("relay server", %me)) .await?; @@ -62,3 +80,91 @@ pub(crate) async fn run_relay_server() -> Result<(RelayMap, RelayUrl, CleanupDro Ok((m, url, CleanupDropGuard(tx))) } + +pub mod dns_server { + use std::net::{Ipv4Addr, SocketAddr}; + + use anyhow::{ensure, Result}; + use futures::{future::BoxFuture, Future}; + use hickory_proto::{ + op::{header::MessageType, Message}, + serialize::binary::BinDecodable, + }; + use tokio::{net::UdpSocket, task::JoinHandle}; + use tokio_util::sync::CancellationToken; + use tracing::{debug, warn}; + + pub trait Resolver: Send + Sync + 'static { + fn resolve( + &self, + query: &Message, + reply: &mut Message, + ) -> impl Future> + Send; + } + + pub type ResolveCallback = Box< + dyn Fn(&Message, &mut Message) -> BoxFuture<'static, Result<()>> + Send + Sync + 'static, + >; + impl Resolver for ResolveCallback { + fn resolve( + &self, + query: &Message, + reply: &mut Message, + ) -> impl Future> + Send { + (self)(query, reply) + } + } + + pub async fn run_dns_server( + resolver: impl Resolver, + cancel: CancellationToken, + ) -> Result<(SocketAddr, JoinHandle>)> { + let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + let socket = UdpSocket::bind(bind_addr).await?; + let bound_addr = socket.local_addr()?; + let s = TestDnsServer { + socket, + cancel, + resolver, + }; + let join_handle = tokio::task::spawn(async move { s.run().await }); + Ok((bound_addr, join_handle)) + } + + struct TestDnsServer { + resolver: R, + socket: UdpSocket, + cancel: CancellationToken, + } + + impl TestDnsServer { + async fn run(self) -> Result<()> { + let mut buf = [0; 1450]; + loop { + tokio::select! { + _ = self.cancel.cancelled() => break, + res = self.socket.recv_from(&mut buf) => { + let (len, from) = res?; + if let Err(err) = self.handle_datagram(from, &buf[..len]).await { + warn!(?err, %from, "failed to handle incoming datagram"); + } + } + }; + } + Ok(()) + } + + async fn handle_datagram(&self, from: SocketAddr, buf: &[u8]) -> Result<()> { + let packet = Message::from_bytes(buf)?; + debug!(queries = ?packet.queries(), %from, "received query"); + let mut reply = packet.clone(); + reply.set_message_type(MessageType::Response); + self.resolver.resolve(&packet, &mut reply).await?; + debug!(?reply, %from, "send reply"); + let buf = reply.to_vec()?; + let len = self.socket.send_to(&buf, from).await?; + ensure!(len == buf.len(), "failed to send complete packet"); + Ok(()) + } + } +} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 67120b4502..e365d3c174 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -42,7 +42,7 @@ mod builder; mod rpc; mod rpc_status; -pub use builder::{Builder, GcPolicy, StorageConfig}; +pub use builder::{Builder, GcPolicy, NodeAnnounceConfig, NodeDiscoveryConfig, StorageConfig}; pub use rpc_status::RpcStatus; type EventCallback = Box BoxFuture<'static, ()> + 'static + Sync + Send>; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 39785dbcc0..c41be71e32 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -16,7 +16,11 @@ use iroh_bytes::{ }; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ - magic_endpoint::get_alpn, relay::RelayMode, util::AbortingJoinHandle, MagicEndpoint, + discovery::{dns::DnsDiscovery, Discovery}, + magic_endpoint::get_alpn, + relay::RelayMode, + util::AbortingJoinHandle, + MagicEndpoint, }; use iroh_sync::net::SYNC_ALPN; use quic_rpc::{ @@ -80,7 +84,9 @@ where keylog: bool, relay_mode: RelayMode, gc_policy: GcPolicy, + node_discovery: NodeDiscoveryConfig, docs_store: iroh_sync::store::fs::Store, + node_announce: NodeAnnounceConfig, } /// Configuration for storage. @@ -92,6 +98,39 @@ pub enum StorageConfig { Persistent(PathBuf), } +/// Configuration for node discovery. +#[derive(Debug, Default)] +pub enum NodeDiscoveryConfig { + /// Use no node discovery mechanism. + None, + /// Use the default discovery mechanism. + /// + /// This enables the [`DnsDiscovery`] service. + #[default] + Default, + /// Use a custom discovery mechanism. + Custom(Box), +} + +/// Configuration for node self-announces. +#[derive(Debug, Default)] +pub enum NodeAnnounceConfig { + /// Do not self-announce. + None, + /// Announce ourselves to the home relay. + /// + /// If enabled, and connected to a relay server, the node will publish its basic node + /// information to the relay server as a signed packet. The node information currently contains + /// only our [`iroh_net::NodeId`] and the URL of our home relay. This is the minimal information + /// needed for other nodes to be able to connect to us. + /// + /// The default relay servers run by n0 will republish this information as a TXT record + /// in the Domain Name System (DNS), which makes the assocation of a node id to its home + /// relay globally resolvable. + #[default] + PublishHomeRelay, +} + impl Default for Builder { fn default() -> Self { Self { @@ -104,6 +143,8 @@ impl Default for Builder { rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, docs_store: iroh_sync::store::Store::memory(), + node_discovery: Default::default(), + node_announce: Default::default(), } } } @@ -125,6 +166,8 @@ impl Builder { rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, docs_store, + node_discovery: Default::default(), + node_announce: Default::default(), } } } @@ -183,6 +226,8 @@ where relay_mode: self.relay_mode, gc_policy: self.gc_policy, docs_store, + node_discovery: self.node_discovery, + node_announce: self.node_announce, }) } @@ -199,6 +244,8 @@ where relay_mode: self.relay_mode, gc_policy: self.gc_policy, docs_store: self.docs_store, + node_discovery: self.node_discovery, + node_announce: self.node_announce, } } @@ -222,6 +269,8 @@ where relay_mode: self.relay_mode, gc_policy: self.gc_policy, docs_store: self.docs_store, + node_discovery: self.node_discovery, + node_announce: self.node_announce, }) } @@ -247,6 +296,24 @@ where self } + /// Sets the node discovery mechanism. + /// + /// The default is [`NodeDiscoveryConfig::Default`]. Use [`NodeDiscoveryConfig::Custom`] to pass a + /// custom [`Discovery`]. + pub fn node_discovery(mut self, config: NodeDiscoveryConfig) -> Self { + self.node_discovery = config; + self + } + + /// Announce the endpoint through the home relay. + /// + /// The default is [`NodeAnnounceConfig::PublishHomeRelay`], which means that we publish a signed + /// packet with the URL of our home relay server. See [`NodeAnnounceConfig`] for details. + pub fn node_announce(mut self, config: NodeAnnounceConfig) -> Self { + self.node_announce = config; + self + } + /// Binds the node service to a different socket. /// /// By default it binds to `127.0.0.1:11204`. @@ -294,6 +361,24 @@ where .max_concurrent_bidi_streams(MAX_STREAMS.try_into()?) .max_concurrent_uni_streams(0u32.into()); + let discovery: Option> = match self.node_discovery { + NodeDiscoveryConfig::None => None, + NodeDiscoveryConfig::Custom(discovery) => Some(discovery), + NodeDiscoveryConfig::Default => { + // let discovery = ConcurrentDiscovery::new(vec![ + // // Enable DNS discovery by default + // Box::new(DnsDiscovery::n0_testdns()), + // // Enable pkarr publishing by default + // // TODO: We don't want nodes to self-publish. Remove once publishing over derpers lands. + // Box::new(pkarr_publish::Publisher::n0_testdns( + // self.secret_key.clone(), + // )), + // ]); + let discovery = DnsDiscovery::n0_testdns(); + Some(Box::new(discovery)) + } + }; + let endpoint = MagicEndpoint::builder() .secret_key(self.secret_key.clone()) .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) @@ -301,6 +386,14 @@ where .transport_config(transport_config) .concurrent_connections(MAX_CONNECTIONS) .relay_mode(self.relay_mode); + let endpoint = match discovery { + Some(discovery) => endpoint.discovery(discovery), + None => endpoint, + }; + let endpoint = match self.node_announce { + NodeAnnounceConfig::PublishHomeRelay => endpoint.pkarr_announce(), + NodeAnnounceConfig::None => endpoint, + }; let endpoint = match self.storage { StorageConfig::Persistent(ref root) => { let peers_data_path = IrohPaths::PeerData.with_root(root);