From ff0ec0feb7e96e4c221c96200638a0d55ff26dab Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 18 Oct 2022 21:05:55 +0300 Subject: [PATCH 1/3] Remove multiple DHTs support from `Discovery` --- client/network/src/behaviour.rs | 21 +- client/network/src/discovery.rs | 675 ++++++++++---------------- client/network/src/service.rs | 32 +- client/network/src/service/metrics.rs | 35 +- 4 files changed, 293 insertions(+), 470 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index fa130fb4baacd..2e646956e9d8c 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -34,7 +34,6 @@ use libp2p::{ use sc_consensus::import_queue::{IncomingBlock, RuntimeOrigin}; use sc_network_common::{ - config::ProtocolId, protocol::{ event::DhtEvent, role::{ObservedRole, Roles}, @@ -79,7 +78,7 @@ pub enum BehaviourOut { JustificationImport(RuntimeOrigin, B::Hash, NumberFor, Justifications), /// Started a random iterative Kademlia discovery query. - RandomKademliaStarted(Vec), + RandomKademliaStarted, /// We have received a request from a peer and answered it. /// @@ -267,25 +266,20 @@ where self.discovery.add_known_address(peer_id, addr) } - /// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance. + /// Returns the number of nodes in each Kademlia kbucket. /// - /// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm - /// of their lower bound. - pub fn num_entries_per_kbucket( - &mut self, - ) -> impl ExactSizeIterator)> { + /// Identifies kbuckets by the base 2 logarithm of their lower bound. + pub fn num_entries_per_kbucket(&mut self) -> Option> { self.discovery.num_entries_per_kbucket() } /// Returns the number of records in the Kademlia record stores. - pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator { + pub fn num_kademlia_records(&mut self) -> Option { self.discovery.num_kademlia_records() } /// Returns the total size in bytes of all the records in the Kademlia record stores. - pub fn kademlia_records_total_size( - &mut self, - ) -> impl ExactSizeIterator { + pub fn kademlia_records_total_size(&mut self) -> Option { self.discovery.kademlia_records_total_size() } @@ -438,8 +432,7 @@ impl From for BehaviourOut { BehaviourOut::Dht(DhtEvent::ValuePut(key), duration), DiscoveryOut::ValuePutFailed(key, duration) => BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration), - DiscoveryOut::RandomKademliaStarted(protocols) => - BehaviourOut::RandomKademliaStarted(protocols), + DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted, } } } diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index da4dec70b29ab..869b58e59a1b2 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -66,11 +66,12 @@ use libp2p::{ mdns::{Mdns, MdnsConfig, MdnsEvent}, multiaddr::Protocol, swarm::{ - handler::multi::IntoMultiHandler, ConnectionHandler, DialError, IntoConnectionHandler, - NetworkBehaviour, NetworkBehaviourAction, PollParameters, + behaviour::toggle::{Toggle, ToggleIntoConnectionHandler}, + ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour, + NetworkBehaviourAction, PollParameters, }, }; -use log::{debug, error, info, trace, warn}; +use log::{debug, info, trace, warn}; use sc_network_common::{config::ProtocolId, utils::LruHashSet}; use sp_core::hexdisplay::HexDisplay; use std::{ @@ -100,7 +101,7 @@ pub struct DiscoveryConfig { discovery_only_if_under_num: u64, enable_mdns: bool, kademlia_disjoint_query_paths: bool, - protocol_ids: HashSet, + kademlia_protocol_id: Option, } impl DiscoveryConfig { @@ -115,7 +116,7 @@ impl DiscoveryConfig { discovery_only_if_under_num: std::u64::MAX, enable_mdns: false, kademlia_disjoint_query_paths: false, - protocol_ids: HashSet::new(), + kademlia_protocol_id: None, } } @@ -160,13 +161,8 @@ impl DiscoveryConfig { } /// Add discovery via Kademlia for the given protocol. - pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self { - if self.protocol_ids.contains(&id) { - warn!(target: "sub-libp2p", "Discovery already registered for protocol {:?}", id); - return self - } - - self.protocol_ids.insert(id); + pub fn with_kademlia(&mut self, id: ProtocolId) -> &mut Self { + self.kademlia_protocol_id = Some(id); self } @@ -189,37 +185,34 @@ impl DiscoveryConfig { discovery_only_if_under_num, enable_mdns, kademlia_disjoint_query_paths, - protocol_ids, + kademlia_protocol_id, } = self; - let kademlias = protocol_ids - .into_iter() - .map(|protocol_id| { - let proto_name = protocol_name_from_protocol_id(&protocol_id); + let kademlia = kademlia_protocol_id.map(|protocol_id| { + let proto_name = protocol_name_from_protocol_id(&protocol_id); - let mut config = KademliaConfig::default(); - config.set_protocol_names(std::iter::once(proto_name.into()).collect()); - // By default Kademlia attempts to insert all peers into its routing table once a - // dialing attempt succeeds. In order to control which peer is added, disable the - // auto-insertion and instead add peers manually. - config.set_kbucket_inserts(KademliaBucketInserts::Manual); - config.disjoint_query_paths(kademlia_disjoint_query_paths); + let mut config = KademliaConfig::default(); + config.set_protocol_names(std::iter::once(proto_name.into()).collect()); + // By default Kademlia attempts to insert all peers into its routing table once a + // dialing attempt succeeds. In order to control which peer is added, disable the + // auto-insertion and instead add peers manually. + config.set_kbucket_inserts(KademliaBucketInserts::Manual); + config.disjoint_query_paths(kademlia_disjoint_query_paths); - let store = MemoryStore::new(local_peer_id); - let mut kad = Kademlia::with_config(local_peer_id, store, config); + let store = MemoryStore::new(local_peer_id); + let mut kad = Kademlia::with_config(local_peer_id, store, config); - for (peer_id, addr) in &permanent_addresses { - kad.add_address(peer_id, addr.clone()); - } + for (peer_id, addr) in &permanent_addresses { + kad.add_address(peer_id, addr.clone()); + } - (protocol_id, kad) - }) - .collect(); + kad + }); DiscoveryBehaviour { permanent_addresses, ephemeral_addresses: HashMap::new(), - kademlias, + kademlia: Toggle::from(kademlia), next_kad_random_query: if dht_random_walk { Some(Delay::new(Duration::new(0, 0))) } else { @@ -260,7 +253,7 @@ pub struct DiscoveryBehaviour { /// removed. ephemeral_addresses: HashMap>, /// Kademlia requests and answers. - kademlias: HashMap>, + kademlia: Toggle>, /// Discovers nodes on the local network. mdns: Option, /// Stream that fires when we need to perform the next random Kademlia query. `None` if @@ -289,7 +282,7 @@ impl DiscoveryBehaviour { /// Returns the list of nodes that we know exist in the network. pub fn known_peers(&mut self) -> HashSet { let mut peers = HashSet::new(); - for k in self.kademlias.values_mut() { + if let Some(k) = self.kademlia.as_mut() { for b in k.kbuckets() { for e in b.iter() { if !peers.contains(e.node.key.preimage()) { @@ -309,7 +302,7 @@ impl DiscoveryBehaviour { pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default(); if !addrs_list.iter().any(|a| *a == addr) { - for k in self.kademlias.values_mut() { + if let Some(k) = self.kademlia.as_mut() { k.add_address(&peer_id, addr.clone()); } @@ -318,8 +311,8 @@ impl DiscoveryBehaviour { } } - /// Add a self-reported address of a remote peer to the k-buckets of the supported - /// DHTs (`supported_protocols`). + /// Add a self-reported address of a remote peer to the k-buckets of the DHT + /// if it has compatible `supported_protocols`. /// /// **Note**: It is important that you call this method. The discovery mechanism will not /// automatically add connecting peers to the Kademlia k-buckets. @@ -329,13 +322,15 @@ impl DiscoveryBehaviour { supported_protocols: &[impl AsRef<[u8]>], addr: Multiaddr, ) { - if !self.allow_non_globals_in_dht && !self.can_add_to_dht(&addr) { - trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id); - return - } + if let Some(kademlia) = self.kademlia.as_mut() { + if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) { + trace!( + target: "sub-libp2p", + "Ignoring self-reported non-global address {} from {}.", addr, peer_id + ); + return + } - let mut added = false; - for kademlia in self.kademlias.values_mut() { if let Some(matching_protocol) = supported_protocols .iter() .find(|p| kademlia.protocol_names().iter().any(|k| k.as_ref() == p.as_ref())) @@ -346,24 +341,21 @@ impl DiscoveryBehaviour { addr, peer_id, String::from_utf8_lossy(matching_protocol.as_ref()), ); kademlia.add_address(peer_id, addr.clone()); - added = true; + } else { + trace!( + target: "sub-libp2p", + "Ignoring self-reported address {} from {} as remote node is not part of the \ + Kademlia DHT supported by the local node.", addr, peer_id, + ); } } - - if !added { - trace!( - target: "sub-libp2p", - "Ignoring self-reported address {} from {} as remote node is not part of any \ - Kademlia DHTs supported by the local node.", addr, peer_id, - ); - } } /// Start fetching a record from the DHT. /// /// A corresponding `ValueFound` or `ValueNotFound` event will later be generated. pub fn get_value(&mut self, key: record::Key) { - for k in self.kademlias.values_mut() { + if let Some(k) = self.kademlia.as_mut() { k.get_record(key.clone(), Quorum::One); } } @@ -373,7 +365,7 @@ impl DiscoveryBehaviour { /// /// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated. pub fn put_value(&mut self, key: record::Key, value: Vec) { - for k in self.kademlias.values_mut() { + if let Some(k) = self.kademlia.as_mut() { if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) { warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e); self.pending_events @@ -386,37 +378,27 @@ impl DiscoveryBehaviour { /// /// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm /// of their lower bound. - pub fn num_entries_per_kbucket( - &mut self, - ) -> impl ExactSizeIterator)> { - self.kademlias.iter_mut().map(|(id, kad)| { - let buckets = kad - .kbuckets() + pub fn num_entries_per_kbucket(&mut self) -> Option> { + self.kademlia.as_mut().map(|kad| { + kad.kbuckets() .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count())) - .collect(); - (id, buckets) + .collect() }) } /// Returns the number of records in the Kademlia record stores. - pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator { + pub fn num_kademlia_records(&mut self) -> Option { // Note that this code is ok only because we use a `MemoryStore`. - self.kademlias.iter_mut().map(|(id, kad)| { - let num = kad.store_mut().records().count(); - (id, num) - }) + self.kademlia.as_mut().map(|kad| kad.store_mut().records().count()) } /// Returns the total size in bytes of all the records in the Kademlia record stores. - pub fn kademlia_records_total_size( - &mut self, - ) -> impl ExactSizeIterator { + pub fn kademlia_records_total_size(&mut self) -> Option { // Note that this code is ok only because we use a `MemoryStore`. If the records were // for example stored on disk, this would load every single one of them every single time. - self.kademlias.iter_mut().map(|(id, kad)| { - let size = kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()); - (id, size) - }) + self.kademlia + .as_mut() + .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len())) } /// Can the given `Multiaddr` be put into the DHT? @@ -425,7 +407,7 @@ impl DiscoveryBehaviour { // NB: Currently all DNS names are allowed and no check for TLD suffixes is done // because the set of valid domains is highly dynamic and would require frequent // updates, for example by utilising publicsuffix.org or IANA. - pub fn can_add_to_dht(&self, addr: &Multiaddr) -> bool { + pub fn can_add_to_dht(addr: &Multiaddr) -> bool { let ip = match addr.iter().next() { Some(Protocol::Ip4(ip)) => IpNetwork::from(ip), Some(Protocol::Ip6(ip)) => IpNetwork::from(ip), @@ -435,29 +417,6 @@ impl DiscoveryBehaviour { }; ip.is_global() } - - fn new_handler_with_replacement( - &mut self, - pid: ProtocolId, - handler: KademliaHandlerProto, - ) -> ::ConnectionHandler { - let mut handlers: HashMap<_, _> = self - .kademlias - .iter_mut() - .map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k))) - .collect(); - - if let Some(h) = handlers.get_mut(&pid) { - *h = handler - } - - IntoMultiHandler::try_from_iter(handlers).expect( - "There can be at most one handler per `ProtocolId` and protocol names contain the \ - `ProtocolId` so no two protocol names in `self.kademlias` can be equal which is the \ - only error `try_from_iter` can return, therefore this call is guaranteed to succeed; \ - qed", - ) - } } /// Event generated by the `DiscoveryBehaviour`. @@ -498,28 +457,18 @@ pub enum DiscoveryOut { /// Returning the corresponding key as well as the request duration. ValuePutFailed(record::Key, Duration), - /// Started a random Kademlia query for each DHT identified by the given `ProtocolId`s. + /// Started a random Kademlia query. /// /// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`. - RandomKademliaStarted(Vec), + RandomKademliaStarted, } impl NetworkBehaviour for DiscoveryBehaviour { - type ConnectionHandler = IntoMultiHandler>; + type ConnectionHandler = ToggleIntoConnectionHandler>; type OutEvent = DiscoveryOut; fn new_handler(&mut self) -> Self::ConnectionHandler { - let iter = self - .kademlias - .iter_mut() - .map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k))); - - IntoMultiHandler::try_from_iter(iter).expect( - "There can be at most one handler per `ProtocolId` and protocol names contain the \ - `ProtocolId` so no two protocol names in `self.kademlias` can be equal which is the \ - only error `try_from_iter` can return, therefore this call is guaranteed to succeed; \ - qed", - ) + self.kademlia.new_handler() } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -534,10 +483,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { } { - let mut list_to_filter = Vec::new(); - for k in self.kademlias.values_mut() { - list_to_filter.extend(k.addresses_of_peer(peer_id)) - } + let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id); if let Some(ref mut mdns) = self.mdns { list_to_filter.extend(mdns.addresses_of_peer(peer_id)); @@ -566,9 +512,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { old: &ConnectedPoint, new: &ConnectedPoint, ) { - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_address_change(k, peer_id, connection_id, old, new); - } + self.kademlia.inject_address_change(peer_id, connection_id, old, new) } fn inject_connection_established( @@ -580,16 +524,13 @@ impl NetworkBehaviour for DiscoveryBehaviour { other_established: usize, ) { self.num_connections += 1; - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_connection_established( - k, - peer_id, - conn, - endpoint, - failed_addresses, - other_established, - ) - } + self.kademlia.inject_connection_established( + peer_id, + conn, + endpoint, + failed_addresses, + other_established, + ) } fn inject_connection_closed( @@ -601,23 +542,19 @@ impl NetworkBehaviour for DiscoveryBehaviour { remaining_established: usize, ) { self.num_connections -= 1; - for (pid, event) in handler.into_iter() { - if let Some(kad) = self.kademlias.get_mut(&pid) { - kad.inject_connection_closed(peer_id, conn, endpoint, event, remaining_established) - } else { - error!( - target: "sub-libp2p", - "inject_connection_closed: no kademlia instance registered for protocol {:?}", - pid, - ) - } - } + self.kademlia.inject_connection_closed( + peer_id, + conn, + endpoint, + handler, + remaining_established, + ) } fn inject_dial_failure( &mut self, peer_id: Option, - _: Self::ConnectionHandler, + handler: Self::ConnectionHandler, error: &DialError, ) { if let Some(peer_id) = peer_id { @@ -630,32 +567,22 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } - for k in self.kademlias.values_mut() { - let handler = k.new_handler(); - NetworkBehaviour::inject_dial_failure(k, peer_id, handler, error); - } + self.kademlia.inject_dial_failure(peer_id, handler, error) } fn inject_event( &mut self, peer_id: PeerId, connection: ConnectionId, - (pid, event): <::Handler as ConnectionHandler>::OutEvent, + event: <::Handler as ConnectionHandler>::OutEvent, ) { - if let Some(kad) = self.kademlias.get_mut(&pid) { - return kad.inject_event(peer_id, connection, event) - } - error!( - target: "sub-libp2p", - "inject_node_event: no kademlia instance registered for protocol {:?}", - pid, - ) + self.kademlia.inject_event(peer_id, connection, event) } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id.into())); - if self.can_add_to_dht(addr) { + if Self::can_add_to_dht(addr) { // NOTE: we might re-discover the same address multiple times // in which case we just want to refrain from logging. if self.known_external_addresses.insert(new_addr.clone()) { @@ -667,36 +594,26 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_new_external_addr(k, addr) - } + self.kademlia.inject_new_external_addr(addr) } fn inject_expired_external_addr(&mut self, addr: &Multiaddr) { // We intentionally don't remove the element from `known_external_addresses` in order // to not print the log line again. - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_expired_external_addr(k, addr) - } + self.kademlia.inject_expired_external_addr(addr) } fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_expired_listen_addr(k, id, addr) - } + self.kademlia.inject_expired_listen_addr(id, addr) } fn inject_new_listener(&mut self, id: ListenerId) { - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_new_listener(k, id) - } + self.kademlia.inject_new_listener(id) } fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_new_listen_addr(k, id, addr) - } + self.kademlia.inject_new_listen_addr(id, addr) } fn inject_listen_failure(&mut self, _: &Multiaddr, _: &Multiaddr, _: Self::ConnectionHandler) { @@ -704,15 +621,11 @@ impl NetworkBehaviour for DiscoveryBehaviour { } fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_listener_error(k, id, err) - } + self.kademlia.inject_listener_error(id, err) } fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { - for k in self.kademlias.values_mut() { - NetworkBehaviour::inject_listener_closed(k, id, reason) - } + self.kademlia.inject_listener_closed(id, reason) } fn poll( @@ -726,198 +639,189 @@ impl NetworkBehaviour for DiscoveryBehaviour { } // Poll the stream that fires when we need to start a random Kademlia query. - if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() { - while next_kad_random_query.poll_unpin(cx).is_ready() { - let actually_started = if self.num_connections < self.discovery_only_if_under_num { - let random_peer_id = PeerId::random(); - debug!( - target: "sub-libp2p", - "Libp2p <= Starting random Kademlia request for {:?}", - random_peer_id, - ); - for k in self.kademlias.values_mut() { - k.get_closest_peers(random_peer_id); + if let Some(kademlia) = self.kademlia.as_mut() { + if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() { + while next_kad_random_query.poll_unpin(cx).is_ready() { + let actually_started = + if self.num_connections < self.discovery_only_if_under_num { + let random_peer_id = PeerId::random(); + debug!( + target: "sub-libp2p", + "Libp2p <= Starting random Kademlia request for {:?}", + random_peer_id, + ); + kademlia.get_closest_peers(random_peer_id); + true + } else { + debug!( + target: "sub-libp2p", + "Kademlia paused due to high number of connections ({})", + self.num_connections + ); + false + }; + + // Schedule the next random query with exponentially increasing delay, + // capped at 60 seconds. + *next_kad_random_query = Delay::new(self.duration_to_next_kad); + self.duration_to_next_kad = + cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); + + if actually_started { + let ev = DiscoveryOut::RandomKademliaStarted; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) } - true - } else { - debug!( - target: "sub-libp2p", - "Kademlia paused due to high number of connections ({})", - self.num_connections - ); - false - }; - - // Schedule the next random query with exponentially increasing delay, - // capped at 60 seconds. - *next_kad_random_query = Delay::new(self.duration_to_next_kad); - self.duration_to_next_kad = - cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); - - if actually_started { - let ev = DiscoveryOut::RandomKademliaStarted( - self.kademlias.keys().cloned().collect(), - ); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) } } } - // Poll Kademlias. - for (pid, kademlia) in &mut self.kademlias { - while let Poll::Ready(ev) = kademlia.poll(cx, params) { - match ev { - NetworkBehaviourAction::GenerateEvent(ev) => match ev { - KademliaEvent::RoutingUpdated { peer, .. } => { - let ev = DiscoveryOut::Discovered(peer); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) - }, - KademliaEvent::UnroutablePeer { peer, .. } => { - let ev = DiscoveryOut::UnroutablePeer(peer); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) - }, - KademliaEvent::RoutablePeer { peer, .. } => { - let ev = DiscoveryOut::Discovered(peer); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) - }, - KademliaEvent::PendingRoutablePeer { .. } | - KademliaEvent::InboundRequest { .. } => { - // We are not interested in this event at the moment. + while let Poll::Ready(ev) = self.kademlia.poll(cx, params) { + match ev { + NetworkBehaviourAction::GenerateEvent(ev) => match ev { + KademliaEvent::RoutingUpdated { peer, .. } => { + let ev = DiscoveryOut::Discovered(peer); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + }, + KademliaEvent::UnroutablePeer { peer, .. } => { + let ev = DiscoveryOut::UnroutablePeer(peer); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + }, + KademliaEvent::RoutablePeer { peer, .. } => { + let ev = DiscoveryOut::Discovered(peer); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + }, + KademliaEvent::PendingRoutablePeer { .. } | + KademliaEvent::InboundRequest { .. } => { + // We are not interested in this event at the moment. + }, + KademliaEvent::OutboundQueryCompleted { + result: QueryResult::GetClosestPeers(res), + .. + } => match res { + Err(GetClosestPeersError::Timeout { key, peers }) => { + debug!( + target: "sub-libp2p", + "Libp2p => Query for {:?} timed out with {} results", + HexDisplay::from(&key), peers.len(), + ); }, - KademliaEvent::OutboundQueryCompleted { - result: QueryResult::GetClosestPeers(res), - .. - } => match res { - Err(GetClosestPeersError::Timeout { key, peers }) => { + Ok(ok) => { + trace!( + target: "sub-libp2p", + "Libp2p => Query for {:?} yielded {:?} results", + HexDisplay::from(&ok.key), ok.peers.len(), + ); + if ok.peers.is_empty() && self.num_connections != 0 { debug!( target: "sub-libp2p", - "Libp2p => Query for {:?} timed out with {} results", - HexDisplay::from(&key), peers.len(), + "Libp2p => Random Kademlia query has yielded empty results", ); - }, + } + }, + }, + KademliaEvent::OutboundQueryCompleted { + result: QueryResult::GetRecord(res), + stats, + .. + } => { + let ev = match res { Ok(ok) => { + let results = ok + .records + .into_iter() + .map(|r| (r.record.key, r.record.value)) + .collect(); + + DiscoveryOut::ValueFound( + results, + stats.duration().unwrap_or_default(), + ) + }, + Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => { trace!( target: "sub-libp2p", - "Libp2p => Query for {:?} yielded {:?} results", - HexDisplay::from(&ok.key), ok.peers.len(), + "Libp2p => Failed to get record: {:?}", + e, ); - if ok.peers.is_empty() && self.num_connections != 0 { - debug!( - target: "sub-libp2p", - "Libp2p => Random Kademlia query has yielded empty results", - ); - } + DiscoveryOut::ValueNotFound( + e.into_key(), + stats.duration().unwrap_or_default(), + ) }, - }, - KademliaEvent::OutboundQueryCompleted { - result: QueryResult::GetRecord(res), - stats, - .. - } => { - let ev = match res { - Ok(ok) => { - let results = ok - .records - .into_iter() - .map(|r| (r.record.key, r.record.value)) - .collect(); - - DiscoveryOut::ValueFound( - results, - stats.duration().unwrap_or_default(), - ) - }, - Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => { - trace!( - target: "sub-libp2p", - "Libp2p => Failed to get record: {:?}", - e, - ); - DiscoveryOut::ValueNotFound( - e.into_key(), - stats.duration().unwrap_or_default(), - ) - }, - Err(e) => { - debug!( - target: "sub-libp2p", - "Libp2p => Failed to get record: {:?}", - e, - ); - DiscoveryOut::ValueNotFound( - e.into_key(), - stats.duration().unwrap_or_default(), - ) - }, - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) - }, - KademliaEvent::OutboundQueryCompleted { - result: QueryResult::PutRecord(res), - stats, - .. - } => { - let ev = match res { - Ok(ok) => DiscoveryOut::ValuePut( - ok.key, + Err(e) => { + debug!( + target: "sub-libp2p", + "Libp2p => Failed to get record: {:?}", + e, + ); + DiscoveryOut::ValueNotFound( + e.into_key(), stats.duration().unwrap_or_default(), - ), - Err(e) => { - debug!( - target: "sub-libp2p", - "Libp2p => Failed to put record: {:?}", - e, - ); - DiscoveryOut::ValuePutFailed( - e.into_key(), - stats.duration().unwrap_or_default(), - ) - }, - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) - }, - KademliaEvent::OutboundQueryCompleted { - result: QueryResult::RepublishRecord(res), - .. - } => match res { - Ok(ok) => debug!( - target: "sub-libp2p", - "Libp2p => Record republished: {:?}", - ok.key, - ), - Err(e) => debug!( - target: "sub-libp2p", - "Libp2p => Republishing of record {:?} failed with: {:?}", - e.key(), e, - ), - }, - // We never start any other type of query. - KademliaEvent::OutboundQueryCompleted { result: e, .. } => { - warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e) - }, + ) + }, + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) }, - NetworkBehaviourAction::Dial { opts, handler } => { - let pid = pid.clone(); - let handler = self.new_handler_with_replacement(pid, handler); - return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) + KademliaEvent::OutboundQueryCompleted { + result: QueryResult::PutRecord(res), + stats, + .. + } => { + let ev = match res { + Ok(ok) => + DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()), + Err(e) => { + debug!( + target: "sub-libp2p", + "Libp2p => Failed to put record: {:?}", + e, + ); + DiscoveryOut::ValuePutFailed( + e.into_key(), + stats.duration().unwrap_or_default(), + ) + }, + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) }, - NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event: (pid.clone(), event), - }), - NetworkBehaviourAction::ReportObservedAddr { address, score } => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }), - NetworkBehaviourAction::CloseConnection { peer_id, connection } => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }), - } + KademliaEvent::OutboundQueryCompleted { + result: QueryResult::RepublishRecord(res), + .. + } => match res { + Ok(ok) => debug!( + target: "sub-libp2p", + "Libp2p => Record republished: {:?}", + ok.key, + ), + Err(e) => debug!( + target: "sub-libp2p", + "Libp2p => Republishing of record {:?} failed with: {:?}", + e.key(), e, + ), + }, + // We never start any other type of query. + KademliaEvent::OutboundQueryCompleted { result: e, .. } => { + warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e) + }, + }, + NetworkBehaviourAction::Dial { opts, handler } => + return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }), + NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + }), + NetworkBehaviourAction::ReportObservedAddr { address, score } => + return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { + address, + score, + }), + NetworkBehaviourAction::CloseConnection { peer_id, connection } => + return Poll::Ready(NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }), } } @@ -1014,7 +918,7 @@ mod tests { .allow_private_ipv4(true) .allow_non_globals_in_dht(true) .discovery_limit(50) - .add_protocol(protocol_id.clone()); + .with_kademlia(protocol_id.clone()); config.finish() }; @@ -1078,7 +982,7 @@ mod tests { to_discover[swarm_n].remove(&other); }, - DiscoveryOut::RandomKademliaStarted(_) => {}, + DiscoveryOut::RandomKademliaStarted => {}, e => { panic!("Unexpected event: {:?}", e) }, @@ -1117,7 +1021,7 @@ mod tests { .allow_private_ipv4(true) .allow_non_globals_in_dht(true) .discovery_limit(50) - .add_protocol(supported_protocol_id.clone()); + .with_kademlia(supported_protocol_id.clone()); config.finish() }; @@ -1131,7 +1035,8 @@ mod tests { remote_addr.clone(), ); - for kademlia in discovery.kademlias.values_mut() { + { + let kademlia = discovery.kademlia.as_mut().unwrap(); assert!( kademlia .kbucket(remote_peer_id) @@ -1148,66 +1053,14 @@ mod tests { remote_addr.clone(), ); - for kademlia in discovery.kademlias.values_mut() { - assert_eq!( - 1, - kademlia - .kbucket(remote_peer_id) - .expect("Remote peer id not to be equal to local peer id.") - .num_entries(), - "Expect peer with supported protocol to be added." - ); - } - } - - #[test] - fn discovery_adds_peer_to_kademlia_of_same_protocol_only() { - let protocol_a = ProtocolId::from("a"); - let protocol_b = ProtocolId::from("b"); - - let mut discovery = { - let keypair = Keypair::generate_ed25519(); - let mut config = DiscoveryConfig::new(keypair.public()); - config - .allow_private_ipv4(true) - .allow_non_globals_in_dht(true) - .discovery_limit(50) - .add_protocol(protocol_a.clone()) - .add_protocol(protocol_b.clone()); - config.finish() - }; - - let remote_peer_id = PeerId::random(); - let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); - - // Add remote peer with `protocol_a` only. - discovery.add_self_reported_address( - &remote_peer_id, - &[protocol_name_from_protocol_id(&protocol_a)], - remote_addr.clone(), - ); - + let kademlia = discovery.kademlia.as_mut().unwrap(); assert_eq!( 1, - discovery - .kademlias - .get_mut(&protocol_a) - .expect("Kademlia instance to exist.") + kademlia .kbucket(remote_peer_id) .expect("Remote peer id not to be equal to local peer id.") .num_entries(), - "Expected remote peer to be added to `protocol_a` Kademlia instance.", - ); - - assert!( - discovery - .kademlias - .get_mut(&protocol_b) - .expect("Kademlia instance to exist.") - .kbucket(remote_peer_id) - .expect("Remote peer id not to be equal to local peer id.") - .is_empty(), - "Expected remote peer not to be added to `protocol_b` Kademlia instance.", + "Expect peer with supported protocol to be added." ); } } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index f95e67df142b0..d24a1543c56fe 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -284,7 +284,7 @@ where config.discovery_limit( u64::from(params.network_config.default_peers_set.out_peers) + 15, ); - config.add_protocol(params.protocol_id.clone()); + config.with_kademlia(params.protocol_id.clone()); config.with_dht_random_walk(params.network_config.enable_dht_random_walk); config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht); config.use_kademlia_disjoint_query_paths( @@ -1665,16 +1665,9 @@ where .user_protocol_mut() .add_default_set_discovered_nodes(iter::once(peer_id)); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted( - protocols, - ))) => + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => if let Some(metrics) = this.metrics.as_ref() { - for protocol in protocols { - metrics - .kademlia_random_queries_total - .with_label_values(&[protocol.as_ref()]) - .inc(); - } + metrics.kademlia_random_queries_total.inc(); }, Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { remote, @@ -2015,28 +2008,21 @@ where this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { - for (proto, buckets) in this.network_service.behaviour_mut().num_entries_per_kbucket() { + if let Some(buckets) = this.network_service.behaviour_mut().num_entries_per_kbucket() { for (lower_ilog2_bucket_bound, num_entries) in buckets { metrics .kbuckets_num_nodes - .with_label_values(&[proto.as_ref(), &lower_ilog2_bucket_bound.to_string()]) + .with_label_values(&[&lower_ilog2_bucket_bound.to_string()]) .set(num_entries as u64); } } - for (proto, num_entries) in this.network_service.behaviour_mut().num_kademlia_records() - { - metrics - .kademlia_records_count - .with_label_values(&[proto.as_ref()]) - .set(num_entries as u64); + if let Some(num_entries) = this.network_service.behaviour_mut().num_kademlia_records() { + metrics.kademlia_records_count.set(num_entries as u64); } - for (proto, num_entries) in + if let Some(num_entries) = this.network_service.behaviour_mut().kademlia_records_total_size() { - metrics - .kademlia_records_sizes_total - .with_label_values(&[proto.as_ref()]) - .set(num_entries as u64); + metrics.kademlia_records_sizes_total.set(num_entries as u64); } metrics .peerset_num_discovered diff --git a/client/network/src/service/metrics.rs b/client/network/src/service/metrics.rs index 4b63df00b8d66..db1b6f7f6500d 100644 --- a/client/network/src/service/metrics.rs +++ b/client/network/src/service/metrics.rs @@ -59,9 +59,9 @@ pub struct Metrics { pub incoming_connections_total: Counter, pub issued_light_requests: Counter, pub kademlia_query_duration: HistogramVec, - pub kademlia_random_queries_total: CounterVec, - pub kademlia_records_count: GaugeVec, - pub kademlia_records_sizes_total: GaugeVec, + pub kademlia_random_queries_total: Counter, + pub kademlia_records_count: Gauge, + pub kademlia_records_sizes_total: Gauge, pub kbuckets_num_nodes: GaugeVec, pub listeners_local_addresses: Gauge, pub listeners_errors_total: Counter, @@ -138,33 +138,24 @@ impl Metrics { }, &["type"] )?, registry)?, - kademlia_random_queries_total: prometheus::register(CounterVec::new( - Opts::new( - "substrate_sub_libp2p_kademlia_random_queries_total", - "Number of random Kademlia queries started" - ), - &["protocol"] + kademlia_random_queries_total: prometheus::register(Counter::new( + "substrate_sub_libp2p_kademlia_random_queries_total", + "Number of random Kademlia queries started", )?, registry)?, - kademlia_records_count: prometheus::register(GaugeVec::new( - Opts::new( - "substrate_sub_libp2p_kademlia_records_count", - "Number of records in the Kademlia records store" - ), - &["protocol"] + kademlia_records_count: prometheus::register(Gauge::new( + "substrate_sub_libp2p_kademlia_records_count", + "Number of records in the Kademlia records store", )?, registry)?, - kademlia_records_sizes_total: prometheus::register(GaugeVec::new( - Opts::new( - "substrate_sub_libp2p_kademlia_records_sizes_total", - "Total size of all the records in the Kademlia records store" - ), - &["protocol"] + kademlia_records_sizes_total: prometheus::register(Gauge::new( + "substrate_sub_libp2p_kademlia_records_sizes_total", + "Total size of all the records in the Kademlia records store", )?, registry)?, kbuckets_num_nodes: prometheus::register(GaugeVec::new( Opts::new( "substrate_sub_libp2p_kbuckets_num_nodes", "Number of nodes per kbucket per Kademlia instance" ), - &["protocol", "lower_ilog2_bucket_bound"] + &["lower_ilog2_bucket_bound"] )?, registry)?, listeners_local_addresses: prometheus::register(Gauge::new( "substrate_sub_libp2p_listeners_local_addresses", From 131abb33779521821345990a6be9e735e7dd60d8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 19 Oct 2022 13:05:03 +0300 Subject: [PATCH 2/3] Apply suggestions: comment that Kad is always enabled --- client/network/src/discovery.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index 869b58e59a1b2..8a007095f085b 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -252,7 +252,8 @@ pub struct DiscoveryBehaviour { /// Same as `permanent_addresses`, except that addresses that fail to reach a peer are /// removed. ephemeral_addresses: HashMap>, - /// Kademlia requests and answers. + /// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently + /// it's always enabled in `NetworkWorker::new()`. kademlia: Toggle>, /// Discovers nodes on the local network. mdns: Option, From b4ecac1f798a7ab050e6c326bcabf7308395f4dd Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 20 Oct 2022 11:49:38 +0300 Subject: [PATCH 3/3] Apply suggestions: update `DiscoveryConfig` comment --- client/network/src/discovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index 8a007095f085b..712c3af97f58e 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -90,8 +90,8 @@ const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32; /// `DiscoveryBehaviour` configuration. /// -/// Note: In order to discover nodes or load and store values via Kademlia one has to add at least -/// one protocol via [`DiscoveryConfig::add_protocol`]. +/// Note: In order to discover nodes or load and store values via Kademlia one has to add +/// Kademlia protocol via [`DiscoveryConfig::with_kademlia`]. pub struct DiscoveryConfig { local_peer_id: PeerId, permanent_addresses: Vec<(PeerId, Multiaddr)>,