diff --git a/docs/src/storage-provider-cli/server.md b/docs/src/storage-provider-cli/server.md index e3febfde7..311a38624 100644 --- a/docs/src/storage-provider-cli/server.md +++ b/docs/src/storage-provider-cli/server.md @@ -97,6 +97,10 @@ Rendezvous point address that the registration node connects to or the bootstrap Peer ID of the rendezvous point that the registration node connects to. Only needed if running a registration P2P node. +### `--registration-ttl` + +The TTL of the p2p registration in seconds. After the node registration expires, the server automatically re-registers itself. + ### `--config` Takes in a path to a configuration file, it supports both JSON and TOML (files _must_ have the right extension). @@ -117,6 +121,7 @@ The supported configuration parameters are: | `p2p_key` | NA | | `rendezvous_point_address` | NA | | `rendezvous_point` | `None` | +| `registration_ttl` | `24 hours` | #### Bare bones configuration diff --git a/storage-provider/server/src/config.rs b/storage-provider/server/src/config.rs index 9b72ae5c0..7019a0ad4 100644 --- a/storage-provider/server/src/config.rs +++ b/storage-provider/server/src/config.rs @@ -15,6 +15,8 @@ use crate::{ DEFAULT_NODE_ADDRESS, }; +pub const DEFAULT_REGISTRATION_TTL: u64 = 86400; + /// Default address to bind the RPC server to. const fn default_rpc_listen_address() -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000) @@ -31,6 +33,11 @@ const fn default_parallel_prove_commits() -> NonZero { unsafe { NonZero::new_unchecked(2) } } +/// Default registration TTL, how long the node is registered. +const fn default_registration_ttl() -> u64 { + DEFAULT_REGISTRATION_TTL +} + fn default_node_address() -> Url { Url::parse(DEFAULT_NODE_ADDRESS).expect("DEFAULT_NODE_ADDRESS must be a valid Url") } @@ -123,4 +130,9 @@ pub struct ConfigurationArgs { #[serde(default, deserialize_with = "string_to_peer_id_option")] #[arg(long)] pub(crate) rendezvous_point: Option, + + /// TTL of the p2p registration in seconds + #[serde(default = "default_registration_ttl")] + #[arg(long, default_value_t = DEFAULT_REGISTRATION_TTL)] + pub(crate) registration_ttl: u64, } diff --git a/storage-provider/server/src/main.rs b/storage-provider/server/src/main.rs index 388d841af..8ac00ac15 100644 --- a/storage-provider/server/src/main.rs +++ b/storage-provider/server/src/main.rs @@ -272,6 +272,9 @@ pub struct Server { /// PeerID of the bootstrap node used by the registration node. /// Optional because it is not used by the bootstrap node. rendezvous_point: Option, + + /// TTL of the p2p registration in seconds + registration_ttl: u64, } impl TryFrom for Server { @@ -353,6 +356,7 @@ impl TryFrom for Server { p2p_key: args.p2p_key, rendezvous_point_address: args.rendezvous_point_address, rendezvous_point: args.rendezvous_point, + registration_ttl: args.registration_ttl, }) } } @@ -481,6 +485,7 @@ impl Server { p2p_key: self.p2p_key, rendezvous_point_address: self.rendezvous_point_address, rendezvous_point: self.rendezvous_point, + registration_ttl: self.registration_ttl, }; Ok(SetupOutput { @@ -568,6 +573,7 @@ fn spawn_p2p_task( p2p_state.p2p_key, p2p_state.rendezvous_point_address, rendezvous_point, + p2p_state.registration_ttl, ); Ok(tokio::spawn(run_register_node(config, cancellation_token))) } diff --git a/storage-provider/server/src/p2p/bootstrap.rs b/storage-provider/server/src/p2p/bootstrap.rs index ebc8cc146..7f6ca1b24 100644 --- a/storage-provider/server/src/p2p/bootstrap.rs +++ b/storage-provider/server/src/p2p/bootstrap.rs @@ -10,6 +10,7 @@ use libp2p::{ }; use super::P2PError; +use crate::config::DEFAULT_REGISTRATION_TTL; #[derive(NetworkBehaviour)] pub struct BootstrapBehaviour { @@ -39,7 +40,7 @@ impl BootstrapConfig { .with_behaviour(|key| BootstrapBehaviour { // Rendezvous server behaviour for serving new peers to connecting nodes. rendezvous: rendezvous::server::Behaviour::new( - rendezvous::server::Config::default(), + rendezvous::server::Config::default().with_max_ttl(DEFAULT_REGISTRATION_TTL), // Max TTL of 24 hours ), // The identify behaviour is used to share the external address and the public key with connecting clients. identify: identify::Behaviour::new(identify::Config::new( @@ -65,11 +66,8 @@ pub(crate) async fn bootstrap( swarm.listen_on(addr)?; while let Some(event) = swarm.next().await { match event { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - tracing::info!("Connected to {}", peer_id); - } - SwarmEvent::ConnectionClosed { peer_id, .. } => { - tracing::info!("Disconnected from {}", peer_id); + SwarmEvent::NewListenAddr { address, .. } => { + tracing::info!("Listening on {}", address); } SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( rendezvous::server::Event::PeerRegistered { peer, registration }, @@ -95,7 +93,16 @@ pub(crate) async fn bootstrap( ); } } - _other => {} + SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( + rendezvous::server::Event::RegistrationExpired(registration), + )) => { + tracing::info!( + "Registration for peer {} expired in namespace {}", + registration.record.peer_id(), + registration.namespace + ); + } + other => tracing::debug!("Encountered event: {other:?}"), } } Ok(()) diff --git a/storage-provider/server/src/p2p/mod.rs b/storage-provider/server/src/p2p/mod.rs index 7b24c0811..83814adac 100644 --- a/storage-provider/server/src/p2p/mod.rs +++ b/storage-provider/server/src/p2p/mod.rs @@ -6,7 +6,7 @@ use ed25519_dalek::{pkcs8::DecodePrivateKey, SigningKey}; use libp2p::{identity::Keypair, rendezvous::Namespace, Multiaddr, PeerId}; use register::register; use serde::{de, Deserialize}; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tokio_util::sync::CancellationToken; mod bootstrap; mod register; @@ -17,6 +17,7 @@ pub(crate) use register::RegisterConfig; const P2P_NAMESPACE: &str = "polka-storage"; #[derive(Default, Debug, Clone, Copy, ValueEnum, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum NodeType { #[default] Bootstrap, @@ -67,6 +68,9 @@ pub(crate) struct P2PState { /// PeerID of the bootstrap node used by the registration node. /// Optional because it is not used by the bootstrap node. pub(crate) rendezvous_point: Option, + + /// TTL of the p2p registration in seconds + pub(crate) registration_ttl: u64, } /// Deserializes a ED25519 private key into a Keypair. @@ -112,7 +116,6 @@ pub async fn run_bootstrap_node( token: CancellationToken, ) -> Result<(), P2PError> { tracing::info!("Starting P2P bootstrap node"); - let tracker = TaskTracker::new(); let (swarm, addr) = config.create_swarm()?; tokio::select! { @@ -127,9 +130,6 @@ pub async fn run_bootstrap_node( }, } - tracker.close(); - tracker.wait().await; - Ok(()) } @@ -140,15 +140,17 @@ pub async fn run_register_node( token: CancellationToken, ) -> Result<(), P2PError> { tracing::info!("Starting P2P register node"); - let tracker = TaskTracker::new(); - let (swarm, rendezvous_point_address, rendezvous_point) = config.create_swarm()?; + let rendezvous_point = config.rendezvous_point; + let rendezvous_point_address = config.rendezvous_point_address.clone(); + let registration_ttl = config.registration_ttl; + let mut swarm = config.create_swarm()?; tokio::select! { res = register( - swarm, + &mut swarm, rendezvous_point, rendezvous_point_address, - None, + registration_ttl, Namespace::from_static(P2P_NAMESPACE), ) => { if let Err(e) = res { @@ -161,8 +163,5 @@ pub async fn run_register_node( }, } - tracker.close(); - tracker.wait().await; - Ok(()) } diff --git a/storage-provider/server/src/p2p/register.rs b/storage-provider/server/src/p2p/register.rs index c5dc986d3..4c809e2cd 100644 --- a/storage-provider/server/src/p2p/register.rs +++ b/storage-provider/server/src/p2p/register.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use libp2p::{ futures::StreamExt, identify, @@ -9,6 +7,7 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, }; +use tokio::time::Duration; use super::P2PError; @@ -20,8 +19,9 @@ pub struct RegisterBehaviour { pub struct RegisterConfig { keypair: Keypair, - rendezvous_point_address: Multiaddr, - rendezvous_point: PeerId, + pub(crate) rendezvous_point_address: Multiaddr, + pub(crate) rendezvous_point: PeerId, + pub(crate) registration_ttl: u64, } impl RegisterConfig { @@ -29,15 +29,17 @@ impl RegisterConfig { keypair: Keypair, rendezvous_point_address: Multiaddr, rendezvous_point: PeerId, + registration_ttl: u64, ) -> Self { Self { keypair, rendezvous_point_address, rendezvous_point, + registration_ttl, } } - pub fn create_swarm(self) -> Result<(Swarm, Multiaddr, PeerId), P2PError> { + pub fn create_swarm(self) -> Result, P2PError> { let swarm = SwarmBuilder::with_existing_identity(self.keypair) .with_tokio() .with_tcp( @@ -59,83 +61,112 @@ impl RegisterConfig { .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(10))) .build(); - Ok((swarm, self.rendezvous_point_address, self.rendezvous_point)) + Ok(swarm) } } /// Register the peer with the rendezvous point. /// The ttl is how long the peer will remain registered in seconds. +#[tracing::instrument( + skip(swarm), + fields( + rendezvous_point = %rendezvous_point, + rendezvous_point_address = %rendezvous_point_address, + ttl = %ttl, + namespace = %namespace + ) +)] pub(crate) async fn register( - mut swarm: Swarm, + swarm: &mut Swarm, rendezvous_point: PeerId, rendezvous_point_address: Multiaddr, - ttl: Option, + ttl: u64, namespace: Namespace, ) -> Result<(), P2PError> { tracing::info!("Attempting to register with rendezvous point {rendezvous_point} at {rendezvous_point_address}"); + let mut register_tick = tokio::time::interval(Duration::from_secs(ttl)); + + // Dial into bootstrap address swarm.dial(rendezvous_point_address.clone())?; + // Get and add external address + let external_addr = get_external_address(swarm).await; + swarm.add_external_address(external_addr); - while let Some(event) = swarm.next().await { - match event { - SwarmEvent::NewListenAddr { address, .. } => { - tracing::info!("Listening on {}", address); - } - SwarmEvent::ConnectionClosed { - peer_id, - cause: Some(error), - .. - } if peer_id == rendezvous_point => { - tracing::info!("Lost connection to rendezvous point {}", error); - } - // once `/identify` did its job, we know our external address and can register - SwarmEvent::Behaviour(RegisterBehaviourEvent::Identify( - identify::Event::Received { info, .. }, - )) => { - // Register our external address. - tracing::info!("Registering external address {}", info.observed_addr); - swarm.add_external_address(info.observed_addr); - if let Err(error) = swarm.behaviour_mut().rendezvous.register( - namespace.clone(), - rendezvous_point, - ttl, - ) { + loop { + tokio::select! { + // Poll tick every TTL to re-register. + // First tick completes immediately. + _ = register_tick.tick() => { + tracing::info!("Registering with p2p node"); + // Dial to establish a connection. + // Dial is needed because the connection is not kept alive using rendezvous protocol. + swarm.dial(rendezvous_point_address.clone())?; + // Register with bootstrap node. + if let Err(error) = + swarm + .behaviour_mut() + .rendezvous + .register(namespace.clone(), rendezvous_point, Some(ttl)) + { tracing::error!("Failed to register: {error}"); return Err(P2PError::RegistrationFailed(rendezvous_point)); + } else { + tracing::info!("Registration requested with {rendezvous_point}"); } } - SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( - rendezvous::client::Event::Registered { - namespace, - ttl, - rendezvous_node, - }, - )) => { - tracing::info!( - "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", - namespace, - rendezvous_node, - ttl - ); - return Ok(()); - } - SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( - rendezvous::client::Event::RegisterFailed { - rendezvous_node, - namespace, - error, - }, - )) => { - tracing::error!( - "Failed to register: rendezvous_node={}, namespace={}, error_code={:?}", - rendezvous_node, - namespace, - error - ); - return Err(P2PError::RegistrationFailed(rendezvous_node)); - } - _other => {} + // Check incoming event. + event = swarm.select_next_some() => on_swarm_event(event)? } } +} +/// Checks swarm events related to registration and returns an error if the registration failed. +fn on_swarm_event(event: SwarmEvent) -> Result<(), P2PError> { + match event { + SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + }, + )) => { + tracing::info!( + "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", + namespace, + rendezvous_node, + ttl + ); + } + SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( + rendezvous::client::Event::RegisterFailed { + rendezvous_node, + namespace, + error, + }, + )) => { + tracing::error!(%rendezvous_node, %namespace, + "Failed to register error = {error:?}" + ); + return Err(P2PError::RegistrationFailed(rendezvous_node)); + } + other => tracing::debug!("Encountered event: {other:?}"), + } Ok(()) } + +/// Checks the swarm for the `Identify` event to get its external address +/// so we can add it to the swarm with `add_external_address`. +async fn get_external_address(swarm: &mut Swarm) -> Multiaddr { + loop { + match swarm.select_next_some().await { + // once `/identify` did its job, we know our external address and can return it + SwarmEvent::Behaviour(RegisterBehaviourEvent::Identify( + identify::Event::Received { info, .. }, + )) => { + tracing::info!("Identity information exchanged, external address received"); + return info.observed_addr; + } + other => tracing::debug!("Encountered event: {other:?}"), + } + } +}