Skip to content

Commit

Permalink
de_net: Add message ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 19, 2023
1 parent 82f6358 commit 04eff4c
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 184 deletions.
8 changes: 6 additions & 2 deletions crates/connector/src/game/ereceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use async_std::{channel::Sender, future::timeout};
use de_messages::ToGame;
use de_net::ConnErrorReceiver;
use de_net::{ConnErrorReceiver, Reliability};
use tracing::{error, info, warn};

use super::greceiver::ToGameMessage;
Expand All @@ -26,7 +26,11 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender<ToG

warn!("In game connection lost with {:?}", error.target());
let _ = server
.send(ToGameMessage::new(error.target(), true, ToGame::Leave))
.send(ToGameMessage::new(
error.target(),
Reliability::Unordered,
ToGame::Leave,
))
.await;
}

Expand Down
82 changes: 58 additions & 24 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_std::{
task,
};
use de_messages::{FromGame, JoinError, Readiness, ToGame};
use de_net::{OutPackage, Peers, Targets};
use de_net::{OutPackage, Peers, Reliability, Targets};
use tracing::{error, info, warn};

use super::state::{GameState, JoinError as JoinErrorInner};
Expand All @@ -17,17 +17,20 @@ pub(super) struct ToGameMessage {
}

impl ToGameMessage {
pub(super) fn new(source: SocketAddr, reliable: bool, message: ToGame) -> Self {
pub(super) fn new(source: SocketAddr, reliability: Reliability, message: ToGame) -> Self {
Self {
meta: MessageMeta { source, reliable },
meta: MessageMeta {
source,
reliability,
},
message,
}
}
}

struct MessageMeta {
source: SocketAddr,
reliable: bool,
reliability: Reliability,
}

pub(super) struct GameProcessor {
Expand Down Expand Up @@ -142,7 +145,7 @@ impl GameProcessor {
.send(
OutPackage::encode_single(
&FromGame::NotJoined,
message.meta.reliable,
message.meta.reliability,
Peers::Server,
message.meta.source,
)
Expand All @@ -159,7 +162,7 @@ impl GameProcessor {
.send(
OutPackage::encode_single(
&FromGame::Pong(id),
meta.reliable,
meta.reliability,
Peers::Server,
meta.source,
)
Expand All @@ -172,8 +175,12 @@ impl GameProcessor {
async fn process_join(&mut self, meta: MessageMeta) {
if let Err(err) = self.clients.reserve(meta.source).await {
warn!("Join request error: {err}");
self.send(&FromGame::JoinError(JoinError::DifferentGame), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::DifferentGame),
Reliability::Unordered,
meta.source,
)
.await;
return;
}

Expand All @@ -191,17 +198,25 @@ impl GameProcessor {
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::AlreadyJoined), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::AlreadyJoined),
Reliability::Unordered,
meta.source,
)
.await;
}
JoinErrorInner::GameFull => {
warn!(
"Player {:?} could not join game on port {} because the game is full.",
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::GameFull), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::GameFull),
Reliability::Unordered,
meta.source,
)
.await;
}
JoinErrorInner::GameNotOpened => {
warn!(
Expand All @@ -210,8 +225,12 @@ impl GameProcessor {
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::GameNotOpened),
Reliability::Unordered,
meta.source,
)
.await;
}
}
}
Expand All @@ -224,8 +243,14 @@ impl GameProcessor {
"Player {id} on {addr:?} just joined game on port {}.",
self.port
);
self.send(&FromGame::Joined(id), addr).await;
self.send_all(&FromGame::PeerJoined(id), Some(addr)).await;
self.send(&FromGame::Joined(id), Reliability::SemiOrdered, addr)
.await;
self.send_all(
&FromGame::PeerJoined(id),
Reliability::SemiOrdered,
Some(addr),
)
.await;
Ok(())
}

Expand All @@ -243,16 +268,22 @@ impl GameProcessor {
meta.source, self.port
);

self.send(&FromGame::Left, meta.source).await;
self.send_all(&FromGame::PeerLeft(id), None).await;
self.send(&FromGame::Left, Reliability::SemiOrdered, meta.source)
.await;
self.send_all(&FromGame::PeerLeft(id), Reliability::SemiOrdered, None)
.await;
}

async fn process_readiness(&mut self, meta: MessageMeta, readiness: Readiness) {
match self.state.update_readiness(meta.source, readiness).await {
Ok(progressed) => {
if progressed {
self.send_all(&FromGame::GameReadiness(readiness), None)
.await;
self.send_all(
&FromGame::GameReadiness(readiness),
Reliability::SemiOrdered,
None,
)
.await;
}
}
Err(err) => warn!(
Expand All @@ -268,24 +299,27 @@ impl GameProcessor {
///
/// * `message` - message to be sent.
///
/// * `reliability` - reliability mode for the message.
///
/// * `exclude` - if not None, the message will be delivered to all but
/// this player.
async fn send_all<E>(&self, message: &E, exclude: Option<SocketAddr>)
async fn send_all<E>(&self, message: &E, reliability: Reliability, exclude: Option<SocketAddr>)
where
E: bincode::Encode,
{
if let Some(targets) = self.state.targets(exclude).await {
self.send(message, targets).await;
self.send(message, reliability, targets).await;
}
}

/// Send message to some targets.
async fn send<E, T>(&self, message: &E, targets: T)
async fn send<E, T>(&self, message: &E, reliability: Reliability, targets: T)
where
E: bincode::Encode,
T: Into<Targets<'static>>,
{
let message = OutPackage::encode_single(message, true, Peers::Server, targets).unwrap();
let message =
OutPackage::encode_single(message, reliability, Peers::Server, targets).unwrap();
let _ = self.outputs.send(message).await;
}
}
4 changes: 2 additions & 2 deletions crates/connector/src/game/mreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(super) async fn run(
let result = server
.send(ToGameMessage::new(
package.source(),
package.reliable(),
package.reliability(),
message,
))
.await;
Expand All @@ -60,7 +60,7 @@ pub(super) async fn run(
Peers::Players => {
let _ = players
.send(PlayersPackage::new(
package.reliable(),
package.reliability(),
package.source(),
package.data(),
))
Expand Down
12 changes: 6 additions & 6 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ use std::net::SocketAddr;

use async_std::channel::Receiver;
use de_messages::FromGame;
use de_net::{OutPackage, PackageSender, Peers};
use de_net::{OutPackage, PackageSender, Peers, Reliability};
use tracing::{error, info, warn};

use super::state::GameState;

/// A package destined to other players in the game.
pub(super) struct PlayersPackage {
reliable: bool,
reliability: Reliability,
source: SocketAddr,
data: Vec<u8>,
}

impl PlayersPackage {
pub(super) fn new(reliable: bool, source: SocketAddr, data: Vec<u8>) -> Self {
pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec<u8>) -> Self {
Self {
reliable,
reliability,
source,
data,
}
Expand Down Expand Up @@ -56,7 +56,7 @@ pub(super) async fn run(
.send(
OutPackage::encode_single(
&FromGame::NotJoined,
package.reliable,
package.reliability,
Peers::Server,
package.source,
)
Expand All @@ -73,7 +73,7 @@ pub(super) async fn run(
let result = outputs
.send(OutPackage::new(
package.data,
package.reliable,
package.reliability,
Peers::Players,
targets,
))
Expand Down
9 changes: 7 additions & 2 deletions crates/connector/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::net::SocketAddr;
use anyhow::Context;
use async_std::task;
use de_messages::{FromServer, GameOpenError, ToServer};
use de_net::{self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Socket};
use de_net::{
self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket,
};
use tracing::{error, info, warn};

use crate::{clients::Clients, game};
Expand Down Expand Up @@ -102,7 +104,10 @@ impl MainServer {

async fn reply(&mut self, message: &FromServer, target: SocketAddr) -> anyhow::Result<()> {
self.outputs
.send(OutPackage::encode_single(message, true, Peers::Server, target).unwrap())
.send(
OutPackage::encode_single(message, Reliability::Unordered, Peers::Server, target)
.unwrap(),
)
.await
.context("Failed to send a reply")
}
Expand Down
8 changes: 6 additions & 2 deletions crates/connector/tests/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{

use async_std::{future::timeout, task};
use de_messages::{FromGame, FromServer, JoinError, Readiness, ToGame, ToServer};
use de_net::{self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Socket};
use de_net::{
self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket,
};
use ntest::timeout;

use crate::common::{spawn_and_wait, term_and_wait};
Expand Down Expand Up @@ -161,7 +163,9 @@ impl Comms {
E: bincode::Encode,
{
let addr = SocketAddr::new(self.host, self.port);
let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap();
let package =
OutPackage::encode_single(&message, Reliability::SemiOrdered, Peers::Server, addr)
.unwrap();
self.sender.send(package).await.unwrap();
}

Expand Down
29 changes: 21 additions & 8 deletions crates/multiplayer/src/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use bevy::prelude::*;
use de_core::{player::Player, schedule::PreMovement};
use de_messages::{FromGame, FromServer, GameOpenError, JoinError, Readiness, ToGame, ToServer};
use de_net::Reliability;

use crate::{
config::ConnectionType,
Expand Down Expand Up @@ -102,7 +103,7 @@ impl From<Readiness> for SetReadinessEvent {
fn open_or_join(
conf: Res<NetGameConfRes>,
mut main_server: EventWriter<ToMainServerEvent>,
mut game_server: EventWriter<ToGameServerEvent<true>>,
mut game_server: EventWriter<ToGameServerEvent>,
) {
match conf.connection_type() {
ConnectionType::CreateGame { max_players, .. } => {
Expand All @@ -116,7 +117,10 @@ fn open_or_join(
}
ConnectionType::JoinGame(_) => {
info!("Sending a join-game request.");
game_server.send(ToGame::Join.into());
game_server.send(ToGameServerEvent::new(
Reliability::SemiOrdered,
ToGame::Join,
));
}
}
}
Expand All @@ -125,7 +129,7 @@ fn process_from_server(
conf: Res<NetGameConfRes>,
mut ports: ResMut<Ports>,
mut events: EventReader<FromMainServerEvent>,
mut outputs: EventWriter<ToGameServerEvent<true>>,
mut outputs: EventWriter<ToGameServerEvent>,
mut opened: EventWriter<GameOpenedEvent>,
mut fatals: EventWriter<FatalErrorEvent>,
) {
Expand All @@ -138,7 +142,10 @@ fn process_from_server(
Ok(_) => {
info!("Game on port {} opened.", *port);
// Send something to open NAT.
outputs.send(ToGame::Ping(u32::MAX).into());
outputs.send(ToGameServerEvent::new(
Reliability::Unordered,
ToGame::Ping(u32::MAX),
));
opened.send(GameOpenedEvent(SocketAddr::new(conf.server_host(), *port)));
}
Err(err) => {
Expand Down Expand Up @@ -232,18 +239,24 @@ fn process_from_game(

fn set_readiness(
mut readiness_events: EventReader<SetReadinessEvent>,
mut message_events: EventWriter<ToGameServerEvent<true>>,
mut message_events: EventWriter<ToGameServerEvent>,
) {
let Some(readiness) = readiness_events.iter().last() else {
return;
};

message_events.send(ToGameServerEvent::from(ToGame::Readiness(readiness.0)));
message_events.send(ToGameServerEvent::new(
Reliability::SemiOrdered,
ToGame::Readiness(readiness.0),
));
}

fn leave(mut server: EventWriter<ToGameServerEvent<true>>) {
fn leave(mut server: EventWriter<ToGameServerEvent>) {
info!("Sending leave game message.");
// Send this even if not yet joined because the join / open-game request
// might already be processed.
server.send(ToGame::Leave.into());
server.send(ToGameServerEvent::new(
Reliability::SemiOrdered,
ToGame::Leave,
));
}
Loading

0 comments on commit 04eff4c

Please sign in to comment.