Improve app_limit detection by keeping track of app_limited state in on_packet_sent

Fixes mozilla#1475
mb committed Nov 20, 2023
1 parent b0eb12a commit d1646e2
Showing 1 changed file with 145 additions and 84 deletions.
229 changes: 145 additions & 84 deletions neqo-transport/src/cc/classic_cc.rs
@@ -114,6 +114,14 @@ pub struct ClassicCongestionControl<T> {
acked_bytes: usize,
ssthresh: usize,
recovery_start: Option<PacketNumber>,
/// `first_app_limited` indicates the packet number after which the application might be
/// underutilizing the congestion window. When underutilizing the congestion window due to not
/// sending out enough data, we SHOULD NOT increase the congestion window.[1] Packets sent
/// before this point are deemed to fully utilize the congestion window and count towards
/// increasing the congestion window.
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc9002#section-7.8
first_app_limited: PacketNumber,

qlog: NeqoQlog,
}
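
A minimal sketch, outside this diff (stand-in types, not the neqo API), of how the ACK path can consume such a watermark: only packets numbered below `first_app_limited` are treated as having fully utilized the congestion window and therefore count toward growing it.

    // Illustrative sketch only; simplified stand-in for the real accounting.
    type PacketNumber = u64;

    /// True if an acked packet was sent while the window was fully utilized,
    /// i.e. before the sender became app-limited.
    fn counts_for_cwnd_growth(pn: PacketNumber, first_app_limited: PacketNumber) -> bool {
        pn < first_app_limited
    }

    fn main() {
        let first_app_limited = 3; // packets 0..=2 filled the window
        assert!(counts_for_cwnd_growth(2, first_app_limited));
        assert!(!counts_for_cwnd_growth(3, first_app_limited)); // sent while app-limited
    }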
@@ -153,19 +161,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {

// Multi-packet version of OnPacketAckedCC
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
// Check whether we are app limited before acked packets are removed
// from bytes_in_flight.
let is_app_limited = self.app_limited();
qtrace!(
[self],
"limited={}, bytes_in_flight={}, cwnd={}, state={:?} pacing_burst_size={}",
is_app_limited,
self.bytes_in_flight,
self.congestion_window,
self.state,
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
);

let mut is_app_limited = true;
let mut new_acked = 0;
for pkt in acked_pkts {
qinfo!(
@@ -179,6 +175,9 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if !pkt.cc_outstanding() {
continue;
}
if pkt.pn < self.first_app_limited {
is_app_limited = false;
}
assert!(self.bytes_in_flight >= pkt.size);
self.bytes_in_flight -= pkt.size;

@@ -326,6 +325,13 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if !pkt.cc_in_flight() {
return;
}
if !self.app_limited() {
// Given the current non-app-limited condition, we're fully utilizing the congestion
// window. Assume that all in-flight packets up to this one are NOT app-limited.
// However, subsequent packets might be app-limited. Set `first_app_limited` to the
// next packet number.
self.first_app_limited = pkt.pn + 1;
}

self.bytes_in_flight += pkt.size;
qinfo!(
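
A companion send-side sketch, again a stand-in rather than neqo's types: `app_limited_now` represents the result of `self.app_limited()` evaluated just before the packet's bytes are added to `bytes_in_flight`, as in the hunk above. The watermark only advances while the sender is not app-limited, so packets sent with spare window never count toward growth later.

    // Illustrative sketch only: how the watermark moves on the send path.
    type PacketNumber = u64;

    struct Watermark {
        first_app_limited: PacketNumber,
    }

    impl Watermark {
        fn on_packet_sent(&mut self, pn: PacketNumber, app_limited_now: bool) {
            if !app_limited_now {
                // Everything up to and including `pn` filled the window;
                // only later packets might be app-limited.
                self.first_app_limited = pn + 1;
            }
        }
    }

    fn main() {
        let mut w = Watermark { first_app_limited: 0 };
        w.on_packet_sent(0, false); // window fully used: watermark moves to 1
        w.on_packet_sent(1, true);  // app-limited burst: watermark stays at 1
        assert_eq!(w.first_app_limited, 1);
    }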
@@ -357,6 +363,7 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
ssthresh: usize::MAX,
recovery_start: None,
qlog: NeqoQlog::disabled(),
first_app_limited: 0,
}
}

@@ -532,13 +539,15 @@ mod tests {
};
use crate::{
cc::{
classic_cc::State,
cubic::{Cubic, CUBIC_BETA_USIZE_DIVIDEND, CUBIC_BETA_USIZE_DIVISOR},
new_reno::NewReno,
CongestionControl, CongestionControlAlgorithm, CWND_INITIAL_PKTS, MAX_DATAGRAM_SIZE,
},
packet::{PacketNumber, PacketType},
tracking::SentPacket,
};
use neqo_common::qinfo;
use std::{
convert::TryFrom,
time::{Duration, Instant},
@@ -989,131 +998,183 @@ mod tests {

#[test]
fn app_limited_slow_start() {
const LESS_THAN_CWND_PKTS: usize = 4;
const BELOW_APP_LIMIT_PKTS: usize = 5;
const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1;
let mut cc = ClassicCongestionControl::new(NewReno::default());

for i in 0..CWND_INITIAL_PKTS {
let sent = SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packet_sent(&sent);
let cwnd = cc.congestion_window;
let mut now = now();
let mut next_pn = 0;

// simulate packet bursts below app_limit
for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS {
// always stay below app_limit while sending.
let mut pkts = Vec::new();
for _ in 0..packet_burst_size {
let p = SentPacket::new(
PacketType::Short,
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE);
now += RTT;
cc.on_packets_acked(&pkts, RTT, now);
assert_eq!(cc.bytes_in_flight(), 0);
assert_eq!(cc.acked_bytes, 0);
assert_eq!(cwnd, cc.congestion_window); // CWND doesn't grow because we're app limited
}
assert_eq!(cc.bytes_in_flight(), CWND_INITIAL);

for i in 0..LESS_THAN_CWND_PKTS {
let acked = SentPacket::new(
// Fully utilize the congestion window by sending enough packets to
// have `bytes_in_flight` above the `app_limited` threshold.
let mut pkts = Vec::new();
for _ in 0..ABOVE_APP_LIMIT_PKTS {
let p = SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packets_acked(&[acked], RTT, now());

assert_eq!(
cc.bytes_in_flight(),
(CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + i + 1) * MAX_DATAGRAM_SIZE);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}

// Now we are app limited
for i in 4..CWND_INITIAL_PKTS {
let p = [SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
)];
cc.on_packets_acked(&p, RTT, now());
assert_eq!(
cc.bytes_in_flight(),
ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE
);
now += RTT;
// Check if congestion window gets increased for all packets currently in flight
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE
(ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + 4) * MAX_DATAGRAM_SIZE);
// in slow start, cwnd increases by MAX_DATAGRAM_SIZE with each acked packet
qinfo!("{} {}", cc.congestion_window, cwnd + i * MAX_DATAGRAM_SIZE);
assert_eq!(cc.congestion_window, cwnd + (i + 1) * MAX_DATAGRAM_SIZE);
assert_eq!(cc.acked_bytes, 0);
}
}

#[test]
fn app_limited_congestion_avoidance() {
const CWND_PKTS_CA: usize = CWND_INITIAL_PKTS / 2;
const BELOW_APP_LIMIT_PKTS: usize = CWND_PKTS_CA - 2;
const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1;

let mut cc = ClassicCongestionControl::new(NewReno::default());
let mut now = now();

// Change state to congestion avoidance by introducing loss.

let p_lost = SentPacket::new(
PacketType::Short,
1, // pn
now(), // time sent
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packet_sent(&p_lost);
cwnd_is_default(&cc);
cc.on_packets_lost(Some(now()), None, PTO, &[p_lost]);
now += PTO;
cc.on_packets_lost(Some(now), None, PTO, &[p_lost]);
cwnd_is_halved(&cc);
let p_not_lost = SentPacket::new(
PacketType::Short,
1, // pn
now(), // time sent
2, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packet_sent(&p_not_lost);
cc.on_packets_acked(&[p_not_lost], RTT, now());
now += RTT;
cc.on_packets_acked(&[p_not_lost], RTT, now);
cwnd_is_halved(&cc);
// cc is app limited, therefore cwnd is not increased.
assert_eq!(cc.acked_bytes, 0);

// Now we are in the congestion avoidance state.
assert_eq!(cc.state, State::CongestionAvoidance);
// simulate packet bursts below app_limit
let mut next_pn = 3;
for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS {
// always stay below app_limit while sending.
let mut pkts = Vec::new();
for _ in 0..packet_burst_size {
let p = SentPacket::new(
PacketType::Short,
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE);
now += RTT;
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(packet_burst_size - i - 1) * MAX_DATAGRAM_SIZE
);
cwnd_is_halved(&cc); // CWND doesn't grow because we're app limited
assert_eq!(cc.acked_bytes, 0);
}
}

// Fully utilize the congestion window by sending enough packets to
// have `bytes_in_flight` above the `app_limited` threshold.
let mut pkts = Vec::new();
for i in 0..CWND_PKTS_CA {
for _ in 0..ABOVE_APP_LIMIT_PKTS {
let p = SentPacket::new(
PacketType::Short,
u64::try_from(i + 3).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(cc.bytes_in_flight(), CWND_INITIAL / 2);

for i in 0..CWND_PKTS_CA - 2 {
cc.on_packets_acked(&pkts[i..=i], RTT, now());

assert_eq!(
cc.bytes_in_flight(),
(CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE);
assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * (i + 1));
}

// Now we are app limited
for i in CWND_PKTS_CA - 2..CWND_PKTS_CA {
cc.on_packets_acked(&pkts[i..=i], RTT, now());
assert_eq!(
cc.bytes_in_flight(),
ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE
);
now += RTT;
let mut last_acked_bytes = 0;
// Check if congestion window gets increased for all packets currently in flight
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE
(ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE);
assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * 3);
// The cwnd doesn't increase, but the acked_bytes do, which will eventually lead to an
// increase, once the number of bytes reaches the necessary level
cwnd_is_halved(&cc);
// increase acked_bytes with each packet
assert_ne!(cc.acked_bytes, last_acked_bytes);
last_acked_bytes = cc.acked_bytes;
}
}
}
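
The two new tests can be run in isolation with cargo's name filter, e.g. `cargo test -p neqo-transport app_limited` from the repository root, which should match both `app_limited_slow_start` and `app_limited_congestion_avoidance` by substring.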
