Skip to content

Commit

Permalink
config: add a keep_alive_timeout configuration option
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Aug 13, 2024
1 parent 8f2eac8 commit 2cb399c
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 41 deletions.
18 changes: 16 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
},
transport::{
manager::limits::ConnectionLimitsConfig, tcp::config::Config as TcpConfig,
MAX_PARALLEL_DIALS,
KEEP_ALIVE_TIMEOUT, MAX_PARALLEL_DIALS,
},
types::protocol::ProtocolName,
PeerId,
Expand All @@ -45,7 +45,7 @@ use crate::transport::websocket::config::Config as WebSocketConfig;

use multiaddr::Multiaddr;

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

/// Connection role.
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -121,6 +121,9 @@ pub struct ConfigBuilder {

/// Connection limits config.
connection_limits: ConnectionLimitsConfig,

/// Close the connection if no substreams are open within this time frame.
keep_alive_timeout: Duration,
}

impl Default for ConfigBuilder {
Expand Down Expand Up @@ -153,6 +156,7 @@ impl ConfigBuilder {
request_response_protocols: HashMap::new(),
known_addresses: Vec::new(),
connection_limits: ConnectionLimitsConfig::default(),
keep_alive_timeout: KEEP_ALIVE_TIMEOUT,
}
}

Expand Down Expand Up @@ -268,6 +272,12 @@ impl ConfigBuilder {
self
}

/// Set keep alive timeout for connections.
pub fn with_keep_alive_timeout(mut self, timeout: Duration) -> Self {
self.keep_alive_timeout = timeout;
self
}

/// Build [`Litep2pConfig`].
pub fn build(mut self) -> Litep2pConfig {
let keypair = match self.keypair {
Expand Down Expand Up @@ -296,6 +306,7 @@ impl ConfigBuilder {
request_response_protocols: self.request_response_protocols,
known_addresses: self.known_addresses,
connection_limits: self.connection_limits,
keep_alive_timeout: self.keep_alive_timeout,
}
}
}
Expand Down Expand Up @@ -355,4 +366,7 @@ pub struct Litep2pConfig {

/// Connection limits config.
pub(crate) connection_limits: ConnectionLimitsConfig,

/// Close the connection if no substreams are open within this time frame.
pub(crate) keep_alive_timeout: Duration,
}
16 changes: 8 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use multihash::Multihash;
use transport::Endpoint;
use types::ConnectionId;

use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc};

pub use bandwidth::BandwidthSink;
pub use error::Error;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Litep2p {
protocol,
config.fallback_names.clone(),
config.codec,
Duration::from_secs(5),
litep2p_config.keep_alive_timeout,
);
let executor = Arc::clone(&litep2p_config.executor);
litep2p_config.executor.run(Box::pin(async move {
Expand All @@ -191,7 +191,7 @@ impl Litep2p {
protocol,
config.fallback_names.clone(),
config.codec,
config.timeout,
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
Expand All @@ -206,7 +206,7 @@ impl Litep2p {
protocol_name,
Vec::new(),
protocol.codec(),
Duration::from_secs(5),
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = protocol.run(service).await;
Expand All @@ -225,7 +225,7 @@ impl Litep2p {
ping_config.protocol.clone(),
Vec::new(),
ping_config.codec,
Duration::from_secs(5),
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
Ping::new(service, ping_config).run().await
Expand All @@ -248,7 +248,7 @@ impl Litep2p {
main_protocol.clone(),
fallback_names,
kademlia_config.codec,
Duration::from_secs(5),
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = Kademlia::new(service, kademlia_config).run().await;
Expand All @@ -269,7 +269,7 @@ impl Litep2p {
identify_config.protocol.clone(),
Vec::new(),
identify_config.codec,
Duration::from_secs(5),
litep2p_config.keep_alive_timeout,
);
identify_config.public = Some(litep2p_config.keypair.public().into());

Expand All @@ -289,7 +289,7 @@ impl Litep2p {
bitswap_config.protocol.clone(),
Vec::new(),
bitswap_config.codec,
Duration::from_secs(5),
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
Bitswap::new(service, bitswap_config).run().await
Expand Down
7 changes: 5 additions & 2 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,10 @@ mod tests {
use crate::{
codec::ProtocolCodec,
crypto::ed25519::Keypair,
transport::manager::{limits::ConnectionLimitsConfig, TransportManager},
transport::{
manager::{limits::ConnectionLimitsConfig, TransportManager},
KEEP_ALIVE_TIMEOUT,
},
types::protocol::ProtocolName,
BandwidthSink,
};
Expand Down Expand Up @@ -902,7 +905,7 @@ mod tests {
Vec::new(),
Default::default(),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
let (event_tx, event_rx) = channel(64);
let (_cmd_tx, cmd_rx) = channel(64);
Expand Down
7 changes: 5 additions & 2 deletions src/protocol/notification/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::{
},
InnerTransportEvent, ProtocolCommand, TransportService,
},
transport::manager::{limits::ConnectionLimitsConfig, TransportManager},
transport::{
manager::{limits::ConnectionLimitsConfig, TransportManager},
KEEP_ALIVE_TIMEOUT,
},
types::protocol::ProtocolName,
BandwidthSink, PeerId,
};
Expand Down Expand Up @@ -63,7 +66,7 @@ fn make_notification_protocol() -> (
Vec::new(),
std::sync::Arc::new(Default::default()),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
let (config, handle) = NotificationConfig::new(
ProtocolName::from("/notif/1"),
Expand Down
7 changes: 5 additions & 2 deletions src/protocol/request_response/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::{
InnerTransportEvent, TransportService,
},
substream::Substream,
transport::manager::{limits::ConnectionLimitsConfig, TransportManager},
transport::{
manager::{limits::ConnectionLimitsConfig, TransportManager},
KEEP_ALIVE_TIMEOUT,
},
types::{RequestId, SubstreamId},
BandwidthSink, Error, PeerId, ProtocolName,
};
Expand Down Expand Up @@ -61,7 +64,7 @@ fn protocol() -> (
Vec::new(),
std::sync::Arc::new(Default::default()),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
let (config, handle) =
ConfigBuilder::new(ProtocolName::from("/req/1")).with_max_size(1024).build();
Expand Down
35 changes: 19 additions & 16 deletions src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ pub struct TransportService {
next_substream_id: Arc<AtomicUsize>,

/// Close the connection if no substreams are open within this time frame.
keep_alive: Duration,
keep_alive_timeout: Duration,

/// Pending keep-alive timeouts.
keep_alive_timeouts: FuturesUnordered<BoxFuture<'static, (PeerId, ConnectionId)>>,
pending_keep_alive_timeouts: FuturesUnordered<BoxFuture<'static, (PeerId, ConnectionId)>>,
}

impl TransportService {
Expand All @@ -137,7 +137,7 @@ impl TransportService {
fallback_names: Vec<ProtocolName>,
next_substream_id: Arc<AtomicUsize>,
transport_handle: TransportManagerHandle,
keep_alive: Duration,
keep_alive_timeout: Duration,
) -> (Self, Sender<InnerTransportEvent>) {
let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);

Expand All @@ -150,8 +150,8 @@ impl TransportService {
transport_handle,
next_substream_id,
connections: HashMap::new(),
keep_alive,
keep_alive_timeouts: FuturesUnordered::new(),
keep_alive_timeout: keep_alive_timeout,
pending_keep_alive_timeouts: FuturesUnordered::new(),
},
tx,
)
Expand All @@ -173,7 +173,7 @@ impl TransportService {
?connection_id,
"connection established",
);
let keep_alive = self.keep_alive;
let keep_alive_timeout = self.keep_alive_timeout;

match self.connections.get_mut(&peer) {
Some(context) => match context.secondary {
Expand All @@ -188,8 +188,8 @@ impl TransportService {
None
}
None => {
self.keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive).await;
self.pending_keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive_timeout).await;
(peer, connection_id)
}));
context.secondary = Some(handle);
Expand All @@ -199,8 +199,8 @@ impl TransportService {
},
None => {
self.connections.insert(peer, ConnectionContext::new(handle));
self.keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive).await;
self.pending_keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive_timeout).await;
(peer, connection_id)
}));

Expand Down Expand Up @@ -393,7 +393,7 @@ impl Stream for TransportService {
}

while let Poll::Ready(Some((peer, connection_id))) =
self.keep_alive_timeouts.poll_next_unpin(cx)
self.pending_keep_alive_timeouts.poll_next_unpin(cx)
{
if let Some(context) = self.connections.get_mut(&peer) {
tracing::trace!(
Expand All @@ -416,7 +416,10 @@ mod tests {
use super::*;
use crate::{
protocol::TransportService,
transport::manager::{handle::InnerTransportManagerCommand, TransportManagerHandle},
transport::{
manager::{handle::InnerTransportManagerCommand, TransportManagerHandle},
KEEP_ALIVE_TIMEOUT,
},
};
use futures::StreamExt;
use parking_lot::RwLock;
Expand Down Expand Up @@ -445,7 +448,7 @@ mod tests {
Vec::new(),
Arc::new(AtomicUsize::new(0usize)),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);

(service, sender, cmd_rx)
Expand Down Expand Up @@ -787,7 +790,7 @@ mod tests {
};

// verify the first connection state is correct
assert_eq!(service.keep_alive_timeouts.len(), 1);
assert_eq!(service.pending_keep_alive_timeouts.len(), 1);
match service.connections.get(&peer) {
Some(context) => {
assert_eq!(
Expand Down Expand Up @@ -822,7 +825,7 @@ mod tests {
// doesn't exist anymore
//
// the peer is removed because there is no connection to them
assert_eq!(service.keep_alive_timeouts.len(), 1);
assert_eq!(service.pending_keep_alive_timeouts.len(), 1);
assert!(service.connections.get(&peer).is_none());

// register new primary connection but verify that there are now two pending keep-alive
Expand Down Expand Up @@ -850,7 +853,7 @@ mod tests {
};

// verify the first connection state is correct
assert_eq!(service.keep_alive_timeouts.len(), 2);
assert_eq!(service.pending_keep_alive_timeouts.len(), 2);
match service.connections.get(&peer) {
Some(context) => {
assert_eq!(
Expand Down
Loading

0 comments on commit 2cb399c

Please sign in to comment.