Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Sep 1, 2023
1 parent dfa3652 commit ec6b009
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 39 deletions.
61 changes: 61 additions & 0 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,64 @@ async fn handle_player<'a>(message: BorrowedFromPlayers<'a>, target: SocketAddr)

// TODO
}

// TODO test this whole thing
struct Builders {
slots: Vec<PlayerSlot>,
}

impl Builders {
fn new() -> Self {
Self { slots: Vec::new() }
}

// TODO rename
// TODO docs
fn setup(&mut self, players: &[(u8, SocketAddr)]) {
let mut idx = 0;
while idx < self.slots.len() {
let slot = &self.slots[idx];
let remove = players
.iter()
.all(|(id, addr)| slot.id != *id || slot.addr != *addr);
if remove {
self.slots.swap_remove(idx);
} else {
idx += 1;
}
}

for &(id, addr) in players {
let add = self.slots.iter().all(|slot| slot.id != id);
if add {
self.slots.push(PlayerSlot::new(id, addr));
}
}
}
}

struct PlayerSlot {
id: u8,
addr: SocketAddr,
builders: PlayerBuilders,
}

impl PlayerSlot {
fn new(id: u8, addr: SocketAddr) -> Self {
Self {
id,
addr,
builders: PlayerBuilders::new(),
}
}
}

struct PlayerBuilders {
// TODO
}

impl PlayerBuilders {
fn new() -> Self {
Self {}
}
}
15 changes: 15 additions & 0 deletions crates/connector/src/game/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,15 @@ impl GameState {
/// # Arguments
///
/// * `exclude` - if not None, this player is included among the targets.
// TODO do not create new Vec
pub(super) async fn targets(&self, exclude: Option<SocketAddr>) -> Vec<SocketAddr> {
self.inner.read().await.targets(exclude)
}

// TODO docs
pub(super) async fn players(&self, buf: &mut Vec<(u8, SocketAddr)>) {
self.inner.read().await.players(buf)
}
}

struct GameStateInner {
Expand Down Expand Up @@ -175,6 +181,15 @@ impl GameStateInner {
}
addrs
}

fn players(&self, buf: &mut Vec<(u8, SocketAddr)>) {
buf.clear();
buf.reserve(self.players.len());

for (&addr, player) in self.players.iter() {
buf.push((player.id, addr))
}
}
}

struct AvailableIds(Vec<u8>);
Expand Down
4 changes: 3 additions & 1 deletion crates/multiplayer/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ fn message_sender<E>(
}

for mut builder in [unreliable, unordered, semi_ordered] {
for package in builder.build() {
// Build all packages. This system runs once per frame and thus some
// aggregation is done via the update frequency.
for package in builder.build_all() {
outputs.send(package.into());
}
}
Expand Down
134 changes: 96 additions & 38 deletions crates/net/src/tasks/communicator/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, mem, net::SocketAddr};
use std::{collections::VecDeque, mem, net::SocketAddr, time::Instant};

use bincode::{encode_into_slice, error::EncodeError};

Expand All @@ -13,9 +13,8 @@ pub struct PackageBuilder {
reliability: Reliability,
peers: Peers,
target: SocketAddr,
buffer: Vec<u8>,
used: usize,
packages: VecDeque<PackageSlot>,
buffer: Buffer,
packages: VecDeque<OutPackage>,
}

impl PackageBuilder {
Expand All @@ -24,25 +23,32 @@ impl PackageBuilder {
reliability,
peers,
target,
buffer: vec![0; MAX_DATAGRAM_SIZE],
used: HEADER_SIZE,
buffer: Buffer::new(),
packages: VecDeque::new(),
}
}

/// Build output packages from all pushed messages.
/// Build packages from all messages pushed before a given threshold. The
/// last yielded package may contain newer data.
///
/// The messages are distributed among the packages in a sequential order.
/// Each package is filled with as many messages as it can accommodate.
// TODO update docs
// TODO arguments (older_than, full)
pub fn build(&mut self) -> PackageIterator<'_> {
if self.used > HEADER_SIZE {
/// See [`Self::build_all`].
pub fn build_old(&mut self, threshold: Instant) -> PackageIterator<'_> {
if self.buffer.birth().map_or(false, |t| t <= threshold) {
self.build_package(false);
}
PackageIterator::new(&mut self.packages)
}

/// Build packages from all pushed messages.
///
/// The messages are distributed among the packages in a sequential order.
/// Each package except the last one is filled with as many messages as it
/// can accommodate.
pub fn build_all(&mut self) -> PackageIterator<'_> {
self.build_package(true);
PackageIterator::new(&mut self.packages)
}

/// Push another message to the builder so that it is included in one of
/// the resulting packages.
pub fn push<E>(&mut self, message: &E) -> Result<(), EncodeError>
Expand All @@ -51,7 +57,7 @@ impl PackageBuilder {
{
match self.push_inner(message) {
Err(EncodeError::UnexpectedEnd) => {
self.build_package(true);
self.build_package(false);
self.push_inner(message)
}
Err(err) => Err(err),
Expand All @@ -63,55 +69,107 @@ impl PackageBuilder {
where
E: bincode::Encode,
{
let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?;
self.used += len;
let len = encode_into_slice(message, self.buffer.unused_mut(), BINCODE_CONF)?;
self.buffer.forward(len);
Ok(())
}

/// Build and store another package from already buffered data.
/// Build and store another package from already buffered data (if there is
/// any).
///
/// # Arguments
///
/// * `reusable` - if false, newly created buffer for further messages will
/// * `empty` - if true, newly created buffer for further messages will
/// be empty.
fn build_package(&mut self, reusable: bool) {
let (mut data, used) = if reusable {
(vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE)
} else {
(Vec::new(), 0)
fn build_package(&mut self, empty: bool) {
let Some(data) = self.buffer.consume(empty) else {
return;
};

mem::swap(&mut data, &mut self.buffer);
data.truncate(self.used);
self.used = used;

self.packages.push_back(PackageSlot::new(OutPackage::new(
self.packages.push_back(OutPackage::new(
data,
self.reliability,
self.peers,
self.target,
)));
));
}
}

struct PackageSlot {
package: OutPackage,
struct Buffer {
/// Time of the first piece of data.
birth: Option<Instant>,
data: Vec<u8>,
used: usize,
}

impl PackageSlot {
fn new(package: OutPackage) -> Self {
Self { package }
impl Buffer {
fn new() -> Self {
Self {
birth: None,
data: vec![0; MAX_DATAGRAM_SIZE],
used: HEADER_SIZE,
}
}

/// Returns true if no data was pushed to the buffer.
fn empty(&self) -> bool {
self.used <= HEADER_SIZE
}

fn birth(&self) -> Option<Instant> {
self.birth
}

/// Resets the buffer and returns the old data (before the reset). If there
/// was no data pushed, it returns None.
///
/// # Arguments
///
/// * `empty` - if true, the new buffer will have zero capacity.
fn consume(&mut self, empty: bool) -> Option<Vec<u8>> {
if self.empty() {
return None;
}

let (mut data, used) = if empty {
(Vec::new(), 0)
} else {
(vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE)
};

mem::swap(&mut data, &mut self.data);
data.truncate(self.used);
self.used = used;
self.birth = None;

Some(data)
}

/// Returns mutable slice to the unused part of the buffer.
fn unused_mut(&mut self) -> &mut [u8] {
&mut self.data[self.used..]
}

/// Moves used data pointer forward and sets birth time to now if it is not
/// set already.
fn forward(&mut self, amount: usize) {
if self.birth.is_none() {
self.birth = Some(Instant::now());
}

self.used += amount;
debug_assert!(self.used <= self.data.len());
}
}

// TODO better name
// TODO docs + what happens if not fully consumed
pub struct PackageIterator<'a> {
packages: &'a mut VecDeque<PackageSlot>,
packages: &'a mut VecDeque<OutPackage>,
}

impl<'a> PackageIterator<'a> {
fn new(packages: &'a mut VecDeque<PackageSlot>) -> Self {
fn new(packages: &'a mut VecDeque<OutPackage>) -> Self {
Self { packages }
}
}
Expand All @@ -120,7 +178,7 @@ impl<'a> Iterator for PackageIterator<'a> {
type Item = OutPackage;

fn next(&mut self) -> Option<Self::Item> {
self.packages.pop_front().map(|p| p.package)
self.packages.pop_front()
}
}

Expand Down Expand Up @@ -150,7 +208,7 @@ mod tests {
.unwrap();
}

let packages: Vec<OutPackage> = builder.build().collect();
let packages: Vec<OutPackage> = builder.build_all().collect();
assert_eq!(packages.len(), 4);
// 3 items + something extra for the encoding
assert!(packages[0].data_slice().len() >= 128 * 3);
Expand Down

0 comments on commit ec6b009

Please sign in to comment.