Skip to content

Commit

Permalink
kad: Handle ADD_PROVIDER & GET_PROVIDERS network requests (#213)
Browse files Browse the repository at this point in the history
This implements the `ADD_PROVIDER` & `GET_PROVIDERS` Kademlia RPCs without
initiating the queries itself. This should already allow participating
in the DHT with content providers and serve all network requests, but
won't allow initiating queries (like getting the content provider &
starting providing) locally.

Implements "server side" of
#201.
  • Loading branch information
dmitry-markin authored Aug 22, 2024
1 parent 677e151 commit ba047f1
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 32 deletions.
30 changes: 26 additions & 4 deletions src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use std::{collections::HashMap, time::Duration};

/// Default TTL for the records.
const DEFAULT_TTL: u64 = 36 * 60 * 60;
const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60);

/// Default provider record TTL.
const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);

/// Protocol name.
const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0";
Expand Down Expand Up @@ -65,9 +68,12 @@ pub struct Config {
/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// Default record TTl.
/// Default record TTL.
pub(super) record_ttl: Duration,

/// Provider record TTL.
pub(super) provider_ttl: Duration,

/// TX channel for sending events to `KademliaHandle`.
pub(super) event_tx: Sender<KademliaEvent>,

Expand All @@ -83,6 +89,7 @@ impl Config {
update_mode: RoutingTableUpdateMode,
validation_mode: IncomingRecordValidationMode,
record_ttl: Duration,
provider_ttl: Duration,
) -> (Self, KademliaHandle) {
let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE);
Expand All @@ -98,6 +105,7 @@ impl Config {
update_mode,
validation_mode,
record_ttl,
provider_ttl,
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor,
known_peers,
Expand All @@ -116,7 +124,8 @@ impl Config {
Vec::new(),
RoutingTableUpdateMode::Automatic,
IncomingRecordValidationMode::Automatic,
Duration::from_secs(DEFAULT_TTL),
DEFAULT_TTL,
DEFAULT_PROVIDER_TTL,
)
}
}
Expand All @@ -141,6 +150,9 @@ pub struct ConfigBuilder {

/// Default TTL for the records.
pub(super) record_ttl: Duration,

/// Default TTL for the provider records.
pub(super) provider_ttl: Duration,
}

impl Default for ConfigBuilder {
Expand All @@ -158,7 +170,8 @@ impl ConfigBuilder {
protocol_names: Vec::new(),
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
record_ttl: Duration::from_secs(DEFAULT_TTL),
record_ttl: DEFAULT_TTL,
provider_ttl: DEFAULT_PROVIDER_TTL,
}
}

Expand Down Expand Up @@ -211,6 +224,14 @@ impl ConfigBuilder {
self
}

/// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%.
///
/// If unspecified, the default TTL is 48 hours.
pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self {
self.provider_ttl = provider_record_ttl;
self
}

/// Build Kademlia [`Config`].
pub fn build(self) -> (Config, KademliaHandle) {
Config::new(
Expand All @@ -220,6 +241,7 @@ impl ConfigBuilder {
self.update_mode,
self.validation_mode,
self.record_ttl,
self.provider_ttl,
)
}
}
139 changes: 134 additions & 5 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

use crate::{
protocol::libp2p::kademlia::{
record::{Key as RecordKey, Record},
record::{Key as RecordKey, ProviderRecord, Record},
schema,
types::KademliaPeer,
types::{ConnectionType, KademliaPeer},
},
PeerId,
};
Expand Down Expand Up @@ -60,9 +60,31 @@ pub enum KademliaMessage {
/// Record.
record: Option<Record>,

/// Peers closest to key.
/// Peers closer to the key.
peers: Vec<KademliaPeer>,
},

/// `ADD_PROVIDER` message.
AddProvider {
/// Key.
key: RecordKey,

/// Peers, providing the data for `key`. Must contain exactly one peer matching the sender
/// of the message.
providers: Vec<KademliaPeer>,
},

/// `GET_PROVIDERS` message.
GetProviders {
/// Key. `None` in response.
key: Option<RecordKey>,

/// Peers closer to the key.
peers: Vec<KademliaPeer>,

/// Peers, providing the data for `key`.
providers: Vec<KademliaPeer>,
},
}

impl KademliaMessage {
Expand Down Expand Up @@ -149,10 +171,84 @@ impl KademliaMessage {
buf
}

/// Create `ADD_PROVIDER` message with `provider`.
///
/// Serializes a Kademlia `ADD_PROVIDER` RPC advertising the sender as the
/// sole provider peer for `provider.key` and returns the encoded bytes.
#[allow(unused)]
pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
    let peer = KademliaPeer::new(
        provider.provider,
        provider.addresses,
        ConnectionType::CanConnect, // ignored by message recipient
    );
    let message = schema::kademlia::Message {
        // `provider.key` is not used after this point, so the extra
        // `clone()` before `to_vec()` is unnecessary.
        key: provider.key.to_vec(),
        cluster_level_raw: 10,
        r#type: schema::kademlia::MessageType::AddProvider.into(),
        provider_peers: std::iter::once((&peer).into()).collect(),
        ..Default::default()
    };

    let mut buf = Vec::with_capacity(message.encoded_len());
    message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

    buf
}

/// Create `GET_PROVIDERS` request for `key`.
///
/// Serializes a Kademlia `GET_PROVIDERS` RPC asking for providers of `key`
/// and returns the encoded bytes.
#[allow(unused)]
pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
    // Only the message type and the target key are meaningful in a request;
    // all other fields keep their defaults.
    let request = schema::kademlia::Message {
        r#type: schema::kademlia::MessageType::GetProviders.into(),
        key: key.to_vec(),
        cluster_level_raw: 10,
        ..Default::default()
    };

    let mut bytes = Vec::with_capacity(request.encoded_len());
    request.encode(&mut bytes).expect("Vec<u8> to provide needed capacity");

    bytes
}

/// Create `GET_PROVIDERS` response.
pub fn get_providers_response(
key: RecordKey,
providers: Vec<ProviderRecord>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
debug_assert!(providers.iter().all(|p| p.key == key));

let provider_peers = providers
.into_iter()
.map(|p| {
KademliaPeer::new(
p.provider,
p.addresses,
ConnectionType::CanConnect, // ignored by recipient
)
})
.map(|p| (&p).into())
.collect();

let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
closer_peers: closer_peers.iter().map(Into::into).collect(),
provider_peers,
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

buf
}

/// Get [`KademliaMessage`] from bytes.
pub fn from_bytes(bytes: BytesMut) -> Option<Self> {
match schema::kademlia::Message::decode(bytes) {
Ok(message) => match message.r#type {
// FIND_NODE
4 => {
let peers = message
.closer_peers
Expand All @@ -165,13 +261,15 @@ impl KademliaMessage {
peers,
})
}
// PUT_VALUE
0 => {
let record = message.record?;

Some(Self::PutValue {
record: record_from_schema(record)?,
})
}
// GET_VALUE
1 => {
let key = match message.key.is_empty() {
true => message.record.as_ref().and_then(|record| {
Expand All @@ -196,8 +294,39 @@ impl KademliaMessage {
.collect(),
})
}
message => {
tracing::warn!(target: LOG_TARGET, ?message, "unhandled message");
// ADD_PROVIDER
2 => {
let key = (!message.key.is_empty()).then_some(message.key.into())?;
let providers = message
.provider_peers
.iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.collect();

Some(Self::AddProvider { key, providers })
}
// GET_PROVIDERS
3 => {
let key = (!message.key.is_empty()).then_some(message.key.into());
let peers = message
.closer_peers
.iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.collect();
let providers = message
.provider_peers
.iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.collect();

Some(Self::GetProviders {
key,
peers,
providers,
})
}
message_type => {
tracing::warn!(target: LOG_TARGET, ?message_type, "unhandled message");
None
}
},
Expand Down
Loading

0 comments on commit ba047f1

Please sign in to comment.