From 92233a11cb0a95bab1dd492fdfb308ce5bccb97a Mon Sep 17 00:00:00 2001 From: Dragana Damjanovic Date: Mon, 18 Jan 2021 21:44:54 +0100 Subject: [PATCH] Cubic (#1074) --- neqo-transport/src/cc/classic_cc.rs | 563 ++++++++++++++---------- neqo-transport/src/cc/cubic.rs | 203 +++++++++ neqo-transport/src/cc/mod.rs | 11 +- neqo-transport/src/cc/new_reno.rs | 31 +- neqo-transport/src/cc/tests/cubic.rs | 303 +++++++++++++ neqo-transport/src/cc/tests/mod.rs | 7 + neqo-transport/src/cc/tests/new_reno.rs | 131 ++++++ neqo-transport/src/recovery.rs | 3 +- neqo-transport/src/sender.rs | 9 +- 9 files changed, 1018 insertions(+), 243 deletions(-) create mode 100644 neqo-transport/src/cc/cubic.rs create mode 100644 neqo-transport/src/cc/tests/cubic.rs create mode 100644 neqo-transport/src/cc/tests/mod.rs create mode 100644 neqo-transport/src/cc/tests/new_reno.rs diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index b41969c680..57c3d5a99c 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -20,7 +20,7 @@ use crate::tracking::SentPacket; use neqo_common::{const_max, const_min, qdebug, qinfo, qlog::NeqoQlog, qtrace}; pub const CWND_INITIAL_PKTS: usize = 10; -const CWND_INITIAL: usize = const_min( +pub const CWND_INITIAL: usize = const_min( CWND_INITIAL_PKTS * MAX_DATAGRAM_SIZE, const_max(2 * MAX_DATAGRAM_SIZE, 14720), ); @@ -76,8 +76,27 @@ impl State { } pub trait WindowAdjustment: Display + Debug { - fn on_packets_acked(&mut self, curr_cwnd: usize, acked_bytes: usize) -> (usize, usize); - fn on_congestion_event(&mut self, curr_cwnd: usize, acked_bytes: usize) -> (usize, usize); + /// This is called when an ack is received. + /// The function calculates the amount of acked bytes congestion controller needs + /// to collect before increasing its cwnd by `MAX_DATAGRAM_SIZE`. + fn bytes_for_cwnd_increase( + &mut self, + curr_cwnd: usize, + new_acked_bytes: usize, + min_rtt: Duration, + now: Instant, + ) -> usize; + /// This function is called when a congestion event has beed detected and it + /// returns new (decreased) values of `curr_cwnd` and `acked_bytes`. + /// This value can be very small; the calling code is responsible for ensuring that the + /// congestion window doesn't drop below the minimum of `CWND_MIN`. + fn reduce_cwnd(&mut self, curr_cwnd: usize, acked_bytes: usize) -> (usize, usize); + /// Cubic needs this signal to reset its epoch. + fn on_app_limited(&mut self); + #[cfg(test)] + fn last_max_cwnd(&self) -> f64; + #[cfg(test)] + fn set_last_max_cwnd(&mut self, last_max_cwnd: f64); } #[derive(Debug)] @@ -127,7 +146,7 @@ impl CongestionControl for ClassicCongestionControl { } // Multi-packet version of OnPacketAckedCC - fn on_packets_acked(&mut self, acked_pkts: &[SentPacket]) { + fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { // Check whether we are app limited before acked packets are removed // from bytes_in_flight. let is_app_limited = self.app_limited(); @@ -141,7 +160,7 @@ impl CongestionControl for ClassicCongestionControl { MAX_DATAGRAM_SIZE * PACING_BURST_SIZE, ); - let mut acked_bytes = 0; + let mut new_acked = 0; for pkt in acked_pkts.iter().filter(|pkt| pkt.cc_outstanding()) { assert!(self.bytes_in_flight >= pkt.size); self.bytes_in_flight -= pkt.size; @@ -157,17 +176,19 @@ impl CongestionControl for ClassicCongestionControl { qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]); } - acked_bytes += pkt.size; + new_acked += pkt.size; } - if !is_app_limited { - self.acked_bytes += acked_bytes; + if is_app_limited { + self.cc_algorithm.on_app_limited(); + return; } qtrace!([self], "ACK received, acked_bytes = {}", self.acked_bytes); // Slow start, up to the slow start threshold. if self.congestion_window < self.ssthresh { + self.acked_bytes += new_acked; let increase = min(self.ssthresh - self.congestion_window, self.acked_bytes); self.congestion_window += increase; self.acked_bytes -= increase; @@ -180,11 +201,29 @@ impl CongestionControl for ClassicCongestionControl { } // Congestion avoidance, above the slow start threshold. if self.congestion_window >= self.ssthresh { - let (cwnd, acked_bytes) = self - .cc_algorithm - .on_packets_acked(self.congestion_window, self.acked_bytes); - self.congestion_window = cwnd; - self.acked_bytes = acked_bytes; + // The following function return the amount acked bytes a controller needs + // to collect to be allowed to increase its cwnd by MAX_DATAGRAM_SIZE. + let bytes_for_increase = self.cc_algorithm.bytes_for_cwnd_increase( + self.congestion_window, + new_acked, + min_rtt, + now, + ); + // If enough credit has been accumulated already, apply them gradually. + // If we have sudden increase in allowed rate we actually increase cwnd gently. + if self.acked_bytes >= bytes_for_increase { + self.acked_bytes = 0; + self.congestion_window += MAX_DATAGRAM_SIZE; + } + self.acked_bytes += new_acked; + if self.acked_bytes >= bytes_for_increase { + self.acked_bytes -= bytes_for_increase; + self.congestion_window += MAX_DATAGRAM_SIZE; // or is this the current MTU? + } + // The number of bytes we require can go down over time with Cubic. + // That might result in an excessive rate of increase, so limit the number of unused + // acknowledged bytes after increasing the congestion window twice. + self.acked_bytes = min(bytes_for_increase, self.acked_bytes); } qlog::metrics_updated( &mut self.qlog, @@ -291,6 +330,26 @@ impl ClassicCongestionControl { self.ssthresh } + #[cfg(test)] + pub fn set_ssthresh(&mut self, v: usize) { + self.ssthresh = v; + } + + #[cfg(test)] + pub fn last_max_cwnd(&self) -> f64 { + self.cc_algorithm.last_max_cwnd() + } + + #[cfg(test)] + pub fn set_last_max_cwnd(&mut self, last_max_cwnd: f64) { + self.cc_algorithm.set_last_max_cwnd(last_max_cwnd); + } + + #[cfg(test)] + pub fn acked_bytes(&self) -> usize { + self.acked_bytes + } + fn set_state(&mut self, state: State) { if self.state != state { qdebug!([self], "state -> {:?}", state); @@ -377,7 +436,7 @@ impl ClassicCongestionControl { if self.after_recovery_start(last_packet) { let (cwnd, acked_bytes) = self .cc_algorithm - .on_congestion_event(self.congestion_window, self.acked_bytes); + .reduce_cwnd(self.congestion_window, self.acked_bytes); self.congestion_window = max(cwnd, CWND_MIN); self.acked_bytes = acked_bytes; self.ssthresh = self.congestion_window; @@ -418,11 +477,15 @@ impl ClassicCongestionControl { #[cfg(test)] mod tests { - use super::{ClassicCongestionControl, CWND_INITIAL, CWND_MIN, PERSISTENT_CONG_THRESH}; + use super::{ + ClassicCongestionControl, WindowAdjustment, CWND_INITIAL, CWND_MIN, PERSISTENT_CONG_THRESH, + }; + use crate::cc::cubic::{Cubic, CUBIC_BETA_USIZE_DIVISOR, CUBIC_BETA_USIZE_QUOTIENT}; use crate::cc::new_reno::NewReno; use crate::cc::{CongestionControl, CWND_INITIAL_PKTS, MAX_DATAGRAM_SIZE}; use crate::packet::{PacketNumber, PacketType}; use crate::tracking::SentPacket; + use crate::CongestionControlAlgorithm; use std::convert::TryFrom; use std::time::{Duration, Instant}; use test_fixture::now; @@ -448,109 +511,6 @@ mod tests { assert_eq!(cc.ssthresh(), CWND_INITIAL / 2); } - #[test] - fn issue_876() { - let mut cc = ClassicCongestionControl::new(NewReno::default()); - let time_now = now(); - let time_before = time_now - Duration::from_millis(100); - let time_after = time_now + Duration::from_millis(150); - - let sent_packets = &[ - SentPacket::new( - PacketType::Short, - 1, // pn - time_before, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE - 1, // size - ), - SentPacket::new( - PacketType::Short, - 2, // pn - time_before, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE - 2, // size - ), - SentPacket::new( - PacketType::Short, - 3, // pn - time_before, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ), - SentPacket::new( - PacketType::Short, - 4, // pn - time_before, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ), - SentPacket::new( - PacketType::Short, - 5, // pn - time_before, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ), - SentPacket::new( - PacketType::Short, - 6, // pn - time_before, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ), - SentPacket::new( - PacketType::Short, - 7, // pn - time_after, // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE - 3, // size - ), - ]; - - // Send some more packets so that the cc is not app-limited. - for p in &sent_packets[..6] { - cc.on_packet_sent(p); - } - assert_eq!(cc.acked_bytes, 0); - cwnd_is_default(&cc); - assert_eq!(cc.bytes_in_flight(), 6 * MAX_DATAGRAM_SIZE - 3); - - cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[0..1]); - - // We are now in recovery - assert!(cc.recovery_packet()); - assert_eq!(cc.acked_bytes, 0); - cwnd_is_halved(&cc); - assert_eq!(cc.bytes_in_flight(), 5 * MAX_DATAGRAM_SIZE - 2); - - // Send a packet after recovery starts - cc.on_packet_sent(&sent_packets[6]); - assert!(!cc.recovery_packet()); - cwnd_is_halved(&cc); - assert_eq!(cc.acked_bytes, 0); - assert_eq!(cc.bytes_in_flight(), 6 * MAX_DATAGRAM_SIZE - 5); - - // and ack it. cwnd increases slightly - cc.on_packets_acked(&sent_packets[6..]); - assert_eq!(cc.acked_bytes, sent_packets[6].size); - cwnd_is_halved(&cc); - assert_eq!(cc.bytes_in_flight(), 5 * MAX_DATAGRAM_SIZE - 2); - - // Packet from before is lost. Should not hurt cwnd. - cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[1..2]); - assert!(!cc.recovery_packet()); - assert_eq!(cc.acked_bytes, sent_packets[6].size); - cwnd_is_halved(&cc); - assert_eq!(cc.bytes_in_flight(), 4 * MAX_DATAGRAM_SIZE); - } - fn lost(pn: PacketNumber, ack_eliciting: bool, t: Duration) -> SentPacket { SentPacket::new( PacketType::Short, @@ -562,153 +522,195 @@ mod tests { ) } - fn persistent_congestion(lost_packets: &[SentPacket]) -> bool { - let mut cc = ClassicCongestionControl::new(NewReno::default()); + fn congestion_control(cc: CongestionControlAlgorithm) -> Box { + match cc { + CongestionControlAlgorithm::NewReno => { + Box::new(ClassicCongestionControl::new(NewReno::default())) + } + CongestionControlAlgorithm::Cubic => { + Box::new(ClassicCongestionControl::new(Cubic::default())) + } + } + } + + fn persistent_congestion_by_algorithm( + cc_alg: CongestionControlAlgorithm, + reduced_cwnd: usize, + lost_packets: &[SentPacket], + persistent_expected: bool, + ) { + let mut cc = congestion_control(cc_alg); for p in lost_packets { cc.on_packet_sent(p); } cc.on_packets_lost(Some(now()), None, PTO, lost_packets); - if cc.cwnd() == CWND_INITIAL / 2 { + + let persistent = if cc.cwnd() == reduced_cwnd { false } else if cc.cwnd() == CWND_MIN { true } else { panic!("unexpected cwnd"); - } + }; + assert_eq!(persistent, persistent_expected); + } + + fn persistent_congestion(lost_packets: &[SentPacket], persistent_expected: bool) { + persistent_congestion_by_algorithm( + CongestionControlAlgorithm::NewReno, + CWND_INITIAL / 2, + lost_packets, + persistent_expected, + ); + persistent_congestion_by_algorithm( + CongestionControlAlgorithm::Cubic, + CWND_INITIAL * CUBIC_BETA_USIZE_QUOTIENT / CUBIC_BETA_USIZE_DIVISOR, + lost_packets, + persistent_expected, + ); } /// A span of exactly the PC threshold only reduces the window on loss. #[test] fn persistent_congestion_none() { - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, SUB_PC), - ])); + persistent_congestion(&[lost(1, true, ZERO), lost(2, true, SUB_PC)], false); } /// A span of just more than the PC threshold causes persistent congestion. #[test] fn persistent_congestion_simple() { - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PC), - ])); + persistent_congestion(&[lost(1, true, ZERO), lost(2, true, PC)], true); } /// Both packets need to be ack-eliciting. #[test] fn persistent_congestion_non_ack_eliciting() { - assert!(!persistent_congestion(&[ - lost(1, false, ZERO), - lost(2, true, PC), - ])); - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, false, PC), - ])); + persistent_congestion(&[lost(1, false, ZERO), lost(2, true, PC)], false); + persistent_congestion(&[lost(1, true, ZERO), lost(2, false, PC)], false); } /// Packets in the middle, of any type, are OK. #[test] fn persistent_congestion_middle() { - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, false, RTT), - lost(3, true, PC), - ])); - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, RTT), - lost(3, true, PC), - ])); + persistent_congestion( + &[lost(1, true, ZERO), lost(2, false, RTT), lost(3, true, PC)], + true, + ); + persistent_congestion( + &[lost(1, true, ZERO), lost(2, true, RTT), lost(3, true, PC)], + true, + ); } /// Leading non-ack-eliciting packets are skipped. #[test] fn persistent_congestion_leading_non_ack_eliciting() { - assert!(!persistent_congestion(&[ - lost(1, false, ZERO), - lost(2, true, RTT), - lost(3, true, PC), - ])); - assert!(persistent_congestion(&[ - lost(1, false, ZERO), - lost(2, true, RTT), - lost(3, true, RTT + PC), - ])); + persistent_congestion( + &[lost(1, false, ZERO), lost(2, true, RTT), lost(3, true, PC)], + false, + ); + persistent_congestion( + &[ + lost(1, false, ZERO), + lost(2, true, RTT), + lost(3, true, RTT + PC), + ], + true, + ); } /// Trailing non-ack-eliciting packets aren't relevant. #[test] fn persistent_congestion_trailing_non_ack_eliciting() { - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PC), - lost(3, false, PC + EPSILON), - ])); - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, SUB_PC), - lost(3, false, PC), - ])); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, PC), + lost(3, false, PC + EPSILON), + ], + true, + ); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, SUB_PC), + lost(3, false, PC), + ], + false, + ); } /// Gaps in the middle, of any type, restart the count. #[test] fn persistent_congestion_gap_reset() { - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(3, true, PC), - ])); - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, RTT), - lost(4, true, GAP), - lost(5, true, GAP + PTO * PERSISTENT_CONG_THRESH), - ])); + persistent_congestion(&[lost(1, true, ZERO), lost(3, true, PC)], false); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, RTT), + lost(4, true, GAP), + lost(5, true, GAP + PTO * PERSISTENT_CONG_THRESH), + ], + false, + ); } /// A span either side of a gap will cause persistent congestion. #[test] fn persistent_congestion_gap_or() { - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PC), - lost(4, true, GAP), - lost(5, true, GAP + PTO), - ])); - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PTO), - lost(4, true, GAP), - lost(5, true, GAP + PC), - ])); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, PC), + lost(4, true, GAP), + lost(5, true, GAP + PTO), + ], + true, + ); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, PTO), + lost(4, true, GAP), + lost(5, true, GAP + PC), + ], + true, + ); } /// A gap only restarts after an ack-eliciting packet. #[test] fn persistent_congestion_gap_non_ack_eliciting() { - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PTO), - lost(4, false, GAP), - lost(5, true, GAP + PC), - ])); - assert!(!persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PTO), - lost(4, false, GAP), - lost(5, true, GAP + RTT), - lost(6, true, GAP + RTT + SUB_PC), - ])); - assert!(persistent_congestion(&[ - lost(1, true, ZERO), - lost(2, true, PTO), - lost(4, false, GAP), - lost(5, true, GAP + RTT), - lost(6, true, GAP + RTT + PC), - ])); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, PTO), + lost(4, false, GAP), + lost(5, true, GAP + PC), + ], + false, + ); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, PTO), + lost(4, false, GAP), + lost(5, true, GAP + RTT), + lost(6, true, GAP + RTT + SUB_PC), + ], + false, + ); + persistent_congestion( + &[ + lost(1, true, ZERO), + lost(2, true, PTO), + lost(4, false, GAP), + lost(5, true, GAP + RTT), + lost(6, true, GAP + RTT + PC), + ], + true, + ); } /// Get a time, in multiples of `PTO`, relative to `now()`. @@ -738,8 +740,12 @@ mod tests { /// Call `detect_persistent_congestion` using times relative to now and the fixed PTO time. /// `last_ack` and `rtt_time` are times in multiples of `PTO`, relative to `now()`, /// for the time of the largest acknowledged and the first RTT sample, respectively. - fn persistent_congestion_by_pto(last_ack: u32, rtt_time: u32, lost: &[SentPacket]) -> bool { - let mut cc = ClassicCongestionControl::new(NewReno::default()); + fn persistent_congestion_by_pto( + mut cc: ClassicCongestionControl, + last_ack: u32, + rtt_time: u32, + lost: &[SentPacket], + ) -> bool { assert_eq!(cc.cwnd(), CWND_INITIAL); let last_ack = Some(by_pto(last_ack)); @@ -759,14 +765,36 @@ mod tests { #[test] fn persistent_congestion_no_lost() { let lost = make_lost(&[]); - assert!(!persistent_congestion_by_pto(0, 0, &lost)); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 0, + 0, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 0, + 0, + &lost + )); } /// No persistent congestion can be had if there is only one lost packet. #[test] fn persistent_congestion_one_lost() { let lost = make_lost(&[1]); - assert!(!persistent_congestion_by_pto(0, 0, &lost)); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 0, + 0, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 0, + 0, + &lost + )); } /// Persistent congestion can't happen based on old packets. @@ -775,9 +803,42 @@ mod tests { // Packets sent prior to either the last acknowledged or the first RTT // sample are not considered. So 0 is ignored. let lost = make_lost(&[0, PERSISTENT_CONG_THRESH + 1, PERSISTENT_CONG_THRESH + 2]); - assert!(!persistent_congestion_by_pto(1, 1, &lost)); - assert!(!persistent_congestion_by_pto(0, 1, &lost)); - assert!(!persistent_congestion_by_pto(1, 0, &lost)); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 1, + 1, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 0, + 1, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 1, + 0, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 1, + 1, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 0, + 1, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 1, + 0, + &lost + )); } /// Persistent congestion doesn't start unless the packet is ack-eliciting. @@ -792,7 +853,18 @@ mod tests { Vec::new(), lost[0].size, ); - assert!(!persistent_congestion_by_pto(0, 0, &lost)); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 0, + 0, + &lost + )); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 0, + 0, + &lost + )); } /// Detect persistent congestion. Note that the first lost packet needs to have a time @@ -801,26 +873,63 @@ mod tests { #[test] fn persistent_congestion_min() { let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); - assert!(persistent_congestion_by_pto(0, 0, &lost)); + assert!(persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 0, + 0, + &lost + )); + assert!(persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 0, + 0, + &lost + )); } /// Make sure that not having a previous largest acknowledged also results /// in detecting persistent congestion. (This is not expected to happen, but /// the code permits it). #[test] - fn persistent_congestion_no_prev_ack() { + fn persistent_congestion_no_prev_ack_newreno() { let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); let mut cc = ClassicCongestionControl::new(NewReno::default()); cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, &lost); assert_eq!(cc.cwnd(), CWND_MIN); } + #[test] + fn persistent_congestion_no_prev_ack_cubic() { + let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); + let mut cc = ClassicCongestionControl::new(Cubic::default()); + cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, &lost); + assert_eq!(cc.cwnd(), CWND_MIN); + } + + /// The code asserts on ordering errors. + #[test] + #[should_panic] + fn persistent_congestion_unsorted_newreno() { + let lost = make_lost(&[PERSISTENT_CONG_THRESH + 2, 1]); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(NewReno::default()), + 0, + 0, + &lost + )); + } + /// The code asserts on ordering errors. #[test] #[should_panic] - fn persistent_congestion_unsorted() { + fn persistent_congestion_unsorted_cubic() { let lost = make_lost(&[PERSISTENT_CONG_THRESH + 2, 1]); - assert!(!persistent_congestion_by_pto(0, 0, &lost)); + assert!(!persistent_congestion_by_pto( + ClassicCongestionControl::new(Cubic::default()), + 0, + 0, + &lost + )); } #[test] @@ -850,7 +959,7 @@ mod tests { Vec::new(), // tokens MAX_DATAGRAM_SIZE, // size ); - cc.on_packets_acked(&[acked]); + cc.on_packets_acked(&[acked], RTT, now()); assert_eq!( cc.bytes_in_flight(), @@ -869,7 +978,7 @@ mod tests { Vec::new(), // tokens MAX_DATAGRAM_SIZE, // size )]; - cc.on_packets_acked(&p); + cc.on_packets_acked(&p, RTT, now()); assert_eq!( cc.bytes_in_flight(), @@ -908,7 +1017,7 @@ mod tests { MAX_DATAGRAM_SIZE, // size ); cc.on_packet_sent(&p_not_lost); - cc.on_packets_acked(&[p_not_lost]); + cc.on_packets_acked(&[p_not_lost], RTT, now()); cwnd_is_halved(&cc); // cc is app limited therefore cwnd in not increased. assert_eq!(cc.acked_bytes, 0); @@ -930,7 +1039,7 @@ mod tests { assert_eq!(cc.bytes_in_flight(), CWND_INITIAL / 2); for i in 0..CWND_PKTS_CA - 2 { - cc.on_packets_acked(&pkts[i..=i]); + cc.on_packets_acked(&pkts[i..=i], RTT, now()); assert_eq!( cc.bytes_in_flight(), @@ -942,7 +1051,7 @@ mod tests { // Now we are app limited for i in CWND_PKTS_CA - 2..CWND_PKTS_CA { - cc.on_packets_acked(&pkts[i..=i]); + cc.on_packets_acked(&pkts[i..=i], RTT, now()); assert_eq!( cc.bytes_in_flight(), diff --git a/neqo-transport/src/cc/cubic.rs b/neqo-transport/src/cc/cubic.rs new file mode 100644 index 0000000000..285037ff15 --- /dev/null +++ b/neqo-transport/src/cc/cubic.rs @@ -0,0 +1,203 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![deny(clippy::pedantic)] + +use std::fmt::{self, Display}; +use std::time::{Duration, Instant}; + +use crate::cc::{classic_cc::WindowAdjustment, MAX_DATAGRAM_SIZE_F64}; +use neqo_common::qtrace; +use std::convert::TryFrom; + +// CUBIC congestion control + +// C is a constant fixed to determine the aggressiveness of window +// increase in high BDP networks. +pub const CUBIC_C: f64 = 0.4; +pub const CUBIC_ALPHA: f64 = 3.0 * (1.0 - 0.7) / (1.0 + 0.7); + +// CUBIC_BETA = 0.7; +pub const CUBIC_BETA_USIZE_QUOTIENT: usize = 7; +pub const CUBIC_BETA_USIZE_DIVISOR: usize = 10; + +/// The fast convergence ratio further reduces the congestion window when a congestion event +/// occurs before reaching the previous `W_max`. +pub const CUBIC_FAST_CONVERGENCE: f64 = 0.85; // (1.0 + CUBIC_BETA) / 2.0; + +/// The minimum number of multiples of the datagram size that need +/// to be received to cause an increase in the congestion window. +/// When there is no loss, Cubic can return to exponential increase, but +/// this value reduces the magnitude of the resulting growth by a constant factor. +/// A value of 1.0 would mean a return to the rate used in slow start. +const EXPONENTIAL_GROWTH_REDUCTION: f64 = 2.0; + +fn convert_to_f64(v: usize) -> f64 { + assert!(v < (1 << 53)); + let mut f_64 = f64::try_from(u32::try_from(v >> 21).unwrap()).unwrap(); + f_64 *= 2_097_152.0; // f_64 <<= 21 + f_64 += f64::try_from(u32::try_from(v & 0x1f_ffff).unwrap()).unwrap(); + f_64 +} + +#[derive(Debug)] +pub struct Cubic { + last_max_cwnd: f64, + estimated_tcp_cwnd: f64, + k: f64, + w_max: f64, + ca_epoch_start: Option, + last_phase_was_tcp: bool, + tcp_acked_bytes: f64, +} + +impl Default for Cubic { + fn default() -> Self { + Self { + last_max_cwnd: 0.0, + estimated_tcp_cwnd: 0.0, + k: 0.0, + w_max: 0.0, + ca_epoch_start: None, + last_phase_was_tcp: false, + tcp_acked_bytes: 0.0, + } + } +} + +impl Display for Cubic { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Cubic [last_max_cwnd: {}, k: {}, w_max: {}, ca_epoch_start: {:?}]", + self.last_max_cwnd, self.k, self.w_max, self.ca_epoch_start + )?; + Ok(()) + } +} + +#[allow(clippy::doc_markdown)] +impl Cubic { + /// Original equations is: + /// K = cubic_root(W_max*(1-beta_cubic)/C) (Eq. 2 RFC8312) + /// W_max is number of segments of the maximum segment size (MSS). + /// + /// K is actually the time that W_cubic(t) = C*(t-K)^3 + W_max (Eq. 1) would + /// take to increase to W_max. We use bytes not MSS units, therefore this + /// equation will be: W_cubic(t) = C*MSS*(t-K)^3 + W_max. + /// + /// From that equation we can calculate K as: + /// K = cubic_root((W_max - W_cubic) / C / MSS); + fn calc_k(&self, curr_cwnd: f64) -> f64 { + ((self.w_max - curr_cwnd) / CUBIC_C / MAX_DATAGRAM_SIZE_F64).cbrt() + } + + /// W_cubic(t) = C*(t-K)^3 + W_max (Eq. 1) + /// t is relative to the start of the congestion avoidance phase and it is in seconds. + fn w_cubic(&self, t: f64) -> f64 { + CUBIC_C * (t - self.k).powi(3) * MAX_DATAGRAM_SIZE_F64 + self.w_max + } + + fn start_epoch(&mut self, curr_cwnd_f64: f64, new_acked_f64: f64, now: Instant) { + self.ca_epoch_start = Some(now); + // reset tcp_acked_bytes and estimated_tcp_cwnd; + self.tcp_acked_bytes = new_acked_f64; + self.estimated_tcp_cwnd = curr_cwnd_f64; + if self.last_max_cwnd <= curr_cwnd_f64 { + self.w_max = curr_cwnd_f64; + self.k = 0.0; + } else { + self.w_max = self.last_max_cwnd; + self.k = self.calc_k(curr_cwnd_f64); + } + qtrace!([self], "New epoch"); + } +} + +impl WindowAdjustment for Cubic { + // This is because of the cast in the last line from f64 to usize. + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + fn bytes_for_cwnd_increase( + &mut self, + curr_cwnd: usize, + new_acked_bytes: usize, + min_rtt: Duration, + now: Instant, + ) -> usize { + let curr_cwnd_f64 = convert_to_f64(curr_cwnd); + let new_acked_f64 = convert_to_f64(new_acked_bytes); + if self.ca_epoch_start.is_none() { + // This is a start of a new congestion avoidance phase. + self.start_epoch(curr_cwnd_f64, new_acked_f64, now); + } else { + self.tcp_acked_bytes += new_acked_f64; + } + + let time_ca = self + .ca_epoch_start + .map_or(min_rtt, |t| now + min_rtt - t) + .as_secs_f64(); + let target_cubic = self.w_cubic(time_ca); + + let tcp_cnt = self.estimated_tcp_cwnd / CUBIC_ALPHA; + while self.tcp_acked_bytes > tcp_cnt { + self.tcp_acked_bytes -= tcp_cnt; + self.estimated_tcp_cwnd += MAX_DATAGRAM_SIZE_F64; + } + + let target_cwnd = target_cubic.max(self.estimated_tcp_cwnd); + + // Calculate the number of bytes that would need to be acknowledged for an increase + // of `MAX_DATAGRAM_SIZE` to match the increase of `target - cwnd / cwnd` as defined + // in the specification (Sections 4.4 and 4.5). + // The amount of data required therefore reduces asymptotically as the target increases. + // If the target is not significantly higher than the congestion window, require a very large + // amount of acknowledged data (effectively block increases). + let mut acked_to_increase = + MAX_DATAGRAM_SIZE_F64 * curr_cwnd_f64 / (target_cwnd - curr_cwnd_f64).max(1.0); + + // Limit increase to max 1 MSS per EXPONENTIAL_GROWTH_REDUCTION ack packets. + // This effectively limits target_cwnd to (1 + 1 / EXPONENTIAL_GROWTH_REDUCTION) cwnd. + acked_to_increase = + acked_to_increase.max(EXPONENTIAL_GROWTH_REDUCTION * MAX_DATAGRAM_SIZE_F64); + acked_to_increase as usize + } + + fn reduce_cwnd(&mut self, curr_cwnd: usize, acked_bytes: usize) -> (usize, usize) { + let curr_cwnd_f64 = convert_to_f64(curr_cwnd); + // Fast Convergence + // If congestion event occurs before the maximum congestion window before the last congestion event, + // we reduce the the maximum congestion window and thereby W_max. + // check cwnd + MAX_DATAGRAM_SIZE instead of cwnd because with cwnd in bytes, cwnd may be slightly off. + self.last_max_cwnd = if curr_cwnd_f64 + MAX_DATAGRAM_SIZE_F64 < self.last_max_cwnd { + curr_cwnd_f64 * CUBIC_FAST_CONVERGENCE + } else { + curr_cwnd_f64 + }; + self.ca_epoch_start = None; + ( + curr_cwnd * CUBIC_BETA_USIZE_QUOTIENT / CUBIC_BETA_USIZE_DIVISOR, + acked_bytes * CUBIC_BETA_USIZE_QUOTIENT / CUBIC_BETA_USIZE_DIVISOR, + ) + } + + fn on_app_limited(&mut self) { + // Reset ca_epoch_start. Let it start again when the congestion controller + // exits the app-limited period. + self.ca_epoch_start = None; + } + + #[cfg(test)] + fn last_max_cwnd(&self) -> f64 { + self.last_max_cwnd + } + + #[cfg(test)] + fn set_last_max_cwnd(&mut self, last_max_cwnd: f64) { + self.last_max_cwnd = last_max_cwnd; + } +} diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index b469d023a3..4b8d442f6d 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -15,13 +15,16 @@ use std::fmt::{Debug, Display}; use std::time::{Duration, Instant}; mod classic_cc; +mod cubic; mod new_reno; pub use classic_cc::ClassicCongestionControl; -pub use classic_cc::{CWND_INITIAL_PKTS, CWND_MIN}; +pub use classic_cc::{CWND_INITIAL, CWND_INITIAL_PKTS, CWND_MIN}; +pub use cubic::Cubic; pub use new_reno::NewReno; pub const MAX_DATAGRAM_SIZE: usize = PATH_MTU_V6; +pub const MAX_DATAGRAM_SIZE_F64: f64 = 1337.0; pub trait CongestionControl: Display + Debug { fn set_qlog(&mut self, qlog: NeqoQlog); @@ -32,7 +35,7 @@ pub trait CongestionControl: Display + Debug { fn cwnd_avail(&self) -> usize; - fn on_packets_acked(&mut self, acked_pkts: &[SentPacket]); + fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant); fn on_packets_lost( &mut self, @@ -52,4 +55,8 @@ pub trait CongestionControl: Display + Debug { #[derive(Debug, Copy, Clone)] pub enum CongestionControlAlgorithm { NewReno, + Cubic, } + +#[cfg(test)] +mod tests; diff --git a/neqo-transport/src/cc/new_reno.rs b/neqo-transport/src/cc/new_reno.rs index a398887d61..8272c4f3ec 100644 --- a/neqo-transport/src/cc/new_reno.rs +++ b/neqo-transport/src/cc/new_reno.rs @@ -9,8 +9,8 @@ use std::fmt::{self, Display}; -use crate::cc::{classic_cc::WindowAdjustment, MAX_DATAGRAM_SIZE}; -use neqo_common::qinfo; +use crate::cc::classic_cc::WindowAdjustment; +use std::time::{Duration, Instant}; #[derive(Debug)] pub struct NewReno {} @@ -29,16 +29,27 @@ impl Display for NewReno { } impl WindowAdjustment for NewReno { - fn on_packets_acked(&mut self, mut curr_cwnd: usize, mut acked_bytes: usize) -> (usize, usize) { - if acked_bytes >= curr_cwnd { - acked_bytes -= curr_cwnd; - curr_cwnd += MAX_DATAGRAM_SIZE; - qinfo!([self], "congestion avoidance += {}", MAX_DATAGRAM_SIZE); - } - (curr_cwnd, acked_bytes) + fn bytes_for_cwnd_increase( + &mut self, + curr_cwnd: usize, + _new_acked_bytes: usize, + _min_rtt: Duration, + _now: Instant, + ) -> usize { + curr_cwnd } - fn on_congestion_event(&mut self, curr_cwnd: usize, acked_bytes: usize) -> (usize, usize) { + fn reduce_cwnd(&mut self, curr_cwnd: usize, acked_bytes: usize) -> (usize, usize) { (curr_cwnd / 2, acked_bytes / 2) } + + fn on_app_limited(&mut self) {} + + #[cfg(test)] + fn last_max_cwnd(&self) -> f64 { + 0.0 + } + + #[cfg(test)] + fn set_last_max_cwnd(&mut self, _last_max_cwnd: f64) {} } diff --git a/neqo-transport/src/cc/tests/cubic.rs b/neqo-transport/src/cc/tests/cubic.rs new file mode 100644 index 0000000000..d019c1d4cb --- /dev/null +++ b/neqo-transport/src/cc/tests/cubic.rs @@ -0,0 +1,303 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::cast_possible_truncation)] +#![allow(clippy::cast_sign_loss)] + +use crate::cc::{ + classic_cc::{ClassicCongestionControl, CWND_INITIAL}, + cubic::{ + Cubic, CUBIC_ALPHA, CUBIC_BETA_USIZE_DIVISOR, CUBIC_BETA_USIZE_QUOTIENT, CUBIC_C, + CUBIC_FAST_CONVERGENCE, + }, + CongestionControl, MAX_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE_F64, +}; +use crate::packet::PacketType; +use crate::tracking::SentPacket; +use std::convert::TryFrom; +use std::ops::Sub; +use std::time::{Duration, Instant}; +use test_fixture::now; + +const RTT: Duration = Duration::from_millis(100); +const CWND_INITIAL_F64: f64 = 10.0 * MAX_DATAGRAM_SIZE_F64; +const CWND_INITIAL_10_F64: f64 = 10.0 * CWND_INITIAL_F64; +const CWND_INITIAL_10: usize = 10 * CWND_INITIAL; +const CWND_AFTER_LOSS: usize = CWND_INITIAL * CUBIC_BETA_USIZE_QUOTIENT / CUBIC_BETA_USIZE_DIVISOR; +const CWND_AFTER_LOSS_SLOW_START: usize = + (CWND_INITIAL + MAX_DATAGRAM_SIZE) * CUBIC_BETA_USIZE_QUOTIENT / CUBIC_BETA_USIZE_DIVISOR; + +fn fill_cwnd(cc: &mut ClassicCongestionControl, mut next_pn: u64, now: Instant) -> u64 { + while cc.bytes_in_flight() < cc.cwnd() { + let sent = SentPacket::new( + PacketType::Short, + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + cc.on_packet_sent(&sent); + next_pn += 1; + } + next_pn +} + +fn ack_packet(cc: &mut ClassicCongestionControl, pn: u64, now: Instant) { + let acked = SentPacket::new( + PacketType::Short, + pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + cc.on_packets_acked(&[acked], RTT, now); +} + +fn packet_lost(cc: &mut ClassicCongestionControl, pn: u64) { + const PTO: Duration = Duration::from_millis(120); + let p_lost = SentPacket::new( + PacketType::Short, + pn, // pn + now(), // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + cc.on_packets_lost(None, None, PTO, &[p_lost]); +} + +fn expected_tcp_acks(cwnd_rtt_start: usize) -> u64 { + (f64::try_from(i32::try_from(cwnd_rtt_start).unwrap()).unwrap() + / MAX_DATAGRAM_SIZE_F64 + / CUBIC_ALPHA) + .round() as u64 +} + +#[test] +fn tcp_phase() { + let mut cubic = ClassicCongestionControl::new(Cubic::default()); + + // change to congestion avoidance state. + cubic.set_ssthresh(1); + + let mut now = now(); + let start_time = now; + // helper variables to remember the next packet number to be sent/acked. + let mut next_pn_send = 0; + let mut next_pn_ack = 0; + + next_pn_send = fill_cwnd(&mut cubic, next_pn_send, now); + + // This will start with TCP phase. + // in this phase cwnd is increase by CUBIC_ALPHA every RTT. We can look at it as + // increase of MAX_DATAGRAM_SIZE every 1 / CUBIC_ALPHA RTTs. + // The phase will end when cwnd calculated with cubic equation is equal to TCP estimate: + // CUBIC_C * (n * RTT / CUBIC_ALPHA)^3 * MAX_DATAGRAM_SIZE = n * MAX_DATAGRAM_SIZE + // from this n = sqrt(CUBIC_ALPHA^3/ (CUBIC_C * RTT^3)). + let num_tcp_increases = (CUBIC_ALPHA.powi(3) / (CUBIC_C * RTT.as_secs_f64().powi(3))) + .sqrt() + .floor() as u64; + + for _ in 0..num_tcp_increases { + let cwnd_rtt_start = cubic.cwnd(); + //Expected acks during a period of RTT / CUBIC_ALPHA. + let acks = expected_tcp_acks(cwnd_rtt_start); + // The time between acks if they are ideally paced over a RTT. + let time_increase = RTT / u32::try_from(cwnd_rtt_start / MAX_DATAGRAM_SIZE).unwrap(); + + for _ in 0..acks { + now += time_increase; + ack_packet(&mut cubic, next_pn_ack, now); + next_pn_ack += 1; + next_pn_send = fill_cwnd(&mut cubic, next_pn_send, now); + } + + assert_eq!(cubic.cwnd() - cwnd_rtt_start, MAX_DATAGRAM_SIZE); + } + + // The next increase will be according to the cubic equation. + + let cwnd_rtt_start = cubic.cwnd(); + // cwnd_rtt_start has change, therefore calculate new time_increase (the time + // between acks if they are ideally paced over a RTT). + let time_increase = RTT / u32::try_from(cwnd_rtt_start / MAX_DATAGRAM_SIZE).unwrap(); + let mut num_acks = 0; // count the number of acks. until cwnd is increased by MAX_DATAGRAM_SIZE. + + while cwnd_rtt_start == cubic.cwnd() { + num_acks += 1; + now += time_increase; + ack_packet(&mut cubic, next_pn_ack, now); + next_pn_ack += 1; + next_pn_send = fill_cwnd(&mut cubic, next_pn_send, now); + } + + // Make sure that the increase is not according to TCP equation, i.e., that it took + // less than RTT / CUBIC_ALPHA. + let expected_ack_tcp_increase = expected_tcp_acks(cwnd_rtt_start); + assert!(num_acks < expected_ack_tcp_increase); + + // This first increase after a TCP phase may be shorter than what it would take by a regular cubic phase, + // because of the proper byte counting and the credit it already had before entering this phase. Therefore + // We will perform another round and compare it to expected increase using the cubic equation. + + let cwnd_rtt_start_after_tcp = cubic.cwnd(); + let elapsed_time = now - start_time; + + // calculate new time_increase. + let time_increase = RTT / u32::try_from(cwnd_rtt_start_after_tcp / MAX_DATAGRAM_SIZE).unwrap(); + let mut num_acks2 = 0; // count the number of acks. until cwnd is increased by MAX_DATAGRAM_SIZE. + + while cwnd_rtt_start_after_tcp == cubic.cwnd() { + num_acks2 += 1; + now += time_increase; + ack_packet(&mut cubic, next_pn_ack, now); + next_pn_ack += 1; + next_pn_send = fill_cwnd(&mut cubic, next_pn_send, now); + } + + let expected_ack_tcp_increase2 = expected_tcp_acks(cwnd_rtt_start_after_tcp); + assert!(num_acks2 < expected_ack_tcp_increase2); + + // The time needed to increase cwnd by MAX_DATAGRAM_SIZE using the cubic equation will be calculates from: + // W_cubic(elapsed_time + t_to_increase) - W_cubis(elapsed_time) = MAX_DATAGRAM_SIZE => + // CUBIC_C * (elapsed_time + t_to_increase)^3 * MAX_DATAGRAM_SIZE + CWND_INITIAL - + // CUBIC_C * elapsed_time^3 * MAX_DATAGRAM_SIZE + CWND_INITIAL = MAX_DATAGRAM_SIZE => + // t_to_increase = cbrt((1 + CUBIC_C * elapsed_time^3) / CUBIC_C) - elapsed_time + // (t_to_increase is in seconds) + // number of ack needed is t_to_increase / time_increase. + let expected_ack_cubic_increase = + ((((1.0 + CUBIC_C * (elapsed_time).as_secs_f64().powi(3)) / CUBIC_C).cbrt() + - elapsed_time.as_secs_f64()) + / time_increase.as_secs_f64()) + .ceil() as u64; + // num_acks is very close to the calculated value. The exact value is hard to calculate + // because the proportional increase(i.e. curr_cwnd_f64 / (target - curr_cwnd_f64) * MAX_DATAGRAM_SIZE_F64) + // and the byte counting. + assert_eq!(num_acks2, expected_ack_cubic_increase + 2); +} + +#[test] +fn cubic_phase() { + let mut cubic = ClassicCongestionControl::new(Cubic::default()); + // Set last_max_cwnd to a higher number make sure that cc is the cubic phase (cwnd is calculated by the cubic equation). + cubic.set_last_max_cwnd(CWND_INITIAL_10_F64); + // Set ssthresh to something small to make sure that cc is in the congection avoidance phase. + cubic.set_ssthresh(1); + let mut now = now(); + let mut next_pn_send = 0; + let mut next_pn_ack = 0; + + next_pn_send = fill_cwnd(&mut cubic, next_pn_send, now); + + let k = ((CWND_INITIAL_10_F64 - CWND_INITIAL_F64) / CUBIC_C / MAX_DATAGRAM_SIZE_F64).cbrt(); + let epoch_start = now; + + // The number of RTT until W_max is reached. + let num_rtts_w_max = (k / RTT.as_secs_f64()).round() as u64; + for _ in 0..num_rtts_w_max { + let cwnd_rtt_start = cubic.cwnd(); + //Expected acks + let acks = cwnd_rtt_start / MAX_DATAGRAM_SIZE; + let time_increase = RTT / u32::try_from(acks).unwrap(); + for _ in 0..acks { + now += time_increase; + ack_packet(&mut cubic, next_pn_ack, now); + next_pn_ack += 1; + next_pn_send = fill_cwnd(&mut cubic, next_pn_send, now); + } + + let expected = + (CUBIC_C * ((now - epoch_start).as_secs_f64() - k).powi(3) * MAX_DATAGRAM_SIZE_F64 + + CWND_INITIAL_10_F64) + .round() as usize; + + assert_within(cubic.cwnd(), expected, MAX_DATAGRAM_SIZE); + } + assert_eq!(cubic.cwnd(), CWND_INITIAL_10); +} + +fn assert_within + PartialOrd + Copy>(value: T, expected: T, margin: T) { + if value >= expected { + assert!(value - expected < margin); + } else { + assert!(expected - value < margin); + } +} + +#[test] +fn congestion_event_slow_start() { + let mut cubic = ClassicCongestionControl::new(Cubic::default()); + + let _ = fill_cwnd(&mut cubic, 0, now()); + ack_packet(&mut cubic, 0, now()); + + assert_within(cubic.last_max_cwnd(), 0.0, f64::EPSILON); + + // cwnd is increased by 1 in slow start phase, after an ack. + assert_eq!(cubic.cwnd(), CWND_INITIAL + MAX_DATAGRAM_SIZE); + + // Trigger a congestion_event in slow start phase + packet_lost(&mut cubic, 1); + + // last_max_cwnd is equal to cwnd before decrease. + assert_within( + cubic.last_max_cwnd(), + CWND_INITIAL_F64 + MAX_DATAGRAM_SIZE_F64, + f64::EPSILON, + ); + assert_eq!(cubic.cwnd(), CWND_AFTER_LOSS_SLOW_START); +} + +#[test] +fn congestion_event_congestion_avoidance() { + let mut cubic = ClassicCongestionControl::new(Cubic::default()); + + // Set ssthresh to something small to make sure that cc is in the congection avoidance phase. + cubic.set_ssthresh(1); + + // Set last_max_cwnd to something smaller than cwnd so that the fast convergence is not triggered. + cubic.set_last_max_cwnd(3.0 * MAX_DATAGRAM_SIZE_F64); + + let _ = fill_cwnd(&mut cubic, 0, now()); + ack_packet(&mut cubic, 0, now()); + + assert_eq!(cubic.cwnd(), CWND_INITIAL); + + // Trigger a congestion_event in slow start phase + packet_lost(&mut cubic, 1); + + assert_within(cubic.last_max_cwnd(), CWND_INITIAL_F64, f64::EPSILON); + assert_eq!(cubic.cwnd(), CWND_AFTER_LOSS); +} + +#[test] +fn congestion_event_congestion_avoidance_2() { + let mut cubic = ClassicCongestionControl::new(Cubic::default()); + + // Set ssthresh to something small to make sure that cc is in the congection avoidance phase. + cubic.set_ssthresh(1); + + // Set last_max_cwnd to something higher than cwnd so that the fast convergence is triggered. + cubic.set_last_max_cwnd(CWND_INITIAL_10_F64); + + let _ = fill_cwnd(&mut cubic, 0, now()); + ack_packet(&mut cubic, 0, now()); + + assert_within(cubic.last_max_cwnd(), CWND_INITIAL_10_F64, f64::EPSILON); + assert_eq!(cubic.cwnd(), CWND_INITIAL); + + // Trigger a congestion_event. + packet_lost(&mut cubic, 1); + + assert_within( + cubic.last_max_cwnd(), + CWND_INITIAL_F64 * CUBIC_FAST_CONVERGENCE, + f64::EPSILON, + ); + assert_eq!(cubic.cwnd(), CWND_AFTER_LOSS); +} diff --git a/neqo-transport/src/cc/tests/mod.rs b/neqo-transport/src/cc/tests/mod.rs new file mode 100644 index 0000000000..238a7ad012 --- /dev/null +++ b/neqo-transport/src/cc/tests/mod.rs @@ -0,0 +1,7 @@ +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +mod cubic; +mod new_reno; diff --git a/neqo-transport/src/cc/tests/new_reno.rs b/neqo-transport/src/cc/tests/new_reno.rs new file mode 100644 index 0000000000..6376e64bff --- /dev/null +++ b/neqo-transport/src/cc/tests/new_reno.rs @@ -0,0 +1,131 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// Congestion control +#![deny(clippy::pedantic)] + +use crate::cc::new_reno::NewReno; +use crate::cc::{ClassicCongestionControl, CongestionControl, CWND_INITIAL, MAX_DATAGRAM_SIZE}; +use crate::packet::PacketType; +use crate::tracking::SentPacket; +use std::time::Duration; +use test_fixture::now; + +const PTO: Duration = Duration::from_millis(100); +const RTT: Duration = Duration::from_millis(98); + +fn cwnd_is_default(cc: &ClassicCongestionControl) { + assert_eq!(cc.cwnd(), CWND_INITIAL); + assert_eq!(cc.ssthresh(), usize::MAX); +} + +fn cwnd_is_halved(cc: &ClassicCongestionControl) { + assert_eq!(cc.cwnd(), CWND_INITIAL / 2); + assert_eq!(cc.ssthresh(), CWND_INITIAL / 2); +} + +#[test] +fn issue_876() { + let mut cc = ClassicCongestionControl::new(NewReno::default()); + let time_now = now(); + let time_before = time_now - Duration::from_millis(100); + let time_after = time_now + Duration::from_millis(150); + + let sent_packets = &[ + SentPacket::new( + PacketType::Short, + 1, // pn + time_before, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE - 1, // size + ), + SentPacket::new( + PacketType::Short, + 2, // pn + time_before, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE - 2, // size + ), + SentPacket::new( + PacketType::Short, + 3, // pn + time_before, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ), + SentPacket::new( + PacketType::Short, + 4, // pn + time_before, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ), + SentPacket::new( + PacketType::Short, + 5, // pn + time_before, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ), + SentPacket::new( + PacketType::Short, + 6, // pn + time_before, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ), + SentPacket::new( + PacketType::Short, + 7, // pn + time_after, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE - 3, // size + ), + ]; + + // Send some more packets so that the cc is not app-limited. + for p in &sent_packets[..6] { + cc.on_packet_sent(p); + } + assert_eq!(cc.acked_bytes(), 0); + cwnd_is_default(&cc); + assert_eq!(cc.bytes_in_flight(), 6 * MAX_DATAGRAM_SIZE - 3); + + cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[0..1]); + + // We are now in recovery + assert!(cc.recovery_packet()); + assert_eq!(cc.acked_bytes(), 0); + cwnd_is_halved(&cc); + assert_eq!(cc.bytes_in_flight(), 5 * MAX_DATAGRAM_SIZE - 2); + + // Send a packet after recovery starts + cc.on_packet_sent(&sent_packets[6]); + assert!(!cc.recovery_packet()); + cwnd_is_halved(&cc); + assert_eq!(cc.acked_bytes(), 0); + assert_eq!(cc.bytes_in_flight(), 6 * MAX_DATAGRAM_SIZE - 5); + + // and ack it. cwnd increases slightly + cc.on_packets_acked(&sent_packets[6..], RTT, time_now); + assert_eq!(cc.acked_bytes(), sent_packets[6].size); + cwnd_is_halved(&cc); + assert_eq!(cc.bytes_in_flight(), 5 * MAX_DATAGRAM_SIZE - 2); + + // Packet from before is lost. Should not hurt cwnd. + cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[1..2]); + assert!(!cc.recovery_packet()); + assert_eq!(cc.acked_bytes(), sent_packets[6].size); + cwnd_is_halved(&cc); + assert_eq!(cc.bytes_in_flight(), 4 * MAX_DATAGRAM_SIZE); +} diff --git a/neqo-transport/src/recovery.rs b/neqo-transport/src/recovery.rs index f5356f1176..565407c644 100644 --- a/neqo-transport/src/recovery.rs +++ b/neqo-transport/src/recovery.rs @@ -771,7 +771,8 @@ impl LossRecovery { // This must happen after on_packets_lost. If in recovery, this could // take us out, and then lost packets will start a new recovery period // when it shouldn't. - self.packet_sender.on_packets_acked(&acked_packets); + self.packet_sender + .on_packets_acked(&acked_packets, self.rtt_vals.min_rtt, now); self.pto_state = None; diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index 15675f4b2b..46cbfd5d13 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -9,7 +9,7 @@ #![allow(clippy::module_name_repetitions)] use crate::cc::{ - ClassicCongestionControl, CongestionControl, CongestionControlAlgorithm, NewReno, + ClassicCongestionControl, CongestionControl, CongestionControlAlgorithm, Cubic, NewReno, MAX_DATAGRAM_SIZE, }; use crate::pace::Pacer; @@ -46,6 +46,9 @@ impl PacketSender { CongestionControlAlgorithm::NewReno => { Box::new(ClassicCongestionControl::new(NewReno::default())) } + CongestionControlAlgorithm::Cubic => { + Box::new(ClassicCongestionControl::new(Cubic::default())) + } }, pacer: None, } @@ -67,8 +70,8 @@ impl PacketSender { } // Multi-packet version of OnPacketAckedCC - pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket]) { - self.cc.on_packets_acked(acked_pkts); + pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { + self.cc.on_packets_acked(acked_pkts, min_rtt, now); } pub fn on_packets_lost(