diff --git a/crates/connector/src/game/greceiver.rs b/crates/connector/src/game/greceiver.rs index ba9b148e..b2154853 100644 --- a/crates/connector/src/game/greceiver.rs +++ b/crates/connector/src/game/greceiver.rs @@ -99,6 +99,12 @@ impl GameProcessor { ToGame::Leave => { self.process_leave(message.meta).await; } + ToGame::Start => { + self.process_start().await; + } + ToGame::Initialized => { + self.process_initialized(message.meta).await; + } } if self.state.is_empty().await { @@ -199,6 +205,16 @@ impl GameProcessor { self.send(&FromGame::JoinError(JoinError::GameFull), meta.source) .await; } + JoinErrorInner::GameNotOpened => { + warn!( + "Player {:?} could not join game on port {} because the game is no \ + longer opened.", + meta.source, self.port + ); + + self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source) + .await; + } } } } @@ -233,6 +249,18 @@ impl GameProcessor { self.send_all(&FromGame::PeerLeft(id), None).await; } + async fn process_start(&mut self) { + if self.state.start().await { + self.send_all(&FromGame::Starting, None).await; + } + } + + async fn process_initialized(&mut self, meta: MessageMeta) { + if self.state.mark_initialized(meta.source).await { + self.send_all(&FromGame::Started, None).await; + } + } + /// Send a reliable message to all players of the game. /// /// # Arguments diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index 495e0b57..4eb127a5 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -38,6 +38,18 @@ impl GameState { self.inner.write().await.remove(addr) } + /// If the game is in state `Open`, changes its state to `Starting` and + /// returns true. + pub(super) async fn start(&mut self) -> bool { + self.inner.write().await.start() + } + + /// Marks a player as initialized. Returns true if the game was just + /// started. + pub(super) async fn mark_initialized(&mut self, addr: SocketAddr) -> bool { + self.inner.write().await.mark_initialized(addr) + } + /// Constructs and returns package targets which includes all or all but /// one players connected to the game. It returns None if there is no /// matching target. @@ -52,6 +64,7 @@ impl GameState { struct GameStateInner { available_ids: AvailableIds, + phase: GamePhase, players: AHashMap, } @@ -59,6 +72,7 @@ impl GameStateInner { fn new(max_players: u8) -> Self { Self { available_ids: AvailableIds::new(max_players), + phase: GamePhase::Open, players: AHashMap::new(), } } @@ -72,11 +86,15 @@ impl GameStateInner { } fn add(&mut self, addr: SocketAddr) -> Result { + if self.phase != GamePhase::Open { + return Err(JoinError::GameNotOpened); + } + match self.players.entry(addr) { Entry::Occupied(_) => Err(JoinError::AlreadyJoined), Entry::Vacant(vacant) => match self.available_ids.lease() { Some(id) => { - vacant.insert(Player { id }); + vacant.insert(Player::new(id)); Ok(id) } None => Err(JoinError::GameFull), @@ -94,6 +112,30 @@ impl GameStateInner { } } + fn start(&mut self) -> bool { + if self.phase == GamePhase::Open { + self.phase = GamePhase::Starting; + true + } else { + false + } + } + + fn mark_initialized(&mut self, addr: SocketAddr) -> bool { + let prev = self.phase; + + if matches!(self.phase, GamePhase::Starting) { + if let Some(player) = self.players.get_mut(&addr) { + player.initialized = true; + } + if self.players.values().all(|p| p.initialized) { + self.phase = GamePhase::Started; + } + } + + self.phase == GamePhase::Started && self.phase != prev + } + fn targets(&self, exclude: Option) -> Option> { let len = if exclude.map_or(false, |e| self.players.contains_key(&e)) { self.players.len() - 1 @@ -153,16 +195,35 @@ impl AvailableIds { } } -#[derive(Debug, Error)] +#[derive(Debug, Error, PartialEq)] pub(super) enum JoinError { #[error("The player has already joined the game.")] AlreadyJoined, #[error("The game is full.")] GameFull, + #[error("The game is no longer opened.")] + GameNotOpened, } struct Player { id: u8, + initialized: bool, +} + +impl Player { + fn new(id: u8) -> Self { + Self { + id, + initialized: false, + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum GamePhase { + Open, + Starting, + Started, } #[cfg(test)] @@ -221,6 +282,26 @@ mod tests { })); } + #[test] + fn test_transitions() { + let client_a: SocketAddr = "127.0.0.1:8081".parse().unwrap(); + let client_b: SocketAddr = "127.0.0.1:8082".parse().unwrap(); + let client_c: SocketAddr = "127.0.0.1:8083".parse().unwrap(); + + let mut state = GameStateInner::new(3); + + state.add(client_a).unwrap(); + state.add(client_b).unwrap(); + + assert!(state.start()); + assert!(!state.start()); + + assert_eq!(state.add(client_c), Err(JoinError::GameNotOpened)); + + assert!(!state.mark_initialized(client_b)); + assert!(state.mark_initialized(client_a)); + } + #[test] fn test_targets() { let mut state = GameStateInner::new(8); diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs new file mode 100644 index 00000000..a60566ea --- /dev/null +++ b/crates/connector/tests/commands.rs @@ -0,0 +1,141 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use async_std::{future::timeout, task}; +use de_net::{ + self, ConnErrorReceiver, FromGame, FromServer, JoinError, OutPackage, PackageReceiver, + PackageSender, Peers, Socket, ToGame, ToServer, +}; +use ntest::timeout; + +use crate::common::{spawn_and_wait, term_and_wait}; + +mod common; + +macro_rules! check_response { + ($comms:expr, $expect:pat) => { + let mut response = $comms.recv().await; + if response.len() != 1 { + panic!("Unexpected number of messages: {response:?}"); + } + + let response = response.pop().unwrap(); + match response { + $expect => (), + _ => panic!("Unexpected response: {response:?}"), + } + }; +} + +#[test] +#[timeout(10_000)] +fn test() { + let child = spawn_and_wait(); + + task::block_on(task::spawn(async { + let mut comms_a = Comms::init().await; + let mut comms_b = Comms::init().await; + let mut comms_c = Comms::init().await; + let mut comms_d = Comms::init().await; + + comms_a.send(ToServer::OpenGame { max_players: 3 }).await; + let mut response = comms_a.recv::().await; + assert_eq!(response.len(), 1); + let response = response.pop().unwrap(); + let game_port = match response { + FromServer::GameOpened { port } => port, + _ => panic!("Unexpected message: {response:?}"), + }; + + comms_a.port = game_port; + comms_b.port = game_port; + comms_c.port = game_port; + comms_d.port = game_port; + + check_response!(comms_a, FromGame::Joined(1)); + + comms_b.send(ToGame::Join).await; + check_response!(comms_b, FromGame::Joined(2)); + check_response!(comms_a, FromGame::PeerJoined(2)); + + comms_a.send(ToGame::Start).await; + check_response!(comms_a, FromGame::Starting); + check_response!(comms_b, FromGame::Starting); + + comms_c.send(ToGame::Join).await; + check_response!(comms_c, FromGame::JoinError(JoinError::GameNotOpened)); + + comms_a.send(ToGame::Initialized).await; + // The other player is not yet initialized -> no message should be received. + assert!(timeout(Duration::from_secs(1), comms_a.recv::()) + .await + .is_err()); + assert!(timeout(Duration::from_secs(1), comms_b.recv::()) + .await + .is_err()); + + comms_b.send(ToGame::Initialized).await; + check_response!(comms_a, FromGame::Started); + check_response!(comms_b, FromGame::Started); + + comms_d.send(ToGame::Join).await; + check_response!(comms_d, FromGame::JoinError(JoinError::GameNotOpened)); + + assert!(comms_a.errors.is_empty()); + assert!(comms_b.errors.is_empty()); + assert!(comms_c.errors.is_empty()); + })); + + term_and_wait(child); +} + +struct Comms { + host: IpAddr, + port: u16, + sender: PackageSender, + receiver: PackageReceiver, + errors: ConnErrorReceiver, +} + +impl Comms { + async fn init() -> Self { + let socket = Socket::bind(None).await.unwrap(); + let (sender, receiver, errors) = de_net::startup( + |t| { + task::spawn(t); + }, + socket, + ); + + Self { + host: IpAddr::V4(Ipv4Addr::LOCALHOST), + port: 8082, + sender, + receiver, + errors, + } + } + + async fn send(&self, message: E) + where + E: bincode::Encode, + { + let addr = SocketAddr::new(self.host, self.port); + let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap(); + self.sender.send(package).await.unwrap(); + } + + async fn recv

(&self) -> Vec

+ where + P: bincode::Decode, + { + let package = self.receiver.recv().await.unwrap(); + let mut messages = Vec::new(); + for message in package.decode::

() { + messages.push(message.unwrap()); + } + messages + } +} diff --git a/crates/connector/tests/integration.rs b/crates/connector/tests/network.rs similarity index 100% rename from crates/connector/tests/integration.rs rename to crates/connector/tests/network.rs diff --git a/crates/multiplayer/src/game.rs b/crates/multiplayer/src/game.rs index 6a320127..286f6947 100644 --- a/crates/multiplayer/src/game.rs +++ b/crates/multiplayer/src/game.rs @@ -162,6 +162,11 @@ fn process_from_game( JoinError::GameFull => { fatals.send(FatalErrorEvent::new("Game is full, cannot join.")); } + JoinError::GameNotOpened => { + fatals.send(FatalErrorEvent::new( + "Game is no longer opened, cannot join.", + )); + } JoinError::AlreadyJoined => { fatals.send(FatalErrorEvent::new( "Already joined the game, cannot re-join.", @@ -184,6 +189,14 @@ fn process_from_game( FromGame::PeerLeft(id) => { info!("Peer {id} left."); } + FromGame::Starting => { + info!("Multiplayer game is starting."); + } + FromGame::Started => { + fatals.send(FatalErrorEvent::new( + "Multiplayer game is unexpectedly fully started.", + )); + } } } } diff --git a/crates/net/src/messages.rs b/crates/net/src/messages.rs index b5766e14..412a16fa 100644 --- a/crates/net/src/messages.rs +++ b/crates/net/src/messages.rs @@ -2,7 +2,7 @@ use bincode::{Decode, Encode}; /// Message to be sent from a player/client to a main server (outside of a /// game). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum ToServer { /// Prompts the server to respond [`FromServer::Pong`] with the same ping ID. Ping(u32), @@ -13,7 +13,7 @@ pub enum ToServer { /// Message to be sent from a main server to a player/client (outside of a /// game). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum FromServer { /// Response to [`ToServer::Ping`]. Pong(u32), @@ -25,7 +25,7 @@ pub enum FromServer { GameOpenError(GameOpenError), } -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum GameOpenError { /// The player opening the game has already joined a different game. DifferentGame, @@ -33,7 +33,7 @@ pub enum GameOpenError { /// Message to be sent from a player/client to a game server (inside of a /// game). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum ToGame { /// Prompts the server to respond [`FromGame::Pong`] with the same ping ID. Ping(u32), @@ -43,6 +43,11 @@ pub enum ToGame { /// /// The game is automatically closed once all players disconnect. Leave, + /// This initiates game startup. + Start, + /// The game switches from starting state to started state once this + /// message is received from all players. + Initialized, } /// Message to be sent from a game server to a player/client (inside of a @@ -51,7 +56,7 @@ pub enum ToGame { /// # Notes /// /// * Players are numbered from 1 to `max_players` (inclusive). -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum FromGame { /// Response to [`ToGame::Ping`]. Pong(u32), @@ -73,11 +78,19 @@ pub enum FromGame { /// Informs the player that another player with the given ID just /// disconnected from the same game. PeerLeft(u8), + /// Informs the client that the game is starting. The game is no longer + /// available for joining. The client should start game initialization. + Starting, + /// Informs the client that the game fully started because all clients are + /// initiated. + Started, } -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum JoinError { GameFull, + /// The game is no longer opened. + GameNotOpened, /// The player has already joined the game. AlreadyJoined, /// The player already participates on a different game.