Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support manual DHT record insertion #135

Merged
merged 6 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
use crate::{
codec::ProtocolCodec,
protocol::libp2p::kademlia::handle::{
KademliaCommand, KademliaEvent, KademliaHandle, RoutingTableUpdateMode,
IncomingRecordValidationMode, KademliaCommand, KademliaEvent, KademliaHandle,
RoutingTableUpdateMode,
},
types::protocol::ProtocolName,
PeerId, DEFAULT_CHANNEL_SIZE,
Expand Down Expand Up @@ -59,6 +60,9 @@ pub struct Config {
/// Routing table update mode.
pub(super) update_mode: RoutingTableUpdateMode,

/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// TX channel for sending events to `KademliaHandle`.
pub(super) event_tx: Sender<KademliaEvent>,

Expand All @@ -72,6 +76,7 @@ impl Config {
known_peers: HashMap<PeerId, Vec<Multiaddr>>,
mut protocol_names: Vec<ProtocolName>,
update_mode: RoutingTableUpdateMode,
validation_mode: IncomingRecordValidationMode,
) -> (Self, KademliaHandle) {
let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE);
Expand All @@ -85,6 +90,7 @@ impl Config {
Config {
protocol_names,
update_mode,
validation_mode,
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor,
known_peers,
Expand All @@ -102,6 +108,7 @@ impl Config {
HashMap::new(),
Vec::new(),
RoutingTableUpdateMode::Automatic,
IncomingRecordValidationMode::Automatic,
)
}
}
Expand All @@ -115,6 +122,9 @@ pub struct ConfigBuilder {
/// Routing table update mode.
pub(super) update_mode: RoutingTableUpdateMode,

/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// Known peers.
pub(super) known_peers: HashMap<PeerId, Vec<Multiaddr>>,

Expand All @@ -136,6 +146,7 @@ impl ConfigBuilder {
known_peers: HashMap::new(),
protocol_names: Vec::new(),
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
}
}

Expand All @@ -157,6 +168,15 @@ impl ConfigBuilder {
self
}

/// Set incoming records validation mode.
pub fn with_incoming_records_validation_mode(
mut self,
mode: IncomingRecordValidationMode,
) -> Self {
self.validation_mode = mode;
self
}

/// Set Kademlia protocol names, overriding the default protocol name.
///
/// The order of the protocol names signifies preference so if, for example, there are two
Expand All @@ -178,6 +198,7 @@ impl ConfigBuilder {
self.known_peers,
self.protocol_names,
self.update_mode,
self.validation_mode,
)
}
}
39 changes: 39 additions & 0 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ pub enum RoutingTableUpdateMode {
Automatic,
}

/// Incoming record validation mode.
#[derive(Debug, Copy, Clone)]
pub enum IncomingRecordValidationMode {
/// Don't insert incoming records automatically to the local DHT store
/// and let the user do that by calling [`KademliaHandle::store_record()`].
Manual,

/// Automatically accept all incoming records.
Automatic,
}

/// Kademlia commands.
#[derive(Debug)]
pub(crate) enum KademliaCommand {
Expand Down Expand Up @@ -118,6 +129,12 @@ pub(crate) enum KademliaCommand {
/// Query ID for the query.
query_id: QueryId,
},

/// Store record locally.
StoreRecord {
// Record.
record: Record,
},
}

/// Kademlia events.
Expand Down Expand Up @@ -171,6 +188,16 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,
},

/// Incoming `PUT_VALUE` request received.
///
/// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation
/// the record must be manually inserted into the local DHT store with
/// [`KademliaHandle::store_record()`].
IncomingRecord {
/// Record.
record: Record,
},
}

/// The type of the DHT records.
Expand Down Expand Up @@ -272,6 +299,12 @@ impl KademliaHandle {
query_id
}

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, record: Record) {
let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
}

/// Try to add known peer and if the channel is clogged, return an error.
pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
self.cmd_tx
Expand Down Expand Up @@ -329,6 +362,12 @@ impl KademliaHandle {
.map(|_| query_id)
.map_err(|_| ())
}

/// Try to store the record in the local store, and if the channel is clogged, return an error.
/// Used in combination with [`IncomingRecordValidationMode::Manual`].
pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
}
}

impl Stream for KademliaHandle {
Expand Down
24 changes: 22 additions & 2 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use std::collections::{hash_map::Entry, HashMap};

pub use self::handle::RecordsType;
pub use config::{Config, ConfigBuilder};
pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
pub use handle::{
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};

Expand Down Expand Up @@ -142,6 +144,9 @@ pub(crate) struct Kademlia {
/// Routing table update mode.
update_mode: RoutingTableUpdateMode,

/// Incoming records validation mode.
validation_mode: IncomingRecordValidationMode,

/// Query engine.
engine: QueryEngine,

Expand Down Expand Up @@ -175,6 +180,7 @@ impl Kademlia {
executor: QueryExecutor::new(),
pending_substreams: HashMap::new(),
update_mode: config.update_mode,
validation_mode: config.validation_mode,
replication_factor: config.replication_factor,
engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
}
Expand Down Expand Up @@ -418,7 +424,11 @@ impl Kademlia {
"handle `PUT_VALUE` message",
);

self.store.put(record);
if let IncomingRecordValidationMode::Automatic = self.validation_mode {
self.store.put(record.clone());
}

let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
}
ref message @ KademliaMessage::GetRecord {
ref key,
Expand Down Expand Up @@ -844,6 +854,15 @@ impl Kademlia {
self.service.add_known_address(&peer, addresses.into_iter());

}
Some(KademliaCommand::StoreRecord { record }) => {
tracing::debug!(
target: LOG_TARGET,
key = ?record.key,
"store record in local store",
);

self.store.put(record);
}
None => return Err(Error::EssentialTaskClosed),
}
},
Expand Down Expand Up @@ -894,6 +913,7 @@ mod tests {
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor: 20usize,
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
event_tx,
cmd_rx,
};
Expand Down
16 changes: 16 additions & 0 deletions src/protocol/libp2p/kademlia/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use crate::protocol::libp2p::kademlia::record::{Key, Record};

use std::collections::{hash_map::Entry, HashMap};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store";

/// Memory store events.
pub enum MemoryStoreEvent {}

Expand Down Expand Up @@ -71,6 +74,14 @@ impl MemoryStore {
/// Store record.
pub fn put(&mut self, record: Record) {
if record.value.len() >= self.config.max_record_size_bytes {
tracing::warn!(
target: LOG_TARGET,
key = ?record.key,
publisher = ?record.publisher,
size = record.value.len(),
max_size = self.config.max_record_size_bytes,
"discarding a DHT record that exceeds the configured size limit",
);
return;
}

Expand All @@ -91,6 +102,11 @@ impl MemoryStore {

Entry::Vacant(entry) => {
if len >= self.config.max_records {
tracing::warn!(
target: LOG_TARGET,
max_records = self.config.max_records,
"discarding a DHT record, because maximum memory store size reached",
);
return;
}

Expand Down
Loading