Skip to content

Commit

Permalink
udp: add send and make sendmsg address optional (#233)
Browse files Browse the repository at this point in the history
Because sendmsg(2) can be used on connected sockets without msg_name.

UDP send_to remains the same.
UDP send is added. Does not incur the overhead of a Box on the app side, and may be more efficient on the kernel side too.

UDP sendmsg gets a parameter change. The address is now an `Option`. Passing `None` means the io_uring sendmsg is sent without a msg_name, so the socket should have been connected first, as with the `send`.
  • Loading branch information
redbaron authored Feb 13, 2023
1 parent 96ad633 commit e05ca59
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 17 deletions.
22 changes: 16 additions & 6 deletions src/io/send_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,39 @@ pub(crate) struct SendTo<T> {
#[allow(dead_code)]
io_slices: Vec<IoSlice<'static>>,
#[allow(dead_code)]
socket_addr: Box<SockAddr>,
socket_addr: Option<Box<SockAddr>>,
pub(crate) msghdr: Box<libc::msghdr>,
}

impl<T: BoundedBuf> Op<SendTo<T>> {
pub(crate) fn send_to(
fd: &SharedFd,
buf: T,
socket_addr: SocketAddr,
socket_addr: Option<SocketAddr>,
) -> io::Result<Op<SendTo<T>>> {
use io_uring::{opcode, types};

let io_slices = vec![IoSlice::new(unsafe {
std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init())
})];

let socket_addr = Box::new(SockAddr::from(socket_addr));

let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
msghdr.msg_iov = io_slices.as_ptr() as *mut _;
msghdr.msg_iovlen = io_slices.len() as _;
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();

let socket_addr = match socket_addr {
Some(_socket_addr) => {
let socket_addr = Box::new(SockAddr::from(_socket_addr));
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();
Some(socket_addr)
}
None => {
msghdr.msg_name = std::ptr::null_mut();
msghdr.msg_namelen = 0;
None
}
};

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
Expand Down
22 changes: 16 additions & 6 deletions src/io/sendmsg_zc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) struct SendMsgZc<T, U> {
#[allow(dead_code)]
io_bufs: Vec<T>,
#[allow(dead_code)]
socket_addr: Box<SockAddr>,
socket_addr: Option<Box<SockAddr>>,
msg_control: Option<U>,
msghdr: libc::msghdr,

Expand All @@ -25,13 +25,11 @@ impl<T: BoundedBuf, U: BoundedBuf> Op<SendMsgZc<T, U>, MultiCQEFuture> {
pub(crate) fn sendmsg_zc(
fd: &SharedFd,
io_bufs: Vec<T>,
socket_addr: SocketAddr,
socket_addr: Option<SocketAddr>,
msg_control: Option<U>,
) -> io::Result<Self> {
use io_uring::{opcode, types};

let socket_addr = Box::new(SockAddr::from(socket_addr));

let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };

let mut io_slices: Vec<IoSlice> = Vec::with_capacity(io_bufs.len());
Expand All @@ -44,8 +42,20 @@ impl<T: BoundedBuf, U: BoundedBuf> Op<SendMsgZc<T, U>, MultiCQEFuture> {

msghdr.msg_iov = io_slices.as_ptr() as *mut _;
msghdr.msg_iovlen = io_slices.len() as _;
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();

let socket_addr = match socket_addr {
Some(_socket_addr) => {
let socket_addr = Box::new(SockAddr::from(_socket_addr));
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();
Some(socket_addr)
}
None => {
msghdr.msg_name = std::ptr::null_mut();
msghdr.msg_namelen = 0;
None
}
};

match msg_control {
Some(ref _msg_control) => {
Expand Down
4 changes: 2 additions & 2 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Socket {
pub(crate) async fn send_to<T: BoundedBuf>(
&self,
buf: T,
socket_addr: SocketAddr,
socket_addr: Option<SocketAddr>,
) -> crate::BufResult<usize, T> {
let op = Op::send_to(&self.fd, buf, socket_addr).unwrap();
op.await
Expand All @@ -150,7 +150,7 @@ impl Socket {
pub(crate) async fn sendmsg_zc<T: BoundedBuf, U: BoundedBuf>(
&self,
io_slices: Vec<T>,
socket_addr: SocketAddr,
socket_addr: Option<SocketAddr>,
msg_control: Option<U>,
) -> (io::Result<usize>, Vec<T>, Option<U>) {
let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap();
Expand Down
24 changes: 21 additions & 3 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ use std::{
///
/// assert_eq!(b"hello world", &buf[..n_bytes]);
///
/// // write data using send on connected socket
/// let (result, _) = socket.send(b"hello world via send".as_slice()).await;
/// result.unwrap();
///
/// // read data
/// let (result, buf) = other_socket.read(buf).await;
/// let n_bytes = result.unwrap();
///
/// assert_eq!(b"hello world via send", &buf[..n_bytes]);
///
/// Ok(())
/// })
/// }
Expand Down Expand Up @@ -198,6 +208,13 @@ impl UdpSocket {
self.inner.connect(SockAddr::from(socket_addr)).await
}

/// Sends data on the connected socket
///
/// On success, returns the number of bytes written.
pub async fn send<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
self.inner.send_to(buf, None).await
}

/// Sends data on the socket to the given address.
///
/// On success, returns the number of bytes written.
Expand All @@ -206,7 +223,7 @@ impl UdpSocket {
buf: T,
socket_addr: SocketAddr,
) -> crate::BufResult<usize, T> {
self.inner.send_to(buf, socket_addr).await
self.inner.send_to(buf, Some(socket_addr)).await
}

/// Sends data on the socket. Will attempt to do so without intermediate copies.
Expand Down Expand Up @@ -242,11 +259,12 @@ impl UdpSocket {
/// > notification overhead. As a result, zero copy is generally only effective
/// > at writes over around 10 KB.
///
/// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead
/// Can be used with socket_addr: None on connected sockets, which can have performance
/// benefits if multiple datagrams are sent to the same destination address.
pub async fn sendmsg_zc<T: BoundedBuf, U: BoundedBuf>(
&self,
io_slices: Vec<T>,
socket_addr: SocketAddr,
socket_addr: Option<SocketAddr>,
msg_control: Option<U>,
) -> (io::Result<usize>, Vec<T>, Option<U>) {
self.inner
Expand Down

0 comments on commit e05ca59

Please sign in to comment.