From 382234b97dd9ad6ea37aed3a4c5277559a1fd0cb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 28 Sep 2024 10:08:02 +0200 Subject: [PATCH] feat: use recvmmsg in addition to GRO Previously we would only do GRO. --- .github/workflows/check.yml | 1 - neqo-bin/src/client/http09.rs | 2 +- neqo-bin/src/client/http3.rs | 4 +- neqo-bin/src/client/mod.rs | 15 +-- neqo-bin/src/server/http09.rs | 4 +- neqo-bin/src/server/mod.rs | 83 ++++++++------- neqo-bin/src/udp.rs | 3 +- neqo-http3/src/connection_client.rs | 12 ++- neqo-transport/src/connection/mod.rs | 52 ++++++---- neqo-transport/src/server.rs | 8 +- neqo-udp/src/lib.rs | 145 +++++++++++++++++---------- 11 files changed, 202 insertions(+), 127 deletions(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 490c7047c4..8cded22681 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -98,5 +98,4 @@ jobs: if: matrix.type == 'debug' && matrix.rust-toolchain == 'stable' bench: - needs: [check] uses: ./.github/workflows/bench.yml diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index f4bb818182..fa7ab95d8d 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -179,7 +179,7 @@ impl TryFrom<&State> for CloseState { impl super::Client for Connection { fn process_into_buffer<'a>( &mut self, - input: Option>, + input: Option>>, now: Instant, out: &'a mut Vec, ) -> Output<&'a [u8]> { diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index b571c2a0ce..9b6eb86935 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -128,9 +128,9 @@ impl super::Client for Http3Client { self.state().try_into() } - fn process_into_buffer<'a>( + fn process_into_buffer<'a, 'b>( &mut self, - input: Option>, + input: Option>>, now: Instant, out: &'a mut Vec, ) -> Output<&'a [u8]> { diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 77eaa1d22d..b6481633aa 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -374,7 +374,7 @@ enum CloseState { trait Client { fn process_into_buffer<'a>( &mut self, - input: Option>, + input: Option>>, now: Instant, out: &'a mut Vec, ) -> Output<&'a [u8]>; @@ -457,16 +457,17 @@ impl<'a, H: Handler> Runner<'a, H> { async fn process(&mut self, mut should_read: bool) -> Result<(), io::Error> { loop { - let dgram = should_read + let dgrams = should_read .then(|| self.socket.recv(&self.local_addr, &mut self.recv_buf)) .transpose()? .flatten(); - should_read = dgram.is_some(); + should_read = dgrams.is_some(); - match self - .client - .process_into_buffer(dgram, Instant::now(), &mut self.send_buf) - { + match self.client.process_into_buffer( + dgrams.as_ref().map(|d| d.iter()), + Instant::now(), + &mut self.send_buf, + ) { Output::Datagram(dgram) => { self.socket.writable().await?; self.socket.send(dgram)?; diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index 379a835ae7..f91dd39356 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -189,9 +189,9 @@ impl super::HttpServer for HttpServer { &mut self, dgram: Option>, now: Instant, - out: &'a mut Vec, + write_buffer: &'a mut Vec, ) -> Output<&'a [u8]> { - self.server.process_into_buffer(dgram, now, out) + self.server.process_into_buffer(dgram, now, write_buffer) } fn process_events(&mut self, now: Instant) { diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index ae8f4e6812..131594e054 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -232,57 +232,66 @@ impl ServerRunner { } async fn process(&mut self, mut socket_inx: Option) -> Result<(), io::Error> { - loop { - let mut dgram = if let Some(inx) = socket_inx { + // TODO: Cleanup! We should really be passing a set of datagrams to neqo_transport server! + 'outer: loop { + let dgrams = if let Some(inx) = socket_inx { let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let dgram = socket.recv(host, &mut self.recv_buf)?; - if dgram.is_none() { + let dgrams = socket.recv(host, &mut self.recv_buf)?; + if dgrams.is_none() { // Done reading. socket_inx.take(); } - dgram + dgrams } else { None }; - match self - .server - .process_into_buffer(dgram.take(), (self.now)(), &mut self.send_buf) - { - Output::Datagram(dgram) => { - // Find outbound socket. If none match, take the first. - let socket = if let Some(socket) = - self.sockets.iter_mut().find_map(|(_host, socket)| { + let mut dgrams = dgrams.iter().map(|d| d.iter()).flatten().peekable(); + + 'inner: loop { + match self.server.process_into_buffer( + dgrams.next(), + (self.now)(), + &mut self.send_buf, + ) { + Output::Datagram(dgram) => { + // Find outbound socket. If none match, take the first. + let socket = if let Some(socket) = + self.sockets.iter_mut().find_map(|(_host, socket)| { + socket + .local_addr() + .ok() + .map_or(false, |socket_addr| socket_addr == dgram.source()) + .then_some(socket) + }) { socket - .local_addr() - .ok() - .map_or(false, |socket_addr| socket_addr == dgram.source()) - .then_some(socket) - }) { - socket - } else { - &mut self.sockets.iter_mut().next().unwrap().1 - }; - - socket.writable().await?; - socket.send(dgram)?; - self.send_buf.clear(); - continue; + } else { + &mut self.sockets.iter_mut().next().unwrap().1 + }; + + socket.writable().await?; + socket.send(dgram)?; + self.send_buf.clear(); + continue 'inner; + } + Output::Callback(new_timeout) => { + qdebug!("Setting timeout of {:?}", new_timeout); + self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); + } + Output::None => {} } - Output::Callback(new_timeout) => { - qdebug!("Setting timeout of {:?}", new_timeout); - self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); + + if dgrams.peek().is_some() { + continue 'inner; } - Output::None => {} - } - if socket_inx.is_none() { - // No socket to read and nothing to write. - break; + if socket_inx.is_some() { + continue 'outer; + } + + return Ok(()); } } - - Ok(()) } // Wait for any of the sockets to be readable or the timeout to fire. diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index 1624ac6241..bc333ce306 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -55,11 +55,12 @@ impl Socket { /// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with /// the provided local address. + // TODO: Option needed? pub fn recv<'a>( &self, local_address: &SocketAddr, recv_buf: &'a mut Vec, - ) -> Result>, io::Error> { + ) -> Result>, io::Error> { self.inner .try_io(tokio::io::Interest::READABLE, || { neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf) diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index a1935ea13d..1e804c0fb9 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -854,18 +854,20 @@ impl Http3Client { .stats(&mut self.conn) } - pub fn process_into_buffer<'a>( + pub fn process_into_buffer<'a, 'b>( &mut self, - input: Option>, + input: Option>>, now: Instant, out: &'a mut Vec, ) -> Output<&'a [u8]> { qtrace!([self], "Process."); if let Some(d) = input { - self.conn.process_input(d, now); + self.conn.process_input_2(d, now); } self.process_http3(now); - let out = self.conn.process_into_buffer(None, now, out); + let out = self + .conn + .process_into_buffer(None::>>, now, out); self.process_http3(now); out } @@ -879,7 +881,7 @@ impl Http3Client { /// new [`Vec`]. pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { let mut out = vec![]; - self.process_into_buffer(dgram.map(Into::into), now, &mut out) + self.process_into_buffer(dgram.map(Into::into).map(std::iter::once), now, &mut out) .map_datagram(Into::into) } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 8ad41422d5..f8e023e0eb 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1040,7 +1040,18 @@ impl Connection { /// Process new input datagrams on the connection. pub fn process_input<'a>(&mut self, d: impl Into>, now: Instant) { - self.input(d.into(), now, now); + self.input(std::iter::once(d.into()), now, now); + self.process_saved(now); + self.streams.cleanup_closed_streams(); + } + + /// Process new input datagrams on the connection. + pub fn process_input_2<'a, 'b>( + &mut self, + d: impl Iterator>, + now: Instant, + ) { + self.input(d, now, now); self.process_saved(now); self.streams.cleanup_closed_streams(); } @@ -1156,7 +1167,7 @@ impl Connection { #[must_use = "Output of the process function must be handled"] pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { let mut out = vec![]; - self.process_into_buffer(dgram.map(Into::into), now, &mut out) + self.process_into_buffer(dgram.map(Into::into).map(std::iter::once), now, &mut out) .map_datagram(Into::into) } @@ -1166,9 +1177,9 @@ impl Connection { /// /// Panics when `out` is not empty. #[must_use = "Output of the process function must be handled"] - pub fn process_into_buffer<'a>( + pub fn process_into_buffer<'a, 'b>( &mut self, - input: Option>, + input: Option>>, now: Instant, out: &'a mut Vec, ) -> Output<&'a [u8]> { @@ -1291,7 +1302,7 @@ impl Connection { debug_assert!(self.crypto.states.rx_hp(self.version, cspace).is_some()); for saved in self.saved_datagrams.take_saved() { qtrace!([self], "input saved @{:?}: {:?}", saved.t, saved.d); - self.input((&saved.d).into(), saved.t, now); + self.input(std::iter::once((&saved.d).into()), saved.t, now); } } } @@ -1544,18 +1555,25 @@ impl Connection { /// Take a datagram as input. This reports an error if the packet was bad. /// This takes two times: when the datagram was received, and the current time. - fn input(&mut self, d: Datagram<&[u8]>, received: Instant, now: Instant) { - // First determine the path. - let path = self.paths.find_path_with_rebinding( - d.destination(), - d.source(), - self.conn_params.get_cc_algorithm(), - self.conn_params.pacing_enabled(), - now, - ); - path.borrow_mut().add_received(d.len()); - let res = self.input_path(&path, d, received); - self.capture_error(Some(path), now, 0, res).ok(); + fn input<'a>( + &mut self, + dgrams: impl Iterator>, + received: Instant, + now: Instant, + ) { + for d in dgrams { + // First determine the path. + let path = self.paths.find_path_with_rebinding( + d.destination(), + d.source(), + self.conn_params.get_cc_algorithm(), + self.conn_params.pacing_enabled(), + now, + ); + path.borrow_mut().add_received(d.len()); + let res = self.input_path(&path, d, received); + self.capture_error(Some(path), now, 0, res).ok(); + } } fn input_path(&mut self, path: &PathRef, d: Datagram<&[u8]>, now: Instant) -> Res<()> { diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 2446f8d355..26f2571821 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -327,7 +327,7 @@ impl Server { match sconn { Ok(mut c) => { self.setup_connection(&mut c, initial, orig_dcid); - let out = c.process_into_buffer(Some(dgram), now, out); + let out = c.process_into_buffer(Some(std::iter::once(dgram)), now, out); self.connections.push(Rc::new(RefCell::new(c))); out } @@ -368,7 +368,9 @@ impl Server { .iter_mut() .find(|c| c.borrow().is_valid_local_cid(packet.dcid())) { - return c.borrow_mut().process_into_buffer(Some(dgram), now, out); + return c + .borrow_mut() + .process_into_buffer(Some(std::iter::once(dgram)), now, out); } if packet.packet_type() == PacketType::Short { @@ -446,7 +448,7 @@ impl Server { for connection in &mut self.connections { match connection.borrow_mut().process_into_buffer( - None, + None::>>, now, // See .github/workflows/polonius.yml. unsafe { &mut *std::ptr::from_mut(out) }, diff --git a/neqo-udp/src/lib.rs b/neqo-udp/src/lib.rs index 4a31a23ee8..d805494720 100644 --- a/neqo-udp/src/lib.rs +++ b/neqo-udp/src/lib.rs @@ -9,18 +9,18 @@ use std::{ io::{self, IoSliceMut}, net::SocketAddr, - slice, }; -use neqo_common::{qdebug, qtrace, Datagram, IpTos}; -use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; +use neqo_common::{qtrace, Datagram, IpTos}; +use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState, BATCH_SIZE}; /// Socket receive buffer size. /// /// Allows reading multiple datagrams in a single [`Socket::recv`] call. // // TODO: Experiment with different values across platforms. -pub const RECV_BUF_SIZE: usize = u16::MAX as usize; +// TODO: This might be too large on e.g. Linux. +pub const RECV_BUF_SIZE: usize = u16::MAX as usize * BATCH_SIZE; pub fn send_inner( state: &UdpSocketState, @@ -53,51 +53,91 @@ use std::os::fd::AsFd as SocketRef; use std::os::windows::io::AsSocket as SocketRef; pub fn recv_inner<'a>( + // TODO Implements Copy local_address: &SocketAddr, state: &UdpSocketState, socket: impl SocketRef, recv_buf: &'a mut Vec, -) -> Result, io::Error> { - let mut meta; - - let data = loop { - meta = RecvMeta::default(); - - state.recv( - (&socket).into(), - &mut [IoSliceMut::new(recv_buf.as_mut())], - slice::from_mut(&mut meta), - )?; - - if meta.len == 0 || meta.stride == 0 { - qdebug!( - "ignoring datagram from {} to {} len {} stride {}", - meta.addr, - local_address, - meta.len, - meta.stride - ); - continue; - } - - break &recv_buf[..meta.len]; +) -> Result, io::Error> { + let mut metas = [RecvMeta::default(); BATCH_SIZE]; + + let mut iovs: [IoSliceMut; BATCH_SIZE] = { + let recv_buf_len = recv_buf.len(); + let mut bufs = recv_buf + .chunks_mut(recv_buf_len / BATCH_SIZE) + .map(IoSliceMut::new); + + // TODO + // expect() safe as self.recv_buf is chunked into BATCH_SIZE items + // and iovs will be of size BATCH_SIZE, thus from_fn is called + // exactly BATCH_SIZE times. + std::array::from_fn(|_| bufs.next().expect("BATCH_SIZE elements")) }; - qtrace!( - "received {} bytes from {} to {} with {} segments", - data.len(), - meta.addr, - local_address, - meta.len.div_ceil(meta.stride), - ); + let msgs = state.recv((&socket).into(), &mut iovs, &mut metas)?; + + // TODO: What to do in the empty case? + // if meta.len == 0 || meta.stride == 0 { + // qdebug!( + // "ignoring datagram from {} to {} len {} stride {}", + // meta.addr, + // local_address, + // meta.len, + // meta.stride + // ); + // continue; + // } + + // TODO + // qtrace!( + // "received {} bytes from {} to {} with {} segments", + // data.len(), + // meta.addr, + // local_address, + // meta.len.div_ceil(meta.stride), + // ); + + Ok(Datagrams { + metas, + iovs, + msgs, + local_address: *local_address, + }) +} - Ok(Datagram::new( - meta.addr, - *local_address, - meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), - data, - Some(meta.stride), - )) +pub struct Datagrams<'a> { + metas: [RecvMeta; BATCH_SIZE], + iovs: [IoSliceMut<'a>; BATCH_SIZE], + msgs: usize, + local_address: SocketAddr, +} + +// TODO: Rework. +impl<'a> std::fmt::Debug for Datagrams<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Datagrams") + .field("msgs", &self.msgs) + .field("local_address", &self.local_address) + .finish() + } +} + +impl<'a> Datagrams<'a> { + pub fn iter(&'a self) -> impl Iterator> { + self.metas + .iter() + .zip(self.iovs.iter()) + .take(self.msgs) + .map(|(meta, iov)| { + Datagram::new( + meta.addr, + self.local_address, + meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), + &**iov, + Some(meta.stride), + ) + }) + } } /// A wrapper around a UDP socket, sending and receiving [`Datagram`]s. @@ -126,7 +166,7 @@ impl Socket { &self, local_address: &SocketAddr, recv_buf: &'a mut Vec, - ) -> Result, io::Error> { + ) -> Result, io::Error> { recv_inner(local_address, &self.state, &self.inner, recv_buf) } } @@ -185,9 +225,10 @@ mod tests { sender.send(datagram)?; let mut recv_buf = vec![0; RECV_BUF_SIZE]; - let received_datagram = receiver + let received_datagrams = receiver .recv(&receiver_addr, &mut recv_buf) .expect("receive to succeed"); + let received_datagram = received_datagrams.iter().next().unwrap(); // Assert that the ECN is correct. assert_eq!( @@ -230,15 +271,17 @@ mod tests { let mut recv_buf = vec![0; RECV_BUF_SIZE]; while num_received < max_gso_segments { recv_buf.clear(); - let dgram = receiver + let dgrams = receiver .recv(&receiver_addr, &mut recv_buf) .expect("receive to succeed"); - assert_eq!( - SEGMENT_SIZE, - dgram.segment_size(), - "Expect received datagrams to have same length as sent datagrams." - ); - num_received += dgram.num_segments(); + for dgram in dgrams.iter() { + assert_eq!( + SEGMENT_SIZE, + dgram.segment_size(), + "Expect received datagrams to have same length as sent datagrams." + ); + num_received += dgram.num_segments(); + } } Ok(())