diff --git a/crates/net/src/tasks/communicator.rs b/crates/net/src/tasks/communicator.rs deleted file mode 100644 index a7eac375..00000000 --- a/crates/net/src/tasks/communicator.rs +++ /dev/null @@ -1,422 +0,0 @@ -use std::{marker::PhantomData, mem, net::SocketAddr, ops::Deref, time::Instant}; - -use async_std::channel::{Receiver, Sender}; -use bincode::{ - config::{BigEndian, Configuration, Limit, Varint}, - decode_from_slice, encode_into_slice, encode_into_std_write, - error::{DecodeError, EncodeError}, -}; - -use crate::{ - header::{Peers, Reliability, HEADER_SIZE}, - protocol::MAX_PACKAGE_SIZE, - MAX_DATAGRAM_SIZE, -}; - -const BINCODE_CONF: Configuration> = - bincode::config::standard() - .with_big_endian() - .with_variable_int_encoding() - .with_limit::(); - -/// It cumulatively builds output packages from individual messages. -pub struct PackageBuilder { - reliability: Reliability, - peers: Peers, - target: SocketAddr, - buffer: Vec, - used: usize, - packages: Vec, -} - -impl PackageBuilder { - pub fn new(reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { - Self { - reliability, - peers, - target, - buffer: vec![0; MAX_DATAGRAM_SIZE], - used: HEADER_SIZE, - packages: Vec::new(), - } - } - - /// Build output packages from all pushed messages. - /// - /// The messages are distributed among the packages in a sequential order. - /// Each package is filled with as many messages as it can accommodate. - pub fn build(mut self) -> Vec { - if self.used > HEADER_SIZE { - self.build_package(false); - } - self.packages - } - - /// Push another message to the builder so that it is included in one of - /// the resulting packages. - pub fn push(&mut self, message: &E) -> Result<(), EncodeError> - where - E: bincode::Encode, - { - match self.push_inner(message) { - Err(EncodeError::UnexpectedEnd) => { - self.build_package(true); - self.push_inner(message) - } - Err(err) => Err(err), - Ok(()) => Ok(()), - } - } - - fn push_inner(&mut self, message: &E) -> Result<(), EncodeError> - where - E: bincode::Encode, - { - let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?; - self.used += len; - Ok(()) - } - - /// Build and store another package from already buffered data. - /// - /// # Arguments - /// - /// * `reusable` - if false, newly created buffer for further messages will - /// be empty. - fn build_package(&mut self, reusable: bool) { - let (mut data, used) = if reusable { - (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) - } else { - (Vec::new(), 0) - }; - - mem::swap(&mut data, &mut self.buffer); - data.truncate(self.used); - self.used = used; - - self.packages.push(OutPackage::new( - data, - self.reliability, - self.peers, - self.target, - )); - } -} - -/// A package to be send. -pub struct OutPackage { - /// First [`HEADER_SIZE`] bytes are reserved for the header. Payload must - /// follow. - data: Vec, - reliability: Reliability, - peers: Peers, - pub(super) target: SocketAddr, -} - -impl OutPackage { - /// Creates a package from a single message. - pub fn encode_single( - message: &E, - reliability: Reliability, - peers: Peers, - target: SocketAddr, - ) -> Result - where - E: bincode::Encode, - { - let mut data = Vec::with_capacity(HEADER_SIZE + 1); - data.extend([0; HEADER_SIZE]); - encode_into_std_write(message, &mut data, BINCODE_CONF)?; - Ok(Self::new(data, reliability, peers, target)) - } - - /// # Panics - /// - /// If `data` is longer than [`MAX_PACKAGE_SIZE`]. - pub fn from_slice( - data: &[u8], - reliability: Reliability, - peers: Peers, - target: SocketAddr, - ) -> Self { - assert!(data.len() <= MAX_PACKAGE_SIZE); - - let mut full_data = Vec::with_capacity(HEADER_SIZE + data.len()); - full_data.extend([0; HEADER_SIZE]); - full_data.extend(data); - Self::new(full_data, reliability, peers, target) - } - - /// # Arguments - /// - /// * `data` - data to be send. The message data must start exactly at - /// [`HEADER_SIZE`]. The initial bytes are reserved for the header. The - /// header is not filled by the caller. - /// - /// * `reliability` - package delivery reliability mode. - /// - /// * `target` - package recipient. - /// - /// # Panics - /// - /// * If data length is smaller or equal to header size.. - /// - /// * If data is longer than [`MAX_DATAGRAM_SIZE`]. - fn new(data: Vec, reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { - assert!(data.len() > HEADER_SIZE); - assert!(data.len() <= MAX_DATAGRAM_SIZE); - Self { - data, - reliability, - peers, - target, - } - } - - /// Returns package data. - /// - /// The data start at [`HEADER_SIZE`] so that header may be written - /// to the beginning of the vector. - pub(super) fn data(self) -> Vec { - self.data - } - - /// Returns slice to the payload part (without header) of the data. - pub(super) fn data_slice(&self) -> &[u8] { - &self.data[HEADER_SIZE..] - } - - pub(super) fn reliability(&self) -> Reliability { - self.reliability - } - - pub(super) fn peers(&self) -> Peers { - self.peers - } -} - -/// A received message / datagram. -pub struct InPackage { - data: Vec, - reliability: Reliability, - peers: Peers, - source: SocketAddr, - time: Instant, -} - -impl InPackage { - pub(super) fn new( - data: Vec, - reliability: Reliability, - peers: Peers, - source: SocketAddr, - time: Instant, - ) -> Self { - Self { - data, - reliability, - peers, - source, - time, - } - } - - pub fn data(self) -> Vec { - self.data - } - - /// Interpret the data as a sequence of encoded messages. - pub fn decode(&self) -> MessageDecoder - where - E: bincode::Decode, - { - MessageDecoder { - data: self.data.as_slice(), - offset: 0, - _marker: PhantomData, - } - } - - pub fn reliability(&self) -> Reliability { - self.reliability - } - - pub fn source(&self) -> SocketAddr { - self.source - } - - pub fn peers(&self) -> Peers { - self.peers - } - - /// Package arrival time. - pub fn time(&self) -> Instant { - self.time - } -} - -/// An iterator which decodes binary input data item by item. -pub struct MessageDecoder<'a, E> -where - E: bincode::Decode, -{ - data: &'a [u8], - offset: usize, - _marker: PhantomData, -} - -impl<'a, E> Iterator for MessageDecoder<'a, E> -where - E: bincode::Decode, -{ - type Item = Result; - - fn next(&mut self) -> Option { - if self.offset >= self.data.len() { - return None; - } - - match decode_from_slice(&self.data[self.offset..], BINCODE_CONF) { - Ok((item, len)) => { - self.offset += len; - Some(Ok(item)) - } - Err(err) => Some(Err(err)), - } - } -} - -/// This error indicates failure to deliver a package to the target. -pub struct ConnectionError { - target: SocketAddr, -} - -impl ConnectionError { - pub(super) fn new(target: SocketAddr) -> Self { - Self { target } - } - - pub fn target(&self) -> SocketAddr { - self.target - } -} - -/// Channel into networking stack tasks, used for data sending. -/// -/// The data-sending components of the networking stack are halted when this -/// channel is closed (dropped). -pub struct PackageSender(pub(crate) Sender); - -impl Deref for PackageSender { - type Target = Sender; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Channel into networking stack tasks, used for data receiving. -/// -/// This is based on a bounded queue, so non-receiving of packages can -/// eventually block the networking stack. -/// -/// The data-receiving components of the networking stack are halted when this -/// channel is closed or dropped. -pub struct PackageReceiver(pub(crate) Receiver); - -impl Deref for PackageReceiver { - type Target = Receiver; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Channel into networking stack tasks, used for receiving connection errors. -/// -/// This channel is based on a bounded queue; therefore, the non-receiving of -/// errors can eventually block the networking stack. -/// -/// If the connection errors are not needed, this channel can be safely -/// dropped. Its closure does not stop or block any part of the networking -/// stack. Although it must be dropped for the networking stack to fully -/// terminate. -pub struct ConnErrorReceiver(pub(crate) Receiver); - -impl Deref for ConnErrorReceiver { - type Target = Receiver; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -#[cfg(test)] -mod tests { - use bincode::Decode; - - use super::*; - - #[test] - fn test_out_message_builder() { - #[derive(bincode::Encode)] - struct TestData { - values: [u64; 16], // up to 128 bytes - } - - let mut builder = PackageBuilder::new( - Reliability::Unordered, - Peers::Players, - "127.0.0.1:1111".parse::().unwrap(), - ); - - for i in 0..10 { - builder - .push(&TestData { - // Use large u64 so that the value cannot be shrunk. - values: [u64::MAX - (i as u64); 16], - }) - .unwrap(); - } - - let packages = builder.build(); - assert_eq!(packages.len(), 4); - // 3 items + something extra for the encoding - assert!(packages[0].data.len() >= 128 * 3); - // less then 4 items - assert!(packages[0].data.len() < 128 * 4); - - assert!(packages[1].data.len() >= 128 * 3); - assert!(packages[1].data.len() < 128 * 4); - assert!(packages[2].data.len() >= 128 * 3); - assert!(packages[2].data.len() < 128 * 4); - // last one contains only one leftover item - assert!(packages[3].data.len() >= 128); - assert!(packages[3].data.len() < 128 * 2); - } - - #[test] - fn test_decoding() { - #[derive(Decode, Debug, Eq, PartialEq)] - enum Message { - One(u16), - Two([u32; 2]), - } - - let package = InPackage { - // Message::Two([3, 4]), Message::One(1286) - data: vec![1, 3, 4, 0, 251, 5, 6], - reliability: Reliability::Unreliable, - peers: Peers::Players, - source: "127.0.0.1:1111".parse().unwrap(), - time: Instant::now(), - }; - - let mut items: MessageDecoder = package.decode(); - let first = items.next().unwrap().unwrap(); - assert_eq!(first, Message::Two([3, 4])); - let second = items.next().unwrap().unwrap(); - assert_eq!(second, Message::One(1286)); - assert!(items.next().is_none()); - } -} diff --git a/crates/net/src/tasks/communicator/builder.rs b/crates/net/src/tasks/communicator/builder.rs new file mode 100644 index 00000000..824b084e --- /dev/null +++ b/crates/net/src/tasks/communicator/builder.rs @@ -0,0 +1,136 @@ +use std::{mem, net::SocketAddr}; + +use bincode::{encode_into_slice, error::EncodeError}; + +use crate::{ + header::{Peers, Reliability, HEADER_SIZE}, + tasks::communicator::BINCODE_CONF, + OutPackage, MAX_DATAGRAM_SIZE, +}; + +/// It cumulatively builds output packages from individual messages. +pub struct PackageBuilder { + reliability: Reliability, + peers: Peers, + target: SocketAddr, + buffer: Vec, + used: usize, + packages: Vec, +} + +impl PackageBuilder { + pub fn new(reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { + Self { + reliability, + peers, + target, + buffer: vec![0; MAX_DATAGRAM_SIZE], + used: HEADER_SIZE, + packages: Vec::new(), + } + } + + /// Build output packages from all pushed messages. + /// + /// The messages are distributed among the packages in a sequential order. + /// Each package is filled with as many messages as it can accommodate. + pub fn build(mut self) -> Vec { + if self.used > HEADER_SIZE { + self.build_package(false); + } + self.packages + } + + /// Push another message to the builder so that it is included in one of + /// the resulting packages. + pub fn push(&mut self, message: &E) -> Result<(), EncodeError> + where + E: bincode::Encode, + { + match self.push_inner(message) { + Err(EncodeError::UnexpectedEnd) => { + self.build_package(true); + self.push_inner(message) + } + Err(err) => Err(err), + Ok(()) => Ok(()), + } + } + + fn push_inner(&mut self, message: &E) -> Result<(), EncodeError> + where + E: bincode::Encode, + { + let len = encode_into_slice(message, &mut self.buffer[self.used..], BINCODE_CONF)?; + self.used += len; + Ok(()) + } + + /// Build and store another package from already buffered data. + /// + /// # Arguments + /// + /// * `reusable` - if false, newly created buffer for further messages will + /// be empty. + fn build_package(&mut self, reusable: bool) { + let (mut data, used) = if reusable { + (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) + } else { + (Vec::new(), 0) + }; + + mem::swap(&mut data, &mut self.buffer); + data.truncate(self.used); + self.used = used; + + self.packages.push(OutPackage::new( + data, + self.reliability, + self.peers, + self.target, + )); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_out_message_builder() { + #[derive(bincode::Encode)] + struct TestData { + values: [u64; 16], // up to 128 bytes + } + + let mut builder = PackageBuilder::new( + Reliability::Unordered, + Peers::Players, + "127.0.0.1:1111".parse::().unwrap(), + ); + + for i in 0..10 { + builder + .push(&TestData { + // Use large u64 so that the value cannot be shrunk. + values: [u64::MAX - (i as u64); 16], + }) + .unwrap(); + } + + let packages = builder.build(); + assert_eq!(packages.len(), 4); + // 3 items + something extra for the encoding + assert!(packages[0].data_slice().len() >= 128 * 3); + // less then 4 items + assert!(packages[0].data_slice().len() < 128 * 4); + + assert!(packages[1].data_slice().len() >= 128 * 3); + assert!(packages[1].data_slice().len() < 128 * 4); + assert!(packages[2].data_slice().len() >= 128 * 3); + assert!(packages[2].data_slice().len() < 128 * 4); + // last one contains only one leftover item + assert!(packages[3].data_slice().len() >= 128); + assert!(packages[3].data_slice().len() < 128 * 2); + } +} diff --git a/crates/net/src/tasks/communicator/channels.rs b/crates/net/src/tasks/communicator/channels.rs new file mode 100644 index 00000000..b46040d5 --- /dev/null +++ b/crates/net/src/tasks/communicator/channels.rs @@ -0,0 +1,70 @@ +use std::{net::SocketAddr, ops::Deref}; + +use async_std::channel::{Receiver, Sender}; + +use super::{decode::InPackage, encode::OutPackage}; + +/// Channel into networking stack tasks, used for data sending. +/// +/// The data-sending components of the networking stack are halted when this +/// channel is closed (dropped). +pub struct PackageSender(pub(crate) Sender); + +impl Deref for PackageSender { + type Target = Sender; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Channel into networking stack tasks, used for data receiving. +/// +/// This is based on a bounded queue, so non-receiving of packages can +/// eventually block the networking stack. +/// +/// The data-receiving components of the networking stack are halted when this +/// channel is closed or dropped. +pub struct PackageReceiver(pub(crate) Receiver); + +impl Deref for PackageReceiver { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Channel into networking stack tasks, used for receiving connection errors. +/// +/// This channel is based on a bounded queue; therefore, the non-receiving of +/// errors can eventually block the networking stack. +/// +/// If the connection errors are not needed, this channel can be safely +/// dropped. Its closure does not stop or block any part of the networking +/// stack. Although it must be dropped for the networking stack to fully +/// terminate. +pub struct ConnErrorReceiver(pub(crate) Receiver); + +impl Deref for ConnErrorReceiver { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// This error indicates failure to deliver a package to the target. +pub struct ConnectionError { + target: SocketAddr, +} + +impl ConnectionError { + pub(crate) fn new(target: SocketAddr) -> Self { + Self { target } + } + + pub fn target(&self) -> SocketAddr { + self.target + } +} diff --git a/crates/net/src/tasks/communicator/decode.rs b/crates/net/src/tasks/communicator/decode.rs new file mode 100644 index 00000000..5fe78790 --- /dev/null +++ b/crates/net/src/tasks/communicator/decode.rs @@ -0,0 +1,128 @@ +use std::{marker::PhantomData, net::SocketAddr, time::Instant}; + +use bincode::{decode_from_slice, error::DecodeError}; + +use crate::{tasks::communicator::BINCODE_CONF, Peers, Reliability}; + +/// A received message / datagram. +pub struct InPackage { + data: Vec, + reliability: Reliability, + peers: Peers, + source: SocketAddr, + time: Instant, +} + +impl InPackage { + pub(crate) fn new( + data: Vec, + reliability: Reliability, + peers: Peers, + source: SocketAddr, + time: Instant, + ) -> Self { + Self { + data, + reliability, + peers, + source, + time, + } + } + + pub fn data(self) -> Vec { + self.data + } + + /// Interpret the data as a sequence of encoded messages. + pub fn decode(&self) -> MessageDecoder + where + E: bincode::Decode, + { + MessageDecoder { + data: self.data.as_slice(), + offset: 0, + _marker: PhantomData, + } + } + + pub fn reliability(&self) -> Reliability { + self.reliability + } + + pub fn source(&self) -> SocketAddr { + self.source + } + + pub fn peers(&self) -> Peers { + self.peers + } + + /// Package arrival time. + pub fn time(&self) -> Instant { + self.time + } +} + +/// An iterator which decodes binary input data item by item. +pub struct MessageDecoder<'a, E> +where + E: bincode::Decode, +{ + data: &'a [u8], + offset: usize, + _marker: PhantomData, +} + +impl<'a, E> Iterator for MessageDecoder<'a, E> +where + E: bincode::Decode, +{ + type Item = Result; + + fn next(&mut self) -> Option { + if self.offset >= self.data.len() { + return None; + } + + match decode_from_slice(&self.data[self.offset..], BINCODE_CONF) { + Ok((item, len)) => { + self.offset += len; + Some(Ok(item)) + } + Err(err) => Some(Err(err)), + } + } +} + +#[cfg(test)] +mod tests { + use bincode::Decode; + + use super::*; + + #[test] + fn test_decoding() { + #[derive(Decode, Debug, Eq, PartialEq)] + enum Message { + One(u16), + Two([u32; 2]), + } + + let package = InPackage { + // Message::Two([3, 4]), Message::One(1286) + data: vec![1, 3, 4, 0, 251, 5, 6], + reliability: Reliability::Unreliable, + peers: Peers::Players, + source: "127.0.0.1:1111".parse().unwrap(), + time: Instant::now(), + }; + + let mut items: MessageDecoder = package.decode(); + let first = items.next().unwrap().unwrap(); + assert_eq!(first, Message::Two([3, 4])); + let second = items.next().unwrap().unwrap(); + assert_eq!(second, Message::One(1286)); + assert!(items.next().is_none()); + } +} diff --git a/crates/net/src/tasks/communicator/encode.rs b/crates/net/src/tasks/communicator/encode.rs new file mode 100644 index 00000000..4d51186d --- /dev/null +++ b/crates/net/src/tasks/communicator/encode.rs @@ -0,0 +1,111 @@ +use std::net::SocketAddr; + +use bincode::{encode_into_std_write, error::EncodeError}; + +use crate::{ + header::{Peers, Reliability, HEADER_SIZE}, + protocol::MAX_PACKAGE_SIZE, + tasks::communicator::BINCODE_CONF, + MAX_DATAGRAM_SIZE, +}; + +/// A package to be send. +pub struct OutPackage { + /// First [`HEADER_SIZE`] bytes are reserved for the header. Payload must + /// follow. + data: Vec, + reliability: Reliability, + peers: Peers, + target: SocketAddr, +} + +impl OutPackage { + /// Creates a package from a single message. + pub fn encode_single( + message: &E, + reliability: Reliability, + peers: Peers, + target: SocketAddr, + ) -> Result + where + E: bincode::Encode, + { + let mut data = Vec::with_capacity(HEADER_SIZE + 1); + data.extend([0; HEADER_SIZE]); + encode_into_std_write(message, &mut data, BINCODE_CONF)?; + Ok(Self::new(data, reliability, peers, target)) + } + + /// # Panics + /// + /// If `data` is longer than [`MAX_PACKAGE_SIZE`]. + pub fn from_slice( + data: &[u8], + reliability: Reliability, + peers: Peers, + target: SocketAddr, + ) -> Self { + assert!(data.len() <= MAX_PACKAGE_SIZE); + + let mut full_data = Vec::with_capacity(HEADER_SIZE + data.len()); + full_data.extend([0; HEADER_SIZE]); + full_data.extend(data); + Self::new(full_data, reliability, peers, target) + } + + /// # Arguments + /// + /// * `data` - data to be send. The message data must start exactly at + /// [`HEADER_SIZE`]. The initial bytes are reserved for the header. The + /// header is not filled by the caller. + /// + /// * `reliability` - package delivery reliability mode. + /// + /// * `target` - package recipient. + /// + /// # Panics + /// + /// * If data length is smaller or equal to header size.. + /// + /// * If data is longer than [`MAX_DATAGRAM_SIZE`]. + pub(super) fn new( + data: Vec, + reliability: Reliability, + peers: Peers, + target: SocketAddr, + ) -> Self { + assert!(data.len() > HEADER_SIZE); + assert!(data.len() <= MAX_DATAGRAM_SIZE); + Self { + data, + reliability, + peers, + target, + } + } + + /// Returns package data. + /// + /// The data start at [`HEADER_SIZE`] so that header may be written + /// to the beginning of the vector. + pub(crate) fn data(self) -> Vec { + self.data + } + + /// Returns slice to the payload part (without header) of the data. + pub(crate) fn data_slice(&self) -> &[u8] { + &self.data[HEADER_SIZE..] + } + + pub(crate) fn reliability(&self) -> Reliability { + self.reliability + } + + pub(crate) fn peers(&self) -> Peers { + self.peers + } + + pub(crate) fn target(&self) -> SocketAddr { + self.target + } +} diff --git a/crates/net/src/tasks/communicator/mod.rs b/crates/net/src/tasks/communicator/mod.rs new file mode 100644 index 00000000..6586b0e6 --- /dev/null +++ b/crates/net/src/tasks/communicator/mod.rs @@ -0,0 +1,18 @@ +use bincode::config::{BigEndian, Configuration, Limit, Varint}; +pub use builder::PackageBuilder; +pub use channels::{ConnErrorReceiver, ConnectionError, PackageReceiver, PackageSender}; +pub use decode::{InPackage, MessageDecoder}; +pub use encode::OutPackage; + +use crate::protocol::MAX_PACKAGE_SIZE; + +mod builder; +mod channels; +mod decode; +mod encode; + +const BINCODE_CONF: Configuration> = + bincode::config::standard() + .with_big_endian() + .with_variable_int_encoding() + .with_limit::(); diff --git a/crates/net/src/tasks/usender.rs b/crates/net/src/tasks/usender.rs index 28876da5..42627a8d 100644 --- a/crates/net/src/tasks/usender.rs +++ b/crates/net/src/tasks/usender.rs @@ -28,9 +28,10 @@ pub(super) async fn run( }; let time = Instant::now(); + let target = package.target(); let package_id = if package.reliability().is_reliable() { - dispatch_handler.next_package_id(time, package.target).await + dispatch_handler.next_package_id(time, target).await } else { counter_unreliable.next().unwrap() }; @@ -40,11 +41,10 @@ pub(super) async fn run( if package_header.reliability().is_reliable() { dispatch_handler - .sent(time, package.target, package_header, package.data_slice()) + .sent(time, target, package_header, package.data_slice()) .await; } - let target = package.target; let closed = datagrams .send(OutDatagram::new(header, package.data(), target)) .await