diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index 6a50c599..8a02a3e3 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -34,7 +34,10 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::{collections::HashMap, time::Duration}; /// Default TTL for the records. -const DEFAULT_TTL: u64 = 36 * 60 * 60; +const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); + +/// Default provider record TTL. +const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60); /// Protocol name. const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0"; @@ -65,9 +68,12 @@ pub struct Config { /// Incoming records validation mode. pub(super) validation_mode: IncomingRecordValidationMode, - /// Default record TTl. + /// Default record TTL. pub(super) record_ttl: Duration, + /// Provider record TTL. + pub(super) provider_ttl: Duration, + /// TX channel for sending events to `KademliaHandle`. pub(super) event_tx: Sender, @@ -83,6 +89,7 @@ impl Config { update_mode: RoutingTableUpdateMode, validation_mode: IncomingRecordValidationMode, record_ttl: Duration, + provider_ttl: Duration, ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); @@ -98,6 +105,7 @@ impl Config { update_mode, validation_mode, record_ttl, + provider_ttl, codec: ProtocolCodec::UnsignedVarint(None), replication_factor, known_peers, @@ -116,7 +124,8 @@ impl Config { Vec::new(), RoutingTableUpdateMode::Automatic, IncomingRecordValidationMode::Automatic, - Duration::from_secs(DEFAULT_TTL), + DEFAULT_TTL, + DEFAULT_PROVIDER_TTL, ) } } @@ -141,6 +150,9 @@ pub struct ConfigBuilder { /// Default TTL for the records. pub(super) record_ttl: Duration, + + /// Default TTL for the provider records. 
+ pub(super) provider_ttl: Duration, } impl Default for ConfigBuilder { @@ -158,7 +170,8 @@ impl ConfigBuilder { protocol_names: Vec::new(), update_mode: RoutingTableUpdateMode::Automatic, validation_mode: IncomingRecordValidationMode::Automatic, - record_ttl: Duration::from_secs(DEFAULT_TTL), + record_ttl: DEFAULT_TTL, + provider_ttl: DEFAULT_PROVIDER_TTL, } } @@ -211,6 +224,14 @@ impl ConfigBuilder { self } + /// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%. + /// + /// If unspecified, the default TTL is 48 hours. + pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self { + self.provider_ttl = provider_record_ttl; + self + } + /// Build Kademlia [`Config`]. pub fn build(self) -> (Config, KademliaHandle) { Config::new( @@ -220,6 +241,7 @@ impl ConfigBuilder { self.update_mode, self.validation_mode, self.record_ttl, + self.provider_ttl, ) } } diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index e0729aa8..4f53fbc1 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -20,9 +20,9 @@ use crate::{ protocol::libp2p::kademlia::{ - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, ProviderRecord, Record}, schema, - types::KademliaPeer, + types::{ConnectionType, KademliaPeer}, }, PeerId, }; @@ -60,9 +60,31 @@ pub enum KademliaMessage { /// Record. record: Option, - /// Peers closest to key. + /// Peers closer to the key. peers: Vec, }, + + /// `ADD_PROVIDER` message. + AddProvider { + /// Key. + key: RecordKey, + + /// Peers, providing the data for `key`. Must contain exactly one peer matching the sender + /// of the message. + providers: Vec, + }, + + /// `GET_PROVIDERS` message. + GetProviders { + /// Key. `None` in response. + key: Option, + + /// Peers closer to the key. + peers: Vec, + + /// Peers, providing the data for `key`. 
+ providers: Vec, + }, } impl KademliaMessage { @@ -149,10 +171,84 @@ impl KademliaMessage { buf } + /// Create `ADD_PROVIDER` message with `provider`. + #[allow(unused)] + pub fn add_provider(provider: ProviderRecord) -> Vec { + let peer = KademliaPeer::new( + provider.provider, + provider.addresses, + ConnectionType::CanConnect, // ignored by message recipient + ); + let message = schema::kademlia::Message { + key: provider.key.clone().to_vec(), + cluster_level_raw: 10, + r#type: schema::kademlia::MessageType::AddProvider.into(), + provider_peers: std::iter::once((&peer).into()).collect(), + ..Default::default() + }; + + let mut buf = Vec::with_capacity(message.encoded_len()); + message.encode(&mut buf).expect("Vec to provide needed capacity"); + + buf + } + + /// Create `GET_PROVIDERS` request for `key`. + #[allow(unused)] + pub fn get_providers_request(key: RecordKey) -> Vec { + let message = schema::kademlia::Message { + key: key.to_vec(), + cluster_level_raw: 10, + r#type: schema::kademlia::MessageType::GetProviders.into(), + ..Default::default() + }; + + let mut buf = Vec::with_capacity(message.encoded_len()); + message.encode(&mut buf).expect("Vec to provide needed capacity"); + + buf + } + + /// Create `GET_PROVIDERS` response. 
+ pub fn get_providers_response( + key: RecordKey, + providers: Vec, + closer_peers: &[KademliaPeer], + ) -> Vec { + debug_assert!(providers.iter().all(|p| p.key == key)); + + let provider_peers = providers + .into_iter() + .map(|p| { + KademliaPeer::new( + p.provider, + p.addresses, + ConnectionType::CanConnect, // ignored by recipient + ) + }) + .map(|p| (&p).into()) + .collect(); + + let message = schema::kademlia::Message { + key: key.to_vec(), + cluster_level_raw: 10, + r#type: schema::kademlia::MessageType::GetProviders.into(), + closer_peers: closer_peers.iter().map(Into::into).collect(), + provider_peers, + ..Default::default() + }; + + let mut buf = Vec::with_capacity(message.encoded_len()); + message.encode(&mut buf).expect("Vec to provide needed capacity"); + + buf + } + /// Get [`KademliaMessage`] from bytes. pub fn from_bytes(bytes: BytesMut) -> Option { match schema::kademlia::Message::decode(bytes) { Ok(message) => match message.r#type { + // FIND_NODE 4 => { let peers = message .closer_peers @@ -165,6 +261,7 @@ impl KademliaMessage { peers, }) } + // PUT_VALUE 0 => { let record = message.record?; @@ -172,6 +269,7 @@ impl KademliaMessage { record: record_from_schema(record)?, }) } + // GET_VALUE 1 => { let key = match message.key.is_empty() { true => message.record.as_ref().and_then(|record| { @@ -196,8 +294,39 @@ impl KademliaMessage { .collect(), }) } - message => { - tracing::warn!(target: LOG_TARGET, ?message, "unhandled message"); + // ADD_PROVIDER + 2 => { + let key = (!message.key.is_empty()).then_some(message.key.into())?; + let providers = message + .provider_peers + .iter() + .filter_map(|peer| KademliaPeer::try_from(peer).ok()) + .collect(); + + Some(Self::AddProvider { key, providers }) + } + // GET_PROVIDERS + 3 => { + let key = (!message.key.is_empty()).then_some(message.key.into()); + let peers = message + .closer_peers + .iter() + .filter_map(|peer| KademliaPeer::try_from(peer).ok()) + .collect(); + let providers = message + 
.provider_peers + .iter() + .filter_map(|peer| KademliaPeer::try_from(peer).ok()) + .collect(); + + Some(Self::GetProviders { + key, + peers, + providers, + }) + } + message_type => { + tracing::warn!(target: LOG_TARGET, ?message_type, "unhandled message"); None } }, diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index b69689e6..b59a1fcf 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -29,6 +29,7 @@ use crate::{ handle::KademliaCommand, message::KademliaMessage, query::{QueryAction, QueryEngine}, + record::ProviderRecord, routing_table::RoutingTable, store::MemoryStore, types::{ConnectionType, KademliaPeer, Key}, @@ -153,6 +154,9 @@ pub(crate) struct Kademlia { /// Default record TTL. record_ttl: Duration, + /// Provider record TTL. + provider_ttl: Duration, + /// Query engine. engine: QueryEngine, @@ -188,6 +192,7 @@ impl Kademlia { update_mode: config.update_mode, validation_mode: config.validation_mode, record_ttl: config.record_ttl, + provider_ttl: config.provider_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), } @@ -482,7 +487,98 @@ impl Kademlia { target: LOG_TARGET, ?peer, ?message, - "both query and record key missing, unable to handle message", + "unable to handle `GET_RECORD` request with empty key", + ), + } + } + KademliaMessage::AddProvider { key, providers } => { + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?key, + ?providers, + "handle `ADD_PROVIDER` message", + ); + + match (providers.len(), providers.first()) { + (1, Some(provider)) => + if provider.peer == peer { + self.store.put_provider(ProviderRecord { + key, + provider: peer, + addresses: provider.addresses.clone(), + expires: Instant::now() + self.provider_ttl, + }); + } else { + tracing::trace!( + target: LOG_TARGET, + publisher = ?peer, + provider = ?provider.peer, + "ignoring `ADD_PROVIDER` message with 
`publisher` != `provider`" + ) + }, + (n, _) => { + tracing::trace!( + target: LOG_TARGET, + publisher = ?peer, + ?n, + "ignoring `ADD_PROVIDER` message with `n` != 1 providers" + ) + } + } + } + ref message @ KademliaMessage::GetProviders { + ref key, + ref peers, + ref providers, + } => { + match (query_id, key) { + (Some(query_id), key) => { + // Note: key is not required, but can be non-empty. We just ignore it here. + tracing::trace!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + ?key, + ?peers, + ?providers, + "handle `GET_PROVIDERS` response", + ); + + // update routing table and inform user about the update + self.update_routing_table(peers).await; + + self.engine.register_response(query_id, peer, message.clone()); + } + (None, Some(key)) => { + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?key, + "handle `GET_PROVIDERS` request", + ); + + let providers = self.store.get_providers(key); + // TODO: if local peer is among the providers, update its `ProviderRecord` + // to have up-to-date addresses. + // Requires https://github.com/paritytech/litep2p/issues/211. 
+ + let closer_peers = self + .routing_table + .closest(Key::from(key.to_vec()), self.replication_factor); + + let message = KademliaMessage::get_providers_response( + key.clone(), + providers, + &closer_peers, + ); + self.executor.send_message(peer, message.into(), substream); + } + (None, None) => tracing::debug!( + target: LOG_TARGET, + ?peer, + ?message, + "unable to handle `GET_PROVIDERS` request with empty key", ), } } @@ -922,6 +1018,7 @@ mod tests { update_mode: RoutingTableUpdateMode::Automatic, validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: Duration::from_secs(36 * 60 * 60), + provider_ttl: Duration::from_secs(48 * 60 * 60), event_tx, cmd_rx, }; diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 1ff059fc..fced9372 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -125,7 +125,7 @@ impl MemoryStore { /// Try to get providers from local store for `key`. /// /// Returns a non-empty list of providers, if any. - pub fn get_providers(&mut self, key: &Key) -> Option<&Vec> { + pub fn get_providers(&mut self, key: &Key) -> Vec { let drop = self.provider_keys.get_mut(key).map_or(false, |providers| { let now = std::time::Instant::now(); providers.retain(|p| !p.is_expired(now)); @@ -136,9 +136,9 @@ impl MemoryStore { if drop { self.provider_keys.remove(key); - None + Vec::default() } else { - self.provider_keys.get(key) + self.provider_keys.get(key).cloned().unwrap_or_else(|| Vec::default()) } } @@ -175,8 +175,9 @@ impl MemoryStore { Entry::Occupied(mut entry) => { let mut providers = entry.get_mut(); - // Providers under every key are sorted by distance, with equal distances meaning - // peer IDs (more strictly, their hashes) are equal. + // Providers under every key are sorted by distance from the provided key, with + // equal distances meaning peer IDs (more strictly, their hashes) + // are equal. 
let provider_position = providers.binary_search_by(|p| p.distance().cmp(&provider_record.distance())); @@ -360,7 +361,7 @@ mod tests { }; store.put_provider(provider.clone()); - assert_eq!(store.get_providers(&provider.key).unwrap(), &vec![provider]); + assert_eq!(store.get_providers(&provider.key), vec![provider]); } #[test] @@ -383,7 +384,7 @@ mod tests { store.put_provider(provider1.clone()); store.put_provider(provider2.clone()); - let got_providers = store.get_providers(&key).unwrap(); + let got_providers = store.get_providers(&key); assert_eq!(got_providers.len(), 2); assert!(got_providers.contains(&provider1)); assert!(got_providers.contains(&provider2)); @@ -412,7 +413,7 @@ mod tests { providers }; - assert_eq!(store.get_providers(&key).unwrap(), &sorted_providers); + assert_eq!(store.get_providers(&key), sorted_providers); } #[test] @@ -434,7 +435,7 @@ mod tests { providers.iter().for_each(|p| { store.put_provider(p.clone()); }); - assert_eq!(store.get_providers(&key).unwrap().len(), 10); + assert_eq!(store.get_providers(&key).len(), 10); } #[test] @@ -464,7 +465,7 @@ mod tests { providers }; - assert_eq!(store.get_providers(&key).unwrap(), &closest_providers); + assert_eq!(store.get_providers(&key), closest_providers); } #[test] @@ -493,11 +494,11 @@ mod tests { for i in 0..10 { assert!(store.put_provider(sorted_providers[i].clone())); } - assert_eq!(store.get_providers(&key).unwrap(), &sorted_providers[..10]); + assert_eq!(store.get_providers(&key), sorted_providers[..10]); - // The fursests provider doesn't fit. + // The furthest provider doesn't fit. 
assert!(!store.put_provider(sorted_providers[10].clone())); - assert_eq!(store.get_providers(&key).unwrap(), &sorted_providers[..10]); + assert_eq!(store.get_providers(&key), sorted_providers[..10]); } #[test] @@ -529,7 +530,7 @@ mod tests { providers }; - assert_eq!(store.get_providers(&key).unwrap(), &sorted_providers); + assert_eq!(store.get_providers(&key), sorted_providers); let provider0_new = ProviderRecord { key: key.clone(), @@ -552,7 +553,7 @@ mod tests { }) .collect::>(); - assert_eq!(store.get_providers(&key).unwrap(), &providers_new); + assert_eq!(store.get_providers(&key), providers_new); } #[test] @@ -569,7 +570,7 @@ mod tests { assert!(provider.is_expired(std::time::Instant::now())); store.put_provider(provider.clone()); - assert_eq!(store.get_providers(&provider.key), None); + assert!(store.get_providers(&provider.key).is_empty()); } #[test] @@ -594,7 +595,7 @@ mod tests { store.put_provider(provider1.clone()); store.put_provider(provider2.clone()); - assert_eq!(store.get_providers(&key).unwrap(), &vec![provider2]); + assert_eq!(store.get_providers(&key), vec![provider2]); } #[test] @@ -619,7 +620,7 @@ mod tests { store.put_provider(provider); - let got_providers = store.get_providers(&key).unwrap(); + let got_providers = store.get_providers(&key); assert_eq!(got_providers.len(), 1); assert_eq!(got_providers.first().unwrap().key, key); assert_eq!(got_providers.first().unwrap().addresses.len(), 2); @@ -655,8 +656,8 @@ mod tests { assert!(store.put_provider(provider2.clone())); assert!(!store.put_provider(provider3.clone())); - assert_eq!(store.get_providers(&provider1.key), Some(&vec![provider1])); - assert_eq!(store.get_providers(&provider2.key), Some(&vec![provider2])); - assert_eq!(store.get_providers(&provider3.key), None); + assert_eq!(store.get_providers(&provider1.key), vec![provider1]); + assert_eq!(store.get_providers(&provider2.key), vec![provider2]); + assert_eq!(store.get_providers(&provider3.key), vec![]); } } diff --git 
a/src/protocol/libp2p/schema/kademlia.proto b/src/protocol/libp2p/schema/kademlia.proto index 83800f4f..f135a88d 100644 --- a/src/protocol/libp2p/schema/kademlia.proto +++ b/src/protocol/libp2p/schema/kademlia.proto @@ -85,6 +85,6 @@ message Message { repeated Peer closerPeers = 8; // Used to return Providers - // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + // ADD_PROVIDER, GET_PROVIDERS repeated Peer providerPeers = 9; }