From 8d6c60714f30f38a186e381055ce4cf78789bf47 Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Wed, 23 Aug 2023 21:58:43 +0200 Subject: [PATCH] WIP --- crates/connector/src/game/greceiver.rs | 11 ++- crates/connector/src/game/preceiver.rs | 28 +++---- crates/connector/src/game/state.rs | 45 +++-------- crates/net/src/connection/dispatch/mod.rs | 19 ++++- crates/net/src/lib.rs | 2 +- crates/net/src/protocol.rs | 94 ++-------------------- crates/net/src/tasks/communicator.rs | 38 +++------ crates/net/src/tasks/dsender.rs | 20 ++--- crates/net/src/tasks/usender.rs | 28 +++---- docs/src/multiplayer/connector/protocol.md | 3 +- 10 files changed, 91 insertions(+), 197 deletions(-) diff --git a/crates/connector/src/game/greceiver.rs b/crates/connector/src/game/greceiver.rs index 6fa49456..93e64332 100644 --- a/crates/connector/src/game/greceiver.rs +++ b/crates/connector/src/game/greceiver.rs @@ -5,7 +5,7 @@ use async_std::{ task, }; use de_messages::{FromGame, JoinError, Readiness, ToGame}; -use de_net::{OutPackage, Peers, Targets}; +use de_net::{OutPackage, Peers}; use tracing::{error, info, warn}; use super::state::{GameState, JoinError as JoinErrorInner}; @@ -274,18 +274,17 @@ impl GameProcessor { where E: bincode::Encode, { - if let Some(targets) = self.state.targets(exclude).await { - self.send(message, targets).await; + for target in self.state.targets(exclude).await { + self.send(message, target).await; } } /// Send message to some targets. - async fn send(&self, message: &E, targets: T) + async fn send(&self, message: &E, target: SocketAddr) where E: bincode::Encode, - T: Into>, { - let message = OutPackage::encode_single(message, true, Peers::Server, targets).unwrap(); + let message = OutPackage::encode_single(message, true, Peers::Server, target).unwrap(); let _ = self.outputs.send(message).await; } } diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index 62adf99f..80a09093 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -32,7 +32,7 @@ pub(super) async fn run( ) { info!("Starting game player package handler on port {port}..."); - loop { + 'main: loop { if packages.is_closed() { break; } @@ -66,20 +66,18 @@ pub(super) async fn run( continue; } - let Some(targets) = state.targets(Some(package.source)).await else { - continue; - }; - - let result = outputs - .send(OutPackage::new( - package.data, - package.reliable, - Peers::Players, - targets, - )) - .await; - if result.is_err() { - break; + for target in state.targets(Some(package.source)).await { + let result = outputs + .send(OutPackage::new( + package.data.clone(), + package.reliable, + Peers::Players, + target, + )) + .await; + if result.is_err() { + break 'main; + } } } diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index dd063f1e..344634bd 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -3,7 +3,6 @@ use std::{collections::hash_map::Entry, net::SocketAddr}; use ahash::AHashMap; use async_std::sync::{Arc, RwLock}; use de_messages::Readiness; -use de_net::Targets; use thiserror::Error; #[derive(Clone)] @@ -53,13 +52,12 @@ impl GameState { } /// Constructs and returns package targets which includes all or all but - /// one players connected to the game. It returns None if there is no - /// matching target. + /// one players connected to the game. /// /// # Arguments /// /// * `exclude` - if not None, this player is included among the targets. - pub(super) async fn targets(&self, exclude: Option) -> Option> { + pub(super) async fn targets(&self, exclude: Option) -> Vec { self.inner.read().await.targets(exclude) } } @@ -158,32 +156,14 @@ impl GameStateInner { Ok(progressed) } - fn targets(&self, exclude: Option) -> Option> { - let len = if exclude.map_or(false, |e| self.players.contains_key(&e)) { - self.players.len() - 1 - } else { - self.players.len() - }; - - if len == 0 { - None - } else if len == 1 { - for &addr in self.players.keys() { - if Some(addr) != exclude { - return Some(Targets::Single(addr)); - } + fn targets(&self, exclude: Option) -> Vec { + let mut addrs = Vec::with_capacity(self.players.len()); + for &addr in self.players.keys() { + if Some(addr) != exclude { + addrs.push(addr); } - - unreachable!("No non-excluded player found."); - } else { - let mut addrs = Vec::with_capacity(len); - for &addr in self.players.keys() { - if Some(addr) != exclude { - addrs.push(addr); - } - } - Some(addrs.into()) } + addrs } } @@ -377,21 +357,21 @@ mod tests { fn test_targets() { let mut state = GameStateInner::new(8); - assert!(state.targets(None).is_none()); + assert!(state.targets(None).is_empty()); state.add("127.0.0.1:2001".parse().unwrap()).unwrap(); assert_eq!( - HashSet::::from_iter(state.targets(None).unwrap().into_iter()), + HashSet::::from_iter(state.targets(None).into_iter()), HashSet::from_iter(["127.0.0.1:2001".parse().unwrap()]) ); assert!(state .targets(Some("127.0.0.1:2001".parse().unwrap())) - .is_none()); + .is_empty()); state.add("127.0.0.1:2002".parse().unwrap()).unwrap(); state.add("127.0.0.1:2003".parse().unwrap()).unwrap(); assert_eq!( - HashSet::::from_iter(state.targets(None).unwrap().into_iter()), + HashSet::::from_iter(state.targets(None).into_iter()), HashSet::from_iter([ "127.0.0.1:2001".parse().unwrap(), "127.0.0.1:2002".parse().unwrap(), @@ -402,7 +382,6 @@ mod tests { HashSet::::from_iter( state .targets(Some("127.0.0.1:2002".parse().unwrap())) - .unwrap() .into_iter() ), HashSet::from_iter([ diff --git a/crates/net/src/connection/dispatch/mod.rs b/crates/net/src/connection/dispatch/mod.rs index 0d455348..8fcf58e9 100644 --- a/crates/net/src/connection/dispatch/mod.rs +++ b/crates/net/src/connection/dispatch/mod.rs @@ -11,8 +11,9 @@ use async_std::{ use self::resends::{RescheduleResult, Resends, START_BACKOFF_MS}; use super::book::{Connection, ConnectionBook}; use crate::{ - header::{DatagramHeader, PackageId, Peers}, + header::{DatagramHeader, PackageId, PackageIdRange}, tasks::OutDatagram, + Peers, }; mod resends; @@ -29,6 +30,16 @@ impl DispatchHandler { } } + /// Returns ID to be given to a to-be-send package. + /// + /// It is assumed that this is called exactly once before each reliably + /// send package and that all generated IDs are used. + pub(crate) async fn next_package_id(&mut self, time: Instant, addr: SocketAddr) -> PackageId { + let mut book = self.book.lock().await; + let handler = book.update(time, addr, ConnDispatchHandler::new); + handler.next_package_id() + } + pub(crate) async fn sent( &mut self, time: Instant, @@ -125,14 +136,20 @@ pub(crate) struct ResendResult { struct ConnDispatchHandler { resends: Resends, + package_ids: PackageIdRange, } impl ConnDispatchHandler { fn new() -> Self { Self { resends: Resends::new(), + package_ids: PackageIdRange::counter(), } } + + fn next_package_id(&mut self) -> PackageId { + self.package_ids.next().unwrap() + } } impl Connection for ConnDispatchHandler { diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index 66e9ac10..fb34e7d2 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -1,5 +1,5 @@ pub use header::Peers; -pub use protocol::{Targets, MAX_PACKAGE_SIZE}; +pub use protocol::MAX_PACKAGE_SIZE; pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE}; pub use tasks::{ startup, ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage, diff --git a/crates/net/src/protocol.rs b/crates/net/src/protocol.rs index 21f32c02..8d9c7698 100644 --- a/crates/net/src/protocol.rs +++ b/crates/net/src/protocol.rs @@ -1,7 +1,6 @@ -use std::{borrow::Cow, net::SocketAddr}; +use std::net::SocketAddr; use async_std::sync::Arc; -use futures::future::try_join_all; use thiserror::Error; use tracing::{error, trace}; @@ -39,17 +38,14 @@ impl ProtocolSocket { /// /// * `data` - datagram payload. /// - /// * `targets` - recipients of the datagram. - pub(crate) async fn send<'a, T>( - &'a self, + /// * `target` - recipient of the datagram. + pub(crate) async fn send( + &self, buf: &mut [u8], header: DatagramHeader, data: &[u8], - targets: T, - ) -> Result<(), SendError> - where - T: Into>, - { + target: SocketAddr, + ) -> Result<(), SendError> { let len = HEADER_SIZE + data.len(); assert!(buf.len() >= len); let buf = &mut buf[..len]; @@ -57,16 +53,7 @@ impl ProtocolSocket { trace!("Going to send datagram {}", header); header.write(buf); - - match targets.into() { - Targets::Single(target) => { - self.socket.send(target, buf).await?; - } - Targets::Many(targets) => { - try_join_all(targets.iter().map(|&target| self.socket.send(target, buf))).await?; - } - } - + self.socket.send(target, buf).await?; Ok(()) } @@ -98,73 +85,6 @@ impl ProtocolSocket { } } -#[derive(Clone)] -pub enum Targets<'a> { - Single(SocketAddr), - Many(Cow<'a, [SocketAddr]>), -} - -impl<'a> From for Targets<'a> { - fn from(addr: SocketAddr) -> Self { - Self::Single(addr) - } -} - -impl<'a> From<&'a [SocketAddr]> for Targets<'a> { - fn from(addrs: &'a [SocketAddr]) -> Self { - Self::Many(addrs.into()) - } -} - -impl<'a> From> for Targets<'a> { - fn from(addrs: Vec) -> Self { - Self::Many(addrs.into()) - } -} - -impl<'a> IntoIterator for &Targets<'a> { - type Item = SocketAddr; - type IntoIter = TargetsIter<'a>; - - fn into_iter(self) -> Self::IntoIter { - TargetsIter { - targets: self.clone(), - offset: 0, - } - } -} - -pub struct TargetsIter<'a> { - targets: Targets<'a>, - offset: usize, -} - -impl<'a> Iterator for TargetsIter<'a> { - type Item = SocketAddr; - - fn next(&mut self) -> Option { - match self.targets { - Targets::Single(single) => { - if self.offset > 0 { - None - } else { - self.offset += 1; - Some(single) - } - } - Targets::Many(ref many) => { - if self.offset >= many.len() { - None - } else { - let addr = many[self.offset]; - self.offset += 1; - Some(addr) - } - } - } - } -} - #[derive(Error, Debug)] pub(crate) enum MsgRecvError { #[error(transparent)] diff --git a/crates/net/src/tasks/communicator.rs b/crates/net/src/tasks/communicator.rs index ddb25257..afa3d002 100644 --- a/crates/net/src/tasks/communicator.rs +++ b/crates/net/src/tasks/communicator.rs @@ -7,10 +7,7 @@ use bincode::{ error::{DecodeError, EncodeError}, }; -use crate::{ - header::Peers, - protocol::{Targets, MAX_PACKAGE_SIZE}, -}; +use crate::{header::Peers, protocol::MAX_PACKAGE_SIZE}; const BINCODE_CONF: Configuration> = bincode::config::standard() @@ -22,21 +19,18 @@ const BINCODE_CONF: Configuration> = pub struct PackageBuilder { reliable: bool, peers: Peers, - targets: Targets<'static>, + target: SocketAddr, buffer: Vec, used: usize, packages: Vec, } impl PackageBuilder { - pub fn new(reliable: bool, peers: Peers, targets: T) -> Self - where - T: Into>, - { + pub fn new(reliable: bool, peers: Peers, target: SocketAddr) -> Self { Self { reliable, peers, - targets: targets.into(), + target, buffer: vec![0; MAX_PACKAGE_SIZE], used: 0, packages: Vec::new(), @@ -52,8 +46,7 @@ impl PackageBuilder { if self.used > 0 { self.buffer.truncate(self.used); - let package = - OutPackage::new(self.buffer, self.reliable, self.peers, self.targets.clone()); + let package = OutPackage::new(self.buffer, self.reliable, self.peers, self.target); packages.push(package); } @@ -73,8 +66,7 @@ impl PackageBuilder { data.truncate(self.used); self.used = 0; - let package = - OutPackage::new(data, self.reliable, self.peers, self.targets.clone()); + let package = OutPackage::new(data, self.reliable, self.peers, self.target); self.packages.push(package); self.push_inner(message) @@ -99,25 +91,24 @@ pub struct OutPackage { pub(super) data: Vec, reliable: bool, peers: Peers, - pub(super) targets: Targets<'static>, + pub(super) target: SocketAddr, } impl OutPackage { /// Creates a package from a single message. /// /// See also [`Self::new`]. - pub fn encode_single( + pub fn encode_single( message: &E, reliable: bool, peers: Peers, - targets: T, + target: SocketAddr, ) -> Result where E: bincode::Encode, - T: Into>, { let data = encode_to_vec(message, BINCODE_CONF)?; - Ok(Self::new(data, reliable, peers, targets)) + Ok(Self::new(data, reliable, peers, target)) } /// # Arguments @@ -126,21 +117,18 @@ impl OutPackage { /// /// * `reliable` - whether to deliver the data reliably. /// - /// * `targets` - package recipients. + /// * `target` - package recipient. /// /// # Panics /// /// Panics if data is longer than [`MAX_PACKAGE_SIZE`]. - pub fn new(data: Vec, reliable: bool, peers: Peers, targets: T) -> Self - where - T: Into>, - { + pub fn new(data: Vec, reliable: bool, peers: Peers, target: SocketAddr) -> Self { assert!(data.len() < MAX_PACKAGE_SIZE); Self { data, reliable, peers, - targets: targets.into(), + target, } } diff --git a/crates/net/src/tasks/dsender.rs b/crates/net/src/tasks/dsender.rs index 42041597..453ff71e 100644 --- a/crates/net/src/tasks/dsender.rs +++ b/crates/net/src/tasks/dsender.rs @@ -1,28 +1,22 @@ +use std::net::SocketAddr; + use async_std::channel::Receiver; use tracing::{error, info}; -use crate::{ - header::DatagramHeader, - protocol::{ProtocolSocket, Targets}, - MAX_DATAGRAM_SIZE, -}; +use crate::{header::DatagramHeader, protocol::ProtocolSocket, MAX_DATAGRAM_SIZE}; pub(crate) struct OutDatagram { header: DatagramHeader, data: Vec, - targets: Targets<'static>, + target: SocketAddr, } impl OutDatagram { - pub(crate) fn new>>( - header: DatagramHeader, - data: Vec, - targets: T, - ) -> Self { + pub(crate) fn new(header: DatagramHeader, data: Vec, target: SocketAddr) -> Self { Self { header, data, - targets: targets.into(), + target, } } } @@ -40,7 +34,7 @@ pub(super) async fn run(port: u16, datagrams: Receiver, socket: Pro &mut buffer, datagram.header, &datagram.data, - datagram.targets, + datagram.target, ) .await { diff --git a/crates/net/src/tasks/usender.rs b/crates/net/src/tasks/usender.rs index 57ec57b7..330e5475 100644 --- a/crates/net/src/tasks/usender.rs +++ b/crates/net/src/tasks/usender.rs @@ -20,7 +20,6 @@ pub(super) async fn run( ) { info!("Starting package sender on port {port}..."); - let mut counter_reliable = PackageIdRange::counter(); let mut counter_unreliable = PackageIdRange::counter(); loop { @@ -28,8 +27,10 @@ pub(super) async fn run( break; }; + let time = Instant::now(); + let package_id = if package.reliable() { - counter_reliable.next().unwrap() + dispatch_handler.next_package_id(time, package.target).await } else { counter_unreliable.next().unwrap() }; @@ -38,23 +39,20 @@ pub(super) async fn run( if let DatagramHeader::Package(package_header) = header { if package_header.reliable() { - let time = Instant::now(); - for target in &package.targets { - dispatch_handler - .sent( - time, - target, - package_header.id(), - package_header.peers(), - &package.data, - ) - .await; - } + dispatch_handler + .sent( + time, + package.target, + package_header.id(), + package_header.peers(), + &package.data, + ) + .await; } } let closed = datagrams - .send(OutDatagram::new(header, package.data, package.targets)) + .send(OutDatagram::new(header, package.data, package.target)) .await .is_err(); diff --git a/docs/src/multiplayer/connector/protocol.md b/docs/src/multiplayer/connector/protocol.md index 8bd8b974..8f50a4eb 100644 --- a/docs/src/multiplayer/connector/protocol.md +++ b/docs/src/multiplayer/connector/protocol.md @@ -24,7 +24,8 @@ by the mask `0b1000_0000`). Each user package has an ID, encoded within the last three bytes of the datagram header. These IDs increment until they reach the maximum value that can be encoded within three bytes, after which the counter resets to 0. The ID -sequence for reliable and unreliable packages are independent. +sequence for reliable and unreliable packages are independent. Each connection +/ client has independent reliable package numbering. Packages can be transmitted in either reliable or non-reliable mode. Reliability is signaled by the second highest bit of the flags byte