Skip to content

Commit

Permalink
chore: add KEEP_ALIVE_TIMEOUT const, and some other changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Aug 13, 2024
1 parent 4ecff68 commit 32d8fc3
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 34 deletions.
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
},
transport::{
manager::limits::ConnectionLimitsConfig, tcp::config::Config as TcpConfig,
MAX_PARALLEL_DIALS,
KEEP_ALIVE_TIMEOUT, MAX_PARALLEL_DIALS,
},
types::protocol::ProtocolName,
PeerId,
Expand Down Expand Up @@ -156,7 +156,7 @@ impl ConfigBuilder {
request_response_protocols: HashMap::new(),
known_addresses: Vec::new(),
connection_limits: ConnectionLimitsConfig::default(),
keep_alive_timeout: Duration::from_secs(5),
keep_alive_timeout: KEEP_ALIVE_TIMEOUT,
}
}

Expand Down Expand Up @@ -273,7 +273,7 @@ impl ConfigBuilder {
}

/// Set keep alive timeout for connections.
pub fn set_keep_alive_timeout(mut self, timeout: Duration) -> Self {
pub fn with_keep_alive_timeout(mut self, timeout: Duration) -> Self {
self.keep_alive_timeout = timeout;
self
}
Expand Down
7 changes: 5 additions & 2 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,10 @@ mod tests {
use crate::{
codec::ProtocolCodec,
crypto::ed25519::Keypair,
transport::manager::{limits::ConnectionLimitsConfig, TransportManager},
transport::{
manager::{limits::ConnectionLimitsConfig, TransportManager},
KEEP_ALIVE_TIMEOUT,
},
types::protocol::ProtocolName,
BandwidthSink,
};
Expand Down Expand Up @@ -902,7 +905,7 @@ mod tests {
Vec::new(),
Default::default(),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
let (event_tx, event_rx) = channel(64);
let (_cmd_tx, cmd_rx) = channel(64);
Expand Down
7 changes: 5 additions & 2 deletions src/protocol/notification/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::{
},
InnerTransportEvent, ProtocolCommand, TransportService,
},
transport::manager::{limits::ConnectionLimitsConfig, TransportManager},
transport::{
manager::{limits::ConnectionLimitsConfig, TransportManager},
KEEP_ALIVE_TIMEOUT,
},
types::protocol::ProtocolName,
BandwidthSink, PeerId,
};
Expand Down Expand Up @@ -63,7 +66,7 @@ fn make_notification_protocol() -> (
Vec::new(),
std::sync::Arc::new(Default::default()),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
let (config, handle) = NotificationConfig::new(
ProtocolName::from("/notif/1"),
Expand Down
7 changes: 5 additions & 2 deletions src/protocol/request_response/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::{
InnerTransportEvent, TransportService,
},
substream::Substream,
transport::manager::{limits::ConnectionLimitsConfig, TransportManager},
transport::{
manager::{limits::ConnectionLimitsConfig, TransportManager},
KEEP_ALIVE_TIMEOUT,
},
types::{RequestId, SubstreamId},
BandwidthSink, Error, PeerId, ProtocolName,
};
Expand Down Expand Up @@ -61,7 +64,7 @@ fn protocol() -> (
Vec::new(),
std::sync::Arc::new(Default::default()),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
let (config, handle) =
ConfigBuilder::new(ProtocolName::from("/req/1")).with_max_size(1024).build();
Expand Down
35 changes: 19 additions & 16 deletions src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ pub struct TransportService {
next_substream_id: Arc<AtomicUsize>,

/// Close the connection if no substreams are open within this time frame.
keep_alive: Duration,
keep_alive_timeout: Duration,

/// Pending keep-alive timeouts.
keep_alive_timeouts: FuturesUnordered<BoxFuture<'static, (PeerId, ConnectionId)>>,
pending_keep_alive_timeouts: FuturesUnordered<BoxFuture<'static, (PeerId, ConnectionId)>>,
}

impl TransportService {
Expand All @@ -137,7 +137,7 @@ impl TransportService {
fallback_names: Vec<ProtocolName>,
next_substream_id: Arc<AtomicUsize>,
transport_handle: TransportManagerHandle,
keep_alive: Duration,
keep_alive_timeout: Duration,
) -> (Self, Sender<InnerTransportEvent>) {
let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);

Expand All @@ -150,8 +150,8 @@ impl TransportService {
transport_handle,
next_substream_id,
connections: HashMap::new(),
keep_alive,
keep_alive_timeouts: FuturesUnordered::new(),
keep_alive_timeout: keep_alive_timeout,
pending_keep_alive_timeouts: FuturesUnordered::new(),
},
tx,
)
Expand All @@ -173,7 +173,7 @@ impl TransportService {
?connection_id,
"connection established",
);
let keep_alive = self.keep_alive;
let keep_alive_timeout = self.keep_alive_timeout;

match self.connections.get_mut(&peer) {
Some(context) => match context.secondary {
Expand All @@ -188,8 +188,8 @@ impl TransportService {
None
}
None => {
self.keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive).await;
self.pending_keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive_timeout).await;
(peer, connection_id)
}));
context.secondary = Some(handle);
Expand All @@ -199,8 +199,8 @@ impl TransportService {
},
None => {
self.connections.insert(peer, ConnectionContext::new(handle));
self.keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive).await;
self.pending_keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(keep_alive_timeout).await;
(peer, connection_id)
}));

Expand Down Expand Up @@ -393,7 +393,7 @@ impl Stream for TransportService {
}

while let Poll::Ready(Some((peer, connection_id))) =
self.keep_alive_timeouts.poll_next_unpin(cx)
self.pending_keep_alive_timeouts.poll_next_unpin(cx)
{
if let Some(context) = self.connections.get_mut(&peer) {
tracing::trace!(
Expand All @@ -416,7 +416,10 @@ mod tests {
use super::*;
use crate::{
protocol::TransportService,
transport::manager::{handle::InnerTransportManagerCommand, TransportManagerHandle},
transport::{
manager::{handle::InnerTransportManagerCommand, TransportManagerHandle},
KEEP_ALIVE_TIMEOUT,
},
};
use futures::StreamExt;
use parking_lot::RwLock;
Expand Down Expand Up @@ -445,7 +448,7 @@ mod tests {
Vec::new(),
Arc::new(AtomicUsize::new(0usize)),
handle,
std::time::Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);

(service, sender, cmd_rx)
Expand Down Expand Up @@ -787,7 +790,7 @@ mod tests {
};

// verify the first connection state is correct
assert_eq!(service.keep_alive_timeouts.len(), 1);
assert_eq!(service.pending_keep_alive_timeouts.len(), 1);
match service.connections.get(&peer) {
Some(context) => {
assert_eq!(
Expand Down Expand Up @@ -822,7 +825,7 @@ mod tests {
// doesn't exist anymore
//
// the peer is removed because there is no connection to them
assert_eq!(service.keep_alive_timeouts.len(), 1);
assert_eq!(service.pending_keep_alive_timeouts.len(), 1);
assert!(service.connections.get(&peer).is_none());

// register new primary connection but verify that there are now two pending keep-alive
Expand Down Expand Up @@ -850,7 +853,7 @@ mod tests {
};

// verify the first connection state is correct
assert_eq!(service.keep_alive_timeouts.len(), 2);
assert_eq!(service.pending_keep_alive_timeouts.len(), 2);
match service.connections.get(&peer) {
Some(context) => {
assert_eq!(
Expand Down
20 changes: 11 additions & 9 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl TransportManager {
protocol: ProtocolName,
fallback_names: Vec<ProtocolName>,
codec: ProtocolCodec,
keep_alive: Duration,
keep_alive_timeout: Duration,
) -> TransportService {
assert!(!self.protocol_names.contains(&protocol));

Expand All @@ -339,7 +339,7 @@ impl TransportManager {
fallback_names.clone(),
self.next_substream_id.clone(),
self.transport_manager_handle.clone(),
keep_alive,
keep_alive_timeout,
);

self.protocols.insert(
Expand Down Expand Up @@ -1759,7 +1759,9 @@ mod tests {

use super::*;
use crate::{
crypto::ed25519::Keypair, executor::DefaultExecutor, transport::dummy::DummyTransport,
crypto::ed25519::Keypair,
executor::DefaultExecutor,
transport::{dummy::DummyTransport, KEEP_ALIVE_TIMEOUT},
};
use std::{
net::{Ipv4Addr, Ipv6Addr},
Expand Down Expand Up @@ -1796,13 +1798,13 @@ mod tests {
ProtocolName::from("/notif/1"),
Vec::new(),
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
manager.register_protocol(
ProtocolName::from("/notif/1"),
Vec::new(),
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
}

Expand All @@ -1823,7 +1825,7 @@ mod tests {
ProtocolName::from("/notif/1"),
Vec::new(),
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
manager.register_protocol(
ProtocolName::from("/notif/2"),
Expand All @@ -1832,7 +1834,7 @@ mod tests {
ProtocolName::from("/notif/1"),
],
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
}

Expand All @@ -1856,7 +1858,7 @@ mod tests {
ProtocolName::from("/notif/1"),
],
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
manager.register_protocol(
ProtocolName::from("/notif/2"),
Expand All @@ -1865,7 +1867,7 @@ mod tests {
ProtocolName::from("/notif/1/new"),
],
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
KEEP_ALIVE_TIMEOUT,
);
}

Expand Down
3 changes: 3 additions & 0 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub(crate) const CONNECTION_OPEN_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for opening a substream.
pub(crate) const SUBSTREAM_OPEN_TIMEOUT: Duration = Duration::from_secs(5);

/// Timeout for connection waiting new substreams.
pub(crate) const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum number of parallel dial attempts.
pub(crate) const MAX_PARALLEL_DIALS: usize = 8;

Expand Down

0 comments on commit 32d8fc3

Please sign in to comment.