Skip to content

Commit

Permalink
de_net: Add support for message ordering (#701)
Browse files Browse the repository at this point in the history
Relates to #700.
  • Loading branch information
Indy2222 authored Aug 25, 2023
1 parent a9a6ea8 commit 68e742d
Show file tree
Hide file tree
Showing 22 changed files with 1,007 additions and 286 deletions.
8 changes: 6 additions & 2 deletions crates/connector/src/game/ereceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use async_std::{channel::Sender, future::timeout};
use de_messages::ToGame;
use de_net::ConnErrorReceiver;
use de_net::{ConnErrorReceiver, Reliability};
use tracing::{error, info, warn};

use super::greceiver::ToGameMessage;
Expand All @@ -26,7 +26,11 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender<ToG

warn!("In game connection lost with {:?}", error.target());
let _ = server
.send(ToGameMessage::new(error.target(), true, ToGame::Leave))
.send(ToGameMessage::new(
error.target(),
Reliability::Unordered,
ToGame::Leave,
))
.await;
}

Expand Down
83 changes: 58 additions & 25 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_std::{
task,
};
use de_messages::{FromGame, JoinError, Readiness, ToGame};
use de_net::{OutPackage, Peers};
use de_net::{OutPackage, Peers, Reliability};
use tracing::{error, info, warn};

use super::state::{GameState, JoinError as JoinErrorInner};
Expand All @@ -17,17 +17,20 @@ pub(super) struct ToGameMessage {
}

impl ToGameMessage {
pub(super) fn new(source: SocketAddr, reliable: bool, message: ToGame) -> Self {
pub(super) fn new(source: SocketAddr, reliability: Reliability, message: ToGame) -> Self {
Self {
meta: MessageMeta { source, reliable },
meta: MessageMeta {
source,
reliability,
},
message,
}
}
}

struct MessageMeta {
source: SocketAddr,
reliable: bool,
reliability: Reliability,
}

pub(super) struct GameProcessor {
Expand Down Expand Up @@ -142,7 +145,7 @@ impl GameProcessor {
.send(
OutPackage::encode_single(
&FromGame::NotJoined,
message.meta.reliable,
message.meta.reliability,
Peers::Server,
message.meta.source,
)
Expand All @@ -159,7 +162,7 @@ impl GameProcessor {
.send(
OutPackage::encode_single(
&FromGame::Pong(id),
meta.reliable,
meta.reliability,
Peers::Server,
meta.source,
)
Expand All @@ -172,8 +175,12 @@ impl GameProcessor {
async fn process_join(&mut self, meta: MessageMeta) {
if let Err(err) = self.clients.reserve(meta.source).await {
warn!("Join request error: {err}");
self.send(&FromGame::JoinError(JoinError::DifferentGame), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::DifferentGame),
Reliability::Unordered,
meta.source,
)
.await;
return;
}

Expand All @@ -191,17 +198,25 @@ impl GameProcessor {
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::AlreadyJoined), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::AlreadyJoined),
Reliability::Unordered,
meta.source,
)
.await;
}
JoinErrorInner::GameFull => {
warn!(
"Player {:?} could not join game on port {} because the game is full.",
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::GameFull), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::GameFull),
Reliability::Unordered,
meta.source,
)
.await;
}
JoinErrorInner::GameNotOpened => {
warn!(
Expand All @@ -210,8 +225,12 @@ impl GameProcessor {
meta.source, self.port
);

self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source)
.await;
self.send(
&FromGame::JoinError(JoinError::GameNotOpened),
Reliability::Unordered,
meta.source,
)
.await;
}
}
}
Expand All @@ -224,8 +243,14 @@ impl GameProcessor {
"Player {id} on {addr:?} just joined game on port {}.",
self.port
);
self.send(&FromGame::Joined(id), addr).await;
self.send_all(&FromGame::PeerJoined(id), Some(addr)).await;
self.send(&FromGame::Joined(id), Reliability::SemiOrdered, addr)
.await;
self.send_all(
&FromGame::PeerJoined(id),
Reliability::SemiOrdered,
Some(addr),
)
.await;
Ok(())
}

Expand All @@ -243,16 +268,22 @@ impl GameProcessor {
meta.source, self.port
);

self.send(&FromGame::Left, meta.source).await;
self.send_all(&FromGame::PeerLeft(id), None).await;
self.send(&FromGame::Left, Reliability::SemiOrdered, meta.source)
.await;
self.send_all(&FromGame::PeerLeft(id), Reliability::SemiOrdered, None)
.await;
}

async fn process_readiness(&mut self, meta: MessageMeta, readiness: Readiness) {
match self.state.update_readiness(meta.source, readiness).await {
Ok(progressed) => {
if progressed {
self.send_all(&FromGame::GameReadiness(readiness), None)
.await;
self.send_all(
&FromGame::GameReadiness(readiness),
Reliability::SemiOrdered,
None,
)
.await;
}
}
Err(err) => warn!(
Expand All @@ -268,23 +299,25 @@ impl GameProcessor {
///
/// * `message` - message to be sent.
///
/// * `reliability` - reliability mode for the message.
///
/// * `exclude` - if not None, the message will be delivered to all but
/// this player.
async fn send_all<E>(&self, message: &E, exclude: Option<SocketAddr>)
async fn send_all<E>(&self, message: &E, reliability: Reliability, exclude: Option<SocketAddr>)
where
E: bincode::Encode,
{
for target in self.state.targets(exclude).await {
self.send(message, target).await;
self.send(message, reliability, target).await;
}
}

/// Send message to some targets.
async fn send<E>(&self, message: &E, target: SocketAddr)
async fn send<E>(&self, message: &E, reliability: Reliability, target: SocketAddr)
where
E: bincode::Encode,
{
let message = OutPackage::encode_single(message, true, Peers::Server, target).unwrap();
let message =
OutPackage::encode_single(message, reliability, Peers::Server, target).unwrap();
let _ = self.outputs.send(message).await;
}
}
4 changes: 2 additions & 2 deletions crates/connector/src/game/mreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(super) async fn run(
let result = server
.send(ToGameMessage::new(
package.source(),
package.reliable(),
package.reliability(),
message,
))
.await;
Expand All @@ -60,7 +60,7 @@ pub(super) async fn run(
Peers::Players => {
let _ = players
.send(PlayersPackage::new(
package.reliable(),
package.reliability(),
package.source(),
package.data(),
))
Expand Down
12 changes: 6 additions & 6 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ use std::net::SocketAddr;

use async_std::channel::Receiver;
use de_messages::FromGame;
use de_net::{OutPackage, PackageSender, Peers};
use de_net::{OutPackage, PackageSender, Peers, Reliability};
use tracing::{error, info, warn};

use super::state::GameState;

/// A package destined to other players in the game.
pub(super) struct PlayersPackage {
reliable: bool,
reliability: Reliability,
source: SocketAddr,
data: Vec<u8>,
}

impl PlayersPackage {
pub(super) fn new(reliable: bool, source: SocketAddr, data: Vec<u8>) -> Self {
pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec<u8>) -> Self {
Self {
reliable,
reliability,
source,
data,
}
Expand Down Expand Up @@ -56,7 +56,7 @@ pub(super) async fn run(
.send(
OutPackage::encode_single(
&FromGame::NotJoined,
package.reliable,
package.reliability,
Peers::Server,
package.source,
)
Expand All @@ -70,7 +70,7 @@ pub(super) async fn run(
let result = outputs
.send(OutPackage::new(
package.data.clone(),
package.reliable,
package.reliability,
Peers::Players,
target,
))
Expand Down
9 changes: 7 additions & 2 deletions crates/connector/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::net::SocketAddr;
use anyhow::Context;
use async_std::task;
use de_messages::{FromServer, GameOpenError, ToServer};
use de_net::{self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Socket};
use de_net::{
self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket,
};
use tracing::{error, info, warn};

use crate::{clients::Clients, game};
Expand Down Expand Up @@ -102,7 +104,10 @@ impl MainServer {

async fn reply(&mut self, message: &FromServer, target: SocketAddr) -> anyhow::Result<()> {
self.outputs
.send(OutPackage::encode_single(message, true, Peers::Server, target).unwrap())
.send(
OutPackage::encode_single(message, Reliability::Unordered, Peers::Server, target)
.unwrap(),
)
.await
.context("Failed to send a reply")
}
Expand Down
8 changes: 6 additions & 2 deletions crates/connector/tests/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{

use async_std::{future::timeout, task};
use de_messages::{FromGame, FromServer, JoinError, Readiness, ToGame, ToServer};
use de_net::{self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Socket};
use de_net::{
self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket,
};
use ntest::timeout;

use crate::common::{spawn_and_wait, term_and_wait};
Expand Down Expand Up @@ -161,7 +163,9 @@ impl Comms {
E: bincode::Encode,
{
let addr = SocketAddr::new(self.host, self.port);
let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap();
let package =
OutPackage::encode_single(&message, Reliability::SemiOrdered, Peers::Server, addr)
.unwrap();
self.sender.send(package).await.unwrap();
}

Expand Down
Loading

0 comments on commit 68e742d

Please sign in to comment.