-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add re-registration on expiry for p2p node #685
base: develop
Are you sure you want to change the base?
Changes from 4 commits
b764064
b95b682
7e5a451
112e999
d8be7ee
c40a8b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ use libp2p::{ | |
}; | ||
|
||
use super::P2PError; | ||
use crate::config::DEFAULT_REGISTRATION_TTL; | ||
|
||
#[derive(NetworkBehaviour)] | ||
pub struct BootstrapBehaviour { | ||
|
@@ -39,7 +40,7 @@ impl BootstrapConfig { | |
.with_behaviour(|key| BootstrapBehaviour { | ||
// Rendezvous server behaviour for serving new peers to connecting nodes. | ||
rendezvous: rendezvous::server::Behaviour::new( | ||
rendezvous::server::Config::default(), | ||
rendezvous::server::Config::default().with_max_ttl(DEFAULT_REGISTRATION_TTL), // Max TTL of 24 hours | ||
), | ||
// The identify behaviour is used to share the external address and the public key with connecting clients. | ||
identify: identify::Behaviour::new(identify::Config::new( | ||
|
@@ -65,11 +66,8 @@ pub(crate) async fn bootstrap( | |
swarm.listen_on(addr)?; | ||
while let Some(event) = swarm.next().await { | ||
match event { | ||
SwarmEvent::ConnectionEstablished { peer_id, .. } => { | ||
tracing::info!("Connected to {}", peer_id); | ||
} | ||
SwarmEvent::ConnectionClosed { peer_id, .. } => { | ||
tracing::info!("Disconnected from {}", peer_id); | ||
SwarmEvent::NewListenAddr { address, .. } => { | ||
tracing::info!("Listening on {}", address); | ||
} | ||
SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( | ||
rendezvous::server::Event::PeerRegistered { peer, registration }, | ||
|
@@ -95,7 +93,16 @@ pub(crate) async fn bootstrap( | |
); | ||
} | ||
} | ||
_other => {} | ||
SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( | ||
rendezvous::server::Event::RegistrationExpired(registration), | ||
)) => { | ||
tracing::info!( | ||
"Registration for peer {} expired in namespace {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do we do when the registration expires? What does it mean for the node to be registered? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the registration expires the registration node re-registers automatically. |
||
registration.record.peer_id(), | ||
registration.namespace | ||
); | ||
} | ||
other => tracing::debug!("Encountered event: {other:?}"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this catch the previous |
||
} | ||
} | ||
Ok(()) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
use std::time::Duration; | ||
|
||
use libp2p::{ | ||
futures::StreamExt, | ||
identify, | ||
|
@@ -9,6 +7,7 @@ use libp2p::{ | |
swarm::{NetworkBehaviour, SwarmEvent}, | ||
tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, | ||
}; | ||
use tokio::time::Duration; | ||
|
||
use super::P2PError; | ||
|
||
|
@@ -20,24 +19,27 @@ pub struct RegisterBehaviour { | |
|
||
pub struct RegisterConfig { | ||
keypair: Keypair, | ||
rendezvous_point_address: Multiaddr, | ||
rendezvous_point: PeerId, | ||
pub(crate) rendezvous_point_address: Multiaddr, | ||
pub(crate) rendezvous_point: PeerId, | ||
pub(crate) registration_ttl: u64, | ||
} | ||
|
||
impl RegisterConfig { | ||
pub fn new( | ||
keypair: Keypair, | ||
rendezvous_point_address: Multiaddr, | ||
rendezvous_point: PeerId, | ||
registration_ttl: u64, | ||
) -> Self { | ||
Self { | ||
keypair, | ||
rendezvous_point_address, | ||
rendezvous_point, | ||
registration_ttl, | ||
} | ||
} | ||
|
||
pub fn create_swarm(self) -> Result<(Swarm<RegisterBehaviour>, Multiaddr, PeerId), P2PError> { | ||
pub fn create_swarm(self) -> Result<Swarm<RegisterBehaviour>, P2PError> { | ||
let swarm = SwarmBuilder::with_existing_identity(self.keypair) | ||
.with_tokio() | ||
.with_tcp( | ||
|
@@ -59,83 +61,98 @@ impl RegisterConfig { | |
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(10))) | ||
.build(); | ||
|
||
Ok((swarm, self.rendezvous_point_address, self.rendezvous_point)) | ||
Ok(swarm) | ||
} | ||
} | ||
|
||
/// Register the peer with the rendezvous point. | ||
/// The ttl is how long the peer will remain registered in seconds. | ||
pub(crate) async fn register( | ||
mut swarm: Swarm<RegisterBehaviour>, | ||
swarm: &mut Swarm<RegisterBehaviour>, | ||
rendezvous_point: PeerId, | ||
rendezvous_point_address: Multiaddr, | ||
ttl: Option<u64>, | ||
ttl: u64, | ||
namespace: Namespace, | ||
) -> Result<(), P2PError> { | ||
tracing::info!("Attempting to register with rendezvous point {rendezvous_point} at {rendezvous_point_address}"); | ||
let mut register_tick = tokio::time::interval(Duration::from_secs(ttl)); | ||
|
||
// Dial into bootstrap address | ||
swarm.dial(rendezvous_point_address.clone())?; | ||
// Get and add external address | ||
let external_addr = get_external_address(swarm).await; | ||
swarm.add_external_address(external_addr); | ||
|
||
while let Some(event) = swarm.next().await { | ||
match event { | ||
SwarmEvent::NewListenAddr { address, .. } => { | ||
tracing::info!("Listening on {}", address); | ||
} | ||
SwarmEvent::ConnectionClosed { | ||
peer_id, | ||
cause: Some(error), | ||
.. | ||
} if peer_id == rendezvous_point => { | ||
tracing::info!("Lost connection to rendezvous point {}", error); | ||
} | ||
// once `/identify` did its job, we know our external address and can register | ||
SwarmEvent::Behaviour(RegisterBehaviourEvent::Identify( | ||
identify::Event::Received { info, .. }, | ||
)) => { | ||
// Register our external address. | ||
tracing::info!("Registering external address {}", info.observed_addr); | ||
swarm.add_external_address(info.observed_addr); | ||
if let Err(error) = swarm.behaviour_mut().rendezvous.register( | ||
namespace.clone(), | ||
rendezvous_point, | ||
ttl, | ||
) { | ||
loop { | ||
tokio::select! { | ||
jmg-duarte marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Poll tick every TTL to re-register. | ||
// First tick completes immediately. | ||
_ = register_tick.tick() => { | ||
tracing::info!("Registering with p2p node"); | ||
// Dial to establish a connection. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think @cernicc knows a bit more than me on this, but I think these logs should be contained inside an |
||
// Dial is needed because the connection is not kept alive using rendezvous protocol. | ||
swarm.dial(rendezvous_point_address.clone())?; | ||
// Register with bootstrap node. | ||
if let Err(error) = | ||
swarm | ||
.behaviour_mut() | ||
.rendezvous | ||
.register(namespace.clone(), rendezvous_point, Some(ttl)) | ||
{ | ||
tracing::error!("Failed to register: {error}"); | ||
return Err(P2PError::RegistrationFailed(rendezvous_point)); | ||
} else { | ||
tracing::info!("Registration with {rendezvous_point} successful"); | ||
} | ||
aidan46 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( | ||
rendezvous::client::Event::Registered { | ||
namespace, | ||
ttl, | ||
rendezvous_node, | ||
}, | ||
)) => { | ||
tracing::info!( | ||
"Registered for namespace '{}' at rendezvous point {} for the next {} seconds", | ||
namespace, | ||
rendezvous_node, | ||
ttl | ||
); | ||
return Ok(()); | ||
} | ||
SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( | ||
rendezvous::client::Event::RegisterFailed { | ||
rendezvous_node, | ||
namespace, | ||
error, | ||
}, | ||
// Check incoming event. | ||
event = swarm.select_next_some() => check_swarm_event(event) | ||
} | ||
} | ||
} | ||
|
||
fn check_swarm_event(event: SwarmEvent<RegisterBehaviourEvent>) { | ||
match event { | ||
aidan46 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( | ||
rendezvous::client::Event::Registered { | ||
namespace, | ||
ttl, | ||
rendezvous_node, | ||
}, | ||
)) => { | ||
tracing::info!( | ||
"Registered for namespace '{}' at rendezvous point {} for the next {} seconds", | ||
namespace, | ||
rendezvous_node, | ||
ttl | ||
); | ||
} | ||
SwarmEvent::Behaviour(RegisterBehaviourEvent::Rendezvous( | ||
rendezvous::client::Event::RegisterFailed { | ||
rendezvous_node, | ||
namespace, | ||
error, | ||
}, | ||
)) => { | ||
tracing::error!(%rendezvous_node, %namespace, | ||
"Failed to register error = {error:?}" | ||
aidan46 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
} | ||
other => tracing::debug!("Encountered event: {other:?}"), | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A docstring would be appreciated |
||
async fn get_external_address(swarm: &mut Swarm<RegisterBehaviour>) -> Multiaddr { | ||
loop { | ||
match swarm.select_next_some().await { | ||
// once `/identify` did its job, we know our external address and can return it | ||
SwarmEvent::Behaviour(RegisterBehaviourEvent::Identify( | ||
identify::Event::Received { info, .. }, | ||
)) => { | ||
tracing::error!( | ||
"Failed to register: rendezvous_node={}, namespace={}, error_code={:?}", | ||
rendezvous_node, | ||
namespace, | ||
error | ||
); | ||
return Err(P2PError::RegistrationFailed(rendezvous_node)); | ||
tracing::info!("Identity information exchanged, external address received"); | ||
return info.observed_addr; | ||
} | ||
_other => {} | ||
other => tracing::debug!("Encountered event: {other:?}"), | ||
} | ||
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better if we use Duration here. That way the reader can immediately see how the duration is parsed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the tick it makes sense but we re-use this value for the register call in the swarm which expects a
Option<u64>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok as is, but for reference, you can always just
.as_secs