diff --git a/crates/connector/src/game/greceiver.rs b/crates/connector/src/game/greceiver.rs index b2154853..1061fac8 100644 --- a/crates/connector/src/game/greceiver.rs +++ b/crates/connector/src/game/greceiver.rs @@ -4,7 +4,7 @@ use async_std::{ channel::{Receiver, Sender}, task, }; -use de_net::{FromGame, JoinError, OutPackage, Peers, Targets, ToGame}; +use de_net::{FromGame, JoinError, OutPackage, Peers, Readiness, Targets, ToGame}; use tracing::{error, info, warn}; use super::state::{GameState, JoinError as JoinErrorInner}; @@ -99,11 +99,8 @@ impl GameProcessor { ToGame::Leave => { self.process_leave(message.meta).await; } - ToGame::Start => { - self.process_start().await; - } - ToGame::Initialized => { - self.process_initialized(message.meta).await; + ToGame::Readiness(readiness) => { + self.process_readiness(message.meta, readiness).await; } } @@ -249,15 +246,18 @@ impl GameProcessor { self.send_all(&FromGame::PeerLeft(id), None).await; } - async fn process_start(&mut self) { - if self.state.start().await { - self.send_all(&FromGame::Starting, None).await; - } - } - - async fn process_initialized(&mut self, meta: MessageMeta) { - if self.state.mark_initialized(meta.source).await { - self.send_all(&FromGame::Started, None).await; + async fn process_readiness(&mut self, meta: MessageMeta, readiness: Readiness) { + match self.state.update_readiness(meta.source, readiness).await { + Ok(progressed) => { + if progressed { + self.send_all(&FromGame::GameReadiness(readiness), None) + .await; + } + } + Err(err) => warn!( + "Invalid readiness update from {source:?}: {err:?}", + source = meta.source + ), } } diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index 4eb127a5..1415aec3 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -2,7 +2,7 @@ use std::{collections::hash_map::Entry, net::SocketAddr}; use ahash::AHashMap; use async_std::sync::{Arc, RwLock}; -use de_net::Targets; +use de_net::{Readiness, Targets}; use thiserror::Error; #[derive(Clone)] @@ -38,16 +38,17 @@ impl GameState { self.inner.write().await.remove(addr) } - /// If the game is in state `Open`, changes its state to `Starting` and - /// returns true. - pub(super) async fn start(&mut self) -> bool { - self.inner.write().await.start() - } - - /// Marks a player as initialized. Returns true if the game was just - /// started. - pub(super) async fn mark_initialized(&mut self, addr: SocketAddr) -> bool { - self.inner.write().await.mark_initialized(addr) + /// Updates readiness of a single player. Whole game readiness is updated + /// once all players reach another readiness stage. + /// + /// Returns true if game readiness progressed as a result (to the readiness + /// of the player). + pub(super) async fn update_readiness( + &mut self, + addr: SocketAddr, + readiness: Readiness, + ) -> Result { + self.inner.write().await.update_readiness(addr, readiness) } /// Constructs and returns package targets which includes all or all but @@ -64,7 +65,7 @@ impl GameState { struct GameStateInner { available_ids: AvailableIds, - phase: GamePhase, + readiness: Readiness, players: AHashMap, } @@ -72,7 +73,7 @@ impl GameStateInner { fn new(max_players: u8) -> Self { Self { available_ids: AvailableIds::new(max_players), - phase: GamePhase::Open, + readiness: Readiness::default(), players: AHashMap::new(), } } @@ -86,7 +87,7 @@ impl GameStateInner { } fn add(&mut self, addr: SocketAddr) -> Result { - if self.phase != GamePhase::Open { + if self.readiness != Readiness::NotReady { return Err(JoinError::GameNotOpened); } @@ -112,28 +113,48 @@ impl GameStateInner { } } - fn start(&mut self) -> bool { - if self.phase == GamePhase::Open { - self.phase = GamePhase::Starting; - true - } else { - false + fn update_readiness( + &mut self, + addr: SocketAddr, + readiness: Readiness, + ) -> Result { + let Some(player) = self.players.get_mut(&addr) else { + return Err(ReadinessUpdateError::UnknownClient(addr)); + }; + + if player.readiness > readiness { + return Err(ReadinessUpdateError::Downgrade { + from: player.readiness, + to: readiness, + }); } - } - fn mark_initialized(&mut self, addr: SocketAddr) -> bool { - let prev = self.phase; + if player.readiness == readiness { + return Ok(false); + } - if matches!(self.phase, GamePhase::Starting) { - if let Some(player) = self.players.get_mut(&addr) { - player.initialized = true; - } - if self.players.values().all(|p| p.initialized) { - self.phase = GamePhase::Started; - } + if player.readiness.progress() != Some(readiness) { + return Err(ReadinessUpdateError::Skip { + from: player.readiness, + to: readiness, + }); + } + + if player.readiness > self.readiness { + // The player is already ahead of the game, cannot move them further. + return Err(ReadinessUpdateError::Desync { + game: self.readiness, + client: readiness, + }); } - self.phase == GamePhase::Started && self.phase != prev + player.readiness = readiness; + + let previous = self.readiness; + self.readiness = self.players.values().map(|p| p.readiness).min().unwrap(); + let progressed = previous != self.readiness; + assert!(self.readiness == readiness || !progressed); + Ok(progressed) } fn targets(&self, exclude: Option) -> Option> { @@ -205,27 +226,32 @@ pub(super) enum JoinError { GameNotOpened, } +#[derive(Debug, Error, PartialEq)] +pub(super) enum ReadinessUpdateError { + #[error("Client {0:?} is not part of the game.")] + UnknownClient(SocketAddr), + #[error("Cannot downgrade client readiness from {from:?} to {to:?}.")] + Downgrade { from: Readiness, to: Readiness }, + #[error("Cannot upgrade client readiness from {from:?} to {to:?}.")] + Skip { from: Readiness, to: Readiness }, + #[error("Cannot change client readiness to {client:?} when game is at {game:?}.")] + Desync { game: Readiness, client: Readiness }, +} + struct Player { id: u8, - initialized: bool, + readiness: Readiness, } impl Player { fn new(id: u8) -> Self { Self { id, - initialized: false, + readiness: Readiness::default(), } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(super) enum GamePhase { - Open, - Starting, - Started, -} - #[cfg(test)] mod tests { use std::collections::HashSet; @@ -293,13 +319,57 @@ mod tests { state.add(client_a).unwrap(); state.add(client_b).unwrap(); - assert!(state.start()); - assert!(!state.start()); + assert_eq!(state.readiness, Readiness::NotReady); + + assert!(!state + .update_readiness(client_a, Readiness::NotReady) + .unwrap()); + assert_eq!(state.readiness, Readiness::NotReady); + + assert!(!state.update_readiness(client_b, Readiness::Ready).unwrap()); + assert_eq!(state.readiness, Readiness::NotReady); + assert!(state.update_readiness(client_a, Readiness::Ready).unwrap()); + assert_eq!(state.readiness, Readiness::Ready); assert_eq!(state.add(client_c), Err(JoinError::GameNotOpened)); - assert!(!state.mark_initialized(client_b)); - assert!(state.mark_initialized(client_a)); + assert_eq!( + state + .update_readiness(client_a, Readiness::Initialized) + .unwrap_err(), + ReadinessUpdateError::Skip { + from: Readiness::Ready, + to: Readiness::Initialized + } + ); + + assert!(!state + .update_readiness(client_b, Readiness::Prepared) + .unwrap()); + assert_eq!( + state + .update_readiness(client_b, Readiness::Initialized) + .unwrap_err(), + ReadinessUpdateError::Desync { + game: Readiness::Ready, + client: Readiness::Initialized + } + ); + assert_eq!(state.readiness, Readiness::Ready); + + assert!(state + .update_readiness(client_a, Readiness::Prepared) + .unwrap()); + assert_eq!(state.readiness, Readiness::Prepared); + + assert!(!state + .update_readiness(client_a, Readiness::Initialized) + .unwrap()); + assert_eq!(state.readiness, Readiness::Prepared); + assert!(state + .update_readiness(client_b, Readiness::Initialized) + .unwrap()); + assert_eq!(state.readiness, Readiness::Initialized); } #[test] diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs index a60566ea..7abffc20 100644 --- a/crates/connector/tests/commands.rs +++ b/crates/connector/tests/commands.rs @@ -6,7 +6,7 @@ use std::{ use async_std::{future::timeout, task}; use de_net::{ self, ConnErrorReceiver, FromGame, FromServer, JoinError, OutPackage, PackageReceiver, - PackageSender, Peers, Socket, ToGame, ToServer, + PackageSender, Peers, Readiness, Socket, ToGame, ToServer, }; use ntest::timeout; @@ -60,29 +60,69 @@ fn test() { check_response!(comms_b, FromGame::Joined(2)); check_response!(comms_a, FromGame::PeerJoined(2)); - comms_a.send(ToGame::Start).await; - check_response!(comms_a, FromGame::Starting); - check_response!(comms_b, FromGame::Starting); + comms_a.send(ToGame::Readiness(Readiness::Ready)).await; + // The other player is not yet ready -> no message should be received. + assert!( + timeout(Duration::from_millis(500), comms_a.recv::()) + .await + .is_err() + ); + assert!( + timeout(Duration::from_millis(500), comms_b.recv::()) + .await + .is_err() + ); + comms_b.send(ToGame::Readiness(Readiness::Ready)).await; + + check_response!(comms_a, FromGame::GameReadiness(Readiness::Ready)); + check_response!(comms_b, FromGame::GameReadiness(Readiness::Ready)); comms_c.send(ToGame::Join).await; check_response!(comms_c, FromGame::JoinError(JoinError::GameNotOpened)); - comms_a.send(ToGame::Initialized).await; - // The other player is not yet initialized -> no message should be received. - assert!(timeout(Duration::from_secs(1), comms_a.recv::()) - .await - .is_err()); - assert!(timeout(Duration::from_secs(1), comms_b.recv::()) - .await - .is_err()); + comms_a.send(ToGame::Readiness(Readiness::Prepared)).await; + // The other player is not yet prepared -> no message should be received. + assert!( + timeout(Duration::from_millis(500), comms_a.recv::()) + .await + .is_err() + ); + assert!( + timeout(Duration::from_millis(500), comms_b.recv::()) + .await + .is_err() + ); + + comms_b.send(ToGame::Readiness(Readiness::Prepared)).await; - comms_b.send(ToGame::Initialized).await; - check_response!(comms_a, FromGame::Started); - check_response!(comms_b, FromGame::Started); + check_response!(comms_a, FromGame::GameReadiness(Readiness::Prepared)); + check_response!(comms_b, FromGame::GameReadiness(Readiness::Prepared)); comms_d.send(ToGame::Join).await; check_response!(comms_d, FromGame::JoinError(JoinError::GameNotOpened)); + comms_a + .send(ToGame::Readiness(Readiness::Initialized)) + .await; + // The other player is not yet initialized -> no message should be received. + assert!( + timeout(Duration::from_millis(500), comms_a.recv::()) + .await + .is_err() + ); + assert!( + timeout(Duration::from_millis(500), comms_b.recv::()) + .await + .is_err() + ); + + comms_b + .send(ToGame::Readiness(Readiness::Initialized)) + .await; + + check_response!(comms_a, FromGame::GameReadiness(Readiness::Initialized)); + check_response!(comms_b, FromGame::GameReadiness(Readiness::Initialized)); + assert!(comms_a.errors.is_empty()); assert!(comms_b.errors.is_empty()); assert!(comms_c.errors.is_empty()); diff --git a/crates/multiplayer/src/game.rs b/crates/multiplayer/src/game.rs index 286f6947..7629d997 100644 --- a/crates/multiplayer/src/game.rs +++ b/crates/multiplayer/src/game.rs @@ -189,13 +189,8 @@ fn process_from_game( FromGame::PeerLeft(id) => { info!("Peer {id} left."); } - FromGame::Starting => { - info!("Multiplayer game is starting."); - } - FromGame::Started => { - fatals.send(FatalErrorEvent::new( - "Multiplayer game is unexpectedly fully started.", - )); + FromGame::GameReadiness(readiness) => { + info!("Game readiness changed to: {readiness:?}"); } } } diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index c72159a5..a9172589 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -1,5 +1,5 @@ pub use header::Peers; -pub use messages::{FromGame, FromServer, GameOpenError, JoinError, ToGame, ToServer}; +pub use messages::{FromGame, FromServer, GameOpenError, JoinError, Readiness, ToGame, ToServer}; pub use protocol::{Targets, MAX_PACKAGE_SIZE}; pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE}; pub use tasks::{ diff --git a/crates/net/src/messages.rs b/crates/net/src/messages.rs index 412a16fa..993e1fd5 100644 --- a/crates/net/src/messages.rs +++ b/crates/net/src/messages.rs @@ -43,11 +43,11 @@ pub enum ToGame { /// /// The game is automatically closed once all players disconnect. Leave, - /// This initiates game startup. - Start, - /// The game switches from starting state to started state once this - /// message is received from all players. - Initialized, + /// Sets readiness of the client. + /// + /// New readiness must be greater by one or equal to the current readiness. + /// See [`Readiness::progress`]. + Readiness(Readiness), } /// Message to be sent from a game server to a player/client (inside of a @@ -78,12 +78,8 @@ pub enum FromGame { /// Informs the player that another player with the given ID just /// disconnected from the same game. PeerLeft(u8), - /// Informs the client that the game is starting. The game is no longer - /// available for joining. The client should start game initialization. - Starting, - /// Informs the client that the game fully started because all clients are - /// initiated. - Started, + /// Game readiness has changed. + GameReadiness(Readiness), } #[derive(Debug, Encode, Decode)] @@ -96,3 +92,48 @@ pub enum JoinError { /// The player already participates on a different game. DifferentGame, } + +/// Readiness of an individual client or the game as a whole. It consists of a +/// progression of individual variants / stages. Once all clients progress to a +/// readiness stage, the game progresses to that stage as well. +#[derive(Clone, Copy, Default, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)] +pub enum Readiness { + /// Initial stage for all clients and the game. + #[default] + NotReady, + /// The client / game is ready for the game to start. + Ready, + /// The client / game is prepared for game initialization to begin. + Prepared, + /// The client / game is ready for the game to start. + /// + /// The actually game-play happens in this readiness stage. + Initialized, +} + +impl Readiness { + pub fn progress(self) -> Option { + match self { + Self::NotReady => Some(Self::Ready), + Self::Ready => Some(Self::Prepared), + Self::Prepared => Some(Self::Initialized), + Self::Initialized => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_readiness() { + assert!(Readiness::default() < Readiness::Ready); + assert!(Readiness::NotReady < Readiness::Ready); + assert!(Readiness::Ready < Readiness::Prepared); + assert!(Readiness::Prepared < Readiness::Initialized); + assert!(Readiness::NotReady < Readiness::Initialized); + + assert_eq!(Readiness::NotReady.progress().unwrap(), Readiness::Ready); + } +}