Skip to content

Commit

Permalink
Improve acknowledgement strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
iyangsj committed May 27, 2024
1 parent e63a00c commit 2292b51
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 9 deletions.
237 changes: 229 additions & 8 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ use log::*;
use strum::IntoEnumIterator;

use self::cid::ConnectionIdItem;
use self::space::BufferFlags;
use self::space::BufferType;
use self::space::PacketNumSpace;
use self::space::RateSamplePacketState;
use self::space::SpaceId;
use self::stream::Stream;
use self::stream::StreamIter;
use self::timer::Timer;
use self::ConnectionFlags::*;
use crate::codec;
use crate::codec::Decoder;
use crate::codec::Encoder;
use crate::connection::space::BufferFlags;
use crate::connection::space::BufferType;
use crate::connection::space::RateSamplePacketState;
use crate::error::ConnectionError;
use crate::error::Error;
use crate::frame;
Expand Down Expand Up @@ -74,8 +76,6 @@ use crate::RecoveryConfig;
use crate::Result;
use crate::Shutdown;

use self::space::SpaceId;

/// A QUIC connection.
pub struct Connection {
/// QUIC version used for the connection.
Expand Down Expand Up @@ -650,13 +650,18 @@ impl Connection {
}
space.recv_pkt_num_win.insert(pkt_num);
space.recv_pkt_num_need_ack.add_elem(pkt_num);
space.need_send_ack = space.need_send_ack || ack_eliciting_pkt;
space.largest_rx_pkt_num = cmp::max(space.largest_rx_pkt_num, pkt_num);
if !probing_pkt {
space.largest_rx_non_probing_pkt_num =
cmp::max(space.largest_rx_non_probing_pkt_num, pkt_num);
// TODO: try to do connection migration
}
if ack_eliciting_pkt {
space.largest_rx_ack_eliciting_pkt_num =
cmp::max(space.largest_rx_ack_eliciting_pkt_num, pkt_num);
}

self.try_schedule_ack_frame(space_id, pkt_num, ack_eliciting_pkt)?;

// An endpoint restarts its idle timer when a packet from its peer is
// received and processed successfully.
Expand Down Expand Up @@ -1254,6 +1259,68 @@ impl Connection {
}
}

/// Check and schedule an ACK frame to acknowledge incoming packets.
fn try_schedule_ack_frame(
&mut self,
space_id: SpaceId,
pkt_num: u64,
ack_eliciting: bool,
) -> Result<()> {
if !ack_eliciting {
return Ok(());
}

let space = self.spaces.get_mut(space_id).ok_or(Error::InternalError)?;
if space.need_send_ack {
return Ok(());
}

// An endpoint MUST acknowledge all ack-eliciting Initial and Handshake
// packets immediately
if space.id == SpaceId::Initial || space.id == SpaceId::Handshake {
space.need_send_ack = true;
return Ok(());
}

// A receiver SHOULD send an ACK frame after receiving at least two
// ack-eliciting packets.
space.ack_eliciting_pkts_since_last_sent_ack += 1;
let ack_eliciting_threshold = self.recovery_conf.ack_eliciting_threshold;
if space.ack_eliciting_pkts_since_last_sent_ack >= ack_eliciting_threshold {
space.need_send_ack = true;
space.ack_timer = None;
return Ok(());
}

// In order to assist loss detection at the sender, an endpoint SHOULD
// generate and send an ACK frame without delay when it receives an
// ack-eliciting packet either:
// - when the received packet has a packet number less than another
// ack-eliciting packet that has been received, or
// - when the packet has a packet number larger than the highest-numbered
// ack-eliciting packet that has been received and there are missing
// packets between that packet and this packet.
if pkt_num < space.largest_rx_ack_eliciting_pkt_num
|| pkt_num > space.largest_rx_ack_eliciting_pkt_num + 1
{
space.need_send_ack = true;
space.ack_timer = None;
return Ok(());
}

// All ack-eliciting 0-RTT and 1-RTT packets within its advertised
// max_ack_delay.
if space.ack_timer.is_none() {
let ack_delay = time::Duration::from_millis(self.peer_transport_params.max_ack_delay);
space.ack_timer = Some(time::Instant::now() + ack_delay);
debug!(
"{} set ack timer for space {:?}, timeout {:?} ",
&self.trace_id, space_id, space.ack_timer
);
}
Ok(())
}

/// A server could receive packets protected with 0-RTT keys prior to
/// receiving a TLS ClientHello. The server MAY retain these packets for
/// later decryption in anticipation of receiving a ClientHello.
Expand Down Expand Up @@ -1995,6 +2062,7 @@ impl Connection {
};
Connection::write_frame_to_packet(frame, out, st)?;
space.need_send_ack = false;
space.ack_eliciting_pkts_since_last_sent_ack = 0;

Ok(())
}
Expand Down Expand Up @@ -3039,6 +3107,10 @@ impl Connection {
Some(time) => self.timers.set(Timer::PathChallenge, time),
None => self.timers.stop(Timer::PathChallenge),
}
match self.spaces.min_ack_timer() {
Some(time) => self.timers.set(Timer::Ack, time),
None => self.timers.stop(Timer::Ack),
}

self.timers.next_timeout()
};
Expand Down Expand Up @@ -3091,6 +3163,19 @@ impl Connection {
}
}

Timer::Ack => {
for (_, space) in self.spaces.iter_mut() {
if let Some(timer) = space.ack_timer {
if timer > now {
continue;
}
debug!("{} ack timeout for space {:?}", self.trace_id, space.id);
space.need_send_ack = true;
space.ack_timer = None;
}
}
}

Timer::Idle => {
info!("{} idle timeout", self.trace_id);
self.flags.insert(Closed);
Expand Down Expand Up @@ -5364,6 +5449,7 @@ pub(crate) mod tests {
server_config.set_recv_udp_payload_size(1550);
server_config.set_initial_max_data(10000);
server_config.set_initial_max_stream_data_bidi_remote(10000);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(
test_pair.client.paths.get(0)?.recovery.max_datagram_size,
Expand Down Expand Up @@ -5816,8 +5902,10 @@ pub(crate) mod tests {
for case in cases {
let mut client_config = TestPair::new_test_config(false)?;
client_config.enable_dplpmtud(case.0);
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.enable_dplpmtud(case.1);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(test_pair.handshake(), Ok(()));

Expand Down Expand Up @@ -5850,10 +5938,12 @@ pub(crate) mod tests {
for case in cases {
let mut client_config = TestPair::new_test_config(false)?;
client_config.enable_dplpmtud(true);
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.enable_dplpmtud(false);
server_config.set_initial_max_data(10240);
server_config.set_initial_max_stream_data_bidi_remote(10240);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
let router_mtu: usize = case.0;

Expand Down Expand Up @@ -6278,6 +6368,122 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn ack_initial_or_handshake_space() -> Result<()> {
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(2);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(2);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

// Client send 1 UDP datagram carrying 1 Initial packet
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;
assert_eq!(packets.len(), 1);

// Server send 2 UDP datagrams carrying 1 Initial packet and 2 Handshake packets
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
assert_eq!(packets.len(), 2);

// Client's Initial must be acknowledged immediately
TestPair::conn_packets_in(&mut test_pair.client, packets)?;
{
let stat = test_pair.client.paths.get_active_mut()?.stats();
assert_eq!(stat.acked_count, 1);
}

// Client send Handshake and completes handshake.
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server's Initial/Handshake must be acknowledged immediately
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
{
let stat = test_pair.server.paths.get_active_mut()?.stats();
assert_eq!(stat.acked_count, 3);
}

Ok(())
}

#[test]
fn ack_data_space_ack_eliciting_threshold() -> Result<()> {
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(4);
client_config.enable_dplpmtud(false);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(4);
server_config.enable_dplpmtud(false);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(test_pair.handshake(), Ok(()));
test_pair.move_forward()?;

let data = Bytes::from_static(b"QUIC");
let sid = test_pair.client.stream_bidi_new(0, false)?;
let acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;

for i in 0..4 {
// Client write data on the stream
test_pair.client.stream_write(sid, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server recv packets from the client
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;

TestPair::conn_packets_in(&mut test_pair.client, packets)?;
let new_acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;
if i < 3 {
assert_eq!(acked_pkts, new_acked_pkts);
} else {
assert_eq!(acked_pkts + 4, new_acked_pkts);
}
}

Ok(())
}

#[test]
fn ack_data_space_ack_timeout() -> Result<()> {
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(4);
client_config.enable_dplpmtud(false);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(4);
server_config.enable_dplpmtud(false);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(test_pair.handshake(), Ok(()));
test_pair.move_forward()?;

let data = Bytes::from_static(b"QUIC");
let sid = test_pair.client.stream_bidi_new(0, false)?;
let acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;

// Client write data on the stream
test_pair.client.stream_write(sid, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server recv packets from the client
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
assert_eq!(packets.len(), 0);

// Advance server ticks until ack timeout
assert!(test_pair.server.timeout().is_some());
let ack_timeout = test_pair.server.timers.get(Timer::Ack);
assert!(ack_timeout.is_some());
let now = ack_timeout.unwrap();
test_pair.server.on_timeout(now);

// Server send ack
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
TestPair::conn_packets_in(&mut test_pair.client, packets)?;
let new_acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;
assert_eq!(acked_pkts + 1, new_acked_pkts);

Ok(())
}

#[test]
fn conn_close_by_application() -> Result<()> {
// Establish a connection
Expand Down Expand Up @@ -6705,7 +6911,12 @@ pub(crate) mod tests {

#[test]
fn conn_max_streams_bidi() -> Result<()> {
let mut test_pair = TestPair::new_with_test_config()?;
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;

let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

assert_eq!(test_pair.handshake(), Ok(()));

// Client create bidi streams
Expand Down Expand Up @@ -6864,7 +7075,13 @@ pub(crate) mod tests {

#[test]
fn stream_reset() -> Result<()> {
let mut test_pair = TestPair::new_with_test_config()?;
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(1);

let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

assert_eq!(test_pair.handshake(), Ok(()));
let mut buf = vec![0; 16];

Expand Down Expand Up @@ -6982,12 +7199,14 @@ pub(crate) mod tests {
client_config.set_cid_len(crate::MAX_CID_LEN);
client_config.enable_multipath(true);
client_config.set_multipath_algorithm(MultipathAlgorithm::Redundant);
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_cid_len(crate::MAX_CID_LEN);

// Handshake with multipath enabled
server_config.enable_multipath(true);
server_config.set_multipath_algorithm(MultipathAlgorithm::Redundant);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

let blocks = vec![
Expand All @@ -7012,11 +7231,13 @@ pub(crate) mod tests {
client_config.set_cid_len(crate::MAX_CID_LEN);
client_config.enable_multipath(true);
client_config.set_multipath_algorithm(MultipathAlgorithm::RoundRobin);
client_config.set_ack_eliciting_threshold(1);

let mut server_config = TestPair::new_test_config(true)?;
server_config.set_cid_len(crate::MAX_CID_LEN);
server_config.enable_multipath(true);
server_config.set_multipath_algorithm(MultipathAlgorithm::RoundRobin);
server_config.set_ack_eliciting_threshold(1);

let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
let mut blocks = vec![];
Expand Down
Loading

0 comments on commit 2292b51

Please sign in to comment.