Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 23, 2023
1 parent 2e0b1fe commit 8d6c607
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 197 deletions.
11 changes: 5 additions & 6 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_std::{
task,
};
use de_messages::{FromGame, JoinError, Readiness, ToGame};
use de_net::{OutPackage, Peers, Targets};
use de_net::{OutPackage, Peers};
use tracing::{error, info, warn};

use super::state::{GameState, JoinError as JoinErrorInner};
Expand Down Expand Up @@ -274,18 +274,17 @@ impl GameProcessor {
where
E: bincode::Encode,
{
if let Some(targets) = self.state.targets(exclude).await {
self.send(message, targets).await;
for target in self.state.targets(exclude).await {
self.send(message, target).await;
}
}

/// Send message to some targets.
async fn send<E, T>(&self, message: &E, targets: T)
async fn send<E>(&self, message: &E, target: SocketAddr)
where
E: bincode::Encode,
T: Into<Targets<'static>>,
{
let message = OutPackage::encode_single(message, true, Peers::Server, targets).unwrap();
let message = OutPackage::encode_single(message, true, Peers::Server, target).unwrap();
let _ = self.outputs.send(message).await;
}
}
28 changes: 13 additions & 15 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(super) async fn run(
) {
info!("Starting game player package handler on port {port}...");

loop {
'main: loop {
if packages.is_closed() {
break;
}
Expand Down Expand Up @@ -66,20 +66,18 @@ pub(super) async fn run(
continue;
}

let Some(targets) = state.targets(Some(package.source)).await else {
continue;
};

let result = outputs
.send(OutPackage::new(
package.data,
package.reliable,
Peers::Players,
targets,
))
.await;
if result.is_err() {
break;
for target in state.targets(Some(package.source)).await {
let result = outputs
.send(OutPackage::new(
package.data.clone(),
package.reliable,
Peers::Players,
target,
))
.await;
if result.is_err() {
break 'main;
}
}
}

Expand Down
45 changes: 12 additions & 33 deletions crates/connector/src/game/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{collections::hash_map::Entry, net::SocketAddr};
use ahash::AHashMap;
use async_std::sync::{Arc, RwLock};
use de_messages::Readiness;
use de_net::Targets;
use thiserror::Error;

#[derive(Clone)]
Expand Down Expand Up @@ -53,13 +52,12 @@ impl GameState {
}

/// Constructs and returns package targets which includes all or all but
/// one players connected to the game. It returns None if there is no
/// matching target.
/// one players connected to the game.
///
/// # Arguments
///
/// * `exclude` - if not None, this player is included among the targets.
pub(super) async fn targets(&self, exclude: Option<SocketAddr>) -> Option<Targets<'static>> {
pub(super) async fn targets(&self, exclude: Option<SocketAddr>) -> Vec<SocketAddr> {
self.inner.read().await.targets(exclude)
}
}
Expand Down Expand Up @@ -158,32 +156,14 @@ impl GameStateInner {
Ok(progressed)
}

fn targets(&self, exclude: Option<SocketAddr>) -> Option<Targets<'static>> {
let len = if exclude.map_or(false, |e| self.players.contains_key(&e)) {
self.players.len() - 1
} else {
self.players.len()
};

if len == 0 {
None
} else if len == 1 {
for &addr in self.players.keys() {
if Some(addr) != exclude {
return Some(Targets::Single(addr));
}
fn targets(&self, exclude: Option<SocketAddr>) -> Vec<SocketAddr> {
let mut addrs = Vec::with_capacity(self.players.len());
for &addr in self.players.keys() {
if Some(addr) != exclude {
addrs.push(addr);
}

unreachable!("No non-excluded player found.");
} else {
let mut addrs = Vec::with_capacity(len);
for &addr in self.players.keys() {
if Some(addr) != exclude {
addrs.push(addr);
}
}
Some(addrs.into())
}
addrs
}
}

Expand Down Expand Up @@ -377,21 +357,21 @@ mod tests {
fn test_targets() {
let mut state = GameStateInner::new(8);

assert!(state.targets(None).is_none());
assert!(state.targets(None).is_empty());

state.add("127.0.0.1:2001".parse().unwrap()).unwrap();
assert_eq!(
HashSet::<SocketAddr>::from_iter(state.targets(None).unwrap().into_iter()),
HashSet::<SocketAddr>::from_iter(state.targets(None).into_iter()),
HashSet::from_iter(["127.0.0.1:2001".parse().unwrap()])
);
assert!(state
.targets(Some("127.0.0.1:2001".parse().unwrap()))
.is_none());
.is_empty());

state.add("127.0.0.1:2002".parse().unwrap()).unwrap();
state.add("127.0.0.1:2003".parse().unwrap()).unwrap();
assert_eq!(
HashSet::<SocketAddr>::from_iter(state.targets(None).unwrap().into_iter()),
HashSet::<SocketAddr>::from_iter(state.targets(None).into_iter()),
HashSet::from_iter([
"127.0.0.1:2001".parse().unwrap(),
"127.0.0.1:2002".parse().unwrap(),
Expand All @@ -402,7 +382,6 @@ mod tests {
HashSet::<SocketAddr>::from_iter(
state
.targets(Some("127.0.0.1:2002".parse().unwrap()))
.unwrap()
.into_iter()
),
HashSet::from_iter([
Expand Down
19 changes: 18 additions & 1 deletion crates/net/src/connection/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use async_std::{
use self::resends::{RescheduleResult, Resends, START_BACKOFF_MS};
use super::book::{Connection, ConnectionBook};
use crate::{
header::{DatagramHeader, PackageId, Peers},
header::{DatagramHeader, PackageId, PackageIdRange},
tasks::OutDatagram,
Peers,
};

mod resends;
Expand All @@ -29,6 +30,16 @@ impl DispatchHandler {
}
}

/// Returns ID to be given to a to-be-send package.
///
/// It is assumed that this is called exactly once before each reliably
/// send package and that all generated IDs are used.
pub(crate) async fn next_package_id(&mut self, time: Instant, addr: SocketAddr) -> PackageId {
let mut book = self.book.lock().await;
let handler = book.update(time, addr, ConnDispatchHandler::new);
handler.next_package_id()
}

pub(crate) async fn sent(
&mut self,
time: Instant,
Expand Down Expand Up @@ -125,14 +136,20 @@ pub(crate) struct ResendResult {

struct ConnDispatchHandler {
resends: Resends,
package_ids: PackageIdRange,
}

impl ConnDispatchHandler {
fn new() -> Self {
Self {
resends: Resends::new(),
package_ids: PackageIdRange::counter(),
}
}

fn next_package_id(&mut self) -> PackageId {
self.package_ids.next().unwrap()
}
}

impl Connection for ConnDispatchHandler {
Expand Down
2 changes: 1 addition & 1 deletion crates/net/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use header::Peers;
pub use protocol::{Targets, MAX_PACKAGE_SIZE};
pub use protocol::MAX_PACKAGE_SIZE;
pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE};
pub use tasks::{
startup, ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage,
Expand Down
94 changes: 7 additions & 87 deletions crates/net/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{borrow::Cow, net::SocketAddr};
use std::net::SocketAddr;

use async_std::sync::Arc;
use futures::future::try_join_all;
use thiserror::Error;
use tracing::{error, trace};

Expand Down Expand Up @@ -39,34 +38,22 @@ impl ProtocolSocket {
///
/// * `data` - datagram payload.
///
/// * `targets` - recipients of the datagram.
pub(crate) async fn send<'a, T>(
&'a self,
/// * `target` - recipient of the datagram.
pub(crate) async fn send(
&self,
buf: &mut [u8],
header: DatagramHeader,
data: &[u8],
targets: T,
) -> Result<(), SendError>
where
T: Into<Targets<'a>>,
{
target: SocketAddr,
) -> Result<(), SendError> {
let len = HEADER_SIZE + data.len();
assert!(buf.len() >= len);
let buf = &mut buf[..len];
buf[HEADER_SIZE..len].copy_from_slice(data);

trace!("Going to send datagram {}", header);
header.write(buf);

match targets.into() {
Targets::Single(target) => {
self.socket.send(target, buf).await?;
}
Targets::Many(targets) => {
try_join_all(targets.iter().map(|&target| self.socket.send(target, buf))).await?;
}
}

self.socket.send(target, buf).await?;
Ok(())
}

Expand Down Expand Up @@ -98,73 +85,6 @@ impl ProtocolSocket {
}
}

#[derive(Clone)]
pub enum Targets<'a> {
Single(SocketAddr),
Many(Cow<'a, [SocketAddr]>),
}

impl<'a> From<SocketAddr> for Targets<'a> {
fn from(addr: SocketAddr) -> Self {
Self::Single(addr)
}
}

impl<'a> From<&'a [SocketAddr]> for Targets<'a> {
fn from(addrs: &'a [SocketAddr]) -> Self {
Self::Many(addrs.into())
}
}

impl<'a> From<Vec<SocketAddr>> for Targets<'a> {
fn from(addrs: Vec<SocketAddr>) -> Self {
Self::Many(addrs.into())
}
}

impl<'a> IntoIterator for &Targets<'a> {
type Item = SocketAddr;
type IntoIter = TargetsIter<'a>;

fn into_iter(self) -> Self::IntoIter {
TargetsIter {
targets: self.clone(),
offset: 0,
}
}
}

pub struct TargetsIter<'a> {
targets: Targets<'a>,
offset: usize,
}

impl<'a> Iterator for TargetsIter<'a> {
type Item = SocketAddr;

fn next(&mut self) -> Option<Self::Item> {
match self.targets {
Targets::Single(single) => {
if self.offset > 0 {
None
} else {
self.offset += 1;
Some(single)
}
}
Targets::Many(ref many) => {
if self.offset >= many.len() {
None
} else {
let addr = many[self.offset];
self.offset += 1;
Some(addr)
}
}
}
}
}

#[derive(Error, Debug)]
pub(crate) enum MsgRecvError {
#[error(transparent)]
Expand Down
Loading

0 comments on commit 8d6c607

Please sign in to comment.