Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make Endpoint::node_addr watchable and add trait Watcher & combinators #3045

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions iroh/bench/src/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::{Context, Result};
use bytes::Bytes;
use iroh::{
endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig},
watcher::Watcher as _,
Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl,
};
use tracing::{trace, warn};
Expand Down
4 changes: 2 additions & 2 deletions iroh/examples/connect-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use std::net::SocketAddr;

use clap::Parser;
use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
use iroh::{watcher::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
use tracing::info;

// An example ALPN that we are using to communicate over the `Endpoint`
Expand Down Expand Up @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
.bind()
.await?;

let node_addr = endpoint.node_addr().await?;
let node_addr = endpoint.node_addr().initialized().await?;
let me = node_addr.node_id;
println!("node id: {me}");
println!("node listening addresses:");
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::net::SocketAddr;

use anyhow::Context;
use clap::Parser;
use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
use iroh::{watcher::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
use tracing::info;

// An example ALPN that we are using to communicate over the `Endpoint`
Expand Down
3 changes: 2 additions & 1 deletion iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures_lite::future::Boxed as BoxedFuture;
use iroh::{
endpoint::Connecting,
protocol::{ProtocolHandler, Router},
watcher::Watcher as _,
Endpoint, NodeAddr,
};

Expand All @@ -23,7 +24,7 @@ const ALPN: &[u8] = b"iroh-example/echo/0";
#[tokio::main]
async fn main() -> Result<()> {
let router = accept_side().await?;
let node_addr = router.endpoint().node_addr().await?;
let node_addr = router.endpoint().node_addr().initialized().await?;

connect_side(node_addr).await?;

Expand Down
4 changes: 2 additions & 2 deletions iroh/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This example uses the default relay servers to attempt to holepunch, and will use that relay server to relay packets if the two devices cannot establish a direct UDP connection.
//! run this example from the project root:
//! $ cargo run --example listen-unreliable
use iroh::{Endpoint, RelayMode, SecretKey};
use iroh::{watcher::Watcher as _, Endpoint, RelayMode, SecretKey};
use tracing::{info, warn};

// An example ALPN that we are using to communicate over the `Endpoint`
Expand Down Expand Up @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");

let node_addr = endpoint.node_addr().await?;
let node_addr = endpoint.node_addr().initialized().await?;
let local_addrs = node_addr
.direct_addresses
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions iroh/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! $ cargo run --example listen
use std::time::Duration;

use iroh::{endpoint::ConnectionError, Endpoint, RelayMode, SecretKey};
use iroh::{endpoint::ConnectionError, watcher::Watcher as _, Endpoint, RelayMode, SecretKey};
use tracing::{debug, info, warn};

// An example ALPN that we are using to communicate over the `Endpoint`
Expand Down Expand Up @@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");

let node_addr = endpoint.node_addr().await?;
let node_addr = endpoint.node_addr().initialized().await?;
let local_addrs = node_addr
.direct_addresses
.into_iter()
Expand Down
3 changes: 2 additions & 1 deletion iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use bytes::Bytes;
use clap::{Parser, Subcommand};
use indicatif::HumanBytes;
use iroh::{
endpoint::ConnectionError, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey,
endpoint::ConnectionError, watcher::Watcher as _, Endpoint, NodeAddr, RelayMap, RelayMode,
RelayUrl, SecretKey,
};
use iroh_base::ticket::NodeTicket;
use tracing::info;
Expand Down
12 changes: 6 additions & 6 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ mod tests {
use tokio_util::task::AbortOnDropHandle;

use super::*;
use crate::RelayMode;
use crate::{watcher::Watcher as _, RelayMode};

type InfoStore = HashMap<NodeId, (Option<RelayUrl>, BTreeSet<SocketAddr>, u64)>;

Expand Down Expand Up @@ -580,7 +580,7 @@ mod tests {
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for our address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr().initialized().await?;
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
}
Expand All @@ -606,7 +606,7 @@ mod tests {
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr().initialized().await?;
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
}
Expand Down Expand Up @@ -636,7 +636,7 @@ mod tests {
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr().initialized().await?;
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
}
Expand All @@ -659,7 +659,7 @@ mod tests {
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr().initialized().await?;
let res = ep2.connect(ep1_addr, TEST_ALPN).await;
assert!(res.is_err());
Ok(())
Expand All @@ -682,7 +682,7 @@ mod tests {
new_endpoint(secret, disco).await
};
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr().initialized().await?;
let ep1_wrong_addr = NodeAddr {
node_id: ep1.node_id(),
relay_url: None,
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use tracing::{debug, error, info_span, trace, warn, Instrument};

use crate::{
discovery::{Discovery, DiscoveryItem},
watchable::Watchable,
watcher::{Watchable, Watcher as _},
Endpoint,
};

Expand Down
4 changes: 2 additions & 2 deletions iroh/src/discovery/pkarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::{
discovery::{Discovery, DiscoveryItem},
dns::node_info::NodeInfo,
endpoint::force_staging_infra,
watchable::{Disconnected, Watchable, Watcher},
watcher::{self, Disconnected, Watchable, Watcher as _},
Endpoint,
};

Expand Down Expand Up @@ -221,7 +221,7 @@ struct PublisherService {
secret_key: SecretKey,
#[debug("PkarrClient")]
pkarr_client: PkarrRelayClient,
watcher: Watcher<Option<NodeInfo>>,
watcher: watcher::Direct<Option<NodeInfo>>,
ttl: u32,
republish_interval: Duration,
}
Expand Down
Loading
Loading