Skip to content

Commit

Permalink
fix(udp): handle multiple datagrams on GRO (#1708)
Browse files Browse the repository at this point in the history
* Test multi packet GRO read

* fix(udp): handle multiple datagrams through gro

Previously `Socket::recv` would at most return a single `Datagram` (i.e. `->
Result<Option<Datagram>, io::Error>`). When supported by the OS, the underlying
`quinn-udp` can use both recvMmsg and GRO, each with the ability to return one
or more datagrams.

As of today, `neqo_common::udp` does not make use of recvmmsg, i.e. it only
provides a single `IoSliceMut` to write into. That said, that single
`IoSliceMut` might contain multiple `Datagram`s through GRO. Previously this
would have been provided as a single `Datagram` to the caller of `Socket::recv`.

This commit makes sure to handle the case where many `Datagram`s are retrieved
via GRO (see `meta.stride` flag). It updates `neqo_common::udp::Socket::recv`
and `neqo-server` and `neqo-client` accordingly.

* fix: support single gso sendmmsg to result in multiple gro recvmmsg

E.g. the case on CI on windows runner.

* Reduce diff in client
  • Loading branch information
mxinden authored Mar 5, 2024
1 parent 1ec476e commit a56e092
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 20 deletions.
16 changes: 10 additions & 6 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,11 +824,13 @@ impl<'a> ClientRunner<'a> {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Timeout => {
Expand Down Expand Up @@ -1337,11 +1339,13 @@ mod old {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Timeout => {
Expand Down
82 changes: 71 additions & 11 deletions neqo-common/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Socket {
}

/// Receive a UDP datagram on the specified socket.
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Option<Datagram>, io::Error> {
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
Expand All @@ -94,7 +94,7 @@ impl Socket {
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(None)
return Ok(vec![])
}
Err(err) => {
return Err(err);
Expand All @@ -103,23 +103,27 @@ impl Socket {

if meta.len == 0 {
eprintln!("zero length datagram received?");
return Ok(None);
return Ok(vec![]);
}

if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);
}

Ok(Some(Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
&self.recv_buf[..meta.len],
)))
Ok(self.recv_buf[0..meta.len]
.chunks(meta.stride.min(self.recv_buf.len()))
.map(|d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
.collect())
}
}

Expand Down Expand Up @@ -149,6 +153,8 @@ mod tests {
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.next()
.expect("receive to yield datagram");

// Assert that the ECN is correct.
Expand All @@ -159,4 +165,58 @@ mod tests {

Ok(())
}

/// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read.
#[tokio::test]
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
async fn many_datagrams_through_gro() -> Result<(), io::Error> {
const SEGMENT_SIZE: usize = 128;

let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;

// `neqo_common::udp::Socket::send` does not yet
// (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
// `quinn_udp` directly.
let max_gso_segments = sender.state.max_gso_segments();
let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
let transmit = Transmit {
destination: receiver.local_addr()?,
ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from((
IpTosDscp::Le,
IpTosEcn::Ect1,
)))),
contents: msg.clone().into(),
segment_size: Some(SEGMENT_SIZE),
src_ip: None,
};
sender.writable().await?;
let n = sender.socket.try_io(Interest::WRITABLE, || {
sender
.state
.send((&sender.socket).into(), slice::from_ref(&transmit))
})?;
assert_eq!(n, 1, "only passed one slice");

// Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
let mut num_received = 0;
while num_received < max_gso_segments {
receiver.readable().await?;
receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.for_each(|d| {
assert_eq!(
SEGMENT_SIZE,
d.len(),
"Expect received datagrams to have same length as sent datagrams."
);
num_received += 1;
});
}

Ok(())
}
}
8 changes: 5 additions & 3 deletions neqo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,11 +686,13 @@ impl ServersRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgram = socket.recv(host)?;
if dgram.is_none() {
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in dgrams {
self.process(Some(&dgram)).await?;
}
},
Ready::Timeout => {
self.timeout = None;
Expand Down

0 comments on commit a56e092

Please sign in to comment.