From 7ee5a6518c523397c196bc11a006b26f8eeed942 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 5 Aug 2024 19:05:36 +0300 Subject: [PATCH 01/12] transport: Accept / reject pending connection methods Signed-off-by: Alexandru Vasile --- src/transport/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 1885a641..667c869e 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -118,6 +118,11 @@ pub(crate) enum TransportEvent { endpoint: Endpoint, }, + PendingInboundConnection { + /// Connection ID. + connection_id: ConnectionId, + }, + /// Connection opened to remote but not yet negotiated. ConnectionOpened { /// Connection ID. @@ -173,6 +178,12 @@ pub(crate) trait Transport: Stream + Unpin + Send { /// Accept negotiated connection. fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()>; + /// Accept pending connection. + fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()>; + + /// Reject pending connection. + fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()>; + /// Reject negotiated connection. fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()>; From b7dc82ff6bf4eeecb76a5e6b41ad234f57008b43 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 5 Aug 2024 19:06:04 +0300 Subject: [PATCH 02/12] manager/limits: Check inbound connection limits Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index b6838e8e..cb7e89b6 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -101,6 +101,17 @@ impl ConnectionLimits { Ok(usize::MAX) } + /// Called before accepting a new incoming connection. + pub fn on_incoming(&mut self) -> Result<(), ConnectionLimitsError> { + if let Some(max_incoming_connections) = self.config.max_incoming_connections { + if self.incoming_connections.len() >= max_incoming_connections { + return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded); + } + } + + Ok(()) + } + /// Called when a new connection is established. pub fn on_connection_established( &mut self, From dbb3facf25a55e86453261cab837e116fb6d4834 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 5 Aug 2024 19:06:26 +0300 Subject: [PATCH 03/12] manager: Accept/reject pending incoming Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index be732b1f..48ddbaf1 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -834,6 +834,11 @@ impl TransportManager { } } + fn on_pending_incoming_connection(&mut self) -> crate::Result<()> { + self.connection_limits.on_incoming()?; + Ok(()) + } + /// Handle closed connection. fn on_connection_closed( &mut self, @@ -1708,7 +1713,34 @@ impl TransportManager { } Ok(None) => {} } - } + }, + TransportEvent::PendingInboundConnection { connection_id } => { + if self.on_pending_incoming_connection().is_ok() { + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + "accept pending incoming connection", + ); + + let _ = self + .transports + .get_mut(&transport) + .expect("transport to exist") + .accept_pending(connection_id); + } else { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + "reject pending incoming connection", + ); + + let _ = self + .transports + .get_mut(&transport) + .expect("transport to exist") + .reject_pending(connection_id); + } + }, event => panic!("event not supported: {event:?}"), } }, From ef0a8168999b075d940ac1035665331d9b59f7f4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 5 Aug 2024 19:06:57 +0300 Subject: [PATCH 04/12] transports: Implement accept/reject pending for supported transports Signed-off-by: Alexandru Vasile --- src/transport/dummy.rs | 8 +++ src/transport/quic/mod.rs | 54 ++++++++++++---- src/transport/tcp/mod.rs | 52 ++++++++++++++- src/transport/webrtc/mod.rs | 24 +++++++ src/transport/websocket/mod.rs | 114 ++++++++++++++++++++++++--------- 5 files changed, 206 insertions(+), 46 deletions(-) diff --git a/src/transport/dummy.rs b/src/transport/dummy.rs index b7fd0aa1..f8c07571 100644 --- a/src/transport/dummy.rs +++ b/src/transport/dummy.rs @@ -77,6 +77,14 @@ impl Transport for DummyTransport { Ok(()) } + fn accept_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> { + Ok(()) + } + + fn reject_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> { + Ok(()) + } + fn reject(&mut self, _: ConnectionId) -> crate::Result<()> { Ok(()) } diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 5972993f..0dc3a202 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -36,7 +36,7 @@ use crate::{ use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; use multiaddr::{Multiaddr, Protocol}; -use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout}; +use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout}; use std::{ collections::{HashMap, HashSet}, @@ -80,6 +80,9 @@ pub(crate) struct QuicTransport { /// Pending dials. pending_dials: HashMap, + /// Pending inbound connections. + pending_inbound_connections: HashMap, + /// Pending connections. pending_connections: FuturesUnordered)>>, @@ -110,6 +113,22 @@ impl QuicTransport { Some(p2p_cert.peer_id()) } + /// Handle inbound accepted connection. + fn on_inbound_connection(&mut self, connection_id: ConnectionId, connection: Connecting) { + self.pending_connections.push(Box::pin(async move { + let connection = match connection.await { + Ok(connection) => connection, + Err(error) => return (connection_id, Err(error.into())), + }; + + let Some(peer) = Self::extract_peer_id(&connection) else { + return (connection_id, Err(Error::InvalidCertificate)); + }; + + (connection_id, Ok(NegotiatedConnection { peer, connection })) + })); + } + /// Handle established connection. fn on_connection_established( &mut self, @@ -193,6 +212,7 @@ impl TransportBuilder for QuicTransport { opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), + pending_inbound_connections: HashMap::new(), pending_raw_connections: FuturesUnordered::new(), pending_connections: FuturesUnordered::new(), }, @@ -291,6 +311,23 @@ impl Transport for QuicTransport { .map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(())) } + fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + let connection = self + .pending_inbound_connections + .remove(&connection_id) + .ok_or(Error::ConnectionDoesntExist(connection_id))?; + + self.on_inbound_connection(connection_id, connection); + + Ok(()) + } + + fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + self.pending_inbound_connections + .remove(&connection_id) + .map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(())) + } + fn open( &mut self, connection_id: ConnectionId, @@ -412,20 +449,13 @@ impl Stream for QuicTransport { tracing::trace!( target: LOG_TARGET, ?connection_id, - "accept connection", + "pending inbound connection", ); - self.pending_connections.push(Box::pin(async move { - let connection = match connection.await { - Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), - }; - - let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); - }; + self.pending_inbound_connections.insert(connection_id, connection); - (connection_id, Ok(NegotiatedConnection { peer, connection })) + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { + connection_id, })); } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 62cc00d9..ecee6cee 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -62,6 +62,14 @@ pub mod config; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::tcp"; +/// Pending inbound connection. +struct PendingInboundConnection { + /// Socket address of the remote peer. + connection: TcpStream, + /// Address of the remote peer. + address: SocketAddr, +} + /// TCP transport. pub(crate) struct TcpTransport { /// Transport context. @@ -79,6 +87,9 @@ pub(crate) struct TcpTransport { /// Dial addresses. dial_addresses: DialAddresses, + /// Pending inbound connections. + pending_inbound_connections: HashMap, + /// Pending opening connections. pending_connections: FuturesUnordered>>, @@ -101,8 +112,12 @@ pub(crate) struct TcpTransport { impl TcpTransport { /// Handle inbound TCP connection. - fn on_inbound_connection(&mut self, connection: TcpStream, address: SocketAddr) { - let connection_id = self.context.next_connection_id(); + fn on_inbound_connection( + &mut self, + connection_id: ConnectionId, + connection: TcpStream, + address: SocketAddr, + ) { let yamux_config = self.config.yamux_config.clone(); let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; let max_write_buffer_size = self.config.noise_write_buffer_size; @@ -228,6 +243,7 @@ impl TransportBuilder for TcpTransport { opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), + pending_inbound_connections: HashMap::new(), pending_connections: FuturesUnordered::new(), pending_raw_connections: FuturesUnordered::new(), }, @@ -309,6 +325,23 @@ impl Transport for TcpTransport { Ok(()) } + fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + let pending = self + .pending_inbound_connections + .remove(&connection_id) + .ok_or(Error::ConnectionDoesntExist(connection_id))?; + + self.on_inbound_connection(connection_id, pending.connection, pending.address); + + Ok(()) + } + + fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + self.pending_inbound_connections + .remove(&connection_id) + .map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(())) + } + fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> { self.canceled.insert(connection_id); self.pending_open @@ -423,7 +456,19 @@ impl Stream for TcpTransport { match event { None | Some(Err(_)) => return Poll::Ready(None), Some(Ok((connection, address))) => { - self.on_inbound_connection(connection, address); + let connection_id = self.context.next_connection_id(); + + self.pending_inbound_connections.insert( + connection_id, + PendingInboundConnection { + connection, + address, + }, + ); + + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { + connection_id, + })); } } } @@ -620,6 +665,7 @@ mod tests { TransportEvent::DialFailure { .. } => {} TransportEvent::ConnectionOpened { .. } => {} TransportEvent::OpenFailure { .. } => {} + TransportEvent::PendingInboundConnection { .. } => {} } } }); diff --git a/src/transport/webrtc/mod.rs b/src/transport/webrtc/mod.rs index 2e9fcb78..10ee59ad 100644 --- a/src/transport/webrtc/mod.rs +++ b/src/transport/webrtc/mod.rs @@ -499,6 +499,30 @@ impl Transport for WebRtcTransport { Err(Error::NotSupported("webrtc cannot dial peers".to_string())) } + fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + "webrtc cannot accept pending connections", + ); + + Err(Error::NotSupported( + "webrtc cannot accept pending connections".to_string(), + )) + } + + fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + "webrtc cannot reject pending connections", + ); + + Err(Error::NotSupported( + "webrtc cannot reject pending connections".to_string(), + )) + } + fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()> { tracing::trace!( target: LOG_TARGET, diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index cc2e3740..51a4c103 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -39,6 +39,7 @@ use crate::{ use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; use multiaddr::{Multiaddr, Protocol}; use socket2::{Domain, Socket, Type}; +use std::net::SocketAddr; use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; @@ -80,6 +81,14 @@ impl WebSocketError { /// Logging target for the file. const LOG_TARGET: &str = "litep2p::websocket"; +/// Pending inbound connection. +struct PendingInboundConnection { + /// Socket address of the remote peer. + connection: TcpStream, + /// Address of the remote peer. + address: SocketAddr, +} + /// WebSocket transport. pub(crate) struct WebSocketTransport { /// Transport context. @@ -97,6 +106,9 @@ pub(crate) struct WebSocketTransport { /// Pending dials. pending_dials: HashMap, + /// Pending inbound connections. + pending_inbound_connections: HashMap, + /// Pending connections. pending_connections: FuturesUnordered>>, @@ -127,6 +139,46 @@ pub(crate) struct WebSocketTransport { } impl WebSocketTransport { + /// Handle inbound connection. + fn on_inbound_connection( + &mut self, + connection_id: ConnectionId, + connection: TcpStream, + address: SocketAddr, + ) { + let keypair = self.context.keypair.clone(); + let yamux_config = self.config.yamux_config.clone(); + let connection_open_timeout = self.config.connection_open_timeout; + let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; + let max_write_buffer_size = self.config.noise_write_buffer_size; + let address = Multiaddr::empty() + .with(Protocol::from(address.ip())) + .with(Protocol::Tcp(address.port())) + .with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string()))); + + self.pending_connections.push(Box::pin(async move { + match tokio::time::timeout(connection_open_timeout, async move { + WebSocketConnection::accept_connection( + connection, + connection_id, + keypair, + address, + yamux_config, + max_read_ahead_factor, + max_write_buffer_size, + ) + .await + .map_err(|error| WebSocketError::new(error, None)) + }) + .await + { + Err(_) => Err(WebSocketError::new(Error::Timeout, None)), + Ok(Err(error)) => Err(error), + Ok(Ok(result)) => Ok(result), + } + })); + } + /// Convert `Multiaddr` into `url::Url` fn multiaddr_into_url(address: Multiaddr) -> crate::Result<(Url, PeerId)> { let mut protocol_stack = address.iter(); @@ -282,6 +334,7 @@ impl TransportBuilder for WebSocketTransport { opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), + pending_inbound_connections: HashMap::new(), pending_connections: FuturesUnordered::new(), pending_raw_connections: FuturesUnordered::new(), }, @@ -385,6 +438,24 @@ impl Transport for WebSocketTransport { .map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(())) } + fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + let pending = self + .pending_inbound_connections + .remove(&connection_id) + .ok_or(Error::ConnectionDoesntExist(connection_id))?; + + self.on_inbound_connection(connection_id, pending.connection, pending.address); + + Ok(()) + } + + fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + self.canceled.insert(connection_id); + self.pending_open + .remove(&connection_id) + .map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(())) + } + fn open( &mut self, connection_id: ConnectionId, @@ -492,38 +563,19 @@ impl Stream for WebSocketTransport { while let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) { match connection { Err(_) => return Poll::Ready(None), - Ok((stream, address)) => { + Ok((connection, address)) => { let connection_id = self.context.next_connection_id(); - let keypair = self.context.keypair.clone(); - let yamux_config = self.config.yamux_config.clone(); - let connection_open_timeout = self.config.connection_open_timeout; - let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; - let max_write_buffer_size = self.config.noise_write_buffer_size; - let address = Multiaddr::empty() - .with(Protocol::from(address.ip())) - .with(Protocol::Tcp(address.port())) - .with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string()))); - - self.pending_connections.push(Box::pin(async move { - match tokio::time::timeout(connection_open_timeout, async move { - WebSocketConnection::accept_connection( - stream, - connection_id, - keypair, - address, - yamux_config, - max_read_ahead_factor, - max_write_buffer_size, - ) - .await - .map_err(|error| WebSocketError::new(error, None)) - }) - .await - { - Err(_) => Err(WebSocketError::new(Error::Timeout, None)), - Ok(Err(error)) => Err(error), - Ok(Ok(result)) => Ok(result), - } + + self.pending_inbound_connections.insert( + connection_id, + PendingInboundConnection { + connection, + address, + }, + ); + + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { + connection_id, })); } } From 5d5eb21937e700dbb6330d81bb88e9ca6d102780 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 5 Aug 2024 20:09:56 +0300 Subject: [PATCH 05/12] quic/tests: Adjust testing to accept connection Signed-off-by: Alexandru Vasile --- src/transport/quic/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 0dc3a202..794c05b1 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -577,6 +577,15 @@ mod tests { )); transport2.dial(ConnectionId::new(), listen_address).unwrap(); + + let event = transport1.next().await.unwrap(); + match event { + TransportEvent::PendingInboundConnection { connection_id } => { + transport1.accept_pending(connection_id).unwrap(); + } + _ => panic!("unexpected event"), + } + let (res1, res2) = tokio::join!(transport1.next(), transport2.next()); assert!(std::matches!( From f2a7a5ac2b6590fb988b5aa21cc78565151ca3a7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 6 Aug 2024 12:39:02 +0300 Subject: [PATCH 06/12] tcp/tests: Check accept / reject connection Signed-off-by: Alexandru Vasile --- src/transport/tcp/connection.rs | 5 +- src/transport/tcp/mod.rs | 165 ++++++++++++++++++++++++++++++-- 2 files changed, 162 insertions(+), 8 deletions(-) diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index 1dd43898..3d3511e0 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -247,7 +247,10 @@ impl TcpConnection { }) .await { - Err(_) => Err(Error::Timeout), + Err(_) => { + tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation"); + Err(Error::Timeout) + } Ok(result) => result, } } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index ecee6cee..14342dee 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -125,6 +125,13 @@ impl TcpTransport { let substream_open_timeout = self.config.substream_open_timeout; let keypair = self.context.keypair.clone(); + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + ?address, + "accept connection", + ); + self.pending_connections.push(Box::pin(async move { TcpConnection::accept_connection( connection, @@ -152,7 +159,15 @@ impl TcpTransport { let (socket_address, _) = TcpAddress::multiaddr_to_socket_address(&address)?; let remote_address = match tokio::time::timeout(connection_open_timeout, socket_address.lookup_ip()).await { - Err(_) => return Err(Error::Timeout), + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + ?address, + ?connection_open_timeout, + "failed to resolve address within timeout", + ); + return Err(Error::Timeout); + } Ok(Err(error)) => return Err(error), Ok(Ok(address)) => address, }; @@ -204,9 +219,24 @@ impl TcpTransport { }; match tokio::time::timeout(connection_open_timeout, future).await { - Err(_) => Err(Error::Timeout), + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_open_timeout, + "failed to connect within timeout", + ); + Err(Error::Timeout) + } Ok(Err(error)) => Err(error.into()), - Ok(Ok((address, stream))) => Ok((address, stream)), + Ok(Ok((address, stream))) => { + tracing::debug!( + target: LOG_TARGET, + ?address, + "connected", + ); + + Ok((address, stream)) + } } } } @@ -452,11 +482,21 @@ impl Stream for TcpTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tracing::trace!(target: LOG_TARGET, "poll transport"); + while let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { + tracing::trace!(target: LOG_TARGET, ?event, "event"); + match event { None | Some(Err(_)) => return Poll::Ready(None), Some(Ok((connection, address))) => { let connection_id = self.context.next_connection_id(); + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + ?address, + "pending inbound TCP connection", + ); self.pending_inbound_connections.insert( connection_id, @@ -473,6 +513,8 @@ impl Stream for TcpTransport { } } + tracing::trace!(target: LOG_TARGET, "poll pending RAW connections"); + while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { match result { Ok((connection_id, address, stream)) => { @@ -500,6 +542,8 @@ impl Stream for TcpTransport { } } + tracing::trace!(target: LOG_TARGET, "poll pending connections"); + while let Poll::Ready(Some(connection)) = self.pending_connections.poll_next_unpin(cx) { match connection { Ok(connection) => { @@ -524,6 +568,8 @@ impl Stream for TcpTransport { } } + tracing::trace!(target: LOG_TARGET, "return Pending"); + Poll::Pending } } @@ -614,18 +660,123 @@ mod tests { let (mut transport2, _) = TcpTransport::new(handle2, transport_config2).unwrap(); transport2.dial(ConnectionId::new(), listen_address).unwrap(); - let (res1, res2) = tokio::join!(transport1.next(), transport2.next()); + let (tx, mut from_transport2) = channel(64); + tokio::spawn(async move { + let event = transport2.next().await; + tx.send(event).await.unwrap(); + }); + let event = transport1.next().await.unwrap(); + match event { + TransportEvent::PendingInboundConnection { connection_id } => { + transport1.accept_pending(connection_id).unwrap(); + } + _ => panic!("unexpected event"), + } + + let event = transport1.next().await; assert!(std::matches!( - res1, + event, Some(TransportEvent::ConnectionEstablished { .. }) )); + + let event = from_transport2.recv().await.unwrap(); assert!(std::matches!( - res2, + event, Some(TransportEvent::ConnectionEstablished { .. }) )); } + #[tokio::test] + async fn connect_and_reject_works() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let keypair1 = Keypair::generate(); + let (tx1, _rx1) = channel(64); + let (event_tx1, _event_rx1) = channel(64); + let bandwidth_sink = BandwidthSink::new(); + + let handle1 = crate::transport::manager::TransportHandle { + executor: Arc::new(DefaultExecutor {}), + protocol_names: Vec::new(), + next_substream_id: Default::default(), + next_connection_id: Default::default(), + keypair: keypair1.clone(), + tx: event_tx1, + bandwidth_sink: bandwidth_sink.clone(), + + protocols: HashMap::from_iter([( + ProtocolName::from("/notif/1"), + ProtocolContext { + tx: tx1, + codec: ProtocolCodec::Identity(32), + fallback_names: Vec::new(), + }, + )]), + }; + let transport_config1 = Config { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }; + + let (mut transport1, listen_addresses) = + TcpTransport::new(handle1, transport_config1).unwrap(); + let listen_address = listen_addresses[0].clone(); + + let keypair2 = Keypair::generate(); + let (tx2, _rx2) = channel(64); + let (event_tx2, _event_rx2) = channel(64); + + let handle2 = crate::transport::manager::TransportHandle { + executor: Arc::new(DefaultExecutor {}), + protocol_names: Vec::new(), + next_substream_id: Default::default(), + next_connection_id: Default::default(), + keypair: keypair2.clone(), + tx: event_tx2, + bandwidth_sink: bandwidth_sink.clone(), + + protocols: HashMap::from_iter([( + ProtocolName::from("/notif/1"), + ProtocolContext { + tx: tx2, + codec: ProtocolCodec::Identity(32), + fallback_names: Vec::new(), + }, + )]), + }; + let transport_config2 = Config { + listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()], + ..Default::default() + }; + + let (mut transport2, _) = TcpTransport::new(handle2, transport_config2).unwrap(); + transport2.dial(ConnectionId::new(), listen_address).unwrap(); + + let (tx, mut from_transport2) = channel(64); + tokio::spawn(async move { + let event = transport2.next().await; + tx.send(event).await.unwrap(); + }); + + // Reject connection. + let event = transport1.next().await.unwrap(); + match event { + TransportEvent::PendingInboundConnection { connection_id } => { + transport1.reject_pending(connection_id).unwrap(); + } + _ => panic!("unexpected event"), + } + + let event = from_transport2.recv().await.unwrap(); + assert!(std::matches!( + event, + Some(TransportEvent::DialFailure { .. }) + )); + } + #[tokio::test] async fn dial_failure() { let _ = tracing_subscriber::fmt() @@ -711,7 +862,7 @@ mod tests { transport2.dial(ConnectionId::new(), address).unwrap(); - // spawn the other conection in the background as it won't return anything + // spawn the other connection in the background as it won't return anything tokio::spawn(async move { loop { let _ = event_rx1.recv().await; From 45110f1e0ccb35a45c23d0761eec0c849452f8d6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 6 Aug 2024 19:30:16 +0300 Subject: [PATCH 07/12] Fix clippy Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 14 +++++--------- src/transport/manager/mod.rs | 10 +++------- src/transport/tcp/mod.rs | 24 ++++++------------------ src/transport/websocket/mod.rs | 12 ++++++------ 4 files changed, 20 insertions(+), 40 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index cb7e89b6..a6a1dfd9 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -125,11 +125,9 @@ impl ConnectionLimits { return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded); } } - } else { - if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { - if self.outgoing_connections.len() >= max_outgoing_connections { - return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); - } + } else if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { + if self.outgoing_connections.len() >= max_outgoing_connections { + return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); } } @@ -138,10 +136,8 @@ impl ConnectionLimits { if self.config.max_incoming_connections.is_some() { self.incoming_connections.insert(connection_id); } - } else { - if self.config.max_outgoing_connections.is_some() { - self.outgoing_connections.insert(connection_id); - } + } else if self.config.max_outgoing_connections.is_some() { + self.outgoing_connections.insert(connection_id); } Ok(()) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 1bc39ef7..4689097c 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -508,18 +508,14 @@ impl TransportManager { record.set_connection_id(connection_id); #[cfg(feature = "quic")] - if address.iter().find(|p| std::matches!(p, Protocol::QuicV1)).is_some() { + if address.iter().any(|p| std::matches!(&p, Protocol::QuicV1)) { quic.push(address.clone()); transports.insert(SupportedTransport::Quic); continue; } #[cfg(feature = "websocket")] - if address - .iter() - .find(|p| std::matches!(p, Protocol::Ws(_) | Protocol::Wss(_))) - .is_some() - { + if address.iter().any(|p| std::matches!(&p, Protocol::Ws(_) | Protocol::Wss(_))) { websocket.push(address.clone()); transports.insert(SupportedTransport::WebSocket); continue; @@ -2595,7 +2591,7 @@ mod tests { peer_context.state = PeerState::Connected { record, - dial_record: dial_record, + dial_record, }; } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 37e5fcf4..14cbbf04 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -482,13 +482,9 @@ impl Stream for TcpTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tracing::trace!(target: LOG_TARGET, "poll transport"); - - while let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { - tracing::trace!(target: LOG_TARGET, ?event, "event"); - - match event { - None | Some(Err(_)) => return Poll::Ready(None), + if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { + return match event { + None | Some(Err(_)) => Poll::Ready(None), Some(Ok((connection, address))) => { let connection_id = self.context.next_connection_id(); tracing::trace!( @@ -506,15 +502,13 @@ impl Stream for TcpTransport { }, ); - return Poll::Ready(Some(TransportEvent::PendingInboundConnection { + Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, - })); + })) } - } + }; } - tracing::trace!(target: LOG_TARGET, "poll pending RAW connections"); - while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { match result { Ok((connection_id, address, stream)) => { @@ -542,8 +536,6 @@ impl Stream for TcpTransport { } } - tracing::trace!(target: LOG_TARGET, "poll pending connections"); - while let Poll::Ready(Some(connection)) = self.pending_connections.poll_next_unpin(cx) { match connection { Ok(connection) => { @@ -568,8 +560,6 @@ impl Stream for TcpTransport { } } - tracing::trace!(target: LOG_TARGET, "return Pending"); - Poll::Pending } } @@ -698,7 +688,6 @@ mod tests { let handle1 = crate::transport::manager::TransportHandle { executor: Arc::new(DefaultExecutor {}), - protocol_names: Vec::new(), next_substream_id: Default::default(), next_connection_id: Default::default(), keypair: keypair1.clone(), @@ -729,7 +718,6 @@ mod tests { let handle2 = crate::transport::manager::TransportHandle { executor: Arc::new(DefaultExecutor {}), - protocol_names: Vec::new(), next_substream_id: Default::default(), next_connection_id: Default::default(), keypair: keypair2.clone(), diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 51a4c103..e0d10587 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -560,9 +560,9 @@ impl Stream for WebSocketTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) { - match connection { - Err(_) => return Poll::Ready(None), + if let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) { + return match connection { + Err(_) => Poll::Ready(None), Ok((connection, address)) => { let connection_id = self.context.next_connection_id(); @@ -574,11 +574,11 @@ impl Stream for WebSocketTransport { }, ); - return Poll::Ready(Some(TransportEvent::PendingInboundConnection { + Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, - })); + })) } - } + }; } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { From 13ef0ba4ee8d6e9ea0f4c42c7b271bf207000d0c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 6 Aug 2024 19:44:48 +0300 Subject: [PATCH 08/12] tranpsort: Easy path for connection limits Signed-off-by: Alexandru Vasile --- src/transport/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/transport/mod.rs b/src/transport/mod.rs index cacb728a..0746b9e7 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -39,6 +39,8 @@ pub mod websocket; pub(crate) mod dummy; pub(crate) mod manager; +pub use manager::limits::{ConnectionLimitsConfig, ConnectionLimitsError}; + /// Timeout for opening a connection. pub(crate) const CONNECTION_OPEN_TIMEOUT: Duration = Duration::from_secs(10); From 644613eb8541891e9ce214c9650be30ead4e3804 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 6 Aug 2024 19:45:34 +0300 Subject: [PATCH 09/12] quic: Fix clippy Signed-off-by: Alexandru Vasile --- src/transport/quic/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 86beb617..90fe0cee 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -443,7 +443,7 @@ impl Stream for QuicTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) { + if let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) { let connection_id = self.context.next_connection_id(); tracing::trace!( From c06220afb8e3e32284ad7617575708baf14d1965 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 7 Aug 2024 20:36:16 +0300 Subject: [PATCH 10/12] webrtc: Add debug asserts for unreachable code paths Signed-off-by: Alexandru Vasile --- src/transport/webrtc/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/transport/webrtc/mod.rs b/src/transport/webrtc/mod.rs index 10ee59ad..7dce743d 100644 --- a/src/transport/webrtc/mod.rs +++ b/src/transport/webrtc/mod.rs @@ -496,6 +496,7 @@ impl Transport for WebRtcTransport { "webrtc cannot dial", ); + debug_assert!(false); Err(Error::NotSupported("webrtc cannot dial peers".to_string())) } @@ -506,6 +507,7 @@ impl Transport for WebRtcTransport { "webrtc cannot accept pending connections", ); + debug_assert!(false); Err(Error::NotSupported( "webrtc cannot accept pending connections".to_string(), )) @@ -518,6 +520,7 @@ impl Transport for WebRtcTransport { "webrtc cannot reject pending connections", ); + debug_assert!(false); Err(Error::NotSupported( "webrtc cannot reject pending connections".to_string(), )) From 366b0a882a997cab6c612ea86998083f32908ab9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 7 Aug 2024 20:38:10 +0300 Subject: [PATCH 11/12] Update src/transport/websocket/mod.rs Co-authored-by: Dmitry Markin --- src/transport/websocket/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index e0d10587..82d5a3e7 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -154,7 +154,7 @@ impl WebSocketTransport { let address = Multiaddr::empty() .with(Protocol::from(address.ip())) .with(Protocol::Tcp(address.port())) - .with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string()))); + .with(Protocol::Ws(std::borrow::Cow::Borrowed("/"))); self.pending_connections.push(Box::pin(async move { match tokio::time::timeout(connection_open_timeout, async move { From 85228767c1154c64ea94de6fb15ea89aeb332b79 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 7 Aug 2024 20:38:18 +0300 Subject: [PATCH 12/12] Update src/transport/websocket/mod.rs Co-authored-by: Dmitry Markin --- src/transport/websocket/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 82d5a3e7..617c2eef 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -450,7 +450,6 @@ impl Transport for WebSocketTransport { } fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> { - self.canceled.insert(connection_id); self.pending_open .remove(&connection_id) .map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(()))