From 58e33b77977ca880f41675ff36b00aaca1a27fd2 Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Fri, 18 Aug 2023 22:05:13 +0200 Subject: [PATCH] de_net: Add message ordering --- crates/connector/src/game/ereceiver.rs | 8 +- crates/connector/src/game/greceiver.rs | 82 ++++++++++---- crates/connector/src/game/mreceiver.rs | 4 +- crates/connector/src/game/preceiver.rs | 12 +- crates/connector/src/server.rs | 9 +- crates/connector/tests/commands.rs | 8 +- crates/multiplayer/src/game.rs | 29 +++-- crates/multiplayer/src/messages.rs | 60 ++++++---- crates/multiplayer/src/stats.rs | 12 +- crates/net/src/connection/confirms.rs | 65 +++++++---- crates/net/src/connection/resend.rs | 32 +++--- crates/net/src/header.rs | 145 +++++++++++++++++++------ crates/net/src/lib.rs | 2 +- crates/net/src/tasks/communicator.rs | 49 +++++---- crates/net/src/tasks/ureceiver.rs | 7 +- crates/net/src/tasks/usender.rs | 27 ++--- 16 files changed, 364 insertions(+), 187 deletions(-) diff --git a/crates/connector/src/game/ereceiver.rs b/crates/connector/src/game/ereceiver.rs index 5a608e03..207b56d2 100644 --- a/crates/connector/src/game/ereceiver.rs +++ b/crates/connector/src/game/ereceiver.rs @@ -2,7 +2,7 @@ use std::time::Duration; use async_std::{channel::Sender, future::timeout}; use de_messages::ToGame; -use de_net::ConnErrorReceiver; +use de_net::{ConnErrorReceiver, Reliability}; use tracing::{error, info, warn}; use super::greceiver::ToGameMessage; @@ -26,7 +26,11 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender Self { + pub(super) fn new(source: SocketAddr, reliability: Reliability, message: ToGame) -> Self { Self { - meta: MessageMeta { source, reliable }, + meta: MessageMeta { + source, + reliability, + }, message, } } @@ -27,7 +30,7 @@ impl ToGameMessage { struct MessageMeta { source: SocketAddr, - reliable: bool, + reliability: Reliability, } pub(super) struct GameProcessor { @@ -142,7 +145,7 @@ impl GameProcessor { .send( OutPackage::encode_single( &FromGame::NotJoined, - message.meta.reliable, + message.meta.reliability, Peers::Server, message.meta.source, ) @@ -159,7 +162,7 @@ impl GameProcessor { .send( OutPackage::encode_single( &FromGame::Pong(id), - meta.reliable, + meta.reliability, Peers::Server, meta.source, ) @@ -172,8 +175,12 @@ impl GameProcessor { async fn process_join(&mut self, meta: MessageMeta) { if let Err(err) = self.clients.reserve(meta.source).await { warn!("Join request error: {err}"); - self.send(&FromGame::JoinError(JoinError::DifferentGame), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::DifferentGame), + Reliability::Unordered, + meta.source, + ) + .await; return; } @@ -191,8 +198,12 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::JoinError(JoinError::AlreadyJoined), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::AlreadyJoined), + Reliability::Unordered, + meta.source, + ) + .await; } JoinErrorInner::GameFull => { warn!( @@ -200,8 +211,12 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::JoinError(JoinError::GameFull), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::GameFull), + Reliability::Unordered, + meta.source, + ) + .await; } JoinErrorInner::GameNotOpened => { warn!( @@ -210,8 +225,12 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::GameNotOpened), + Reliability::Unordered, + meta.source, + ) + .await; } } } @@ -224,8 +243,14 @@ impl GameProcessor { "Player {id} on {addr:?} just joined game on port {}.", self.port ); - self.send(&FromGame::Joined(id), addr).await; - self.send_all(&FromGame::PeerJoined(id), Some(addr)).await; + self.send(&FromGame::Joined(id), Reliability::SemiOrdered, addr) + .await; + self.send_all( + &FromGame::PeerJoined(id), + Reliability::SemiOrdered, + Some(addr), + ) + .await; Ok(()) } @@ -243,16 +268,22 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::Left, meta.source).await; - self.send_all(&FromGame::PeerLeft(id), None).await; + self.send(&FromGame::Left, Reliability::SemiOrdered, meta.source) + .await; + self.send_all(&FromGame::PeerLeft(id), Reliability::SemiOrdered, None) + .await; } async fn process_readiness(&mut self, meta: MessageMeta, readiness: Readiness) { match self.state.update_readiness(meta.source, readiness).await { Ok(progressed) => { if progressed { - self.send_all(&FromGame::GameReadiness(readiness), None) - .await; + self.send_all( + &FromGame::GameReadiness(readiness), + Reliability::SemiOrdered, + None, + ) + .await; } } Err(err) => warn!( @@ -268,24 +299,27 @@ impl GameProcessor { /// /// * `message` - message to be sent. /// + /// * `reliability` - reliability mode for the message. + /// /// * `exclude` - if not None, the message will be delivered to all but /// this player. - async fn send_all(&self, message: &E, exclude: Option) + async fn send_all(&self, message: &E, reliability: Reliability, exclude: Option) where E: bincode::Encode, { if let Some(targets) = self.state.targets(exclude).await { - self.send(message, targets).await; + self.send(message, reliability, targets).await; } } /// Send message to some targets. - async fn send(&self, message: &E, targets: T) + async fn send(&self, message: &E, reliability: Reliability, targets: T) where E: bincode::Encode, T: Into>, { - let message = OutPackage::encode_single(message, true, Peers::Server, targets).unwrap(); + let message = + OutPackage::encode_single(message, reliability, Peers::Server, targets).unwrap(); let _ = self.outputs.send(message).await; } } diff --git a/crates/connector/src/game/mreceiver.rs b/crates/connector/src/game/mreceiver.rs index 6897c521..df65afc6 100644 --- a/crates/connector/src/game/mreceiver.rs +++ b/crates/connector/src/game/mreceiver.rs @@ -42,7 +42,7 @@ pub(super) async fn run( let result = server .send(ToGameMessage::new( package.source(), - package.reliable(), + package.reliability(), message, )) .await; @@ -60,7 +60,7 @@ pub(super) async fn run( Peers::Players => { let _ = players .send(PlayersPackage::new( - package.reliable(), + package.reliability(), package.source(), package.data(), )) diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index 62adf99f..1ff62b28 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -2,22 +2,22 @@ use std::net::SocketAddr; use async_std::channel::Receiver; use de_messages::FromGame; -use de_net::{OutPackage, PackageSender, Peers}; +use de_net::{OutPackage, PackageSender, Peers, Reliability}; use tracing::{error, info, warn}; use super::state::GameState; /// A package destined to other players in the game. pub(super) struct PlayersPackage { - reliable: bool, + reliability: Reliability, source: SocketAddr, data: Vec, } impl PlayersPackage { - pub(super) fn new(reliable: bool, source: SocketAddr, data: Vec) -> Self { + pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec) -> Self { Self { - reliable, + reliability, source, data, } @@ -56,7 +56,7 @@ pub(super) async fn run( .send( OutPackage::encode_single( &FromGame::NotJoined, - package.reliable, + package.reliability, Peers::Server, package.source, ) @@ -73,7 +73,7 @@ pub(super) async fn run( let result = outputs .send(OutPackage::new( package.data, - package.reliable, + package.reliability, Peers::Players, targets, )) diff --git a/crates/connector/src/server.rs b/crates/connector/src/server.rs index 360d9224..c6163138 100644 --- a/crates/connector/src/server.rs +++ b/crates/connector/src/server.rs @@ -3,7 +3,9 @@ use std::net::SocketAddr; use anyhow::Context; use async_std::task; use de_messages::{FromServer, GameOpenError, ToServer}; -use de_net::{self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Socket}; +use de_net::{ + self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket, +}; use tracing::{error, info, warn}; use crate::{clients::Clients, game}; @@ -102,7 +104,10 @@ impl MainServer { async fn reply(&mut self, message: &FromServer, target: SocketAddr) -> anyhow::Result<()> { self.outputs - .send(OutPackage::encode_single(message, true, Peers::Server, target).unwrap()) + .send( + OutPackage::encode_single(message, Reliability::Unordered, Peers::Server, target) + .unwrap(), + ) .await .context("Failed to send a reply") } diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs index d00d52a2..c58b7047 100644 --- a/crates/connector/tests/commands.rs +++ b/crates/connector/tests/commands.rs @@ -5,7 +5,9 @@ use std::{ use async_std::{future::timeout, task}; use de_messages::{FromGame, FromServer, JoinError, Readiness, ToGame, ToServer}; -use de_net::{self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Socket}; +use de_net::{ + self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket, +}; use ntest::timeout; use crate::common::{spawn_and_wait, term_and_wait}; @@ -161,7 +163,9 @@ impl Comms { E: bincode::Encode, { let addr = SocketAddr::new(self.host, self.port); - let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap(); + let package = + OutPackage::encode_single(&message, Reliability::SemiOrdered, Peers::Server, addr) + .unwrap(); self.sender.send(package).await.unwrap(); } diff --git a/crates/multiplayer/src/game.rs b/crates/multiplayer/src/game.rs index 6d70ab59..1a1ed34c 100644 --- a/crates/multiplayer/src/game.rs +++ b/crates/multiplayer/src/game.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use bevy::prelude::*; use de_core::{player::Player, schedule::PreMovement}; use de_messages::{FromGame, FromServer, GameOpenError, JoinError, Readiness, ToGame, ToServer}; +use de_net::Reliability; use crate::{ config::ConnectionType, @@ -102,7 +103,7 @@ impl From for SetReadinessEvent { fn open_or_join( conf: Res, mut main_server: EventWriter, - mut game_server: EventWriter>, + mut game_server: EventWriter, ) { match conf.connection_type() { ConnectionType::CreateGame { max_players, .. } => { @@ -116,7 +117,10 @@ fn open_or_join( } ConnectionType::JoinGame(_) => { info!("Sending a join-game request."); - game_server.send(ToGame::Join.into()); + game_server.send(ToGameServerEvent::new( + Reliability::SemiOrdered, + ToGame::Join, + )); } } } @@ -125,7 +129,7 @@ fn process_from_server( conf: Res, mut ports: ResMut, mut events: EventReader, - mut outputs: EventWriter>, + mut outputs: EventWriter, mut opened: EventWriter, mut fatals: EventWriter, ) { @@ -138,7 +142,10 @@ fn process_from_server( Ok(_) => { info!("Game on port {} opened.", *port); // Send something to open NAT. - outputs.send(ToGame::Ping(u32::MAX).into()); + outputs.send(ToGameServerEvent::new( + Reliability::Unordered, + ToGame::Ping(u32::MAX), + )); opened.send(GameOpenedEvent(SocketAddr::new(conf.server_host(), *port))); } Err(err) => { @@ -232,18 +239,24 @@ fn process_from_game( fn set_readiness( mut readiness_events: EventReader, - mut message_events: EventWriter>, + mut message_events: EventWriter, ) { let Some(readiness) = readiness_events.iter().last() else { return; }; - message_events.send(ToGameServerEvent::from(ToGame::Readiness(readiness.0))); + message_events.send(ToGameServerEvent::new( + Reliability::SemiOrdered, + ToGame::Readiness(readiness.0), + )); } -fn leave(mut server: EventWriter>) { +fn leave(mut server: EventWriter) { info!("Sending leave game message."); // Send this even if not yet joined because the join / open-game request // might already be processed. - server.send(ToGame::Leave.into()); + server.send(ToGameServerEvent::new( + Reliability::SemiOrdered, + ToGame::Leave, + )); } diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index 99577646..bd45ee0f 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, time::Instant}; use bevy::prelude::*; use de_core::schedule::PreMovement; use de_messages::{FromGame, FromServer, ToGame, ToServer}; -use de_net::{InPackage, PackageBuilder, Peers}; +use de_net::{InPackage, PackageBuilder, Peers, Reliability}; use crate::{ config::ConnectionType, @@ -17,8 +17,7 @@ pub(crate) struct MessagesPlugin; impl Plugin for MessagesPlugin { fn build(&self, app: &mut App) { app.add_event::() - .add_event::>() - .add_event::>() + .add_event::() .add_event::() .add_event::() .add_systems(OnEnter(NetState::Connecting), setup) @@ -30,12 +29,8 @@ impl Plugin for MessagesPlugin { .run_if(on_event::()) .in_set(MessagesSet::SendMessages) .before(NetworkSet::SendPackages), - message_sender::> - .run_if(on_event::>()) - .in_set(MessagesSet::SendMessages) - .before(NetworkSet::SendPackages), - message_sender::> - .run_if(on_event::>()) + message_sender:: + .run_if(on_event::()) .in_set(MessagesSet::SendMessages) .before(NetworkSet::SendPackages), ), @@ -62,8 +57,8 @@ where { type Message: bincode::Encode; const PORT_TYPE: PortType; - const RELIABLE: bool; + fn reliability(&self) -> Reliability; fn message(&self) -> &Self::Message; } @@ -79,7 +74,10 @@ impl From for ToMainServerEvent { impl ToMessage for ToMainServerEvent { type Message = ToServer; const PORT_TYPE: PortType = PortType::Main; - const RELIABLE: bool = true; + + fn reliability(&self) -> Reliability { + Reliability::Unordered + } fn message(&self) -> &Self::Message { &self.0 @@ -87,21 +85,30 @@ impl ToMessage for ToMainServerEvent { } #[derive(Event)] -pub(crate) struct ToGameServerEvent(ToGame); +pub(crate) struct ToGameServerEvent { + reliability: Reliability, + message: ToGame, +} -impl From for ToGameServerEvent { - fn from(message: ToGame) -> Self { - Self(message) +impl ToGameServerEvent { + pub(crate) fn new(reliability: Reliability, message: ToGame) -> Self { + Self { + reliability, + message, + } } } -impl ToMessage for ToGameServerEvent { +impl ToMessage for ToGameServerEvent { type Message = ToGame; const PORT_TYPE: PortType = PortType::Game; - const RELIABLE: bool = R; + + fn reliability(&self) -> Reliability { + self.reliability + } fn message(&self) -> &Self::Message { - &self.0 + &self.message } } @@ -255,13 +262,24 @@ fn message_sender( return; }; let addr = SocketAddr::new(conf.server_host(), port); - let mut builder = PackageBuilder::new(E::RELIABLE, Peers::Server, addr); + + let mut builder_unreliable = PackageBuilder::new(Reliability::Unreliable, Peers::Server, addr); + let mut builder_unordered = PackageBuilder::new(Reliability::Unordered, Peers::Server, addr); + let mut builder_semi = PackageBuilder::new(Reliability::SemiOrdered, Peers::Server, addr); for event in inputs.iter() { + let builder = match event.reliability() { + Reliability::Unreliable => &mut builder_unreliable, + Reliability::Unordered => &mut builder_unordered, + Reliability::SemiOrdered => &mut builder_semi, + }; builder.push(event.message()).unwrap(); } - for package in builder.build() { - outputs.send(package.into()); + + for builder in [builder_unreliable, builder_unordered, builder_semi] { + for package in builder.build() { + outputs.send(package.into()); + } } } diff --git a/crates/multiplayer/src/stats.rs b/crates/multiplayer/src/stats.rs index afd5821e..3972c439 100644 --- a/crates/multiplayer/src/stats.rs +++ b/crates/multiplayer/src/stats.rs @@ -6,6 +6,7 @@ use std::{ use bevy::prelude::*; use de_core::schedule::PreMovement; use de_messages::{FromGame, ToGame}; +use de_net::Reliability; use tracing::{debug, info, trace}; use crate::{ @@ -218,7 +219,7 @@ fn ping( mut timer: ResMut>, mut counter: ResMut, mut tracker: ResMut>, - mut messages: EventWriter>, + mut messages: EventWriter, ) { timer.0.tick(time.delta()); @@ -226,12 +227,15 @@ fn ping( for _ in 0..timer.0.times_finished_this_tick() { let id = counter.next(); tracker.register(id, time); - if R { + let reliability = if R { info!("Sending reliable Ping({id}).",); + Reliability::Unordered } else { trace!("Sending unreliable Ping({id}).",); - } - messages.send(ToGame::Ping(id).into()); + Reliability::Unreliable + }; + + messages.send(ToGameServerEvent::new(reliability, ToGame::Ping(id))); } } diff --git a/crates/net/src/connection/confirms.rs b/crates/net/src/connection/confirms.rs index e4a30cd2..d5c70a91 100644 --- a/crates/net/src/connection/confirms.rs +++ b/crates/net/src/connection/confirms.rs @@ -48,7 +48,7 @@ impl Confirmations { time: Instant, addr: SocketAddr, id: PackageId, - ) -> Result { + ) -> Result { self.book .lock() .await @@ -109,27 +109,29 @@ impl Confirmations { } struct IdReceiver { - duplicates: Duplicates, + received: Received, buffer: Buffer, } impl IdReceiver { fn new() -> Self { Self { - duplicates: Duplicates::new(), + received: Received::new(), buffer: Buffer::new(), } } /// Registers a package as received and returns whether the it was a /// duplicate delivery. - fn push(&mut self, time: Instant, id: PackageId) -> Result { - // Return early on error to avoid confirmation of erroneous datagrams. - let duplicate = self.duplicates.process(id)?; - // Push to the buffer even duplicate packages, because the reason - // behind the re-delivery might be loss of the confirmation datagram. - self.buffer.push(time, id); - Ok(duplicate) + fn push(&mut self, time: Instant, id: PackageId) -> Result { + let result = self.received.process(id); + if let Ok(_) | Err(ReceivedIdError::Duplicate) = result { + // Push to the buffer even duplicate packages, because the reason + // behind the re-delivery might be loss of the confirmation + // datagram. + self.buffer.push(time, id); + } + result } } @@ -139,12 +141,12 @@ impl Connection for IdReceiver { } } -struct Duplicates { +struct Received { highest_id: Option, holes: AHashSet, } -impl Duplicates { +impl Received { fn new() -> Self { Self { highest_id: None, @@ -152,17 +154,22 @@ impl Duplicates { } } - /// Registers package as delivered and returns true if it was already - /// delivered in the past. - fn process(&mut self, id: PackageId) -> Result { + /// Registers package as delivered and returns delivery order continuity in + /// respect with earlier sent packages. + fn process(&mut self, id: PackageId) -> Result { let range_start = match self.highest_id { Some(highest) => match highest.ordering(id) { Ordering::Less => highest.incremented(), Ordering::Greater => { - return Ok(!self.holes.remove(&id)); + if self.holes.remove(&id) { + // TODO + return Ok(false); + } else { + return Err(ReceivedIdError::Duplicate); + } } Ordering::Equal => { - return Ok(true); + return Err(ReceivedIdError::Duplicate); } }, None => PackageId::zero(), @@ -171,7 +178,7 @@ impl Duplicates { let range = PackageIdRange::range(range_start, id); let skipped = range.size_hint().1.unwrap() + self.holes.len(); if skipped > MAX_SKIPPED { - return Err(PackageIdError::TooManySkipped(skipped)); + return Err(ReceivedIdError::TooManySkipped(skipped)); } self.highest_id = Some(id); @@ -179,12 +186,26 @@ impl Duplicates { self.holes.insert(hole); } - Ok(false) + Ok(if skipped == 0 { + IdContinuity::Continuous + } else { + IdContinuity::Sparse + }) } } +#[derive(Clone, Copy)] +pub(crate) enum IdContinuity { + /// Some of the earlier sent packages has not yet been delivered. + Sparse, + /// All of the previously sent packages has been delivered. + Continuous, +} + #[derive(Error, Debug)] -pub(crate) enum PackageIdError { +pub(crate) enum ReceivedIdError { + #[error("Duplicate package")] + Duplicate, #[error("Too many packages skipped: {0}")] TooManySkipped(usize), } @@ -256,7 +277,7 @@ mod tests { #[test] fn test_duplicates() { - let mut duplicates = Duplicates::new(); + let mut duplicates = Received::new(); assert!(!duplicates .process(PackageId::from_bytes(&[0, 0, 2])) @@ -289,7 +310,7 @@ mod tests { assert!(matches!( duplicates.process(PackageId::from_bytes(&[50, 0, 6])), - Err(PackageIdError::TooManySkipped(3276800)) + Err(ReceivedIdError::TooManySkipped(3276800)) )); } diff --git a/crates/net/src/connection/resend.rs b/crates/net/src/connection/resend.rs index f4a349c8..9bc90228 100644 --- a/crates/net/src/connection/resend.rs +++ b/crates/net/src/connection/resend.rs @@ -16,7 +16,7 @@ use super::{ databuf::DataBuf, }; use crate::{ - header::{DatagramHeader, PackageId, Peers}, + header::{DatagramHeader, PackageHeader, PackageId}, tasks::OutDatagram, }; @@ -40,13 +40,12 @@ impl Resends { &mut self, time: Instant, addr: SocketAddr, - id: PackageId, - peers: Peers, + header: PackageHeader, data: &[u8], ) { let mut book = self.book.lock().await; let queue = book.update(time, addr, Queue::new); - queue.push(id, peers, data, time); + queue.push(header, data, time); } /// Processes data with package confirmations. @@ -82,10 +81,10 @@ impl Resends { while let Some((addr, queue)) = book.next() { let failure = loop { match queue.reschedule(buf, time) { - RescheduleResult::Resend { len, id, peers } => { + RescheduleResult::Resend { len, header } => { datagrams .send(OutDatagram::new( - DatagramHeader::new_package(true, peers, id), + DatagramHeader::Package(header), buf[..len].to_vec(), addr, )) @@ -134,7 +133,7 @@ pub(crate) struct ResendResult { /// confirmed). struct Queue { queue: PriorityQueue, - meta: AHashMap, + headers: AHashMap, data: DataBuf, } @@ -142,7 +141,7 @@ impl Queue { fn new() -> Self { Self { queue: PriorityQueue::new(), - meta: AHashMap::new(), + headers: AHashMap::new(), data: DataBuf::new(), } } @@ -153,10 +152,10 @@ impl Queue { } /// Registers new package for re-sending until it is resolved. - fn push(&mut self, id: PackageId, peers: Peers, data: &[u8], now: Instant) { - self.queue.push(id, Timing::new(now)); - self.meta.insert(id, peers); - self.data.push(id, data); + fn push(&mut self, header: PackageHeader, data: &[u8], now: Instant) { + self.queue.push(header.id(), Timing::new(now)); + self.headers.insert(header.id(), header); + self.data.push(header.id(), data); } /// Marks a package as delivered. No more re-sends will be scheduled and @@ -164,7 +163,7 @@ impl Queue { fn resolve(&mut self, id: PackageId) { let result = self.queue.remove(&id); if result.is_some() { - self.meta.remove(&id); + self.headers.remove(&id); self.data.remove(id); } } @@ -194,8 +193,8 @@ impl Queue { Some(backoff) => { self.queue.change_priority(&id, backoff); let len = self.data.get(id, buf).unwrap(); - let peers = *self.meta.get(&id).unwrap(); - RescheduleResult::Resend { len, id, peers } + let header = *self.headers.get(&id).unwrap(); + RescheduleResult::Resend { len, header } } None => RescheduleResult::Failed, } @@ -220,8 +219,7 @@ pub(crate) enum RescheduleResult { Resend { /// Length of the datagram data (written to a buffer) in bytes. len: usize, - id: PackageId, - peers: Peers, + header: PackageHeader, }, /// No datagram is currently scheduled for an immediate resent. This /// variant holds soonest possible time of a next resend. diff --git a/crates/net/src/header.rs b/crates/net/src/header.rs index dde20021..55406138 100644 --- a/crates/net/src/header.rs +++ b/crates/net/src/header.rs @@ -4,14 +4,11 @@ use thiserror::Error; /// Number of bytes (at the beginning of each datagram) used up by the header. pub(crate) const HEADER_SIZE: usize = 4; - /// This bit is set in protocol control datagrams. const CONTROL_BIT: u8 = 0b1000_0000; -/// This bit is set on datagrams which must be delivered reliably. -const RELIABLE_BIT: u8 = 0b0100_0000; /// This bit is set on datagrams which are sent to the server instead of other /// players. -const SERVER_PEER_BIT: u8 = 0b0010_0000; +const SERVER_PEER_BIT: u8 = 0b0001_0000; #[derive(Clone, Copy, Debug, PartialEq)] pub(crate) enum DatagramHeader { @@ -20,14 +17,6 @@ pub(crate) enum DatagramHeader { } impl DatagramHeader { - pub(crate) fn new_package(reliable: bool, peers: Peers, id: PackageId) -> Self { - Self::Package(PackageHeader { - reliable, - peers, - id, - }) - } - /// Writes the header to the beginning of a bytes buffer. /// /// # Panics @@ -38,10 +27,7 @@ impl DatagramHeader { let (mask, id) = match self { Self::Confirmation => (CONTROL_BIT, [0, 0, 0]), Self::Package(package_header) => { - let mut mask = 0; - if package_header.reliable { - mask |= RELIABLE_BIT; - } + let mut mask = package_header.reliability().to_bits(); if matches!(package_header.peers, Peers::Server) { mask |= SERVER_PEER_BIT; } @@ -71,14 +57,14 @@ impl DatagramHeader { Err(HeaderError::Invalid) } } else { - let reliable = mask & RELIABLE_BIT > 0; + let reliability = Reliability::from_bits(mask)?; let peers = if mask & SERVER_PEER_BIT > 0 { Peers::Server } else { Peers::Players }; Ok(Self::Package(PackageHeader { - reliable, + reliability, peers, id: PackageId::from_bytes(&data[1..HEADER_SIZE]), })) @@ -93,8 +79,8 @@ impl fmt::Display for DatagramHeader { Self::Package(header) => { write!( f, - "Package {{ reliable: {}, peers: {}, id: {} }}", - header.reliable, header.peers, header.id + "Package {{ reliability: {}, peers: {}, id: {} }}", + header.reliability, header.peers, header.id ) } } @@ -104,14 +90,22 @@ impl fmt::Display for DatagramHeader { #[derive(Clone, Copy, Debug, PartialEq)] pub(crate) struct PackageHeader { /// True if the package is delivered reliably. - reliable: bool, + reliability: Reliability, peers: Peers, id: PackageId, } impl PackageHeader { - pub(crate) fn reliable(&self) -> bool { - self.reliable + pub(crate) fn new(reliability: Reliability, peers: Peers, id: PackageId) -> Self { + Self { + reliability, + peers, + id, + } + } + + pub(crate) fn reliability(&self) -> Reliability { + self.reliability } pub(crate) fn peers(&self) -> Peers { @@ -123,6 +117,61 @@ impl PackageHeader { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Reliability { + /// There are no guarantees on reliability, ordering or duplicate delivery + /// of the package. + Unreliable, + /// Non-duplicate delivery of the package is guaranteed. There are no + /// guarantees on ordering of the package with respect to other packages. + Unordered, + /// Non-duplicate delivery of the package is guaranteed. The package is + /// guaranteed to be delivered after all other previously reliably sent + /// packages. There are no guarantees on ordering of the package with + /// respect to other packages sent after this one. + SemiOrdered, +} + +impl Reliability { + fn to_bits(self) -> u8 { + let bits = match self { + Self::Unreliable => 0, + Self::Unordered => 1, + Self::SemiOrdered => 2, + }; + bits << 5 + } + + fn from_bits(bits: u8) -> Result { + let bits = (bits >> 5) & 3; + match bits { + 0 => Ok(Self::Unreliable), + 1 => Ok(Self::Unordered), + 2 => Ok(Self::SemiOrdered), + _ => Err(HeaderError::Invalid), + } + } + + /// Returns true if the package is delivered reliably, independently on + /// ordering. + pub fn is_reliable(&self) -> bool { + match self { + Self::SemiOrdered | Self::Unordered => true, + Self::Unreliable => false, + } + } +} + +impl fmt::Display for Reliability { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Unreliable => write!(f, "unreliable"), + Self::Unordered => write!(f, "unordered"), + Self::SemiOrdered => write!(f, "semi-ordered"), + } + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Peers { /// Communication between networking server and a player/client. @@ -290,15 +339,31 @@ mod tests { fn test_write_header() { let mut buf = [0u8; 256]; - DatagramHeader::new_package(false, Peers::Server, PackageId::zero()).write(&mut buf); - assert_eq![&buf[0..4], &[0b0010_0000, 0, 0, 0]]; + DatagramHeader::Package(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Server, + PackageId::zero(), + )) + .write(&mut buf); + assert_eq![&buf[0..4], &[0b0101_0000, 0, 0, 0]]; assert_eq![&buf[4..], &[0; 252]]; - DatagramHeader::new_package(true, Peers::Server, 256.try_into().unwrap()).write(&mut buf); - assert_eq![&buf[0..4], &[0b0110_0000, 0, 1, 0]]; + + DatagramHeader::Package(PackageHeader::new( + Reliability::Unordered, + Peers::Server, + 256.try_into().unwrap(), + )) + .write(&mut buf); + assert_eq![&buf[0..4], &[0b0011_0000, 0, 1, 0]]; assert_eq![&buf[4..], &[0; 252]]; - DatagramHeader::new_package(true, Peers::Players, 1033.try_into().unwrap()).write(&mut buf); - assert_eq![&buf[0..4], &[0b0100_0000, 0, 4, 9]]; + DatagramHeader::Package(PackageHeader::new( + Reliability::Unordered, + Peers::Players, + 1033.try_into().unwrap(), + )) + .write(&mut buf); + assert_eq![&buf[0..4], &[0b0000_0000, 0, 4, 9]]; assert_eq![&buf[4..], &[0; 252]]; } @@ -309,19 +374,31 @@ mod tests { buf[0..4].copy_from_slice(&[64, 0, 0, 0]); assert_eq!( DatagramHeader::read(&buf).unwrap(), - DatagramHeader::new_package(true, Peers::Players, 0.try_into().unwrap()) + DatagramHeader::Package(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + 0.try_into().unwrap() + )) ); - buf[0..4].copy_from_slice(&[64, 1, 0, 3]); + buf[0..4].copy_from_slice(&[32, 1, 0, 3]); assert_eq!( DatagramHeader::read(&buf).unwrap(), - DatagramHeader::new_package(true, Peers::Players, 65539.try_into().unwrap()) + DatagramHeader::Package(PackageHeader::new( + Reliability::Unordered, + Peers::Players, + 65539.try_into().unwrap() + )) ); - buf[0..4].copy_from_slice(&[32, 0, 0, 2]); + buf[0..4].copy_from_slice(&[16, 0, 0, 2]); assert_eq!( DatagramHeader::read(&buf).unwrap(), - DatagramHeader::new_package(false, Peers::Server, 2.try_into().unwrap()) + DatagramHeader::Package(PackageHeader::new( + Reliability::Unreliable, + Peers::Server, + 2.try_into().unwrap() + )) ); } diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index 66e9ac10..c39730e5 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -1,4 +1,4 @@ -pub use header::Peers; +pub use header::{Peers, Reliability}; pub use protocol::{Targets, MAX_PACKAGE_SIZE}; pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE}; pub use tasks::{ diff --git a/crates/net/src/tasks/communicator.rs b/crates/net/src/tasks/communicator.rs index ddb25257..56f33786 100644 --- a/crates/net/src/tasks/communicator.rs +++ b/crates/net/src/tasks/communicator.rs @@ -8,7 +8,7 @@ use bincode::{ }; use crate::{ - header::Peers, + header::{Peers, Reliability}, protocol::{Targets, MAX_PACKAGE_SIZE}, }; @@ -20,7 +20,7 @@ const BINCODE_CONF: Configuration> = /// It cumulatively builds output packages from individual messages. pub struct PackageBuilder { - reliable: bool, + reliability: Reliability, peers: Peers, targets: Targets<'static>, buffer: Vec, @@ -29,12 +29,12 @@ pub struct PackageBuilder { } impl PackageBuilder { - pub fn new(reliable: bool, peers: Peers, targets: T) -> Self + pub fn new(reliability: Reliability, peers: Peers, targets: T) -> Self where T: Into>, { Self { - reliable, + reliability, peers, targets: targets.into(), buffer: vec![0; MAX_PACKAGE_SIZE], @@ -52,8 +52,12 @@ impl PackageBuilder { if self.used > 0 { self.buffer.truncate(self.used); - let package = - OutPackage::new(self.buffer, self.reliable, self.peers, self.targets.clone()); + let package = OutPackage::new( + self.buffer, + self.reliability, + self.peers, + self.targets.clone(), + ); packages.push(package); } @@ -74,7 +78,7 @@ impl PackageBuilder { self.used = 0; let package = - OutPackage::new(data, self.reliable, self.peers, self.targets.clone()); + OutPackage::new(data, self.reliability, self.peers, self.targets.clone()); self.packages.push(package); self.push_inner(message) @@ -97,7 +101,7 @@ impl PackageBuilder { /// A package to be send. pub struct OutPackage { pub(super) data: Vec, - reliable: bool, + reliability: Reliability, peers: Peers, pub(super) targets: Targets<'static>, } @@ -108,7 +112,7 @@ impl OutPackage { /// See also [`Self::new`]. pub fn encode_single( message: &E, - reliable: bool, + reliability: Reliability, peers: Peers, targets: T, ) -> Result @@ -117,35 +121,35 @@ impl OutPackage { T: Into>, { let data = encode_to_vec(message, BINCODE_CONF)?; - Ok(Self::new(data, reliable, peers, targets)) + Ok(Self::new(data, reliability, peers, targets)) } /// # Arguments /// /// * `data` - data to be send. /// - /// * `reliable` - whether to deliver the data reliably. + /// * `reliability` - package delivery reliability mode. /// /// * `targets` - package recipients. /// /// # Panics /// /// Panics if data is longer than [`MAX_PACKAGE_SIZE`]. - pub fn new(data: Vec, reliable: bool, peers: Peers, targets: T) -> Self + pub fn new(data: Vec, reliability: Reliability, peers: Peers, targets: T) -> Self where T: Into>, { assert!(data.len() < MAX_PACKAGE_SIZE); Self { data, - reliable, + reliability, peers, targets: targets.into(), } } - pub(super) fn reliable(&self) -> bool { - self.reliable + pub(super) fn reliability(&self) -> Reliability { + self.reliability } pub(super) fn peers(&self) -> Peers { @@ -156,7 +160,7 @@ impl OutPackage { /// A received message / datagram. pub struct InPackage { data: Vec, - reliable: bool, + reliability: Reliability, peers: Peers, source: SocketAddr, time: Instant, @@ -165,14 +169,14 @@ pub struct InPackage { impl InPackage { pub(super) fn new( data: Vec, - reliable: bool, + reliability: Reliability, peers: Peers, source: SocketAddr, time: Instant, ) -> Self { Self { data, - reliable, + reliability, peers, source, time, @@ -195,9 +199,8 @@ impl InPackage { } } - /// Whether the datagram was delivered reliably. - pub fn reliable(&self) -> bool { - self.reliable + pub fn reliability(&self) -> Reliability { + self.reliability } pub fn source(&self) -> SocketAddr { @@ -324,7 +327,7 @@ mod tests { } let mut builder = PackageBuilder::new( - true, + Reliability::Unordered, Peers::Players, "127.0.0.1:1111".parse::().unwrap(), ); @@ -365,7 +368,7 @@ mod tests { let package = InPackage { // Message::Two([3, 4]), Message::One(1286) data: vec![1, 3, 4, 0, 251, 5, 6], - reliable: false, + reliability: Reliability::Unreliable, peers: Peers::Players, source: "127.0.0.1:1111".parse().unwrap(), time: Instant::now(), diff --git a/crates/net/src/tasks/ureceiver.rs b/crates/net/src/tasks/ureceiver.rs index 42e33438..45b21bd7 100644 --- a/crates/net/src/tasks/ureceiver.rs +++ b/crates/net/src/tasks/ureceiver.rs @@ -47,7 +47,8 @@ pub(super) async fn run( }; let time = Instant::now(); - if datagram.header.reliable() { + if datagram.header.reliability().is_reliable() { + // TODO exploit this for reliability as well match confirms .received(time, datagram.source, datagram.header.id()) .await @@ -67,10 +68,12 @@ pub(super) async fn run( } } + // TODO hold datagram if reliable + let result = packages .send(InPackage::new( datagram.data, - datagram.header.reliable(), + datagram.header.reliability(), datagram.header.peers(), datagram.source, time, diff --git a/crates/net/src/tasks/usender.rs b/crates/net/src/tasks/usender.rs index 573bf586..4afa931e 100644 --- a/crates/net/src/tasks/usender.rs +++ b/crates/net/src/tasks/usender.rs @@ -6,7 +6,7 @@ use tracing::{error, info}; use super::{cancellation::CancellationSender, dsender::OutDatagram}; use crate::{ connection::Resends, - header::{DatagramHeader, PackageIdRange}, + header::{DatagramHeader, PackageHeader, PackageIdRange}, OutPackage, }; @@ -28,28 +28,21 @@ pub(super) async fn run( break; }; - let package_id = if package.reliable() { + let package_id = if package.reliability().is_reliable() { counter_reliable.next().unwrap() } else { counter_unreliable.next().unwrap() }; - let header = DatagramHeader::new_package(package.reliable(), package.peers(), package_id); + let package_header = PackageHeader::new(package.reliability(), package.peers(), package_id); + let header = DatagramHeader::Package(package_header); - if let DatagramHeader::Package(package_header) = header { - if package_header.reliable() { - let time = Instant::now(); - for target in &package.targets { - resends - .sent( - time, - target, - package_header.id(), - package_header.peers(), - &package.data, - ) - .await; - } + if package_header.reliability().is_reliable() { + let time = Instant::now(); + for target in &package.targets { + resends + .sent(time, target, package_header, &package.data) + .await; } }