Skip to content

Commit

Permalink
feat: use recvmmsg in addition to GRO
Browse files Browse the repository at this point in the history
Previously we would only do GRO.
  • Loading branch information
mxinden committed Sep 28, 2024
1 parent 1ce5455 commit 382234b
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 127 deletions.
1 change: 0 additions & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,4 @@ jobs:
if: matrix.type == 'debug' && matrix.rust-toolchain == 'stable'

bench:
needs: [check]
uses: ./.github/workflows/bench.yml
2 changes: 1 addition & 1 deletion neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl TryFrom<&State> for CloseState {
impl super::Client for Connection {
fn process_into_buffer<'a>(
&mut self,
input: Option<Datagram<&[u8]>>,
input: Option<impl Iterator<Item = Datagram<&'a [u8]>>>,
now: Instant,
out: &'a mut Vec<u8>,
) -> Output<&'a [u8]> {
Expand Down
4 changes: 2 additions & 2 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ impl super::Client for Http3Client {
self.state().try_into()
}

fn process_into_buffer<'a>(
fn process_into_buffer<'a, 'b>(
&mut self,
input: Option<Datagram<&[u8]>>,
input: Option<impl Iterator<Item = Datagram<&'b [u8]>>>,
now: Instant,
out: &'a mut Vec<u8>,
) -> Output<&'a [u8]> {
Expand Down
15 changes: 8 additions & 7 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ enum CloseState {
trait Client {
fn process_into_buffer<'a>(
&mut self,
input: Option<Datagram<&[u8]>>,
input: Option<impl Iterator<Item = Datagram<&'a [u8]>>>,
now: Instant,
out: &'a mut Vec<u8>,
) -> Output<&'a [u8]>;
Expand Down Expand Up @@ -457,16 +457,17 @@ impl<'a, H: Handler> Runner<'a, H> {

async fn process(&mut self, mut should_read: bool) -> Result<(), io::Error> {
loop {
let dgram = should_read
let dgrams = should_read
.then(|| self.socket.recv(&self.local_addr, &mut self.recv_buf))
.transpose()?
.flatten();
should_read = dgram.is_some();
should_read = dgrams.is_some();

match self
.client
.process_into_buffer(dgram, Instant::now(), &mut self.send_buf)
{
match self.client.process_into_buffer(
dgrams.as_ref().map(|d| d.iter()),
Instant::now(),
&mut self.send_buf,
) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
Expand Down
4 changes: 2 additions & 2 deletions neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ impl super::HttpServer for HttpServer {
&mut self,
dgram: Option<Datagram<&[u8]>>,
now: Instant,
out: &'a mut Vec<u8>,
write_buffer: &'a mut Vec<u8>,
) -> Output<&'a [u8]> {
self.server.process_into_buffer(dgram, now, out)
self.server.process_into_buffer(dgram, now, write_buffer)
}

fn process_events(&mut self, now: Instant) {
Expand Down
83 changes: 46 additions & 37 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,57 +232,66 @@ impl ServerRunner {
}

async fn process(&mut self, mut socket_inx: Option<usize>) -> Result<(), io::Error> {
loop {
let mut dgram = if let Some(inx) = socket_inx {
// TODO: Cleanup! We should really be passing a set of datagrams to neqo_transport server!
'outer: loop {
let dgrams = if let Some(inx) = socket_inx {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgram = socket.recv(host, &mut self.recv_buf)?;
if dgram.is_none() {
let dgrams = socket.recv(host, &mut self.recv_buf)?;
if dgrams.is_none() {
// Done reading.
socket_inx.take();
}
dgram
dgrams
} else {
None
};

match self
.server
.process_into_buffer(dgram.take(), (self.now)(), &mut self.send_buf)
{
Output::Datagram(dgram) => {
// Find outbound socket. If none match, take the first.
let socket = if let Some(socket) =
self.sockets.iter_mut().find_map(|(_host, socket)| {
let mut dgrams = dgrams.iter().map(|d| d.iter()).flatten().peekable();

'inner: loop {
match self.server.process_into_buffer(
dgrams.next(),
(self.now)(),
&mut self.send_buf,
) {
Output::Datagram(dgram) => {
// Find outbound socket. If none match, take the first.
let socket = if let Some(socket) =
self.sockets.iter_mut().find_map(|(_host, socket)| {
socket
.local_addr()
.ok()
.map_or(false, |socket_addr| socket_addr == dgram.source())
.then_some(socket)
}) {
socket
.local_addr()
.ok()
.map_or(false, |socket_addr| socket_addr == dgram.source())
.then_some(socket)
}) {
socket
} else {
&mut self.sockets.iter_mut().next().unwrap().1
};

socket.writable().await?;
socket.send(dgram)?;
self.send_buf.clear();
continue;
} else {
&mut self.sockets.iter_mut().next().unwrap().1
};

socket.writable().await?;
socket.send(dgram)?;
self.send_buf.clear();
continue 'inner;
}
Output::Callback(new_timeout) => {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
}
Output::None => {}
}
Output::Callback(new_timeout) => {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));

if dgrams.peek().is_some() {
continue 'inner;
}
Output::None => {}
}

if socket_inx.is_none() {
// No socket to read and nothing to write.
break;
if socket_inx.is_some() {
continue 'outer;
}

return Ok(());
}
}

Ok(())
}

// Wait for any of the sockets to be readable or the timeout to fire.
Expand Down
3 changes: 2 additions & 1 deletion neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ impl Socket {

/// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with
/// the provided local address.
// TODO: Option needed?
pub fn recv<'a>(
&self,
local_address: &SocketAddr,
recv_buf: &'a mut Vec<u8>,
) -> Result<Option<Datagram<&'a [u8]>>, io::Error> {
) -> Result<Option<neqo_udp::Datagrams<'a>>, io::Error> {
self.inner
.try_io(tokio::io::Interest::READABLE, || {
neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf)
Expand Down
12 changes: 7 additions & 5 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,18 +854,20 @@ impl Http3Client {
.stats(&mut self.conn)
}

pub fn process_into_buffer<'a>(
pub fn process_into_buffer<'a, 'b>(
&mut self,
input: Option<Datagram<&[u8]>>,
input: Option<impl Iterator<Item = Datagram<&'b [u8]>>>,
now: Instant,
out: &'a mut Vec<u8>,
) -> Output<&'a [u8]> {
qtrace!([self], "Process.");
if let Some(d) = input {
self.conn.process_input(d, now);
self.conn.process_input_2(d, now);
}
self.process_http3(now);
let out = self.conn.process_into_buffer(None, now, out);
let out = self
.conn
.process_into_buffer(None::<std::iter::Once<Datagram<&[u8]>>>, now, out);
self.process_http3(now);
out
}
Expand All @@ -879,7 +881,7 @@ impl Http3Client {
/// new [`Vec`].
pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
let mut out = vec![];
self.process_into_buffer(dgram.map(Into::into), now, &mut out)
self.process_into_buffer(dgram.map(Into::into).map(std::iter::once), now, &mut out)
.map_datagram(Into::into)
}

Expand Down
52 changes: 35 additions & 17 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,18 @@ impl Connection {

/// Process new input datagrams on the connection.
pub fn process_input<'a>(&mut self, d: impl Into<Datagram<&'a [u8]>>, now: Instant) {
self.input(d.into(), now, now);
self.input(std::iter::once(d.into()), now, now);
self.process_saved(now);
self.streams.cleanup_closed_streams();
}

/// Process new input datagrams on the connection.
pub fn process_input_2<'a, 'b>(
&mut self,
d: impl Iterator<Item = Datagram<&'b [u8]>>,
now: Instant,
) {
self.input(d, now, now);
self.process_saved(now);
self.streams.cleanup_closed_streams();
}
Expand Down Expand Up @@ -1156,7 +1167,7 @@ impl Connection {
#[must_use = "Output of the process function must be handled"]
pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
let mut out = vec![];
self.process_into_buffer(dgram.map(Into::into), now, &mut out)
self.process_into_buffer(dgram.map(Into::into).map(std::iter::once), now, &mut out)
.map_datagram(Into::into)
}

Expand All @@ -1166,9 +1177,9 @@ impl Connection {
///
/// Panics when `out` is not empty.
#[must_use = "Output of the process function must be handled"]
pub fn process_into_buffer<'a>(
pub fn process_into_buffer<'a, 'b>(
&mut self,
input: Option<Datagram<&[u8]>>,
input: Option<impl Iterator<Item = Datagram<&'b [u8]>>>,
now: Instant,
out: &'a mut Vec<u8>,
) -> Output<&'a [u8]> {
Expand Down Expand Up @@ -1291,7 +1302,7 @@ impl Connection {
debug_assert!(self.crypto.states.rx_hp(self.version, cspace).is_some());
for saved in self.saved_datagrams.take_saved() {
qtrace!([self], "input saved @{:?}: {:?}", saved.t, saved.d);
self.input((&saved.d).into(), saved.t, now);
self.input(std::iter::once((&saved.d).into()), saved.t, now);
}
}
}
Expand Down Expand Up @@ -1544,18 +1555,25 @@ impl Connection {

/// Take a datagram as input. This reports an error if the packet was bad.
/// This takes two times: when the datagram was received, and the current time.
fn input(&mut self, d: Datagram<&[u8]>, received: Instant, now: Instant) {
// First determine the path.
let path = self.paths.find_path_with_rebinding(
d.destination(),
d.source(),
self.conn_params.get_cc_algorithm(),
self.conn_params.pacing_enabled(),
now,
);
path.borrow_mut().add_received(d.len());
let res = self.input_path(&path, d, received);
self.capture_error(Some(path), now, 0, res).ok();
fn input<'a>(
&mut self,
dgrams: impl Iterator<Item = Datagram<&'a [u8]>>,
received: Instant,
now: Instant,
) {
for d in dgrams {
// First determine the path.
let path = self.paths.find_path_with_rebinding(
d.destination(),
d.source(),
self.conn_params.get_cc_algorithm(),
self.conn_params.pacing_enabled(),
now,
);
path.borrow_mut().add_received(d.len());
let res = self.input_path(&path, d, received);
self.capture_error(Some(path), now, 0, res).ok();
}
}

fn input_path(&mut self, path: &PathRef, d: Datagram<&[u8]>, now: Instant) -> Res<()> {
Expand Down
8 changes: 5 additions & 3 deletions neqo-transport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl Server {
match sconn {
Ok(mut c) => {
self.setup_connection(&mut c, initial, orig_dcid);
let out = c.process_into_buffer(Some(dgram), now, out);
let out = c.process_into_buffer(Some(std::iter::once(dgram)), now, out);
self.connections.push(Rc::new(RefCell::new(c)));
out
}
Expand Down Expand Up @@ -368,7 +368,9 @@ impl Server {
.iter_mut()
.find(|c| c.borrow().is_valid_local_cid(packet.dcid()))
{
return c.borrow_mut().process_into_buffer(Some(dgram), now, out);
return c
.borrow_mut()
.process_into_buffer(Some(std::iter::once(dgram)), now, out);
}

if packet.packet_type() == PacketType::Short {
Expand Down Expand Up @@ -446,7 +448,7 @@ impl Server {

for connection in &mut self.connections {
match connection.borrow_mut().process_into_buffer(
None,
None::<std::iter::Once<Datagram<&[u8]>>>,
now,
// See .github/workflows/polonius.yml.
unsafe { &mut *std::ptr::from_mut(out) },
Expand Down
Loading

0 comments on commit 382234b

Please sign in to comment.