Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 26, 2023
1 parent 8ef337e commit e2a5a80
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 166 deletions.
34 changes: 23 additions & 11 deletions crates/connector/src/game/mreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use async_std::{channel::Sender, future::timeout};
use de_net::{PackageReceiver, Peers};
use tracing::{error, info, warn};

use super::greceiver::ToGameMessage;
use crate::game::preceiver::PlayersPackage;
use super::{greceiver::ToGameMessage, preceiver::ToPlayersMessage};

pub(super) async fn run(
port: u16,
packages: PackageReceiver,
server: Sender<ToGameMessage>,
players: Sender<PlayersPackage>,
players: Sender<ToPlayersMessage>,
) {
info!("Starting game server input processor on port {port}...");

Expand Down Expand Up @@ -51,20 +50,33 @@ pub(super) async fn run(
}
}
Err(err) => {
warn!("Received invalid package: {err:?}");
warn!("Received invalid game package: {err:?}");
break;
}
}
}
}
Peers::Players => {
let _ = players
.send(PlayersPackage::new(
package.reliability(),
package.source(),
package.data(),
))
.await;
for message_result in package.decode() {
match message_result {
Ok(message) => {
let result = players
.send(ToPlayersMessage::new(
package.reliability(),
package.source(),
message,
))
.await;
if result.is_err() {
break;
}
}
Err(err) => {
warn!("Received invalid players package: {err:?}");
break;
}
}
}
}
}
}
Expand Down
51 changes: 28 additions & 23 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
use std::net::SocketAddr;

use async_std::channel::Receiver;
use de_messages::FromGame;
use de_net::{OutPackage, PackageSender, Peers, Reliability};
use de_messages::{FromGame, FromPlayers, ToPlayers};
use de_net::{OutPackage, PackageBuilder, PackageSender, PeerPackageBuilders, Peers, Reliability};
use tracing::{error, info, warn};

use super::state::GameState;

/// A package destined to other players in the game.
pub(super) struct PlayersPackage {
pub(super) struct ToPlayersMessage {
reliability: Reliability,
source: SocketAddr,
data: Vec<u8>,
message: ToPlayers,
}

impl PlayersPackage {
pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec<u8>) -> Self {
impl ToPlayersMessage {
pub(super) fn new(reliability: Reliability, source: SocketAddr, message: ToPlayers) -> Self {
Self {
reliability,
source,
data,
message,
}
}
}

pub(super) async fn run(
port: u16,
packages: Receiver<PlayersPackage>,
messages: Receiver<ToPlayersMessage>,
outputs: PackageSender,
state: GameState,
) {
info!("Starting game player package handler on port {port}...");

let mut builders = PeerPackageBuilders::new(Peers::Players);

'main: loop {
if packages.is_closed() {
if messages.is_closed() {
break;
}

Expand All @@ -42,39 +44,42 @@ pub(super) async fn run(
break;
}

let Ok(package) = packages.recv().await else {
let Ok(message) = messages.recv().await else {
break;
};

if !state.contains(package.source).await {
if !state.contains(message.source).await {
warn!(
"Received a player message from a non-participating client: {:?}.",
package.source
message.source
);

let _ = outputs
.send(
OutPackage::encode_single(
&FromGame::NotJoined,
package.reliability,
message.reliability,
Peers::Server,
package.source,
message.source,
)
.unwrap(),
)
.await;
continue;
}

for target in state.targets(Some(package.source)).await {
let result = outputs
.send(OutPackage::new(
package.data.clone(),
package.reliability,
Peers::Players,
target,
))
.await;
// TODO use real player ID
// TODO handle output
builders
.push(message.reliability, &FromPlayers::new(1, message.message))
.unwrap();

let targets = state.targets(Some(message.source)).await;

// TODO accumulate to builder with a timeout
// TODO split this into multiple tasks
for output in builders.build(message.reliability, targets) {
let result = outputs.send(output).await;
if result.is_err() {
break 'main;
}
Expand Down
1 change: 1 addition & 0 deletions crates/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Connector during multiplayer game.

pub use game::{FromGame, JoinError, Readiness, ToGame};
pub use players::{FromPlayers, ToPlayers};
pub use server::{FromServer, GameOpenError, ToServer};

mod game;
Expand Down
28 changes: 15 additions & 13 deletions crates/multiplayer/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{net::SocketAddr, time::Instant};

use bevy::prelude::*;
use bincode::error::EncodeError;
use de_core::schedule::PreMovement;
use de_messages::{FromGame, FromServer, ToGame, ToServer};
use de_net::{InPackage, PackageBuilder, Peers, Reliability};
use de_net::{InPackage, OutPackage, PackageBuilder, Peers, Reliability};

use crate::{
config::ConnectionType,
Expand Down Expand Up @@ -252,6 +253,7 @@ fn cleanup(mut commands: Commands) {
fn message_sender<E>(
conf: Res<NetGameConfRes>,
ports: Res<Ports>,
mut builders: Local<Option<PeerPackageBuilders>>,
mut inputs: EventReader<E>,
mut outputs: EventWriter<SendPackageEvent>,
) where
Expand All @@ -261,23 +263,23 @@ fn message_sender<E>(
warn!("Port not (yet) known.");
return;
};
let addr = SocketAddr::new(conf.server_host(), port);

let mut unreliable = PackageBuilder::new(Reliability::Unreliable, Peers::Server, addr);
let mut unordered = PackageBuilder::new(Reliability::Unordered, Peers::Server, addr);
let mut semi_ordered = PackageBuilder::new(Reliability::SemiOrdered, Peers::Server, addr);
if builders.is_none() {
let addr = SocketAddr::new(conf.server_host(), port);
*builders = Some(PackageBuilders::new(addr));
}

let builders = builders.as_mut().unwrap();
for event in inputs.iter() {
let builder = match event.reliability() {
Reliability::Unreliable => &mut unreliable,
Reliability::Unordered => &mut unordered,
Reliability::SemiOrdered => &mut semi_ordered,
};
builder.push(event.message()).unwrap();
builders.push(event.reliability(), event.message()).unwrap();
}

for builder in [unreliable, unordered, semi_ordered] {
for package in builder.build() {
for mut reliability in [
Reliability::Unreliable,
Reliability::Unordered,
Reliability::SemiOrdered,
] {
for package in builders.build(reliability) {
outputs.send(package.into());
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use protocol::MAX_PACKAGE_SIZE;
pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE};
pub use tasks::{
startup, ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage,
PackageBuilder, PackageReceiver, PackageSender,
PackageBuilder, PackageReceiver, PackageSender, PeerPackageBuilders,
};

mod connection;
Expand Down
Loading

0 comments on commit e2a5a80

Please sign in to comment.