From a87cb55046396a5461f3fe3d445989090453da3a Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Tue, 22 Aug 2023 12:03:56 +0200 Subject: [PATCH] WIP --- crates/connector/tests/commands.rs | 10 + crates/connector/tests/network.rs | 108 +++++-- .../src/connection/{ => delivery}/confirms.rs | 0 .../{deliveries.rs => delivery/mod.rs} | 28 +- .../src/connection/{ => delivery}/pending.rs | 3 +- .../src/connection/{ => delivery}/received.rs | 0 crates/net/src/connection/dispatch/mod.rs | 141 +++++++++ crates/net/src/connection/mod.rs | 12 +- crates/net/src/connection/resend.rs | 297 ------------------ crates/net/src/tasks/confirmer.rs | 4 +- crates/net/src/tasks/mod.rs | 6 +- crates/net/src/tasks/resender.rs | 8 +- crates/net/src/tasks/sreceiver.rs | 6 +- crates/net/src/tasks/ureceiver.rs | 4 +- crates/net/src/tasks/usender.rs | 7 +- docs/src/multiplayer/connector/protocol.md | 3 +- 16 files changed, 270 insertions(+), 367 deletions(-) rename crates/net/src/connection/{ => delivery}/confirms.rs (100%) rename crates/net/src/connection/{deliveries.rs => delivery/mod.rs} (92%) rename crates/net/src/connection/{ => delivery}/pending.rs (93%) rename crates/net/src/connection/{ => delivery}/received.rs (100%) create mode 100644 crates/net/src/connection/dispatch/mod.rs delete mode 100644 crates/net/src/connection/resend.rs diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs index c58b7047..d26ce7ce 100644 --- a/crates/connector/tests/commands.rs +++ b/crates/connector/tests/commands.rs @@ -42,6 +42,10 @@ fn test() { comms_a.send(ToServer::OpenGame { max_players: 3 }).await; let mut response = comms_a.recv::().await; + + // TODO + println!("A"); + assert_eq!(response.len(), 1); let response = response.pop().unwrap(); let game_port = match response { @@ -55,10 +59,16 @@ fn test() { comms_d.port = game_port; check_response!(comms_a, FromGame::Joined(1)); + // TODO + println!("b"); comms_b.send(ToGame::Join).await; check_response!(comms_b, FromGame::Joined(2)); + // TODO + println!("c"); check_response!(comms_a, FromGame::PeerJoined(2)); + // TODO + println!("d"); comms_a.send(ToGame::Readiness(Readiness::Ready)).await; // The other player is not yet ready -> no message should be received. diff --git a/crates/connector/tests/network.rs b/crates/connector/tests/network.rs index 5c32aa98..c0b40a61 100644 --- a/crates/connector/tests/network.rs +++ b/crates/connector/tests/network.rs @@ -4,7 +4,7 @@ use std::{ }; use async_std::{prelude::FutureExt, task}; -use de_net::Socket; +use de_net::{Reliability, Socket}; use futures::join; use ntest::timeout; @@ -14,8 +14,6 @@ mod common; const SERVER_ADDR: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8082)); -// TODO fix headers - #[derive(Debug)] struct ReceivedBuffer(Vec); @@ -36,10 +34,14 @@ impl ReceivedBuffer { ); } - fn find_id(&self, filter_reliable: bool, filter_data: &[u8]) -> Option { + fn find_id(&self, filter_reliability: Reliability, filter_data: &[u8]) -> Option { self.0.iter().find_map(|incomming| match incomming { - Incomming::Data { reliable, id, data } => { - if *reliable == filter_reliable && data == filter_data { + Incomming::Data { + reliability, + id, + data, + } => { + if *reliability == filter_reliability && data == filter_data { Some(*id) } else { None @@ -70,7 +72,16 @@ impl ReceivedBuffer { self.0.push(Incomming::Confirm(id)); } } else { - let reliable = buf[0] & 64 > 0; + let reliability = (buf[0] & 96) >> 5; + let reliability = if reliability == 0 { + Reliability::Unreliable + } else if reliability == 1 { + Reliability::Unordered + } else if reliability == 2 { + Reliability::SemiOrdered + } else { + panic!("Invalid reliability bits"); + }; id_bytes[0] = 0; id_bytes[1] = buf[1]; @@ -79,7 +90,7 @@ impl ReceivedBuffer { let id = u32::from_be_bytes(id_bytes); self.0.push(Incomming::Data { - reliable, + reliability, id, data: buf[4..n].to_vec(), }); @@ -91,7 +102,7 @@ impl ReceivedBuffer { enum Incomming { Confirm(u32), Data { - reliable: bool, + reliability: Reliability, id: u32, data: Vec, }, @@ -112,17 +123,22 @@ fn test() { received.load(&mut client, &mut buffer).await; // [5, 2] -> FromGame::PeerJoined(1) - let id = received.find_id(true, &[5, 2]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::SemiOrdered, &[5, 2]) + .unwrap() + .to_be_bytes(); // And send a confirmation client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await .unwrap(); - let first_id = received.find_id(true, &[5, 6, 7, 8]).unwrap(); + let first_id = received + .find_id(Reliability::Unordered, &[5, 6, 7, 8]) + .unwrap(); let mut data = [22; 412]; - data[0] = 64; // Reliable + data[0] = 32; // Unordered data[1] = 0; data[2] = 0; data[3] = 22; @@ -132,7 +148,9 @@ fn test() { received.load(&mut client, &mut buffer).await; received.load(&mut client, &mut buffer).await; received.assert_confirmed(22); - received.find_id(false, &[82, 83, 84]).unwrap(); + received + .find_id(Reliability::Unreliable, &[82, 83, 84]) + .unwrap(); // Try to send invalid data -- wrong header client @@ -148,10 +166,20 @@ fn test() { // Two retries before we confirm. let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - assert_eq!(received.find_id(true, &[5, 6, 7, 8]).unwrap(), first_id); + assert_eq!( + received + .find_id(Reliability::Unordered, &[5, 6, 7, 8]) + .unwrap(), + first_id + ); let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - assert_eq!(received.find_id(true, &[5, 6, 7, 8]).unwrap(), first_id); + assert_eq!( + received + .find_id(Reliability::Unordered, &[5, 6, 7, 8]) + .unwrap(), + first_id + ); let id = first_id.to_be_bytes(); // And send a confirmation @@ -160,8 +188,8 @@ fn test() { .await .unwrap(); - client.send(server, &[64, 0, 0, 92, 16]).await.unwrap(); - client.send(server, &[64, 0, 0, 86, 23]).await.unwrap(); + client.send(server, &[32, 0, 0, 92, 16]).await.unwrap(); + client.send(server, &[32, 0, 0, 86, 23]).await.unwrap(); let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; received.assert_confirmed(92); @@ -181,8 +209,8 @@ fn test() { let mut buffer = [0u8; 1024]; client - // Reliable - .send(server, &[64, 0, 0, 14, 5, 6, 7, 8]) + // unordered + .send(server, &[32, 0, 0, 14, 5, 6, 7, 8]) .await .unwrap(); @@ -190,7 +218,10 @@ fn test() { received.load(&mut client, &mut buffer).await; received.load(&mut client, &mut buffer).await; received.assert_confirmed(14); - let id = received.find_id(true, &[22; 408]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::Unordered, &[22; 408]) + .unwrap() + .to_be_bytes(); // Sending confirmation client @@ -209,7 +240,10 @@ fn test() { let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - let id = received.find_id(true, &[16]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::Unordered, &[16]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await @@ -217,7 +251,10 @@ fn test() { let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - let id = received.find_id(true, &[23]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::Unordered, &[23]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await @@ -247,11 +284,11 @@ async fn create_game() -> (Socket, u16) { let mut client = Socket::bind(None).await.unwrap(); - // [64 + 32] -> reliable + Peers::Server + // [64 + 16] -> semi-ordered + Peers::Server // [0, 0, 7] -> datagram ID = 7 // [1 3] -> ToGame::OpenGame { max_players: 3 } client - .send(SERVER_ADDR, &[64 + 32, 0, 0, 7, 1, 3]) + .send(SERVER_ADDR, &[64 + 16, 0, 0, 7, 1, 3]) .await .unwrap(); @@ -261,11 +298,16 @@ async fn create_game() -> (Socket, u16) { assert_eq!(received.0.len(), 1); let port = { - let Incomming::Data { reliable, id, data } = &(received.0)[0] else { + let Incomming::Data { + reliability, + id, + data, + } = &(received.0)[0] + else { panic!("Unexpected data received: {:?}", received); }; - assert!(reliable); + assert!(reliability.is_reliable()); // Confirm let id = id.to_be_bytes(); @@ -297,7 +339,10 @@ async fn create_game() -> (Socket, u16) { received.load(&mut client, &mut buffer).await; // [2, 1] -> FromGame::Joined(1) - let id = received.find_id(true, &[2, 1]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::SemiOrdered, &[2, 1]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await @@ -312,10 +357,10 @@ async fn join_game(game_port: u16) -> Socket { let server = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, game_port)); let mut client = Socket::bind(None).await.unwrap(); - // [64 + 32] -> reliable + Peers::Server + // [64 + 16] -> semi-ordered + Peers::Server // [0, 0, 3] -> datagram ID = 3 // [1] -> ToGame::Join - client.send(server, &[64 + 32, 0, 0, 3, 1]).await.unwrap(); + client.send(server, &[64 + 16, 0, 0, 3, 1]).await.unwrap(); let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; @@ -323,7 +368,10 @@ async fn join_game(game_port: u16) -> Socket { received.assert_confirmed(3); // [2, 2] -> FromGame::Joined(2) - let id = received.find_id(true, &[2, 2]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::SemiOrdered, &[2, 2]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await diff --git a/crates/net/src/connection/confirms.rs b/crates/net/src/connection/delivery/confirms.rs similarity index 100% rename from crates/net/src/connection/confirms.rs rename to crates/net/src/connection/delivery/confirms.rs diff --git a/crates/net/src/connection/deliveries.rs b/crates/net/src/connection/delivery/mod.rs similarity index 92% rename from crates/net/src/connection/deliveries.rs rename to crates/net/src/connection/delivery/mod.rs index 7422cc58..7c507f54 100644 --- a/crates/net/src/connection/deliveries.rs +++ b/crates/net/src/connection/delivery/mod.rs @@ -4,22 +4,26 @@ use async_std::{ channel::{SendError, Sender}, sync::{Arc, Mutex, MutexGuard}, }; +pub(crate) use received::ReceivedIdError; -use super::{ - book::{Connection, ConnectionBook}, +use self::{ confirms::{ConfirmsBuffer, MAX_BUFF_AGE}, pending::Pending, - received::{IdContinuity, Received, ReceivedIdError}, + received::{IdContinuity, Received}, }; +use super::book::{Connection, ConnectionBook}; use crate::{header::PackageId, record::DeliveryRecord, tasks::OutDatagram, Reliability}; -// TODO rename +mod confirms; +mod pending; +mod received; + #[derive(Clone)] -pub(crate) struct Confirmations { - book: Arc>>, +pub(crate) struct DeliveryHandler { + book: Arc>>, } -impl Confirmations { +impl DeliveryHandler { pub(crate) fn new() -> Self { Self { book: Arc::new(Mutex::new(ConnectionBook::new())), @@ -76,7 +80,7 @@ impl Confirmations { } pub(crate) struct ReceiveHandlerGuard<'a> { - guard: MutexGuard<'a, ConnectionBook>, + guard: MutexGuard<'a, ConnectionBook>, } impl<'a> ReceiveHandlerGuard<'a> { @@ -94,18 +98,18 @@ impl<'a> ReceiveHandlerGuard<'a> { buf: &'buf mut [u8], ) -> Result, ReceivedIdError> { self.guard - .update(record.time(), addr, ReceiveHandler::new) + .update(record.time(), addr, ConnDeliveryHandler::new) .push(record, data, buf) } } -struct ReceiveHandler { +struct ConnDeliveryHandler { received: Received, pending: Pending, confirms: ConfirmsBuffer, } -impl ReceiveHandler { +impl ConnDeliveryHandler { fn new() -> Self { Self { received: Received::new(), @@ -165,7 +169,7 @@ impl ReceiveHandler { } } -impl Connection for ReceiveHandler { +impl Connection for ConnDeliveryHandler { fn pending(&self) -> bool { !self.confirms.is_empty() } diff --git a/crates/net/src/connection/pending.rs b/crates/net/src/connection/delivery/pending.rs similarity index 93% rename from crates/net/src/connection/pending.rs rename to crates/net/src/connection/delivery/pending.rs index 406078a2..57f320a7 100644 --- a/crates/net/src/connection/pending.rs +++ b/crates/net/src/connection/delivery/pending.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; -use super::databuf::DataBuf; -use crate::{header::PackageId, record::DeliveryRecord}; +use crate::{connection::databuf::DataBuf, header::PackageId, record::DeliveryRecord}; /// Buffer for packages delivered out-of-order and thus awaiting the right /// moment to be delivered. diff --git a/crates/net/src/connection/received.rs b/crates/net/src/connection/delivery/received.rs similarity index 100% rename from crates/net/src/connection/received.rs rename to crates/net/src/connection/delivery/received.rs diff --git a/crates/net/src/connection/dispatch/mod.rs b/crates/net/src/connection/dispatch/mod.rs new file mode 100644 index 00000000..f40c80a6 --- /dev/null +++ b/crates/net/src/connection/dispatch/mod.rs @@ -0,0 +1,141 @@ +use std::{ + net::SocketAddr, + time::{Duration, Instant}, +}; + +use async_std::{ + channel::{SendError, Sender}, + sync::{Arc, Mutex}, +}; + +use self::resends::{RescheduleResult, Resends, START_BACKOFF_MS}; +use super::book::{Connection, ConnectionBook}; +use crate::{ + header::{DatagramHeader, PackageHeader, PackageId}, + tasks::OutDatagram, +}; + +mod resends; + +#[derive(Clone)] +pub(crate) struct DispatchHandler { + book: Arc>>, +} + +impl DispatchHandler { + pub(crate) fn new() -> Self { + Self { + book: Arc::new(Mutex::new(ConnectionBook::new())), + } + } + + pub(crate) async fn sent( + &mut self, + time: Instant, + addr: SocketAddr, + header: PackageHeader, + data: &[u8], + ) { + let mut book = self.book.lock().await; + let handler = book.update(time, addr, ConnDispatchHandler::new); + handler.resends.push(header, data, time); + } + + /// Processes data with package confirmations. + /// + /// The data encode IDs of delivered (and confirmed) packages so that they + /// can be forgotten. + pub(crate) async fn confirmed(&mut self, time: Instant, addr: SocketAddr, data: &[u8]) { + let mut book = self.book.lock().await; + let handler = book.update(time, addr, ConnDispatchHandler::new); + + for i in 0..data.len() / 3 { + let offset = i * 3; + let id = PackageId::from_bytes(&data[offset..offset + 3]); + handler.resends.resolve(id); + } + } + + /// Re-send all packages already due for re-sending. + pub(crate) async fn resend( + &mut self, + time: Instant, + buf: &mut [u8], + datagrams: &mut Sender, + ) -> Result> { + let mut result = ResendResult { + failures: Vec::new(), + pending: 0, + next: time + Duration::from_millis(START_BACKOFF_MS), + }; + + let mut book = self.book.lock().await; + + while let Some((addr, handler)) = book.next() { + let failure = loop { + match handler.resends.reschedule(buf, time) { + RescheduleResult::Resend { len, header } => { + datagrams + .send(OutDatagram::new( + DatagramHeader::Package(header), + buf[..len].to_vec(), + addr, + )) + .await?; + } + RescheduleResult::Waiting(until) => { + result.next = result.next.min(until); + break false; + } + RescheduleResult::Empty => { + break false; + } + RescheduleResult::Failed => { + result.failures.push(addr); + break true; + } + } + }; + + if failure { + book.remove_current(); + result.failures.push(addr); + } else { + result.pending += handler.resends.len(); + } + } + + Ok(result) + } + + pub(crate) async fn clean(&mut self, time: Instant) { + self.book.lock().await.clean(time); + } +} + +pub(crate) struct ResendResult { + /// Vec of failed connections. + pub(crate) failures: Vec, + /// Number of pending (not yet confirmed) datagrams. + pub(crate) pending: usize, + /// Soonest possible time of the next datagram resend. + pub(crate) next: Instant, +} + +struct ConnDispatchHandler { + resends: Resends, +} + +impl ConnDispatchHandler { + fn new() -> Self { + Self { + resends: Resends::new(), + } + } +} + +impl Connection for ConnDispatchHandler { + fn pending(&self) -> bool { + !self.resends.is_empty() + } +} diff --git a/crates/net/src/connection/mod.rs b/crates/net/src/connection/mod.rs index 3d0bb60d..78f20108 100644 --- a/crates/net/src/connection/mod.rs +++ b/crates/net/src/connection/mod.rs @@ -1,11 +1,7 @@ -pub(crate) use deliveries::Confirmations; -pub(crate) use received::ReceivedIdError; -pub(crate) use resend::Resends; +pub(crate) use delivery::{DeliveryHandler, ReceivedIdError}; +pub(crate) use dispatch::DispatchHandler; mod book; -mod confirms; mod databuf; -mod deliveries; -mod pending; -mod received; -mod resend; +mod delivery; +mod dispatch; diff --git a/crates/net/src/connection/resend.rs b/crates/net/src/connection/resend.rs deleted file mode 100644 index 9bc90228..00000000 --- a/crates/net/src/connection/resend.rs +++ /dev/null @@ -1,297 +0,0 @@ -use std::{ - cmp::Ordering, - net::SocketAddr, - time::{Duration, Instant}, -}; - -use ahash::AHashMap; -use async_std::{ - channel::{SendError, Sender}, - sync::{Arc, Mutex}, -}; -use priority_queue::PriorityQueue; - -use super::{ - book::{Connection, ConnectionBook, MAX_CONN_AGE}, - databuf::DataBuf, -}; -use crate::{ - header::{DatagramHeader, PackageHeader, PackageId}, - tasks::OutDatagram, -}; - -const START_BACKOFF_MS: u64 = 220; -const MAX_TRIES: u8 = 6; -const MAX_BASE_RESEND_INTERVAL_MS: u64 = (MAX_CONN_AGE.as_millis() / 2) as u64; - -#[derive(Clone)] -pub(crate) struct Resends { - book: Arc>>, -} - -impl Resends { - pub(crate) fn new() -> Self { - Self { - book: Arc::new(Mutex::new(ConnectionBook::new())), - } - } - - pub(crate) async fn sent( - &mut self, - time: Instant, - addr: SocketAddr, - header: PackageHeader, - data: &[u8], - ) { - let mut book = self.book.lock().await; - let queue = book.update(time, addr, Queue::new); - queue.push(header, data, time); - } - - /// Processes data with package confirmations. - /// - /// The data encode IDs of delivered (and confirmed) packages so that they - /// can be forgotten. - pub(crate) async fn confirmed(&mut self, time: Instant, addr: SocketAddr, data: &[u8]) { - let mut book = self.book.lock().await; - let queue = book.update(time, addr, Queue::new); - - for i in 0..data.len() / 3 { - let offset = i * 3; - let id = PackageId::from_bytes(&data[offset..offset + 3]); - queue.resolve(id); - } - } - - /// Re-send all packages already due for re-sending. - pub(crate) async fn resend( - &mut self, - time: Instant, - buf: &mut [u8], - datagrams: &mut Sender, - ) -> Result> { - let mut result = ResendResult { - failures: Vec::new(), - pending: 0, - next: time + Duration::from_millis(START_BACKOFF_MS), - }; - - let mut book = self.book.lock().await; - - while let Some((addr, queue)) = book.next() { - let failure = loop { - match queue.reschedule(buf, time) { - RescheduleResult::Resend { len, header } => { - datagrams - .send(OutDatagram::new( - DatagramHeader::Package(header), - buf[..len].to_vec(), - addr, - )) - .await?; - } - RescheduleResult::Waiting(until) => { - result.next = result.next.min(until); - break false; - } - RescheduleResult::Empty => { - break false; - } - RescheduleResult::Failed => { - result.failures.push(addr); - break true; - } - } - }; - - if failure { - book.remove_current(); - result.failures.push(addr); - } else { - result.pending += queue.len(); - } - } - - Ok(result) - } - - pub(crate) async fn clean(&mut self, time: Instant) { - self.book.lock().await.clean(time); - } -} - -pub(crate) struct ResendResult { - /// Vec of failed connections. - pub(crate) failures: Vec, - /// Number of pending (not yet confirmed) datagrams. - pub(crate) pending: usize, - /// Soonest possible time of the next datagram resend. - pub(crate) next: Instant, -} - -/// This struct governs reliable package re-sending (until each package is -/// confirmed). -struct Queue { - queue: PriorityQueue, - headers: AHashMap, - data: DataBuf, -} - -impl Queue { - fn new() -> Self { - Self { - queue: PriorityQueue::new(), - headers: AHashMap::new(), - data: DataBuf::new(), - } - } - - /// Return the number of pending actions. - fn len(&self) -> usize { - self.queue.len() - } - - /// Registers new package for re-sending until it is resolved. - fn push(&mut self, header: PackageHeader, data: &[u8], now: Instant) { - self.queue.push(header.id(), Timing::new(now)); - self.headers.insert(header.id(), header); - self.data.push(header.id(), data); - } - - /// Marks a package as delivered. No more re-sends will be scheduled and - /// package data will be dropped. - fn resolve(&mut self, id: PackageId) { - let result = self.queue.remove(&id); - if result.is_some() { - self.headers.remove(&id); - self.data.remove(id); - } - } - - /// Retrieves next package to be resend or None if there is not (yet) such - /// a package. - /// - /// Each package is resent multiple times with randomized exponential - /// backoff. - /// - /// # Arguments - /// - /// * `buf` - the package payload is written to this buffer. The buffer - /// length must be greater or equal to the length of the payload. - /// - /// * `now` - current time, used for the retry scheduling. - /// - /// # Panics - /// - /// Panics if `buf` is smaller than the retrieved package payload. - fn reschedule(&mut self, buf: &mut [u8], now: Instant) -> RescheduleResult { - match self.queue.peek() { - Some((&id, timing)) => { - let until = timing.expiration(); - if until <= now { - match timing.another(now) { - Some(backoff) => { - self.queue.change_priority(&id, backoff); - let len = self.data.get(id, buf).unwrap(); - let header = *self.headers.get(&id).unwrap(); - RescheduleResult::Resend { len, header } - } - None => RescheduleResult::Failed, - } - } else { - RescheduleResult::Waiting(until) - } - } - None => RescheduleResult::Empty, - } - } -} - -impl Connection for Queue { - fn pending(&self) -> bool { - !self.queue.is_empty() - } -} - -/// Rescheduling result. -pub(crate) enum RescheduleResult { - /// A datagram is scheduled for an immediate resend. - Resend { - /// Length of the datagram data (written to a buffer) in bytes. - len: usize, - header: PackageHeader, - }, - /// No datagram is currently scheduled for an immediate resent. This - /// variant holds soonest possible time of a next resend. - Waiting(Instant), - /// There is currently no datagram scheduled for resending (immediate or - /// future). - Empty, - /// A datagram expired. Id est the maximum number of resends has been - /// reached. - Failed, -} - -#[derive(Eq)] -struct Timing { - attempt: u8, - expiration: Instant, -} - -impl Timing { - fn new(now: Instant) -> Self { - Self { - attempt: 0, - expiration: Self::schedule(0, now), - } - } - - fn expiration(&self) -> Instant { - self.expiration - } - - fn another(&self, now: Instant) -> Option { - if self.attempt == MAX_TRIES { - None - } else { - let attempt = self.attempt + 1; - Some(Self { - attempt, - expiration: Self::schedule(attempt, now), - }) - } - } - - fn schedule(attempt: u8, now: Instant) -> Instant { - let millis = Self::jitter(Self::backoff(attempt)); - now + Duration::from_millis(millis) - } - - fn backoff(attempt: u8) -> u64 { - MAX_BASE_RESEND_INTERVAL_MS.min(START_BACKOFF_MS * 2u64.pow(attempt as u32)) - } - - fn jitter(millis: u64) -> u64 { - millis + fastrand::u64(0..millis / 2) - } -} - -impl Ord for Timing { - fn cmp(&self, other: &Self) -> Ordering { - self.expiration - .cmp(&other.expiration) - .then_with(|| self.attempt.cmp(&other.attempt)) - } -} - -impl PartialOrd for Timing { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for Timing { - fn eq(&self, other: &Self) -> bool { - self.expiration == other.expiration && self.attempt == other.attempt - } -} diff --git a/crates/net/src/tasks/confirmer.rs b/crates/net/src/tasks/confirmer.rs index 2566b0c5..7b9cce5e 100644 --- a/crates/net/src/tasks/confirmer.rs +++ b/crates/net/src/tasks/confirmer.rs @@ -4,14 +4,14 @@ use async_std::{channel::Sender, task}; use tracing::{error, info}; use super::{cancellation::CancellationRecv, dsender::OutDatagram}; -use crate::connection::Confirmations; +use crate::connection::DeliveryHandler; /// Scheduler of datagram confirmations. pub(super) async fn run( port: u16, cancellation: CancellationRecv, mut datagrams: Sender, - mut confirms: Confirmations, + mut confirms: DeliveryHandler, ) { info!("Starting confirmer on port {port}..."); diff --git a/crates/net/src/tasks/mod.rs b/crates/net/src/tasks/mod.rs index 9f03cef1..033d0950 100644 --- a/crates/net/src/tasks/mod.rs +++ b/crates/net/src/tasks/mod.rs @@ -65,7 +65,7 @@ use futures::future::BoxFuture; use tracing::info; use crate::{ - connection::{Confirmations, Resends}, + connection::{DeliveryHandler, DispatchHandler}, protocol::ProtocolSocket, tasks::cancellation::cancellation, Socket, @@ -121,7 +121,7 @@ where protocol_socket, ))); - let resends = Resends::new(); + let resends = DispatchHandler::new(); let (sreceiver_cancellation_sender, sreceiver_cancellation_receiver) = cancellation(); spawn(Box::pin(sreceiver::run( port, @@ -132,7 +132,7 @@ where let (inputs_sender, inputs_receiver) = bounded(CHANNEL_CAPACITY); let (confirmer_cancellation_sender, confirmer_cancellation_receiver) = cancellation(); - let confirms = Confirmations::new(); + let confirms = DeliveryHandler::new(); spawn(Box::pin(ureceiver::run( port, confirmer_cancellation_sender, diff --git a/crates/net/src/tasks/resender.rs b/crates/net/src/tasks/resender.rs index 7f1866c6..4e0a0eb3 100644 --- a/crates/net/src/tasks/resender.rs +++ b/crates/net/src/tasks/resender.rs @@ -8,7 +8,7 @@ use super::{ communicator::ConnectionError, dsender::OutDatagram, }; -use crate::{connection::Resends, MAX_DATAGRAM_SIZE}; +use crate::{connection::DispatchHandler, MAX_DATAGRAM_SIZE}; const CANCELLATION_DEADLINE: Duration = Duration::from_secs(5); @@ -19,7 +19,7 @@ pub(super) async fn run( _cancellation_send: CancellationSender, mut datagrams: Sender, errors: Sender, - mut resends: Resends, + mut dispatch_handler: DispatchHandler, ) { info!("Starting resender on port {port}..."); @@ -31,9 +31,9 @@ pub(super) async fn run( deadline = Some(Instant::now() + CANCELLATION_DEADLINE); } - resends.clean(Instant::now()).await; + dispatch_handler.clean(Instant::now()).await; - let Ok(resend_result) = resends + let Ok(resend_result) = dispatch_handler .resend(Instant::now(), &mut buf, &mut datagrams) .await else { diff --git a/crates/net/src/tasks/sreceiver.rs b/crates/net/src/tasks/sreceiver.rs index 81b75101..7797a8bb 100644 --- a/crates/net/src/tasks/sreceiver.rs +++ b/crates/net/src/tasks/sreceiver.rs @@ -4,7 +4,7 @@ use async_std::{channel::Receiver, future::timeout}; use tracing::{error, info}; use super::{cancellation::CancellationRecv, dreceiver::InSystemDatagram}; -use crate::connection::Resends; +use crate::connection::DispatchHandler; /// Handler of protocol control datagrams. /// @@ -13,7 +13,7 @@ pub(super) async fn run( port: u16, cancellation: CancellationRecv, datagrams: Receiver, - mut resends: Resends, + mut dispatch_handler: DispatchHandler, ) { info!("Starting protocol control datagram receiver on port {port}..."); @@ -31,7 +31,7 @@ pub(super) async fn run( break; }; - resends + dispatch_handler .confirmed(Instant::now(), datagram.source, &datagram.data) .await; } diff --git a/crates/net/src/tasks/ureceiver.rs b/crates/net/src/tasks/ureceiver.rs index 5c8f4ea4..725a7902 100644 --- a/crates/net/src/tasks/ureceiver.rs +++ b/crates/net/src/tasks/ureceiver.rs @@ -8,7 +8,7 @@ use tracing::{error, info, trace, warn}; use super::{cancellation::CancellationSender, dreceiver::InPackageDatagram}; use crate::{ - connection::{Confirmations, ReceivedIdError}, + connection::{DeliveryHandler, ReceivedIdError}, record::DeliveryRecord, InPackage, MAX_PACKAGE_SIZE, }; @@ -23,7 +23,7 @@ pub(super) async fn run( _cancellation: CancellationSender, datagrams: Receiver, packages: Sender, - mut confirms: Confirmations, + mut confirms: DeliveryHandler, ) { info!("Starting package receiver on port {port}..."); diff --git a/crates/net/src/tasks/usender.rs b/crates/net/src/tasks/usender.rs index 4afa931e..08915ec5 100644 --- a/crates/net/src/tasks/usender.rs +++ b/crates/net/src/tasks/usender.rs @@ -5,7 +5,7 @@ use tracing::{error, info}; use super::{cancellation::CancellationSender, dsender::OutDatagram}; use crate::{ - connection::Resends, + connection::DispatchHandler, header::{DatagramHeader, PackageHeader, PackageIdRange}, OutPackage, }; @@ -16,10 +16,11 @@ pub(super) async fn run( _cancellation: CancellationSender, datagrams: Sender, packages: Receiver, - mut resends: Resends, + mut dispatch_handler: DispatchHandler, ) { info!("Starting package sender on port {port}..."); + // TODO count each user independently let mut counter_reliable = PackageIdRange::counter(); let mut counter_unreliable = PackageIdRange::counter(); @@ -40,7 +41,7 @@ pub(super) async fn run( if package_header.reliability().is_reliable() { let time = Instant::now(); for target in &package.targets { - resends + dispatch_handler .sent(time, target, package_header, &package.data) .await; } diff --git a/docs/src/multiplayer/connector/protocol.md b/docs/src/multiplayer/connector/protocol.md index 8bd8b974..d50dba32 100644 --- a/docs/src/multiplayer/connector/protocol.md +++ b/docs/src/multiplayer/connector/protocol.md @@ -24,7 +24,8 @@ by the mask `0b1000_0000`). Each user package has an ID, encoded within the last three bytes of the datagram header. These IDs increment until they reach the maximum value that can be encoded within three bytes, after which the counter resets to 0. The ID -sequence for reliable and unreliable packages are independent. +sequence for reliable and unreliable packages are independent. Each connection +has it own sequence. Packages can be transmitted in either reliable or non-reliable mode. Reliability is signaled by the second highest bit of the flags byte