From e2a5a800ceca2536d575b2c69965830d50698f5d Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Sat, 26 Aug 2023 17:08:55 +0200 Subject: [PATCH] WIP --- crates/connector/src/game/mreceiver.rs | 34 ++- crates/connector/src/game/preceiver.rs | 51 +++-- crates/messages/src/lib.rs | 1 + crates/multiplayer/src/messages.rs | 28 +-- crates/net/src/lib.rs | 2 +- crates/net/src/tasks/builders.rs | 294 +++++++++++++++++++++++++ crates/net/src/tasks/communicator.rs | 121 +--------- crates/net/src/tasks/mod.rs | 6 +- 8 files changed, 371 insertions(+), 166 deletions(-) create mode 100644 crates/net/src/tasks/builders.rs diff --git a/crates/connector/src/game/mreceiver.rs b/crates/connector/src/game/mreceiver.rs index df65afc6..9e11a25b 100644 --- a/crates/connector/src/game/mreceiver.rs +++ b/crates/connector/src/game/mreceiver.rs @@ -4,14 +4,13 @@ use async_std::{channel::Sender, future::timeout}; use de_net::{PackageReceiver, Peers}; use tracing::{error, info, warn}; -use super::greceiver::ToGameMessage; -use crate::game::preceiver::PlayersPackage; +use super::{greceiver::ToGameMessage, preceiver::ToPlayersMessage}; pub(super) async fn run( port: u16, packages: PackageReceiver, server: Sender, - players: Sender, + players: Sender, ) { info!("Starting game server input processor on port {port}..."); @@ -51,20 +50,33 @@ pub(super) async fn run( } } Err(err) => { - warn!("Received invalid package: {err:?}"); + warn!("Received invalid game package: {err:?}"); break; } } } } Peers::Players => { - let _ = players - .send(PlayersPackage::new( - package.reliability(), - package.source(), - package.data(), - )) - .await; + for message_result in package.decode() { + match message_result { + Ok(message) => { + let result = players + .send(ToPlayersMessage::new( + package.reliability(), + package.source(), + message, + )) + .await; + if result.is_err() { + break; + } + } + Err(err) => { + warn!("Received invalid players package: {err:?}"); + break; + } + } + } } } } diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index b0a25be4..aed76c22 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -1,39 +1,41 @@ use std::net::SocketAddr; use async_std::channel::Receiver; -use de_messages::FromGame; -use de_net::{OutPackage, PackageSender, Peers, Reliability}; +use de_messages::{FromGame, FromPlayers, ToPlayers}; +use de_net::{OutPackage, PackageBuilder, PackageSender, PeerPackageBuilders, Peers, Reliability}; use tracing::{error, info, warn}; use super::state::GameState; /// A package destined to other players in the game. -pub(super) struct PlayersPackage { +pub(super) struct ToPlayersMessage { reliability: Reliability, source: SocketAddr, - data: Vec, + message: ToPlayers, } -impl PlayersPackage { - pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec) -> Self { +impl ToPlayersMessage { + pub(super) fn new(reliability: Reliability, source: SocketAddr, message: ToPlayers) -> Self { Self { reliability, source, - data, + message, } } } pub(super) async fn run( port: u16, - packages: Receiver, + messages: Receiver, outputs: PackageSender, state: GameState, ) { info!("Starting game player package handler on port {port}..."); + let mut builders = PeerPackageBuilders::new(Peers::Players); + 'main: loop { - if packages.is_closed() { + if messages.is_closed() { break; } @@ -42,23 +44,23 @@ pub(super) async fn run( break; } - let Ok(package) = packages.recv().await else { + let Ok(message) = messages.recv().await else { break; }; - if !state.contains(package.source).await { + if !state.contains(message.source).await { warn!( "Received a player message from a non-participating client: {:?}.", - package.source + message.source ); let _ = outputs .send( OutPackage::encode_single( &FromGame::NotJoined, - package.reliability, + message.reliability, Peers::Server, - package.source, + message.source, ) .unwrap(), ) @@ -66,15 +68,18 @@ pub(super) async fn run( continue; } - for target in state.targets(Some(package.source)).await { - let result = outputs - .send(OutPackage::new( - package.data.clone(), - package.reliability, - Peers::Players, - target, - )) - .await; + // TODO use real player ID + // TODO handle output + builders + .push(message.reliability, &FromPlayers::new(1, message.message)) + .unwrap(); + + let targets = state.targets(Some(message.source)).await; + + // TODO accumulate to builder with a timeout + // TODO split this into multiple tasks + for output in builders.build(message.reliability, targets) { + let result = outputs.send(output).await; if result.is_err() { break 'main; } diff --git a/crates/messages/src/lib.rs b/crates/messages/src/lib.rs index 446e2397..2e120eb6 100644 --- a/crates/messages/src/lib.rs +++ b/crates/messages/src/lib.rs @@ -2,6 +2,7 @@ //! Connector during multiplayer game. pub use game::{FromGame, JoinError, Readiness, ToGame}; +pub use players::{FromPlayers, ToPlayers}; pub use server::{FromServer, GameOpenError, ToServer}; mod game; diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index 1c2086aa..25d30d46 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -1,9 +1,10 @@ use std::{net::SocketAddr, time::Instant}; use bevy::prelude::*; +use bincode::error::EncodeError; use de_core::schedule::PreMovement; use de_messages::{FromGame, FromServer, ToGame, ToServer}; -use de_net::{InPackage, PackageBuilder, Peers, Reliability}; +use de_net::{InPackage, OutPackage, PackageBuilder, Peers, Reliability}; use crate::{ config::ConnectionType, @@ -252,6 +253,7 @@ fn cleanup(mut commands: Commands) { fn message_sender( conf: Res, ports: Res, + mut builders: Local>, mut inputs: EventReader, mut outputs: EventWriter, ) where @@ -261,23 +263,23 @@ fn message_sender( warn!("Port not (yet) known."); return; }; - let addr = SocketAddr::new(conf.server_host(), port); - let mut unreliable = PackageBuilder::new(Reliability::Unreliable, Peers::Server, addr); - let mut unordered = PackageBuilder::new(Reliability::Unordered, Peers::Server, addr); - let mut semi_ordered = PackageBuilder::new(Reliability::SemiOrdered, Peers::Server, addr); + if builders.is_none() { + let addr = SocketAddr::new(conf.server_host(), port); + *builders = Some(PackageBuilders::new(addr)); + } + let builders = builders.as_mut().unwrap(); for event in inputs.iter() { - let builder = match event.reliability() { - Reliability::Unreliable => &mut unreliable, - Reliability::Unordered => &mut unordered, - Reliability::SemiOrdered => &mut semi_ordered, - }; - builder.push(event.message()).unwrap(); + builders.push(event.reliability(), event.message()).unwrap(); } - for builder in [unreliable, unordered, semi_ordered] { - for package in builder.build() { + for mut reliability in [ + Reliability::Unreliable, + Reliability::Unordered, + Reliability::SemiOrdered, + ] { + for package in builders.build(reliability) { outputs.send(package.into()); } } diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index 5170a99a..298d094c 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -3,7 +3,7 @@ pub use protocol::MAX_PACKAGE_SIZE; pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE}; pub use tasks::{ startup, ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage, - PackageBuilder, PackageReceiver, PackageSender, + PackageBuilder, PackageReceiver, PackageSender, PeerPackageBuilders, }; mod connection; diff --git a/crates/net/src/tasks/builders.rs b/crates/net/src/tasks/builders.rs new file mode 100644 index 00000000..c2c63816 --- /dev/null +++ b/crates/net/src/tasks/builders.rs @@ -0,0 +1,294 @@ +use std::{array::IntoIter, mem, net::SocketAddr}; + +use bincode::{ + config::{BigEndian, Configuration, Limit, Varint}, + encode_into_slice, + error::EncodeError, +}; + +use super::communicator::BINCODE_CONF; +use crate::{ + header::{Peers, Reliability}, + protocol::MAX_PACKAGE_SIZE, + OutPackage, +}; + +// TODO test this +// TODO docs +pub struct PeerPackageBuilders { + unreliable: PackageBuilder, + unordered: PackageBuilder, + semi_ordered: PackageBuilder, +} + +impl PeerPackageBuilders { + pub fn new(peer: Peers) -> Self { + Self { + unreliable: PackageBuilder::new(Reliability::Unreliable, peer), + unordered: PackageBuilder::new(Reliability::Unordered, peer), + semi_ordered: PackageBuilder::new(Reliability::SemiOrdered, peer), + } + } + + // TODO docs + pub fn push(&mut self, reliablity: Reliability, message: &E) -> Result<(), EncodeError> + where + E: bincode::Encode, + { + self.builder(reliablity).push(message) + } + + // TODO docs + pub fn build(&mut self, reliablity: Reliability, targets: T) -> PackageIterator + where + T: IntoIterator, + { + self.builder(reliablity).build(targets) + } + + fn builder(&mut self, reliablity: Reliability) -> &mut PackageBuilder { + match reliablity { + Reliability::Unreliable => &mut self.unreliable, + Reliability::Unordered => &mut self.unordered, + Reliability::SemiOrdered => &mut self.semi_ordered, + } + } +} + +/// It cumulatively builds output packages from individual messages. +pub struct PackageBuilder { + reliability: Reliability, + peers: Peers, + buffer: Vec, + used: usize, + packages: Vec, +} + +impl PackageBuilder { + pub fn new(reliability: Reliability, peers: Peers) -> Self { + Self { + reliability, + peers, + buffer: vec![0; MAX_PACKAGE_SIZE], + used: 0, + packages: Vec::new(), + } + } + + /// Build packages for a single target. See [`Self::build`]. + pub fn build_single(&mut self, target: SocketAddr) -> PackageIterator> { + self.build([target]) + } + + /// Returns output package iterator. + /// + /// The messages are distributed among the packages in a sequential order. + /// Each package is filled with as many messages as it can accommodate. + /// + /// Packages are grouped by target, i.e. all packages for the first target + /// are at the beginning, all packages for the second target follow, and so + /// on. + /// + /// This method call clear the builder so it can be used again. + /// + /// # Arguments + /// + /// * `targets` - packages for these targets are generated. + pub fn build(&mut self, targets: T) -> PackageIterator + where + T: IntoIterator, + { + if self.used > 0 { + self.build_package(); + } + + let mut packages = Vec::new(); + mem::swap(&mut packages, &mut self.packages); + + PackageIterator::new(packages, targets.into_iter()) + } + + /// Push another message to the builder so that it is included in one of + /// the resulting packages. + pub fn push(&mut self, message: &E) -> Result<(), EncodeError> + where + E: bincode::Encode, + { + match self.push_inner(message) { + Err(EncodeError::UnexpectedEnd) => { + self.build_package(); + + self.push_inner(message) + } + Err(err) => Err(err), + Ok(()) => Ok(()), + } + } + + /// Creates a package from buffered data, creates clean new data buffer, + /// and pushes the new package to buffered packages. + /// + /// # Panics + /// + /// Panics if there is no buffered data. + fn build_package(&mut self) { + assert!(self.used > 0); + + let mut data = vec![0; MAX_PACKAGE_SIZE]; + mem::swap(&mut data, &mut self.buffer); + + data.truncate(self.used); + self.used = 0; + + self.packages.push(AnonymousOutPackage { + data, + reliability: self.reliability, + peers: self.peers, + }); + } + + fn push_inner(&mut self, message: &E) -> Result<(), EncodeError> + where + E: bincode::Encode, + { + let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?; + self.used += len; + Ok(()) + } +} + +struct AnonymousOutPackage { + data: Vec, + reliability: Reliability, + peers: Peers, +} + +impl AnonymousOutPackage { + // TODO naming according to rust API guide + fn to_package(&self, target: SocketAddr) -> OutPackage { + OutPackage::new(self.data.clone(), self.reliability, self.peers, target) + } +} + +pub struct PackageIterator +where + T: Iterator, +{ + packages: Vec, + index: usize, + targets: T, + current: Option, +} + +impl PackageIterator +where + T: Iterator, +{ + fn new(packages: Vec, targets: T) -> Self { + Self { + packages, + index: 0, + targets, + current: None, + } + } +} + +impl Iterator for PackageIterator +where + T: Iterator, +{ + type Item = OutPackage; + + fn next(&mut self) -> Option { + loop { + if self.current.is_none() { + self.current = self.targets.next(); + } + + let Some(target) = self.current else { + return None; + }; + + if self.index >= self.packages.len() { + self.current = None; + self.index = 0; + continue; + } + + let index = self.index; + self.index += 1; + + break Some(self.packages[index].to_package(target)); + } + } + + fn size_hint(&self) -> (usize, Option) { + let mut lower = if self.current.is_some() { + self.packages.len() - self.index + } else { + 0 + }; + let mut upper = lower; + + let (targets_lower, targets_upper) = self.targets.size_hint(); + lower += targets_lower * self.packages.len(); + + match targets_upper { + Some(targets_upper) => { + upper += targets_upper * self.packages.len(); + (lower, Some(upper)) + } + None => (lower, None), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_out_message_builder() { + #[derive(bincode::Encode)] + struct TestData { + values: [u64; 16], // up to 128 bytes + } + + let mut builder = PackageBuilder::new(Reliability::Unordered, Peers::Players); + + for i in 0..10 { + builder + .push(&TestData { + // Use large u64 so that the value cannot be shrunk. + values: [u64::MAX - (i as u64); 16], + }) + .unwrap(); + } + + let mut packages = builder.build([ + "127.0.0.1:1111".parse::().unwrap(), + "127.0.0.1:1112".parse::().unwrap(), + ]); + assert_eq!(packages.size_hint(), (7, Some(7))); + + for _ in 0..2 { + for _ in 0..3 { + let package = packages.next().unwrap(); + // 3 items + something extra for the encoding + assert!(package.data.len() >= 128 * 3); + // less then 4 items + assert!(package.data.len() < 128 * 4); + } + + let package = packages.next().unwrap(); + // last one contains only one leftover item + assert!(package.data.len() >= 128 * 3); + assert!(package.data.len() < 128 * 3); + } + + for _ in 0..5 { + assert!(packages.next().is_none()); + } + } +} diff --git a/crates/net/src/tasks/communicator.rs b/crates/net/src/tasks/communicator.rs index c87b1ee8..46abd640 100644 --- a/crates/net/src/tasks/communicator.rs +++ b/crates/net/src/tasks/communicator.rs @@ -1,94 +1,21 @@ -use std::{marker::PhantomData, mem, net::SocketAddr, ops::Deref, time::Instant}; +// TODO rename this module (packages.rs?) +use std::{marker::PhantomData, net::SocketAddr, ops::Deref, time::Instant}; use async_std::channel::{Receiver, Sender}; use bincode::{ config::{BigEndian, Configuration, Limit, Varint}, - decode_from_slice, encode_into_slice, encode_to_vec, + decode_from_slice, encode_to_vec, error::{DecodeError, EncodeError}, }; -use crate::{ - header::{Peers, Reliability}, - protocol::MAX_PACKAGE_SIZE, -}; +use crate::{protocol::MAX_PACKAGE_SIZE, Peers, Reliability}; -const BINCODE_CONF: Configuration> = +pub(super) const BINCODE_CONF: Configuration> = bincode::config::standard() .with_big_endian() .with_variable_int_encoding() .with_limit::(); -/// It cumulatively builds output packages from individual messages. -pub struct PackageBuilder { - reliability: Reliability, - peers: Peers, - target: SocketAddr, - buffer: Vec, - used: usize, - packages: Vec, -} - -impl PackageBuilder { - pub fn new(reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { - Self { - reliability, - peers, - target, - buffer: vec![0; MAX_PACKAGE_SIZE], - used: 0, - packages: Vec::new(), - } - } - - /// Build output packages from all pushed messages. - /// - /// The messages are distributed among the packages in a sequential order. - /// Each package is filled with as many messages as it can accommodate. - pub fn build(mut self) -> Vec { - let mut packages = self.packages; - - if self.used > 0 { - self.buffer.truncate(self.used); - let package = OutPackage::new(self.buffer, self.reliability, self.peers, self.target); - packages.push(package); - } - - packages - } - - /// Push another message to the builder so that it is included in one of - /// the resulting packages. - pub fn push(&mut self, message: &E) -> Result<(), EncodeError> - where - E: bincode::Encode, - { - match self.push_inner(message) { - Err(EncodeError::UnexpectedEnd) => { - let mut data = vec![0; MAX_PACKAGE_SIZE]; - mem::swap(&mut data, &mut self.buffer); - data.truncate(self.used); - self.used = 0; - - let package = OutPackage::new(data, self.reliability, self.peers, self.target); - self.packages.push(package); - - self.push_inner(message) - } - Err(err) => Err(err), - Ok(()) => Ok(()), - } - } - - fn push_inner(&mut self, message: &E) -> Result<(), EncodeError> - where - E: bincode::Encode, - { - let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?; - self.used += len; - Ok(()) - } -} - /// A package to be send. pub struct OutPackage { pub(super) data: Vec, @@ -306,44 +233,6 @@ mod tests { use super::*; - #[test] - fn test_out_message_builder() { - #[derive(bincode::Encode)] - struct TestData { - values: [u64; 16], // up to 128 bytes - } - - let mut builder = PackageBuilder::new( - Reliability::Unordered, - Peers::Players, - "127.0.0.1:1111".parse::().unwrap(), - ); - - for i in 0..10 { - builder - .push(&TestData { - // Use large u64 so that the value cannot be shrunk. - values: [u64::MAX - (i as u64); 16], - }) - .unwrap(); - } - - let packages = builder.build(); - assert_eq!(packages.len(), 4); - // 3 items + something extra for the encoding - assert!(packages[0].data.len() >= 128 * 3); - // less then 4 items - assert!(packages[0].data.len() < 128 * 4); - - assert!(packages[1].data.len() >= 128 * 3); - assert!(packages[1].data.len() < 128 * 4); - assert!(packages[2].data.len() >= 128 * 3); - assert!(packages[2].data.len() < 128 * 4); - // last one contains only one leftover item - assert!(packages[3].data.len() >= 128); - assert!(packages[3].data.len() < 128 * 2); - } - #[test] fn test_decoding() { #[derive(Decode, Debug, Eq, PartialEq)] diff --git a/crates/net/src/tasks/mod.rs b/crates/net/src/tasks/mod.rs index b52fa41c..7b89edd1 100644 --- a/crates/net/src/tasks/mod.rs +++ b/crates/net/src/tasks/mod.rs @@ -56,9 +56,10 @@ //! [`PackageReceiver`] respectively. use async_std::channel::bounded; +pub use builders::{PackageBuilder, PeerPackageBuilders}; pub use communicator::{ - ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage, PackageBuilder, - PackageReceiver, PackageSender, + ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage, PackageReceiver, + PackageSender, }; pub(crate) use dsender::OutDatagram; use futures::future::BoxFuture; @@ -71,6 +72,7 @@ use crate::{ Socket, }; +mod builders; mod cancellation; mod communicator; mod confirmer;