Skip to content

Commit

Permalink
Merge pull request eqlabs#2261 from eqlabs/chris/bootstrap
Browse files Browse the repository at this point in the history
feat: use libp2p::kad internal bootstrap trigger
  • Loading branch information
CHr15F0x authored Sep 24, 2024
2 parents d97e61b + a5694f8 commit b5400d3
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 321 deletions.
1 change: 1 addition & 0 deletions crates/p2p/src/behaviour/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl Builder {
kademlia_config.set_record_ttl(Some(Duration::from_secs(0)));
kademlia_config.set_provider_record_ttl(Some(PROVIDER_PUBLICATION_INTERVAL * 3));
kademlia_config.set_provider_publication_interval(Some(PROVIDER_PUBLICATION_INTERVAL));
kademlia_config.set_periodic_bootstrap_interval(Some(cfg.bootstrap_period));

let peer_id = identity.public().to_peer_id();
let secret = Secret::new(&identity);
Expand Down
4 changes: 2 additions & 2 deletions crates/p2p/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Builder {
let client = Client::new(command_sender, local_peer_id);

let (behaviour, relay_transport) = behaviour_builder
.unwrap_or_else(|| Behaviour::builder(keypair.clone(), chain_id, cfg.clone()))
.unwrap_or_else(|| Behaviour::builder(keypair.clone(), chain_id, cfg))
.build(client.clone());

let swarm = Swarm::new(
Expand All @@ -65,7 +65,7 @@ impl Builder {
(
client,
event_receiver,
MainLoop::new(swarm, command_receiver, event_sender, cfg),
MainLoop::new(swarm, command_receiver, event_sender),
)
}
}
23 changes: 3 additions & 20 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ pub struct Config {
pub max_inbound_relayed_peers: usize,
/// Maximum number of outbound peers.
pub max_outbound_peers: usize,
/// The minimum number of peers to maintain. If the number of outbound peers
/// drops below this number, the node will attempt to connect to more
/// peers.
pub low_watermark: usize,
/// How long to prevent evicted peers from reconnecting.
pub eviction_timeout: Duration,
pub ip_whitelist: Vec<IpNet>,
pub bootstrap: BootstrapConfig,
/// If the number of peers is below the low watermark, the node will attempt
/// periodic bootstrapping at this interval.
pub bootstrap_period: Duration,
pub inbound_connections_rate_limit: RateLimit,
/// Custom protocol name for Kademlia
pub kad_name: Option<String>,
Expand All @@ -79,21 +77,6 @@ pub struct RateLimit {
pub interval: Duration,
}

#[derive(Copy, Clone, Debug)]
pub struct BootstrapConfig {
pub start_offset: Duration,
pub period: Duration,
}

impl Default for BootstrapConfig {
fn default() -> Self {
Self {
start_offset: Duration::from_secs(5),
period: Duration::from_secs(2 * 60),
}
}
}

pub type HeadTx = tokio::sync::watch::Sender<Option<(BlockNumber, BlockHash)>>;
pub type HeadRx = tokio::sync::watch::Receiver<Option<(BlockNumber, BlockHash)>>;

Expand Down
123 changes: 48 additions & 75 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::num::NonZeroUsize;

use futures::channel::mpsc::Receiver as ResponseReceiver;
use futures::StreamExt;
use libp2p::gossipsub::{self, IdentTopic};
use libp2p::kad::{
self,
BootstrapError,
BootstrapOk,
ProgressStep,
QueryId,
QueryInfo,
QueryResult,
};
use libp2p::kad::{self, BootstrapError, BootstrapOk, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::SwarmEvent;
Expand All @@ -29,10 +22,9 @@ use tokio::time::Duration;

#[cfg(test)]
use crate::test_utils;
use crate::{behaviour, Command, Config, EmptyResultSender, Event, TestCommand, TestEvent};
use crate::{behaviour, Command, EmptyResultSender, Event, TestCommand, TestEvent};

pub struct MainLoop {
cfg: crate::Config,
swarm: libp2p::swarm::Swarm<behaviour::Behaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
Expand All @@ -46,8 +38,6 @@ pub struct MainLoop {
// 2. update the sync head info of our peers using a different mechanism
// request_sync_status: HashSetDelay<PeerId>,
pending_queries: PendingQueries,
/// Ongoing Kademlia bootstrap query.
ongoing_bootstrap: Option<QueryId>,
_pending_test_queries: TestQueries,
}

Expand Down Expand Up @@ -86,36 +76,24 @@ impl MainLoop {
swarm: libp2p::swarm::Swarm<behaviour::Behaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
cfg: Config,
) -> Self {
Self {
cfg,
swarm,
command_receiver,
event_sender,
pending_dials: Default::default(),
pending_sync_requests: Default::default(),
pending_queries: Default::default(),
ongoing_bootstrap: None,
_pending_test_queries: Default::default(),
}
}

pub async fn run(mut self) {
// Delay bootstrap so that by the time we attempt it we've connected to the
// bootstrap node
let bootstrap_start = tokio::time::Instant::now() + self.cfg.bootstrap.start_offset;
let mut bootstrap_interval =
tokio::time::interval_at(bootstrap_start, self.cfg.bootstrap.period);

let mut network_status_interval = tokio::time::interval(Duration::from_secs(5));
let mut peer_status_interval = tokio::time::interval(Duration::from_secs(30));
let me = *self.swarm.local_peer_id();

loop {
let bootstrap_interval_tick = bootstrap_interval.tick();
tokio::pin!(bootstrap_interval_tick);

let network_status_interval_tick = network_status_interval.tick();
tokio::pin!(network_status_interval_tick);

Expand Down Expand Up @@ -162,31 +140,6 @@ impl MainLoop {
dht,
);
}
_ = bootstrap_interval_tick => {
tracing::debug!("Checking low watermark");
if let Some(query_id) = self.ongoing_bootstrap {
match self.swarm.behaviour_mut().kademlia_mut().query_mut(&query_id) {
Some(mut query) if matches!(query.info(), QueryInfo::Bootstrap {
step: ProgressStep { last: false, .. }, .. }
) => {
tracing::debug!("Previous bootstrap still in progress, aborting it");
query.finish();
continue;
}
_ => self.ongoing_bootstrap = None,
}
}
if self.swarm.behaviour_mut().outbound_peers().count() < self.cfg.low_watermark {
if let Ok(query_id) = self.swarm.behaviour_mut().kademlia_mut().bootstrap() {
self.ongoing_bootstrap = Some(query_id);
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapStarted,
)
.await;
}
}
}
command = self.command_receiver.recv() => {
match command {
Some(c) => self.handle_command(c).await,
Expand Down Expand Up @@ -397,7 +350,6 @@ impl MainLoop {
Err(peer)
}
};
self.ongoing_bootstrap = None;
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapCompleted(result),
Expand Down Expand Up @@ -451,31 +403,52 @@ impl MainLoop {
}
_ => self.test_query_completed(id, result).await,
}
} else if let QueryResult::GetProviders(result) = result {
use libp2p::kad::GetProvidersOk;

let result = match result {
Ok(GetProvidersOk::FoundProviders { providers, .. }) => Ok(providers),
Ok(_) => Ok(Default::default()),
Err(_) => {
unreachable!(
"when a query times out libp2p makes it the last stage"
)
}
};

let sender = self
.pending_queries
.get_providers
.get(&id)
.expect("Query to be pending");

sender
.send(result)
.await
.expect("Receiver not to be dropped");
} else {
self.test_query_progressed(id, result).await;
match result {
QueryResult::GetProviders(result) => {
use libp2p::kad::GetProvidersOk;

let result = match result {
Ok(GetProvidersOk::FoundProviders { providers, .. }) => {
Ok(providers)
}
Ok(_) => Ok(Default::default()),
Err(_) => {
unreachable!(
"when a query times out libp2p makes it the last stage"
)
}
};

let sender = self
.pending_queries
.get_providers
.get(&id)
.expect("Query to be pending");

sender
.send(result)
.await
.expect("Receiver not to be dropped");
}
QueryResult::Bootstrap(_) => {
tracing::debug!("Checking low watermark");
// Starting from libp2p-v0.54.1 bootstrap queries are started
// automatically in the kad behaviour:
// 1. periodically,
// 2. after a peer is added to the routing table, if the number of
// peers in the DHT is lower than 20. See `bootstrap_on_low_peers` for more details:
// https://github.com/libp2p/rust-libp2p/blob/d7beb55f672dce54017fa4b30f67ecb8d66b9810/protocols/kad/src/behaviour.rs#L1401).
if step.count == NonZeroUsize::new(1).expect("1>0") {
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapStarted,
)
.await;
}
}
_ => self.test_query_progressed(id, result).await,
}
}
}
kad::Event::RoutingUpdated {
Expand Down
5 changes: 2 additions & 3 deletions crates/p2p/src/test_utils/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ impl Config {
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
max_outbound_peers: 10,
low_watermark: 10,
ip_whitelist: vec!["::/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
bootstrap: Default::default(),
ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
bootstrap_period: Duration::from_millis(500),
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
Expand Down
Loading

0 comments on commit b5400d3

Please sign in to comment.