Skip to content

Commit

Permalink
feat(s2n-quic-dc): accept linger parameter instead of always setting …
Browse files Browse the repository at this point in the history
…it (#2476)
  • Loading branch information
camshaft authored Feb 18, 2025
1 parent 5c78b33 commit dad94a7
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 10 deletions.
8 changes: 6 additions & 2 deletions dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
socket::Protocol,
},
};
use std::{io, net::SocketAddr};
use std::{io, net::SocketAddr, time::Duration};
use tokio::net::TcpStream;

/// Connects using the UDP transport layer
Expand Down Expand Up @@ -54,6 +54,7 @@ pub async fn connect_tcp<H, Sub>(
acceptor_addr: SocketAddr,
env: &Environment<Sub>,
subscriber: Sub,
linger: Option<Duration>,
) -> io::Result<Stream<Sub>>
where
H: core::future::Future<Output = io::Result<secret::map::Peer>>,
Expand All @@ -64,7 +65,10 @@ where

// Make sure TCP_NODELAY is set
let _ = socket.set_nodelay(true);
let _ = socket.set_linger(Some(core::time::Duration::ZERO));

if linger.is_some() {
let _ = socket.set_linger(linger);
}

// if the acceptor_ip isn't known, then ask the socket to resolve it for us
let peer_addr = if acceptor_addr.ip().is_unspecified() {
Expand Down
5 changes: 5 additions & 0 deletions dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
};
use core::{future::poll_fn, task::Poll};
use s2n_quic_core::{inet::SocketAddress, time::Clock};
use std::time::Duration;
use tokio::net::TcpListener;
use tracing::debug;

Expand All @@ -26,6 +27,7 @@ where
secrets: secret::Map,
backlog: usize,
accept_flavor: accept::Flavor,
linger: Option<Duration>,
subscriber: Sub,
}

Expand All @@ -42,6 +44,7 @@ where
secrets: &secret::Map,
backlog: usize,
accept_flavor: accept::Flavor,
linger: Option<Duration>,
subscriber: Sub,
) -> Self {
let acceptor = Self {
Expand All @@ -51,6 +54,7 @@ where
secrets: secrets.clone(),
backlog,
accept_flavor,
linger,
subscriber,
};

Expand Down Expand Up @@ -98,6 +102,7 @@ where
workers.insert(
remote_address,
socket,
self.linger,
&mut context,
subscriber_ctx,
&publisher,
Expand Down
3 changes: 3 additions & 0 deletions dc/s2n-quic-dc/src/stream/server/tokio/tcp/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ where
&mut self,
remote_address: SocketAddress,
stream: W::Stream,
linger: Option<Duration>,
cx: &mut W::Context,
connection_context: W::ConnectionContext,
publisher: &Pub,
Expand All @@ -179,6 +180,7 @@ where
self.inner.workers[idx].worker.replace(
remote_address,
stream,
linger,
connection_context,
publisher,
clock,
Expand Down Expand Up @@ -377,6 +379,7 @@ pub trait Worker {
&mut self,
remote_address: SocketAddress,
stream: Self::Stream,
linger: Option<Duration>,
connection_context: Self::ConnectionContext,
publisher: &Pub,
clock: &C,
Expand Down
2 changes: 2 additions & 0 deletions dc/s2n-quic-dc/src/stream/server/tokio/tcp/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl super::Worker for Worker {
&mut self,
_remote_address: SocketAddress,
_stream: Self::Stream,
_linger: Option<Duration>,
_connection_context: Self::ConnectionContext,
_publisher: &Pub,
clock: &C,
Expand Down Expand Up @@ -160,6 +161,7 @@ impl Harness {
self.manager.insert(
SocketAddress::default(),
(),
None,
&mut (),
(),
&publisher(&self.subscriber, &self.clock),
Expand Down
31 changes: 26 additions & 5 deletions dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ where
&mut self,
remote_address: SocketAddress,
stream: TcpStream,
linger: Option<Duration>,
subscriber_ctx: Self::ConnectionContext,
publisher: &Pub,
clock: &C,
Expand All @@ -107,7 +108,10 @@ where
{
// Make sure TCP_NODELAY is set
let _ = stream.set_nodelay(true);
let _ = stream.set_linger(Some(Duration::ZERO));

if linger.is_some() {
let _ = stream.set_linger(linger);
}

let now = clock.get_time();

Expand All @@ -116,7 +120,14 @@ where
let prev_stream = core::mem::replace(&mut self.stream, Some((stream, remote_address)));
let prev_ctx = core::mem::replace(&mut self.subscriber_ctx, Some(subscriber_ctx));

if let Some(remote_address) = prev_stream.map(|(_socket, remote_address)| remote_address) {
if let Some(remote_address) = prev_stream.map(|(socket, remote_address)| {
// If linger wasn't already set or it was set to a value other than 0, then override it
if linger.is_none() || linger != Some(Duration::ZERO) {
// close the stream immediately and send a reset to the client
let _ = socket.set_linger(Some(Duration::ZERO));
}
remote_address
}) {
let sojourn_time = now.saturating_duration_since(prev_queue_time);
let buffer_len = match prev_state {
WorkerState::Init => 0,
Expand Down Expand Up @@ -331,6 +342,10 @@ impl WorkerState {
error: error.error,
};
continue;
} else {
// close the stream immediately and send a reset to the client
let _ = socket.set_linger(Some(Duration::ZERO));
drop(socket);
}
}
return Err(Some(error.error)).into();
Expand Down Expand Up @@ -381,16 +396,15 @@ impl WorkerState {
}

#[inline]
fn poll_initial_packet<S, Pub>(
fn poll_initial_packet<Pub>(
cx: &mut task::Context,
stream: &mut S,
stream: &mut TcpStream,
remote_address: &SocketAddress,
recv_buffer: &mut msg::recv::Message,
sojourn_time: Duration,
publisher: &Pub,
) -> Poll<Result<server::InitialPacket, Option<io::Error>>>
where
S: Socket,
Pub: EndpointPublisher,
{
loop {
Expand All @@ -403,6 +417,10 @@ impl WorkerState {
sojourn_time,
},
);

// close the stream immediately and send a reset to the client
let _ = stream.set_linger(Some(Duration::ZERO));

return Err(None).into();
}

Expand Down Expand Up @@ -437,6 +455,9 @@ impl WorkerState {
},
);

// close the stream immediately and send a reset to the client
let _ = stream.set_linger(Some(Duration::ZERO));

return Err(None).into();
}
}
Expand Down
22 changes: 19 additions & 3 deletions dc/s2n-quic-dc/src/stream/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,14 @@ impl Client {

match server.protocol {
Protocol::Tcp => {
stream_client::connect_tcp(handshake, server.local_addr, &self.env, subscriber)
.await
stream_client::connect_tcp(
handshake,
server.local_addr,
&self.env,
subscriber,
None,
)
.await
}
Protocol::Udp => {
stream_client::connect_udp(handshake, server.local_addr, &self.env, subscriber)
Expand Down Expand Up @@ -181,6 +187,8 @@ mod drop_handle {
}

pub mod server {
use std::time::Duration;

use super::*;

#[derive(Clone)]
Expand All @@ -201,6 +209,7 @@ pub mod server {
flavor: accept::Flavor,
protocol: Protocol,
map_capacity: usize,
linger: Option<Duration>,
subscriber: event::testing::Subscriber,
}

Expand All @@ -211,6 +220,7 @@ pub mod server {
flavor: accept::Flavor::default(),
protocol: Protocol::Tcp,
map_capacity: 16,
linger: None,
subscriber: event::testing::Subscriber::no_snapshot(),
}
}
Expand Down Expand Up @@ -255,6 +265,11 @@ pub mod server {
self
}

pub fn linger(mut self, linger: Duration) -> Self {
self.linger = Some(linger);
self
}

pub fn subscriber(mut self, subscriber: event::testing::Subscriber) -> Self {
self.subscriber = subscriber;
self
Expand All @@ -266,6 +281,7 @@ pub mod server {
flavor,
protocol,
map_capacity,
linger,
subscriber,
} = self;

Expand All @@ -291,7 +307,7 @@ pub mod server {
let socket = tokio::net::TcpListener::from_std(socket).unwrap();

let acceptor = stream_server::tcp::Acceptor::new(
0, socket, &sender, &env, &map, backlog, flavor, subscriber,
0, socket, &sender, &env, &map, backlog, flavor, linger, subscriber,
);
let acceptor = drop_handle_receiver.wrap(acceptor.run());
let acceptor = acceptor.instrument(tracing::info_span!("tcp"));
Expand Down

0 comments on commit dad94a7

Please sign in to comment.