Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

de_connector: Impl game starting #677

Merged
merged 4 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ impl GameProcessor {
ToGame::Leave => {
self.process_leave(message.meta).await;
}
ToGame::Start => {
self.process_start().await;
}
ToGame::Initialized => {
self.process_initialized(message.meta).await;
}
}

if self.state.is_empty().await {
Expand Down Expand Up @@ -199,6 +205,16 @@ impl GameProcessor {
self.send(&FromGame::JoinError(JoinError::GameFull), meta.source)
.await;
}
JoinErrorInner::GameNotOpened => {
warn!(
"Player {:?} could not join game on port {} because the game is no \
longer opened.",
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source)
.await;
}
}
}
}
Expand Down Expand Up @@ -233,6 +249,18 @@ impl GameProcessor {
self.send_all(&FromGame::PeerLeft(id), None).await;
}

async fn process_start(&mut self) {
if self.state.start().await {
self.send_all(&FromGame::Starting, None).await;
}
}

async fn process_initialized(&mut self, meta: MessageMeta) {
if self.state.mark_initialized(meta.source).await {
self.send_all(&FromGame::Started, None).await;
}
}

/// Send a reliable message to all players of the game.
///
/// # Arguments
Expand Down
85 changes: 83 additions & 2 deletions crates/connector/src/game/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ impl GameState {
self.inner.write().await.remove(addr)
}

/// If the game is in state `Open`, changes its state to `Starting` and
/// returns true.
pub(super) async fn start(&mut self) -> bool {
self.inner.write().await.start()
}

/// Marks a player as initialized. Returns true if the game was just
/// started.
pub(super) async fn mark_initialized(&mut self, addr: SocketAddr) -> bool {
self.inner.write().await.mark_initialized(addr)
}

/// Constructs and returns package targets which includes all or all but
/// one players connected to the game. It returns None if there is no
/// matching target.
Expand All @@ -52,13 +64,15 @@ impl GameState {

struct GameStateInner {
available_ids: AvailableIds,
phase: GamePhase,
players: AHashMap<SocketAddr, Player>,
}

impl GameStateInner {
fn new(max_players: u8) -> Self {
Self {
available_ids: AvailableIds::new(max_players),
phase: GamePhase::Open,
players: AHashMap::new(),
}
}
Expand All @@ -72,11 +86,15 @@ impl GameStateInner {
}

fn add(&mut self, addr: SocketAddr) -> Result<u8, JoinError> {
if self.phase != GamePhase::Open {
return Err(JoinError::GameNotOpened);
}

match self.players.entry(addr) {
Entry::Occupied(_) => Err(JoinError::AlreadyJoined),
Entry::Vacant(vacant) => match self.available_ids.lease() {
Some(id) => {
vacant.insert(Player { id });
vacant.insert(Player::new(id));
Ok(id)
}
None => Err(JoinError::GameFull),
Expand All @@ -94,6 +112,30 @@ impl GameStateInner {
}
}

fn start(&mut self) -> bool {
if self.phase == GamePhase::Open {
self.phase = GamePhase::Starting;
true
} else {
false
}
}

fn mark_initialized(&mut self, addr: SocketAddr) -> bool {
let prev = self.phase;

if matches!(self.phase, GamePhase::Starting) {
if let Some(player) = self.players.get_mut(&addr) {
player.initialized = true;
}
if self.players.values().all(|p| p.initialized) {
self.phase = GamePhase::Started;
}
}

self.phase == GamePhase::Started && self.phase != prev
}

fn targets(&self, exclude: Option<SocketAddr>) -> Option<Targets<'static>> {
let len = if exclude.map_or(false, |e| self.players.contains_key(&e)) {
self.players.len() - 1
Expand Down Expand Up @@ -153,16 +195,35 @@ impl AvailableIds {
}
}

#[derive(Debug, Error)]
#[derive(Debug, Error, PartialEq)]
pub(super) enum JoinError {
#[error("The player has already joined the game.")]
AlreadyJoined,
#[error("The game is full.")]
GameFull,
#[error("The game is no longer opened.")]
GameNotOpened,
}

struct Player {
id: u8,
initialized: bool,
}

impl Player {
fn new(id: u8) -> Self {
Self {
id,
initialized: false,
}
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum GamePhase {
Open,
Starting,
Started,
}

#[cfg(test)]
Expand Down Expand Up @@ -221,6 +282,26 @@ mod tests {
}));
}

#[test]
fn test_transitions() {
let client_a: SocketAddr = "127.0.0.1:8081".parse().unwrap();
let client_b: SocketAddr = "127.0.0.1:8082".parse().unwrap();
let client_c: SocketAddr = "127.0.0.1:8083".parse().unwrap();

let mut state = GameStateInner::new(3);

state.add(client_a).unwrap();
state.add(client_b).unwrap();

assert!(state.start());
assert!(!state.start());

assert_eq!(state.add(client_c), Err(JoinError::GameNotOpened));

assert!(!state.mark_initialized(client_b));
assert!(state.mark_initialized(client_a));
}

#[test]
fn test_targets() {
let mut state = GameStateInner::new(8);
Expand Down
141 changes: 141 additions & 0 deletions crates/connector/tests/commands.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};

use async_std::{future::timeout, task};
use de_net::{
self, ConnErrorReceiver, FromGame, FromServer, JoinError, OutPackage, PackageReceiver,
PackageSender, Peers, Socket, ToGame, ToServer,
};
use ntest::timeout;

use crate::common::{spawn_and_wait, term_and_wait};

mod common;

macro_rules! check_response {
($comms:expr, $expect:pat) => {
let mut response = $comms.recv().await;
if response.len() != 1 {
panic!("Unexpected number of messages: {response:?}");
}

let response = response.pop().unwrap();
match response {
$expect => (),
_ => panic!("Unexpected response: {response:?}"),
}
};
}

#[test]
#[timeout(10_000)]
fn test() {
let child = spawn_and_wait();

task::block_on(task::spawn(async {
let mut comms_a = Comms::init().await;
let mut comms_b = Comms::init().await;
let mut comms_c = Comms::init().await;
let mut comms_d = Comms::init().await;

comms_a.send(ToServer::OpenGame { max_players: 3 }).await;
let mut response = comms_a.recv::<FromServer>().await;
assert_eq!(response.len(), 1);
let response = response.pop().unwrap();
let game_port = match response {
FromServer::GameOpened { port } => port,
_ => panic!("Unexpected message: {response:?}"),
};

comms_a.port = game_port;
comms_b.port = game_port;
comms_c.port = game_port;
comms_d.port = game_port;

check_response!(comms_a, FromGame::Joined(1));

comms_b.send(ToGame::Join).await;
check_response!(comms_b, FromGame::Joined(2));
check_response!(comms_a, FromGame::PeerJoined(2));

comms_a.send(ToGame::Start).await;
check_response!(comms_a, FromGame::Starting);
check_response!(comms_b, FromGame::Starting);

comms_c.send(ToGame::Join).await;
check_response!(comms_c, FromGame::JoinError(JoinError::GameNotOpened));

comms_a.send(ToGame::Initialized).await;
// The other player is not yet initialized -> no message should be received.
assert!(timeout(Duration::from_secs(1), comms_a.recv::<FromGame>())
.await
.is_err());
assert!(timeout(Duration::from_secs(1), comms_b.recv::<FromGame>())
.await
.is_err());

comms_b.send(ToGame::Initialized).await;
check_response!(comms_a, FromGame::Started);
check_response!(comms_b, FromGame::Started);

comms_d.send(ToGame::Join).await;
check_response!(comms_d, FromGame::JoinError(JoinError::GameNotOpened));

assert!(comms_a.errors.is_empty());
assert!(comms_b.errors.is_empty());
assert!(comms_c.errors.is_empty());
}));

term_and_wait(child);
}

struct Comms {
host: IpAddr,
port: u16,
sender: PackageSender,
receiver: PackageReceiver,
errors: ConnErrorReceiver,
}

impl Comms {
async fn init() -> Self {
let socket = Socket::bind(None).await.unwrap();
let (sender, receiver, errors) = de_net::startup(
|t| {
task::spawn(t);
},
socket,
);

Self {
host: IpAddr::V4(Ipv4Addr::LOCALHOST),
port: 8082,
sender,
receiver,
errors,
}
}

async fn send<E>(&self, message: E)
where
E: bincode::Encode,
{
let addr = SocketAddr::new(self.host, self.port);
let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap();
self.sender.send(package).await.unwrap();
}

async fn recv<P>(&self) -> Vec<P>
where
P: bincode::Decode,
{
let package = self.receiver.recv().await.unwrap();
let mut messages = Vec::new();
for message in package.decode::<P>() {
messages.push(message.unwrap());
}
messages
}
}
13 changes: 13 additions & 0 deletions crates/multiplayer/src/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ fn process_from_game(
JoinError::GameFull => {
fatals.send(FatalErrorEvent::new("Game is full, cannot join."));
}
JoinError::GameNotOpened => {
fatals.send(FatalErrorEvent::new(
"Game is no longer opened, cannot join.",
));
}
JoinError::AlreadyJoined => {
fatals.send(FatalErrorEvent::new(
"Already joined the game, cannot re-join.",
Expand All @@ -184,6 +189,14 @@ fn process_from_game(
FromGame::PeerLeft(id) => {
info!("Peer {id} left.");
}
FromGame::Starting => {
info!("Multiplayer game is starting.");
}
FromGame::Started => {
fatals.send(FatalErrorEvent::new(
"Multiplayer game is unexpectedly fully started.",
));
}
}
}
}
Expand Down
Loading