Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Faster UDP/IO on Apple platforms #1993

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
aa19b97
feat: Faster UDP/IO on Apple platforms
larseggert Sep 20, 2024
c36d954
`no_main` on non-Apple platforms
larseggert Sep 20, 2024
adf614d
Empty main on non-Apple platforms
larseggert Sep 20, 2024
c8dba51
Use `libc::msghdr` and `libc::iovec` on non-Apple platforms
larseggert Sep 20, 2024
c642dbb
Static bindings
larseggert Sep 23, 2024
0cc5f78
Undo
larseggert Sep 23, 2024
a5523fd
`sendmsg_x`
larseggert Sep 23, 2024
450b754
No panic
larseggert Sep 23, 2024
de86e7f
fmt
larseggert Sep 23, 2024
8979a14
Merge branch 'main' into feat-apple-datapath
larseggert Sep 23, 2024
fec6063
Many fixes
larseggert Sep 23, 2024
83ae3e7
bench fix
larseggert Sep 23, 2024
05dc111
Fix typo
larseggert Sep 23, 2024
f0ff0d7
Remove commented-out code
larseggert Sep 23, 2024
ff73229
Address review comment
larseggert Sep 23, 2024
8df7cab
Remove comment
larseggert Sep 23, 2024
10cf609
Make sure ECN works
larseggert Sep 24, 2024
ef51f1f
Address review comments
larseggert Sep 24, 2024
2c92fb0
Suggestion from @mxinden
larseggert Sep 24, 2024
2c2355a
Merge branch 'main' into feat-apple-datapath
larseggert Sep 24, 2024
8e88f60
Undo
larseggert Sep 25, 2024
f1b8448
Merge branch 'feat-apple-datapath' of github.com:larseggert/quinn int…
larseggert Sep 25, 2024
3b5ed21
Update quinn-udp/benches/throughput.rs
larseggert Oct 8, 2024
c69940e
Update quinn-udp/src/unix.rs
larseggert Oct 8, 2024
04be034
Update quinn-udp/src/unix.rs
larseggert Oct 8, 2024
155708e
Fixes
larseggert Oct 8, 2024
15e0a66
Merge branch 'main' into feat-apple-datapath
larseggert Oct 8, 2024
d191358
Add `fast-apple-datapath` feature
larseggert Oct 8, 2024
5d9677d
Use `cfg_aliases` to simplify the code
larseggert Oct 9, 2024
25db226
Nits
larseggert Oct 9, 2024
137422d
`bsd` macro
larseggert Oct 9, 2024
c10487a
fmt
larseggert Oct 9, 2024
32edcd7
Merge branch 'main' into feat-apple-datapath
larseggert Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ windows-sys = { workspace = true }
[dev-dependencies]
criterion = "0.5"

[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench]
[lib]
# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
bench = false

[[bench]]
name = "throughput"
harness = false
22 changes: 12 additions & 10 deletions quinn-udp/benches/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, Criterion};
use quinn_udp::{RecvMeta, Transmit, UdpSocketState};
use std::cmp::min;
use std::{io::IoSliceMut, net::UdpSocket, slice};
use std::{io::IoSliceMut, net::UdpSocket};

pub fn criterion_benchmark(c: &mut Criterion) {
const TOTAL_BYTES: usize = 10 * 1024 * 1024;
Expand All @@ -27,8 +27,13 @@ pub fn criterion_benchmark(c: &mut Criterion) {
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
recv.set_nonblocking(false).unwrap();

let mut receive_buffer = vec![0; MAX_BUFFER_SIZE];
let mut meta = RecvMeta::default();
let gro_segments = UdpSocketState::new((&send).into()).unwrap().gro_segments();
let mut receive_buffers = vec![[0; SEGMENT_SIZE]; gro_segments];
let mut receive_slices = receive_buffers
.iter_mut()
.map(|buf| IoSliceMut::new(buf))
.collect::<Vec<_>>();
let mut meta = vec![RecvMeta::default(); gro_segments];
larseggert marked this conversation as resolved.
Show resolved Hide resolved

for gso_enabled in [false, true] {
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
Expand All @@ -55,14 +60,11 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let mut received_segments = 0;
while received_segments < segments {
let n = recv_state
.recv(
(&recv).into(),
&mut [IoSliceMut::new(&mut receive_buffer)],
slice::from_mut(&mut meta),
)
.recv((&recv).into(), &mut receive_slices, &mut meta)
.unwrap();
assert_eq!(n, 1);
received_segments += meta.len / meta.stride;
for i in meta.iter().take(n) {
received_segments += i.len / i.stride;
}
}
assert_eq!(received_segments, segments);
}
Expand Down
28 changes: 28 additions & 0 deletions quinn-udp/src/cmsg/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ impl MsgHdr for libc::msghdr {
}
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
impl MsgHdr for crate::imp::msghdr_x {
type ControlMessage = libc::cmsghdr;

fn cmsg_first_hdr(&self) -> *mut Self::ControlMessage {
let selfp = self as *const _ as *mut libc::msghdr;
unsafe { libc::CMSG_FIRSTHDR(selfp) }
}

fn cmsg_nxt_hdr(&self, cmsg: &Self::ControlMessage) -> *mut Self::ControlMessage {
let selfp = self as *const _ as *mut libc::msghdr;
unsafe { libc::CMSG_NXTHDR(selfp, cmsg) }
}

fn set_control_len(&mut self, len: usize) {
self.msg_controllen = len as _;
if len == 0 {
// netbsd is particular about this being a NULL pointer if there are no control
// messages.
larseggert marked this conversation as resolved.
Show resolved Hide resolved
self.msg_control = std::ptr::null_mut();
}
}

fn control_len(&self) -> usize {
self.msg_controllen as _
}
}

/// Helpers for [`libc::cmsghdr`]
impl CMsgHdr for libc::cmsghdr {
fn cmsg_len(length: usize) -> usize {
Expand Down
194 changes: 170 additions & 24 deletions quinn-udp/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,38 @@ use super::{
IO_ERROR_LOG_INTERVAL,
};

// Adapted from https://github.com/apple-oss-distributions/xnu/blob/main/bsd/sys/socket.h
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[repr(C)]
#[allow(non_camel_case_types)]
pub(crate) struct msghdr_x {
pub msg_name: *mut std::ffi::c_void,
larseggert marked this conversation as resolved.
Show resolved Hide resolved
pub msg_namelen: libc::socklen_t,
pub msg_iov: *mut libc::iovec,
pub msg_iovlen: std::ffi::c_int,
pub msg_control: *mut std::ffi::c_void,
pub msg_controllen: libc::socklen_t,
pub msg_flags: std::ffi::c_int,
pub msg_datalen: usize,
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
extern "C" {
larseggert marked this conversation as resolved.
Show resolved Hide resolved
fn recvmsg_x(
s: std::ffi::c_int,
msgp: *const msghdr_x,
cnt: std::ffi::c_uint,
flags: std::ffi::c_int,
) -> isize;

fn sendmsg_x(
s: std::ffi::c_int,
msgp: *const msghdr_x,
cnt: std::ffi::c_uint,
flags: std::ffi::c_int,
) -> isize;
}

// Defined in netinet6/in6.h on OpenBSD, this is not yet exported by the libc crate
// directly. See https://github.com/rust-lang/libc/issues/3704 for when we might be able to
// rely on this from the libc crate.
Expand Down Expand Up @@ -325,12 +357,63 @@ fn send(
}
}

#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "openbsd",
target_os = "netbsd"
))]
#[cfg(any(target_os = "macos", target_os = "ios",))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
let addr = socket2::SockAddr::from(transmit.destination);
let segment_size = transmit.segment_size.unwrap_or(transmit.contents.len());
let cnt = transmit
.contents
.chunks(segment_size)
.enumerate()
.map(|(i, chunk)| {
prepare_msg(
&Transmit {
destination: transmit.destination,
ecn: transmit.ecn,
contents: chunk,
segment_size: Some(chunk.len()),
src_ip: transmit.src_ip,
},
&addr,
&mut hdrs[i],
&mut iovs[i],
&mut ctrls[i],
true,
state.sendmsg_einval(),
);
larseggert marked this conversation as resolved.
Show resolved Hide resolved
hdrs[i].msg_datalen = chunk.len();
})
.count();
let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) };
if n == -1 {
larseggert marked this conversation as resolved.
Show resolved Hide resolved
let e = io::Error::last_os_error();
match e.kind() {
io::ErrorKind::Interrupted => {
// Retry the transmission
}
io::ErrorKind::WouldBlock => return Err(e),
_ => {
// Other errors are ignored, since they will usually be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
// - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
// these by automatically clamping the MTUD upper bound to the interface MTU.
if e.raw_os_error() != Some(libc::EMSGSIZE) {
log_sendmsg_error(&state.last_send_error, e, transmit);
}
}
}
}
Ok(())
}

#[cfg(any(target_os = "openbsd", target_os = "netbsd"))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
let mut iov: libc::iovec = unsafe { mem::zeroed() };
Expand All @@ -342,10 +425,7 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io:
&mut hdr,
&mut iov,
&mut ctrl,
cfg!(target_os = "macos")
|| cfg!(target_os = "ios")
|| cfg!(target_os = "openbsd")
|| cfg!(target_os = "netbsd"),
true,
state.sendmsg_einval(),
);
let n = unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) };
Expand Down Expand Up @@ -418,12 +498,42 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) ->
Ok(msg_count as usize)
}

#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "openbsd",
target_os = "solaris",
))]
#[cfg(any(target_os = "macos", target_os = "ios"))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
let max_msg_count = bufs.len().min(BATCH_SIZE);
for i in 0..max_msg_count {
prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
}
let msg_count = loop {
let n = unsafe {
recvmsg_x(
io.as_raw_fd(),
hdrs.as_mut_ptr(),
bufs.len().min(BATCH_SIZE) as _,
0,
)
};
match n {
-1 => {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(e);
}
n => break n,
}
};
for i in 0..(msg_count as usize) {
meta[i] = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as usize);
}
Ok(msg_count as usize)
}

#[cfg(any(target_os = "openbsd", target_os = "solaris",))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
Expand Down Expand Up @@ -452,7 +562,8 @@ const CMSG_LEN: usize = 88;
fn prepare_msg(
transmit: &Transmit<'_>,
dst_addr: &socket2::SockAddr,
hdr: &mut libc::msghdr,
#[cfg(not(any(target_os = "macos", target_os = "ios")))] hdr: &mut libc::msghdr,
#[cfg(any(target_os = "macos", target_os = "ios"))] hdr: &mut msghdr_x,
iov: &mut libc::iovec,
ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
#[allow(unused_variables)] // only used on FreeBSD & macOS
Expand Down Expand Up @@ -542,6 +653,7 @@ fn prepare_msg(
encoder.finish();
}

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
fn prepare_recv(
buf: &mut IoSliceMut,
name: &mut MaybeUninit<libc::sockaddr_storage>,
Expand All @@ -557,9 +669,27 @@ fn prepare_recv(
hdr.msg_flags = 0;
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
fn prepare_recv(
buf: &mut IoSliceMut,
name: &mut MaybeUninit<libc::sockaddr_storage>,
ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
hdr: &mut msghdr_x,
) {
hdr.msg_name = name.as_mut_ptr() as _;
hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
hdr.msg_iov = buf as *mut IoSliceMut as *mut libc::iovec;
hdr.msg_iovlen = 1;
hdr.msg_control = ctrl.0.as_mut_ptr() as _;
hdr.msg_controllen = CMSG_LEN as _;
hdr.msg_flags = 0;
hdr.msg_datalen = buf.len();
}

fn decode_recv(
name: &MaybeUninit<libc::sockaddr_storage>,
hdr: &libc::msghdr,
#[cfg(not(any(target_os = "macos", target_os = "ios")))] hdr: &libc::msghdr,
#[cfg(any(target_os = "macos", target_os = "ios"))] hdr: &msghdr_x,
len: usize,
) -> RecvMeta {
let name = unsafe { name.assume_init() };
Expand Down Expand Up @@ -654,13 +784,9 @@ fn decode_recv(
}
}

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub(crate) const BATCH_SIZE: usize = 32;

#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) const BATCH_SIZE: usize = 1;

#[cfg(target_os = "linux")]
mod gso {
use super::*;
Expand Down Expand Up @@ -690,7 +816,18 @@ mod gso {
}
}

#[cfg(not(target_os = "linux"))]
#[cfg(any(target_os = "macos", target_os = "ios"))]
mod gso {
use super::*;

pub(super) fn max_gso_segments() -> usize {
BATCH_SIZE
}
larseggert marked this conversation as resolved.
Show resolved Hide resolved

pub(super) fn set_segment_size(_encoder: &mut cmsg::Encoder<msghdr_x>, _segment_size: u16) {}
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "ios")))]
mod gso {
use super::*;

Expand Down Expand Up @@ -770,7 +907,16 @@ fn set_socket_option(

const OPTION_ON: libc::c_int = 1;

#[cfg(not(target_os = "linux"))]
#[cfg(any(target_os = "macos", target_os = "ios"))]
mod gro {
use super::BATCH_SIZE;

pub(super) fn gro_segments() -> usize {
BATCH_SIZE
larseggert marked this conversation as resolved.
Show resolved Hide resolved
}
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "ios")))]
mod gro {
pub(super) fn gro_segments() -> usize {
1
Expand Down