From 6ccc48cef063ba9f09fca32770ef089ad54888cf Mon Sep 17 00:00:00 2001 From: Martin Indra Date: Sun, 27 Aug 2023 12:29:11 +0200 Subject: [PATCH] de_net: Remove an unnecessary data copy --- crates/connector/src/game/preceiver.rs | 4 +- .../net/src/connection/delivery/confirms.rs | 4 +- crates/net/src/connection/dispatch/mod.rs | 4 +- crates/net/src/protocol.rs | 11 +- crates/net/src/tasks/communicator.rs | 107 +++++++++++++----- crates/net/src/tasks/dsender.rs | 48 ++++++-- crates/net/src/tasks/usender.rs | 5 +- 7 files changed, 130 insertions(+), 53 deletions(-) diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index b0a25be4..0968f74b 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -68,8 +68,8 @@ pub(super) async fn run( for target in state.targets(Some(package.source)).await { let result = outputs - .send(OutPackage::new( - package.data.clone(), + .send(OutPackage::from_slice( + &package.data, package.reliability, Peers::Players, target, diff --git a/crates/net/src/connection/delivery/confirms.rs b/crates/net/src/connection/delivery/confirms.rs index 9b9912dd..f6860439 100644 --- a/crates/net/src/connection/delivery/confirms.rs +++ b/crates/net/src/connection/delivery/confirms.rs @@ -81,9 +81,9 @@ impl ConfirmsBuffer { if force || expiration <= time || self.full() { while let Some(data) = self.flush(MAX_PACKAGE_SIZE) { datagrams - .send(OutDatagram::new( + .send(OutDatagram::from_slice( DatagramHeader::Confirmation, - data.to_vec(), + data, addr, )) .await?; diff --git a/crates/net/src/connection/dispatch/mod.rs b/crates/net/src/connection/dispatch/mod.rs index 7375c547..881dd194 100644 --- a/crates/net/src/connection/dispatch/mod.rs +++ b/crates/net/src/connection/dispatch/mod.rs @@ -86,9 +86,9 @@ impl DispatchHandler { match handler.resends.reschedule(buf, time) { RescheduleResult::Resend { len, header } => { datagrams - .send(OutDatagram::new( + .send(OutDatagram::from_slice( DatagramHeader::Package(header), - buf[..len].to_vec(), + &buf[..len], addr, )) .await?; diff --git a/crates/net/src/protocol.rs b/crates/net/src/protocol.rs index 8d9c7698..73d381e7 100644 --- a/crates/net/src/protocol.rs +++ b/crates/net/src/protocol.rs @@ -32,7 +32,8 @@ impl ProtocolSocket { /// /// # Arguments /// - /// * `buf` - binary data buffer used during datagram construction. + /// * `buf` - buffer used for datagram construction. First [`HEADER_SIZE`] + /// bytes are overwritten with the header. Payload bytes must follow. /// /// * `header` - header of the datagram. /// @@ -41,16 +42,10 @@ impl ProtocolSocket { /// * `target` - recipient of the datagram. pub(crate) async fn send( &self, - buf: &mut [u8], header: DatagramHeader, - data: &[u8], + buf: &mut [u8], target: SocketAddr, ) -> Result<(), SendError> { - let len = HEADER_SIZE + data.len(); - assert!(buf.len() >= len); - let buf = &mut buf[..len]; - buf[HEADER_SIZE..len].copy_from_slice(data); - trace!("Going to send datagram {}", header); header.write(buf); self.socket.send(target, buf).await?; diff --git a/crates/net/src/tasks/communicator.rs b/crates/net/src/tasks/communicator.rs index c87b1ee8..a7eac375 100644 --- a/crates/net/src/tasks/communicator.rs +++ b/crates/net/src/tasks/communicator.rs @@ -3,13 +3,14 @@ use std::{marker::PhantomData, mem, net::SocketAddr, ops::Deref, time::Instant}; use async_std::channel::{Receiver, Sender}; use bincode::{ config::{BigEndian, Configuration, Limit, Varint}, - decode_from_slice, encode_into_slice, encode_to_vec, + decode_from_slice, encode_into_slice, encode_into_std_write, error::{DecodeError, EncodeError}, }; use crate::{ - header::{Peers, Reliability}, + header::{Peers, Reliability, HEADER_SIZE}, protocol::MAX_PACKAGE_SIZE, + MAX_DATAGRAM_SIZE, }; const BINCODE_CONF: Configuration> = @@ -34,8 +35,8 @@ impl PackageBuilder { reliability, peers, target, - buffer: vec![0; MAX_PACKAGE_SIZE], - used: 0, + buffer: vec![0; MAX_DATAGRAM_SIZE], + used: HEADER_SIZE, packages: Vec::new(), } } @@ -45,15 +46,10 @@ impl PackageBuilder { /// The messages are distributed among the packages in a sequential order. /// Each package is filled with as many messages as it can accommodate. pub fn build(mut self) -> Vec { - let mut packages = self.packages; - - if self.used > 0 { - self.buffer.truncate(self.used); - let package = OutPackage::new(self.buffer, self.reliability, self.peers, self.target); - packages.push(package); + if self.used > HEADER_SIZE { + self.build_package(false); } - - packages + self.packages } /// Push another message to the builder so that it is included in one of @@ -64,14 +60,7 @@ impl PackageBuilder { { match self.push_inner(message) { Err(EncodeError::UnexpectedEnd) => { - let mut data = vec![0; MAX_PACKAGE_SIZE]; - mem::swap(&mut data, &mut self.buffer); - data.truncate(self.used); - self.used = 0; - - let package = OutPackage::new(data, self.reliability, self.peers, self.target); - self.packages.push(package); - + self.build_package(true); self.push_inner(message) } Err(err) => Err(err), @@ -87,11 +76,38 @@ impl PackageBuilder { self.used += len; Ok(()) } + + /// Build and store another package from already buffered data. + /// + /// # Arguments + /// + /// * `reusable` - if false, newly created buffer for further messages will + /// be empty. + fn build_package(&mut self, reusable: bool) { + let (mut data, used) = if reusable { + (vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE) + } else { + (Vec::new(), 0) + }; + + mem::swap(&mut data, &mut self.buffer); + data.truncate(self.used); + self.used = used; + + self.packages.push(OutPackage::new( + data, + self.reliability, + self.peers, + self.target, + )); + } } /// A package to be send. pub struct OutPackage { - pub(super) data: Vec, + /// First [`HEADER_SIZE`] bytes are reserved for the header. Payload must + /// follow. + data: Vec, reliability: Reliability, peers: Peers, pub(super) target: SocketAddr, @@ -99,8 +115,6 @@ pub struct OutPackage { impl OutPackage { /// Creates a package from a single message. - /// - /// See also [`Self::new`]. pub fn encode_single( message: &E, reliability: Reliability, @@ -110,13 +124,34 @@ impl OutPackage { where E: bincode::Encode, { - let data = encode_to_vec(message, BINCODE_CONF)?; + let mut data = Vec::with_capacity(HEADER_SIZE + 1); + data.extend([0; HEADER_SIZE]); + encode_into_std_write(message, &mut data, BINCODE_CONF)?; Ok(Self::new(data, reliability, peers, target)) } + /// # Panics + /// + /// If `data` is longer than [`MAX_PACKAGE_SIZE`]. + pub fn from_slice( + data: &[u8], + reliability: Reliability, + peers: Peers, + target: SocketAddr, + ) -> Self { + assert!(data.len() <= MAX_PACKAGE_SIZE); + + let mut full_data = Vec::with_capacity(HEADER_SIZE + data.len()); + full_data.extend([0; HEADER_SIZE]); + full_data.extend(data); + Self::new(full_data, reliability, peers, target) + } + /// # Arguments /// - /// * `data` - data to be send. + /// * `data` - data to be send. The message data must start exactly at + /// [`HEADER_SIZE`]. The initial bytes are reserved for the header. The + /// header is not filled by the caller. /// /// * `reliability` - package delivery reliability mode. /// @@ -124,9 +159,12 @@ impl OutPackage { /// /// # Panics /// - /// Panics if data is longer than [`MAX_PACKAGE_SIZE`]. - pub fn new(data: Vec, reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { - assert!(data.len() < MAX_PACKAGE_SIZE); + /// * If data length is smaller or equal to header size.. + /// + /// * If data is longer than [`MAX_DATAGRAM_SIZE`]. + fn new(data: Vec, reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { + assert!(data.len() > HEADER_SIZE); + assert!(data.len() <= MAX_DATAGRAM_SIZE); Self { data, reliability, @@ -135,6 +173,19 @@ impl OutPackage { } } + /// Returns package data. + /// + /// The data start at [`HEADER_SIZE`] so that header may be written + /// to the beginning of the vector. + pub(super) fn data(self) -> Vec { + self.data + } + + /// Returns slice to the payload part (without header) of the data. + pub(super) fn data_slice(&self) -> &[u8] { + &self.data[HEADER_SIZE..] + } + pub(super) fn reliability(&self) -> Reliability { self.reliability } diff --git a/crates/net/src/tasks/dsender.rs b/crates/net/src/tasks/dsender.rs index 453ff71e..b6429ccd 100644 --- a/crates/net/src/tasks/dsender.rs +++ b/crates/net/src/tasks/dsender.rs @@ -3,7 +3,11 @@ use std::net::SocketAddr; use async_std::channel::Receiver; use tracing::{error, info}; -use crate::{header::DatagramHeader, protocol::ProtocolSocket, MAX_DATAGRAM_SIZE}; +use crate::{ + header::{DatagramHeader, HEADER_SIZE}, + protocol::ProtocolSocket, + MAX_DATAGRAM_SIZE, MAX_PACKAGE_SIZE, +}; pub(crate) struct OutDatagram { header: DatagramHeader, @@ -12,7 +16,39 @@ pub(crate) struct OutDatagram { } impl OutDatagram { + /// # Panics + /// + /// * If `data` is empty. + /// + /// * If `data` is larger than [`MAX_PACKAGE_SIZE`]. + pub(crate) fn from_slice(header: DatagramHeader, data: &[u8], target: SocketAddr) -> Self { + assert!(!data.is_empty()); + assert!(data.len() <= MAX_PACKAGE_SIZE); + + let mut full_data = Vec::with_capacity(HEADER_SIZE + data.len()); + full_data.extend([0; HEADER_SIZE]); + full_data.extend(data); + Self::new(header, full_data, target) + } + + /// # Argument + /// + /// * `header` + /// + /// * `data` - data of the datagram. First [`HEADER_SIZE`] is reserved for + /// to-be-written header. + /// + /// * `target` - datagram recipient. + /// + /// # Panics + /// + /// * If `data` length is smaller or equal to [`HEADER_SIZE`]. + /// + /// * If `data` is larger than [`MAX_DATAGRAM_SIZE`]. pub(crate) fn new(header: DatagramHeader, data: Vec, target: SocketAddr) -> Self { + assert!(data.len() > HEADER_SIZE); + assert!(data.len() <= MAX_DATAGRAM_SIZE); + Self { header, data, @@ -23,19 +59,13 @@ impl OutDatagram { pub(super) async fn run(port: u16, datagrams: Receiver, socket: ProtocolSocket) { info!("Starting datagram sender on port {port}..."); - let mut buffer = [0u8; MAX_DATAGRAM_SIZE]; loop { - let Ok(datagram) = datagrams.recv().await else { + let Ok(mut datagram) = datagrams.recv().await else { break; }; if let Err(err) = socket - .send( - &mut buffer, - datagram.header, - &datagram.data, - datagram.target, - ) + .send(datagram.header, &mut datagram.data, datagram.target) .await { error!("Error while sending a datagram: {err:?}"); diff --git a/crates/net/src/tasks/usender.rs b/crates/net/src/tasks/usender.rs index 4937b3d3..28876da5 100644 --- a/crates/net/src/tasks/usender.rs +++ b/crates/net/src/tasks/usender.rs @@ -40,12 +40,13 @@ pub(super) async fn run( if package_header.reliability().is_reliable() { dispatch_handler - .sent(time, package.target, package_header, &package.data) + .sent(time, package.target, package_header, package.data_slice()) .await; } + let target = package.target; let closed = datagrams - .send(OutDatagram::new(header, package.data, package.target)) + .send(OutDatagram::new(header, package.data(), target)) .await .is_err();