diff --git a/src/buf/bufgroup/mod.rs b/src/buf/bufgroup/mod.rs
new file mode 100644
index 00000000..bcd3c119
--- /dev/null
+++ b/src/buf/bufgroup/mod.rs
@@ -0,0 +1,144 @@
+//! The io_uring device implements several provided-buffering mechanisms, which are all called
+//! buffer groups in the liburing man pages.
+//!
+//! Buffer groups share a few things in common:
+//! o all provide a mechanism to seed the kernel with userland buffers for use in various
+//!   read operations
+//! o all use a u16 Buffer Group ID
+//! o all use a u16 Buffer ID
+//! o all are specified in the read or receive operations by setting
+//!   the IOSQE_BUFFER_SELECT bit in the sqe flags field and
+//!   then identifying the buffer group id in the sqe buf_group field
+//! o all read or receive operations that used a buffer group have
+//!   the IORING_CQE_F_BUFFER bit set in the cqe flags field and
+//!   the buffer id chosen in the upper 16 bits of the cqe res field
+//!
+//! As of Oct 2022, the latest buffer group mechanism implemented by the io_uring device, and the
+//! one that promises the best performance with the least amount of overhead, is the buf_ring. The
+//! buf_ring has several liburing man pages; the first to reference should probably be
+//! io_uring_buf_ring_init(3).
+
+use crate::buf::bufring::BufRing;
+
+/// The buffer group ID.
+///
+/// The creator of a buffer group is responsible for picking a buffer group id
+/// that does not conflict with other buffer group ids also being registered with the uring
+/// interface.
+pub(crate) type Bgid = u16;
+
+// Future: Maybe create a bgid module with a trivial implementation of a type that tracks the next
+// bgid to use. The crate's driver could do that perhaps, but there could be a benefit to tracking
+// them across multiple threads' drivers. So there is flexibility in not building it into the
+// driver.
+
+/// The buffer ID. Buffer ids are assigned and used by the crate and probably are not visible
+/// to the crate user.
+pub(crate) type Bid = u16;
+
+/// This tracks a buffer that has been filled in by the kernel, having gotten the memory
+/// from a buffer ring, and returned to userland via a cqe entry.
+pub struct BufX {
+    bgroup: BufRing,
+    bid: Bid,
+    len: usize,
+}
+
+impl BufX {
+    // # Safety
+    //
+    // The bid must be the buffer id supplied by the kernel as having been chosen and written to.
+    // The length of the buffer must represent the length written to by the kernel.
+    pub(crate) unsafe fn new(bgroup: BufRing, bid: Bid, len: usize) -> Self {
+        // len will already have been checked against the buf_capacity
+        // so it is guaranteed that len <= bgroup.buf_capacity.
+
+        Self { bgroup, bid, len }
+    }
+
+    /// Return the number of bytes initialized.
+    ///
+    /// This value initially came from the kernel, as reported in the cqe. This value may have been
+    /// modified with a call to the IoBufMut::set_init method.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Return true if this represents an empty buffer. The length reported by the kernel was 0.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Return the capacity of this buffer.
+    #[inline]
+    pub fn cap(&self) -> usize {
+        self.bgroup.buf_capacity(self.bid)
+    }
+
+    /// Return a byte slice reference.
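+    ///
+    /// A hypothetical sketch of typical use, assuming a `bufx` returned by a completed
+    /// provided-buffer receive:
+    ///
+    /// ```ignore
+    /// let bytes: &[u8] = bufx.as_slice();
+    /// assert_eq!(bytes.len(), bufx.len());
+    /// ```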
+    #[inline]
+    pub fn as_slice(&self) -> &[u8] {
+        let p = self.bgroup.stable_ptr(self.bid);
+        // Safety: the pointer returned by stable_ptr is valid for the lifetime of self,
+        // and self's len is set when the kernel reports the amount of data that was
+        // written into the buffer.
+        unsafe { std::slice::from_raw_parts(p, self.len) }
+    }
+
+    /// Return a mutable byte slice reference.
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [u8] {
+        let p = self.bgroup.stable_mut_ptr(self.bid);
+        // Safety: the pointer returned by stable_mut_ptr is valid for the lifetime of self,
+        // and self's len is set when the kernel reports the amount of data that was
+        // written into the buffer. In addition, we hold a &mut reference to self.
+        unsafe { std::slice::from_raw_parts_mut(p, self.len) }
+    }
+
+    // Future: provide access to the uninit space between len and cap if the buffer is being
+    // repurposed before being dropped. The set_init below does that too.
+}
+
+impl Drop for BufX {
+    fn drop(&mut self) {
+        // Add the buffer back to the bgroup, for the kernel to reuse.
+        // Safety: this function may only be called by the buffer's drop function.
+        unsafe { self.bgroup.dropping_bid(self.bid) };
+    }
+}
+
+unsafe impl crate::buf::IoBuf for BufX {
+    fn stable_ptr(&self) -> *const u8 {
+        self.bgroup.stable_ptr(self.bid)
+    }
+
+    fn bytes_init(&self) -> usize {
+        self.len
+    }
+
+    fn bytes_total(&self) -> usize {
+        self.cap()
+    }
+}
+
+unsafe impl crate::buf::IoBufMut for BufX {
+    fn stable_mut_ptr(&mut self) -> *mut u8 {
+        self.bgroup.stable_mut_ptr(self.bid)
+    }
+
+    unsafe fn set_init(&mut self, init_len: usize) {
+        if self.len < init_len {
+            let cap = self.bgroup.buf_capacity(self.bid);
+            assert!(init_len <= cap);
+            self.len = init_len;
+        }
+    }
+}
+
+impl From<BufX> for Vec<u8> {
+    fn from(item: BufX) -> Self {
+        item.as_slice().to_vec()
+    }
+}
diff --git a/src/buf/bufring/mod.rs b/src/buf/bufring/mod.rs
new file mode 100644
index 00000000..e10dd71a
--- /dev/null
+++ b/src/buf/bufring/mod.rs
@@ -0,0 +1,17 @@
+//! A buf_ring pool of buffers registered with the kernel.
+//!
+//! This module provides the [`BufRing`] and [`Builder`] to allow
+//! using the `buf_ring` feature of the kernel's `io_uring` device.
+//!
+//! The [`BufRing`] is this library's only implementation of the device's more general `Provided
+//! Buffers` feature, where some device operations can work with buffers that had been provided to
+//! the device at an earlier point, rather than as part of the operation itself.
+//!
+//! Operations like [`crate::net::TcpStream::recv_provbuf`] make use of the `buf_ring`. This
+//! operation does not take a buffer as input, but does return a buffer when successful. Once the
+//! buffer is dropped, it is returned to the `buf_ring`.
+
+pub(crate) mod ring;
+
+pub use ring::BufRing;
+pub use ring::Builder;
diff --git a/src/buf/bufring/ring.rs b/src/buf/bufring/ring.rs
new file mode 100644
index 00000000..0d0f1e81
--- /dev/null
+++ b/src/buf/bufring/ring.rs
@@ -0,0 +1,846 @@
+//! Module for the io_uring device's buf_ring feature.
+
+// Developer's note about io_uring return codes when a buf_ring is used:
+//
+// While a buf_ring pool is exhausted, new calls to read that are, or are not, ready to read will
+// fail with the 105 error, "no buffers", while existing calls that were waiting to become ready to
+// read will not fail. Only when the data becomes ready to read will they fail, if the buffer ring
+// is still empty at that time. This makes sense when thinking about it from how the kernel
+// implements the start of a read command; it can be confusing when first working with these
+// commands from the userland perspective.
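+//
+// (For illustration: a caller could detect the exhausted-pool case by matching the raw OS error,
+// as the tests in this crate do with a helper along these lines, and retry once buffers have been
+// dropped back to the ring:
+//
+//     fn is_no_buffer_space(e: &std::io::Error) -> bool {
+//         e.raw_os_error() == Some(105) // ENOBUFS
+//     }
+// )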
+
+// The file! calls yield a clippy false positive.
+#![allow(clippy::print_literal)]
+
+use io_uring::types;
+use std::cell::Cell;
+use std::io;
+use std::rc::Rc;
+use std::sync::atomic::{self, AtomicU16};
+
+use super::super::bufgroup::{Bgid, Bid, BufX};
+use crate::runtime::CONTEXT;
+
+/// A `BufRing` represents the ring and the buffers used with the kernel's io_uring buf_ring
+/// feature.
+///
+/// In this implementation, it is both the ring of buffer entries and the actual buffer
+/// allocations.
+///
+/// A BufRing is created through the [`Builder`] and can be registered automatically by the
+/// builder's `build` step or at a later time by the user. Registration involves informing the
+/// kernel of the ring's dimensions and its identifier (its buffer group id, which goes by the name
+/// `bgid`).
+///
+/// Multiple buf_rings, here multiple BufRings, can be created and registered. BufRings are
+/// reference counted to ensure their memory is live while their BufX buffers are live. When a BufX
+/// buffer is dropped, it releases itself back to the BufRing from which it came, allowing it to be
+/// reused by the kernel.
+///
+/// It is perhaps worth pointing out that it is the ring itself that is registered with the kernel,
+/// not the buffers per se. While a given buf_ring cannot have its size changed dynamically, the
+/// buffers that are pushed to the ring by userland, and later potentially re-pushed to the ring,
+/// can change. The buffers can be of different sizes and they could come from different allocation
+/// blocks. This implementation does not provide that flexibility. Each BufRing comes with its own
+/// equal-length buffer allocation. And when a BufRing buffer, a BufX, is dropped, its id is pushed
+/// back to the ring.
+///
+/// This is the one and only `Provided Buffers` implementation in `tokio_uring` at the moment and
+/// in this version, is a purely concrete type, with a concrete BufX type for buffers that are
+/// returned by operations like `recv_provbuf` to the userland application.
+///
+/// Aside from the register and unregister steps, there are no syscalls used to pass buffers to the
+/// kernel. The ring contains a tail memory address that this userland type updates as buffers are
+/// added to the ring and which the kernel reads when it needs to pull a buffer from the ring. The
+/// kernel does not have a head pointer address that it updates for the userland. The userland
+/// (this type) is expected to avoid overwriting the head of the circular ring by keeping track of
+/// how many buffers were added to the ring and how many have been returned through the CQE
+/// mechanism. This particular implementation does not track the count because all buffers are
+/// allocated at the beginning, by the builder, and only its own buffers that came back via a CQE
+/// are ever added back to the ring, so it should be impossible to overflow the ring.
+#[derive(Clone, Debug)]
+pub struct BufRing {
+    // RawBufRing uses cell for fields where necessary.
+    raw: Rc<RawBufRing>,
+}
+
+// Methods for the user.
+
+impl BufRing {
+    /// Registers the buf_ring manually.
+    ///
+    /// This is not normally called because the builder defaults to registering the ring when it
+    /// builds the ring. This is provided for the case where a BufRing is being built before the
+    /// tokio_uring runtime has started.
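+    ///
+    /// A minimal sketch of deferred registration (the bgid 177 is arbitrary; this mirrors the
+    /// pattern used in this crate's tests and assumes the builder was told to skip
+    /// auto-registration):
+    ///
+    /// ```ignore
+    /// let ring = tokio_uring::buf::bufring::Builder::new(177)
+    ///     .skip_auto_register(true)
+    ///     .build()?;
+    /// // ... later, once inside the tokio_uring runtime:
+    /// ring.register()?;
+    /// ```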
+    pub fn register(&self) -> io::Result<()> {
+        self.raw.register()
+    }
+
+    /// Unregisters the buf_ring manually.
+    ///
+    /// This is not normally called because the drop mechanism will unregister the ring if it had
+    /// not already been unregistered.
+    ///
+    /// This function makes it possible to unregister a ring while some of its BufX buffers may
+    /// still be live. This does not result in UB. It just means the io_uring device will not have
+    /// this buf_ring pool to draw from. Put another way, the step of unregistering the ring does
+    /// not deallocate the buffers.
+    pub fn unregister(&self) -> io::Result<()> {
+        self.raw.unregister()
+    }
+
+    /// (Very experimental and should probably be behind a cfg option.)
+    ///
+    /// Returns the lowest level buffer pool size that has been observed. It cannot be accurate
+    /// because it cannot take into account in-flight operations that may draw from the pool.
+    ///
+    /// This might be useful when running the system and trying to decide if the pool was sized
+    /// correctly. Maintaining this value does come with a small overhead.
+    #[allow(dead_code)]
+    pub fn possible_min(&self) -> u16 {
+        self.raw.possible_min()
+    }
+
+    /// (Very experimental and should probably be behind a cfg option.)
+    ///
+    /// Like `possible_min` but also resets the metric to the total number of buffers that had
+    /// been allocated.
+    #[allow(dead_code)]
+    pub fn possible_min_and_reset(&self) -> u16 {
+        self.raw.possible_min_and_reset()
+    }
+}
+
+// Methods the BufX needs.
+
+impl BufRing {
+    pub(crate) fn buf_capacity(&self, _: Bid) -> usize {
+        self.raw.buf_capacity_i()
+    }
+
+    pub(crate) fn stable_ptr(&self, bid: Bid) -> *const u8 {
+        // Will panic if bid is out of range.
+        self.raw.stable_ptr_i(bid)
+    }
+
+    pub(crate) fn stable_mut_ptr(&mut self, bid: Bid) -> *mut u8 {
+        // Safety: self is &mut, we're good.
+        unsafe { self.raw.stable_mut_ptr_i(bid) }
+    }
+
+    // # Safety
+    //
+    // `dropping_bid` should only be called by the buffer's drop function because once called, the
+    // buffer may be given back to the kernel for reuse.
+    pub(crate) unsafe fn dropping_bid(&self, bid: Bid) {
+        self.raw.dropping_bid_i(bid);
+    }
+}
+
+// Methods the io operations need.
+
+impl BufRing {
+    pub(crate) fn bgid(&self) -> Bgid {
+        self.raw.bgid()
+    }
+
+    // # Safety
+    //
+    // The res and flags values are used to look up a buffer and set its initialized length.
+    // The caller is responsible for these being correct. This is expected to be called
+    // when these two values are received from the kernel via a CQE and we rely on the kernel to
+    // give us correct information.
+    pub(crate) unsafe fn get_buf(&self, res: u32, flags: u32) -> io::Result<Option<BufX>> {
+        let bid = match io_uring::cqueue::buffer_select(flags) {
+            Some(bid) => bid,
+            None => {
+                // Have seen res == 0, flags == 4 with a TCP socket. res == 0 we take to mean the
+                // socket is empty so return None to show there is no buffer returned, which should
+                // be interpreted to mean there is no more data to read from this file or socket.
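+                // (Callers are expected to treat Ok(None) like a read of zero bytes; the test
+                // loops in tests/common/mod.rs, for example, break out of their receive loop on
+                // Ok(None).)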
+                if res == 0 {
+                    return Ok(None);
+                }
+
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!(
+                        "BufRing::get_buf failed as the buffer bit, IORING_CQE_F_BUFFER, was missing from flags, res = {}, flags = {}",
+                        res, flags)
+                ));
+            }
+        };
+
+        let len = res as usize;
+
+        /*
+        let flags = flags & !io_uring::sys::IORING_CQE_F_BUFFER; // for tracing flags
+        println!(
+            "{}:{}: get_buf res({res})=len({len}) flags({:#x})->bid({bid})\n\n",
+            file!(),
+            line!(),
+            flags
+        );
+        */
+
+        assert!(len <= self.raw.buf_len);
+
+        // TODO maybe later
+        // #[cfg(any(debug, feature = "cautious"))]
+        // {
+        //     let mut debug_bitmap = self.debug_bitmap.borrow_mut();
+        //     let m = 1 << (bid % 8);
+        //     assert!(debug_bitmap[(bid / 8) as usize] & m == m);
+        //     debug_bitmap[(bid / 8) as usize] &= !m;
+        // }
+
+        self.raw.metric_getting_another();
+        /*
+        println!(
+            "{}:{}: get_buf cur {}, min {}",
+            file!(),
+            line!(),
+            self.possible_cur.get(),
+            self.possible_min.get(),
+        );
+        */
+
+        // Safety: the len provided to BufX::new is given to us from the kernel.
+        Ok(Some(unsafe { BufX::new(self.clone(), bid, len) }))
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+/// Build the arguments needed to call `build()`, which returns a [`BufRing`].
+///
+/// Refer to the methods' descriptions for details.
+#[allow(dead_code)]
+pub struct Builder {
+    page_size: usize,
+    bgid: Bgid,
+    ring_entries: u16,
+    buf_cnt: u16,
+    buf_len: usize,
+    buf_align: usize,
+    ring_pad: usize,
+    bufend_align: usize,
+
+    skip_register: bool,
+}
+
+#[allow(dead_code)]
+impl Builder {
+    /// Create a new Builder with the given buffer group ID and defaults.
+    ///
+    /// The buffer group ID, `bgid`, is the id the kernel's io_uring device uses to identify the
+    /// provided buffer pool to use by operations that are posted to the device.
+    ///
+    /// The user is responsible for picking a bgid that does not conflict with other buffer groups
+    /// that have been registered with the same uring interface.
+    pub fn new(bgid: Bgid) -> Builder {
+        Builder {
+            page_size: 4096,
+            bgid,
+            ring_entries: 128,
+            buf_cnt: 0,
+            buf_len: 4096,
+            buf_align: 0,
+            ring_pad: 0,
+            bufend_align: 0,
+            skip_register: false,
+        }
+    }
+
+    /// The page size of the kernel. Defaults to 4096.
+    ///
+    /// The io_uring device requires that the BufRing be allocated at the start of a page, i.e.
+    /// with page size alignment.
+    ///
+    /// The caller should determine the page size, and may want to cache the info if multiple buf
+    /// rings are to be created. Crates are available to get this information, or the user may
+    /// want to call libc's sysconf directly:
+    ///
+    ///     use libc::{_SC_PAGESIZE, sysconf};
+    ///     let page_size: usize = unsafe { sysconf(_SC_PAGESIZE) as usize };
+    pub fn page_size(mut self, page_size: usize) -> Builder {
+        self.page_size = page_size;
+        self
+    }
+
+    /// The number of ring entries to create for the buffer ring.
+    ///
+    /// This defaults to 128 or the `buf_cnt`, whichever is larger.
+    ///
+    /// The number will be made a power of 2, and will be the maximum of the ring_entries setting
+    /// and the buf_cnt setting. The interface will enforce a maximum of 2^15 (32768) so it can do
+    /// rollover calculation.
+    ///
+    /// Each ring entry is 16 bytes.
+    pub fn ring_entries(mut self, ring_entries: u16) -> Builder {
+        self.ring_entries = ring_entries;
+        self
+    }
+
+    /// The number of buffers to allocate. If left zero, the ring_entries value will be used, and
+    /// that value defaults to 128.
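+    ///
+    /// If `buf_cnt` is set larger than `ring_entries`, `build` raises `ring_entries` to match
+    /// before rounding it up to a power of two, so every buffer always has a ring slot.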
+    pub fn buf_cnt(mut self, buf_cnt: u16) -> Builder {
+        self.buf_cnt = buf_cnt;
+        self
+    }
+
+    /// The length of each allocated buffer. Defaults to 4096.
+    ///
+    /// Values that are not powers of two are possible, and `buf_align` can be used to place each
+    /// buffer on an alignment boundary even if the buffer length is not meant to equal the
+    /// alignment.
+    pub fn buf_len(mut self, buf_len: usize) -> Builder {
+        self.buf_len = buf_len;
+        self
+    }
+
+    /// The alignment of the first buffer allocated.
+    ///
+    /// Generally not needed.
+    ///
+    /// The buffers are allocated right after the ring unless `ring_pad` is used, and generally
+    /// the buffers are allocated contiguous to one another unless the `buf_len` is set to
+    /// something different.
+    pub fn buf_align(mut self, buf_align: usize) -> Builder {
+        self.buf_align = buf_align;
+        self
+    }
+
+    /// Padding to place after the ring to ensure separation between the ring and the first
+    /// buffer.
+    ///
+    /// Generally not needed but may be useful if the ring's end and the buffers' start are to
+    /// have some separation, perhaps for cacheline reasons.
+    pub fn ring_pad(mut self, ring_pad: usize) -> Builder {
+        self.ring_pad = ring_pad;
+        self
+    }
+
+    /// The alignment of the end of the buffer allocated. To keep other things out of a cache line
+    /// or out of a page, if that's desired.
+    pub fn bufend_align(mut self, bufend_align: usize) -> Builder {
+        self.bufend_align = bufend_align;
+        self
+    }
+
+    /// Skip automatic registration. The caller can manually invoke the buf_ring.register()
+    /// function later. Regardless, the unregister() method will be called automatically when the
+    /// BufRing goes out of scope if the caller hadn't manually called buf_ring.unregister()
+    /// already.
+    pub fn skip_auto_register(mut self, skip: bool) -> Builder {
+        self.skip_register = skip;
+        self
+    }
+
+    /// Return a BufRing, having computed the layout for the single aligned allocation
+    /// of both the buffer ring elements and the buffers themselves.
+    ///
+    /// If auto_register was left enabled, register the BufRing with the driver.
+    pub fn build(&self) -> io::Result<BufRing> {
+        let mut b: Builder = *self;
+
+        // Two cases where both buf_cnt and ring_entries are set to the max of the two.
+        if b.buf_cnt == 0 || b.ring_entries < b.buf_cnt {
+            let max = std::cmp::max(b.ring_entries, b.buf_cnt);
+            b.buf_cnt = max;
+            b.ring_entries = max;
+        }
+
+        // Don't allow the next_power_of_two calculation to be done if already larger than 2^15
+        // because 2^16 reads back as 0 in a u16. And the interface doesn't allow for ring_entries
+        // larger than 2^15 anyway, so this is a good place to catch it. Here we return a unique
+        // error that is more descriptive than the InvalidArg that would come from the interface.
+        if b.ring_entries > (1 << 15) {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "ring_entries exceeded 32768",
+            ));
+        }
+
+        // Requirement of the interface is the ring entries is a power of two, making its and our
+        // mask calculation trivial.
+        b.ring_entries = b.ring_entries.next_power_of_two();
+
+        Ok(BufRing {
+            raw: Rc::new(RawBufRing::new(NewArgs {
+                page_size: b.page_size,
+                bgid: b.bgid,
+                ring_entries: b.ring_entries,
+                buf_cnt: b.buf_cnt,
+                buf_len: b.buf_len,
+                buf_align: b.buf_align,
+                ring_pad: b.ring_pad,
+                bufend_align: b.bufend_align,
+                auto_register: !b.skip_register,
+            })?),
+        })
+    }
+}
+
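+// A minimal sketch of Builder usage (the bgid 177 and the sizes are arbitrary; pick a bgid that
+// does not collide with other groups registered on the same uring):
+//
+//     let ring = tokio_uring::buf::bufring::Builder::new(177)
+//         .ring_entries(32)
+//         .buf_len(16 * 1024)
+//         .build()?;
+
+// Trivial helper struct for this module.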
+struct NewArgs {
+    page_size: usize,
+    bgid: Bgid,
+    ring_entries: u16,
+    buf_cnt: u16,
+    buf_len: usize,
+    buf_align: usize,
+    ring_pad: usize,
+    bufend_align: usize,
+    auto_register: bool,
+}
+
+#[derive(Debug)]
+struct RawBufRing {
+    bgid: Bgid,
+
+    // Keep mask rather than ring size because mask is used often, ring size not.
+    //ring_entries: u16, // Invariants: > 0, power of 2, max 2^15 (32768).
+    ring_entries_mask: u16, // Invariant: one less than ring_entries, which is > 0, a power of 2, max 2^15 (32768).
+
+    buf_cnt: u16,   // Invariants: > 0, <= ring_entries.
+    buf_len: usize, // Invariant: > 0.
+    layout: std::alloc::Layout,
+    ring_addr: *const types::BufRingEntry, // Invariant: constant.
+    buffers_addr: *mut u8,                 // Invariant: constant.
+    local_tail: Cell<u16>,
+    tail_addr: *const AtomicU16,
+    registered: Cell<bool>,
+
+    // The first `possible` field is a best effort at tracking the current buffer pool usage and
+    // from that, tracking the lowest level that has been reached. The two are an attempt at
+    // letting the user check the sizing needs of their buf_ring pool.
+    //
+    // We don't really know how deep the uring device has gone into the pool because we never see
+    // its head value and it can be taking buffers from the ring, in-flight, while we add buffers
+    // back to the ring. All we know is when a CQE arrives and a buffer lookup is performed, a
+    // buffer has already been taken from the pool, and when the buffer is dropped, we add it back
+    // to the ring and it is about to be considered part of the pool again.
+    possible_cur: Cell<u16>,
+    possible_min: Cell<u16>,
+    //
+    // TODO maybe later
+    // #[cfg(any(debug, feature = "cautious"))]
+    // debug_bitmap: RefCell<Vec<u8>>,
+}
+
+impl RawBufRing {
+    fn new(new_args: NewArgs) -> io::Result<RawBufRing> {
+        #[allow(non_upper_case_globals)]
+        const trace: bool = false;
+
+        let NewArgs {
+            page_size,
+            bgid,
+            ring_entries,
+            buf_cnt,
+            buf_len,
+            buf_align,
+            ring_pad,
+            bufend_align,
+            auto_register,
+        } = new_args;
+
+        // Check that none of the important args are zero and the ring_entries is at least large
+        // enough to hold all the buffers and that ring_entries is a power of 2.
+
+        if (buf_cnt == 0)
+            || (buf_cnt > ring_entries)
+            || (buf_len == 0)
+            || ((ring_entries & (ring_entries - 1)) != 0)
+        {
+            return Err(io::Error::from(io::ErrorKind::InvalidInput));
+        }
+
+        // entry_size is 16 bytes.
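+        // (For example, with the defaults of ring_entries = 128 and buf_len = 4096, the single
+        // allocation is roughly 128 * 16 = 2 KiB of ring entries followed by 128 * 4096 = 512 KiB
+        // of buffers, rounded up to the page-aligned layout computed below.)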
+        let entry_size = std::mem::size_of::<types::BufRingEntry>();
+        let mut ring_size = entry_size * (ring_entries as usize);
+        if trace {
+            println!(
+                "{}:{}: entry_size {} * ring_entries {} = ring_size {} {:#x}",
+                file!(),
+                line!(),
+                entry_size,
+                ring_entries,
+                ring_size,
+                ring_size,
+            );
+        }
+
+        ring_size += ring_pad;
+
+        if trace {
+            println!(
+                "{}:{}: after +ring_pad {} ring_size {} {:#x}",
+                file!(),
+                line!(),
+                ring_pad,
+                ring_size,
+                ring_size,
+            );
+        }
+
+        if buf_align > 0 {
+            let buf_align = buf_align.next_power_of_two();
+            ring_size = (ring_size + (buf_align - 1)) & !(buf_align - 1);
+            if trace {
+                println!(
+                    "{}:{}: after buf_align ring_size {} {:#x}",
+                    file!(),
+                    line!(),
+                    ring_size,
+                    ring_size,
+                );
+            }
+        }
+        let buf_size = buf_len * (buf_cnt as usize);
+        assert!(ring_size != 0);
+        assert!(buf_size != 0);
+        let mut tot_size: usize = ring_size + buf_size;
+        if trace {
+            println!(
+                "{}:{}: ring_size {} {:#x} + buf_size {} {:#x} = tot_size {} {:#x}",
+                file!(),
+                line!(),
+                ring_size,
+                ring_size,
+                buf_size,
+                buf_size,
+                tot_size,
+                tot_size
+            );
+        }
+        if bufend_align > 0 {
+            // for example, if bufend_align is 4096, would make total size a multiple of pages
+            let bufend_align = bufend_align.next_power_of_two();
+            tot_size = (tot_size + (bufend_align - 1)) & !(bufend_align - 1);
+            if trace {
+                println!(
+                    "{}:{}: after bufend_align tot_size {} {:#x}",
+                    file!(),
+                    line!(),
+                    tot_size,
+                    tot_size,
+                );
+            }
+        }
+
+        let align: usize = page_size; // alignment must be at least the page size
+        let align = align.next_power_of_two();
+        let layout = std::alloc::Layout::from_size_align(tot_size, align).unwrap();
+
+        assert!(layout.size() >= ring_size);
+        // Safety: we are assured layout has nonzero size, we passed the assert just above.
+        let ring_addr: *mut u8 = unsafe { std::alloc::alloc_zeroed(layout) };
+
+        // Buffers start after the ring_size.
+        // Safety: we are assured the address and the offset are in bounds because the ring_addr is
+        // the value we got from the alloc call, and the layout.size was shown to be at least as
+        // large as the ring_size.
+        let buffers_addr: *mut u8 = unsafe { ring_addr.add(ring_size) };
+        if trace {
+            println!(
+                "{}:{}: ring_addr {} {:#x}, layout: size {} align {}",
+                file!(),
+                line!(),
+                ring_addr as u64,
+                ring_addr as u64,
+                layout.size(),
+                layout.align()
+            );
+            println!(
+                "{}:{}: buffers_addr {} {:#x}",
+                file!(),
+                line!(),
+                buffers_addr as u64,
+                buffers_addr as u64,
+            );
+        }
+
+        let ring_addr: *const types::BufRingEntry = ring_addr as _;
+
+        // Safety: the ring_addr passed into tail is the start of the ring. It is both the start of
+        // the ring and the first entry in the ring.
+        let tail_addr = unsafe { types::BufRingEntry::tail(ring_addr) } as *const AtomicU16;
+
+        let ring_entries_mask = ring_entries - 1;
+        assert!((ring_entries & ring_entries_mask) == 0);
+
+        let buf_ring = RawBufRing {
+            bgid,
+            ring_entries_mask,
+            buf_cnt,
+            buf_len,
+            layout,
+            ring_addr,
+            buffers_addr,
+            local_tail: Cell::new(0),
+            tail_addr,
+            registered: Cell::new(false),
+            possible_cur: Cell::new(0),
+            possible_min: Cell::new(buf_cnt),
+            //
+            // TODO maybe later
+            // #[cfg(any(debug, feature = "cautious"))]
+            // debug_bitmap: RefCell::new(std::vec![0; ((buf_cnt + 7) / 8) as usize]),
+        };
+
+        // Question had come up: where should the initial buffers be added to the ring?
+        // Here when the ring is created, even before it is registered potentially?
+        // Or after registration?
+        //
+        // For this type, BufRing, we are adding the buffers to the ring as the last part of
+        // creating the BufRing, even before registration is optionally performed.
+        //
+        // We've seen the registration be successful, even when the ring starts off empty.
+
+        // Add the buffers here where the ring is created.
+
+        for bid in 0..buf_cnt {
+            buf_ring.buf_ring_add(bid);
+        }
+        buf_ring.buf_ring_sync();
+
+        // The default is to register the buffer ring right here. There is usually no reason the
+        // caller should want to register it some time later.
+        //
+        // Perhaps the caller wants to allocate the buffer ring before the CONTEXT driver is in
+        // place - that would be a reason to delay the register call until later.
+
+        if auto_register {
+            buf_ring.register()?;
+        }
+        Ok(buf_ring)
+    }
+
+    /// Register the buffer ring with the kernel.
+    /// Normally this is done automatically when building a BufRing.
+    ///
+    /// This method must be called in the context of a `tokio-uring` runtime.
+    /// The registration persists for the lifetime of the runtime, unless
+    /// revoked by the [`unregister`] method. Dropping the
+    /// instance this method has been called on does revoke
+    /// the registration and deallocate the buffer space.
+    ///
+    /// [`unregister`]: Self::unregister
+    ///
+    /// # Errors
+    ///
+    /// If a `Provided Buffers` group with the same `bgid` is already registered, the function
+    /// returns an error.
+    fn register(&self) -> io::Result<()> {
+        let bgid = self.bgid;
+        //println!("{}:{}: register bgid {bgid}", file!(), line!());
+
+        // Future: move to a separate public function so other buf_ring implementations
+        // can register, and unregister, the same way.
+
+        let res = CONTEXT.with(|x| {
+            x.handle()
+                .as_ref()
+                .expect("Not in a runtime context")
+                .register_buf_ring(self.ring_addr as _, self.ring_entries(), bgid)
+        });
+        // println!("{}:{}: res {:?}", file!(), line!(), res);
+
+        if let Err(e) = res {
+            match e.raw_os_error() {
+                Some(22) => {
+                    // Using buf_ring requires kernel 5.19 or greater.
+                    // TODO turn these eprintln into a new, more expressive error being returned.
+                    // TODO what convention should we follow in this crate for adding information
+                    // onto an error?
+                    eprintln!(
+                        "buf_ring.register returned {e}, most likely indicating this kernel is not 5.19+",
+                    );
+                }
+                Some(17) => {
+                    // Registering a duplicate bgid is not allowed. There is an `unregister`
+                    // operation that can remove the first.
+                    eprintln!(
+                        "buf_ring.register returned `{e}`, indicating the attempted buffer group id {bgid} was already registered",
+                    );
+                }
+                _ => {
+                    eprintln!("buf_ring.register returned `{e}` for group id {bgid}");
+                }
+            }
+            return Err(e);
+        };
+
+        self.registered.set(true);
+
+        res
+    }
+
+    /// Unregister the buffer ring from the io_uring.
+    /// Normally this is done automatically when the BufRing goes out of scope.
+    ///
+    /// If the CONTEXT driver is no longer in place, the unregistration step is skipped and
+    /// Ok(()) is returned.
+    fn unregister(&self) -> io::Result<()> {
+        // If not registered, make this a no-op.
+        if !self.registered.get() {
+            return Ok(());
+        }
+
+        self.registered.set(false);
+
+        let bgid = self.bgid;
+
+        // If there is no context, bail out with an Ok(()) because the registration and
+        // the entire io_uring is already done anyway.
+        CONTEXT.with(|x| {
+            x.handle()
+                .as_ref()
+                .map_or(Ok(()), |handle| handle.unregister_buf_ring(bgid))
+        })
+    }
+
+    /// Returns the buffer group id.
+    #[inline]
+    fn bgid(&self) -> Bgid {
+        self.bgid
+    }
+
+    fn metric_getting_another(&self) {
+        self.possible_cur.set(self.possible_cur.get() - 1);
+        self.possible_min.set(std::cmp::min(
+            self.possible_min.get(),
+            self.possible_cur.get(),
+        ));
+    }
+
+    // # Safety
+    //
+    // Dropping a duplicate bid is likely to cause undefined behavior
+    // as the kernel uses the same buffer for different data concurrently.
+    unsafe fn dropping_bid_i(&self, bid: Bid) {
+        self.buf_ring_add(bid);
+        self.buf_ring_sync();
+    }
+
+    #[inline]
+    fn buf_capacity_i(&self) -> usize {
+        self.buf_len as _
+    }
+
+    #[inline]
+    // # Panic
+    //
+    // This function will panic if given a bid that is not within the valid range 0..self.buf_cnt.
+    fn stable_ptr_i(&self, bid: Bid) -> *const u8 {
+        assert!(bid < self.buf_cnt);
+        let offset: usize = self.buf_len * (bid as usize);
+        // Safety: buffers_addr is a u8 pointer and was part of an allocation large enough to hold
+        // buf_cnt number of buf_len buffers. buffers_addr, buf_cnt and buf_len are treated as
+        // constants and bid was just asserted to be less than buf_cnt.
+        unsafe { self.buffers_addr.add(offset) }
+    }
+
+    // # Safety
+    //
+    // This may only be called by an owned or &mut object.
+    //
+    // # Panic
+    //
+    // This will panic if bid is out of range.
+    #[inline]
+    unsafe fn stable_mut_ptr_i(&self, bid: Bid) -> *mut u8 {
+        assert!(bid < self.buf_cnt);
+        let offset: usize = self.buf_len * (bid as usize);
+        // Safety: buffers_addr is a u8 pointer and was part of an allocation large enough to hold
+        // buf_cnt number of buf_len buffers. buffers_addr, buf_cnt and buf_len are treated as
+        // constants and bid was just asserted to be less than buf_cnt.
+        self.buffers_addr.add(offset)
+    }
+
+    #[inline]
+    fn ring_entries(&self) -> u16 {
+        self.ring_entries_mask + 1
+    }
+
+    #[inline]
+    fn mask(&self) -> u16 {
+        self.ring_entries_mask
+    }
+
+    // Writes to a ring entry and updates our local copy of the tail.
+    //
+    // Adds the buffer known by its buffer id to the buffer ring. The buffer's address and length
+    // are known given its bid.
+    //
+    // This does not sync the new tail value. The caller should use `buf_ring_sync` for that.
+    //
+    // Panics if the bid is out of range.
+    fn buf_ring_add(&self, bid: Bid) {
+        // Compute address of current tail position, increment the local copy of the tail. Then
+        // write the buffer's address, length and bid into the current tail entry.
+
+        let cur_tail = self.local_tail.get();
+        self.local_tail.set(cur_tail.wrapping_add(1));
+        let ring_idx = cur_tail & self.mask();
+
+        let ring_addr = self.ring_addr as *mut types::BufRingEntry;
+
+        // Safety:
+        // 1. the pointer address (ring_addr) is set and const at self creation time,
+        //    and points to a block of memory at least as large as the number of ring_entries,
+        // 2. the mask used to create ring_idx is one less than
+        //    the number of ring_entries, and ring_entries was tested to be a power of two,
+        // So the address gotten by adding ring_idx entries to ring_addr is guaranteed to
+        // be a valid address of a ring entry.
+        let entry = unsafe { &mut *ring_addr.add(ring_idx as usize) };
+
+        entry.set_addr(self.stable_ptr_i(bid) as _);
+        entry.set_len(self.buf_len as _);
+        entry.set_bid(bid);
+
+        // Update accounting.
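+        // (possible_cur counts the buffers we believe are currently in the ring: it is
+        // incremented here as a buffer is pushed, and decremented in metric_getting_another
+        // when a CQE hands a buffer out.)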
+        self.possible_cur.set(self.possible_cur.get() + 1);
+
+        // TODO maybe later
+        // #[cfg(any(debug, feature = "cautious"))]
+        // {
+        //     let mut debug_bitmap = self.debug_bitmap.borrow_mut();
+        //     let m = 1 << (bid % 8);
+        //     assert!(debug_bitmap[(bid / 8) as usize] & m == 0);
+        //     debug_bitmap[(bid / 8) as usize] |= m;
+        // }
+    }
+
+    // Make 'count' new buffers visible to the kernel. Called after
+    // io_uring_buf_ring_add() has been called 'count' times to fill in new
+    // buffers.
+    #[inline]
+    fn buf_ring_sync(&self) {
+        // Safety: dereferencing this raw pointer is safe. The tail_addr was computed once at init
+        // to refer to the tail address in the ring and is held const for self's lifetime.
+        unsafe {
+            (*self.tail_addr).store(self.local_tail.get(), atomic::Ordering::Release);
+        }
+        // The liburing code did io_uring_smp_store_release(&br.tail, local_tail);
+    }
+
+    // Return the possible_min buffer pool size.
+    fn possible_min(&self) -> u16 {
+        self.possible_min.get()
+    }
+
+    // Return the possible_min buffer pool size and reset to allow fresh counting going forward.
+    fn possible_min_and_reset(&self) -> u16 {
+        let res = self.possible_min.get();
+        self.possible_min.set(self.buf_cnt);
+        res
+    }
+}
+
+impl Drop for RawBufRing {
+    fn drop(&mut self) {
+        if self.registered.get() {
+            _ = self.unregister();
+        }
+        // Safety: the ptr and layout are treated as constant, and ptr (ring_addr) was assigned by
+        // a call to std::alloc::alloc_zeroed using the same layout.
+        unsafe { std::alloc::dealloc(self.ring_addr as *mut u8, self.layout) };
+    }
+}
diff --git a/src/buf/mod.rs b/src/buf/mod.rs
index 71ab196c..4cdcc30c 100644
--- a/src/buf/mod.rs
+++ b/src/buf/mod.rs
@@ -4,6 +4,10 @@
 //! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer
 //! types that respect the `io-uring` contract.
 
+pub(crate) mod bufgroup;
+
+pub mod bufring;
+
 pub mod fixed;
 
 mod io_buf;
@@ -26,6 +30,6 @@ pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] {
 pub(crate) fn deref_mut(buf: &mut impl IoBufMut) -> &mut [u8] {
     // Safety: the `IoBufMut` trait is marked as unsafe and is expected to be
-    // implemented correct.
+    // implemented correctly.
     unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_init()) }
 }
diff --git a/src/io/mod.rs b/src/io/mod.rs
index c779c9b8..40d4439e 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -21,8 +21,12 @@ mod read_fixed;
 
 mod readv;
 
+mod recv;
+
 mod recv_from;
 
+mod recv_provbuf;
+
 mod recvmsg;
 
 mod rename_at;
diff --git a/src/io/recv.rs b/src/io/recv.rs
new file mode 100644
index 00000000..456fdc47
--- /dev/null
+++ b/src/io/recv.rs
@@ -0,0 +1,61 @@
+use crate::{
+    buf::BoundedBufMut, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot,
+};
+use io_uring::cqueue::Entry;
+use std::io;
+use std::marker::PhantomData;
+
+/// An unsubmitted recv operation.
+pub type UnsubmittedRecv<T> = UnsubmittedOneshot<RecvData<T>, RecvTransform<T>>;
+
+#[allow(missing_docs)]
+pub struct RecvData<T> {
+    /// Holds a strong ref to the FD, preventing the file from being closed
+    /// while the operation is in-flight.
+    _fd: SharedFd,
+
+    buf: T,
+}
+
+#[allow(missing_docs)]
+pub struct RecvTransform<T> {
+    _phantom: PhantomData<T>,
+}
+
+impl<T> OneshotOutputTransform for RecvTransform<T> {
+    type Output = BufResult<usize, T>;
+    type StoredData = RecvData<T>;
+
+    fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output {
+        let res = if cqe.result() >= 0 {
+            Ok(cqe.result() as usize)
+        } else {
+            Err(io::Error::from_raw_os_error(-cqe.result()))
+        };
+
+        (res, data.buf)
+    }
+}
+
+impl<T: BoundedBufMut> UnsubmittedRecv<T> {
+    pub(crate) fn recv(fd: &SharedFd, mut buf: T, flags: Option<i32>) -> Self {
+        use io_uring::{opcode, types};
+
+        // Get raw buffer info
+        let ptr = buf.stable_mut_ptr();
+        let len = buf.bytes_init();
+
+        Self::new(
+            RecvData {
+                _fd: fd.clone(),
+                buf,
+            },
+            RecvTransform {
+                _phantom: PhantomData::default(),
+            },
+            opcode::Recv::new(types::Fd(fd.raw_fd()), ptr, len as _)
+                .flags(flags.unwrap_or(0))
+                .build(),
+        )
+    }
+}
diff --git a/src/io/recv_provbuf.rs b/src/io/recv_provbuf.rs
new file mode 100644
index 00000000..25ceb631
--- /dev/null
+++ b/src/io/recv_provbuf.rs
@@ -0,0 +1,67 @@
+use crate::{
+    buf::{bufgroup::BufX, bufring::BufRing},
+    io::SharedFd,
+    OneshotOutputTransform, UnsubmittedOneshot,
+};
+use io_uring::{cqueue::Entry, squeue};
+use std::io;
+
+/// An unsubmitted recv_provbuf operation.
+pub type UnsubmittedRecvProvBuf = UnsubmittedOneshot<RecvProvBufData, RecvProvBufTransform>;
+
+#[allow(missing_docs)]
+pub struct RecvProvBufData {
+    /// Holds a strong ref to the FD, preventing the file from being closed
+    /// while the operation is in-flight.
+    _fd: SharedFd,
+
+    /// The bufgroup that supplies the bgid and the get_buf function.
+    group: BufRing,
+}
+
+#[allow(missing_docs)]
+pub struct RecvProvBufTransform {
+    // _phantom: PhantomData<T>,
+}
+
+impl OneshotOutputTransform for RecvProvBufTransform {
+    type Output = Result<Option<BufX>, io::Error>;
+    type StoredData = RecvProvBufData;
+
+    fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output {
+        let res = if cqe.result() >= 0 {
+            cqe.result() as u32
+        } else {
+            return Err(io::Error::from_raw_os_error(-cqe.result()));
+        };
+        let flags = cqe.flags();
+
+        // Safety: getting a buffer from the group requires that the res and flags values
+        // accurately identify a buffer and the length which was written to by the kernel. The
+        // res and flags passed here are those provided by the kernel.
+        unsafe { data.group.get_buf(res, flags) }
+    }
+}
+
+impl UnsubmittedRecvProvBuf {
+    pub(crate) fn recv_provbuf(fd: &SharedFd, group: BufRing, flags: Option<i32>) -> Self {
+        use io_uring::{opcode, types};
+
+        let bgid = group.bgid();
+
+        Self::new(
+            RecvProvBufData {
+                _fd: fd.clone(),
+                group,
+            },
+            RecvProvBufTransform {
+                // _phantom: PhantomData::default(),
+            },
+            opcode::Recv::new(types::Fd(fd.raw_fd()), std::ptr::null_mut(), 0 as _)
+                .flags(flags.unwrap_or(0))
+                .buf_group(bgid)
+                .build()
+                .flags(squeue::Flags::BUFFER_SELECT),
+        )
+    }
+}
diff --git a/src/io/socket.rs b/src/io/socket.rs
index f35de7bf..c278d62f 100644
--- a/src/io/socket.rs
+++ b/src/io/socket.rs
@@ -1,3 +1,5 @@
+use crate::io::recv::UnsubmittedRecv;
+use crate::io::recv_provbuf::UnsubmittedRecvProvBuf;
 use crate::io::write::UnsubmittedWrite;
 use crate::runtime::driver::op::Op;
 use crate::{
@@ -163,6 +165,18 @@ impl Socket {
         op.await
     }
 
+    pub(crate) fn recv<T: BoundedBufMut>(&self, buf: T, flags: Option<i32>) -> UnsubmittedRecv<T> {
+        UnsubmittedOneshot::recv(&self.fd, buf, flags)
+    }
+
+    pub(crate) fn recv_provbuf(
+        &self,
+        group: crate::buf::bufring::BufRing,
+        flags: Option<i32>,
+    ) -> UnsubmittedRecvProvBuf {
+        UnsubmittedOneshot::recv_provbuf(&self.fd, group, flags)
+    }
+
     pub(crate) async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
     where
         T: BoundedBufMut,
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index 2450dcb9..7b4dca2f 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -4,6 +4,7 @@ use std::{
     os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
 };
 
+use crate::buf::{bufgroup::BufX, bufring};
 use crate::{
     buf::fixed::FixedBuf,
     buf::{BoundedBuf, BoundedBufMut},
@@ -78,6 +79,62 @@ impl TcpStream {
         self.inner.read(buf).await
     }
 
+    /// Creates a oneshot recv(2) operation using the provided buffer.
+    ///
+    /// Returns the original buffer and the quantity of data received.
+    pub async fn recv<T: BoundedBufMut>(
+        &self,
+        buf: T,
+        flags: Option<i32>,
+    ) -> crate::BufResult<usize, T> {
+        self.inner.recv(buf, flags).submit().await
+    }
+
+    /// Creates a oneshot recv(2) operation using the provided buf_ring pool.
+    ///
+    /// (Experimental: the types BufRing and BufX are likely to change.)
+    ///
+    /// Returns `Result<Option<BufX>, io::Error>`, meaning it returns Ok(None)
+    /// when there is no more data to read.
+    ///
+    /// When the buffer goes out of scope, it is returned to the buf_ring pool.
+    ///
+    /// Refer to io_uring_prep_recv(3) for a description of 'flags'.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use tokio_uring::buf::bufring;
+    /// use tokio_uring::net::TcpStream;
+    ///
+    /// async fn recv_once(
+    ///     stream: TcpStream,
+    ///     group: bufring::BufRing,
+    ///     flags: Option<i32>,
+    /// ) -> Result<(), std::io::Error> {
+    ///     let bufx = match stream.recv_provbuf(group, flags).await {
+    ///         Ok(Some(bufx)) => bufx,
+    ///         Ok(None) => return Ok(()),
+    ///         Err(e) => return Err(e),
+    ///     };
+    ///
+    ///     let read = bufx.len();
+    ///     if read == 0 {
+    ///         unreachable!();
+    ///     }
+    ///
+    ///     // use bufx ..
+    ///     Ok(())
+    /// }
+    /// ```
+    pub async fn recv_provbuf(
+        &self,
+        group: bufring::BufRing,
+        flags: Option<i32>,
+    ) -> Result<Option<BufX>, io::Error> {
+        self.inner.recv_provbuf(group, flags).submit().await
+    }
+
+    /// Read some data from the stream into a registered buffer.
     ///
     /// Like [`read`], but using a pre-mapped buffer
diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs
index 115f780d..9a89fd1b 100644
--- a/src/runtime/driver/handle.rs
+++ b/src/runtime/driver/handle.rs
@@ -63,6 +63,21 @@ impl Handle {
         self.inner.borrow_mut().unregister_buffers(buffers)
     }
 
+    pub(crate) fn register_buf_ring(
+        &self,
+        ring_addr: u64,
+        ring_entries: u16,
+        bgid: u16,
+    ) -> io::Result<()> {
+        self.inner
+            .borrow_mut()
+            .register_buf_ring(ring_addr, ring_entries, bgid)
+    }
+
+    pub(crate) fn unregister_buf_ring(&self, bgid: u16) -> io::Result<()> {
+        self.inner.borrow_mut().unregister_buf_ring(bgid)
+    }
+
     pub(crate) fn submit_op_2(&self, sqe: squeue::Entry) -> usize {
         self.inner.borrow_mut().submit_op_2(sqe)
     }
diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs
index 21d7de0b..eaa83427 100644
--- a/src/runtime/driver/mod.rs
+++ b/src/runtime/driver/mod.rs
@@ -122,6 +122,30 @@ impl Driver {
         ))
     }
 
+    pub(crate) fn register_buf_ring(
+        &mut self,
+        ring_addr: u64,
+        ring_entries: u16,
+        bgid: u16,
+    ) -> io::Result<()> {
+        self.uring
+            .submitter()
+            .register_buf_ring(ring_addr, ring_entries, bgid)?;
+
+        // TODO should the driver keep anything about the buf_ring?
+        // Perhaps something that maps the bgid to a creator, given a bid coming from the cqe?
+        // Or will the future that sent the command be able to convert the bid to a buffer pointer?
+        // And what if the future is dropped?
+        //self.fixed_buffers = Some(buffers);
+        Ok(())
+    }
+
+    pub(crate) fn unregister_buf_ring(&mut self, bgid: u16) -> io::Result<()> {
+        self.uring.submitter().unregister_buf_ring(bgid)?;
+
+        Ok(())
+    }
+
     pub(crate) fn submit_op_2(&mut self, sqe: squeue::Entry) -> usize {
         let index = self.ops.insert();
 
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
new file mode 100644
index 00000000..2c85329e
--- /dev/null
+++ b/tests/common/mod.rs
@@ -0,0 +1,328 @@
+pub mod probe;
+
+use std::net::SocketAddr;
+
+use tokio::sync::oneshot;
+use tokio::task::JoinSet;
+use tokio_uring::buf::bufring;
+use tokio_uring::net::{TcpListener, TcpStream};
+
+#[derive(Clone)]
+pub enum Rx {
+    Read,
+    Recv(Option<i32>),
+    RecvBufRing(bufring::BufRing, Option<i32>),
+}
+
+pub async fn tcp_listener() -> Result<(TcpListener, SocketAddr), std::io::Error> {
+    let socket_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
+
+    let listener = TcpListener::bind(socket_addr).unwrap();
+
+    let socket_addr = listener.local_addr().unwrap();
+
+    Ok((listener, socket_addr))
+}
+
+#[inline]
+pub fn is_no_buffer_space_available(e: &std::io::Error) -> bool {
+    e.raw_os_error() == Some(105)
+}
+
+async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_length: usize) {
+    // Implement the client ping-pong loop. Make several read variations available.
+
+    for _ in 0..send_cnt {
+        // Make this vector longer to cause splits in round trip transmission.
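+        // (A send_length well above the socket buffer size, e.g. 64 * 1024, tends to split one
+        // round trip across several reads.)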
+        let buf = vec![1u8; send_length];
+
+        let (result, buf) = stream.write_all(buf).await;
+        let _result = result.unwrap();
+
+        let expect = buf.len();
+        let mut got: usize = 0;
+
+        let mut buf = buf;
+        while got < expect {
+            let result = match &rx {
+                Rx::Read => {
+                    let result;
+                    (result, buf) = stream.read(buf).await;
+                    result
+                }
+                Rx::Recv(flags) => {
+                    let result;
+                    (result, buf) = stream.recv(buf, *flags).await;
+                    result
+                }
+                Rx::RecvBufRing(group, flags) => {
+                    loop {
+                        let buf = stream.recv_provbuf(group.clone(), *flags).await;
+                        match buf {
+                            Ok(Some(buf)) => {
+                                // If returning a Vec were necessary:
+                                // Either form of conversion from BufX data to Vec could be appropriate here.
+                                // One consumes the BufX, the other doesn't and lets it drop here.
+                                // break (Ok(buf.len()), buf.into())
+                                // break (Ok(buf.len()), buf.as_slice().to_vec());
+                                break Ok(buf.len());
+                            }
+                            Ok(None) => {
+                                // The connection is closed. Report 0 bytes read.
+                                break Ok(0);
+                            }
+                            Err(e) => {
+                                // Expected error: No buffer space available (os error 105),
+                                // but sometimes we get an error indicating the returned res was 0
+                                // and flags was 4.
+                                if e.kind() == std::io::ErrorKind::Other {
+                                    eprintln!(
+                                        "client: assuming connection is closed: recv_provbuf error {}",
+                                        e
+                                    );
+                                    break Err(e);
+                                }
+                                // Normal for some of the test cases to cause the bufring to be exhausted.
+                                if !is_no_buffer_space_available(&e) {
+                                    panic!("client: recv_provbuf error {}", e);
+                                }
+                            }
+                        }
+                    }
+                }
+            };
+            let read = result.unwrap();
+            if read == 0 {
+                panic!(
+                    "read of 0 but expected not yet reached, got {}, expected {}",
+                    got, expect
+                );
+            }
+            got += read;
+            // level1-println!("client: read {}", read);
+            // println!("client: read: {:?}", &_buf[..read]);
+        }
+    }
+}
+
+async fn server_ping_pong_reusing_vec(
+    rx: Rx,
+    stream: TcpStream,
+    buf: Vec<u8>,
+    _local_addr: SocketAddr,
+) {
+    use tokio_uring::buf::BoundedBuf; // for slice()
+
+    let mut buf = buf;
+    // level1-println!("server: {} connected", peer);
+    let mut _n = 0;
+
+    loop {
+        let (result, nbuf) = match &rx {
+            Rx::Read => stream.read(buf).await,
+            Rx::Recv(flags) => stream.recv(buf, *flags).await,
+            Rx::RecvBufRing(_, _) => unreachable!(),
+        };
+        buf = nbuf;
+        let read = result.unwrap();
+        if read == 0 {
+            // level1-println!("server: {} closed, {} total ping-ponged", peer, _n);
+            break;
+        }
+
+        let (res, slice) = stream.write_all(buf.slice(..read)).await;
+        let _ = res.unwrap();
+        buf = slice.into_inner();
+        // level1-println!("server: {} all {} bytes ping-ponged", peer, read);
+        _n += read;
+    }
+}
+
+async fn server_ping_pong_using_buf_ring(
+    stream: TcpStream,
+    group: &bufring::BufRing,
+    flags: Option<i32>,
+    _local_addr: SocketAddr,
+) {
+    // Serve the connection by looping on input. Each BufX received from the kernel is let go
+    // out of scope when we are done with it, so it can be given back to the kernel.
+    //
+    // Here is a completion-model based loop, as described in
+    // https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023
+    // where the buffer being written to by the kernel is picked by the kernel from a
+    // provided buffer pool, and when finished working with the buffer, it is returned to
+    // the kernel's provided buffer pool.
+
+    let mut _n = 0;
+    loop {
+        // Loop to allow trying again if there was no buffer space available.
+        let bufx = loop {
+            let buf = stream.recv_provbuf(group.clone(), flags).await;
+            match buf {
+                Ok(Some(buf)) => break buf,
+                Ok(None) => {
+                    // Normal that the client closed its connection and this
+                    // server sees no more data is forthcoming. So the read
+                    // amount was zero, so there was no buffer picked.
+                    return;
+                }
+                Err(e) => {
+                    // Expected error: No buffer space available (os error 105),
+                    // for which we loop around.
+                    //
+                    // But sometimes we get an error indicating the returned res was 0
+                    // and flags was 4. Treat this like the connection is closed while
+                    // awaiting confirmation from the io_uring team.
+                    if e.kind() == std::io::ErrorKind::Other {
+                        eprintln!(
+                            "server: assuming connection is closed: recv_provbuf error {}",
+                            e
+                        );
+                        return;
+                    }
+                    // Normal for some of the test cases to cause the bufring to be exhausted.
+                    if !is_no_buffer_space_available(&e) {
+                        panic!("server: recv_provbuf error {}", e);
+                    }
+                }
+            }
+        };
+
+        // Copy of the logic above, but with a different buffer type.
+
+        let read = bufx.len();
+        if read == 0 {
+            // Unlikely, as the zero case seems handled by the error above.
+            break;
+        }
+
+        // Writing bufx or bufx.slice(..read) works equally well as the bufx length *is*
+        // the length that was read.
+        // let (res, _) = stream.write_all(bufx.slice(..read)).await;
+        // let (res, _) = stream.write_all(bufx).await;
+        //
+        // The returned _ represents the BufX or slice of the BufX which we let go out of scope.
+
+        let (res, _) = stream.write_all(bufx).await;
+
+        let _ = res.unwrap();
+        // level1-println!("server: {} all {} bytes ping-ponged with bufx", peer, read);
+        _n += read;
+    }
+}
+
+pub async fn async_block_ping_pong_listener_loop(server: Server, listener: TcpListener) {
+    let Server { rx } = server;
+    loop {
+        let (stream, socket_addr) = listener.accept().await.unwrap();
+        let rx = rx.clone();
+
+        // Spawn a new task for each connection
+        tokio_uring::spawn(async move {
+            match &rx {
+                Rx::Read | Rx::Recv(_) => {
+                    let buf = vec![0u8; 16 * 1024];
+                    server_ping_pong_reusing_vec(rx, stream, buf, socket_addr).await;
+                }
+                Rx::RecvBufRing(group, flags) => {
+                    server_ping_pong_using_buf_ring(stream, group, *flags, socket_addr).await;
+                }
+            };
+        });
+    }
+}
+
+fn spawn_ping_pong_listener_loop(server: Server, listener: TcpListener) {
+    tokio_uring::spawn(async move {
+        async_block_ping_pong_listener_loop(server, listener).await;
+    });
+}
+
+pub fn ping_pong_clients(clients: Clients, listener_addr: SocketAddr) -> oneshot::Receiver<()> {
+    // Spawn clients as tokio_uring tasks and return a tokio oneshot receiver
+    // that will indicate when they are done.
+
+    let Clients {
+        rx,
+        client_cnt,
+        send_cnt,
+        send_length,
+    } = clients;
+
+    let mut set = JoinSet::new();
+
+    // Spawn several clients
+
+    for client_id in 0..client_cnt {
+        let rx = rx.clone();
+        set.spawn_local(async move {
+            let stream = TcpStream::connect(listener_addr).await.unwrap();
+            client_ping_pong(rx, &stream, send_cnt, send_length).await;
+
+            client_id // return through handle
+        });
+    }
+
+    let (tx, rx) = oneshot::channel::<()>();
+
+    tokio_uring::spawn(async move {
+        let mut seen = vec![false; client_cnt];
+        while let Some(res) = set.join_next().await {
+            let client_id = res.unwrap();
+            seen[client_id] = true;
+        }
+
+        for i in 0..client_cnt {
+            assert!(seen[i]);
+        }
+        let _ = tx.send(()).unwrap();
+    });
+
+    rx
+}
+
+async fn _ping_pong(clients: Clients, server: Server) {
+    // Run `client_cnt` clients. Both clients and server use the TcpStream method identified by `rx`.
+
+    let (listener, listener_addr) = tcp_listener().await.unwrap();
+
+    // Spawn perhaps multiple clients
+
+    let clients_done = ping_pong_clients(clients, listener_addr);
+
+    // Spawn one server
+
+    spawn_ping_pong_listener_loop(server, listener);
+
+    // Wait until the clients signal they are done
+
+    // println!("common/mod.rs:{} now wait for clients to be done", line!());
+    let _ = clients_done.await.unwrap();
+    // println!("common/mod.rs:{} clients report being done", line!());
+}
+
+pub struct Clients {
+    pub rx: Rx,
+    pub client_cnt: usize,
+    pub send_cnt: usize,
+    pub send_length: usize,
+}
+
+pub struct Server {
+    pub rx: Rx,
+}
+
+pub struct PingPong {
+    pub clients: Clients,
+    pub server: Server,
+}
+
+impl PingPong {
+    pub async fn run(self) {
+        let PingPong { clients, server } = self;
+        _ping_pong(clients, server).await;
+    }
+}
diff --git a/tests/common/probe.rs b/tests/common/probe.rs
new file mode 100644
index 00000000..f9fa5b08
--- /dev/null
+++ b/tests/common/probe.rs
@@ -0,0 +1,33 @@
+use io_uring::{opcode, IoUring, Probe};
+
+pub fn register_probe() -> Option<Probe> {
+    let io_uring = match IoUring::new(2) {
+        Ok(io_uring) => io_uring,
+        Err(_) => {
+            return None;
+        }
+    };
+    let submitter = io_uring.submitter();
+
+    let mut probe = Probe::new();
+
+    // register_probe has been available since 5.6.
+    if submitter.register_probe(&mut probe).is_err() {
+        return None;
+    }
+    Some(probe)
+}
+
+pub fn is_kernel_minimum_5_19() -> bool {
+    let Some(register_probe) = register_probe() else {
+        return false;
+    };
+
+    // IORING_OP_SOCKET was introduced in 5.19.
+    register_probe.is_supported(opcode::Socket::CODE)
+}
+
+pub fn is_buf_ring_supported() -> bool {
+    is_kernel_minimum_5_19()
+}
diff --git a/tests/net.rs b/tests/net.rs
new file mode 100644
index 00000000..32d1d505
--- /dev/null
+++ b/tests/net.rs
@@ -0,0 +1,215 @@
+use std::net::SocketAddr;
+use std::sync::mpsc::sync_channel;
+use std::thread;
+
+use tokio_uring::buf::bufring;
+
+mod common;
+
+use common::probe;
+use common::Rx;
+
+#[test]
+fn net_tcp_ping_pong_read_one() {
+    // Run one client. Both client and server use the TcpStream `read` method.
+
+    tokio_uring::start(async {
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::Read,
+                client_cnt: 1,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server { rx: Rx::Read },
+        }
+        .run()
+        .await;
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_read_several() {
+    // Run 3 clients. Both clients and server use the TcpStream `read` method.
+
+    tokio_uring::start(async {
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::Read,
+                client_cnt: 3,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server { rx: Rx::Read },
+        }
+        .run()
+        .await;
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_recv() {
+    // Run 3 clients. Both clients and server use the TcpStream `recv` method.
+
+    tokio_uring::start(async {
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::Recv(None),
+                client_cnt: 3,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server { rx: Rx::Recv(None) },
+        }
+        .run()
+        .await;
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_recv_oneshot_bufring() {
+    if !probe::is_buf_ring_supported() {
+        eprintln!("skipping test, buf_ring is not supported in this kernel");
+        return;
+    }
+    // Run 3 clients. Both clients and server use the TcpStream `recv` method with a BufRing pool
+    // that is built small enough (4 entries) that there will be some pool exhaustion that has to
+    // be handled by retrying the requests.
+    // And a bit oddly, both clients and server are using the same BufRing, as they are all run in
+    // the same tokio_uring instance.
+
+    tokio_uring::start(async {
+        let buf_ring = bufring::Builder::new(177)
+            .ring_entries(4)
+            .buf_len(4096)
+            // Normally there is no reason to skip the auto register, but this lets us test the
+            // manual register below.
+            .skip_auto_register(true)
+            .build()
+            .unwrap();
+
+        buf_ring.register().unwrap();
+
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::RecvBufRing(buf_ring.clone(), None),
+                client_cnt: 3,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server {
+                rx: Rx::RecvBufRing(buf_ring.clone(), None),
+            },
+        }
+        .run()
+        .await;
+
+        // Manually unregister the buf_ring. When it goes out of scope, it is unregistered
+        // automatically. Note, it remains in scope while there are outstanding buffers the
+        // application hasn't dropped yet.
+        buf_ring.unregister().unwrap();
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() {
+    if !probe::is_buf_ring_supported() {
+        eprintln!("skipping test, buf_ring is not supported in this kernel");
+        return;
+    }
+    // Similar to the test net_tcp_ping_pong_recv_bufring above, but uses two new threads,
+    // one for the server code, one for all the clients.
+    //
+    // Two std thread syncing methods are employed: a sync_channel gets the ephemeral port from one
+    // thread back to the main thread, and the main thread then is blocked at the end, waiting for
+    // the clients thread handle to report the clients thread is done.
+    //
+    // There is no attempt to shut down the server thread.
+    //
+    //
+    // Further details:
+    //
+    // The server thread starts a tokio_uring runtime, creates a provided-buffers buf_ring,
+    // and listens on a port, spawning tasks to serve the connections being established. All
+    // server tasks share the same provided buffer pool buf_ring.
+    //
+    // The client thread also starts a tokio_uring runtime, also creates a provided-buffers
+    // buf_ring, and spawns as many client tasks as the constant below dictates. Each client task
+    // uses its own Vec<u8> buffer for writing data but all share the same buf_ring for receiving
+    // data back from its stream.
+    //
+    // Minutia:
+    //
+    // The buffer group ids, bgid, assigned to each buf_ring, one for the server, one for the
+    // clients, are in independent spaces, so they could have the same value. They are chosen here
+    // as 261 and 262, respectively, but they could both be 261. They could both be zero for that
+    // matter.
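+    //
+    // (For instance, Builder::new(0) in the server thread and Builder::new(0) in the clients
+    // thread would not collide, because each thread registers its ring with its own io_uring
+    // instance.)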
+    use libc::{sysconf, _SC_PAGESIZE};
+    let page_size: usize = unsafe { sysconf(_SC_PAGESIZE) as usize };
+
+    /*
+     * These yield a test run that takes about 2.8s
+    const CLIENT_CNT: usize = 32;
+    const SENDS_PER_CLIENT: usize = 64;
+    const SEND_LENGTH: usize = 64 * 1024;
+    const CLIENT_BUFRING_SIZE: u16 = 64;
+    const SERVER_BUFRING_SIZE: u16 = 64;
+    */
+    const CLIENT_CNT: usize = 4;
+    const SENDS_PER_CLIENT: usize = 4;
+    const SEND_LENGTH: usize = 4 * 1024;
+    const CLIENT_BUFRING_SIZE: u16 = 8;
+    const SERVER_BUFRING_SIZE: u16 = 8;
+    const CLIENT_BUF_LEN: usize = 4096;
+    const SERVER_BUF_LEN: usize = 4096;
+
+    // Used by the thread running the server to pass its ephemeral local port to the main thread.
+    let (addr_tx, addr_rx) = sync_channel::<SocketAddr>(0);
+
+    let _server_handle = thread::spawn(move || {
+        tokio_uring::start(async {
+            let buf_ring = bufring::Builder::new(261)
+                .page_size(page_size)
+                .ring_entries(SERVER_BUFRING_SIZE)
+                .buf_len(SERVER_BUF_LEN)
+                .build()
+                .unwrap();
+            let server = common::Server {
+                rx: Rx::RecvBufRing(buf_ring.clone(), None),
+            };
+            let (listener, local_addr) = common::tcp_listener().await.unwrap();
+            addr_tx.send(local_addr).unwrap();
+
+            common::async_block_ping_pong_listener_loop(server, listener).await;
+        });
+    });
+
+    let listener_addr = addr_rx.recv().unwrap();
+
+    let clients_handle = thread::spawn(move || {
+        tokio_uring::start(async {
+            let buf_ring = bufring::Builder::new(262)
+                .page_size(page_size)
+                .ring_entries(CLIENT_BUFRING_SIZE)
+                .buf_len(CLIENT_BUF_LEN)
+                .build()
+                .unwrap();
+            let clients = common::Clients {
+                rx: Rx::RecvBufRing(buf_ring.clone(), None),
+                client_cnt: CLIENT_CNT,
+                send_cnt: SENDS_PER_CLIENT,
+                send_length: SEND_LENGTH,
+            };
+            let clients_done = common::ping_pong_clients(clients, listener_addr);
+
+            // Wait for the client tasks to be done.
+
+            // println!("net.rs:{} now wait for clients to be done", line!());
+            let _ = clients_done.await.unwrap();
+            // println!("net.rs:{} clients report being done", line!());
+        });
+    });
+
+    // Wait for the clients thread to finish.
+    clients_handle.join().unwrap();
+}