diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 8457d1ee5..4ec2d5e2d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -30,7 +30,7 @@ jobs: $HOME/.cargo/bin/rustc --version echo "~~~~ freebsd-version ~~~~" freebsd-version - run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml + run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml && $HOME/.cargo/bin/cargo test --benches test: strategy: matrix: @@ -54,6 +54,7 @@ jobs: - run: cargo test - run: cargo test --manifest-path fuzz/Cargo.toml if: ${{ matrix.rust }} == "stable" + - run: cargo test --benches test-aws-lc-rs: runs-on: ubuntu-latest diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml index 257ee0015..b7790001a 100644 --- a/quinn-udp/Cargo.toml +++ b/quinn-udp/Cargo.toml @@ -30,8 +30,13 @@ once_cell = { workspace = true } windows-sys = { workspace = true } [dev-dependencies] -criterion = "0.5" +criterion = { version = "0.5", default-features = false, features = ["async_tokio"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] } -[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench] +[lib] +# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options +bench = false + +[[bench]] name = "throughput" harness = false diff --git a/quinn-udp/benches/throughput.rs b/quinn-udp/benches/throughput.rs index c19a9bc68..d60721269 100644 --- a/quinn-udp/benches/throughput.rs +++ b/quinn-udp/benches/throughput.rs @@ -1,43 +1,63 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use quinn_udp::{RecvMeta, Transmit, UdpSocketState}; -use std::cmp::min; -use std::net::{Ipv4Addr, Ipv6Addr}; -use std::{io::IoSliceMut, net::UdpSocket, slice}; +use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE}; +use std::{ + cmp::min, + io::{ErrorKind, IoSliceMut}, + net::{Ipv4Addr, Ipv6Addr, UdpSocket}, +}; +use tokio::io::Interest; +use tokio::runtime::Runtime; + +const MAX_IP_UDP_HEADER_SIZE: usize = 48; +const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE; pub fn criterion_benchmark(c: &mut Criterion) { const TOTAL_BYTES: usize = 10 * 1024 * 1024; - // Maximum GSO buffer size is 64k. - const MAX_BUFFER_SIZE: usize = u16::MAX as usize; const SEGMENT_SIZE: usize = 1280; - let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) - .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) - .unwrap(); - let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) - .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) - .unwrap(); - let max_segments = min( - UdpSocketState::new((&send).into()) - .unwrap() - .max_gso_segments(), - MAX_BUFFER_SIZE / SEGMENT_SIZE, - ); - let dst_addr = recv.local_addr().unwrap(); - let send_state = UdpSocketState::new((&send).into()).unwrap(); - let recv_state = UdpSocketState::new((&recv).into()).unwrap(); - // Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy - recv.set_nonblocking(false).unwrap(); + let mut rt = Runtime::new().unwrap(); + let (send_socket, send_state) = new_socket(&mut rt); + let (recv_socket, recv_state) = new_socket(&mut rt); + let dst_addr = recv_socket.local_addr().unwrap(); - let mut receive_buffer = vec![0; MAX_BUFFER_SIZE]; - let mut meta = RecvMeta::default(); + let mut permutations = vec![]; + for gso_enabled in [ + false, + #[cfg(any(target_os = "linux", target_os = "windows"))] + true, + ] { + for gro_enabled in [false, true] { + #[cfg(target_os = "windows")] + if gso_enabled && !gro_enabled { + // Windows requires receive buffer to fit entire datagram on GRO + // enabled socket. + // + // OS error: "A message sent on a datagram socket was larger + // than the internal message buffer or some other network limit, + // or the buffer used to receive a datagram into was smaller + // than the datagram itself." + continue; + } - for gso_enabled in [false, true] { - let mut group = c.benchmark_group(format!("gso_{}", gso_enabled)); - group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); + for recvmmsg_enabled in [false, true] { + permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled)); + } + } + } - let segments = if gso_enabled { max_segments } else { 1 }; - let msg = vec![0xAB; SEGMENT_SIZE * segments]; + for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations { + let mut group = c.benchmark_group(format!( + "gso_{}_gro_{}_recvmmsg_{}", + gso_enabled, gro_enabled, recvmmsg_enabled + )); + group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); + let gso_segments = if gso_enabled { + send_state.max_gso_segments() + } else { + 1 + }; + let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)]; let transmit = Transmit { destination: dst_addr, ecn: None, @@ -45,32 +65,60 @@ pub fn criterion_benchmark(c: &mut Criterion) { segment_size: gso_enabled.then_some(SEGMENT_SIZE), src_ip: None, }; + let gro_segments = if gro_enabled { + recv_state.gro_segments() + } else { + 1 + }; + let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 }; group.bench_function("throughput", |b| { - b.iter(|| { + b.to_async(Runtime::new().unwrap()).iter(|| async { + let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size]; + let mut receive_slices = receive_buffers + .iter_mut() + .map(|buf| IoSliceMut::new(buf)) + .collect::>(); + let mut meta = vec![RecvMeta::default(); batch_size]; + let mut sent: usize = 0; + let mut received: usize = 0; while sent < TOTAL_BYTES { - send_state.send((&send).into(), &transmit).unwrap(); + send_socket.writable().await.unwrap(); + send_socket + .try_io(Interest::WRITABLE, || { + send_state.send((&send_socket).into(), &transmit) + }) + .unwrap(); sent += transmit.contents.len(); - let mut received_segments = 0; - while received_segments < segments { - let n = recv_state - .recv( - (&recv).into(), - &mut [IoSliceMut::new(&mut receive_buffer)], - slice::from_mut(&mut meta), - ) - .unwrap(); - assert_eq!(n, 1); - received_segments += meta.len / meta.stride; + while received < sent { + recv_socket.readable().await.unwrap(); + let n = match recv_socket.try_io(Interest::READABLE, || { + recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta) + }) { + Ok(n) => n, + // recv.readable() can lead to false positives. Try again. + Err(e) if e.kind() == ErrorKind::WouldBlock => continue, + e => e.unwrap(), + }; + received += meta.iter().map(|m| m.len).take(n).sum::(); } - assert_eq!(received_segments, segments); } }) }); } } +fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) { + let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) + .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) + .unwrap(); + + let state = UdpSocketState::new((&socket).into()).unwrap(); + let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() }); + (socket, state) +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches);