From b764064c6fa082dad296edc24d9fc9fb1d4039a9 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Mon, 20 Jan 2025 13:18:24 +0800 Subject: [PATCH 1/8] feat: Add re-registration on expiry for p2p node --- docs/src/storage-provider-cli/server.md | 5 +++ storage-provider/server/src/config.rs | 3 ++ storage-provider/server/src/main.rs | 6 +++ storage-provider/server/src/p2p/bootstrap.rs | 13 ++++++- storage-provider/server/src/p2p/mod.rs | 17 +++++++-- storage-provider/server/src/p2p/register.rs | 39 +++++++++++++------- 6 files changed, 65 insertions(+), 18 deletions(-) diff --git a/docs/src/storage-provider-cli/server.md b/docs/src/storage-provider-cli/server.md index e3febfde..41247493 100644 --- a/docs/src/storage-provider-cli/server.md +++ b/docs/src/storage-provider-cli/server.md @@ -97,6 +97,10 @@ Rendezvous point address that the registration node connects to or the bootstrap Peer ID of the rendezvous point that the registration node connects to. Only needed if running a registration P2P node. +### `--registration-ttl` + +The TTL of the p2p registration in seconds. After the node registration expires, the server automatically re-registers itself. + ### `--config` Takes in a path to a configuration file, it supports both JSON and TOML (files _must_ have the right extension). @@ -117,6 +121,7 @@ The supported configuration parameters are: | `p2p_key` | NA | | `rendezvous_point_address` | NA | | `rendezvous_point` | `None` | +| `registration_ttl` | `None` | #### Bare bones configuration diff --git a/storage-provider/server/src/config.rs b/storage-provider/server/src/config.rs index 9b72ae5c..c1ae90c4 100644 --- a/storage-provider/server/src/config.rs +++ b/storage-provider/server/src/config.rs @@ -123,4 +123,7 @@ pub struct ConfigurationArgs { #[serde(default, deserialize_with = "string_to_peer_id_option")] #[arg(long)] pub(crate) rendezvous_point: Option, + + /// TTL of the p2p registration in seconds + pub(crate) registration_ttl: Option, } diff --git a/storage-provider/server/src/main.rs b/storage-provider/server/src/main.rs index 388d841a..a93e7d16 100644 --- a/storage-provider/server/src/main.rs +++ b/storage-provider/server/src/main.rs @@ -272,6 +272,9 @@ pub struct Server { /// PeerID of the bootstrap node used by the registration node. /// Optional because it is not used by the bootstrap node. rendezvous_point: Option, + + /// TTL of the p2p registration in seconds + registration_ttl: Option, } impl TryFrom for Server { @@ -353,6 +356,7 @@ impl TryFrom for Server { p2p_key: args.p2p_key, rendezvous_point_address: args.rendezvous_point_address, rendezvous_point: args.rendezvous_point, + registration_ttl: args.registration_ttl, }) } } @@ -481,6 +485,7 @@ impl Server { p2p_key: self.p2p_key, rendezvous_point_address: self.rendezvous_point_address, rendezvous_point: self.rendezvous_point, + registration_ttl: self.registration_ttl, }; Ok(SetupOutput { @@ -568,6 +573,7 @@ fn spawn_p2p_task( p2p_state.p2p_key, p2p_state.rendezvous_point_address, rendezvous_point, + p2p_state.registration_ttl, ); Ok(tokio::spawn(run_register_node(config, cancellation_token))) } diff --git a/storage-provider/server/src/p2p/bootstrap.rs b/storage-provider/server/src/p2p/bootstrap.rs index ebc8cc14..b02bc554 100644 --- a/storage-provider/server/src/p2p/bootstrap.rs +++ b/storage-provider/server/src/p2p/bootstrap.rs @@ -9,7 +9,7 @@ use libp2p::{ tcp, yamux, Multiaddr, Swarm, SwarmBuilder, }; -use super::P2PError; +use super::{P2PError, TTL_24_HOURS}; #[derive(NetworkBehaviour)] pub struct BootstrapBehaviour { @@ -39,7 +39,7 @@ impl BootstrapConfig { .with_behaviour(|key| BootstrapBehaviour { // Rendezvous server behaviour for serving new peers to connecting nodes. rendezvous: rendezvous::server::Behaviour::new( - rendezvous::server::Config::default(), + rendezvous::server::Config::default().with_max_ttl(TTL_24_HOURS), // Max TTL of 24 hours ), // The identify behaviour is used to share the external address and the public key with connecting clients. identify: identify::Behaviour::new(identify::Config::new( @@ -95,6 +95,15 @@ pub(crate) async fn bootstrap( ); } } + SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( + rendezvous::server::Event::RegistrationExpired(registration), + )) => { + tracing::info!( + "Registration for peer {} expired in namespace {}", + registration.record.peer_id(), + registration.namespace + ); + } _other => {} } } diff --git a/storage-provider/server/src/p2p/mod.rs b/storage-provider/server/src/p2p/mod.rs index 7b24c081..11ce26e4 100644 --- a/storage-provider/server/src/p2p/mod.rs +++ b/storage-provider/server/src/p2p/mod.rs @@ -15,8 +15,10 @@ pub(crate) use bootstrap::BootstrapConfig; pub(crate) use register::RegisterConfig; const P2P_NAMESPACE: &str = "polka-storage"; +const TTL_24_HOURS: u64 = 86400; #[derive(Default, Debug, Clone, Copy, ValueEnum, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum NodeType { #[default] Bootstrap, @@ -67,6 +69,9 @@ pub(crate) struct P2PState { /// PeerID of the bootstrap node used by the registration node. /// Optional because it is not used by the bootstrap node. pub(crate) rendezvous_point: Option, + + /// TTL of the p2p registration in seconds + pub(crate) registration_ttl: Option, } /// Deserializes a ED25519 private key into a Keypair. @@ -141,14 +146,20 @@ pub async fn run_register_node( ) -> Result<(), P2PError> { tracing::info!("Starting P2P register node"); let tracker = TaskTracker::new(); - let (swarm, rendezvous_point_address, rendezvous_point) = config.create_swarm()?; + // If TTL is not set, set it to 24 hours + let ttl = if config.registration_ttl.is_some() { + config.registration_ttl + } else { + Some(TTL_24_HOURS) + }; + let (mut swarm, rendezvous_point_address, rendezvous_point) = config.create_swarm()?; tokio::select! { res = register( - swarm, + &mut swarm, rendezvous_point, rendezvous_point_address, - None, + ttl, Namespace::from_static(P2P_NAMESPACE), ) => { if let Err(e) = res { diff --git a/storage-provider/server/src/p2p/register.rs b/storage-provider/server/src/p2p/register.rs index c5dc986d..8768f8be 100644 --- a/storage-provider/server/src/p2p/register.rs +++ b/storage-provider/server/src/p2p/register.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use libp2p::{ futures::StreamExt, identify, @@ -9,8 +7,10 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, }; +use tokio::time::Duration; use super::P2PError; +use crate::p2p::TTL_24_HOURS; #[derive(NetworkBehaviour)] pub struct RegisterBehaviour { @@ -22,6 +22,7 @@ pub struct RegisterConfig { keypair: Keypair, rendezvous_point_address: Multiaddr, rendezvous_point: PeerId, + pub(crate) registration_ttl: Option, } impl RegisterConfig { @@ -29,11 +30,13 @@ impl RegisterConfig { keypair: Keypair, rendezvous_point_address: Multiaddr, rendezvous_point: PeerId, + registration_ttl: Option, ) -> Self { Self { keypair, rendezvous_point_address, rendezvous_point, + registration_ttl, } } @@ -66,26 +69,39 @@ impl RegisterConfig { /// Register the peer with the rendezvous point. /// The ttl is how long the peer will remain registered in seconds. pub(crate) async fn register( - mut swarm: Swarm, + swarm: &mut Swarm, rendezvous_point: PeerId, rendezvous_point_address: Multiaddr, ttl: Option, namespace: Namespace, ) -> Result<(), P2PError> { tracing::info!("Attempting to register with rendezvous point {rendezvous_point} at {rendezvous_point_address}"); - swarm.dial(rendezvous_point_address.clone())?; + let mut register_tick = tokio::time::interval(Duration::from_secs(ttl.unwrap_or(TTL_24_HOURS))); + loop { + register_tick.tick().await; + swarm.dial(rendezvous_point_address.clone())?; + register_and_check_events(swarm, rendezvous_point, ttl, namespace.clone()).await?; + } +} + +async fn register_and_check_events( + swarm: &mut Swarm, + rendezvous_point: PeerId, + ttl: Option, + namespace: Namespace, +) -> Result<(), P2PError> { while let Some(event) = swarm.next().await { match event { - SwarmEvent::NewListenAddr { address, .. } => { - tracing::info!("Listening on {}", address); + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + tracing::info!(%peer_id, "Connection established"); } SwarmEvent::ConnectionClosed { peer_id, cause: Some(error), .. } if peer_id == rendezvous_point => { - tracing::info!("Lost connection to rendezvous point {}", error); + tracing::info!(%peer_id, %error, "Lost connection to rendezvous point"); } // once `/identify` did its job, we know our external address and can register SwarmEvent::Behaviour(RegisterBehaviourEvent::Identify( @@ -125,15 +141,12 @@ pub(crate) async fn register( error, }, )) => { - tracing::error!( - "Failed to register: rendezvous_node={}, namespace={}, error_code={:?}", - rendezvous_node, - namespace, - error + tracing::error!(%rendezvous_node, %namespace, + "Failed to register error = {error:?}" ); return Err(P2PError::RegistrationFailed(rendezvous_node)); } - _other => {} + other => tracing::debug!("Unimplemented event encountered: {other:?}"), } } From b95b68293f7d7682cf53548417b979b05c47ac93 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Tue, 21 Jan 2025 18:05:24 +0800 Subject: [PATCH 2/8] fix: Registration ttl default --- storage-provider/server/src/config.rs | 11 +++++++++- storage-provider/server/src/main.rs | 2 +- storage-provider/server/src/p2p/bootstrap.rs | 5 +++-- storage-provider/server/src/p2p/mod.rs | 16 ++++++--------- storage-provider/server/src/p2p/register.rs | 21 ++++++++++---------- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/storage-provider/server/src/config.rs b/storage-provider/server/src/config.rs index c1ae90c4..7019a0ad 100644 --- a/storage-provider/server/src/config.rs +++ b/storage-provider/server/src/config.rs @@ -15,6 +15,8 @@ use crate::{ DEFAULT_NODE_ADDRESS, }; +pub const DEFAULT_REGISTRATION_TTL: u64 = 86400; + /// Default address to bind the RPC server to. const fn default_rpc_listen_address() -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000) @@ -31,6 +33,11 @@ const fn default_parallel_prove_commits() -> NonZero { unsafe { NonZero::new_unchecked(2) } } +/// Default registration TTL, how long the node is registered. +const fn default_registration_ttl() -> u64 { + DEFAULT_REGISTRATION_TTL +} + fn default_node_address() -> Url { Url::parse(DEFAULT_NODE_ADDRESS).expect("DEFAULT_NODE_ADDRESS must be a valid Url") } @@ -125,5 +132,7 @@ pub struct ConfigurationArgs { pub(crate) rendezvous_point: Option, /// TTL of the p2p registration in seconds - pub(crate) registration_ttl: Option, + #[serde(default = "default_registration_ttl")] + #[arg(long, default_value_t = DEFAULT_REGISTRATION_TTL)] + pub(crate) registration_ttl: u64, } diff --git a/storage-provider/server/src/main.rs b/storage-provider/server/src/main.rs index a93e7d16..8ac00ac1 100644 --- a/storage-provider/server/src/main.rs +++ b/storage-provider/server/src/main.rs @@ -274,7 +274,7 @@ pub struct Server { rendezvous_point: Option, /// TTL of the p2p registration in seconds - registration_ttl: Option, + registration_ttl: u64, } impl TryFrom for Server { diff --git a/storage-provider/server/src/p2p/bootstrap.rs b/storage-provider/server/src/p2p/bootstrap.rs index b02bc554..55ef4e24 100644 --- a/storage-provider/server/src/p2p/bootstrap.rs +++ b/storage-provider/server/src/p2p/bootstrap.rs @@ -9,7 +9,8 @@ use libp2p::{ tcp, yamux, Multiaddr, Swarm, SwarmBuilder, }; -use super::{P2PError, TTL_24_HOURS}; +use super::P2PError; +use crate::config::DEFAULT_REGISTRATION_TTL; #[derive(NetworkBehaviour)] pub struct BootstrapBehaviour { @@ -39,7 +40,7 @@ impl BootstrapConfig { .with_behaviour(|key| BootstrapBehaviour { // Rendezvous server behaviour for serving new peers to connecting nodes. rendezvous: rendezvous::server::Behaviour::new( - rendezvous::server::Config::default().with_max_ttl(TTL_24_HOURS), // Max TTL of 24 hours + rendezvous::server::Config::default().with_max_ttl(DEFAULT_REGISTRATION_TTL), // Max TTL of 24 hours ), // The identify behaviour is used to share the external address and the public key with connecting clients. identify: identify::Behaviour::new(identify::Config::new( diff --git a/storage-provider/server/src/p2p/mod.rs b/storage-provider/server/src/p2p/mod.rs index 11ce26e4..102f23ed 100644 --- a/storage-provider/server/src/p2p/mod.rs +++ b/storage-provider/server/src/p2p/mod.rs @@ -15,7 +15,6 @@ pub(crate) use bootstrap::BootstrapConfig; pub(crate) use register::RegisterConfig; const P2P_NAMESPACE: &str = "polka-storage"; -const TTL_24_HOURS: u64 = 86400; #[derive(Default, Debug, Clone, Copy, ValueEnum, Deserialize)] #[serde(rename_all = "lowercase")] @@ -71,7 +70,7 @@ pub(crate) struct P2PState { pub(crate) rendezvous_point: Option, /// TTL of the p2p registration in seconds - pub(crate) registration_ttl: Option, + pub(crate) registration_ttl: u64, } /// Deserializes a ED25519 private key into a Keypair. @@ -146,20 +145,17 @@ pub async fn run_register_node( ) -> Result<(), P2PError> { tracing::info!("Starting P2P register node"); let tracker = TaskTracker::new(); - // If TTL is not set, set it to 24 hours - let ttl = if config.registration_ttl.is_some() { - config.registration_ttl - } else { - Some(TTL_24_HOURS) - }; - let (mut swarm, rendezvous_point_address, rendezvous_point) = config.create_swarm()?; + let rendezvous_point = config.rendezvous_point; + let rendezvous_point_address = config.rendezvous_point_address.clone(); + let registration_ttl = config.registration_ttl; + let mut swarm = config.create_swarm()?; tokio::select! { res = register( &mut swarm, rendezvous_point, rendezvous_point_address, - ttl, + registration_ttl, Namespace::from_static(P2P_NAMESPACE), ) => { if let Err(e) = res { diff --git a/storage-provider/server/src/p2p/register.rs b/storage-provider/server/src/p2p/register.rs index 8768f8be..b45218d2 100644 --- a/storage-provider/server/src/p2p/register.rs +++ b/storage-provider/server/src/p2p/register.rs @@ -10,7 +10,6 @@ use libp2p::{ use tokio::time::Duration; use super::P2PError; -use crate::p2p::TTL_24_HOURS; #[derive(NetworkBehaviour)] pub struct RegisterBehaviour { @@ -20,9 +19,9 @@ pub struct RegisterBehaviour { pub struct RegisterConfig { keypair: Keypair, - rendezvous_point_address: Multiaddr, - rendezvous_point: PeerId, - pub(crate) registration_ttl: Option, + pub(crate) rendezvous_point_address: Multiaddr, + pub(crate) rendezvous_point: PeerId, + pub(crate) registration_ttl: u64, } impl RegisterConfig { @@ -30,7 +29,7 @@ impl RegisterConfig { keypair: Keypair, rendezvous_point_address: Multiaddr, rendezvous_point: PeerId, - registration_ttl: Option, + registration_ttl: u64, ) -> Self { Self { keypair, @@ -40,7 +39,7 @@ impl RegisterConfig { } } - pub fn create_swarm(self) -> Result<(Swarm, Multiaddr, PeerId), P2PError> { + pub fn create_swarm(self) -> Result, P2PError> { let swarm = SwarmBuilder::with_existing_identity(self.keypair) .with_tokio() .with_tcp( @@ -62,7 +61,7 @@ impl RegisterConfig { .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(10))) .build(); - Ok((swarm, self.rendezvous_point_address, self.rendezvous_point)) + Ok(swarm) } } @@ -72,11 +71,11 @@ pub(crate) async fn register( swarm: &mut Swarm, rendezvous_point: PeerId, rendezvous_point_address: Multiaddr, - ttl: Option, + ttl: u64, namespace: Namespace, ) -> Result<(), P2PError> { tracing::info!("Attempting to register with rendezvous point {rendezvous_point} at {rendezvous_point_address}"); - let mut register_tick = tokio::time::interval(Duration::from_secs(ttl.unwrap_or(TTL_24_HOURS))); + let mut register_tick = tokio::time::interval(Duration::from_secs(ttl)); loop { register_tick.tick().await; @@ -88,7 +87,7 @@ pub(crate) async fn register( async fn register_and_check_events( swarm: &mut Swarm, rendezvous_point: PeerId, - ttl: Option, + ttl: u64, namespace: Namespace, ) -> Result<(), P2PError> { while let Some(event) = swarm.next().await { @@ -113,7 +112,7 @@ async fn register_and_check_events( if let Err(error) = swarm.behaviour_mut().rendezvous.register( namespace.clone(), rendezvous_point, - ttl, + Some(ttl), ) { tracing::error!("Failed to register: {error}"); return Err(P2PError::RegistrationFailed(rendezvous_point)); From 7e5a451b4b0eece6aed7aed5c407f6e8430d18b1 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Tue, 21 Jan 2025 18:05:24 +0800 Subject: [PATCH 3/8] docs: Update registration ttl docs --- docs/src/storage-provider-cli/server.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/storage-provider-cli/server.md b/docs/src/storage-provider-cli/server.md index 41247493..311a3862 100644 --- a/docs/src/storage-provider-cli/server.md +++ b/docs/src/storage-provider-cli/server.md @@ -121,7 +121,7 @@ The supported configuration parameters are: | `p2p_key` | NA | | `rendezvous_point_address` | NA | | `rendezvous_point` | `None` | -| `registration_ttl` | `None` | +| `registration_ttl` | `24 hours` | #### Bare bones configuration From 112e999d7842e3a85a0d5e2ce2bda3410e4f22f6 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Wed, 22 Jan 2025 14:50:26 +0800 Subject: [PATCH 4/8] fix: Add select so swarm keeps polling --- storage-provider/server/src/p2p/bootstrap.rs | 9 +- storage-provider/server/src/p2p/register.rs | 131 ++++++++++--------- 2 files changed, 71 insertions(+), 69 deletions(-) diff --git a/storage-provider/server/src/p2p/bootstrap.rs b/storage-provider/server/src/p2p/bootstrap.rs index 55ef4e24..7f6ca1b2 100644 --- a/storage-provider/server/src/p2p/bootstrap.rs +++ b/storage-provider/server/src/p2p/bootstrap.rs @@ -66,11 +66,8 @@ pub(crate) async fn bootstrap( swarm.listen_on(addr)?; while let Some(event) = swarm.next().await { match event { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - tracing::info!("Connected to {}", peer_id); - } - SwarmEvent::ConnectionClosed { peer_id, .. } => { - tracing::info!("Disconnected from {}", peer_id); + SwarmEvent::NewListenAddr { address, .. } => { + tracing::info!("Listening on {}", address); } SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( rendezvous::server::Event::PeerRegistered { peer, registration }, @@ -105,7 +102,7 @@ pub(crate) async fn bootstrap( registration.namespace ); } - _other => {} + other => tracing::debug!("Encountered event: {other:?}"), } } Ok(()) diff --git a/storage-provider/server/src/p2p/register.rs b/storage-provider/server/src/p2p/register.rs index b45218d2..8867a18a 100644 --- a/storage-provider/server/src/p2p/register.rs +++ b/storage-provider/server/src/p2p/register.rs @@ -77,77 +77,82 @@ pub(crate) async fn register( tracing::info!("Attempting to register with rendezvous point {rendezvous_point} at {rendezvous_point_address}"); let mut register_tick = tokio::time::interval(Duration::from_secs(ttl)); + // Dial into bootstrap address + swarm.dial(rendezvous_point_address.clone())?; + // Get and add external address + let external_addr = get_external_address(swarm).await; + swarm.add_external_address(external_addr); + loop { - register_tick.tick().await; - swarm.dial(rendezvous_point_address.clone())?; - register_and_check_events(swarm, rendezvous_point, ttl, namespace.clone()).await?; + tokio::select! { + // Poll tick every TTL to re-register. + // First tick completes immediately. + _ = register_tick.tick() => { + tracing::info!("Registering with p2p node"); + // Dial to establish a connection. + // Dial is needed because the connection is not kept alive using rendezvous protocol. + swarm.dial(rendezvous_point_address.clone())?; + // Register with bootstrap node. + if let Err(error) = + swarm + .behaviour_mut() + .rendezvous + .register(namespace.clone(), rendezvous_point, Some(ttl)) + { + tracing::error!("Failed to register: {error}"); + return Err(P2PError::RegistrationFailed(rendezvous_point)); + } else { + tracing::info!("Registration with {rendezvous_point} successful"); + } + } + // Check incoming event. + event = swarm.select_next_some() => check_swarm_event(event) + } } } -async fn register_and_check_events( - swarm: &mut Swarm, - rendezvous_point: PeerId, - ttl: u64, - namespace: Namespace, -) -> Result<(), P2PError> { - while let Some(event) = swarm.next().await { - match event { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - tracing::info!(%peer_id, "Connection established"); - } - SwarmEvent::ConnectionClosed { - peer_id, - cause: Some(error), - .. - } if peer_id == rendezvous_point => { - tracing::info!(%peer_id, %error, "Lost connection to rendezvous point"); - } - // once `/identify` did its job, we know our external address and can register +fn check_swarm_event(event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + }, + )) => { + tracing::info!( + "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", + namespace, + rendezvous_node, + ttl + ); + } + SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( + rendezvous::client::Event::RegisterFailed { + rendezvous_node, + namespace, + error, + }, + )) => { + tracing::error!(%rendezvous_node, %namespace, + "Failed to register error = {error:?}" + ); + } + other => tracing::debug!("Encountered event: {other:?}"), + } +} + +async fn get_external_address(swarm: &mut Swarm) -> Multiaddr { + loop { + match swarm.select_next_some().await { + // once `/identify` did its job, we know our external address and can return it SwarmEvent::Behaviour(RegisterBehaviourEvent::Identify( identify::Event::Received { info, .. }, )) => { - // Register our external address. - tracing::info!("Registering external address {}", info.observed_addr); - swarm.add_external_address(info.observed_addr); - if let Err(error) = swarm.behaviour_mut().rendezvous.register( - namespace.clone(), - rendezvous_point, - Some(ttl), - ) { - tracing::error!("Failed to register: {error}"); - return Err(P2PError::RegistrationFailed(rendezvous_point)); - } - } - SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( - rendezvous::client::Event::Registered { - namespace, - ttl, - rendezvous_node, - }, - )) => { - tracing::info!( - "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", - namespace, - rendezvous_node, - ttl - ); - return Ok(()); + tracing::info!("Identity information exchanged, external address received"); + return info.observed_addr; } - SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( - rendezvous::client::Event::RegisterFailed { - rendezvous_node, - namespace, - error, - }, - )) => { - tracing::error!(%rendezvous_node, %namespace, - "Failed to register error = {error:?}" - ); - return Err(P2PError::RegistrationFailed(rendezvous_node)); - } - other => tracing::debug!("Unimplemented event encountered: {other:?}"), + other => tracing::debug!("Encountered event: {other:?}"), } } - - Ok(()) } From d8be7ee20b85c24ed511ea4109372e5327748ef2 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Thu, 23 Jan 2025 13:01:21 +0800 Subject: [PATCH 5/8] fix: Return error on failed registration --- storage-provider/server/src/p2p/register.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/storage-provider/server/src/p2p/register.rs b/storage-provider/server/src/p2p/register.rs index 8867a18a..6b1b81ea 100644 --- a/storage-provider/server/src/p2p/register.rs +++ b/storage-provider/server/src/p2p/register.rs @@ -102,16 +102,16 @@ pub(crate) async fn register( tracing::error!("Failed to register: {error}"); return Err(P2PError::RegistrationFailed(rendezvous_point)); } else { - tracing::info!("Registration with {rendezvous_point} successful"); + tracing::info!("Registration requested with {rendezvous_point}"); } } // Check incoming event. - event = swarm.select_next_some() => check_swarm_event(event) + event = swarm.select_next_some() => on_swarm_event(event)? } } } -fn check_swarm_event(event: SwarmEvent) { +fn on_swarm_event(event: SwarmEvent) -> Result<(), P2PError> { match event { SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( rendezvous::client::Event::Registered { @@ -137,9 +137,11 @@ fn check_swarm_event(event: SwarmEvent) { tracing::error!(%rendezvous_node, %namespace, "Failed to register error = {error:?}" ); + return Err(P2PError::RegistrationFailed(rendezvous_node)); } other => tracing::debug!("Encountered event: {other:?}"), } + Ok(()) } async fn get_external_address(swarm: &mut Swarm) -> Multiaddr { From c40a8b3e28ac883b1ea1b7411158db1e4e5db9db Mon Sep 17 00:00:00 2001 From: aidan46 Date: Thu, 23 Jan 2025 13:01:42 +0800 Subject: [PATCH 6/8] fix: Cancellation tracker in p2p node --- storage-provider/server/src/p2p/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/storage-provider/server/src/p2p/mod.rs b/storage-provider/server/src/p2p/mod.rs index 102f23ed..0eeede34 100644 --- a/storage-provider/server/src/p2p/mod.rs +++ b/storage-provider/server/src/p2p/mod.rs @@ -128,12 +128,11 @@ pub async fn run_bootstrap_node( }, _ = token.cancelled() => { tracing::info!("P2P node has been stopped by the cancellation token..."); + tracker.close(); + tracker.wait().await; }, } - tracker.close(); - tracker.wait().await; - Ok(()) } @@ -165,11 +164,10 @@ pub async fn run_register_node( }, _ = token.cancelled() => { tracing::info!("P2P node has been stopped by the cancellation token..."); + tracker.close(); + tracker.wait().await; }, } - tracker.close(); - tracker.wait().await; - Ok(()) } From 4cb0dbd1350f338849eea2c384dd38ac58772496 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Fri, 24 Jan 2025 12:46:04 +0800 Subject: [PATCH 7/8] fix: Remove TaskTracker for p2p task --- storage-provider/server/src/p2p/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/storage-provider/server/src/p2p/mod.rs b/storage-provider/server/src/p2p/mod.rs index 0eeede34..116e656a 100644 --- a/storage-provider/server/src/p2p/mod.rs +++ b/storage-provider/server/src/p2p/mod.rs @@ -116,7 +116,6 @@ pub async fn run_bootstrap_node( token: CancellationToken, ) -> Result<(), P2PError> { tracing::info!("Starting P2P bootstrap node"); - let tracker = TaskTracker::new(); let (swarm, addr) = config.create_swarm()?; tokio::select! { @@ -128,8 +127,6 @@ pub async fn run_bootstrap_node( }, _ = token.cancelled() => { tracing::info!("P2P node has been stopped by the cancellation token..."); - tracker.close(); - tracker.wait().await; }, } From e936b4a6475ccb5cc505123ccda695139da8ab52 Mon Sep 17 00:00:00 2001 From: aidan46 Date: Fri, 24 Jan 2025 12:56:26 +0800 Subject: [PATCH 8/8] docs: Add docs to register functions --- storage-provider/server/src/p2p/register.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage-provider/server/src/p2p/register.rs b/storage-provider/server/src/p2p/register.rs index 6b1b81ea..f76bba16 100644 --- a/storage-provider/server/src/p2p/register.rs +++ b/storage-provider/server/src/p2p/register.rs @@ -111,6 +111,7 @@ pub(crate) async fn register( } } +/// Checks swarm events related to registration and returns an error if the registration failed. fn on_swarm_event(event: SwarmEvent) -> Result<(), P2PError> { match event { SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( @@ -144,6 +145,8 @@ fn on_swarm_event(event: SwarmEvent) -> Result<(), P2PEr Ok(()) } +/// Checks the swarm for the `Identify` event to get its external address +/// so we can add it to the swarm with `add_external_address`. async fn get_external_address(swarm: &mut Swarm) -> Multiaddr { loop { match swarm.select_next_some().await {