From 8e724f054974941a29d4413ddc01ce9cde0f2289 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 3 Jun 2024 13:36:07 +0300 Subject: [PATCH] Support manual DHT record insertion (#135) Support manual DHT record insertion to comply with `libp2p` implementation. --- src/protocol/libp2p/kademlia/config.rs | 23 +- src/protocol/libp2p/kademlia/handle.rs | 39 +++ src/protocol/libp2p/kademlia/mod.rs | 24 +- src/protocol/libp2p/kademlia/store.rs | 16 ++ tests/protocol/kademlia.rs | 317 ++++++++++++++++++++++++- 5 files changed, 415 insertions(+), 4 deletions(-) diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index 887c4440..b5f3650d 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -21,7 +21,8 @@ use crate::{ codec::ProtocolCodec, protocol::libp2p::kademlia::handle::{ - KademliaCommand, KademliaEvent, KademliaHandle, RoutingTableUpdateMode, + IncomingRecordValidationMode, KademliaCommand, KademliaEvent, KademliaHandle, + RoutingTableUpdateMode, }, types::protocol::ProtocolName, PeerId, DEFAULT_CHANNEL_SIZE, @@ -59,6 +60,9 @@ pub struct Config { /// Routing table update mode. pub(super) update_mode: RoutingTableUpdateMode, + /// Incoming records validation mode. + pub(super) validation_mode: IncomingRecordValidationMode, + /// TX channel for sending events to `KademliaHandle`. pub(super) event_tx: Sender, @@ -72,6 +76,7 @@ impl Config { known_peers: HashMap>, mut protocol_names: Vec, update_mode: RoutingTableUpdateMode, + validation_mode: IncomingRecordValidationMode, ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); @@ -85,6 +90,7 @@ impl Config { Config { protocol_names, update_mode, + validation_mode, codec: ProtocolCodec::UnsignedVarint(None), replication_factor, known_peers, @@ -102,6 +108,7 @@ impl Config { HashMap::new(), Vec::new(), RoutingTableUpdateMode::Automatic, + IncomingRecordValidationMode::Automatic, ) } } @@ -115,6 +122,9 @@ pub struct ConfigBuilder { /// Routing table update mode. pub(super) update_mode: RoutingTableUpdateMode, + /// Incoming records validation mode. + pub(super) validation_mode: IncomingRecordValidationMode, + /// Known peers. pub(super) known_peers: HashMap>, @@ -136,6 +146,7 @@ impl ConfigBuilder { known_peers: HashMap::new(), protocol_names: Vec::new(), update_mode: RoutingTableUpdateMode::Automatic, + validation_mode: IncomingRecordValidationMode::Automatic, } } @@ -157,6 +168,15 @@ impl ConfigBuilder { self } + /// Set incoming records validation mode. + pub fn with_incoming_records_validation_mode( + mut self, + mode: IncomingRecordValidationMode, + ) -> Self { + self.validation_mode = mode; + self + } + /// Set Kademlia protocol names, overriding the default protocol name. /// /// The order of the protocol names signifies preference so if, for example, there are two @@ -178,6 +198,7 @@ impl ConfigBuilder { self.known_peers, self.protocol_names, self.update_mode, + self.validation_mode, ) } } diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index df63dcaf..5d3b4630 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -60,6 +60,17 @@ pub enum RoutingTableUpdateMode { Automatic, } +/// Incoming record validation mode. +#[derive(Debug, Copy, Clone)] +pub enum IncomingRecordValidationMode { + /// Don't insert incoming records automatically to the local DHT store + /// and let the user do that by calling [`KademliaHandle::store_record()`]. + Manual, + + /// Automatically accept all incoming records. + Automatic, +} + /// Kademlia commands. #[derive(Debug)] pub(crate) enum KademliaCommand { @@ -118,6 +129,12 @@ pub(crate) enum KademliaCommand { /// Query ID for the query. query_id: QueryId, }, + + /// Store record locally. + StoreRecord { + // Record. + record: Record, + }, } /// Kademlia events. @@ -171,6 +188,16 @@ pub enum KademliaEvent { /// Query ID. query_id: QueryId, }, + + /// Incoming `PUT_VALUE` request received. + /// + /// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation + /// the record must be manually inserted into the local DHT store with + /// [`KademliaHandle::store_record()`]. + IncomingRecord { + /// Record. + record: Record, + }, } /// The type of the DHT records. @@ -272,6 +299,12 @@ impl KademliaHandle { query_id } + /// Store the record in the local store. Used in combination with + /// [`IncomingRecordValidationMode::Manual`]. + pub async fn store_record(&mut self, record: Record) { + let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await; + } + /// Try to add known peer and if the channel is clogged, return an error. pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec) -> Result<(), ()> { self.cmd_tx @@ -329,6 +362,12 @@ impl KademliaHandle { .map(|_| query_id) .map_err(|_| ()) } + + /// Try to store the record in the local store, and if the channel is clogged, return an error. + /// Used in combination with [`IncomingRecordValidationMode::Manual`]. + pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> { + self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ()) + } } impl Stream for KademliaHandle { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 9a885ead..24f3f2fd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -49,7 +49,9 @@ use std::collections::{hash_map::Entry, HashMap}; pub use self::handle::RecordsType; pub use config::{Config, ConfigBuilder}; -pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode}; +pub use handle::{ + IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode, +}; pub use query::QueryId; pub use record::{Key as RecordKey, PeerRecord, Record}; @@ -142,6 +144,9 @@ pub(crate) struct Kademlia { /// Routing table update mode. update_mode: RoutingTableUpdateMode, + /// Incoming records validation mode. + validation_mode: IncomingRecordValidationMode, + /// Query engine. engine: QueryEngine, @@ -175,6 +180,7 @@ impl Kademlia { executor: QueryExecutor::new(), pending_substreams: HashMap::new(), update_mode: config.update_mode, + validation_mode: config.validation_mode, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), } @@ -418,7 +424,11 @@ impl Kademlia { "handle `PUT_VALUE` message", ); - self.store.put(record); + if let IncomingRecordValidationMode::Automatic = self.validation_mode { + self.store.put(record.clone()); + } + + let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await; } ref message @ KademliaMessage::GetRecord { ref key, @@ -844,6 +854,15 @@ impl Kademlia { self.service.add_known_address(&peer, addresses.into_iter()); } + Some(KademliaCommand::StoreRecord { record }) => { + tracing::debug!( + target: LOG_TARGET, + key = ?record.key, + "store record in local store", + ); + + self.store.put(record); + } None => return Err(Error::EssentialTaskClosed), } }, @@ -894,6 +913,7 @@ mod tests { codec: ProtocolCodec::UnsignedVarint(None), replication_factor: 20usize, update_mode: RoutingTableUpdateMode::Automatic, + validation_mode: IncomingRecordValidationMode::Automatic, event_tx, cmd_rx, }; diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 8d3168c7..2785dfbf 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -25,6 +25,9 @@ use crate::protocol::libp2p::kademlia::record::{Key, Record}; use std::collections::{hash_map::Entry, HashMap}; +/// Logging target for the file. +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store"; + /// Memory store events. pub enum MemoryStoreEvent {} @@ -71,6 +74,14 @@ impl MemoryStore { /// Store record. pub fn put(&mut self, record: Record) { if record.value.len() >= self.config.max_record_size_bytes { + tracing::warn!( + target: LOG_TARGET, + key = ?record.key, + publisher = ?record.publisher, + size = record.value.len(), + max_size = self.config.max_record_size_bytes, + "discarding a DHT record that exceeds the configured size limit", + ); return; } @@ -91,6 +102,11 @@ impl MemoryStore { Entry::Vacant(entry) => { if len >= self.config.max_records { + tracing::warn!( + target: LOG_TARGET, + max_records = self.config.max_records, + "discarding a DHT record, because maximum memory store size reached", + ); return; } diff --git a/tests/protocol/kademlia.rs b/tests/protocol/kademlia.rs index 4d9ca922..3eb22722 100644 --- a/tests/protocol/kademlia.rs +++ b/tests/protocol/kademlia.rs @@ -24,7 +24,10 @@ use futures::StreamExt; use litep2p::{ config::ConfigBuilder, crypto::ed25519::Keypair, - protocol::libp2p::kademlia::{ConfigBuilder as KademliaConfigBuilder, RecordKey}, + protocol::libp2p::kademlia::{ + ConfigBuilder as KademliaConfigBuilder, IncomingRecordValidationMode, KademliaEvent, + PeerRecord, Quorum, Record, RecordKey, RecordsType, + }, transport::tcp::config::Config as TcpConfig, Litep2p, PeerId, }; @@ -122,3 +125,315 @@ async fn put_value() { // } // } } + +#[tokio::test] +async fn records_are_stored_automatically() { + let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new().build(); + let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new().build(); + + let config1 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config1) + .build(); + + let config2 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config2) + .build(); + + let mut litep2p1 = Litep2p::new(config1).unwrap(); + let mut litep2p2 = Litep2p::new(config2).unwrap(); + + kad_handle1 + .add_known_peer( + *litep2p2.local_peer_id(), + litep2p2.listen_addresses().cloned().collect(), + ) + .await; + + // Publish the record. + let record = Record::new(vec![1, 2, 3], vec![0x01]); + kad_handle1.put_record(record.clone()).await; + + loop { + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => { + panic!("record was not stored in 10 secs") + } + _ = litep2p1.next_event() => {} + _ = litep2p2.next_event() => {} + _ = kad_handle1.next() => {} + event = kad_handle2.next() => { + match event { + Some(KademliaEvent::IncomingRecord { record: got_record }) => { + assert_eq!(got_record, record); + // Check if the record was stored. + let _ = kad_handle2 + .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; + } + Some(KademliaEvent::GetRecordSuccess { query_id: _, records }) => { + match records { + RecordsType::LocalStore(got_record) => { + assert_eq!(got_record, record); + break + } + RecordsType::Network(_) => { + panic!("record was not stored locally") + } + } + } + _ => {} + } + } + } + } +} + +#[tokio::test] +async fn records_are_stored_manually() { + let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new() + .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual) + .build(); + let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new() + .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual) + .build(); + + let config1 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config1) + .build(); + + let config2 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config2) + .build(); + + let mut litep2p1 = Litep2p::new(config1).unwrap(); + let mut litep2p2 = Litep2p::new(config2).unwrap(); + + kad_handle1 + .add_known_peer( + *litep2p2.local_peer_id(), + litep2p2.listen_addresses().cloned().collect(), + ) + .await; + + // Publish the record. + let record = Record::new(vec![1, 2, 3], vec![0x01]); + kad_handle1.put_record(record.clone()).await; + + loop { + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => { + panic!("record was not stored in 10 secs") + } + _ = litep2p1.next_event() => {} + _ = litep2p2.next_event() => {} + _ = kad_handle1.next() => {} + event = kad_handle2.next() => { + match event { + Some(KademliaEvent::IncomingRecord { record: got_record }) => { + assert_eq!(got_record, record); + kad_handle2.store_record(got_record).await; + + // Check if the record was stored. + let _ = kad_handle2 + .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; + } + Some(KademliaEvent::GetRecordSuccess { query_id: _, records }) => { + match records { + RecordsType::LocalStore(got_record) => { + assert_eq!(got_record, record); + break + } + RecordsType::Network(_) => { + panic!("record was not stored locally") + } + } + } + _ => {} + } + } + } + } +} + +#[tokio::test] +async fn not_validated_records_are_not_stored() { + let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new() + .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual) + .build(); + let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new() + .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual) + .build(); + + let config1 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config1) + .build(); + + let config2 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config2) + .build(); + + let mut litep2p1 = Litep2p::new(config1).unwrap(); + let mut litep2p2 = Litep2p::new(config2).unwrap(); + + kad_handle1 + .add_known_peer( + *litep2p2.local_peer_id(), + litep2p2.listen_addresses().cloned().collect(), + ) + .await; + + // Publish the record. + let record = Record::new(vec![1, 2, 3], vec![0x01]); + kad_handle1.put_record(record.clone()).await; + + let mut get_record_query_id = None; + + loop { + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => { + panic!("query has not failed in 10 secs") + } + event = litep2p1.next_event() => {} + event = litep2p2.next_event() => {} + event = kad_handle1.next() => {} + event = kad_handle2.next() => { + match event { + Some(KademliaEvent::IncomingRecord { record: got_record }) => { + assert_eq!(got_record, record); + // Do not call `kad_handle2.store_record(record).await`. + + // Check if the record was stored. + let query_id = kad_handle2 + .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; + get_record_query_id = Some(query_id); + } + Some(KademliaEvent::GetRecordSuccess { query_id: _, records }) => { + match records { + RecordsType::LocalStore(_) => { + panic!("the record was added without validation") + } + RecordsType::Network(_) => break + } + } + Some(KademliaEvent::QueryFailed { query_id }) => { + assert_eq!(query_id, get_record_query_id.unwrap()); + break + } + _ => {} + } + } + } + } +} + +#[tokio::test] +async fn get_record_retrieves_remote_records() { + let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new() + .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual) + .build(); + let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new() + .with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual) + .build(); + + let config1 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config1) + .build(); + + let config2 = ConfigBuilder::new() + .with_tcp(TcpConfig { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }) + .with_libp2p_kademlia(kad_config2) + .build(); + + let mut litep2p1 = Litep2p::new(config1).unwrap(); + let mut litep2p2 = Litep2p::new(config2).unwrap(); + + // Store the record on `litep2p1``. + let original_record = Record::new(vec![1, 2, 3], vec![0x01]); + let query1 = kad_handle1.put_record(original_record.clone()).await; + + let mut query2 = None; + + loop { + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => { + panic!("record was not retrieved in 10 secs") + } + event = litep2p1.next_event() => {} + event = litep2p2.next_event() => {} + event = kad_handle1.next() => { + match event { + Some(KademliaEvent::QueryFailed { query_id }) => { + // Query failed, but the record was stored locally. + assert_eq!(query_id, query1); + + // Let peer2 know about peer1. + kad_handle2 + .add_known_peer( + *litep2p1.local_peer_id(), + litep2p1.listen_addresses().cloned().collect(), + ) + .await; + + // Let peer2 get record from peer1. + let query_id = kad_handle2 + .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; + query2 = Some(query_id); + } + _ => {} + } + } + event = kad_handle2.next() => { + match event { + Some(KademliaEvent::GetRecordSuccess { query_id: _, records }) => { + match records { + RecordsType::LocalStore(_) => { + panic!("the record was unexpectedly added to peer2") + } + RecordsType::Network(records) => { + assert_eq!(records.len(), 1); + let PeerRecord { peer, record } = records.first().unwrap(); + assert_eq!(peer, litep2p1.local_peer_id()); + assert_eq!(record, &original_record); + break + } + } + } + Some(KademliaEvent::QueryFailed { query_id: _ }) => { + panic!("query failed") + } + _ => {} + } + } + } + } +}