From 993d7271e62609481b16f57aa8a71e1ec20dd326 Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Thu, 10 Aug 2023 19:19:04 +0200 Subject: [PATCH 1/4] de_connector: Impl game starting --- crates/connector/src/game/greceiver.rs | 28 +++++++++ crates/connector/src/game/state.rs | 86 +++++++++++++++++++++++++- crates/multiplayer/src/game.rs | 13 ++++ crates/net/src/messages.rs | 13 ++++ 4 files changed, 138 insertions(+), 2 deletions(-) diff --git a/crates/connector/src/game/greceiver.rs b/crates/connector/src/game/greceiver.rs index ba9b148e..b2154853 100644 --- a/crates/connector/src/game/greceiver.rs +++ b/crates/connector/src/game/greceiver.rs @@ -99,6 +99,12 @@ impl GameProcessor { ToGame::Leave => { self.process_leave(message.meta).await; } + ToGame::Start => { + self.process_start().await; + } + ToGame::Initialized => { + self.process_initialized(message.meta).await; + } } if self.state.is_empty().await { @@ -199,6 +205,16 @@ impl GameProcessor { self.send(&FromGame::JoinError(JoinError::GameFull), meta.source) .await; } + JoinErrorInner::GameNotOpened => { + warn!( + "Player {:?} could not join game on port {} because the game is no \ + longer opened.", + meta.source, self.port + ); + + self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source) + .await; + } } } } @@ -233,6 +249,18 @@ impl GameProcessor { self.send_all(&FromGame::PeerLeft(id), None).await; } + async fn process_start(&mut self) { + if self.state.start().await { + self.send_all(&FromGame::Starting, None).await; + } + } + + async fn process_initialized(&mut self, meta: MessageMeta) { + if self.state.mark_initialized(meta.source).await { + self.send_all(&FromGame::Started, None).await; + } + } + /// Send a reliable message to all players of the game. /// /// # Arguments diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index 495e0b57..b1d2adb9 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -38,6 +38,18 @@ impl GameState { self.inner.write().await.remove(addr) } + /// If the game is in state `Open`, changes its state to `Starting` and + /// returns true. + pub(super) async fn start(&mut self) -> bool { + self.inner.write().await.start() + } + + /// Marks a player as initialized. Returns true if the game was just + /// started. + pub(super) async fn mark_initialized(&mut self, addr: SocketAddr) -> bool { + self.inner.write().await.mark_initialized(addr) + } + /// Constructs and returns package targets which includes all or all but /// one players connected to the game. It returns None if there is no /// matching target. @@ -52,6 +64,7 @@ impl GameState { struct GameStateInner { available_ids: AvailableIds, + state: GameStateX, players: AHashMap, } @@ -59,6 +72,7 @@ impl GameStateInner { fn new(max_players: u8) -> Self { Self { available_ids: AvailableIds::new(max_players), + state: GameStateX::Open, players: AHashMap::new(), } } @@ -72,11 +86,15 @@ impl GameStateInner { } fn add(&mut self, addr: SocketAddr) -> Result { + if self.state != GameStateX::Open { + return Err(JoinError::GameNotOpened); + } + match self.players.entry(addr) { Entry::Occupied(_) => Err(JoinError::AlreadyJoined), Entry::Vacant(vacant) => match self.available_ids.lease() { Some(id) => { - vacant.insert(Player { id }); + vacant.insert(Player::new(id)); Ok(id) } None => Err(JoinError::GameFull), @@ -94,6 +112,30 @@ impl GameStateInner { } } + fn start(&mut self) -> bool { + if self.state == GameStateX::Open { + self.state = GameStateX::Starting; + true + } else { + false + } + } + + fn mark_initialized(&mut self, addr: SocketAddr) -> bool { + let prev = self.state; + + if matches!(self.state, GameStateX::Starting) { + if let Some(player) = self.players.get_mut(&addr) { + player.initialized = true; + } + if self.players.values().all(|p| p.initialized) { + self.state = GameStateX::Started; + } + } + + self.state == GameStateX::Started && self.state != prev + } + fn targets(&self, exclude: Option) -> Option> { let len = if exclude.map_or(false, |e| self.players.contains_key(&e)) { self.players.len() - 1 @@ -153,16 +195,36 @@ impl AvailableIds { } } -#[derive(Debug, Error)] +#[derive(Debug, Error, PartialEq)] pub(super) enum JoinError { #[error("The player has already joined the game.")] AlreadyJoined, #[error("The game is full.")] GameFull, + #[error("The game is no longer opened.")] + GameNotOpened, } struct Player { id: u8, + initialized: bool, +} + +impl Player { + fn new(id: u8) -> Self { + Self { + id, + initialized: false, + } + } +} + +// TODO better name +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum GameStateX { + Open, + Starting, + Started, } #[cfg(test)] @@ -221,6 +283,26 @@ mod tests { })); } + #[test] + fn test_transitions() { + let client_a: SocketAddr = "127.0.0.1:8081".parse().unwrap(); + let client_b: SocketAddr = "127.0.0.1:8082".parse().unwrap(); + let client_c: SocketAddr = "127.0.0.1:8083".parse().unwrap(); + + let mut state = GameStateInner::new(3); + + state.add(client_a).unwrap(); + state.add(client_b).unwrap(); + + assert!(state.start()); + assert!(!state.start()); + + assert_eq!(state.add(client_c), Err(JoinError::GameNotOpened)); + + assert!(!state.mark_initialized(client_b)); + assert!(state.mark_initialized(client_a)); + } + #[test] fn test_targets() { let mut state = GameStateInner::new(8); diff --git a/crates/multiplayer/src/game.rs b/crates/multiplayer/src/game.rs index 6a320127..286f6947 100644 --- a/crates/multiplayer/src/game.rs +++ b/crates/multiplayer/src/game.rs @@ -162,6 +162,11 @@ fn process_from_game( JoinError::GameFull => { fatals.send(FatalErrorEvent::new("Game is full, cannot join.")); } + JoinError::GameNotOpened => { + fatals.send(FatalErrorEvent::new( + "Game is no longer opened, cannot join.", + )); + } JoinError::AlreadyJoined => { fatals.send(FatalErrorEvent::new( "Already joined the game, cannot re-join.", @@ -184,6 +189,14 @@ fn process_from_game( FromGame::PeerLeft(id) => { info!("Peer {id} left."); } + FromGame::Starting => { + info!("Multiplayer game is starting."); + } + FromGame::Started => { + fatals.send(FatalErrorEvent::new( + "Multiplayer game is unexpectedly fully started.", + )); + } } } } diff --git a/crates/net/src/messages.rs b/crates/net/src/messages.rs index b5766e14..a54dc7a0 100644 --- a/crates/net/src/messages.rs +++ b/crates/net/src/messages.rs @@ -43,6 +43,11 @@ pub enum ToGame { /// /// The game is automatically closed once all players disconnect. Leave, + /// This initiates game startup. + Start, + /// The game switches from starting state to started state once this + /// message is received from all players. + Initialized, } /// Message to be sent from a game server to a player/client (inside of a @@ -73,11 +78,19 @@ pub enum FromGame { /// Informs the player that another player with the given ID just /// disconnected from the same game. PeerLeft(u8), + /// Informs the client that the game is starting. The game is no longer + /// available for joining. The client should start game initialization. + Starting, + /// Informs the client that the game fully started because all clients are + /// initiated. + Started, } #[derive(Encode, Decode)] pub enum JoinError { GameFull, + /// The game is no longer opened. + GameNotOpened, /// The player has already joined the game. AlreadyJoined, /// The player already participates on a different game. From 94fca06298032cf2245e7900441b641d9c10f29f Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Thu, 10 Aug 2023 19:22:55 +0200 Subject: [PATCH 2/4] i9n test --- crates/connector/tests/commands.rs | 146 ++++++++++++++++++ .../tests/{integration.rs => network.rs} | 0 crates/net/src/messages.rs | 12 +- 3 files changed, 152 insertions(+), 6 deletions(-) create mode 100644 crates/connector/tests/commands.rs rename crates/connector/tests/{integration.rs => network.rs} (100%) diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs new file mode 100644 index 00000000..38599e80 --- /dev/null +++ b/crates/connector/tests/commands.rs @@ -0,0 +1,146 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use async_std::{ + future::{timeout, TimeoutError}, + task, +}; +use de_net::{ + self, ConnErrorReceiver, FromGame, FromServer, JoinError, OutPackage, PackageReceiver, + PackageSender, Peers, Socket, ToGame, ToServer, +}; +use ntest::timeout; + +use crate::common::{spawn_and_wait, term_and_wait}; + +mod common; + +macro_rules! check_response { + ($comms:expr, $expect:pat) => { + let mut response = $comms.recv().await; + if response.len() != 1 { + panic!("Unexpected number of messages: {response:?}"); + } + + let response = response.pop().unwrap(); + match response { + $expect => (), + _ => panic!("Unexpected response: {response:?}"), + } + }; +} + +#[test] +#[timeout(10_000)] +fn test() { + let child = spawn_and_wait(); + + task::block_on(task::spawn(async { + let mut comms_a = Comms::init().await; + let mut comms_b = Comms::init().await; + let mut comms_c = Comms::init().await; + let mut comms_d = Comms::init().await; + + comms_a.send(ToServer::OpenGame { max_players: 3 }).await; + let mut response = comms_a.recv::().await; + assert_eq!(response.len(), 1); + let response = response.pop().unwrap(); + let game_port = match response { + FromServer::GameOpened { port } => port, + _ => panic!("Unexpected message: {response:?}"), + }; + + comms_a.port = game_port; + comms_b.port = game_port; + comms_c.port = game_port; + comms_d.port = game_port; + + check_response!(comms_a, FromGame::Joined(1)); + + comms_b.send(ToGame::Join).await; + check_response!(comms_b, FromGame::Joined(2)); + check_response!(comms_a, FromGame::PeerJoined(2)); + + comms_a.send(ToGame::Start).await; + check_response!(comms_a, FromGame::Starting); + check_response!(comms_b, FromGame::Starting); + + comms_c.send(ToGame::Join).await; + check_response!(comms_c, FromGame::JoinError(JoinError::GameNotOpened)); + + comms_a.send(ToGame::Initialized).await; + // The other player is not yet initialized -> no message should be received. + assert!(matches!( + timeout(Duration::from_secs(1), comms_a.recv::()).await, + Err(TimeoutError) + )); + assert!(matches!( + timeout(Duration::from_secs(1), comms_b.recv::()).await, + Err(TimeoutError) + )); + + comms_b.send(ToGame::Initialized).await; + check_response!(comms_a, FromGame::Started); + check_response!(comms_b, FromGame::Started); + + comms_d.send(ToGame::Join).await; + check_response!(comms_d, FromGame::JoinError(JoinError::GameNotOpened)); + + assert!(comms_a.errors.is_empty()); + assert!(comms_b.errors.is_empty()); + assert!(comms_c.errors.is_empty()); + })); + + term_and_wait(child); +} + +struct Comms { + host: IpAddr, + port: u16, + sender: PackageSender, + receiver: PackageReceiver, + errors: ConnErrorReceiver, +} + +impl Comms { + async fn init() -> Self { + let socket = Socket::bind(None).await.unwrap(); + let (sender, receiver, errors) = de_net::startup( + |t| { + task::spawn(t); + }, + socket, + ); + + Self { + host: IpAddr::V4(Ipv4Addr::LOCALHOST), + port: 8082, + sender, + receiver, + errors, + } + } + + async fn send(&self, message: E) + where + E: bincode::Encode, + { + let addr = SocketAddr::new(self.host, self.port); + let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap(); + self.sender.send(package).await.unwrap(); + } + + async fn recv

(&self) -> Vec

+ where + P: bincode::Decode, + { + let package = self.receiver.recv().await.unwrap(); + let mut messages = Vec::new(); + for message in package.decode::

() { + messages.push(message.unwrap()); + } + messages + } +} diff --git a/crates/connector/tests/integration.rs b/crates/connector/tests/network.rs similarity index 100% rename from crates/connector/tests/integration.rs rename to crates/connector/tests/network.rs diff --git a/crates/net/src/messages.rs b/crates/net/src/messages.rs index a54dc7a0..412a16fa 100644 --- a/crates/net/src/messages.rs +++ b/crates/net/src/messages.rs @@ -2,7 +2,7 @@ use bincode::{Decode, Encode}; /// Message to be sent from a player/client to a main server (outside of a /// game). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum ToServer { /// Prompts the server to respond [`FromServer::Pong`] with the same ping ID. Ping(u32), @@ -13,7 +13,7 @@ pub enum ToServer { /// Message to be sent from a main server to a player/client (outside of a /// game). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum FromServer { /// Response to [`ToServer::Ping`]. Pong(u32), @@ -25,7 +25,7 @@ pub enum FromServer { GameOpenError(GameOpenError), } -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum GameOpenError { /// The player opening the game has already joined a different game. DifferentGame, @@ -33,7 +33,7 @@ pub enum GameOpenError { /// Message to be sent from a player/client to a game server (inside of a /// game). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum ToGame { /// Prompts the server to respond [`FromGame::Pong`] with the same ping ID. Ping(u32), @@ -56,7 +56,7 @@ pub enum ToGame { /// # Notes /// /// * Players are numbered from 1 to `max_players` (inclusive). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum FromGame { /// Response to [`ToGame::Ping`]. Pong(u32), @@ -86,7 +86,7 @@ pub enum FromGame { Started, } -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum JoinError { GameFull, /// The game is no longer opened. From 7257349a8c5cc406ee24c0ee8863e7100eff073d Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Thu, 10 Aug 2023 20:11:38 +0200 Subject: [PATCH 3/4] Rename --- crates/connector/src/game/state.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index b1d2adb9..4eb127a5 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -64,7 +64,7 @@ impl GameState { struct GameStateInner { available_ids: AvailableIds, - state: GameStateX, + phase: GamePhase, players: AHashMap, } @@ -72,7 +72,7 @@ impl GameStateInner { fn new(max_players: u8) -> Self { Self { available_ids: AvailableIds::new(max_players), - state: GameStateX::Open, + phase: GamePhase::Open, players: AHashMap::new(), } } @@ -86,7 +86,7 @@ impl GameStateInner { } fn add(&mut self, addr: SocketAddr) -> Result { - if self.state != GameStateX::Open { + if self.phase != GamePhase::Open { return Err(JoinError::GameNotOpened); } @@ -113,8 +113,8 @@ impl GameStateInner { } fn start(&mut self) -> bool { - if self.state == GameStateX::Open { - self.state = GameStateX::Starting; + if self.phase == GamePhase::Open { + self.phase = GamePhase::Starting; true } else { false @@ -122,18 +122,18 @@ impl GameStateInner { } fn mark_initialized(&mut self, addr: SocketAddr) -> bool { - let prev = self.state; + let prev = self.phase; - if matches!(self.state, GameStateX::Starting) { + if matches!(self.phase, GamePhase::Starting) { if let Some(player) = self.players.get_mut(&addr) { player.initialized = true; } if self.players.values().all(|p| p.initialized) { - self.state = GameStateX::Started; + self.phase = GamePhase::Started; } } - self.state == GameStateX::Started && self.state != prev + self.phase == GamePhase::Started && self.phase != prev } fn targets(&self, exclude: Option) -> Option> { @@ -219,9 +219,8 @@ impl Player { } } -// TODO better name #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(super) enum GameStateX { +pub(super) enum GamePhase { Open, Starting, Started, From b5c1f2886edccdb4b7b6545c544a9d95221cc7fa Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Thu, 10 Aug 2023 20:15:13 +0200 Subject: [PATCH 4/4] Clippy --- crates/connector/tests/commands.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs index 38599e80..a60566ea 100644 --- a/crates/connector/tests/commands.rs +++ b/crates/connector/tests/commands.rs @@ -3,10 +3,7 @@ use std::{ time::Duration, }; -use async_std::{ - future::{timeout, TimeoutError}, - task, -}; +use async_std::{future::timeout, task}; use de_net::{ self, ConnErrorReceiver, FromGame, FromServer, JoinError, OutPackage, PackageReceiver, PackageSender, Peers, Socket, ToGame, ToServer, @@ -72,14 +69,12 @@ fn test() { comms_a.send(ToGame::Initialized).await; // The other player is not yet initialized -> no message should be received. - assert!(matches!( - timeout(Duration::from_secs(1), comms_a.recv::()).await, - Err(TimeoutError) - )); - assert!(matches!( - timeout(Duration::from_secs(1), comms_b.recv::()).await, - Err(TimeoutError) - )); + assert!(timeout(Duration::from_secs(1), comms_a.recv::()) + .await + .is_err()); + assert!(timeout(Duration::from_secs(1), comms_b.recv::()) + .await + .is_err()); comms_b.send(ToGame::Initialized).await; check_response!(comms_a, FromGame::Started);