diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a3e79ec6e..e5eb6075b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,7 +7,7 @@ name: Test jobs: tests: runs-on: ubuntu-22.04 - needs: [check-msrv, test-msrv, test-stable, clippy] + needs: [check-msrv, test-msrv, test-stable, clippy, test-netsim] steps: - name: Done run: exit 0 @@ -48,6 +48,14 @@ jobs: - name: Run Tests nightly run: ./ci.sh test nightly + test-netsim: + runs-on: ubuntu-22.04 + continue-on-error: true + steps: + - uses: actions/checkout@v4 + - name: Run network-simulation tests + run: ./ci.sh netsim + test-build-16bit: runs-on: ubuntu-22.04 continue-on-error: true diff --git a/Cargo.toml b/Cargo.toml index c3ae7d556..aa2275726 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,8 @@ getopts = "0.2" rand = "0.8" url = "2.0" rstest = "0.17" +insta = "1.41.1" +rand_chacha = "0.3.1" [features] std = ["managed/std", "alloc"] @@ -109,6 +111,8 @@ default = [ "_proto-fragmentation" = [] +"_netsim" = [] + # BEGIN AUTOGENERATED CONFIG FEATURES # Generated by gen_config.py. DO NOT EDIT. iface-max-addr-count-1 = [] @@ -267,6 +271,10 @@ rpl-parents-buffer-count-32 = [] # END AUTOGENERATED CONFIG FEATURES +[[test]] +name = "netsim" +required-features = ["_netsim"] + [[example]] name = "packet2pcap" path = "utils/packet2pcap.rs" diff --git a/ci.sh b/ci.sh index 39a9cd334..950a3c1bd 100755 --- a/ci.sh +++ b/ci.sh @@ -59,6 +59,10 @@ test() { fi } +netsim() { + cargo test --release --features _netsim netsim +} + check() { local version=$1 rustup toolchain install $version @@ -138,3 +142,7 @@ fi if [[ $1 == "coverage" || $1 == "all" ]]; then coverage fi + +if [[ $1 == "netsim" || $1 == "all" ]]; then + netsim +fi diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 42f99175f..e33c31216 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -277,10 +277,20 @@ impl RttEstimator { #[derive(Debug, Clone, Copy, PartialEq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] enum Timer { - Idle { keep_alive_at: Option }, - Retransmit { expires_at: Instant }, + Idle { + keep_alive_at: Option, + }, + Retransmit { + expires_at: Instant, + }, FastRetransmit, - Close { expires_at: Instant }, + ZeroWindowProbe { + expires_at: Instant, + delay: Duration, + }, + Close { + expires_at: Instant, + }, } const ACK_DELAY_DEFAULT: Duration = Duration::from_millis(10); @@ -317,6 +327,13 @@ impl Timer { } } + fn should_zero_window_probe(&self, timestamp: Instant) -> bool { + match *self { + Timer::ZeroWindowProbe { expires_at, .. } if timestamp >= expires_at => true, + _ => false, + } + } + fn poll_at(&self) -> PollAt { match *self { Timer::Idle { @@ -325,6 +342,7 @@ impl Timer { Timer::Idle { keep_alive_at: None, } => PollAt::Ingress, + Timer::ZeroWindowProbe { expires_at, .. } => PollAt::Time(expires_at), Timer::Retransmit { expires_at, .. } => PollAt::Time(expires_at), Timer::FastRetransmit => PollAt::Now, Timer::Close { expires_at } => PollAt::Time(expires_at), @@ -353,7 +371,10 @@ impl Timer { fn set_for_retransmit(&mut self, timestamp: Instant, delay: Duration) { match *self { - Timer::Idle { .. } | Timer::FastRetransmit { .. } | Timer::Retransmit { .. } => { + Timer::Idle { .. } + | Timer::FastRetransmit { .. } + | Timer::Retransmit { .. } + | Timer::ZeroWindowProbe { .. } => { *self = Timer::Retransmit { expires_at: timestamp + delay, } @@ -372,12 +393,34 @@ impl Timer { } } - fn is_retransmit(&self) -> bool { - match *self { - Timer::Retransmit { .. } | Timer::FastRetransmit => true, - _ => false, + fn set_for_zero_window_probe(&mut self, timestamp: Instant, delay: Duration) { + *self = Timer::ZeroWindowProbe { + expires_at: timestamp + delay, + delay, } } + + fn rewind_zero_window_probe(&mut self, timestamp: Instant) { + if let Timer::ZeroWindowProbe { mut delay, .. } = *self { + delay = (delay * 2).min(Duration::from_millis(RTTE_MAX_RTO as _)); + *self = Timer::ZeroWindowProbe { + expires_at: timestamp + delay, + delay, + } + } + } + + fn is_idle(&self) -> bool { + matches!(self, Timer::Idle { .. }) + } + + fn is_zero_window_probe(&self) -> bool { + matches!(self, Timer::ZeroWindowProbe { .. }) + } + + fn is_retransmit(&self) -> bool { + matches!(self, Timer::Retransmit { .. } | Timer::FastRetransmit) + } } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -1182,6 +1225,17 @@ impl<'a> Socket<'a> { self.remote_last_ts = None } + // if remote win is zero and we go from having no data to some data pending to + // send, start the zero window probe timer. + if self.remote_win_len == 0 && self.timer.is_idle() { + let delay = self.rtte.retransmission_timeout(); + tcp_trace!("starting zero-window-probe timer for t+{}", delay); + + // We don't have access to the current time here, so use Instant::ZERO instead. + // this will cause the first ZWP to be sent immediately, but that's okay. + self.timer.set_for_zero_window_probe(Instant::ZERO, delay); + } + #[cfg(any(test, feature = "verbose"))] tcp_trace!( "tx buffer: enqueueing {} octets (now {})", @@ -1713,7 +1767,7 @@ impl<'a> Socket<'a> { ack_of_fin = true; } - ack_all = self.remote_last_seq == ack_number + ack_all = self.remote_last_seq <= ack_number; } self.rtte.on_ack(cx.now(), ack_number); @@ -1796,7 +1850,6 @@ impl<'a> Socket<'a> { // ACK packets in the SYN-RECEIVED state change it to ESTABLISHED. (State::SynReceived, TcpControl::None) => { self.set_state(State::Established); - self.timer.set_for_idle(cx.now(), self.keep_alive); } // FIN packets in the SYN-RECEIVED state change it to CLOSE-WAIT. @@ -1806,7 +1859,6 @@ impl<'a> Socket<'a> { self.remote_seq_no += 1; self.rx_fin_received = true; self.set_state(State::CloseWait); - self.timer.set_for_idle(cx.now(), self.keep_alive); } // SYN|ACK packets in the SYN-SENT state change it to ESTABLISHED. @@ -1847,26 +1899,15 @@ impl<'a> Socket<'a> { } else { self.set_state(State::SynReceived); } - self.timer.set_for_idle(cx.now(), self.keep_alive); } - // RFC 6298: (5.2) ACK of all outstanding data turn off the retransmit timer. - // (5.3) ACK of new data in ESTABLISHED state restart the retransmit timer. - (State::Established, TcpControl::None) => { - if ack_all { - self.timer.set_for_idle(cx.now(), self.keep_alive); - } else if ack_len > 0 { - self.timer - .set_for_retransmit(cx.now(), self.rtte.retransmission_timeout()); - } - } + (State::Established, TcpControl::None) => {} // FIN packets in ESTABLISHED state indicate the remote side has closed. (State::Established, TcpControl::Fin) => { self.remote_seq_no += 1; self.rx_fin_received = true; self.set_state(State::CloseWait); - self.timer.set_for_idle(cx.now(), self.keep_alive); } // ACK packets in FIN-WAIT-1 state change it to FIN-WAIT-2, if we've already @@ -1875,9 +1916,6 @@ impl<'a> Socket<'a> { if ack_of_fin { self.set_state(State::FinWait2); } - if ack_all { - self.timer.set_for_idle(cx.now(), self.keep_alive); - } } // FIN packets in FIN-WAIT-1 state change it to CLOSING, or to TIME-WAIT @@ -1890,14 +1928,10 @@ impl<'a> Socket<'a> { self.timer.set_for_close(cx.now()); } else { self.set_state(State::Closing); - self.timer.set_for_idle(cx.now(), self.keep_alive); } } - // Data packets in FIN-WAIT-2 reset the idle timer. - (State::FinWait2, TcpControl::None) => { - self.timer.set_for_idle(cx.now(), self.keep_alive); - } + (State::FinWait2, TcpControl::None) => {} // FIN packets in FIN-WAIT-2 state change it to TIME-WAIT. (State::FinWait2, TcpControl::Fin) => { @@ -1912,15 +1946,10 @@ impl<'a> Socket<'a> { if ack_of_fin { self.set_state(State::TimeWait); self.timer.set_for_close(cx.now()); - } else { - self.timer.set_for_idle(cx.now(), self.keep_alive); } } - // ACK packets in CLOSE-WAIT state reset the retransmit timer. - (State::CloseWait, TcpControl::None) => { - self.timer.set_for_idle(cx.now(), self.keep_alive); - } + (State::CloseWait, TcpControl::None) => {} // ACK packets in LAST-ACK state change it to CLOSED. (State::LastAck, TcpControl::None) => { @@ -1928,8 +1957,6 @@ impl<'a> Socket<'a> { // Clear the remote endpoint, or we'll send an RST there. self.set_state(State::Closed); self.tuple = None; - } else { - self.timer.set_for_idle(cx.now(), self.keep_alive); } } @@ -2040,6 +2067,39 @@ impl<'a> Socket<'a> { self.last_remote_tsval = timestamp.tsval; } + // update timers. + match self.timer { + Timer::Retransmit { .. } | Timer::FastRetransmit => { + if ack_all { + // RFC 6298: (5.2) ACK of all outstanding data turn off the retransmit timer. + self.timer.set_for_idle(cx.now(), self.keep_alive); + } else if ack_len > 0 { + // (5.3) ACK of new data in ESTABLISHED state restart the retransmit timer. + let rto = self.rtte.retransmission_timeout(); + self.timer.set_for_retransmit(cx.now(), rto); + } + } + Timer::Idle { .. } => { + // any packet on idle refresh the keepalive timer. + self.timer.set_for_idle(cx.now(), self.keep_alive); + } + _ => {} + } + + // start/stop the Zero Window Probe timer. + if self.remote_win_len == 0 + && !self.tx_buffer.is_empty() + && (self.timer.is_idle() || ack_len > 0) + { + let delay = self.rtte.retransmission_timeout(); + tcp_trace!("starting zero-window-probe timer for t+{}", delay); + self.timer.set_for_zero_window_probe(cx.now(), delay); + } + if self.remote_win_len != 0 && self.timer.is_zero_window_probe() { + tcp_trace!("stopping zero-window-probe timer"); + self.timer.set_for_idle(cx.now(), self.keep_alive); + } + let payload_len = payload.len(); if payload_len == 0 { return None; @@ -2326,6 +2386,8 @@ impl<'a> Socket<'a> { } else if self.timer.should_keep_alive(cx.now()) { // If we need to transmit a keep-alive packet, do it. tcp_trace!("keep-alive timer expired"); + } else if self.timer.should_zero_window_probe(cx.now()) { + tcp_trace!("sending zero-window probe"); } else if self.timer.should_close(cx.now()) { // If we have spent enough time in the TIME-WAIT state, close the socket. tcp_trace!("TIME-WAIT timer expired"); @@ -2368,6 +2430,8 @@ impl<'a> Socket<'a> { payload: &[], }; + let mut is_zero_window_probe = false; + match self.state { // We transmit an RST in the CLOSED state. If we ended up in the CLOSED state // with a specified endpoint, it means that the socket was aborted. @@ -2409,7 +2473,7 @@ impl<'a> Socket<'a> { let win_right_edge = self.local_seq_no + self.remote_win_len; // Max amount of octets we're allowed to send according to the remote window. - let win_limit = if win_right_edge >= self.remote_last_seq { + let mut win_limit = if win_right_edge >= self.remote_last_seq { win_right_edge - self.remote_last_seq } else { // This can happen if we've sent some data and later the remote side @@ -2420,6 +2484,12 @@ impl<'a> Socket<'a> { 0 }; + // To send a zero-window-probe, force the window limit to at least 1 byte. + if win_limit == 0 && self.timer.should_zero_window_probe(cx.now()) { + win_limit = 1; + is_zero_window_probe = true; + } + // Maximum size we're allowed to send. This can be limited by 3 factors: // 1. remote window // 2. MSS the remote is willing to accept, probably determined by their MTU @@ -2518,6 +2588,12 @@ impl<'a> Socket<'a> { } self.ack_delay_timer = AckDelayTimer::Idle; + // Leave the rest of the state intact if sending a zero-window probe. + if is_zero_window_probe { + self.timer.rewind_zero_window_probe(cx.now()); + return Ok(()); + } + // Leave the rest of the state intact if sending a keep-alive packet, since those // carry a fake segment. if is_keep_alive { @@ -2537,12 +2613,12 @@ impl<'a> Socket<'a> { .post_transmit(cx.now(), repr.segment_len()); } - if !self.seq_to_transmit(cx) && repr.segment_len() > 0 && !self.timer.is_retransmit() { - // RFC 6298: (5.1) If we've transmitted all data we could (and there was - // something at all, data or flag, to transmit, not just an ACK), start the - // retransmit timer if it is not already running. - self.timer - .set_for_retransmit(cx.now(), self.rtte.retransmission_timeout()); + if repr.segment_len() > 0 && !self.timer.is_retransmit() { + // RFC 6298 (5.1) Every time a packet containing data is sent (including a + // retransmission), if the timer is not running, start it running + // so that it will expire after RTO seconds. + let rto = self.rtte.retransmission_timeout(); + self.timer.set_for_retransmit(cx.now(), rto); } if self.state == State::Closed { @@ -2806,11 +2882,14 @@ mod test { fn recv_nothing(socket: &mut TestSocket, timestamp: Instant) { socket.cx.set_now(timestamp); - let result: Result<(), ()> = socket - .socket - .dispatch(&mut socket.cx, |_, (_ip_repr, _tcp_repr)| { - panic!("Should not send a packet") - }); + let mut fail = false; + let result: Result<(), ()> = socket.socket.dispatch(&mut socket.cx, |_, _| { + fail = true; + Ok(()) + }); + if fail { + panic!("Should not send a packet") + } assert_eq!(result, Ok(())) } @@ -2833,6 +2912,10 @@ mod test { $( recv!($socket, Ok($repr)); )* recv_nothing!($socket) }); + ($socket:ident, time $time:expr, [$( $repr:expr ),*]) => ({ + $( recv!($socket, time $time, Ok($repr)); )* + recv_nothing!($socket, time $time) + }); ($socket:ident, $result:expr) => (recv!($socket, time 0, $result)); ($socket:ident, time $time:expr, $result:expr) => @@ -2948,6 +3031,9 @@ mod test { s.state = State::Closing; s.remote_last_seq = LOCAL_SEQ + 1 + 1; s.remote_seq_no = REMOTE_SEQ + 1 + 1; + s.timer = Timer::Retransmit { + expires_at: Instant::from_millis_const(1000), + }; s } @@ -6443,6 +6529,164 @@ mod test { })); } + #[test] + fn test_data_retransmit_ack_more_than_expected() { + let mut s = socket_established(); + s.remote_mss = 6; + s.send_slice(b"aaaaaabbbbbbcccccc").unwrap(); + + recv!(s, time 0, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"aaaaaa"[..], + ..RECV_TEMPL + })); + recv!(s, time 0, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1 + 6, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"bbbbbb"[..], + ..RECV_TEMPL + })); + recv!(s, time 0, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1 + 12, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"cccccc"[..], + ..RECV_TEMPL + })); + recv_nothing!(s, time 0); + + recv_nothing!(s, time 50); + + // retransmit timer expires, we want to retransmit all 3 packets + // but we only manage to retransmit 2 (due to e.g. lack of device buffer space) + assert!(s.timer.is_retransmit()); + recv!(s, time 1000, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"aaaaaa"[..], + ..RECV_TEMPL + })); + recv!(s, time 1000, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1 + 6, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"bbbbbb"[..], + ..RECV_TEMPL + })); + + // ack first packet. + send!( + s, + time 3000, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1 + 6), + ..SEND_TEMPL + } + ); + + // this should keep retransmit timer on, because there's + // still unacked data. + assert!(s.timer.is_retransmit()); + + // ack all three packets. + // This might confuse the TCP stack because after the retransmit + // it "thinks" the 3rd packet hasn't been transmitted yet, but it is getting acked. + send!( + s, + time 3000, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1 + 18), + ..SEND_TEMPL + } + ); + + // this should exit retransmit mode. + assert!(!s.timer.is_retransmit()); + // and consider all data ACKed. + assert!(s.tx_buffer.is_empty()); + recv_nothing!(s, time 5000); + } + + #[test] + fn test_retransmit_fin() { + let mut s = socket_established(); + s.close(); + recv!(s, time 0, Ok(TcpRepr { + control: TcpControl::Fin, + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + ..RECV_TEMPL + })); + + recv_nothing!(s, time 999); + recv!(s, time 1000, Ok(TcpRepr { + control: TcpControl::Fin, + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + ..RECV_TEMPL + })); + } + + #[test] + fn test_retransmit_fin_wait() { + let mut s = socket_fin_wait_1(); + // we send FIN + recv!( + s, + [TcpRepr { + control: TcpControl::Fin, + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + ..RECV_TEMPL + }] + ); + // remote also sends FIN, does NOT ack ours. + send!( + s, + TcpRepr { + control: TcpControl::Fin, + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + ..SEND_TEMPL + } + ); + // we ack it + recv!( + s, + [TcpRepr { + control: TcpControl::None, + seq_number: LOCAL_SEQ + 2, + ack_number: Some(REMOTE_SEQ + 2), + ..RECV_TEMPL + }] + ); + + // we haven't got an ACK for our FIN, we should retransmit. + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + control: TcpControl::Fin, + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 2), + ..RECV_TEMPL + }] + ); + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + control: TcpControl::Fin, + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 2), + ..RECV_TEMPL + }] + ); + } + // =========================================================================================// // Tests for window management. // =========================================================================================// @@ -6908,6 +7152,312 @@ mod test { assert!(s.window_to_update()); } + // =========================================================================================// + // Tests for zero-window probes. + // =========================================================================================// + + #[test] + fn test_zero_window_probe_enter_on_win_update() { + let mut s = socket_established(); + + assert!(!s.timer.is_zero_window_probe()); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + + assert!(!s.timer.is_zero_window_probe()); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + assert!(s.timer.is_zero_window_probe()); + } + + #[test] + fn test_zero_window_probe_enter_on_send() { + let mut s = socket_established(); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + assert!(!s.timer.is_zero_window_probe()); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + + assert!(s.timer.is_zero_window_probe()); + } + + #[test] + fn test_zero_window_probe_exit() { + let mut s = socket_established(); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + + assert!(!s.timer.is_zero_window_probe()); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + assert!(s.timer.is_zero_window_probe()); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 6, + ..SEND_TEMPL + } + ); + + assert!(!s.timer.is_zero_window_probe()); + } + + #[test] + fn test_zero_window_probe_exit_ack() { + let mut s = socket_established(); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + send!( + s, + time 1010, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 2), + window_len: 6, + ..SEND_TEMPL + } + ); + + recv!( + s, + time 1010, + [TcpRepr { + seq_number: LOCAL_SEQ + 2, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"bcdef1"[..], + ..RECV_TEMPL + }] + ); + } + + #[test] + fn test_zero_window_probe_backoff_nack_reply() { + let mut s = socket_established(); + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + send!( + s, + time 1100, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + send!( + s, + time 3100, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 6999); + recv!( + s, + time 7000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + } + + #[test] + fn test_zero_window_probe_backoff_no_reply() { + let mut s = socket_established(); + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + } + + #[test] + fn test_zero_window_probe_shift() { + let mut s = socket_established(); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + // ack the ZWP byte, but still advertise zero window. + // this should restart the ZWP timer. + send!( + s, + time 3100, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 2), + window_len: 0, + ..SEND_TEMPL + } + ); + + // ZWP should be sent at 3100+1000 = 4100 + recv_nothing!(s, time 4099); + recv!( + s, + time 4100, + [TcpRepr { + seq_number: LOCAL_SEQ + 2, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"b"[..], + ..RECV_TEMPL + }] + ); + } + // =========================================================================================// // Tests for timeouts. // =========================================================================================// diff --git a/tests/netsim.rs b/tests/netsim.rs new file mode 100644 index 000000000..e7e5c47ef --- /dev/null +++ b/tests/netsim.rs @@ -0,0 +1,364 @@ +use std::cell::RefCell; +use std::collections::BinaryHeap; +use std::fmt::Write as _; +use std::io::Write as _; +use std::sync::Mutex; + +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaCha20Rng; +use smoltcp::iface::{Config, Interface, SocketSet}; +use smoltcp::phy::Tracer; +use smoltcp::phy::{self, ChecksumCapabilities, Device, DeviceCapabilities, Medium}; +use smoltcp::socket::tcp; +use smoltcp::time::{Duration, Instant}; +use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr}; + +const MAC_A: HardwareAddress = HardwareAddress::Ethernet(EthernetAddress([2, 0, 0, 0, 0, 1])); +const MAC_B: HardwareAddress = HardwareAddress::Ethernet(EthernetAddress([2, 0, 0, 0, 0, 2])); +const IP_A: IpAddress = IpAddress::v4(10, 0, 0, 1); +const IP_B: IpAddress = IpAddress::v4(10, 0, 0, 2); + +const BYTES: usize = 10 * 1024 * 1024; + +static CLOCK: Mutex<(Instant, char)> = Mutex::new((Instant::ZERO, ' ')); + +#[test] +fn netsim() { + setup_logging(); + + let buffers = [128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768]; + let losses = [0.0, 0.001, 0.01, 0.02, 0.05, 0.10, 0.20, 0.30]; + + let mut s = String::new(); + + write!(&mut s, "buf\\loss").unwrap(); + for loss in losses { + write!(&mut s, "{loss:9.3} ").unwrap(); + } + writeln!(&mut s).unwrap(); + + for buffer in buffers { + write!(&mut s, "{buffer:7}").unwrap(); + for loss in losses { + let r = run_test(TestCase { + rtt: Duration::from_millis(100), + buffer, + loss, + }); + write!(&mut s, " {r:9.2}").unwrap(); + } + writeln!(&mut s).unwrap(); + } + + insta::assert_snapshot!(s); +} + +struct TestCase { + rtt: Duration, + loss: f64, + buffer: usize, +} + +fn run_test(case: TestCase) -> f64 { + let mut time = Instant::ZERO; + + let params = QueueParams { + latency: case.rtt / 2, + loss: case.loss, + }; + let queue_a_to_b = RefCell::new(PacketQueue::new(params.clone(), 0)); + let queue_b_to_a = RefCell::new(PacketQueue::new(params.clone(), 1)); + let device_a = QueueDevice::new(&queue_a_to_b, &queue_b_to_a, Medium::Ethernet); + let device_b = QueueDevice::new(&queue_b_to_a, &queue_a_to_b, Medium::Ethernet); + + let mut device_a = Tracer::new(device_a, |_timestamp, _printer| log::trace!("{}", _printer)); + let mut device_b = Tracer::new(device_b, |_timestamp, _printer| log::trace!("{}", _printer)); + + let mut iface_a = Interface::new(Config::new(MAC_A), &mut device_a, time); + iface_a.update_ip_addrs(|a| a.push(IpCidr::new(IP_A, 8)).unwrap()); + let mut iface_b = Interface::new(Config::new(MAC_B), &mut device_b, time); + iface_b.update_ip_addrs(|a| a.push(IpCidr::new(IP_B, 8)).unwrap()); + + // Create sockets + let socket_a = { + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]); + tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer) + }; + + let socket_b = { + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]); + tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer) + }; + + let mut sockets_a: [_; 2] = Default::default(); + let mut sockets_a = SocketSet::new(&mut sockets_a[..]); + let socket_a_handle = sockets_a.add(socket_a); + + let mut sockets_b: [_; 2] = Default::default(); + let mut sockets_b = SocketSet::new(&mut sockets_b[..]); + let socket_b_handle = sockets_b.add(socket_b); + + let mut did_listen = false; + let mut did_connect = false; + let mut processed = 0; + while processed < BYTES { + *CLOCK.lock().unwrap() = (time, ' '); + log::info!("loop"); + //println!("t = {}", time); + + *CLOCK.lock().unwrap() = (time, 'A'); + + iface_a.poll(time, &mut device_a, &mut sockets_a); + + let socket = sockets_a.get_mut::(socket_a_handle); + if !socket.is_active() && !socket.is_listening() && !did_listen { + //println!("listening"); + socket.listen(1234).unwrap(); + did_listen = true; + } + + while socket.can_recv() { + let received = socket.recv(|buffer| (buffer.len(), buffer.len())).unwrap(); + //println!("got {:?}", received,); + processed += received; + } + + *CLOCK.lock().unwrap() = (time, 'B'); + iface_b.poll(time, &mut device_b, &mut sockets_b); + let socket = sockets_b.get_mut::(socket_b_handle); + let cx = iface_b.context(); + if !socket.is_open() && !did_connect { + //println!("connecting"); + socket.connect(cx, (IP_A, 1234), 65000).unwrap(); + did_connect = true; + } + + while socket.can_send() { + //println!("sending"); + socket.send(|buffer| (buffer.len(), ())).unwrap(); + } + + *CLOCK.lock().unwrap() = (time, ' '); + + let mut next_time = queue_a_to_b.borrow_mut().next_expiration(); + next_time = next_time.min(queue_b_to_a.borrow_mut().next_expiration()); + if let Some(t) = iface_a.poll_at(time, &sockets_a) { + next_time = next_time.min(t); + } + if let Some(t) = iface_b.poll_at(time, &sockets_b) { + next_time = next_time.min(t); + } + assert!(next_time.total_micros() != i64::MAX); + time = time.max(next_time); + } + + let duration = time - Instant::ZERO; + processed as f64 / duration.total_micros() as f64 * 1e6 +} + +struct Packet { + timestamp: Instant, + id: u64, + data: Vec, +} + +impl PartialEq for Packet { + fn eq(&self, other: &Self) -> bool { + (other.timestamp, other.id) == (self.timestamp, self.id) + } +} + +impl Eq for Packet {} + +impl PartialOrd for Packet { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Packet { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (other.timestamp, other.id).cmp(&(self.timestamp, self.id)) + } +} + +#[derive(Clone)] +struct QueueParams { + latency: Duration, + loss: f64, +} + +struct PacketQueue { + queue: BinaryHeap, + next_id: u64, + params: QueueParams, + rng: ChaCha20Rng, +} + +impl PacketQueue { + pub fn new(params: QueueParams, seed: u64) -> Self { + Self { + queue: BinaryHeap::new(), + next_id: 0, + params, + rng: ChaCha20Rng::seed_from_u64(seed), + } + } + + pub fn next_expiration(&self) -> Instant { + self.queue + .peek() + .map(|p| p.timestamp) + .unwrap_or(Instant::from_micros(i64::MAX)) + } + + pub fn push(&mut self, data: Vec, timestamp: Instant) { + if self.rng.gen::() < self.params.loss { + log::info!("PACKET LOST!"); + return; + } + + self.queue.push(Packet { + data, + id: self.next_id, + timestamp: timestamp + self.params.latency, + }); + self.next_id += 1; + } + + pub fn pop(&mut self, timestamp: Instant) -> Option> { + let p = self.queue.peek()?; + if p.timestamp > timestamp { + return None; + } + Some(self.queue.pop().unwrap().data) + } +} + +pub struct QueueDevice<'a> { + tx_queue: &'a RefCell, + rx_queue: &'a RefCell, + medium: Medium, +} + +impl<'a> QueueDevice<'a> { + fn new( + tx_queue: &'a RefCell, + rx_queue: &'a RefCell, + medium: Medium, + ) -> Self { + Self { + tx_queue, + rx_queue, + medium, + } + } +} + +impl Device for QueueDevice<'_> { + type RxToken<'a> + = RxToken + where + Self: 'a; + type TxToken<'a> + = TxToken<'a> + where + Self: 'a; + + fn capabilities(&self) -> DeviceCapabilities { + let mut caps = DeviceCapabilities::default(); + caps.max_transmission_unit = 1514; + caps.medium = self.medium; + caps.checksum = ChecksumCapabilities::ignored(); + caps + } + + fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + self.rx_queue + .borrow_mut() + .pop(timestamp) + .map(move |buffer| { + let rx = RxToken { buffer }; + let tx = TxToken { + queue: self.tx_queue, + timestamp, + }; + (rx, tx) + }) + } + + fn transmit(&mut self, timestamp: Instant) -> Option> { + Some(TxToken { + queue: self.tx_queue, + timestamp, + }) + } +} + +pub struct RxToken { + buffer: Vec, +} + +impl phy::RxToken for RxToken { + fn consume(self, f: F) -> R + where + F: FnOnce(&[u8]) -> R, + { + f(&self.buffer) + } +} + +pub struct TxToken<'a> { + queue: &'a RefCell, + timestamp: Instant, +} + +impl<'a> phy::TxToken for TxToken<'a> { + fn consume(self, len: usize, f: F) -> R + where + F: FnOnce(&mut [u8]) -> R, + { + let mut buffer = vec![0; len]; + let result = f(&mut buffer); + self.queue.borrow_mut().push(buffer, self.timestamp); + result + } +} + +pub fn setup_logging() { + env_logger::Builder::new() + .format(move |buf, record| { + let (elapsed, side) = *CLOCK.lock().unwrap(); + + let timestamp = format!("[{elapsed} {side}]"); + if record.target().starts_with("smoltcp::") { + writeln!( + buf, + "{} ({}): {}", + timestamp, + record.target().replace("smoltcp::", ""), + record.args() + ) + } else if record.level() == log::Level::Trace { + let message = format!("{}", record.args()); + writeln!( + buf, + "{} {}", + timestamp, + message.replace('\n', "\n ") + ) + } else { + writeln!( + buf, + "{} ({}): {}", + timestamp, + record.target(), + record.args() + ) + } + }) + .parse_env("RUST_LOG") + .init(); +} diff --git a/tests/snapshots/netsim__netsim.snap b/tests/snapshots/netsim__netsim.snap new file mode 100644 index 000000000..0622a096d --- /dev/null +++ b/tests/snapshots/netsim__netsim.snap @@ -0,0 +1,15 @@ +--- +source: tests/netsim.rs +expression: s +snapshot_kind: text +--- +buf\loss 0.000 0.001 0.010 0.020 0.050 0.100 0.200 0.300 + 128 1279.98 1255.76 1054.15 886.36 538.66 227.84 33.99 7.18 + 256 2559.91 2507.27 2100.03 1770.30 1070.71 468.24 66.71 14.35 + 512 5119.63 5011.95 4172.36 3531.57 2098.73 942.38 144.73 29.45 + 1024 10238.50 10023.19 8340.90 7084.25 4003.34 1869.94 290.74 60.92 + 2048 17535.11 17171.82 14093.50 12063.90 7205.27 3379.12 824.76 131.54 + 4096 35062.41 33852.31 27011.08 22073.09 13680.70 7631.11 1617.81 302.65 + 8192 77374.28 72409.99 58428.68 48310.75 29123.30 14314.36 2880.39 551.60 + 16384 161842.28 159448.56 141467.31 127073.06 78239.08 38637.20 7565.64 1112.31 + 32768 322944.88 314313.90 266384.37 245985.29 138762.29 83162.99 10739.10 1951.95