Skip to content

Commit

Permalink
Impl FromPlayers & ToPlayers messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Sep 9, 2023
1 parent 0202976 commit ace5067
Show file tree
Hide file tree
Showing 15 changed files with 481 additions and 164 deletions.
88 changes: 88 additions & 0 deletions crates/connector/src/game/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::{
net::SocketAddr,
time::{Duration, Instant},
};

use bincode::error::EncodeError;
use de_net::{OutPackage, PackageBuilder, PackageIterator, Peers, Reliability};

const UNRELIABLE_TIMEOUT: Duration = Duration::from_millis(10);
const RELIABLE_TIMEOUT: Duration = Duration::from_millis(50);

/// Buffers of player messages and package builder.
pub(super) struct PlayerBuffer {
unreliable: PackageBuilder,
unordered: PackageBuilder,
semi_ordered: PackageBuilder,
}

impl PlayerBuffer {
pub(super) fn new(target: SocketAddr) -> Self {
Self {
unreliable: PackageBuilder::new(Reliability::Unreliable, Peers::Players, target),
unordered: PackageBuilder::new(Reliability::Unordered, Peers::Players, target),
semi_ordered: PackageBuilder::new(Reliability::SemiOrdered, Peers::Players, target),
}
}

/// Pushes a single message to an appropriate buffer.
pub(super) fn push<E>(
&mut self,
reliability: Reliability,
message: &E,
) -> Result<(), EncodeError>
where
E: bincode::Encode,
{
self.builder_mut(reliability).push(message)
}

/// Builds packages from old enough messages and removes the packages from
/// the buffer.
///
/// # Arguments
///
/// * `time` - current time.
pub(super) fn build(&mut self, time: Instant) -> PlayerPackageIterator<'_> {
let unreliable_threshodl = time - UNRELIABLE_TIMEOUT;
let reliable_threshodl = time - RELIABLE_TIMEOUT;

PlayerPackageIterator {
index: 0,
iterators: [
self.unreliable.build_old(unreliable_threshodl),
self.unordered.build_old(reliable_threshodl),
self.semi_ordered.build_old(reliable_threshodl),
],
}
}

fn builder_mut(&mut self, reliability: Reliability) -> &mut PackageBuilder {
match reliability {
Reliability::Unreliable => &mut self.unreliable,
Reliability::Unordered => &mut self.unordered,
Reliability::SemiOrdered => &mut self.semi_ordered,
}
}
}

pub(super) struct PlayerPackageIterator<'a> {
index: usize,
iterators: [PackageIterator<'a>; 3],
}

impl<'a> Iterator for PlayerPackageIterator<'a> {
type Item = OutPackage;

fn next(&mut self) -> Option<Self::Item> {
while self.index < self.iterators.len() {
let item = self.iterators[self.index].next();
if item.is_some() {
return item;
}
self.index += 1;
}

None
}
}
6 changes: 3 additions & 3 deletions crates/connector/src/game/ereceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use de_messages::ToGame;
use de_net::{ConnErrorReceiver, Reliability};
use tracing::{error, info, warn};

use super::greceiver::ToGameMessage;
use super::message::InMessage;

pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender<ToGameMessage>) {
pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender<InMessage<ToGame>>) {
info!("Starting game connection error handler on port {port}...");

loop {
Expand All @@ -26,7 +26,7 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender<ToG

warn!("In game connection lost with {:?}", error.target());
let _ = server
.send(ToGameMessage::new(
.send(InMessage::new(
error.target(),
Reliability::Unordered,
ToGame::Leave,
Expand Down
53 changes: 17 additions & 36 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,16 @@ use de_messages::{FromGame, JoinError, Readiness, ToGame};
use de_net::{OutPackage, Peers, Reliability};
use tracing::{error, info, warn};

use super::state::{GameState, JoinError as JoinErrorInner};
use super::{
message::{InMessage, MessageMeta},
state::{GameState, JoinError as JoinErrorInner},
};
use crate::clients::Clients;

pub(super) struct ToGameMessage {
meta: MessageMeta,
message: ToGame,
}

impl ToGameMessage {
pub(super) fn new(source: SocketAddr, reliability: Reliability, message: ToGame) -> Self {
Self {
meta: MessageMeta {
source,
reliability,
},
message,
}
}
}

struct MessageMeta {
source: SocketAddr,
reliability: Reliability,
}

pub(super) struct GameProcessor {
port: u16,
owner: SocketAddr,
messages: Receiver<ToGameMessage>,
messages: Receiver<InMessage<ToGame>>,
outputs: Sender<OutPackage>,
state: GameState,
clients: Clients,
Expand All @@ -46,7 +27,7 @@ impl GameProcessor {
pub(super) fn new(
port: u16,
owner: SocketAddr,
messages: Receiver<ToGameMessage>,
messages: Receiver<InMessage<ToGame>>,
outputs: Sender<OutPackage>,
state: GameState,
clients: Clients,
Expand Down Expand Up @@ -93,18 +74,18 @@ impl GameProcessor {
continue;
}

match message.message {
match message.message() {
ToGame::Ping(id) => {
self.process_ping(message.meta, id).await;
self.process_ping(message.meta(), *id).await;
}
ToGame::Join => {
self.process_join(message.meta).await;
self.process_join(message.meta()).await;
}
ToGame::Leave => {
self.process_leave(message.meta).await;
self.process_leave(message.meta()).await;
}
ToGame::Readiness(readiness) => {
self.process_readiness(message.meta, readiness).await;
self.process_readiness(message.meta(), *readiness).await;
}
}

Expand All @@ -122,8 +103,8 @@ impl GameProcessor {

/// Returns true if the massage should be ignored and further handles such
/// messages.
async fn handle_ignore(&self, message: &ToGameMessage) -> bool {
if matches!(message.message, ToGame::Join | ToGame::Leave) {
async fn handle_ignore(&self, message: &InMessage<ToGame>) -> bool {
if matches!(message.message(), ToGame::Join | ToGame::Leave) {
// Join must be excluded from the condition because of the
// chicken and egg problem.
//
Expand All @@ -132,22 +113,22 @@ impl GameProcessor {
return false;
}

if self.state.contains(message.meta.source).await {
if self.state.contains(message.meta().source).await {
return false;
}

warn!(
"Received a game message from a non-participating client: {:?}.",
message.meta.source
message.meta().source
);
let _ = self
.outputs
.send(
OutPackage::encode_single(
&FromGame::NotJoined,
message.meta.reliability,
message.meta().reliability,
Peers::Server,
message.meta.source,
message.meta().source,
)
.unwrap(),
)
Expand Down
34 changes: 34 additions & 0 deletions crates/connector/src/game/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::net::SocketAddr;

use de_net::Reliability;

pub(super) struct InMessage<M> {
meta: MessageMeta,
message: M,
}

impl<M> InMessage<M> {
pub(super) fn new(source: SocketAddr, reliability: Reliability, message: M) -> Self {
Self {
meta: MessageMeta {
source,
reliability,
},
message,
}
}

pub(super) fn meta(&self) -> MessageMeta {
self.meta
}

pub(super) fn message(&self) -> &M {
&self.message
}
}

#[derive(Clone, Copy)]
pub(super) struct MessageMeta {
pub(super) source: SocketAddr,
pub(super) reliability: Reliability,
}
2 changes: 2 additions & 0 deletions crates/connector/src/game/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use de_net::{self, Socket};
use self::{greceiver::GameProcessor, state::GameState};
use crate::clients::Clients;

mod buffer;
mod ereceiver;
mod greceiver;
mod message;
mod mreceiver;
mod preceiver;
mod state;
Expand Down
85 changes: 51 additions & 34 deletions crates/connector/src/game/mreceiver.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::time::Duration;

use async_std::{channel::Sender, future::timeout};
use de_net::{PackageReceiver, Peers};
use bincode::error::DecodeError;
use de_messages::{ToGame, ToPlayers};
use de_net::{InPackage, PackageReceiver, Peers};
use thiserror::Error;
use tracing::{error, info, warn};

use super::greceiver::ToGameMessage;
use crate::game::preceiver::PlayersPackage;
use super::message::InMessage;

pub(super) async fn run(
port: u16,
packages: PackageReceiver,
server: Sender<ToGameMessage>,
players: Sender<PlayersPackage>,
server: Sender<InMessage<ToGame>>,
players: Sender<InMessage<ToPlayers>>,
) {
info!("Starting game server input processor on port {port}...");

Expand All @@ -34,40 +36,55 @@ pub(super) async fn run(
break;
};

match package.peers() {
Peers::Server => {
for message_result in package.decode() {
match message_result {
Ok(message) => {
let result = server
.send(ToGameMessage::new(
package.source(),
package.reliability(),
message,
))
.await;
if result.is_err() {
break;
}
}
Err(err) => {
warn!("Received invalid package: {err:?}");
break;
}
let peers = package.peers();
let result = match peers {
Peers::Server => handle_package(package, &server).await,
Peers::Players => handle_package(package, &players).await,
};

if let Err(err) = result {
match err {
PackageHandleError::Decode(err) => {
warn!("Received invalid package: {err:?}");
}
PackageHandleError::SendError => {
if peers == Peers::Server {
break;
}
}
}
Peers::Players => {
let _ = players
.send(PlayersPackage::new(
package.reliability(),
package.source(),
package.data(),
))
.await;
}
}
}

info!("Game server input processor on port {port} finished.");
}

async fn handle_package<M>(
package: InPackage,
output: &Sender<InMessage<M>>,
) -> Result<(), PackageHandleError>
where
M: bincode::Decode,
{
for message_result in package.decode() {
let message = message_result.map_err(PackageHandleError::from)?;
output
.send(InMessage::new(
package.source(),
package.reliability(),
message,
))
.await
.map_err(|_| PackageHandleError::SendError)?;
}

Ok(())
}

#[derive(Debug, Error)]
enum PackageHandleError {
#[error("Decoding error: {0}")]
Decode(#[from] DecodeError),
#[error("Sending to output channel failed.")]
SendError,
}
Loading

0 comments on commit ace5067

Please sign in to comment.