diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 639cfe9358..436ee3022c 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -14,6 +14,7 @@ use std::time::{Duration, Instant}; use super::CongestionControl; use crate::cc::MAX_DATAGRAM_SIZE; +use crate::packet::PacketNumber; use crate::qlog::{self, QlogMetric}; use crate::sender::PACING_BURST_SIZE; use crate::tracking::SentPacket; @@ -110,6 +111,14 @@ pub struct ClassicCongestionControl { acked_bytes: usize, ssthresh: usize, recovery_start: Option, + /// `first_app_limited` indicates the packet number after which the application might be + /// underutilizing the congestion window. When underutilizing the congestion window due to not + /// sending out enough data, we SHOULD NOT increase the congestion window.[1] Packets sent + /// before this point are deemed to fully utilize the congestion window and count towards + /// increasing the congestion window. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc9002#section-7.8 + first_app_limited: PacketNumber, qlog: NeqoQlog, } @@ -149,21 +158,12 @@ impl CongestionControl for ClassicCongestionControl { // Multi-packet version of OnPacketAckedCC fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { - // Check whether we are app limited before acked packets are removed - // from bytes_in_flight. - let is_app_limited = self.app_limited(); - qtrace!( - [self], - "limited={}, bytes_in_flight={}, cwnd={}, state={:?} pacing_burst_size={}", - is_app_limited, - self.bytes_in_flight, - self.congestion_window, - self.state, - MAX_DATAGRAM_SIZE * PACING_BURST_SIZE, - ); - let mut new_acked = 0; + let mut is_app_limited = true; for pkt in acked_pkts.iter().filter(|pkt| pkt.cc_outstanding()) { + if pkt.pn < self.first_app_limited { + is_app_limited = false; + } assert!(self.bytes_in_flight >= pkt.size); self.bytes_in_flight -= pkt.size; @@ -181,6 +181,16 @@ impl CongestionControl for ClassicCongestionControl { new_acked += pkt.size; } + qtrace!( + [self], + "limited={}, bytes_in_flight={}, cwnd={}, state={:?} pacing_burst_size={}", + is_app_limited, + self.bytes_in_flight, + self.congestion_window, + self.state, + MAX_DATAGRAM_SIZE * PACING_BURST_SIZE, + ); + if is_app_limited { self.cc_algorithm.on_app_limited(); return; @@ -300,6 +310,13 @@ impl CongestionControl for ClassicCongestionControl { if !pkt.cc_in_flight() { return; } + if !self.app_limited() { + // Given the current non-app-limited condition, we're fully utilizing the congestion + // window. We can safely consider that all in-flight packets up to the current to be not + // app-limiting. However, the subsequent packets might fall under app-limited + // condition. Hence, set the `first_app_limited` to the next packet number. + self.first_app_limited = pkt.pn + 1; + } self.bytes_in_flight += pkt.size; qdebug!( @@ -332,6 +349,7 @@ impl ClassicCongestionControl { ssthresh: usize::MAX, recovery_start: None, qlog: NeqoQlog::disabled(), + first_app_limited: 0, } } @@ -501,6 +519,7 @@ mod tests { use super::{ ClassicCongestionControl, WindowAdjustment, CWND_INITIAL, CWND_MIN, PERSISTENT_CONG_THRESH, }; + use crate::cc::classic_cc::State; use crate::cc::cubic::{Cubic, CUBIC_BETA_USIZE_DIVIDEND, CUBIC_BETA_USIZE_DIVISOR}; use crate::cc::new_reno::NewReno; use crate::cc::{ @@ -956,131 +975,184 @@ mod tests { #[test] fn app_limited_slow_start() { - const LESS_THAN_CWND_PKTS: usize = 4; + const BELOW_APP_LIMIT_PKTS: usize = 5; + const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1; let mut cc = ClassicCongestionControl::new(NewReno::default()); - - for i in 0..CWND_INITIAL_PKTS { - let sent = SentPacket::new( - PacketType::Short, - u64::try_from(i).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ); - cc.on_packet_sent(&sent); + let cwnd = cc.congestion_window; + let mut now = now(); + let mut next_pn = 0; + + // simulate packet bursts below app_limit + for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS { + // always stay below app_limit during sent. + let mut pkts = Vec::new(); + for _ in 0..packet_burst_size { + let p = SentPacket::new( + PacketType::Short, + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + next_pn += 1; + cc.on_packet_sent(&p); + pkts.push(p); + } + assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE); + now += RTT; + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); + + assert_eq!( + cc.bytes_in_flight(), + (packet_burst_size - i - 1) * MAX_DATAGRAM_SIZE + ); + + assert_eq!(cwnd, cc.congestion_window); // CWND doesn't grow because we're app limited + assert_eq!(cc.acked_bytes, 0); + } } - assert_eq!(cc.bytes_in_flight(), CWND_INITIAL); - for i in 0..LESS_THAN_CWND_PKTS { - let acked = SentPacket::new( + // Utilize congestion window and go out of packet + let mut pkts = Vec::new(); + for _ in 0..ABOVE_APP_LIMIT_PKTS { + let p = SentPacket::new( PacketType::Short, - u64::try_from(i).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ); - cc.on_packets_acked(&[acked], RTT, now()); - - assert_eq!( - cc.bytes_in_flight(), - (CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size ); - assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + i + 1) * MAX_DATAGRAM_SIZE); + next_pn += 1; + cc.on_packet_sent(&p); + pkts.push(p); } - - // Now we are app limited - for i in 4..CWND_INITIAL_PKTS { - let p = [SentPacket::new( - PacketType::Short, - u64::try_from(i).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - )]; - cc.on_packets_acked(&p, RTT, now()); + assert_eq!( + cc.bytes_in_flight(), + ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE + ); + now += RTT; + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); assert_eq!( cc.bytes_in_flight(), - (CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE + (ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE ); - assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + 4) * MAX_DATAGRAM_SIZE); + // increase acked_bytes with each packet + println!("{} {}", cc.congestion_window, cwnd + i * MAX_DATAGRAM_SIZE); + assert_eq!(cc.congestion_window, cwnd + (i + 1) * MAX_DATAGRAM_SIZE); // CWND doesn't grow because we're app limited + assert_eq!(cc.acked_bytes, 0); } } #[test] fn app_limited_congestion_avoidance() { const CWND_PKTS_CA: usize = CWND_INITIAL_PKTS / 2; + const BELOW_APP_LIMIT_PKTS: usize = CWND_PKTS_CA - 2; + const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1; let mut cc = ClassicCongestionControl::new(NewReno::default()); + let mut now = now(); // Change state to congestion avoidance by introducing loss. let p_lost = SentPacket::new( PacketType::Short, 1, // pn - now(), // time sent + now, // time sent true, // ack eliciting Vec::new(), // tokens MAX_DATAGRAM_SIZE, // size ); cc.on_packet_sent(&p_lost); cwnd_is_default(&cc); - cc.on_packets_lost(Some(now()), None, PTO, &[p_lost]); + now += PTO; + cc.on_packets_lost(Some(now), None, PTO, &[p_lost]); cwnd_is_halved(&cc); let p_not_lost = SentPacket::new( PacketType::Short, - 1, // pn - now(), // time sent + 2, // pn + now, // time sent true, // ack eliciting Vec::new(), // tokens MAX_DATAGRAM_SIZE, // size ); cc.on_packet_sent(&p_not_lost); - cc.on_packets_acked(&[p_not_lost], RTT, now()); + now += RTT; + cc.on_packets_acked(&[p_not_lost], RTT, now); cwnd_is_halved(&cc); // cc is app limited therefore cwnd in not increased. assert_eq!(cc.acked_bytes, 0); // Now we are in the congestion avoidance state. + assert_eq!(cc.state, State::CongestionAvoidance); + // simulate packet bursts below app_limit + let mut next_pn = 3; + for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS { + // always stay below app_limit during sent. + let mut pkts = Vec::new(); + for _ in 0..packet_burst_size { + let p = SentPacket::new( + PacketType::Short, + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + next_pn += 1; + cc.on_packet_sent(&p); + pkts.push(p); + } + assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE); + now += RTT; + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); + + assert_eq!( + cc.bytes_in_flight(), + (packet_burst_size - i - 1) * MAX_DATAGRAM_SIZE + ); + cwnd_is_halved(&cc); // CWND doesn't grow because we're app limited + assert_eq!(cc.acked_bytes, 0); + } + } + + // Utilize congestion window and go out of packet let mut pkts = Vec::new(); - for i in 0..CWND_PKTS_CA { + for _ in 0..ABOVE_APP_LIMIT_PKTS { let p = SentPacket::new( PacketType::Short, - u64::try_from(i + 3).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size ); + next_pn += 1; cc.on_packet_sent(&p); pkts.push(p); } - assert_eq!(cc.bytes_in_flight(), CWND_INITIAL / 2); - - for i in 0..CWND_PKTS_CA - 2 { - cc.on_packets_acked(&pkts[i..=i], RTT, now()); - - assert_eq!( - cc.bytes_in_flight(), - (CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE - ); - assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE); - assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * (i + 1)); - } - - // Now we are app limited - for i in CWND_PKTS_CA - 2..CWND_PKTS_CA { - cc.on_packets_acked(&pkts[i..=i], RTT, now()); + assert_eq!( + cc.bytes_in_flight(), + ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE + ); + now += RTT; + let mut last_acked_bytes = 0; + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); assert_eq!( cc.bytes_in_flight(), - (CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE + (ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE ); - assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE); - assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * 3); + cwnd_is_halved(&cc); + // increase acked_bytes with each packet + assert_ne!(cc.acked_bytes, last_acked_bytes); + last_acked_bytes = cc.acked_bytes; } } }