From 4af2721aea8107411bef506c05423791ad1f0292 Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Fri, 18 Aug 2023 22:05:13 +0200 Subject: [PATCH] de_net: Add message ordering --- crates/connector/tests/commands.rs | 1 - crates/multiplayer/src/game.rs | 29 ++++++++++---- crates/multiplayer/src/messages.rs | 63 ++++++++++++++++++------------ crates/multiplayer/src/stats.rs | 12 ++++-- crates/net/src/header.rs | 1 - 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs index c58b7047..952c7430 100644 --- a/crates/connector/tests/commands.rs +++ b/crates/connector/tests/commands.rs @@ -55,7 +55,6 @@ fn test() { comms_d.port = game_port; check_response!(comms_a, FromGame::Joined(1)); - comms_b.send(ToGame::Join).await; check_response!(comms_b, FromGame::Joined(2)); check_response!(comms_a, FromGame::PeerJoined(2)); diff --git a/crates/multiplayer/src/game.rs b/crates/multiplayer/src/game.rs index 6d70ab59..1a1ed34c 100644 --- a/crates/multiplayer/src/game.rs +++ b/crates/multiplayer/src/game.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use bevy::prelude::*; use de_core::{player::Player, schedule::PreMovement}; use de_messages::{FromGame, FromServer, GameOpenError, JoinError, Readiness, ToGame, ToServer}; +use de_net::Reliability; use crate::{ config::ConnectionType, @@ -102,7 +103,7 @@ impl From for SetReadinessEvent { fn open_or_join( conf: Res, mut main_server: EventWriter, - mut game_server: EventWriter>, + mut game_server: EventWriter, ) { match conf.connection_type() { ConnectionType::CreateGame { max_players, .. } => { @@ -116,7 +117,10 @@ fn open_or_join( } ConnectionType::JoinGame(_) => { info!("Sending a join-game request."); - game_server.send(ToGame::Join.into()); + game_server.send(ToGameServerEvent::new( + Reliability::SemiOrdered, + ToGame::Join, + )); } } } @@ -125,7 +129,7 @@ fn process_from_server( conf: Res, mut ports: ResMut, mut events: EventReader, - mut outputs: EventWriter>, + mut outputs: EventWriter, mut opened: EventWriter, mut fatals: EventWriter, ) { @@ -138,7 +142,10 @@ fn process_from_server( Ok(_) => { info!("Game on port {} opened.", *port); // Send something to open NAT. - outputs.send(ToGame::Ping(u32::MAX).into()); + outputs.send(ToGameServerEvent::new( + Reliability::Unordered, + ToGame::Ping(u32::MAX), + )); opened.send(GameOpenedEvent(SocketAddr::new(conf.server_host(), *port))); } Err(err) => { @@ -232,18 +239,24 @@ fn process_from_game( fn set_readiness( mut readiness_events: EventReader, - mut message_events: EventWriter>, + mut message_events: EventWriter, ) { let Some(readiness) = readiness_events.iter().last() else { return; }; - message_events.send(ToGameServerEvent::from(ToGame::Readiness(readiness.0))); + message_events.send(ToGameServerEvent::new( + Reliability::SemiOrdered, + ToGame::Readiness(readiness.0), + )); } -fn leave(mut server: EventWriter>) { +fn leave(mut server: EventWriter) { info!("Sending leave game message."); // Send this even if not yet joined because the join / open-game request // might already be processed. - server.send(ToGame::Leave.into()); + server.send(ToGameServerEvent::new( + Reliability::SemiOrdered, + ToGame::Leave, + )); } diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index 0ff03ea6..bd45ee0f 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -17,8 +17,7 @@ pub(crate) struct MessagesPlugin; impl Plugin for MessagesPlugin { fn build(&self, app: &mut App) { app.add_event::() - .add_event::>() - .add_event::>() + .add_event::() .add_event::() .add_event::() .add_systems(OnEnter(NetState::Connecting), setup) @@ -30,12 +29,8 @@ impl Plugin for MessagesPlugin { .run_if(on_event::()) .in_set(MessagesSet::SendMessages) .before(NetworkSet::SendPackages), - message_sender::> - .run_if(on_event::>()) - .in_set(MessagesSet::SendMessages) - .before(NetworkSet::SendPackages), - message_sender::> - .run_if(on_event::>()) + message_sender:: + .run_if(on_event::()) .in_set(MessagesSet::SendMessages) .before(NetworkSet::SendPackages), ), @@ -62,8 +57,8 @@ where { type Message: bincode::Encode; const PORT_TYPE: PortType; - const RELIABLE: bool; + fn reliability(&self) -> Reliability; fn message(&self) -> &Self::Message; } @@ -79,7 +74,10 @@ impl From for ToMainServerEvent { impl ToMessage for ToMainServerEvent { type Message = ToServer; const PORT_TYPE: PortType = PortType::Main; - const RELIABLE: bool = true; + + fn reliability(&self) -> Reliability { + Reliability::Unordered + } fn message(&self) -> &Self::Message { &self.0 @@ -87,21 +85,30 @@ impl ToMessage for ToMainServerEvent { } #[derive(Event)] -pub(crate) struct ToGameServerEvent(ToGame); +pub(crate) struct ToGameServerEvent { + reliability: Reliability, + message: ToGame, +} -impl From for ToGameServerEvent { - fn from(message: ToGame) -> Self { - Self(message) +impl ToGameServerEvent { + pub(crate) fn new(reliability: Reliability, message: ToGame) -> Self { + Self { + reliability, + message, + } } } -impl ToMessage for ToGameServerEvent { +impl ToMessage for ToGameServerEvent { type Message = ToGame; const PORT_TYPE: PortType = PortType::Game; - const RELIABLE: bool = R; + + fn reliability(&self) -> Reliability { + self.reliability + } fn message(&self) -> &Self::Message { - &self.0 + &self.message } } @@ -255,18 +262,24 @@ fn message_sender( return; }; let addr = SocketAddr::new(conf.server_host(), port); - let reliability = if E::RELIABLE { - Reliability::Unordered - } else { - Reliability::Unreliable - }; - let mut builder = PackageBuilder::new(reliability, Peers::Server, addr); + + let mut builder_unreliable = PackageBuilder::new(Reliability::Unreliable, Peers::Server, addr); + let mut builder_unordered = PackageBuilder::new(Reliability::Unordered, Peers::Server, addr); + let mut builder_semi = PackageBuilder::new(Reliability::SemiOrdered, Peers::Server, addr); for event in inputs.iter() { + let builder = match event.reliability() { + Reliability::Unreliable => &mut builder_unreliable, + Reliability::Unordered => &mut builder_unordered, + Reliability::SemiOrdered => &mut builder_semi, + }; builder.push(event.message()).unwrap(); } - for package in builder.build() { - outputs.send(package.into()); + + for builder in [builder_unreliable, builder_unordered, builder_semi] { + for package in builder.build() { + outputs.send(package.into()); + } } } diff --git a/crates/multiplayer/src/stats.rs b/crates/multiplayer/src/stats.rs index afd5821e..3972c439 100644 --- a/crates/multiplayer/src/stats.rs +++ b/crates/multiplayer/src/stats.rs @@ -6,6 +6,7 @@ use std::{ use bevy::prelude::*; use de_core::schedule::PreMovement; use de_messages::{FromGame, ToGame}; +use de_net::Reliability; use tracing::{debug, info, trace}; use crate::{ @@ -218,7 +219,7 @@ fn ping( mut timer: ResMut>, mut counter: ResMut, mut tracker: ResMut>, - mut messages: EventWriter>, + mut messages: EventWriter, ) { timer.0.tick(time.delta()); @@ -226,12 +227,15 @@ fn ping( for _ in 0..timer.0.times_finished_this_tick() { let id = counter.next(); tracker.register(id, time); - if R { + let reliability = if R { info!("Sending reliable Ping({id}).",); + Reliability::Unordered } else { trace!("Sending unreliable Ping({id}).",); - } - messages.send(ToGame::Ping(id).into()); + Reliability::Unreliable + }; + + messages.send(ToGameServerEvent::new(reliability, ToGame::Ping(id))); } } diff --git a/crates/net/src/header.rs b/crates/net/src/header.rs index 6477466e..ca4c1369 100644 --- a/crates/net/src/header.rs +++ b/crates/net/src/header.rs @@ -4,7 +4,6 @@ use thiserror::Error; /// Number of bytes (at the beginning of each datagram) used up by the header. pub(crate) const HEADER_SIZE: usize = 4; - /// This bit is set in protocol control datagrams. const CONTROL_BIT: u8 = 0b1000_0000; /// This bit is set on datagrams which are sent to the server instead of other