bench(udp): run GSO, GRO and recvmmsg permutations #2010

Open: wants to merge 1 commit into base: main
3 changes: 2 additions & 1 deletion .github/workflows/rust.yml
@@ -30,7 +30,7 @@ jobs:
$HOME/.cargo/bin/rustc --version
echo "~~~~ freebsd-version ~~~~"
freebsd-version
run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml
run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml && $HOME/.cargo/bin/cargo test --benches
test:
strategy:
matrix:
@@ -54,6 +54,7 @@ jobs:
- run: cargo test
- run: cargo test --manifest-path fuzz/Cargo.toml
if: ${{ matrix.rust }} == "stable"
- run: cargo test --benches
Comment on lines 54 to +57
Contributor Author

Note that I did not simply extend the existing cargo test call, but instead added a separate cargo test --benches step.

cargo test --benches executes all benchmark tests, but only some of the unit tests, so it cannot replace the plain cargo test call. See also rust-lang/cargo#6454.


test-aws-lc-rs:
runs-on: ubuntu-latest
9 changes: 7 additions & 2 deletions quinn-udp/Cargo.toml
@@ -30,8 +30,13 @@ once_cell = { workspace = true }
windows-sys = { workspace = true }

[dev-dependencies]
criterion = "0.5"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] }
Contributor Author

On Linux, a blocking recvmmsg call waits until all iovecs have been filled.

A blocking recvmmsg() call blocks until vlen messages have been
received or until the timeout expires. A nonblocking call reads
as many messages as are available (up to the limit specified by
vlen) and returns immediately.

https://man7.org/linux/man-pages/man2/recvmmsg.2.html

This is problematic, for example, when a GSO send carries fewer segments than its recvmmsg counterpart has iovecs: the blocking call would then keep waiting for messages that are never sent.

Thus this pull request changes the benchmark to use non-blocking instead of blocking recvmmsg. To do so, it uses tokio.

See also discussion in #1993 (comment).
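
To illustrate the non-blocking behaviour described in the man page excerpt, here is a minimal sketch using only the standard library. It is an illustration under assumptions, not the benchmark's code: it loops over plain recv_from rather than calling recvmmsg, and drain_available is a hypothetical helper that does not exist in quinn-udp. The point is only that a non-blocking socket reports WouldBlock instead of waiting for further datagrams.

```rust
use std::io::{self, ErrorKind};
use std::net::UdpSocket;

/// Hypothetical helper: read up to `max` datagrams that are already queued
/// on a non-blocking socket and return how many were read.
fn drain_available(socket: &UdpSocket, max: usize) -> io::Result<usize> {
    let mut buf = [0u8; 2048];
    let mut count = 0;
    while count < max {
        match socket.recv_from(&mut buf) {
            Ok(_) => count += 1,
            // Nothing more is queued right now: return instead of waiting.
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Err(e) => return Err(e),
        }
    }
    Ok(count)
}

fn main() -> io::Result<()> {
    let recv = UdpSocket::bind("127.0.0.1:0")?;
    recv.set_nonblocking(true)?;
    let send = UdpSocket::bind("127.0.0.1:0")?;

    // Nothing has been sent yet, so this returns immediately.
    println!("before send: {}", drain_available(&recv, 8)?);

    send.send_to(b"ping", recv.local_addr()?)?;
    // Returns immediately with however many datagrams have arrived so far,
    // even though fewer than `max` were sent.
    println!("after send: {}", drain_available(&recv, 8)?);
    Ok(())
}
```

A blocking equivalent with room for 8 messages would, per the man page text quoted above, wait for all 8 (or a timeout) even though only one was sent.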

Member

I suggest making the change from sync to async in a separate commit.


[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench]
[lib]
# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
bench = false

[[bench]]
name = "throughput"
harness = false
136 changes: 92 additions & 44 deletions quinn-udp/benches/throughput.rs
@@ -1,76 +1,124 @@
use criterion::{criterion_group, criterion_main, Criterion};
use quinn_udp::{RecvMeta, Transmit, UdpSocketState};
use std::cmp::min;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::{io::IoSliceMut, net::UdpSocket, slice};
use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE};
Member

Nit: it would be good to group the imports into StdExternalCrate blocks here.

use std::{
cmp::min,
io::{ErrorKind, IoSliceMut},
net::{Ipv4Addr, Ipv6Addr, UdpSocket},
};
use tokio::io::Interest;
Member

Nit: for consistency, there should be one import statement for all tokio imports.

use tokio::runtime::Runtime;

const MAX_IP_UDP_HEADER_SIZE: usize = 48;
Member

Nit: constants should go near the bottom.

const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;
Comment on lines +11 to +12
Contributor Author

Linux allows up to 64 KiB per GSO and GRO call. That limit includes the IP and UDP headers.
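
As a worked example of that budget, the sketch below repeats the two constants from the diff and computes how many whole segments fit. The breakdown of the 48 header bytes into a 40-byte IPv6 header plus an 8-byte UDP header is an assumption about how the constant was chosen, and max_whole_segments is a hypothetical helper used only for illustration.

```rust
/// Assumed worst case: 40-byte IPv6 header plus 8-byte UDP header.
const MAX_IP_UDP_HEADER_SIZE: usize = 48;
/// Payload budget left once the headers are subtracted from the 16-bit limit.
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;

/// Hypothetical helper: number of whole segments of `segment_size` bytes
/// that fit into a single GSO or GRO call.
fn max_whole_segments(segment_size: usize) -> usize {
    MAX_DATAGRAM_SIZE / segment_size
}

fn main() {
    println!("payload budget: {} bytes", MAX_DATAGRAM_SIZE);
    println!("whole 1280-byte segments: {}", max_whole_segments(1280));
}
```

Note that the benchmark itself caps the message with min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments), so its final segment may be shorter than SEGMENT_SIZE rather than being dropped.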


pub fn criterion_benchmark(c: &mut Criterion) {
const TOTAL_BYTES: usize = 10 * 1024 * 1024;
// Maximum GSO buffer size is 64k.
const MAX_BUFFER_SIZE: usize = u16::MAX as usize;
const SEGMENT_SIZE: usize = 1280;

let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.unwrap();
let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.unwrap();
let max_segments = min(
UdpSocketState::new((&send).into())
.unwrap()
.max_gso_segments(),
MAX_BUFFER_SIZE / SEGMENT_SIZE,
);
let dst_addr = recv.local_addr().unwrap();
let send_state = UdpSocketState::new((&send).into()).unwrap();
let recv_state = UdpSocketState::new((&recv).into()).unwrap();
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
recv.set_nonblocking(false).unwrap();
let mut rt = Runtime::new().unwrap();
let (send_socket, send_state) = new_socket(&mut rt);
let (recv_socket, recv_state) = new_socket(&mut rt);
let dst_addr = recv_socket.local_addr().unwrap();

let mut receive_buffer = vec![0; MAX_BUFFER_SIZE];
let mut meta = RecvMeta::default();
let mut permutations = vec![];
for gso_enabled in [
false,
#[cfg(any(target_os = "linux", target_os = "windows"))]
true,
Comment on lines +26 to +27
Contributor Author

Other platforms don't support GSO.

Apple platforms need to be added here once #1993 merges.
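
The permutation logic of the surrounding loop can be sketched in isolation as follows. This is a simplified illustration, not the PR's code: the two boolean parameters are hypothetical runtime stand-ins for the compile-time cfg checks in the diff (GSO support on the platform, and the Windows restriction on GSO without GRO).

```rust
/// Enumerate the (gso, gro, recvmmsg) combinations to benchmark.
///
/// `gso_supported` and `skip_gso_without_gro` are assumptions of this sketch;
/// the diff decides both at compile time via `cfg(target_os = ...)`.
fn permutations(gso_supported: bool, skip_gso_without_gro: bool) -> Vec<(bool, bool, bool)> {
    let gso_options: &[bool] = if gso_supported { &[false, true] } else { &[false] };
    let mut out = Vec::new();
    for &gso in gso_options {
        for gro in [false, true] {
            // Mirrors the Windows-only `continue` in the diff.
            if skip_gso_without_gro && gso && !gro {
                continue;
            }
            for recvmmsg in [false, true] {
                out.push((gso, gro, recvmmsg));
            }
        }
    }
    out
}

fn main() {
    for (gso, gro, recvmmsg) in permutations(true, false) {
        println!("gso_{}_gro_{}_recvmmsg_{}", gso, gro, recvmmsg);
    }
}
```

With GSO unavailable, only the gso = false half of the matrix is produced, which is why the platform list in the cfg attribute determines how many benchmark groups run.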

] {
for gro_enabled in [false, true] {
#[cfg(target_os = "windows")]
if gso_enabled && !gro_enabled {
// Windows requires receive buffer to fit entire datagram on GRO
// enabled socket.
//
// OS error: "A message sent on a datagram socket was larger
// than the internal message buffer or some other network limit,
// or the buffer used to receive a datagram into was smaller
// than the datagram itself."
continue;
}

for gso_enabled in [false, true] {
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
for recvmmsg_enabled in [false, true] {
permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled));
}
}
}

let segments = if gso_enabled { max_segments } else { 1 };
let msg = vec![0xAB; SEGMENT_SIZE * segments];
for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations {
let mut group = c.benchmark_group(format!(
"gso_{}_gro_{}_recvmmsg_{}",
gso_enabled, gro_enabled, recvmmsg_enabled
));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));

let gso_segments = if gso_enabled {
send_state.max_gso_segments()
} else {
1
};
let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)];
let transmit = Transmit {
destination: dst_addr,
ecn: None,
contents: &msg,
segment_size: gso_enabled.then_some(SEGMENT_SIZE),
src_ip: None,
};
let gro_segments = if gro_enabled {
recv_state.gro_segments()
} else {
1
};
let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 };

group.bench_function("throughput", |b| {
b.iter(|| {
b.to_async(Runtime::new().unwrap()).iter(|| async {
let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size];
let mut receive_slices = receive_buffers
.iter_mut()
.map(|buf| IoSliceMut::new(buf))
.collect::<Vec<_>>();
let mut meta = vec![RecvMeta::default(); batch_size];

let mut sent: usize = 0;
let mut received: usize = 0;
while sent < TOTAL_BYTES {
send_state.send((&send).into(), &transmit).unwrap();
send_socket.writable().await.unwrap();
send_socket
.try_io(Interest::WRITABLE, || {
send_state.send((&send_socket).into(), &transmit)
})
.unwrap();
sent += transmit.contents.len();

let mut received_segments = 0;
while received_segments < segments {
let n = recv_state
.recv(
(&recv).into(),
&mut [IoSliceMut::new(&mut receive_buffer)],
slice::from_mut(&mut meta),
)
.unwrap();
assert_eq!(n, 1);
received_segments += meta.len / meta.stride;
while received < sent {
recv_socket.readable().await.unwrap();
let n = match recv_socket.try_io(Interest::READABLE, || {
recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta)
}) {
Ok(n) => n,
// recv.readable() can lead to false positives. Try again.
Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
e => e.unwrap(),
Comment on lines +101 to +103
Contributor Author

UdpSocket::readable is allowed to return false positives.

The function may complete without the socket being readable. This is a false-positive and attempting a try_recv() will return with io::ErrorKind::WouldBlock.

https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#method.readable
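
The retry pattern this implies can be sketched without tokio. The helper below is hypothetical and uses only the standard library: wait stands in for awaiting readiness (readable().await in the diff) and op stands in for the try_io call. It is a simplified synchronous illustration of the control flow, not the benchmark's implementation.

```rust
use std::io::{self, ErrorKind};

/// Hypothetical helper: wait for readiness, attempt the operation, and retry
/// if the readiness notification turned out to be a false positive.
fn retry_on_would_block<T>(
    mut wait: impl FnMut(),
    mut op: impl FnMut() -> io::Result<T>,
) -> io::Result<T> {
    loop {
        wait();
        match op() {
            // False positive: the socket was not actually ready. Try again.
            Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
            // Success or any other error is returned to the caller.
            other => return other,
        }
    }
}

fn main() {
    let mut attempts = 0;
    // Simulate two false positives followed by a successful read.
    let result = retry_on_would_block(
        || {},
        || {
            attempts += 1;
            if attempts < 3 {
                Err(io::Error::from(ErrorKind::WouldBlock))
            } else {
                Ok(attempts)
            }
        },
    );
    println!("{:?}", result);
}
```

Only WouldBlock is retried; any other error is surfaced, which matches the match arms in the diff where the remaining case is unwrapped.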

};
received += meta.iter().map(|m| m.len).take(n).sum::<usize>();
}
assert_eq!(received_segments, segments);
}
})
});
}
}

fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) {
Member

Please keep the extraction of this function in a separate commit.

let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.unwrap();

let state = UdpSocketState::new((&socket).into()).unwrap();
let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() });
(socket, state)
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);