Skip to content

Commit

Permalink
de_net: Add message ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 25, 2023
1 parent 68e742d commit 96a519a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 39 deletions.
1 change: 0 additions & 1 deletion crates/connector/tests/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ fn test() {
comms_d.port = game_port;

check_response!(comms_a, FromGame::Joined(1));

comms_b.send(ToGame::Join).await;
check_response!(comms_b, FromGame::Joined(2));
check_response!(comms_a, FromGame::PeerJoined(2));
Expand Down
29 changes: 21 additions & 8 deletions crates/multiplayer/src/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use bevy::prelude::*;
use de_core::{player::Player, schedule::PreMovement};
use de_messages::{FromGame, FromServer, GameOpenError, JoinError, Readiness, ToGame, ToServer};
use de_net::Reliability;

use crate::{
config::ConnectionType,
Expand Down Expand Up @@ -102,7 +103,7 @@ impl From<Readiness> for SetReadinessEvent {
fn open_or_join(
conf: Res<NetGameConfRes>,
mut main_server: EventWriter<ToMainServerEvent>,
mut game_server: EventWriter<ToGameServerEvent<true>>,
mut game_server: EventWriter<ToGameServerEvent>,
) {
match conf.connection_type() {
ConnectionType::CreateGame { max_players, .. } => {
Expand All @@ -116,7 +117,10 @@ fn open_or_join(
}
ConnectionType::JoinGame(_) => {
info!("Sending a join-game request.");
game_server.send(ToGame::Join.into());
game_server.send(ToGameServerEvent::new(
Reliability::SemiOrdered,
ToGame::Join,
));
}
}
}
Expand All @@ -125,7 +129,7 @@ fn process_from_server(
conf: Res<NetGameConfRes>,
mut ports: ResMut<Ports>,
mut events: EventReader<FromMainServerEvent>,
mut outputs: EventWriter<ToGameServerEvent<true>>,
mut outputs: EventWriter<ToGameServerEvent>,
mut opened: EventWriter<GameOpenedEvent>,
mut fatals: EventWriter<FatalErrorEvent>,
) {
Expand All @@ -138,7 +142,10 @@ fn process_from_server(
Ok(_) => {
info!("Game on port {} opened.", *port);
// Send something to open NAT.
outputs.send(ToGame::Ping(u32::MAX).into());
outputs.send(ToGameServerEvent::new(
Reliability::Unordered,
ToGame::Ping(u32::MAX),
));
opened.send(GameOpenedEvent(SocketAddr::new(conf.server_host(), *port)));
}
Err(err) => {
Expand Down Expand Up @@ -232,18 +239,24 @@ fn process_from_game(

fn set_readiness(
mut readiness_events: EventReader<SetReadinessEvent>,
mut message_events: EventWriter<ToGameServerEvent<true>>,
mut message_events: EventWriter<ToGameServerEvent>,
) {
let Some(readiness) = readiness_events.iter().last() else {
return;
};

message_events.send(ToGameServerEvent::from(ToGame::Readiness(readiness.0)));
message_events.send(ToGameServerEvent::new(
Reliability::SemiOrdered,
ToGame::Readiness(readiness.0),
));
}

fn leave(mut server: EventWriter<ToGameServerEvent<true>>) {
fn leave(mut server: EventWriter<ToGameServerEvent>) {
info!("Sending leave game message.");
// Send this even if not yet joined because the join / open-game request
// might already be processed.
server.send(ToGame::Leave.into());
server.send(ToGameServerEvent::new(
Reliability::SemiOrdered,
ToGame::Leave,
));
}
63 changes: 38 additions & 25 deletions crates/multiplayer/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ pub(crate) struct MessagesPlugin;
impl Plugin for MessagesPlugin {
fn build(&self, app: &mut App) {
app.add_event::<ToMainServerEvent>()
.add_event::<ToGameServerEvent<true>>()
.add_event::<ToGameServerEvent<false>>()
.add_event::<ToGameServerEvent>()
.add_event::<FromMainServerEvent>()
.add_event::<FromGameServerEvent>()
.add_systems(OnEnter(NetState::Connecting), setup)
Expand All @@ -30,12 +29,8 @@ impl Plugin for MessagesPlugin {
.run_if(on_event::<ToMainServerEvent>())
.in_set(MessagesSet::SendMessages)
.before(NetworkSet::SendPackages),
message_sender::<ToGameServerEvent<true>>
.run_if(on_event::<ToGameServerEvent<true>>())
.in_set(MessagesSet::SendMessages)
.before(NetworkSet::SendPackages),
message_sender::<ToGameServerEvent<false>>
.run_if(on_event::<ToGameServerEvent<false>>())
message_sender::<ToGameServerEvent>
.run_if(on_event::<ToGameServerEvent>())
.in_set(MessagesSet::SendMessages)
.before(NetworkSet::SendPackages),
),
Expand All @@ -62,8 +57,8 @@ where
{
type Message: bincode::Encode;
const PORT_TYPE: PortType;
const RELIABLE: bool;

fn reliability(&self) -> Reliability;
fn message(&self) -> &Self::Message;
}

Expand All @@ -79,29 +74,41 @@ impl From<ToServer> for ToMainServerEvent {
impl ToMessage for ToMainServerEvent {
type Message = ToServer;
const PORT_TYPE: PortType = PortType::Main;
const RELIABLE: bool = true;

fn reliability(&self) -> Reliability {
Reliability::Unordered
}

fn message(&self) -> &Self::Message {
&self.0
}
}

#[derive(Event)]
pub(crate) struct ToGameServerEvent<const R: bool>(ToGame);
pub(crate) struct ToGameServerEvent {
reliability: Reliability,
message: ToGame,
}

impl<const R: bool> From<ToGame> for ToGameServerEvent<R> {
fn from(message: ToGame) -> Self {
Self(message)
impl ToGameServerEvent {
pub(crate) fn new(reliability: Reliability, message: ToGame) -> Self {
Self {
reliability,
message,
}
}
}

impl<const R: bool> ToMessage for ToGameServerEvent<R> {
impl ToMessage for ToGameServerEvent {
type Message = ToGame;
const PORT_TYPE: PortType = PortType::Game;
const RELIABLE: bool = R;

fn reliability(&self) -> Reliability {
self.reliability
}

fn message(&self) -> &Self::Message {
&self.0
&self.message
}
}

Expand Down Expand Up @@ -255,18 +262,24 @@ fn message_sender<E>(
return;
};
let addr = SocketAddr::new(conf.server_host(), port);
let reliability = if E::RELIABLE {
Reliability::Unordered
} else {
Reliability::Unreliable
};
let mut builder = PackageBuilder::new(reliability, Peers::Server, addr);

let mut builder_unreliable = PackageBuilder::new(Reliability::Unreliable, Peers::Server, addr);
let mut builder_unordered = PackageBuilder::new(Reliability::Unordered, Peers::Server, addr);
let mut builder_semi = PackageBuilder::new(Reliability::SemiOrdered, Peers::Server, addr);

for event in inputs.iter() {
let builder = match event.reliability() {
Reliability::Unreliable => &mut builder_unreliable,
Reliability::Unordered => &mut builder_unordered,
Reliability::SemiOrdered => &mut builder_semi,
};
builder.push(event.message()).unwrap();
}
for package in builder.build() {
outputs.send(package.into());

for builder in [builder_unreliable, builder_unordered, builder_semi] {
for package in builder.build() {
outputs.send(package.into());
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions crates/multiplayer/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use bevy::prelude::*;
use de_core::schedule::PreMovement;
use de_messages::{FromGame, ToGame};
use de_net::Reliability;
use tracing::{debug, info, trace};

use crate::{
Expand Down Expand Up @@ -218,20 +219,23 @@ fn ping<const R: bool>(
mut timer: ResMut<PingTimer<R>>,
mut counter: ResMut<Counter>,
mut tracker: ResMut<PingTracker<R>>,
mut messages: EventWriter<ToGameServerEvent<R>>,
mut messages: EventWriter<ToGameServerEvent>,
) {
timer.0.tick(time.delta());

let time = Instant::now();
for _ in 0..timer.0.times_finished_this_tick() {
let id = counter.next();
tracker.register(id, time);
if R {
let reliability = if R {
info!("Sending reliable Ping({id}).",);
Reliability::Unordered
} else {
trace!("Sending unreliable Ping({id}).",);
}
messages.send(ToGame::Ping(id).into());
Reliability::Unreliable
};

messages.send(ToGameServerEvent::new(reliability, ToGame::Ping(id)));
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/net/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use thiserror::Error;

/// Number of bytes (at the beginning of each datagram) used up by the header.
pub(crate) const HEADER_SIZE: usize = 4;

/// This bit is set in protocol control datagrams.
const CONTROL_BIT: u8 = 0b1000_0000;
/// This bit is set on datagrams which are sent to the server instead of other
Expand Down

0 comments on commit 96a519a

Please sign in to comment.