Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

de_net: Make package IDs separate for each connection #699

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions crates/connector/src/game/greceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_std::{
task,
};
use de_messages::{FromGame, JoinError, Readiness, ToGame};
use de_net::{OutPackage, Peers, Targets};
use de_net::{OutPackage, Peers};
use tracing::{error, info, warn};

use super::state::{GameState, JoinError as JoinErrorInner};
Expand Down Expand Up @@ -274,18 +274,17 @@ impl GameProcessor {
where
E: bincode::Encode,
{
if let Some(targets) = self.state.targets(exclude).await {
self.send(message, targets).await;
for target in self.state.targets(exclude).await {
self.send(message, target).await;
}
}

/// Send message to some targets.
async fn send<E, T>(&self, message: &E, targets: T)
async fn send<E>(&self, message: &E, target: SocketAddr)
where
E: bincode::Encode,
T: Into<Targets<'static>>,
{
let message = OutPackage::encode_single(message, true, Peers::Server, targets).unwrap();
let message = OutPackage::encode_single(message, true, Peers::Server, target).unwrap();
let _ = self.outputs.send(message).await;
}
}
28 changes: 13 additions & 15 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(super) async fn run(
) {
info!("Starting game player package handler on port {port}...");

loop {
'main: loop {
if packages.is_closed() {
break;
}
Expand Down Expand Up @@ -66,20 +66,18 @@ pub(super) async fn run(
continue;
}

let Some(targets) = state.targets(Some(package.source)).await else {
continue;
};

let result = outputs
.send(OutPackage::new(
package.data,
package.reliable,
Peers::Players,
targets,
))
.await;
if result.is_err() {
break;
for target in state.targets(Some(package.source)).await {
let result = outputs
.send(OutPackage::new(
package.data.clone(),
package.reliable,
Peers::Players,
target,
))
.await;
if result.is_err() {
break 'main;
}
}
}

Expand Down
45 changes: 12 additions & 33 deletions crates/connector/src/game/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{collections::hash_map::Entry, net::SocketAddr};
use ahash::AHashMap;
use async_std::sync::{Arc, RwLock};
use de_messages::Readiness;
use de_net::Targets;
use thiserror::Error;

#[derive(Clone)]
Expand Down Expand Up @@ -53,13 +52,12 @@ impl GameState {
}

/// Constructs and returns package targets which includes all or all but
/// one players connected to the game. It returns None if there is no
/// matching target.
/// one players connected to the game.
///
/// # Arguments
///
/// * `exclude` - if not None, this player is included among the targets.
pub(super) async fn targets(&self, exclude: Option<SocketAddr>) -> Option<Targets<'static>> {
pub(super) async fn targets(&self, exclude: Option<SocketAddr>) -> Vec<SocketAddr> {
self.inner.read().await.targets(exclude)
}
}
Expand Down Expand Up @@ -158,32 +156,14 @@ impl GameStateInner {
Ok(progressed)
}

fn targets(&self, exclude: Option<SocketAddr>) -> Option<Targets<'static>> {
let len = if exclude.map_or(false, |e| self.players.contains_key(&e)) {
self.players.len() - 1
} else {
self.players.len()
};

if len == 0 {
None
} else if len == 1 {
for &addr in self.players.keys() {
if Some(addr) != exclude {
return Some(Targets::Single(addr));
}
fn targets(&self, exclude: Option<SocketAddr>) -> Vec<SocketAddr> {
let mut addrs = Vec::with_capacity(self.players.len());
for &addr in self.players.keys() {
if Some(addr) != exclude {
addrs.push(addr);
}

unreachable!("No non-excluded player found.");
} else {
let mut addrs = Vec::with_capacity(len);
for &addr in self.players.keys() {
if Some(addr) != exclude {
addrs.push(addr);
}
}
Some(addrs.into())
}
addrs
}
}

Expand Down Expand Up @@ -377,21 +357,21 @@ mod tests {
fn test_targets() {
let mut state = GameStateInner::new(8);

assert!(state.targets(None).is_none());
assert!(state.targets(None).is_empty());

state.add("127.0.0.1:2001".parse().unwrap()).unwrap();
assert_eq!(
HashSet::<SocketAddr>::from_iter(state.targets(None).unwrap().into_iter()),
HashSet::<SocketAddr>::from_iter(state.targets(None).into_iter()),
HashSet::from_iter(["127.0.0.1:2001".parse().unwrap()])
);
assert!(state
.targets(Some("127.0.0.1:2001".parse().unwrap()))
.is_none());
.is_empty());

state.add("127.0.0.1:2002".parse().unwrap()).unwrap();
state.add("127.0.0.1:2003".parse().unwrap()).unwrap();
assert_eq!(
HashSet::<SocketAddr>::from_iter(state.targets(None).unwrap().into_iter()),
HashSet::<SocketAddr>::from_iter(state.targets(None).into_iter()),
HashSet::from_iter([
"127.0.0.1:2001".parse().unwrap(),
"127.0.0.1:2002".parse().unwrap(),
Expand All @@ -402,7 +382,6 @@ mod tests {
HashSet::<SocketAddr>::from_iter(
state
.targets(Some("127.0.0.1:2002".parse().unwrap()))
.unwrap()
.into_iter()
),
HashSet::from_iter([
Expand Down
19 changes: 18 additions & 1 deletion crates/net/src/connection/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use async_std::{
use self::resends::{RescheduleResult, Resends, START_BACKOFF_MS};
use super::book::{Connection, ConnectionBook};
use crate::{
header::{DatagramHeader, PackageId, Peers},
header::{DatagramHeader, PackageId, PackageIdRange},
tasks::OutDatagram,
Peers,
};

mod resends;
Expand All @@ -29,6 +30,16 @@ impl DispatchHandler {
}
}

/// Returns ID to be given to a to-be-send package.
///
/// It is assumed that this is called exactly once before each reliably
/// send package and that all generated IDs are used.
pub(crate) async fn next_package_id(&mut self, time: Instant, addr: SocketAddr) -> PackageId {
let mut book = self.book.lock().await;
let handler = book.update(time, addr, ConnDispatchHandler::new);
handler.next_package_id()
}

pub(crate) async fn sent(
&mut self,
time: Instant,
Expand Down Expand Up @@ -125,14 +136,20 @@ pub(crate) struct ResendResult {

struct ConnDispatchHandler {
resends: Resends,
package_ids: PackageIdRange,
}

impl ConnDispatchHandler {
fn new() -> Self {
Self {
resends: Resends::new(),
package_ids: PackageIdRange::counter(),
}
}

fn next_package_id(&mut self) -> PackageId {
self.package_ids.next().unwrap()
}
}

impl Connection for ConnDispatchHandler {
Expand Down
2 changes: 1 addition & 1 deletion crates/net/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use header::Peers;
pub use protocol::{Targets, MAX_PACKAGE_SIZE};
pub use protocol::MAX_PACKAGE_SIZE;
pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE};
pub use tasks::{
startup, ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage,
Expand Down
94 changes: 7 additions & 87 deletions crates/net/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{borrow::Cow, net::SocketAddr};
use std::net::SocketAddr;

use async_std::sync::Arc;
use futures::future::try_join_all;
use thiserror::Error;
use tracing::{error, trace};

Expand Down Expand Up @@ -39,34 +38,22 @@ impl ProtocolSocket {
///
/// * `data` - datagram payload.
///
/// * `targets` - recipients of the datagram.
pub(crate) async fn send<'a, T>(
&'a self,
/// * `target` - recipient of the datagram.
pub(crate) async fn send(
&self,
buf: &mut [u8],
header: DatagramHeader,
data: &[u8],
targets: T,
) -> Result<(), SendError>
where
T: Into<Targets<'a>>,
{
target: SocketAddr,
) -> Result<(), SendError> {
let len = HEADER_SIZE + data.len();
assert!(buf.len() >= len);
let buf = &mut buf[..len];
buf[HEADER_SIZE..len].copy_from_slice(data);

trace!("Going to send datagram {}", header);
header.write(buf);

match targets.into() {
Targets::Single(target) => {
self.socket.send(target, buf).await?;
}
Targets::Many(targets) => {
try_join_all(targets.iter().map(|&target| self.socket.send(target, buf))).await?;
}
}

self.socket.send(target, buf).await?;
Ok(())
}

Expand Down Expand Up @@ -98,73 +85,6 @@ impl ProtocolSocket {
}
}

#[derive(Clone)]
pub enum Targets<'a> {
Single(SocketAddr),
Many(Cow<'a, [SocketAddr]>),
}

impl<'a> From<SocketAddr> for Targets<'a> {
fn from(addr: SocketAddr) -> Self {
Self::Single(addr)
}
}

impl<'a> From<&'a [SocketAddr]> for Targets<'a> {
fn from(addrs: &'a [SocketAddr]) -> Self {
Self::Many(addrs.into())
}
}

impl<'a> From<Vec<SocketAddr>> for Targets<'a> {
fn from(addrs: Vec<SocketAddr>) -> Self {
Self::Many(addrs.into())
}
}

impl<'a> IntoIterator for &Targets<'a> {
type Item = SocketAddr;
type IntoIter = TargetsIter<'a>;

fn into_iter(self) -> Self::IntoIter {
TargetsIter {
targets: self.clone(),
offset: 0,
}
}
}

pub struct TargetsIter<'a> {
targets: Targets<'a>,
offset: usize,
}

impl<'a> Iterator for TargetsIter<'a> {
type Item = SocketAddr;

fn next(&mut self) -> Option<Self::Item> {
match self.targets {
Targets::Single(single) => {
if self.offset > 0 {
None
} else {
self.offset += 1;
Some(single)
}
}
Targets::Many(ref many) => {
if self.offset >= many.len() {
None
} else {
let addr = many[self.offset];
self.offset += 1;
Some(addr)
}
}
}
}
}

#[derive(Error, Debug)]
pub(crate) enum MsgRecvError {
#[error(transparent)]
Expand Down
Loading