Skip to content

Commit

Permalink
kad: Providers part 4: refresh local providers (#235)
Browse files Browse the repository at this point in the history
Republish local providers every 22 hours.
  • Loading branch information
dmitry-markin authored Sep 30, 2024
1 parent 41fa8e2 commit 6ffcdd2
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 118 deletions.
39 changes: 35 additions & 4 deletions src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,21 @@ use crate::{
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{channel, Receiver, Sender};

use std::{collections::HashMap, time::Duration};
use std::{
collections::HashMap,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

/// Default TTL for the records.
const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60);

/// Default provider record TTL.
const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);

/// Default provider republish interval.
pub(super) const DEFAULT_PROVIDER_REFRESH_INTERVAL: Duration = Duration::from_secs(22 * 60 * 60);

/// Protocol name.
const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0";

Expand Down Expand Up @@ -74,11 +81,17 @@ pub struct Config {
/// Provider record TTL.
pub(super) provider_ttl: Duration,

/// Provider republish interval.
pub(super) provider_refresh_interval: Duration,

/// TX channel for sending events to `KademliaHandle`.
pub(super) event_tx: Sender<KademliaEvent>,

/// RX channel for receiving commands from `KademliaHandle`.
pub(super) cmd_rx: Receiver<KademliaCommand>,

/// Next query ID counter shared with the handle.
pub(super) next_query_id: Arc<AtomicUsize>,
}

impl Config {
Expand All @@ -90,9 +103,11 @@ impl Config {
validation_mode: IncomingRecordValidationMode,
record_ttl: Duration,
provider_ttl: Duration,
provider_refresh_interval: Duration,
) -> (Self, KademliaHandle) {
let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE);
let next_query_id = Arc::new(AtomicUsize::new(0usize));

// if no protocol names were provided, use the default protocol
if protocol_names.is_empty() {
Expand All @@ -106,13 +121,15 @@ impl Config {
validation_mode,
record_ttl,
provider_ttl,
provider_refresh_interval,
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor,
known_peers,
cmd_rx,
event_tx,
next_query_id: next_query_id.clone(),
},
KademliaHandle::new(cmd_tx, event_rx),
KademliaHandle::new(cmd_tx, event_rx, next_query_id),
)
}

Expand All @@ -126,6 +143,7 @@ impl Config {
IncomingRecordValidationMode::Automatic,
DEFAULT_TTL,
DEFAULT_PROVIDER_TTL,
DEFAULT_PROVIDER_REFRESH_INTERVAL,
)
}
}
Expand All @@ -151,8 +169,11 @@ pub struct ConfigBuilder {
/// Default TTL for the records.
pub(super) record_ttl: Duration,

/// Default TTL for the provider records.
/// TTL for the provider records.
pub(super) provider_ttl: Duration,

/// Republish interval for the provider records.
pub(super) provider_refresh_interval: Duration,
}

impl Default for ConfigBuilder {
Expand All @@ -172,6 +193,7 @@ impl ConfigBuilder {
validation_mode: IncomingRecordValidationMode::Automatic,
record_ttl: DEFAULT_TTL,
provider_ttl: DEFAULT_PROVIDER_TTL,
provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL,
}
}

Expand Down Expand Up @@ -224,14 +246,22 @@ impl ConfigBuilder {
self
}

/// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%.
/// Set TTL for the provider records. Recommended value is 2 * (refresh interval) + 10%.
///
/// If unspecified, the default TTL is 48 hours.
pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self {
self.provider_ttl = provider_record_ttl;
self
}

/// Set the refresh (republish) interval for provider records.
///
/// If unspecified, the default interval is 22 hours.
pub fn with_provider_refresh_interval(mut self, provider_refresh_interval: Duration) -> Self {
self.provider_refresh_interval = provider_refresh_interval;
self
}

/// Build Kademlia [`Config`].
pub fn build(self) -> (Config, KademliaHandle) {
Config::new(
Expand All @@ -242,6 +272,7 @@ impl ConfigBuilder {
self.validation_mode,
self.record_ttl,
self.provider_ttl,
self.provider_refresh_interval,
)
}
}
58 changes: 7 additions & 51 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{protocol::libp2p::kademlia::query::QueryId, substream::Substream, PeerId};
use crate::{
protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId},
substream::Substream,
PeerId,
};

use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures::{future::BoxFuture, Stream, StreamExt};

use std::{
future::Future,
pin::Pin,
task::{Context, Poll, Waker},
task::{Context, Poll},
time::Duration,
};

Expand Down Expand Up @@ -71,53 +74,6 @@ pub struct QueryContext {
pub result: QueryResult,
}

/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
#[derive(Default)]
pub struct FuturesStream<F> {
futures: FuturesUnordered<F>,
waker: Option<Waker>,
}

impl<F> FuturesStream<F> {
/// Create new [`FuturesStream`].
pub fn new() -> Self {
Self {
futures: FuturesUnordered::new(),
waker: None,
}
}

/// Push a future for processing.
pub fn push(&mut self, future: F) {
self.futures.push(future);

if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}

impl<F: Future> Stream for FuturesStream<F> {
type Item = <F as Future>::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
// We must save the current waker to wake up the task when new futures are inserted.
//
// Otherwise, simply returning `Poll::Pending` here would cause the task to never be
// woken up again.
//
// We were previously relying on some other task from the `loop tokio::select!` to
// finish.
self.waker = Some(cx.waker().clone());

return Poll::Pending;
};

Poll::Ready(Some(result))
}
}

/// Query executor.
pub struct QueryExecutor {
/// Pending futures.
Expand Down
76 changes: 76 additions & 0 deletions src/protocol/libp2p/kademlia/futures_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::{stream::FuturesUnordered, Stream, StreamExt};

use std::{
future::Future,
pin::Pin,
task::{Context, Poll, Waker},
};

/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be
/// polled when contains no futures.
#[derive(Default)]
pub struct FuturesStream<F> {
futures: FuturesUnordered<F>,
waker: Option<Waker>,
}

impl<F> FuturesStream<F> {
/// Create new [`FuturesStream`].
pub fn new() -> Self {
Self {
futures: FuturesUnordered::new(),
waker: None,
}
}

/// Push a future for processing.
pub fn push(&mut self, future: F) {
self.futures.push(future);

if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}

impl<F: Future> Stream for FuturesStream<F> {
type Item = <F as Future>::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
// We must save the current waker to wake up the task when new futures are inserted.
//
// Otherwise, simply returning `Poll::Pending` here would cause the task to never be
// woken up again.
//
// We were previously relying on some other task from the `loop tokio::select!` to
// finish.
self.waker = Some(cx.waker().clone());

return Poll::Pending;
};

Poll::Ready(Some(result))
}
}
17 changes: 12 additions & 5 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ use tokio::sync::mpsc::{Receiver, Sender};
use std::{
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};

Expand Down Expand Up @@ -234,23 +238,26 @@ pub struct KademliaHandle {
event_rx: Receiver<KademliaEvent>,

/// Next query ID.
next_query_id: usize,
next_query_id: Arc<AtomicUsize>,
}

impl KademliaHandle {
/// Create new [`KademliaHandle`].
pub(super) fn new(cmd_tx: Sender<KademliaCommand>, event_rx: Receiver<KademliaEvent>) -> Self {
pub(super) fn new(
cmd_tx: Sender<KademliaCommand>,
event_rx: Receiver<KademliaEvent>,
next_query_id: Arc<AtomicUsize>,
) -> Self {
Self {
cmd_tx,
event_rx,
next_query_id: 0usize,
next_query_id,
}
}

/// Allocate next query ID.
fn next_query_id(&mut self) -> QueryId {
let query_id = self.next_query_id;
self.next_query_id += 1;
let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed);

QueryId(query_id)
}
Expand Down
Loading

0 comments on commit 6ffcdd2

Please sign in to comment.