diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index 5cca279f..48c0ff90 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -78,3 +78,64 @@ async fn handle_player<'a>(message: BorrowedFromPlayers<'a>, target: SocketAddr) // TODO } + +// TODO test this whole thing +struct Builders { + slots: Vec, +} + +impl Builders { + fn new() -> Self { + Self { slots: Vec::new() } + } + + // TODO rename + // TODO docs + fn setup(&mut self, players: &[(u8, SocketAddr)]) { + let mut idx = 0; + while idx < self.slots.len() { + let slot = &self.slots[idx]; + let remove = players + .iter() + .all(|(id, addr)| slot.id != *id || slot.addr != *addr); + if remove { + self.slots.swap_remove(idx); + } else { + idx += 1; + } + } + + for &(id, addr) in players { + let add = self.slots.iter().all(|slot| slot.id != id); + if add { + self.slots.push(PlayerSlot::new(id, addr)); + } + } + } +} + +struct PlayerSlot { + id: u8, + addr: SocketAddr, + builders: PlayerBuilders, +} + +impl PlayerSlot { + fn new(id: u8, addr: SocketAddr) -> Self { + Self { + id, + addr, + builders: PlayerBuilders::new(), + } + } +} + +struct PlayerBuilders { + // TODO +} + +impl PlayerBuilders { + fn new() -> Self { + Self {} + } +} diff --git a/crates/connector/src/game/state.rs b/crates/connector/src/game/state.rs index e559383f..51021b34 100644 --- a/crates/connector/src/game/state.rs +++ b/crates/connector/src/game/state.rs @@ -63,9 +63,15 @@ impl GameState { /// # Arguments /// /// * `exclude` - if not None, this player is included among the targets. + // TODO do not create new Vec pub(super) async fn targets(&self, exclude: Option) -> Vec { self.inner.read().await.targets(exclude) } + + // TODO docs + pub(super) async fn players(&self, buf: &mut Vec<(u8, SocketAddr)>) { + self.inner.read().await.players(buf) + } } struct GameStateInner { @@ -175,6 +181,15 @@ impl GameStateInner { } addrs } + + fn players(&self, buf: &mut Vec<(u8, SocketAddr)>) { + buf.clear(); + buf.reserve(self.players.len()); + + for (&addr, player) in self.players.iter() { + buf.push((player.id, addr)) + } + } } struct AvailableIds(Vec); diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index ff0cd151..c4025d9c 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -277,7 +277,9 @@ fn message_sender( } for mut builder in [unreliable, unordered, semi_ordered] { - for package in builder.build() { + // Build all packages. This system runs once per frame and thus some + // aggregation is done via the update frequency. + for package in builder.build_all() { outputs.send(package.into()); } } diff --git a/crates/net/src/tasks/communicator/builder.rs b/crates/net/src/tasks/communicator/builder.rs index 823881d1..16250da3 100644 --- a/crates/net/src/tasks/communicator/builder.rs +++ b/crates/net/src/tasks/communicator/builder.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, mem, net::SocketAddr}; +use std::{collections::VecDeque, mem, net::SocketAddr, time::Instant}; use bincode::{encode_into_slice, error::EncodeError}; @@ -13,9 +13,8 @@ pub struct PackageBuilder { reliability: Reliability, peers: Peers, target: SocketAddr, - buffer: Vec, - used: usize, - packages: VecDeque, + buffer: Buffer, + packages: VecDeque, } impl PackageBuilder { @@ -24,25 +23,32 @@ impl PackageBuilder { reliability, peers, target, - buffer: vec![0; MAX_DATAGRAM_SIZE], - used: HEADER_SIZE, + buffer: Buffer::new(), packages: VecDeque::new(), } } - /// Build output packages from all pushed messages. + /// Build packages from all messages pushed before a given threshold. The + /// last yielded package may contain newer data. /// - /// The messages are distributed among the packages in a sequential order. - /// Each package is filled with as many messages as it can accommodate. - // TODO update docs - // TODO arguments (older_than, full) - pub fn build(&mut self) -> PackageIterator<'_> { - if self.used > HEADER_SIZE { + /// See [`Self::build_all`]. + pub fn build_old(&mut self, threshold: Instant) -> PackageIterator<'_> { + if self.buffer.birth().map_or(false, |t| t <= threshold) { self.build_package(false); } PackageIterator::new(&mut self.packages) } + /// Build packages from all pushed messages. + /// + /// The messages are distributed among the packages in a sequential order. + /// Each package except the last one is filled with as many messages as it + /// can accommodate. + pub fn build_all(&mut self) -> PackageIterator<'_> { + self.build_package(true); + PackageIterator::new(&mut self.packages) + } + /// Push another message to the builder so that it is included in one of /// the resulting packages. pub fn push(&mut self, message: &E) -> Result<(), EncodeError> @@ -51,7 +57,7 @@ impl PackageBuilder { { match self.push_inner(message) { Err(EncodeError::UnexpectedEnd) => { - self.build_package(true); + self.build_package(false); self.push_inner(message) } Err(err) => Err(err), @@ -63,55 +69,107 @@ impl PackageBuilder { where E: bincode::Encode, { - let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?; - self.used += len; + let len = encode_into_slice(message, self.buffer.unused_mut(), BINCODE_CONF)?; + self.buffer.forward(len); Ok(()) } - /// Build and store another package from already buffered data. + /// Build and store another package from already buffered data (if there is + /// any). /// /// # Arguments /// - /// * `reusable` - if false, newly created buffer for further messages will + /// * `empty` - if true, newly created buffer for further messages will /// be empty. - fn build_package(&mut self, reusable: bool) { - let (mut data, used) = if reusable { - (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) - } else { - (Vec::new(), 0) + fn build_package(&mut self, empty: bool) { + let Some(data) = self.buffer.consume(empty) else { + return; }; - mem::swap(&mut data, &mut self.buffer); - data.truncate(self.used); - self.used = used; - - self.packages.push_back(PackageSlot::new(OutPackage::new( + self.packages.push_back(OutPackage::new( data, self.reliability, self.peers, self.target, - ))); + )); } } -struct PackageSlot { - package: OutPackage, +struct Buffer { + /// Time of the first piece of data. + birth: Option, + data: Vec, + used: usize, } -impl PackageSlot { - fn new(package: OutPackage) -> Self { - Self { package } +impl Buffer { + fn new() -> Self { + Self { + birth: None, + data: vec![0; MAX_DATAGRAM_SIZE], + used: HEADER_SIZE, + } + } + + /// Returns true if no data was pushed to the buffer. + fn empty(&self) -> bool { + self.used <= HEADER_SIZE + } + + fn birth(&self) -> Option { + self.birth + } + + /// Resets the buffer and returns the old data (before the reset). If there + /// was no data pushed, it returns None. + /// + /// # Arguments + /// + /// * `empty` - if true, the new buffer will have zero capacity. + fn consume(&mut self, empty: bool) -> Option> { + if self.empty() { + return None; + } + + let (mut data, used) = if empty { + (Vec::new(), 0) + } else { + (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) + }; + + mem::swap(&mut data, &mut self.data); + data.truncate(self.used); + self.used = used; + self.birth = None; + + Some(data) + } + + /// Returns mutable slice to the unused part of the buffer. + fn unused_mut(&mut self) -> &mut [u8] { + &mut self.data[self.used..] + } + + /// Moves used data pointer forward and sets birth time to now if it is not + /// set already. + fn forward(&mut self, amount: usize) { + if self.birth.is_none() { + self.birth = Some(Instant::now()); + } + + self.used += amount; + debug_assert!(self.used <= self.data.len()); } } // TODO better name // TODO docs + what happens if not fully consumed pub struct PackageIterator<'a> { - packages: &'a mut VecDeque, + packages: &'a mut VecDeque, } impl<'a> PackageIterator<'a> { - fn new(packages: &'a mut VecDeque) -> Self { + fn new(packages: &'a mut VecDeque) -> Self { Self { packages } } } @@ -120,7 +178,7 @@ impl<'a> Iterator for PackageIterator<'a> { type Item = OutPackage; fn next(&mut self) -> Option { - self.packages.pop_front().map(|p| p.package) + self.packages.pop_front() } } @@ -150,7 +208,7 @@ mod tests { .unwrap(); } - let packages: Vec = builder.build().collect(); + let packages: Vec = builder.build_all().collect(); assert_eq!(packages.len(), 4); // 3 items + something extra for the encoding assert!(packages[0].data_slice().len() >= 128 * 3);