Skip to content

Commit

Permalink
Support manual DHT record insertion (#135)
Browse files Browse the repository at this point in the history
Support manual DHT record insertion to comply with `libp2p`
implementation.
  • Loading branch information
dmitry-markin authored Jun 3, 2024
1 parent ec31635 commit 8e724f0
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 4 deletions.
23 changes: 22 additions & 1 deletion src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
use crate::{
codec::ProtocolCodec,
protocol::libp2p::kademlia::handle::{
KademliaCommand, KademliaEvent, KademliaHandle, RoutingTableUpdateMode,
IncomingRecordValidationMode, KademliaCommand, KademliaEvent, KademliaHandle,
RoutingTableUpdateMode,
},
types::protocol::ProtocolName,
PeerId, DEFAULT_CHANNEL_SIZE,
Expand Down Expand Up @@ -59,6 +60,9 @@ pub struct Config {
/// Routing table update mode.
pub(super) update_mode: RoutingTableUpdateMode,

/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// TX channel for sending events to `KademliaHandle`.
pub(super) event_tx: Sender<KademliaEvent>,

Expand All @@ -72,6 +76,7 @@ impl Config {
known_peers: HashMap<PeerId, Vec<Multiaddr>>,
mut protocol_names: Vec<ProtocolName>,
update_mode: RoutingTableUpdateMode,
validation_mode: IncomingRecordValidationMode,
) -> (Self, KademliaHandle) {
let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE);
Expand All @@ -85,6 +90,7 @@ impl Config {
Config {
protocol_names,
update_mode,
validation_mode,
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor,
known_peers,
Expand All @@ -102,6 +108,7 @@ impl Config {
HashMap::new(),
Vec::new(),
RoutingTableUpdateMode::Automatic,
IncomingRecordValidationMode::Automatic,
)
}
}
Expand All @@ -115,6 +122,9 @@ pub struct ConfigBuilder {
/// Routing table update mode.
pub(super) update_mode: RoutingTableUpdateMode,

/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// Known peers.
pub(super) known_peers: HashMap<PeerId, Vec<Multiaddr>>,

Expand All @@ -136,6 +146,7 @@ impl ConfigBuilder {
known_peers: HashMap::new(),
protocol_names: Vec::new(),
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
}
}

Expand All @@ -157,6 +168,15 @@ impl ConfigBuilder {
self
}

/// Set incoming records validation mode.
pub fn with_incoming_records_validation_mode(
mut self,
mode: IncomingRecordValidationMode,
) -> Self {
self.validation_mode = mode;
self
}

/// Set Kademlia protocol names, overriding the default protocol name.
///
/// The order of the protocol names signifies preference so if, for example, there are two
Expand All @@ -178,6 +198,7 @@ impl ConfigBuilder {
self.known_peers,
self.protocol_names,
self.update_mode,
self.validation_mode,
)
}
}
39 changes: 39 additions & 0 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ pub enum RoutingTableUpdateMode {
Automatic,
}

/// Incoming record validation mode.
#[derive(Debug, Copy, Clone)]
pub enum IncomingRecordValidationMode {
/// Don't insert incoming records automatically to the local DHT store
/// and let the user do that by calling [`KademliaHandle::store_record()`].
Manual,

/// Automatically accept all incoming records.
Automatic,
}

/// Kademlia commands.
#[derive(Debug)]
pub(crate) enum KademliaCommand {
Expand Down Expand Up @@ -118,6 +129,12 @@ pub(crate) enum KademliaCommand {
/// Query ID for the query.
query_id: QueryId,
},

/// Store record locally.
StoreRecord {
// Record.
record: Record,
},
}

/// Kademlia events.
Expand Down Expand Up @@ -171,6 +188,16 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,
},

/// Incoming `PUT_VALUE` request received.
///
/// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation
/// the record must be manually inserted into the local DHT store with
/// [`KademliaHandle::store_record()`].
IncomingRecord {
/// Record.
record: Record,
},
}

/// The type of the DHT records.
Expand Down Expand Up @@ -272,6 +299,12 @@ impl KademliaHandle {
query_id
}

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, record: Record) {
let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
}

/// Try to add known peer and if the channel is clogged, return an error.
pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
self.cmd_tx
Expand Down Expand Up @@ -329,6 +362,12 @@ impl KademliaHandle {
.map(|_| query_id)
.map_err(|_| ())
}

/// Try to store the record in the local store, and if the channel is clogged, return an error.
/// Used in combination with [`IncomingRecordValidationMode::Manual`].
pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
}
}

impl Stream for KademliaHandle {
Expand Down
24 changes: 22 additions & 2 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use std::collections::{hash_map::Entry, HashMap};

pub use self::handle::RecordsType;
pub use config::{Config, ConfigBuilder};
pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
pub use handle::{
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};

Expand Down Expand Up @@ -142,6 +144,9 @@ pub(crate) struct Kademlia {
/// Routing table update mode.
update_mode: RoutingTableUpdateMode,

/// Incoming records validation mode.
validation_mode: IncomingRecordValidationMode,

/// Query engine.
engine: QueryEngine,

Expand Down Expand Up @@ -175,6 +180,7 @@ impl Kademlia {
executor: QueryExecutor::new(),
pending_substreams: HashMap::new(),
update_mode: config.update_mode,
validation_mode: config.validation_mode,
replication_factor: config.replication_factor,
engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
}
Expand Down Expand Up @@ -418,7 +424,11 @@ impl Kademlia {
"handle `PUT_VALUE` message",
);

self.store.put(record);
if let IncomingRecordValidationMode::Automatic = self.validation_mode {
self.store.put(record.clone());
}

let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
}
ref message @ KademliaMessage::GetRecord {
ref key,
Expand Down Expand Up @@ -844,6 +854,15 @@ impl Kademlia {
self.service.add_known_address(&peer, addresses.into_iter());

}
Some(KademliaCommand::StoreRecord { record }) => {
tracing::debug!(
target: LOG_TARGET,
key = ?record.key,
"store record in local store",
);

self.store.put(record);
}
None => return Err(Error::EssentialTaskClosed),
}
},
Expand Down Expand Up @@ -894,6 +913,7 @@ mod tests {
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor: 20usize,
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
event_tx,
cmd_rx,
};
Expand Down
16 changes: 16 additions & 0 deletions src/protocol/libp2p/kademlia/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use crate::protocol::libp2p::kademlia::record::{Key, Record};

use std::collections::{hash_map::Entry, HashMap};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store";

/// Memory store events.
pub enum MemoryStoreEvent {}

Expand Down Expand Up @@ -71,6 +74,14 @@ impl MemoryStore {
/// Store record.
pub fn put(&mut self, record: Record) {
if record.value.len() >= self.config.max_record_size_bytes {
tracing::warn!(
target: LOG_TARGET,
key = ?record.key,
publisher = ?record.publisher,
size = record.value.len(),
max_size = self.config.max_record_size_bytes,
"discarding a DHT record that exceeds the configured size limit",
);
return;
}

Expand All @@ -91,6 +102,11 @@ impl MemoryStore {

Entry::Vacant(entry) => {
if len >= self.config.max_records {
tracing::warn!(
target: LOG_TARGET,
max_records = self.config.max_records,
"discarding a DHT record, because maximum memory store size reached",
);
return;
}

Expand Down
Loading

0 comments on commit 8e724f0

Please sign in to comment.