Skip to content

Commit

Permalink
feat(kad): expose a kad query facility allowing dynamic num_results (#…
Browse files Browse the repository at this point in the history
…5555)

## Description

This PR is to expose a kad query facility that allowing specify
num_results dynamically.
It is related to the [Sybil Defence
issue](#4769),
that during the attempt of implementation on higher level code, it is
find will be useful if libp2p-kad can expose such facility.

The PR try not to cause any interference to the existing work flow, only
introduce an `extra exposal`.

## Change checklist

<!-- Please add a Changelog entry in the appropriate crates and bump the
crate versions if needed. See
<https://github.com/libp2p/rust-libp2p/blob/master/docs/release.md#development-between-releases>-->

- [x] I have performed a self-review of my own code
- [x] I have made corresponding changes to the documentation
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [x] A changelog entry has been made in the appropriate crates

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
maqi and mergify[bot] authored Aug 27, 2024
1 parent aa9317f commit 56b6c62
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.45.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.9" }
libp2p-kad = { version = "0.46.2", path = "protocols/kad" }
libp2p-kad = { version = "0.47.0", path = "protocols/kad" }
libp2p-mdns = { version = "0.46.0", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.3.0", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.15.0", path = "misc/metrics" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.47.0

- Expose a kad query facility allowing specify num_results dynamicly.
See [PR 5555](https://github.com/libp2p/rust-libp2p/pull/5555).

## 0.46.2

- Emit `ToSwarm::NewExternalAddrOfPeer`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-kad"
edition = "2021"
rust-version = { workspace = true }
description = "Kademlia protocol for libp2p"
version = "0.46.2"
version = "0.47.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
32 changes: 30 additions & 2 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,31 @@ where
/// The result of the query is delivered in a
/// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
self.get_closest_peers_inner(key, None)
}

/// Initiates an iterative query for the closest peers to the given key.
/// The expected responding peers is specified by `num_results`
/// Note that the result is capped after exceeds K_VALUE
///
/// The result of the query is delivered in a
/// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
// The inner code never expect higher than K_VALUE results to be returned.
// And removing such cap will be tricky,
// since it would involve forging a new key and additional requests.
// Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
let capped_num_results = std::cmp::min(num_results, K_VALUE);
self.get_closest_peers_inner(key, Some(capped_num_results))
}

fn get_closest_peers_inner<K>(&mut self, key: K, num_results: Option<NonZeroUsize>) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
Expand All @@ -740,6 +765,7 @@ where
let info = QueryInfo::GetClosestPeers {
key,
step: ProgressStep::first(),
num_results,
};
let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
self.queries.add_iter_closest(target, peer_keys, info)
Expand Down Expand Up @@ -1485,7 +1511,7 @@ where
})
}

QueryInfo::GetClosestPeers { key, mut step } => {
QueryInfo::GetClosestPeers { key, mut step, .. } => {
step.last = true;

Some(Event::OutboundQueryProgressed {
Expand Down Expand Up @@ -1702,7 +1728,7 @@ where
},
}),

QueryInfo::GetClosestPeers { key, mut step } => {
QueryInfo::GetClosestPeers { key, mut step, .. } => {
step.last = true;
Some(Event::OutboundQueryProgressed {
id: query_id,
Expand Down Expand Up @@ -3181,6 +3207,8 @@ pub enum QueryInfo {
key: Vec<u8>,
/// Current index of events.
step: ProgressStep,
/// If required, `num_results` specifies expected responding peers
num_results: Option<NonZeroUsize>,
},

/// A (repeated) query initiated by [`Behaviour::get_providers`].
Expand Down
64 changes: 63 additions & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ fn query_iter() {

match swarms[0].behaviour_mut().query(&qid) {
Some(q) => match q.info() {
QueryInfo::GetClosestPeers { key, step } => {
QueryInfo::GetClosestPeers { key, step, .. } => {
assert_eq!(&key[..], search_target.to_bytes().as_slice());
assert_eq!(usize::from(step.count), 1);
}
Expand Down Expand Up @@ -425,6 +425,68 @@ fn unresponsive_not_returned_indirect() {
}))
}

// Test the result of get_closest_peers with different num_results
// Note that the result is capped after exceeds K_VALUE
#[test]
fn get_closest_with_different_num_results() {
let k_value = K_VALUE.get();
for replication_factor in [5, k_value / 2, k_value] {
for num_results in k_value / 2..k_value * 2 {
get_closest_with_different_num_results_inner(num_results, replication_factor)
}
}
}

fn get_closest_with_different_num_results_inner(num_results: usize, replication_factor: usize) {
let k_value = K_VALUE.get();
let num_of_nodes = 3 * k_value;
let mut cfg = Config::new(PROTOCOL_NAME);
cfg.set_replication_factor(NonZeroUsize::new(replication_factor).unwrap());
let swarms = build_connected_nodes_with_config(num_of_nodes, replication_factor - 1, cfg);

let mut swarms = swarms
.into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();

// Ask first to search a random value.
let search_target = PeerId::random();
let Some(num_results_nonzero) = std::num::NonZeroUsize::new(num_results) else {
panic!("Unexpected NonZeroUsize val of {num_results}");
};
swarms[0]
.behaviour_mut()
.get_n_closest_peers(search_target, num_results_nonzero);

block_on(poll_fn(move |ctx| {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(ok)),
..
}))) => {
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
if num_results > k_value {
assert_eq!(ok.peers.len(), k_value, "Failed with replication_factor: {replication_factor}, num_results: {num_results}");
} else {
assert_eq!(ok.peers.len(), num_results, "Failed with replication_factor: {replication_factor}, num_results: {num_results}");
}

return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {e:?}"),
Poll::Pending => break,
}
}
}

Poll::Pending
}))
}

#[test]
fn get_record_not_found() {
let mut swarms = build_nodes(3);
Expand Down
10 changes: 9 additions & 1 deletion protocols/kad/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,16 @@ impl QueryPool {
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = Key<PeerId>>,
{
let num_results = match info {
QueryInfo::GetClosestPeers {
num_results: Some(val),
..
} => val,
_ => self.config.replication_factor,
};

let cfg = ClosestPeersIterConfig {
num_results: self.config.replication_factor,
num_results,
parallelism: self.config.parallelism,
..ClosestPeersIterConfig::default()
};
Expand Down

0 comments on commit 56b6c62

Please sign in to comment.