Skip to content

Commit

Permalink
de_connector: Improve readiness staging (#680)
Browse files Browse the repository at this point in the history
- This makes it possible to wait a bit until all clients enter the game
and become prepared to receive messages related to game initialization
(like some kind of to be implemented map object spawn messages).
- It changes how the game is started -- all clients must be ready for
that.
- All of that in a unified step-by-step progression of readiness stages.
  • Loading branch information
Indy2222 authored Aug 15, 2023
1 parent 66cff69 commit b038cc3
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 93 deletions.
30 changes: 15 additions & 15 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_std::{
channel::{Receiver, Sender},
task,
};
use de_net::{FromGame, JoinError, OutPackage, Peers, Targets, ToGame};
use de_net::{FromGame, JoinError, OutPackage, Peers, Readiness, Targets, ToGame};
use tracing::{error, info, warn};

use super::state::{GameState, JoinError as JoinErrorInner};
Expand Down Expand Up @@ -99,11 +99,8 @@ impl GameProcessor {
ToGame::Leave => {
self.process_leave(message.meta).await;
}
ToGame::Start => {
self.process_start().await;
}
ToGame::Initialized => {
self.process_initialized(message.meta).await;
ToGame::Readiness(readiness) => {
self.process_readiness(message.meta, readiness).await;
}
}

Expand Down Expand Up @@ -249,15 +246,18 @@ impl GameProcessor {
self.send_all(&FromGame::PeerLeft(id), None).await;
}

async fn process_start(&mut self) {
if self.state.start().await {
self.send_all(&FromGame::Starting, None).await;
}
}

async fn process_initialized(&mut self, meta: MessageMeta) {
if self.state.mark_initialized(meta.source).await {
self.send_all(&FromGame::Started, None).await;
async fn process_readiness(&mut self, meta: MessageMeta, readiness: Readiness) {
match self.state.update_readiness(meta.source, readiness).await {
Ok(progressed) => {
if progressed {
self.send_all(&FromGame::GameReadiness(readiness), None)
.await;
}
}
Err(err) => warn!(
"Invalid readiness update from {source:?}: {err:?}",
source = meta.source
),
}
}

Expand Down
158 changes: 114 additions & 44 deletions crates/connector/src/game/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::hash_map::Entry, net::SocketAddr};

use ahash::AHashMap;
use async_std::sync::{Arc, RwLock};
use de_net::Targets;
use de_net::{Readiness, Targets};
use thiserror::Error;

#[derive(Clone)]
Expand Down Expand Up @@ -38,16 +38,17 @@ impl GameState {
self.inner.write().await.remove(addr)
}

/// If the game is in state `Open`, changes its state to `Starting` and
/// returns true.
pub(super) async fn start(&mut self) -> bool {
self.inner.write().await.start()
}

/// Marks a player as initialized. Returns true if the game was just
/// started.
pub(super) async fn mark_initialized(&mut self, addr: SocketAddr) -> bool {
self.inner.write().await.mark_initialized(addr)
/// Updates readiness of a single player. Whole game readiness is updated
/// once all players reach another readiness stage.
///
/// Returns true if game readiness progressed as a result (to the readiness
/// of the player).
pub(super) async fn update_readiness(
&mut self,
addr: SocketAddr,
readiness: Readiness,
) -> Result<bool, ReadinessUpdateError> {
self.inner.write().await.update_readiness(addr, readiness)
}

/// Constructs and returns package targets which includes all or all but
Expand All @@ -64,15 +65,15 @@ impl GameState {

struct GameStateInner {
available_ids: AvailableIds,
phase: GamePhase,
readiness: Readiness,
players: AHashMap<SocketAddr, Player>,
}

impl GameStateInner {
fn new(max_players: u8) -> Self {
Self {
available_ids: AvailableIds::new(max_players),
phase: GamePhase::Open,
readiness: Readiness::default(),
players: AHashMap::new(),
}
}
Expand All @@ -86,7 +87,7 @@ impl GameStateInner {
}

fn add(&mut self, addr: SocketAddr) -> Result<u8, JoinError> {
if self.phase != GamePhase::Open {
if self.readiness != Readiness::NotReady {
return Err(JoinError::GameNotOpened);
}

Expand All @@ -112,28 +113,48 @@ impl GameStateInner {
}
}

fn start(&mut self) -> bool {
if self.phase == GamePhase::Open {
self.phase = GamePhase::Starting;
true
} else {
false
fn update_readiness(
&mut self,
addr: SocketAddr,
readiness: Readiness,
) -> Result<bool, ReadinessUpdateError> {
let Some(player) = self.players.get_mut(&addr) else {
return Err(ReadinessUpdateError::UnknownClient(addr));
};

if player.readiness > readiness {
return Err(ReadinessUpdateError::Downgrade {
from: player.readiness,
to: readiness,
});
}
}

fn mark_initialized(&mut self, addr: SocketAddr) -> bool {
let prev = self.phase;
if player.readiness == readiness {
return Ok(false);
}

if matches!(self.phase, GamePhase::Starting) {
if let Some(player) = self.players.get_mut(&addr) {
player.initialized = true;
}
if self.players.values().all(|p| p.initialized) {
self.phase = GamePhase::Started;
}
if player.readiness.progress() != Some(readiness) {
return Err(ReadinessUpdateError::Skip {
from: player.readiness,
to: readiness,
});
}

if player.readiness > self.readiness {
// The player is already ahead of the game, cannot move them further.
return Err(ReadinessUpdateError::Desync {
game: self.readiness,
client: readiness,
});
}

self.phase == GamePhase::Started && self.phase != prev
player.readiness = readiness;

let previous = self.readiness;
self.readiness = self.players.values().map(|p| p.readiness).min().unwrap();
let progressed = previous != self.readiness;
assert!(self.readiness == readiness || !progressed);
Ok(progressed)
}

fn targets(&self, exclude: Option<SocketAddr>) -> Option<Targets<'static>> {
Expand Down Expand Up @@ -205,27 +226,32 @@ pub(super) enum JoinError {
GameNotOpened,
}

#[derive(Debug, Error, PartialEq)]
pub(super) enum ReadinessUpdateError {
#[error("Client {0:?} is not part of the game.")]
UnknownClient(SocketAddr),
#[error("Cannot downgrade client readiness from {from:?} to {to:?}.")]
Downgrade { from: Readiness, to: Readiness },
#[error("Cannot upgrade client readiness from {from:?} to {to:?}.")]
Skip { from: Readiness, to: Readiness },
#[error("Cannot change client readiness to {client:?} when game is at {game:?}.")]
Desync { game: Readiness, client: Readiness },
}

struct Player {
id: u8,
initialized: bool,
readiness: Readiness,
}

impl Player {
fn new(id: u8) -> Self {
Self {
id,
initialized: false,
readiness: Readiness::default(),
}
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum GamePhase {
Open,
Starting,
Started,
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down Expand Up @@ -293,13 +319,57 @@ mod tests {
state.add(client_a).unwrap();
state.add(client_b).unwrap();

assert!(state.start());
assert!(!state.start());
assert_eq!(state.readiness, Readiness::NotReady);

assert!(!state
.update_readiness(client_a, Readiness::NotReady)
.unwrap());
assert_eq!(state.readiness, Readiness::NotReady);

assert!(!state.update_readiness(client_b, Readiness::Ready).unwrap());
assert_eq!(state.readiness, Readiness::NotReady);
assert!(state.update_readiness(client_a, Readiness::Ready).unwrap());
assert_eq!(state.readiness, Readiness::Ready);

assert_eq!(state.add(client_c), Err(JoinError::GameNotOpened));

assert!(!state.mark_initialized(client_b));
assert!(state.mark_initialized(client_a));
assert_eq!(
state
.update_readiness(client_a, Readiness::Initialized)
.unwrap_err(),
ReadinessUpdateError::Skip {
from: Readiness::Ready,
to: Readiness::Initialized
}
);

assert!(!state
.update_readiness(client_b, Readiness::Prepared)
.unwrap());
assert_eq!(
state
.update_readiness(client_b, Readiness::Initialized)
.unwrap_err(),
ReadinessUpdateError::Desync {
game: Readiness::Ready,
client: Readiness::Initialized
}
);
assert_eq!(state.readiness, Readiness::Ready);

assert!(state
.update_readiness(client_a, Readiness::Prepared)
.unwrap());
assert_eq!(state.readiness, Readiness::Prepared);

assert!(!state
.update_readiness(client_a, Readiness::Initialized)
.unwrap());
assert_eq!(state.readiness, Readiness::Prepared);
assert!(state
.update_readiness(client_b, Readiness::Initialized)
.unwrap());
assert_eq!(state.readiness, Readiness::Initialized);
}

#[test]
Expand Down
70 changes: 55 additions & 15 deletions crates/connector/tests/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use async_std::{future::timeout, task};
use de_net::{
self, ConnErrorReceiver, FromGame, FromServer, JoinError, OutPackage, PackageReceiver,
PackageSender, Peers, Socket, ToGame, ToServer,
PackageSender, Peers, Readiness, Socket, ToGame, ToServer,
};
use ntest::timeout;

Expand Down Expand Up @@ -60,29 +60,69 @@ fn test() {
check_response!(comms_b, FromGame::Joined(2));
check_response!(comms_a, FromGame::PeerJoined(2));

comms_a.send(ToGame::Start).await;
check_response!(comms_a, FromGame::Starting);
check_response!(comms_b, FromGame::Starting);
comms_a.send(ToGame::Readiness(Readiness::Ready)).await;
// The other player is not yet ready -> no message should be received.
assert!(
timeout(Duration::from_millis(500), comms_a.recv::<FromGame>())
.await
.is_err()
);
assert!(
timeout(Duration::from_millis(500), comms_b.recv::<FromGame>())
.await
.is_err()
);
comms_b.send(ToGame::Readiness(Readiness::Ready)).await;

check_response!(comms_a, FromGame::GameReadiness(Readiness::Ready));
check_response!(comms_b, FromGame::GameReadiness(Readiness::Ready));

comms_c.send(ToGame::Join).await;
check_response!(comms_c, FromGame::JoinError(JoinError::GameNotOpened));

comms_a.send(ToGame::Initialized).await;
// The other player is not yet initialized -> no message should be received.
assert!(timeout(Duration::from_secs(1), comms_a.recv::<FromGame>())
.await
.is_err());
assert!(timeout(Duration::from_secs(1), comms_b.recv::<FromGame>())
.await
.is_err());
comms_a.send(ToGame::Readiness(Readiness::Prepared)).await;
// The other player is not yet prepared -> no message should be received.
assert!(
timeout(Duration::from_millis(500), comms_a.recv::<FromGame>())
.await
.is_err()
);
assert!(
timeout(Duration::from_millis(500), comms_b.recv::<FromGame>())
.await
.is_err()
);

comms_b.send(ToGame::Readiness(Readiness::Prepared)).await;

comms_b.send(ToGame::Initialized).await;
check_response!(comms_a, FromGame::Started);
check_response!(comms_b, FromGame::Started);
check_response!(comms_a, FromGame::GameReadiness(Readiness::Prepared));
check_response!(comms_b, FromGame::GameReadiness(Readiness::Prepared));

comms_d.send(ToGame::Join).await;
check_response!(comms_d, FromGame::JoinError(JoinError::GameNotOpened));

comms_a
.send(ToGame::Readiness(Readiness::Initialized))
.await;
// The other player is not yet initialized -> no message should be received.
assert!(
timeout(Duration::from_millis(500), comms_a.recv::<FromGame>())
.await
.is_err()
);
assert!(
timeout(Duration::from_millis(500), comms_b.recv::<FromGame>())
.await
.is_err()
);

comms_b
.send(ToGame::Readiness(Readiness::Initialized))
.await;

check_response!(comms_a, FromGame::GameReadiness(Readiness::Initialized));
check_response!(comms_b, FromGame::GameReadiness(Readiness::Initialized));

assert!(comms_a.errors.is_empty());
assert!(comms_b.errors.is_empty());
assert!(comms_c.errors.is_empty());
Expand Down
9 changes: 2 additions & 7 deletions crates/multiplayer/src/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,8 @@ fn process_from_game(
FromGame::PeerLeft(id) => {
info!("Peer {id} left.");
}
FromGame::Starting => {
info!("Multiplayer game is starting.");
}
FromGame::Started => {
fatals.send(FatalErrorEvent::new(
"Multiplayer game is unexpectedly fully started.",
));
FromGame::GameReadiness(readiness) => {
info!("Game readiness changed to: {readiness:?}");
}
}
}
Expand Down
Loading

0 comments on commit b038cc3

Please sign in to comment.