Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Update routing table on kademlia established connections #184

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
28 changes: 15 additions & 13 deletions src/protocol/libp2p/kademlia/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ impl KBucket {
}

/// Get entry into the bucket.
// TODO: this is horrible code
pub fn entry<K: Clone>(&mut self, key: Key<K>) -> KBucketEntry<'_> {
for i in 0..self.nodes.len() {
if self.nodes[i].key == key {
return KBucketEntry::Occupied(&mut self.nodes[i]);
let mut replace_candidate = None;
for (index, node) in self.nodes.iter().enumerate() {
// If the node is already present in the k-bucket, return it.
if node.key.as_ref() == key.as_ref() {
return KBucketEntry::Occupied(&mut self.nodes[index]);
}

// Cache a not-connected node to replace it if necessary.
if node.connection == ConnectionType::NotConnected {
replace_candidate = Some(index);
}
}

Expand All @@ -83,17 +89,13 @@ impl KBucket {
vec![],
ConnectionType::NotConnected,
));
let len = self.nodes.len() - 1;
return KBucketEntry::Vacant(&mut self.nodes[len]);

let index: usize = self.nodes.len() - 1;
return KBucketEntry::Vacant(&mut self.nodes[index]);
}

for i in 0..self.nodes.len() {
match self.nodes[i].connection {
ConnectionType::NotConnected | ConnectionType::CannotConnect => {
return KBucketEntry::Vacant(&mut self.nodes[i]);
}
_ => continue,
}
if let Some(replace_candidate) = replace_candidate {
return KBucketEntry::Vacant(&mut self.nodes[replace_candidate]);
}

KBucketEntry::NoSlot
Expand Down
39 changes: 34 additions & 5 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
Direction, TransportEvent, TransportService,
},
substream::Substream,
transport::Endpoint,
types::SubstreamId,
PeerId,
};
Expand Down Expand Up @@ -194,13 +195,37 @@ impl Kademlia {
}

/// Connection established to remote peer.
fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
tracing::trace!(target: LOG_TARGET, ?peer, "connection established");

match self.peers.entry(peer) {
Entry::Vacant(entry) => {
if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
entry.connection = ConnectionType::Connected;
match self.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => {
entry.connection = ConnectionType::Connected;

// Update the address if not already present.
if !entry.addresses.iter().any(|address| address == endpoint.address()) {
entry.addresses.push(endpoint.address().clone());
}
}
mut vacant @ KBucketEntry::Vacant(_) => {
// Can only insert a new peer if the routing table update mode is set to
// automatic.
//
// Otherwise, the user is responsible of adding the peer manually if it
// deems necessary.
if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) {
vacant.insert(KademliaPeer::new(
peer,
vec![endpoint.address().clone()],
ConnectionType::Connected,
));
}
}
entry => {
tracing::debug!(target: LOG_TARGET, ?peer, ?entry, "failed to update routing table on connection");
}
}

let Some(actions) = self.pending_dials.remove(&peer) else {
Expand Down Expand Up @@ -263,6 +288,10 @@ impl Kademlia {
});
}

// Don't add the peer to the routing table into a vacant (or already disconnected) entry.
//
// Update the state if the peer could enter the kbucket during `add_known_peer` or
// `on_connection_established`.
if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
entry.connection = ConnectionType::NotConnected;
}
Expand Down Expand Up @@ -714,8 +743,8 @@ impl Kademlia {

tokio::select! {
event = self.service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
if let Err(error) = self.on_connection_established(peer) {
Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
if let Err(error) = self.on_connection_established(peer, endpoint) {
tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection");
}
}
Expand Down
20 changes: 17 additions & 3 deletions src/protocol/libp2p/kademlia/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,24 @@ impl RoutingTable {

match self.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?connection,
?entry,
"Peer present into the routing table, overwriting entry",
);

entry.addresses = addresses;
entry.connection = connection;
}
mut entry @ KBucketEntry::Vacant(_) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?connection,
"Adding peer to the routing table into a vacant entry",
);
entry.insert(KademliaPeer::new(peer, addresses, connection));
}
KBucketEntry::LocalNode => tracing::warn!(
Expand Down Expand Up @@ -445,7 +459,7 @@ mod tests {
entry.insert(KademliaPeer::new(
peer,
vec!["/ip6/::1/tcp/8888".parse().unwrap()],
ConnectionType::CanConnect,
ConnectionType::Connected,
));

// verify the node is still there
Expand All @@ -456,7 +470,7 @@ mod tests {
KBucketEntry::Occupied(&mut KademliaPeer::new(
peer,
addresses,
ConnectionType::CanConnect,
ConnectionType::Connected,
))
);
}
Expand Down Expand Up @@ -497,7 +511,7 @@ mod tests {
entry.insert(KademliaPeer::new(
peer,
vec!["/ip6/::1/tcp/8888".parse().unwrap()],
ConnectionType::CanConnect,
ConnectionType::Connected,
));
}

Expand Down
10 changes: 0 additions & 10 deletions src/protocol/libp2p/kademlia/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,6 @@ pub enum ConnectionType {

/// Sender is connected to the peer.
Connected,

/// Sender has recently been connected to the peer.
CanConnect,

/// Sender is unable to connect to the peer.
CannotConnect,
}

impl TryFrom<i32> for ConnectionType {
Expand All @@ -216,8 +210,6 @@ impl TryFrom<i32> for ConnectionType {
match value {
0 => Ok(ConnectionType::NotConnected),
1 => Ok(ConnectionType::Connected),
2 => Ok(ConnectionType::CanConnect),
3 => Ok(ConnectionType::CannotConnect),
_ => Err(()),
}
}
Expand All @@ -228,8 +220,6 @@ impl From<ConnectionType> for i32 {
match connection {
ConnectionType::NotConnected => 0,
ConnectionType::Connected => 1,
ConnectionType::CanConnect => 2,
ConnectionType::CannotConnect => 3,
}
}
}
Expand Down