Skip to content

Commit

Permalink
de_net: Remove an unnecessary data copy
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 27, 2023
1 parent 9a3b060 commit 6ccc48c
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 53 deletions.
4 changes: 2 additions & 2 deletions crates/connector/src/game/preceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ pub(super) async fn run(

for target in state.targets(Some(package.source)).await {
let result = outputs
.send(OutPackage::new(
package.data.clone(),
.send(OutPackage::from_slice(
&package.data,
package.reliability,
Peers::Players,
target,
Expand Down
4 changes: 2 additions & 2 deletions crates/net/src/connection/delivery/confirms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ impl ConfirmsBuffer {
if force || expiration <= time || self.full() {
while let Some(data) = self.flush(MAX_PACKAGE_SIZE) {
datagrams
.send(OutDatagram::new(
.send(OutDatagram::from_slice(
DatagramHeader::Confirmation,
data.to_vec(),
data,
addr,
))
.await?;
Expand Down
4 changes: 2 additions & 2 deletions crates/net/src/connection/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ impl DispatchHandler {
match handler.resends.reschedule(buf, time) {
RescheduleResult::Resend { len, header } => {
datagrams
.send(OutDatagram::new(
.send(OutDatagram::from_slice(
DatagramHeader::Package(header),
buf[..len].to_vec(),
&buf[..len],
addr,
))
.await?;
Expand Down
11 changes: 3 additions & 8 deletions crates/net/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ impl ProtocolSocket {
///
/// # Arguments
///
/// * `buf` - binary data buffer used during datagram construction.
/// * `buf` - buffer used for datagram construction. First [`HEADER_SIZE`]
/// bytes are overwritten with the header. Payload bytes must follow.
///
/// * `header` - header of the datagram.
///
Expand All @@ -41,16 +42,10 @@ impl ProtocolSocket {
/// * `target` - recipient of the datagram.
pub(crate) async fn send(
&self,
buf: &mut [u8],
header: DatagramHeader,
data: &[u8],
buf: &mut [u8],
target: SocketAddr,
) -> Result<(), SendError> {
let len = HEADER_SIZE + data.len();
assert!(buf.len() >= len);
let buf = &mut buf[..len];
buf[HEADER_SIZE..len].copy_from_slice(data);

trace!("Going to send datagram {}", header);
header.write(buf);
self.socket.send(target, buf).await?;
Expand Down
107 changes: 79 additions & 28 deletions crates/net/src/tasks/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::{marker::PhantomData, mem, net::SocketAddr, ops::Deref, time::Instant};
use async_std::channel::{Receiver, Sender};
use bincode::{
config::{BigEndian, Configuration, Limit, Varint},
decode_from_slice, encode_into_slice, encode_to_vec,
decode_from_slice, encode_into_slice, encode_into_std_write,
error::{DecodeError, EncodeError},
};

use crate::{
header::{Peers, Reliability},
header::{Peers, Reliability, HEADER_SIZE},
protocol::MAX_PACKAGE_SIZE,
MAX_DATAGRAM_SIZE,
};

const BINCODE_CONF: Configuration<BigEndian, Varint, Limit<MAX_PACKAGE_SIZE>> =
Expand All @@ -34,8 +35,8 @@ impl PackageBuilder {
reliability,
peers,
target,
buffer: vec![0; MAX_PACKAGE_SIZE],
used: 0,
buffer: vec![0; MAX_DATAGRAM_SIZE],
used: HEADER_SIZE,
packages: Vec::new(),
}
}
Expand All @@ -45,15 +46,10 @@ impl PackageBuilder {
/// The messages are distributed among the packages in a sequential order.
/// Each package is filled with as many messages as it can accommodate.
pub fn build(mut self) -> Vec<OutPackage> {
let mut packages = self.packages;

if self.used > 0 {
self.buffer.truncate(self.used);
let package = OutPackage::new(self.buffer, self.reliability, self.peers, self.target);
packages.push(package);
if self.used > HEADER_SIZE {
self.build_package(false);
}

packages
self.packages
}

/// Push another message to the builder so that it is included in one of
Expand All @@ -64,14 +60,7 @@ impl PackageBuilder {
{
match self.push_inner(message) {
Err(EncodeError::UnexpectedEnd) => {
let mut data = vec![0; MAX_PACKAGE_SIZE];
mem::swap(&mut data, &mut self.buffer);
data.truncate(self.used);
self.used = 0;

let package = OutPackage::new(data, self.reliability, self.peers, self.target);
self.packages.push(package);

self.build_package(true);
self.push_inner(message)
}
Err(err) => Err(err),
Expand All @@ -87,20 +76,45 @@ impl PackageBuilder {
self.used += len;
Ok(())
}

/// Build and store another package from already buffered data.
///
/// # Arguments
///
/// * `reusable` - if false, newly created buffer for further messages will
/// be empty.
fn build_package(&mut self, reusable: bool) {
let (mut data, used) = if reusable {
(vec![0; MAX_DATAGRAM_SIZE], HEADER_SIZE)
} else {
(Vec::new(), 0)
};

mem::swap(&mut data, &mut self.buffer);
data.truncate(self.used);
self.used = used;

self.packages.push(OutPackage::new(
data,
self.reliability,
self.peers,
self.target,
));
}
}

/// A package to be send.
pub struct OutPackage {
pub(super) data: Vec<u8>,
/// First [`HEADER_SIZE`] bytes are reserved for the header. Payload must
/// follow.
data: Vec<u8>,
reliability: Reliability,
peers: Peers,
pub(super) target: SocketAddr,
}

impl OutPackage {
/// Creates a package from a single message.
///
/// See also [`Self::new`].
pub fn encode_single<E>(
message: &E,
reliability: Reliability,
Expand All @@ -110,23 +124,47 @@ impl OutPackage {
where
E: bincode::Encode,
{
let data = encode_to_vec(message, BINCODE_CONF)?;
let mut data = Vec::with_capacity(HEADER_SIZE + 1);
data.extend([0; HEADER_SIZE]);
encode_into_std_write(message, &mut data, BINCODE_CONF)?;
Ok(Self::new(data, reliability, peers, target))
}

/// # Panics
///
/// If `data` is longer than [`MAX_PACKAGE_SIZE`].
pub fn from_slice(
data: &[u8],
reliability: Reliability,
peers: Peers,
target: SocketAddr,
) -> Self {
assert!(data.len() <= MAX_PACKAGE_SIZE);

let mut full_data = Vec::with_capacity(HEADER_SIZE + data.len());
full_data.extend([0; HEADER_SIZE]);
full_data.extend(data);
Self::new(full_data, reliability, peers, target)
}

/// # Arguments
///
/// * `data` - data to be send.
/// * `data` - data to be send. The message data must start exactly at
/// [`HEADER_SIZE`]. The initial bytes are reserved for the header. The
/// header is not filled by the caller.
///
/// * `reliability` - package delivery reliability mode.
///
/// * `target` - package recipient.
///
/// # Panics
///
/// Panics if data is longer than [`MAX_PACKAGE_SIZE`].
pub fn new(data: Vec<u8>, reliability: Reliability, peers: Peers, target: SocketAddr) -> Self {
assert!(data.len() < MAX_PACKAGE_SIZE);
/// * If data length is smaller or equal to header size..
///
/// * If data is longer than [`MAX_DATAGRAM_SIZE`].
fn new(data: Vec<u8>, reliability: Reliability, peers: Peers, target: SocketAddr) -> Self {
assert!(data.len() > HEADER_SIZE);
assert!(data.len() <= MAX_DATAGRAM_SIZE);
Self {
data,
reliability,
Expand All @@ -135,6 +173,19 @@ impl OutPackage {
}
}

/// Returns package data.
///
/// The data start at [`HEADER_SIZE`] so that header may be written
/// to the beginning of the vector.
pub(super) fn data(self) -> Vec<u8> {
self.data
}

/// Returns slice to the payload part (without header) of the data.
pub(super) fn data_slice(&self) -> &[u8] {
&self.data[HEADER_SIZE..]
}

pub(super) fn reliability(&self) -> Reliability {
self.reliability
}
Expand Down
48 changes: 39 additions & 9 deletions crates/net/src/tasks/dsender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ use std::net::SocketAddr;
use async_std::channel::Receiver;
use tracing::{error, info};

use crate::{header::DatagramHeader, protocol::ProtocolSocket, MAX_DATAGRAM_SIZE};
use crate::{
header::{DatagramHeader, HEADER_SIZE},
protocol::ProtocolSocket,
MAX_DATAGRAM_SIZE, MAX_PACKAGE_SIZE,
};

pub(crate) struct OutDatagram {
header: DatagramHeader,
Expand All @@ -12,7 +16,39 @@ pub(crate) struct OutDatagram {
}

impl OutDatagram {
/// # Panics
///
/// * If `data` is empty.
///
/// * If `data` is larger than [`MAX_PACKAGE_SIZE`].
pub(crate) fn from_slice(header: DatagramHeader, data: &[u8], target: SocketAddr) -> Self {
assert!(!data.is_empty());
assert!(data.len() <= MAX_PACKAGE_SIZE);

let mut full_data = Vec::with_capacity(HEADER_SIZE + data.len());
full_data.extend([0; HEADER_SIZE]);
full_data.extend(data);
Self::new(header, full_data, target)
}

/// # Argument
///
/// * `header`
///
/// * `data` - data of the datagram. First [`HEADER_SIZE`] is reserved for
/// to-be-written header.
///
/// * `target` - datagram recipient.
///
/// # Panics
///
/// * If `data` length is smaller or equal to [`HEADER_SIZE`].
///
/// * If `data` is larger than [`MAX_DATAGRAM_SIZE`].
pub(crate) fn new(header: DatagramHeader, data: Vec<u8>, target: SocketAddr) -> Self {
assert!(data.len() > HEADER_SIZE);
assert!(data.len() <= MAX_DATAGRAM_SIZE);

Self {
header,
data,
Expand All @@ -23,19 +59,13 @@ impl OutDatagram {

pub(super) async fn run(port: u16, datagrams: Receiver<OutDatagram>, socket: ProtocolSocket) {
info!("Starting datagram sender on port {port}...");
let mut buffer = [0u8; MAX_DATAGRAM_SIZE];

loop {
let Ok(datagram) = datagrams.recv().await else {
let Ok(mut datagram) = datagrams.recv().await else {
break;
};
if let Err(err) = socket
.send(
&mut buffer,
datagram.header,
&datagram.data,
datagram.target,
)
.send(datagram.header, &mut datagram.data, datagram.target)
.await
{
error!("Error while sending a datagram: {err:?}");
Expand Down
5 changes: 3 additions & 2 deletions crates/net/src/tasks/usender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ pub(super) async fn run(

if package_header.reliability().is_reliable() {
dispatch_handler
.sent(time, package.target, package_header, &package.data)
.sent(time, package.target, package_header, package.data_slice())
.await;
}

let target = package.target;
let closed = datagrams
.send(OutDatagram::new(header, package.data, package.target))
.send(OutDatagram::new(header, package.data(), target))
.await
.is_err();

Expand Down

0 comments on commit 6ccc48c

Please sign in to comment.