Skip to content

Commit

Permalink
Improve app_limit detection by keeping track of app_limited state in …
Browse files Browse the repository at this point in the history
…on_packet_sent

Fixes mozilla#1475
  • Loading branch information
mb committed Nov 2, 2023
1 parent 887d256 commit 4ebc9f0
Showing 1 changed file with 145 additions and 84 deletions.
229 changes: 145 additions & 84 deletions neqo-transport/src/cc/classic_cc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ pub struct ClassicCongestionControl<T> {
acked_bytes: usize,
ssthresh: usize,
recovery_start: Option<PacketNumber>,
/// `first_app_limited` indicates the packet number after which the application might be
/// underutilizing the congestion window. When underutilizing the congestion window due to not
/// sending out enough data, we SHOULD NOT increase the congestion window.[1] Packets sent
/// before this point are deemed to fully utilize the congestion window and count towards
/// increasing the congestion window.
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc9002#section-7.8
first_app_limited: PacketNumber,

qlog: NeqoQlog,
}
Expand Down Expand Up @@ -150,19 +158,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {

// Multi-packet version of OnPacketAckedCC
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
// Check whether we are app limited before acked packets are removed
// from bytes_in_flight.
let is_app_limited = self.app_limited();
qtrace!(
[self],
"limited={}, bytes_in_flight={}, cwnd={}, state={:?} pacing_burst_size={}",
is_app_limited,
self.bytes_in_flight,
self.congestion_window,
self.state,
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
);

let mut is_app_limited = true;
let mut new_acked = 0;
for pkt in acked_pkts {
qinfo!(
Expand All @@ -176,6 +172,9 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if !pkt.cc_outstanding() {
continue;
}
if pkt.pn < self.first_app_limited {
is_app_limited = false;
}
assert!(self.bytes_in_flight >= pkt.size);
self.bytes_in_flight -= pkt.size;

Expand Down Expand Up @@ -323,6 +322,13 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if !pkt.cc_in_flight() {
return;
}
if !self.app_limited() {
// Given the current non-app-limited condition, we're fully utilizing the congestion
// window. We can safely consider all in-flight packets up to the current one to be
// non-app-limiting. However, subsequent packets might be sent under an app-limited
// condition. Hence, set `first_app_limited` to the next packet number.
self.first_app_limited = pkt.pn + 1;
}

self.bytes_in_flight += pkt.size;
qinfo!(
Expand Down Expand Up @@ -354,6 +360,7 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
ssthresh: usize::MAX,
recovery_start: None,
qlog: NeqoQlog::disabled(),
first_app_limited: 0,
}
}

Expand Down Expand Up @@ -523,6 +530,7 @@ mod tests {
use super::{
ClassicCongestionControl, WindowAdjustment, CWND_INITIAL, CWND_MIN, PERSISTENT_CONG_THRESH,
};
use crate::cc::classic_cc::State;
use crate::cc::cubic::{Cubic, CUBIC_BETA_USIZE_DIVIDEND, CUBIC_BETA_USIZE_DIVISOR};
use crate::cc::new_reno::NewReno;
use crate::cc::{
Expand Down Expand Up @@ -978,131 +986,184 @@ mod tests {

#[test]
fn app_limited_slow_start() {
const LESS_THAN_CWND_PKTS: usize = 4;
const BELOW_APP_LIMIT_PKTS: usize = 5;
const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1;
let mut cc = ClassicCongestionControl::new(NewReno::default());

for i in 0..CWND_INITIAL_PKTS {
let sent = SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packet_sent(&sent);
let cwnd = cc.congestion_window;
let mut now = now();
let mut next_pn = 0;

// simulate packet bursts below app_limit
for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS {
// Always stay below the app limit while sending.
let mut pkts = Vec::new();
for _ in 0..packet_burst_size {
let p = SentPacket::new(
PacketType::Short,
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE);
now += RTT;
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(packet_burst_size - i - 1) * MAX_DATAGRAM_SIZE
);

assert_eq!(cwnd, cc.congestion_window); // CWND doesn't grow because we're app limited
assert_eq!(cc.acked_bytes, 0);
}
}
assert_eq!(cc.bytes_in_flight(), CWND_INITIAL);

for i in 0..LESS_THAN_CWND_PKTS {
let acked = SentPacket::new(
// Utilize congestion window and increase it for each ACKs arriving
let mut pkts = Vec::new();
for _ in 0..ABOVE_APP_LIMIT_PKTS {
let p = SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packets_acked(&[acked], RTT, now());

assert_eq!(
cc.bytes_in_flight(),
(CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + i + 1) * MAX_DATAGRAM_SIZE);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}

// Now we are app limited
for i in 4..CWND_INITIAL_PKTS {
let p = [SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
)];
cc.on_packets_acked(&p, RTT, now());
assert_eq!(
cc.bytes_in_flight(),
ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE
);
now += RTT;
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE
(ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + 4) * MAX_DATAGRAM_SIZE);
// increase acked_bytes with each packet
println!("{} {}", cc.congestion_window, cwnd + i * MAX_DATAGRAM_SIZE);
assert_eq!(cc.congestion_window, cwnd + (i + 1) * MAX_DATAGRAM_SIZE); // CWND grows with each ACK because the window is fully utilized
assert_eq!(cc.acked_bytes, 0);
}
}

#[test]
fn app_limited_congestion_avoidance() {
const CWND_PKTS_CA: usize = CWND_INITIAL_PKTS / 2;
const BELOW_APP_LIMIT_PKTS: usize = CWND_PKTS_CA - 2;
const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1;

let mut cc = ClassicCongestionControl::new(NewReno::default());
let mut now = now();

// Change state to congestion avoidance by introducing loss.

let p_lost = SentPacket::new(
PacketType::Short,
1, // pn
now(), // time sent
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packet_sent(&p_lost);
cwnd_is_default(&cc);
cc.on_packets_lost(Some(now()), None, PTO, &[p_lost]);
now += PTO;
cc.on_packets_lost(Some(now), None, PTO, &[p_lost]);
cwnd_is_halved(&cc);
let p_not_lost = SentPacket::new(
PacketType::Short,
1, // pn
now(), // time sent
2, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
cc.on_packet_sent(&p_not_lost);
cc.on_packets_acked(&[p_not_lost], RTT, now());
now += RTT;
cc.on_packets_acked(&[p_not_lost], RTT, now);
cwnd_is_halved(&cc);
// cc is app limited, therefore cwnd is not increased.
assert_eq!(cc.acked_bytes, 0);

// Now we are in the congestion avoidance state.
assert_eq!(cc.state, State::CongestionAvoidance);
// simulate packet bursts below app_limit
let mut next_pn = 3;
for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS {
// Always stay below the app limit while sending.
let mut pkts = Vec::new();
for _ in 0..packet_burst_size {
let p = SentPacket::new(
PacketType::Short,
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE);
now += RTT;
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(packet_burst_size - i - 1) * MAX_DATAGRAM_SIZE
);
cwnd_is_halved(&cc); // CWND doesn't grow because we're app limited
assert_eq!(cc.acked_bytes, 0);
}
}

// Utilize congestion window and increase it for each ACKs arriving
let mut pkts = Vec::new();
for i in 0..CWND_PKTS_CA {
for _ in 0..ABOVE_APP_LIMIT_PKTS {
let p = SentPacket::new(
PacketType::Short,
u64::try_from(i + 3).unwrap(), // pn
now(), // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
next_pn, // pn
now, // time sent
true, // ack eliciting
Vec::new(), // tokens
MAX_DATAGRAM_SIZE, // size
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(cc.bytes_in_flight(), CWND_INITIAL / 2);

for i in 0..CWND_PKTS_CA - 2 {
cc.on_packets_acked(&pkts[i..=i], RTT, now());

assert_eq!(
cc.bytes_in_flight(),
(CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE);
assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * (i + 1));
}

// Now we are app limited
for i in CWND_PKTS_CA - 2..CWND_PKTS_CA {
cc.on_packets_acked(&pkts[i..=i], RTT, now());
assert_eq!(
cc.bytes_in_flight(),
ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE
);
now += RTT;
let mut last_acked_bytes = 0;
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], RTT, now);

assert_eq!(
cc.bytes_in_flight(),
(CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE
(ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE
);
assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE);
assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * 3);
cwnd_is_halved(&cc);
// increase acked_bytes with each packet
assert_ne!(cc.acked_bytes, last_acked_bytes);
last_acked_bytes = cc.acked_bytes;
}
}
}

0 comments on commit 4ebc9f0

Please sign in to comment.