From 473d8861ddb9c1ca6f3416e7e0a7bd8507d62ffc Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Sun, 27 Aug 2023 14:54:16 +0200 Subject: [PATCH] Impl FromPlayers & ToPlayers messages --- crates/connector/src/game/buffer.rs | 88 ++++++++++ crates/connector/src/game/ereceiver.rs | 6 +- crates/connector/src/game/greceiver.rs | 53 ++----- crates/connector/src/game/message.rs | 34 ++++ crates/connector/src/game/mod.rs | 2 + crates/connector/src/game/mreceiver.rs | 85 ++++++---- crates/connector/src/game/preceiver.rs | 100 ++++++------ crates/connector/src/game/state.rs | 58 ++++++- crates/messages/src/lib.rs | 2 + crates/messages/src/players.rs | 43 +++++ crates/multiplayer/src/messages.rs | 6 +- crates/net/src/lib.rs | 2 +- crates/net/src/tasks/communicator/builder.rs | 159 +++++++++++++++---- crates/net/src/tasks/communicator/mod.rs | 2 +- crates/net/src/tasks/mod.rs | 2 +- 15 files changed, 478 insertions(+), 164 deletions(-) create mode 100644 crates/connector/src/game/buffer.rs create mode 100644 crates/connector/src/game/message.rs create mode 100644 crates/messages/src/players.rs diff --git a/crates/connector/src/game/buffer.rs b/crates/connector/src/game/buffer.rs new file mode 100644 index 00000000..d123b0ec --- /dev/null +++ b/crates/connector/src/game/buffer.rs @@ -0,0 +1,88 @@ +use std::{ + net::SocketAddr, + time::{Duration, Instant}, +}; + +use bincode::error::EncodeError; +use de_net::{OutPackage, PackageBuilder, PackageIterator, Peers, Reliability}; + +const UNRELIABLE_TIMEOUT: Duration = Duration::from_millis(10); +const RELIABLE_TIMEOUT: Duration = Duration::from_millis(50); + +/// Buffers of player messages and package builder. +pub(super) struct PlayerBuffer { + unreliable: PackageBuilder, + unordered: PackageBuilder, + semi_ordered: PackageBuilder, +} + +impl PlayerBuffer { + pub(super) fn new(target: SocketAddr) -> Self { + Self { + unreliable: PackageBuilder::new(Reliability::Unreliable, Peers::Players, target), + unordered: PackageBuilder::new(Reliability::Unordered, Peers::Players, target), + semi_ordered: PackageBuilder::new(Reliability::SemiOrdered, Peers::Players, target), + } + } + + /// Pushes a single message to an appropriate buffer. + pub(super) fn push( + &mut self, + reliability: Reliability, + message: &E, + ) -> Result<(), EncodeError> + where + E: bincode::Encode, + { + self.builder_mut(reliability).push(message) + } + + /// Builds packages from old enough messages and removes the packages from + /// the buffer. + /// + /// # Arguments + /// + /// * `time` - current time. + pub(super) fn build(&mut self, time: Instant) -> PlayerPackageIterator<'_> { + let unreliable_threshodl = time - UNRELIABLE_TIMEOUT; + let reliable_threshodl = time - RELIABLE_TIMEOUT; + + PlayerPackageIterator { + index: 0, + iterators: [ + self.unreliable.build_old(unreliable_threshodl), + self.unordered.build_old(reliable_threshodl), + self.semi_ordered.build_old(reliable_threshodl), + ], + } + } + + fn builder_mut(&mut self, reliability: Reliability) -> &mut PackageBuilder { + match reliability { + Reliability::Unreliable => &mut self.unreliable, + Reliability::Unordered => &mut self.unordered, + Reliability::SemiOrdered => &mut self.semi_ordered, + } + } +} + +pub(super) struct PlayerPackageIterator<'a> { + index: usize, + iterators: [PackageIterator<'a>; 3], +} + +impl<'a> Iterator for PlayerPackageIterator<'a> { + type Item = OutPackage; + + fn next(&mut self) -> Option { + while self.index < self.iterators.len() { + let item = self.iterators[self.index].next(); + if item.is_some() { + return item; + } + self.index += 1; + } + + None + } +} diff --git a/crates/connector/src/game/ereceiver.rs b/crates/connector/src/game/ereceiver.rs index 207b56d2..58359e48 100644 --- a/crates/connector/src/game/ereceiver.rs +++ b/crates/connector/src/game/ereceiver.rs @@ -5,9 +5,9 @@ use de_messages::ToGame; use de_net::{ConnErrorReceiver, Reliability}; use tracing::{error, info, warn}; -use super::greceiver::ToGameMessage; +use super::message::InMessage; -pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender) { +pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender>) { info!("Starting game connection error handler on port {port}..."); loop { @@ -26,7 +26,7 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender Self { - Self { - meta: MessageMeta { - source, - reliability, - }, - message, - } - } -} - -struct MessageMeta { - source: SocketAddr, - reliability: Reliability, -} - pub(super) struct GameProcessor { port: u16, owner: SocketAddr, - messages: Receiver, + messages: Receiver>, outputs: Sender, state: GameState, clients: Clients, @@ -46,7 +27,7 @@ impl GameProcessor { pub(super) fn new( port: u16, owner: SocketAddr, - messages: Receiver, + messages: Receiver>, outputs: Sender, state: GameState, clients: Clients, @@ -93,18 +74,18 @@ impl GameProcessor { continue; } - match message.message { + match message.message() { ToGame::Ping(id) => { - self.process_ping(message.meta, id).await; + self.process_ping(message.meta(), *id).await; } ToGame::Join => { - self.process_join(message.meta).await; + self.process_join(message.meta()).await; } ToGame::Leave => { - self.process_leave(message.meta).await; + self.process_leave(message.meta()).await; } ToGame::Readiness(readiness) => { - self.process_readiness(message.meta, readiness).await; + self.process_readiness(message.meta(), *readiness).await; } } @@ -122,8 +103,8 @@ impl GameProcessor { /// Returns true if the massage should be ignored and further handles such /// messages. - async fn handle_ignore(&self, message: &ToGameMessage) -> bool { - if matches!(message.message, ToGame::Join | ToGame::Leave) { + async fn handle_ignore(&self, message: &InMessage) -> bool { + if matches!(message.message(), ToGame::Join | ToGame::Leave) { // Join must be excluded from the condition because of the // chicken and egg problem. // @@ -132,22 +113,22 @@ impl GameProcessor { return false; } - if self.state.contains(message.meta.source).await { + if self.state.contains(message.meta().source).await { return false; } warn!( "Received a game message from a non-participating client: {:?}.", - message.meta.source + message.meta().source ); let _ = self .outputs .send( OutPackage::encode_single( &FromGame::NotJoined, - message.meta.reliability, + message.meta().reliability, Peers::Server, - message.meta.source, + message.meta().source, ) .unwrap(), ) diff --git a/crates/connector/src/game/message.rs b/crates/connector/src/game/message.rs new file mode 100644 index 00000000..d5de8280 --- /dev/null +++ b/crates/connector/src/game/message.rs @@ -0,0 +1,34 @@ +use std::net::SocketAddr; + +use de_net::Reliability; + +pub(super) struct InMessage { + meta: MessageMeta, + message: M, +} + +impl InMessage { + pub(super) fn new(source: SocketAddr, reliability: Reliability, message: M) -> Self { + Self { + meta: MessageMeta { + source, + reliability, + }, + message, + } + } + + pub(super) fn meta(&self) -> MessageMeta { + self.meta + } + + pub(super) fn message(&self) -> &M { + &self.message + } +} + +#[derive(Clone, Copy)] +pub(super) struct MessageMeta { + pub(super) source: SocketAddr, + pub(super) reliability: Reliability, +} diff --git a/crates/connector/src/game/mod.rs b/crates/connector/src/game/mod.rs index d03f6644..5154c0db 100644 --- a/crates/connector/src/game/mod.rs +++ b/crates/connector/src/game/mod.rs @@ -6,8 +6,10 @@ use de_net::{self, Socket}; use self::{greceiver::GameProcessor, state::GameState}; use crate::clients::Clients; +mod buffer; mod ereceiver; mod greceiver; +mod message; mod mreceiver; mod preceiver; mod state; diff --git a/crates/connector/src/game/mreceiver.rs b/crates/connector/src/game/mreceiver.rs index df65afc6..39375d38 100644 --- a/crates/connector/src/game/mreceiver.rs +++ b/crates/connector/src/game/mreceiver.rs @@ -1,17 +1,19 @@ use std::time::Duration; use async_std::{channel::Sender, future::timeout}; -use de_net::{PackageReceiver, Peers}; +use bincode::error::DecodeError; +use de_messages::{ToGame, ToPlayers}; +use de_net::{InPackage, PackageReceiver, Peers}; +use thiserror::Error; use tracing::{error, info, warn}; -use super::greceiver::ToGameMessage; -use crate::game::preceiver::PlayersPackage; +use super::message::InMessage; pub(super) async fn run( port: u16, packages: PackageReceiver, - server: Sender, - players: Sender, + server: Sender>, + players: Sender>, ) { info!("Starting game server input processor on port {port}..."); @@ -34,40 +36,55 @@ pub(super) async fn run( break; }; - match package.peers() { - Peers::Server => { - for message_result in package.decode() { - match message_result { - Ok(message) => { - let result = server - .send(ToGameMessage::new( - package.source(), - package.reliability(), - message, - )) - .await; - if result.is_err() { - break; - } - } - Err(err) => { - warn!("Received invalid package: {err:?}"); - break; - } + let peers = package.peers(); + let result = match peers { + Peers::Server => handle_package(package, &server).await, + Peers::Players => handle_package(package, &players).await, + }; + + if let Err(err) = result { + match err { + PackageHandleError::Decode(err) => { + warn!("Received invalid package: {err:?}"); + } + PackageHandleError::SendError => { + if peers == Peers::Server { + break; } } } - Peers::Players => { - let _ = players - .send(PlayersPackage::new( - package.reliability(), - package.source(), - package.data(), - )) - .await; - } } } info!("Game server input processor on port {port} finished."); } + +async fn handle_package( + package: InPackage, + output: &Sender>, +) -> Result<(), PackageHandleError> +where + M: bincode::Decode, +{ + for message_result in package.decode() { + let message = message_result.map_err(PackageHandleError::from)?; + output + .send(InMessage::new( + package.source(), + package.reliability(), + message, + )) + .await + .map_err(|_| PackageHandleError::SendError)?; + } + + Ok(()) +} + +#[derive(Debug, Error)] +enum PackageHandleError { + #[error("Decoding error: {0}")] + Decode(#[from] DecodeError), + #[error("Sending to output channel failed.")] + SendError, +} diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index 0968f74b..e2020207 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -1,39 +1,22 @@ -use std::net::SocketAddr; +use std::time::{Duration, Instant}; -use async_std::channel::Receiver; -use de_messages::FromGame; -use de_net::{OutPackage, PackageSender, Peers, Reliability}; +use async_std::{channel::Receiver, future::timeout}; +use de_messages::{BorrowedFromPlayers, FromGame, ToPlayers}; +use de_net::{OutPackage, PackageSender, Peers}; use tracing::{error, info, warn}; -use super::state::GameState; - -/// A package destined to other players in the game. -pub(super) struct PlayersPackage { - reliability: Reliability, - source: SocketAddr, - data: Vec, -} - -impl PlayersPackage { - pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec) -> Self { - Self { - reliability, - source, - data, - } - } -} +use super::{message::InMessage, state::GameState}; pub(super) async fn run( port: u16, - packages: Receiver, + messages: Receiver>, outputs: PackageSender, - state: GameState, + mut state: GameState, ) { info!("Starting game player package handler on port {port}..."); 'main: loop { - if packages.is_closed() { + if messages.is_closed() { break; } @@ -42,41 +25,50 @@ pub(super) async fn run( break; } - let Ok(package) = packages.recv().await else { - break; + // TODO timeout constant? + let message = match timeout(Duration::from_millis(10), messages.recv()).await { + Ok(Ok(message)) => Some(message), + Ok(Err(_)) => break 'main, + Err(_) => None, }; - if !state.contains(package.source).await { - warn!( - "Received a player message from a non-participating client: {:?}.", - package.source - ); + if let Some(message) = message { + let meta = message.meta(); + let Some(player_id) = state.id(meta.source).await else { + warn!( + "Received a player message from a non-participating client: {:?}.", + meta.source + ); - let _ = outputs - .send( - OutPackage::encode_single( - &FromGame::NotJoined, - package.reliability, - Peers::Server, - package.source, + let _ = outputs + .send( + OutPackage::encode_single( + &FromGame::NotJoined, + meta.reliability, + Peers::Server, + meta.source, + ) + .unwrap(), ) - .unwrap(), - ) - .await; - continue; + .await; + continue; + }; + + let out_message = BorrowedFromPlayers::new(player_id, message.message()); + for buffer in state.lock().await.buffers_mut(Some(meta.source)) { + if let Err(err) = buffer.push(meta.reliability, &out_message) { + error!("Could not encode player message, skipping: {err:?}"); + } + } } - for target in state.targets(Some(package.source)).await { - let result = outputs - .send(OutPackage::from_slice( - &package.data, - package.reliability, - Peers::Players, - target, - )) - .await; - if result.is_err() { - break 'main; + let time = Instant::now(); + for buffer in state.lock().await.buffers_mut(None) { + for output in buffer.build(time) { + let result = outputs.send(output).await; + if result.is_err() { + break 'main; + } } } } diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index 344634bd..2bae0c64 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -1,10 +1,12 @@ use std::{collections::hash_map::Entry, net::SocketAddr}; use ahash::AHashMap; -use async_std::sync::{Arc, RwLock}; +use async_std::sync::{Arc, RwLock, RwLockWriteGuard}; use de_messages::Readiness; use thiserror::Error; +use super::buffer::PlayerBuffer; + #[derive(Clone)] pub(super) struct GameState { inner: Arc>, @@ -17,6 +19,12 @@ impl GameState { } } + pub(crate) async fn lock(&mut self) -> GameStateGuard { + GameStateGuard { + guard: self.inner.write().await, + } + } + /// Returns true if there is no players currently connected to the game. pub(super) async fn is_empty(&self) -> bool { self.inner.read().await.is_empty() @@ -27,6 +35,12 @@ impl GameState { self.inner.read().await.contains(addr) } + /// Returns ID of the player or None if such player is not part of the + /// game. + pub(super) async fn id(&self, addr: SocketAddr) -> Option { + self.inner.read().await.id(addr) + } + /// Adds a player to the game and returns ID of the added player. pub(super) async fn add(&mut self, addr: SocketAddr) -> Result { self.inner.write().await.add(addr) @@ -62,6 +76,25 @@ impl GameState { } } +/// The lock is unlocked once this guard is dropped. +pub(super) struct GameStateGuard<'a> { + guard: RwLockWriteGuard<'a, GameStateInner>, +} + +impl<'a> GameStateGuard<'a> { + /// Returns an iterator over message buffers of all or all but one player. + /// + /// # Arguments + /// + /// * `exclude` - exclude this player from the iterator. + pub(super) fn buffers_mut( + &mut self, + exclude: Option, + ) -> impl Iterator { + self.guard.buffers_mut(exclude) + } +} + struct GameStateInner { available_ids: AvailableIds, readiness: Readiness, @@ -85,6 +118,10 @@ impl GameStateInner { self.players.contains_key(&addr) } + fn id(&self, addr: SocketAddr) -> Option { + self.players.get(&addr).map(|p| p.id) + } + fn add(&mut self, addr: SocketAddr) -> Result { if self.readiness != Readiness::NotReady { return Err(JoinError::GameNotOpened); @@ -94,7 +131,7 @@ impl GameStateInner { Entry::Occupied(_) => Err(JoinError::AlreadyJoined), Entry::Vacant(vacant) => match self.available_ids.lease() { Some(id) => { - vacant.insert(Player::new(id)); + vacant.insert(Player::new(id, addr)); Ok(id) } None => Err(JoinError::GameFull), @@ -165,6 +202,19 @@ impl GameStateInner { } addrs } + + fn buffers_mut( + &mut self, + exclude: Option, + ) -> impl Iterator { + self.players.iter_mut().filter_map(move |(addr, player)| { + if Some(*addr) == exclude { + None + } else { + Some(&mut player.buffer) + } + }) + } } struct AvailableIds(Vec); @@ -222,13 +272,15 @@ pub(super) enum ReadinessUpdateError { struct Player { id: u8, readiness: Readiness, + buffer: PlayerBuffer, } impl Player { - fn new(id: u8) -> Self { + fn new(id: u8, addr: SocketAddr) -> Self { Self { id, readiness: Readiness::default(), + buffer: PlayerBuffer::new(addr), } } } diff --git a/crates/messages/src/lib.rs b/crates/messages/src/lib.rs index b4d4b5c3..ef8519f3 100644 --- a/crates/messages/src/lib.rs +++ b/crates/messages/src/lib.rs @@ -2,7 +2,9 @@ //! Connector during multiplayer game. pub use game::{FromGame, JoinError, Readiness, ToGame}; +pub use players::{BorrowedFromPlayers, FromPlayers, ToPlayers}; pub use server::{FromServer, GameOpenError, ToServer}; mod game; +mod players; mod server; diff --git a/crates/messages/src/players.rs b/crates/messages/src/players.rs new file mode 100644 index 00000000..d692c006 --- /dev/null +++ b/crates/messages/src/players.rs @@ -0,0 +1,43 @@ +use bincode::{Decode, Encode}; + +/// Messages to be sent by a player/client or occasionally the game server to +/// other players. +#[derive(Debug, Decode)] +pub struct FromPlayers { + /// ID of the sending player. + source: u8, + /// Original message. + message: ToPlayers, +} + +impl FromPlayers { + /// ID of the sending player + pub fn source(&self) -> u8 { + self.source + } + + pub fn message(&self) -> &ToPlayers { + &self.message + } +} + +/// Messages to be sent by a player/client or occasionally the game server to +/// other players. +#[derive(Debug, Encode, Clone, Copy)] +pub struct BorrowedFromPlayers<'a> { + /// ID of the sending player. + source: u8, + /// Original message. + message: &'a ToPlayers, +} + +impl<'a> BorrowedFromPlayers<'a> { + pub fn new(source: u8, message: &'a ToPlayers) -> Self { + Self { source, message } + } +} + +/// Message to be sent by a player/client or occasionally the game server to +/// the game server for the distribution to other game players. +#[derive(Debug, Encode, Decode)] +pub enum ToPlayers {} diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index 1c2086aa..c4025d9c 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -276,8 +276,10 @@ fn message_sender( builder.push(event.message()).unwrap(); } - for builder in [unreliable, unordered, semi_ordered] { - for package in builder.build() { + for mut builder in [unreliable, unordered, semi_ordered] { + // Build all packages. This system runs once per frame and thus some + // aggregation is done via the update frequency. + for package in builder.build_all() { outputs.send(package.into()); } } diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index 5170a99a..5f5382c5 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -3,7 +3,7 @@ pub use protocol::MAX_PACKAGE_SIZE; pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE}; pub use tasks::{ startup, ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage, - PackageBuilder, PackageReceiver, PackageSender, + PackageBuilder, PackageIterator, PackageReceiver, PackageSender, }; mod connection; diff --git a/crates/net/src/tasks/communicator/builder.rs b/crates/net/src/tasks/communicator/builder.rs index 824b084e..4523d448 100644 --- a/crates/net/src/tasks/communicator/builder.rs +++ b/crates/net/src/tasks/communicator/builder.rs @@ -1,4 +1,4 @@ -use std::{mem, net::SocketAddr}; +use std::{collections::VecDeque, mem, net::SocketAddr, time::Instant}; use bincode::{encode_into_slice, error::EncodeError}; @@ -13,9 +13,8 @@ pub struct PackageBuilder { reliability: Reliability, peers: Peers, target: SocketAddr, - buffer: Vec, - used: usize, - packages: Vec, + buffer: Buffer, + packages: VecDeque, } impl PackageBuilder { @@ -24,21 +23,35 @@ impl PackageBuilder { reliability, peers, target, - buffer: vec![0; MAX_DATAGRAM_SIZE], - used: HEADER_SIZE, - packages: Vec::new(), + buffer: Buffer::new(), + packages: VecDeque::new(), } } - /// Build output packages from all pushed messages. + /// Build packages from all messages pushed before a given threshold. The + /// last yielded package may contain newer data. /// - /// The messages are distributed among the packages in a sequential order. - /// Each package is filled with as many messages as it can accommodate. - pub fn build(mut self) -> Vec { - if self.used > HEADER_SIZE { + /// See [`Self::build_all`]. + pub fn build_old(&mut self, threshold: Instant) -> PackageIterator<'_> { + if self.buffer.birth().map_or(false, |t| t <= threshold) { self.build_package(false); } - self.packages + + // Threshold is used only in the condition to build package from + // currently buffered messages. It makes little sense to make yielding + // of already build packages conditional because the packages cannot + // change in the future. + PackageIterator::new(&mut self.packages) + } + + /// Build packages from all pushed messages. + /// + /// The messages are distributed among the packages in a sequential order. + /// Each package except the last one is filled with as many messages as it + /// can accommodate. + pub fn build_all(&mut self) -> PackageIterator<'_> { + self.build_package(true); + PackageIterator::new(&mut self.packages) } /// Push another message to the builder so that it is included in one of @@ -49,7 +62,7 @@ impl PackageBuilder { { match self.push_inner(message) { Err(EncodeError::UnexpectedEnd) => { - self.build_package(true); + self.build_package(false); self.push_inner(message) } Err(err) => Err(err), @@ -61,29 +74,24 @@ impl PackageBuilder { where E: bincode::Encode, { - let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?; - self.used += len; + let len = encode_into_slice(message, self.buffer.unused_mut(), BINCODE_CONF)?; + self.buffer.forward(len); Ok(()) } - /// Build and store another package from already buffered data. + /// Build and store another package from already buffered data (if there is + /// any). /// /// # Arguments /// - /// * `reusable` - if false, newly created buffer for further messages will + /// * `empty` - if true, newly created buffer for further messages will /// be empty. - fn build_package(&mut self, reusable: bool) { - let (mut data, used) = if reusable { - (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) - } else { - (Vec::new(), 0) + fn build_package(&mut self, empty: bool) { + let Some(data) = self.buffer.consume(empty) else { + return; }; - mem::swap(&mut data, &mut self.buffer); - data.truncate(self.used); - self.used = used; - - self.packages.push(OutPackage::new( + self.packages.push_back(OutPackage::new( data, self.reliability, self.peers, @@ -92,6 +100,99 @@ impl PackageBuilder { } } +struct Buffer { + /// Time of the first piece of data. + birth: Option, + data: Vec, + used: usize, +} + +impl Buffer { + fn new() -> Self { + Self { + birth: None, + data: vec![0; MAX_DATAGRAM_SIZE], + used: HEADER_SIZE, + } + } + + /// Returns true if no data was pushed to the buffer. + fn empty(&self) -> bool { + self.used <= HEADER_SIZE + } + + fn birth(&self) -> Option { + self.birth + } + + /// Resets the buffer and returns the old data (before the reset). If there + /// was no data pushed, it returns None. + /// + /// # Arguments + /// + /// * `empty` - if true, the new buffer may be created with zero capacity + /// as an optimization. + fn consume(&mut self, empty: bool) -> Option> { + if self.empty() { + return None; + } + + let (mut data, used) = if empty { + (Vec::new(), 0) + } else { + (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) + }; + + mem::swap(&mut data, &mut self.data); + data.truncate(self.used); + self.used = used; + self.birth = None; + + Some(data) + } + + /// Returns mutable slice to the unused part of the buffer. + fn unused_mut(&mut self) -> &mut [u8] { + &mut self.data[self.used..] + } + + /// Moves used data pointer forward and sets birth time to now if it is not + /// set already. + /// + /// # Panics + /// + /// May panic if the pointer is moved beyond the buffer capacity. + fn forward(&mut self, amount: usize) { + if self.birth.is_none() { + self.birth = Some(Instant::now()); + } + + self.used += amount; + debug_assert!(self.used <= self.data.len()); + } +} + +/// Iterator over already build packages. +/// +/// Not consumed packages stay in the buffer. +pub struct PackageIterator<'a> { + packages: &'a mut VecDeque, +} + +impl<'a> PackageIterator<'a> { + fn new(packages: &'a mut VecDeque) -> Self { + Self { packages } + } +} + +impl<'a> Iterator for PackageIterator<'a> { + type Item = OutPackage; + + fn next(&mut self) -> Option { + self.packages.pop_front() + } +} + #[cfg(test)] mod tests { use super::*; @@ -118,7 +219,7 @@ mod tests { .unwrap(); } - let packages = builder.build(); + let packages: Vec = builder.build_all().collect(); assert_eq!(packages.len(), 4); // 3 items + something extra for the encoding assert!(packages[0].data_slice().len() >= 128 * 3); diff --git a/crates/net/src/tasks/communicator/mod.rs b/crates/net/src/tasks/communicator/mod.rs index 6586b0e6..259099fd 100644 --- a/crates/net/src/tasks/communicator/mod.rs +++ b/crates/net/src/tasks/communicator/mod.rs @@ -1,5 +1,5 @@ use bincode::config::{BigEndian, Configuration, Limit, Varint}; -pub use builder::PackageBuilder; +pub use builder::{PackageBuilder, PackageIterator}; pub use channels::{ConnErrorReceiver, ConnectionError, PackageReceiver, PackageSender}; pub use decode::{InPackage, MessageDecoder}; pub use encode::OutPackage; diff --git a/crates/net/src/tasks/mod.rs b/crates/net/src/tasks/mod.rs index b52fa41c..6094b1ec 100644 --- a/crates/net/src/tasks/mod.rs +++ b/crates/net/src/tasks/mod.rs @@ -58,7 +58,7 @@ use async_std::channel::bounded; pub use communicator::{ ConnErrorReceiver, ConnectionError, InPackage, MessageDecoder, OutPackage, PackageBuilder, - PackageReceiver, PackageSender, + PackageIterator, PackageReceiver, PackageSender, }; pub(crate) use dsender::OutDatagram; use futures::future::BoxFuture;