diff --git a/crates/connector/src/game/ereceiver.rs b/crates/connector/src/game/ereceiver.rs index 207b56d2..58359e48 100644 --- a/crates/connector/src/game/ereceiver.rs +++ b/crates/connector/src/game/ereceiver.rs @@ -5,9 +5,9 @@ use de_messages::ToGame; use de_net::{ConnErrorReceiver, Reliability}; use tracing::{error, info, warn}; -use super::greceiver::ToGameMessage; +use super::message::InMessage; -pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender) { +pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender>) { info!("Starting game connection error handler on port {port}..."); loop { @@ -26,7 +26,7 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender Self { - Self { - meta: MessageMeta { - source, - reliability, - }, - message, - } - } -} - -struct MessageMeta { - source: SocketAddr, - reliability: Reliability, -} - pub(super) struct GameProcessor { port: u16, owner: SocketAddr, - messages: Receiver, + messages: Receiver>, outputs: Sender, state: GameState, clients: Clients, @@ -46,7 +27,7 @@ impl GameProcessor { pub(super) fn new( port: u16, owner: SocketAddr, - messages: Receiver, + messages: Receiver>, outputs: Sender, state: GameState, clients: Clients, @@ -93,18 +74,18 @@ impl GameProcessor { continue; } - match message.message { + match message.message() { ToGame::Ping(id) => { - self.process_ping(message.meta, id).await; + self.process_ping(message.meta(), *id).await; } ToGame::Join => { - self.process_join(message.meta).await; + self.process_join(message.meta()).await; } ToGame::Leave => { - self.process_leave(message.meta).await; + self.process_leave(message.meta()).await; } ToGame::Readiness(readiness) => { - self.process_readiness(message.meta, readiness).await; + self.process_readiness(message.meta(), *readiness).await; } } @@ -122,8 +103,8 @@ impl GameProcessor { /// Returns true if the massage should be ignored and further handles such /// messages. - async fn handle_ignore(&self, message: &ToGameMessage) -> bool { - if matches!(message.message, ToGame::Join | ToGame::Leave) { + async fn handle_ignore(&self, message: &InMessage) -> bool { + if matches!(message.message(), ToGame::Join | ToGame::Leave) { // Join must be excluded from the condition because of the // chicken and egg problem. // @@ -132,22 +113,22 @@ impl GameProcessor { return false; } - if self.state.contains(message.meta.source).await { + if self.state.contains(message.meta().source).await { return false; } warn!( "Received a game message from a non-participating client: {:?}.", - message.meta.source + message.meta().source ); let _ = self .outputs .send( OutPackage::encode_single( &FromGame::NotJoined, - message.meta.reliability, + message.meta().reliability, Peers::Server, - message.meta.source, + message.meta().source, ) .unwrap(), ) diff --git a/crates/connector/src/game/message.rs b/crates/connector/src/game/message.rs new file mode 100644 index 00000000..d5de8280 --- /dev/null +++ b/crates/connector/src/game/message.rs @@ -0,0 +1,34 @@ +use std::net::SocketAddr; + +use de_net::Reliability; + +pub(super) struct InMessage { + meta: MessageMeta, + message: M, +} + +impl InMessage { + pub(super) fn new(source: SocketAddr, reliability: Reliability, message: M) -> Self { + Self { + meta: MessageMeta { + source, + reliability, + }, + message, + } + } + + pub(super) fn meta(&self) -> MessageMeta { + self.meta + } + + pub(super) fn message(&self) -> &M { + &self.message + } +} + +#[derive(Clone, Copy)] +pub(super) struct MessageMeta { + pub(super) source: SocketAddr, + pub(super) reliability: Reliability, +} diff --git a/crates/connector/src/game/mod.rs b/crates/connector/src/game/mod.rs index d03f6644..ac30a00b 100644 --- a/crates/connector/src/game/mod.rs +++ b/crates/connector/src/game/mod.rs @@ -8,6 +8,7 @@ use crate::clients::Clients; mod ereceiver; mod greceiver; +mod message; mod mreceiver; mod preceiver; mod state; diff --git a/crates/connector/src/game/mreceiver.rs b/crates/connector/src/game/mreceiver.rs index df65afc6..39375d38 100644 --- a/crates/connector/src/game/mreceiver.rs +++ b/crates/connector/src/game/mreceiver.rs @@ -1,17 +1,19 @@ use std::time::Duration; use async_std::{channel::Sender, future::timeout}; -use de_net::{PackageReceiver, Peers}; +use bincode::error::DecodeError; +use de_messages::{ToGame, ToPlayers}; +use de_net::{InPackage, PackageReceiver, Peers}; +use thiserror::Error; use tracing::{error, info, warn}; -use super::greceiver::ToGameMessage; -use crate::game::preceiver::PlayersPackage; +use super::message::InMessage; pub(super) async fn run( port: u16, packages: PackageReceiver, - server: Sender, - players: Sender, + server: Sender>, + players: Sender>, ) { info!("Starting game server input processor on port {port}..."); @@ -34,40 +36,55 @@ pub(super) async fn run( break; }; - match package.peers() { - Peers::Server => { - for message_result in package.decode() { - match message_result { - Ok(message) => { - let result = server - .send(ToGameMessage::new( - package.source(), - package.reliability(), - message, - )) - .await; - if result.is_err() { - break; - } - } - Err(err) => { - warn!("Received invalid package: {err:?}"); - break; - } + let peers = package.peers(); + let result = match peers { + Peers::Server => handle_package(package, &server).await, + Peers::Players => handle_package(package, &players).await, + }; + + if let Err(err) = result { + match err { + PackageHandleError::Decode(err) => { + warn!("Received invalid package: {err:?}"); + } + PackageHandleError::SendError => { + if peers == Peers::Server { + break; } } } - Peers::Players => { - let _ = players - .send(PlayersPackage::new( - package.reliability(), - package.source(), - package.data(), - )) - .await; - } } } info!("Game server input processor on port {port} finished."); } + +async fn handle_package( + package: InPackage, + output: &Sender>, +) -> Result<(), PackageHandleError> +where + M: bincode::Decode, +{ + for message_result in package.decode() { + let message = message_result.map_err(PackageHandleError::from)?; + output + .send(InMessage::new( + package.source(), + package.reliability(), + message, + )) + .await + .map_err(|_| PackageHandleError::SendError)?; + } + + Ok(()) +} + +#[derive(Debug, Error)] +enum PackageHandleError { + #[error("Decoding error: {0}")] + Decode(#[from] DecodeError), + #[error("Sending to output channel failed.")] + SendError, +} diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index 0968f74b..5cca279f 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -1,39 +1,22 @@ use std::net::SocketAddr; use async_std::channel::Receiver; -use de_messages::FromGame; -use de_net::{OutPackage, PackageSender, Peers, Reliability}; +use de_messages::{BorrowedFromPlayers, FromGame, ToPlayers}; +use de_net::{OutPackage, PackageBuilder, PackageSender, Peers, Reliability}; use tracing::{error, info, warn}; -use super::state::GameState; - -/// A package destined to other players in the game. -pub(super) struct PlayersPackage { - reliability: Reliability, - source: SocketAddr, - data: Vec, -} - -impl PlayersPackage { - pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec) -> Self { - Self { - reliability, - source, - data, - } - } -} +use super::{message::InMessage, state::GameState}; pub(super) async fn run( port: u16, - packages: Receiver, + messages: Receiver>, outputs: PackageSender, state: GameState, ) { info!("Starting game player package handler on port {port}..."); - 'main: loop { - if packages.is_closed() { + loop { + if messages.is_closed() { break; } @@ -42,44 +25,56 @@ pub(super) async fn run( break; } - let Ok(package) = packages.recv().await else { + // TODO timeout if there is anything buffered + let Ok(message) = messages.recv().await else { break; }; - if !state.contains(package.source).await { + let meta = message.meta(); + let Some(player_id) = state.id(meta.source).await else { warn!( "Received a player message from a non-participating client: {:?}.", - package.source + message.meta().source ); let _ = outputs .send( OutPackage::encode_single( &FromGame::NotJoined, - package.reliability, + meta.reliability, Peers::Server, - package.source, + meta.source, ) .unwrap(), ) .await; continue; - } + }; - for target in state.targets(Some(package.source)).await { - let result = outputs - .send(OutPackage::from_slice( - &package.data, - package.reliability, - Peers::Players, - target, - )) - .await; - if result.is_err() { - break 'main; - } + let from_message = BorrowedFromPlayers::new(player_id, message.message()); + for target in state.targets(Some(meta.source)).await { + handle_player(from_message, target); + // TODO + // let result = outputs + // .send(OutPackage::from_slice( + // &package.data, + // package.reliability, + // Peers::Players, + // target, + // )) + // .await; + // if result.is_err() { + // break 'main; + // } } } info!("Game player package handler on port {port} finished."); } + +async fn handle_player<'a>(message: BorrowedFromPlayers<'a>, target: SocketAddr) { + // TODO proper reliability + let mut builder = PackageBuilder::new(Reliability::SemiOrdered, Peers::Players, target); + + // TODO +} diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index 344634bd..e559383f 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -27,6 +27,12 @@ impl GameState { self.inner.read().await.contains(addr) } + /// Returns ID of the player or None if such player is not part of the + /// game. + pub(super) async fn id(&self, addr: SocketAddr) -> Option { + self.inner.read().await.id(addr) + } + /// Adds a player to the game and returns ID of the added player. pub(super) async fn add(&mut self, addr: SocketAddr) -> Result { self.inner.write().await.add(addr) @@ -85,6 +91,10 @@ impl GameStateInner { self.players.contains_key(&addr) } + fn id(&self, addr: SocketAddr) -> Option { + self.players.get(&addr).map(|p| p.id) + } + fn add(&mut self, addr: SocketAddr) -> Result { if self.readiness != Readiness::NotReady { return Err(JoinError::GameNotOpened); diff --git a/crates/messages/src/lib.rs b/crates/messages/src/lib.rs index b4d4b5c3..ef8519f3 100644 --- a/crates/messages/src/lib.rs +++ b/crates/messages/src/lib.rs @@ -2,7 +2,9 @@ //! Connector during multiplayer game. pub use game::{FromGame, JoinError, Readiness, ToGame}; +pub use players::{BorrowedFromPlayers, FromPlayers, ToPlayers}; pub use server::{FromServer, GameOpenError, ToServer}; mod game; +mod players; mod server; diff --git a/crates/messages/src/players.rs b/crates/messages/src/players.rs new file mode 100644 index 00000000..ab677a4f --- /dev/null +++ b/crates/messages/src/players.rs @@ -0,0 +1,43 @@ +use bincode::{Decode, Encode}; + +/// Messages to be sent by a player/client or occasionally the game server to +/// other players. +#[derive(Debug, Decode)] +pub struct FromPlayers { + /// ID of the sending player. + source: u8, + /// Original message. + message: ToPlayers, +} + +impl FromPlayers { + /// ID of the sending player + pub fn source(&self) -> u8 { + self.source + } + + pub fn message(&self) -> &ToPlayers { + &self.message + } +} + +/// Messages to be sent by a player/client or occasionally the game server to +/// other players. +#[derive(Debug, Encode, Clone, Copy)] +pub struct BorrowedFromPlayers<'a> { + /// ID of the sending player. + source: u8, + /// Original message. + message: &'a ToPlayers, +} + +impl<'a> BorrowedFromPlayers<'a> { + pub fn new(source: u8, message: &'a ToPlayers) -> Self { + Self { source, message } + } +} + +/// Messages to be sent by a player/client or occasionally the game server to +/// other players. +#[derive(Debug, Encode, Decode)] +pub enum ToPlayers {} diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index 1c2086aa..ff0cd151 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -276,7 +276,7 @@ fn message_sender( builder.push(event.message()).unwrap(); } - for builder in [unreliable, unordered, semi_ordered] { + for mut builder in [unreliable, unordered, semi_ordered] { for package in builder.build() { outputs.send(package.into()); } diff --git a/crates/net/src/tasks/communicator/builder.rs b/crates/net/src/tasks/communicator/builder.rs index 824b084e..823881d1 100644 --- a/crates/net/src/tasks/communicator/builder.rs +++ b/crates/net/src/tasks/communicator/builder.rs @@ -1,4 +1,4 @@ -use std::{mem, net::SocketAddr}; +use std::{collections::VecDeque, mem, net::SocketAddr}; use bincode::{encode_into_slice, error::EncodeError}; @@ -15,7 +15,7 @@ pub struct PackageBuilder { target: SocketAddr, buffer: Vec, used: usize, - packages: Vec, + packages: VecDeque, } impl PackageBuilder { @@ -26,7 +26,7 @@ impl PackageBuilder { target, buffer: vec![0; MAX_DATAGRAM_SIZE], used: HEADER_SIZE, - packages: Vec::new(), + packages: VecDeque::new(), } } @@ -34,11 +34,13 @@ impl PackageBuilder { /// /// The messages are distributed among the packages in a sequential order. /// Each package is filled with as many messages as it can accommodate. - pub fn build(mut self) -> Vec { + // TODO update docs + // TODO arguments (older_than, full) + pub fn build(&mut self) -> PackageIterator<'_> { if self.used > HEADER_SIZE { self.build_package(false); } - self.packages + PackageIterator::new(&mut self.packages) } /// Push another message to the builder so that it is included in one of @@ -83,12 +85,42 @@ impl PackageBuilder { data.truncate(self.used); self.used = used; - self.packages.push(OutPackage::new( + self.packages.push_back(PackageSlot::new(OutPackage::new( data, self.reliability, self.peers, self.target, - )); + ))); + } +} + +struct PackageSlot { + package: OutPackage, +} + +impl PackageSlot { + fn new(package: OutPackage) -> Self { + Self { package } + } +} + +// TODO better name +// TODO docs + what happens if not fully consumed +pub struct PackageIterator<'a> { + packages: &'a mut VecDeque, +} + +impl<'a> PackageIterator<'a> { + fn new(packages: &'a mut VecDeque) -> Self { + Self { packages } + } +} + +impl<'a> Iterator for PackageIterator<'a> { + type Item = OutPackage; + + fn next(&mut self) -> Option { + self.packages.pop_front().map(|p| p.package) } } @@ -118,7 +150,7 @@ mod tests { .unwrap(); } - let packages = builder.build(); + let packages: Vec = builder.build().collect(); assert_eq!(packages.len(), 4); // 3 items + something extra for the encoding assert!(packages[0].data_slice().len() >= 128 * 3);