Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kad): expose a kad query facility allowing dynamic num_results #5555

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.45.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.9" }
libp2p-kad = { version = "0.46.2", path = "protocols/kad" }
libp2p-kad = { version = "0.47.0", path = "protocols/kad" }
libp2p-mdns = { version = "0.46.0", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.3.0", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.15.0", path = "misc/metrics" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.47.0

- Expose a kad query facility allowing specify num_results dynamicly.
See [PR 5555](https://github.com/libp2p/rust-libp2p/pull/5555).

## 0.46.2

- Emit `ToSwarm::NewExternalAddrOfPeer`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-kad"
edition = "2021"
rust-version = { workspace = true }
description = "Kademlia protocol for libp2p"
version = "0.46.2"
version = "0.47.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
32 changes: 30 additions & 2 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,31 @@ where
/// The result of the query is delivered in a
/// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
self.get_closest_peers_inner(key, None)
}

/// Initiates an iterative query for the closest peers to the given key.
/// The expected responding peers is specified by `num_results`
/// Note that the result is capped after exceeds K_VALUE
///
/// The result of the query is delivered in a
/// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
// The inner code never expect higher than K_VALUE results to be returned.
// And removing such cap will be tricky,
// since it would involve forging a new key and additional requests.
// Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
let capped_num_results = std::cmp::min(num_results, K_VALUE);
self.get_closest_peers_inner(key, Some(capped_num_results))
}

fn get_closest_peers_inner<K>(&mut self, key: K, num_results: Option<NonZeroUsize>) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
Expand All @@ -740,6 +765,7 @@ where
let info = QueryInfo::GetClosestPeers {
key,
step: ProgressStep::first(),
num_results,
};
let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
self.queries.add_iter_closest(target, peer_keys, info)
Expand Down Expand Up @@ -1485,7 +1511,7 @@ where
})
}

QueryInfo::GetClosestPeers { key, mut step } => {
QueryInfo::GetClosestPeers { key, mut step, .. } => {
step.last = true;

Some(Event::OutboundQueryProgressed {
Expand Down Expand Up @@ -1702,7 +1728,7 @@ where
},
}),

QueryInfo::GetClosestPeers { key, mut step } => {
QueryInfo::GetClosestPeers { key, mut step, .. } => {
step.last = true;
Some(Event::OutboundQueryProgressed {
id: query_id,
Expand Down Expand Up @@ -3181,6 +3207,8 @@ pub enum QueryInfo {
key: Vec<u8>,
/// Current index of events.
step: ProgressStep,
/// If required, `num_results` specifies expected responding peers
num_results: Option<NonZeroUsize>,
},

/// A (repeated) query initiated by [`Behaviour::get_providers`].
Expand Down
64 changes: 63 additions & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ fn query_iter() {

match swarms[0].behaviour_mut().query(&qid) {
Some(q) => match q.info() {
QueryInfo::GetClosestPeers { key, step } => {
QueryInfo::GetClosestPeers { key, step, .. } => {
assert_eq!(&key[..], search_target.to_bytes().as_slice());
assert_eq!(usize::from(step.count), 1);
}
Expand Down Expand Up @@ -425,6 +425,68 @@ fn unresponsive_not_returned_indirect() {
}))
}

// Test the result of get_closest_peers with different num_results
// Note that the result is capped after exceeds K_VALUE
#[test]
fn get_closest_with_different_num_results() {
let k_value = K_VALUE.get();
for replication_factor in [5, k_value / 2, k_value] {
for num_results in k_value / 2..k_value * 2 {
get_closest_with_different_num_results_inner(num_results, replication_factor)
jxs marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

fn get_closest_with_different_num_results_inner(num_results: usize, replication_factor: usize) {
let k_value = K_VALUE.get();
let num_of_nodes = 3 * k_value;
let mut cfg = Config::new(PROTOCOL_NAME);
cfg.set_replication_factor(NonZeroUsize::new(replication_factor).unwrap());
let swarms = build_connected_nodes_with_config(num_of_nodes, replication_factor - 1, cfg);

let mut swarms = swarms
.into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();

// Ask first to search a random value.
let search_target = PeerId::random();
let Some(num_results_nonzero) = std::num::NonZeroUsize::new(num_results) else {
panic!("Unexpected NonZeroUsize val of {num_results}");
};
swarms[0]
.behaviour_mut()
.get_n_closest_peers(search_target, num_results_nonzero);

block_on(poll_fn(move |ctx| {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(ok)),
..
}))) => {
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
if num_results > k_value {
assert_eq!(ok.peers.len(), k_value, "Failed with replication_factor: {replication_factor}, num_results: {num_results}");
} else {
assert_eq!(ok.peers.len(), num_results, "Failed with replication_factor: {replication_factor}, num_results: {num_results}");
}

return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {e:?}"),
Poll::Pending => break,
}
}
}

Poll::Pending
}))
}

#[test]
fn get_record_not_found() {
let mut swarms = build_nodes(3);
Expand Down
10 changes: 9 additions & 1 deletion protocols/kad/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,16 @@ impl QueryPool {
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = Key<PeerId>>,
{
let num_results = match info {
QueryInfo::GetClosestPeers {
num_results: Some(val),
..
} => val,
_ => self.config.replication_factor,
};

let cfg = ClosestPeersIterConfig {
num_results: self.config.replication_factor,
num_results,
parallelism: self.config.parallelism,
..ClosestPeersIterConfig::default()
};
Expand Down
Loading