From 104832058e337d5d90063f5ba4a53df33e6003fa Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 22 Aug 2024 15:15:19 +0000 Subject: [PATCH 1/4] Introduce `ADD_PROVIDER` query --- src/protocol/libp2p/kademlia/handle.rs | 36 ++++++- src/protocol/libp2p/kademlia/mod.rs | 113 +++++++++++++++++++--- src/protocol/libp2p/kademlia/query/mod.rs | 18 ++-- 3 files changed, 145 insertions(+), 22 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 5d3b4630..f1b4c218 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -130,6 +130,18 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Register as a content provider for `key`. + StartProviding { + /// Provided key. + key: RecordKey, + + /// Our external addresses to publish. + public_addresses: Vec, + + /// Query ID for the query. + query_id: QueryId, + }, + /// Store record locally. StoreRecord { // Record. @@ -175,7 +187,8 @@ pub enum KademliaEvent { }, /// `PUT_VALUE` query succeeded. - PutRecordSucess { + // TODO: this is never emitted. Implement + add `AddProviderSuccess`. + PutRecordSuccess { /// Query ID. query_id: QueryId, @@ -299,6 +312,27 @@ impl KademliaHandle { query_id } + /// Register as a content provider on the DHT. + /// + /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. + pub async fn start_providing( + &mut self, + key: RecordKey, + public_addresses: Vec, + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::StartProviding { + key, + public_addresses, + query_id, + }) + .await; + + query_id + } + /// Store the record in the local store. Used in combination with /// [`IncomingRecordValidationMode::Manual`]. pub async fn store_record(&mut self, record: Record) { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index b59a1fcf..c0f809e5 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -791,7 +791,11 @@ impl Kademlia { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. }) => { if let Err(error) = self.on_connection_established(peer) { - tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection"); + tracing::debug!( + target: LOG_TARGET, + ?error, + "failed to handle established connection", + ); } } Some(TransportEvent::ConnectionClosed { peer }) => { @@ -801,7 +805,10 @@ impl Kademlia { match direction { Direction::Inbound => self.on_inbound_substream(peer, substream).await, Direction::Outbound(substream_id) => { - if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await { + if let Err(error) = self + .on_outbound_substream(peer, substream_id, substream) + .await + { tracing::debug!( target: LOG_TARGET, ?peer, @@ -816,7 +823,8 @@ impl Kademlia { Some(TransportEvent::SubstreamOpenFailure { substream, error }) => { self.on_substream_open_failure(substream, error).await; } - Some(TransportEvent::DialFailure { peer, address }) => self.on_dial_failure(peer, address), + Some(TransportEvent::DialFailure { peer, address }) => + self.on_dial_failure(peer, address), None => return Err(Error::EssentialTaskClosed), }, context = self.executor.next() => { @@ -824,14 +832,32 @@ impl Kademlia { match result { QueryResult::SendSuccess { substream } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer"); + tracing::trace!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "message sent to peer", + ); let _ = substream.close().await; } QueryResult::ReadSuccess { substream, message } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer"); + tracing::trace!(target: LOG_TARGET, + ?peer, + query = ?query_id, + "message read from peer", + ); - if let Err(error) = self.on_message_received(peer, query_id, message, substream).await { - tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message"); + if let Err(error) = self.on_message_received( + peer, + query_id, + message, + substream + ).await { + tracing::debug!(target: LOG_TARGET, + ?peer, + ?error, + "failed to process message", + ); } } QueryResult::SubstreamClosed | QueryResult::Timeout => { @@ -850,22 +876,36 @@ impl Kademlia { command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { - tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query"); + tracing::debug!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "starting `FIND_NODE` query", + ); self.engine.start_find_node( query_id, peer, - self.routing_table.closest(Key::from(peer), self.replication_factor).into() + self.routing_table + .closest(Key::from(peer), self.replication_factor) + .into() ); } Some(KademliaCommand::PutRecord { mut record, query_id }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT"); + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT", + ); // For `PUT_VALUE` requests originating locally we are always the publisher. record.publisher = Some(self.local_key.clone().into_preimage()); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); let key = Key::new(record.key.clone()); @@ -877,11 +917,23 @@ impl Kademlia { self.routing_table.closest(key, self.replication_factor).into(), ); } - Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers"); + Some(KademliaCommand::PutRecordToPeers { + mut record, + query_id, + peers, + update_local_store, + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT to specified peers", + ); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); if update_local_store { self.store.put(record.clone()); @@ -895,7 +947,8 @@ impl Kademlia { match self.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => Some(entry.clone()), - KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => + Some(entry.clone()), _ => None, } }).collect(); @@ -906,6 +959,36 @@ impl Kademlia { peers, ); } + Some(KademliaCommand::StartProviding { + key, + public_addresses, + query_id + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + ?key, + ?public_addresses, + "register as content provider" + ); + + let provider = ProviderRecord { + key: key.clone(), + provider: self.service.local_peer_id, + addresses: public_addresses, + expires: Instant::now() + self.provider_ttl, + }; + + self.store.put_provider(provider); + + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); + } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index f29af805..da293556 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -25,7 +25,7 @@ use crate::{ find_node::{FindNodeConfig, FindNodeContext}, get_record::{GetRecordConfig, GetRecordContext}, }, - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, ProviderRecord, Record}, types::{KademliaPeer, Key}, PeerRecord, Quorum, }, @@ -45,8 +45,6 @@ mod get_record; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; -// TODO: store record key instead of the actual record - /// Type representing a query ID. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct QueryId(pub usize); @@ -56,7 +54,7 @@ pub struct QueryId(pub usize); enum QueryType { /// `FIND_NODE` query. FindNode { - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -65,7 +63,7 @@ enum QueryType { /// Record that needs to be stored. record: Record, - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -83,6 +81,15 @@ enum QueryType { /// Context for the `GET_VALUE` query. context: GetRecordContext, }, + + /// `ADD_PROVIDER` query. + AddProvider { + /// Provider record that need to be stored. + provider: ProviderRecord, + + /// Context for the `FIND_NODE` query. + context: FindNodeConfig, + }, } /// Query action. @@ -131,7 +138,6 @@ pub enum QueryAction { records: Vec, }, - // TODO: remove /// Query succeeded. QuerySucceeded { /// ID of the query that succeeded. From 778078b52790e82d6674e73a07cc32be5353dd9f Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 23 Aug 2024 15:33:22 +0000 Subject: [PATCH 2/4] Execute `ADD_PROVIDER` query --- src/protocol/libp2p/kademlia/message.rs | 7 ++- src/protocol/libp2p/kademlia/mod.rs | 60 ++++++++++++++++++++-- src/protocol/libp2p/kademlia/query/mod.rs | 61 ++++++++++++++++++++++- 3 files changed, 120 insertions(+), 8 deletions(-) diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index 4f53fbc1..bba2b285 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -172,8 +172,7 @@ impl KademliaMessage { } /// Create `ADD_PROVIDER` message with `provider`. - #[allow(unused)] - pub fn add_provider(provider: ProviderRecord) -> Vec { + pub fn add_provider(provider: ProviderRecord) -> Bytes { let peer = KademliaPeer::new( provider.provider, provider.addresses, @@ -187,10 +186,10 @@ impl KademliaMessage { ..Default::default() }; - let mut buf = Vec::with_capacity(message.encoded_len()); + let mut buf = BytesMut::with_capacity(message.encoded_len()); message.encode(&mut buf).expect("Vec to provide needed capacity"); - buf + buf.freeze() } /// Create `GET_PROVIDERS` request for `key`. diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index c0f809e5..fc6373cd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -83,13 +83,16 @@ mod schema { } /// Peer action. -#[derive(Debug)] +#[derive(Debug, Clone)] enum PeerAction { /// Send `FIND_NODE` message to peer. SendFindNode(QueryId), /// Send `PUT_VALUE` message to peer. SendPutValue(Bytes), + + /// Send `ADD_PROVIDER` message to peer. + SendAddProvider(Bytes), } /// Peer context. @@ -335,7 +338,12 @@ impl Kademlia { } } Some(PeerAction::SendPutValue(message)) => { - tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response"); + tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); + + self.executor.send_message(peer, message, substream); + } + Some(PeerAction::SendAddProvider(message)) => { + tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); self.executor.send_message(peer, message, substream); } @@ -755,6 +763,52 @@ impl Kademlia { Ok(()) } + QueryAction::AddProviderToFoundNodes { provider, peers } => { + tracing::trace!( + target: LOG_TARGET, + provided_key = ?provider.key, + num_peers = ?peers.len(), + "add provider record to found peers", + ); + + let provided_key = provider.key.clone(); + let message = KademliaMessage::add_provider(provider); + let peer_action = PeerAction::SendAddProvider(message); + + for peer in peers { + match self.service.open_substream(peer.peer) { + Ok(substream_id) => { + self.pending_substreams.insert(substream_id, peer.peer); + self.peers + .entry(peer.peer) + .or_default() + .pending_actions + .insert(substream_id, peer_action.clone()); + } + Err(_) => match self.service.dial(&peer.peer) { + Ok(_) => match self.pending_dials.entry(peer.peer) { + Entry::Occupied(entry) => { + entry.into_mut().push(peer_action.clone()); + } + Entry::Vacant(entry) => { + entry.insert(vec![peer_action.clone()]); + } + }, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to dial peer", + ) + } + }, + } + } + + Ok(()) + } QueryAction::GetRecordQueryDone { query_id, records } => { let _ = self .event_tx @@ -979,7 +1033,7 @@ impl Kademlia { expires: Instant::now() + self.provider_ttl, }; - self.store.put_provider(provider); + self.store.put_provider(provider.clone()); self.engine.start_add_provider( query_id, diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index da293556..34f6e84e 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -88,7 +88,7 @@ enum QueryType { provider: ProviderRecord, /// Context for the `FIND_NODE` query. - context: FindNodeConfig, + context: FindNodeContext, }, } @@ -129,6 +129,15 @@ pub enum QueryAction { peers: Vec, }, + /// Add the provider record to nodes closest to the target key. + AddProviderToFoundNodes { + /// Provider record. + provider: ProviderRecord, + + /// Peers for whom the `ADD_PROVIDER` must be sent to. + peers: Vec, + }, + /// `GET_VALUE` query succeeded. GetRecordQueryDone { /// Query ID. @@ -314,6 +323,41 @@ impl QueryEngine { query_id } + /// Start `ADD_PROVIDER` query. + pub fn start_add_provider( + &mut self, + query_id: QueryId, + provider: ProviderRecord, + candidates: VecDeque, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + ?provider, + num_peers = ?candidates.len(), + "start `ADD_PROVIDER` query", + ); + + let target = Key::new(provider.key.clone()); + let config = FindNodeConfig { + local_peer_id: self.local_peer_id, + replication_factor: self.replication_factor, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + + self.queries.insert( + query_id, + QueryType::AddProvider { + provider, + context: FindNodeContext::new(config, candidates), + }, + ); + + query_id + } + /// Register response failure from a queried peer. pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -334,6 +378,9 @@ impl QueryEngine { Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } + Some(QueryType::AddProvider { context, .. }) => { + context.register_response_failure(peer); + } } } @@ -369,6 +416,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::AddProvider { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, } } @@ -385,6 +438,7 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), + Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer), } } @@ -409,6 +463,10 @@ impl QueryEngine { query_id: context.config.query, records: context.found_records(), }, + QueryType::AddProvider { provider, context } => QueryAction::AddProviderToFoundNodes { + provider, + peers: context.responses.into_values().collect::>(), + }, } } @@ -428,6 +486,7 @@ impl QueryEngine { QueryType::PutRecord { context, .. } => context.next_action(), QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), + QueryType::AddProvider { context, .. } => context.next_action(), }; match action { From 5e97484bfc2f54b2477c563aea4b193480d7f90e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 14:48:13 +0000 Subject: [PATCH 3/4] Use getter `TransportService::local_peer_id()` instead of accessing directly --- src/protocol/libp2p/kademlia/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index a6135dbc..25976bdd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1031,7 +1031,7 @@ impl Kademlia { let provider = ProviderRecord { key: key.clone(), - provider: self.service.local_peer_id, + provider: self.service.local_peer_id(), addresses: public_addresses, expires: Instant::now() + self.provider_ttl, }; From 57f17ce3890d4bda0484fc359efb884d05beb74d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 5 Sep 2024 13:49:11 +0000 Subject: [PATCH 4/4] Use `open_substream_or_dial()` to add provider records to peers --- src/protocol/libp2p/kademlia/mod.rs | 41 +++++++++-------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 25976bdd..3d00c7f8 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -776,37 +776,20 @@ impl Kademlia { let provided_key = provider.key.clone(); let message = KademliaMessage::add_provider(provider); - let peer_action = PeerAction::SendAddProvider(message); for peer in peers { - match self.service.open_substream(peer.peer) { - Ok(substream_id) => { - self.pending_substreams.insert(substream_id, peer.peer); - self.peers - .entry(peer.peer) - .or_default() - .pending_actions - .insert(substream_id, peer_action.clone()); - } - Err(_) => match self.service.dial(&peer.peer) { - Ok(_) => match self.pending_dials.entry(peer.peer) { - Entry::Occupied(entry) => { - entry.into_mut().push(peer_action.clone()); - } - Entry::Vacant(entry) => { - entry.insert(vec![peer_action.clone()]); - } - }, - Err(error) => { - tracing::debug!( - target: LOG_TARGET, - ?peer, - ?provided_key, - ?error, - "failed to dial peer", - ) - } - }, + if let Err(error) = self.open_substream_or_dial( + peer.peer, + PeerAction::SendAddProvider(message.clone()), + None, + ) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to add provider record to peer", + ) } }