diff --git a/crates/connector/src/game/ereceiver.rs b/crates/connector/src/game/ereceiver.rs index 5a608e03..207b56d2 100644 --- a/crates/connector/src/game/ereceiver.rs +++ b/crates/connector/src/game/ereceiver.rs @@ -2,7 +2,7 @@ use std::time::Duration; use async_std::{channel::Sender, future::timeout}; use de_messages::ToGame; -use de_net::ConnErrorReceiver; +use de_net::{ConnErrorReceiver, Reliability}; use tracing::{error, info, warn}; use super::greceiver::ToGameMessage; @@ -26,7 +26,11 @@ pub(super) async fn run(port: u16, errors: ConnErrorReceiver, server: Sender Self { + pub(super) fn new(source: SocketAddr, reliability: Reliability, message: ToGame) -> Self { Self { - meta: MessageMeta { source, reliable }, + meta: MessageMeta { + source, + reliability, + }, message, } } @@ -27,7 +30,7 @@ impl ToGameMessage { struct MessageMeta { source: SocketAddr, - reliable: bool, + reliability: Reliability, } pub(super) struct GameProcessor { @@ -142,7 +145,7 @@ impl GameProcessor { .send( OutPackage::encode_single( &FromGame::NotJoined, - message.meta.reliable, + message.meta.reliability, Peers::Server, message.meta.source, ) @@ -159,7 +162,7 @@ impl GameProcessor { .send( OutPackage::encode_single( &FromGame::Pong(id), - meta.reliable, + meta.reliability, Peers::Server, meta.source, ) @@ -172,8 +175,12 @@ impl GameProcessor { async fn process_join(&mut self, meta: MessageMeta) { if let Err(err) = self.clients.reserve(meta.source).await { warn!("Join request error: {err}"); - self.send(&FromGame::JoinError(JoinError::DifferentGame), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::DifferentGame), + Reliability::Unordered, + meta.source, + ) + .await; return; } @@ -191,8 +198,12 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::JoinError(JoinError::AlreadyJoined), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::AlreadyJoined), + Reliability::Unordered, + meta.source, + ) + .await; } JoinErrorInner::GameFull => { warn!( @@ -200,8 +211,12 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::JoinError(JoinError::GameFull), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::GameFull), + Reliability::Unordered, + meta.source, + ) + .await; } JoinErrorInner::GameNotOpened => { warn!( @@ -210,8 +225,12 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::JoinError(JoinError::GameNotOpened), meta.source) - .await; + self.send( + &FromGame::JoinError(JoinError::GameNotOpened), + Reliability::Unordered, + meta.source, + ) + .await; } } } @@ -224,8 +243,14 @@ impl GameProcessor { "Player {id} on {addr:?} just joined game on port {}.", self.port ); - self.send(&FromGame::Joined(id), addr).await; - self.send_all(&FromGame::PeerJoined(id), Some(addr)).await; + self.send(&FromGame::Joined(id), Reliability::SemiOrdered, addr) + .await; + self.send_all( + &FromGame::PeerJoined(id), + Reliability::SemiOrdered, + Some(addr), + ) + .await; Ok(()) } @@ -243,16 +268,22 @@ impl GameProcessor { meta.source, self.port ); - self.send(&FromGame::Left, meta.source).await; - self.send_all(&FromGame::PeerLeft(id), None).await; + self.send(&FromGame::Left, Reliability::SemiOrdered, meta.source) + .await; + self.send_all(&FromGame::PeerLeft(id), Reliability::SemiOrdered, None) + .await; } async fn process_readiness(&mut self, meta: MessageMeta, readiness: Readiness) { match self.state.update_readiness(meta.source, readiness).await { Ok(progressed) => { if progressed { - self.send_all(&FromGame::GameReadiness(readiness), None) - .await; + self.send_all( + &FromGame::GameReadiness(readiness), + Reliability::SemiOrdered, + None, + ) + .await; } } Err(err) => warn!( @@ -268,23 +299,25 @@ impl GameProcessor { /// /// * `message` - message to be sent. /// + /// * `reliability` - reliability mode for the message. + /// /// * `exclude` - if not None, the message will be delivered to all but /// this player. - async fn send_all(&self, message: &E, exclude: Option) + async fn send_all(&self, message: &E, reliability: Reliability, exclude: Option) where E: bincode::Encode, { for target in self.state.targets(exclude).await { - self.send(message, target).await; + self.send(message, reliability, target).await; } } - /// Send message to some targets. - async fn send(&self, message: &E, target: SocketAddr) + async fn send(&self, message: &E, reliability: Reliability, target: SocketAddr) where E: bincode::Encode, { - let message = OutPackage::encode_single(message, true, Peers::Server, target).unwrap(); + let message = + OutPackage::encode_single(message, reliability, Peers::Server, target).unwrap(); let _ = self.outputs.send(message).await; } } diff --git a/crates/connector/src/game/mreceiver.rs b/crates/connector/src/game/mreceiver.rs index 6897c521..df65afc6 100644 --- a/crates/connector/src/game/mreceiver.rs +++ b/crates/connector/src/game/mreceiver.rs @@ -42,7 +42,7 @@ pub(super) async fn run( let result = server .send(ToGameMessage::new( package.source(), - package.reliable(), + package.reliability(), message, )) .await; @@ -60,7 +60,7 @@ pub(super) async fn run( Peers::Players => { let _ = players .send(PlayersPackage::new( - package.reliable(), + package.reliability(), package.source(), package.data(), )) diff --git a/crates/connector/src/game/preceiver.rs b/crates/connector/src/game/preceiver.rs index 80a09093..b0a25be4 100644 --- a/crates/connector/src/game/preceiver.rs +++ b/crates/connector/src/game/preceiver.rs @@ -2,22 +2,22 @@ use std::net::SocketAddr; use async_std::channel::Receiver; use de_messages::FromGame; -use de_net::{OutPackage, PackageSender, Peers}; +use de_net::{OutPackage, PackageSender, Peers, Reliability}; use tracing::{error, info, warn}; use super::state::GameState; /// A package destined to other players in the game. pub(super) struct PlayersPackage { - reliable: bool, + reliability: Reliability, source: SocketAddr, data: Vec, } impl PlayersPackage { - pub(super) fn new(reliable: bool, source: SocketAddr, data: Vec) -> Self { + pub(super) fn new(reliability: Reliability, source: SocketAddr, data: Vec) -> Self { Self { - reliable, + reliability, source, data, } @@ -56,7 +56,7 @@ pub(super) async fn run( .send( OutPackage::encode_single( &FromGame::NotJoined, - package.reliable, + package.reliability, Peers::Server, package.source, ) @@ -70,7 +70,7 @@ pub(super) async fn run( let result = outputs .send(OutPackage::new( package.data.clone(), - package.reliable, + package.reliability, Peers::Players, target, )) diff --git a/crates/connector/src/server.rs b/crates/connector/src/server.rs index 360d9224..c6163138 100644 --- a/crates/connector/src/server.rs +++ b/crates/connector/src/server.rs @@ -3,7 +3,9 @@ use std::net::SocketAddr; use anyhow::Context; use async_std::task; use de_messages::{FromServer, GameOpenError, ToServer}; -use de_net::{self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Socket}; +use de_net::{ + self, MessageDecoder, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket, +}; use tracing::{error, info, warn}; use crate::{clients::Clients, game}; @@ -102,7 +104,10 @@ impl MainServer { async fn reply(&mut self, message: &FromServer, target: SocketAddr) -> anyhow::Result<()> { self.outputs - .send(OutPackage::encode_single(message, true, Peers::Server, target).unwrap()) + .send( + OutPackage::encode_single(message, Reliability::Unordered, Peers::Server, target) + .unwrap(), + ) .await .context("Failed to send a reply") } diff --git a/crates/connector/tests/commands.rs b/crates/connector/tests/commands.rs index d00d52a2..c58b7047 100644 --- a/crates/connector/tests/commands.rs +++ b/crates/connector/tests/commands.rs @@ -5,7 +5,9 @@ use std::{ use async_std::{future::timeout, task}; use de_messages::{FromGame, FromServer, JoinError, Readiness, ToGame, ToServer}; -use de_net::{self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Socket}; +use de_net::{ + self, ConnErrorReceiver, OutPackage, PackageReceiver, PackageSender, Peers, Reliability, Socket, +}; use ntest::timeout; use crate::common::{spawn_and_wait, term_and_wait}; @@ -161,7 +163,9 @@ impl Comms { E: bincode::Encode, { let addr = SocketAddr::new(self.host, self.port); - let package = OutPackage::encode_single(&message, true, Peers::Server, addr).unwrap(); + let package = + OutPackage::encode_single(&message, Reliability::SemiOrdered, Peers::Server, addr) + .unwrap(); self.sender.send(package).await.unwrap(); } diff --git a/crates/connector/tests/network.rs b/crates/connector/tests/network.rs index 94c0188c..f89587dc 100644 --- a/crates/connector/tests/network.rs +++ b/crates/connector/tests/network.rs @@ -4,7 +4,7 @@ use std::{ }; use async_std::{prelude::FutureExt, task}; -use de_net::Socket; +use de_net::{Reliability, Socket}; use futures::join; use ntest::timeout; @@ -34,10 +34,14 @@ impl ReceivedBuffer { ); } - fn find_id(&self, filter_reliable: bool, filter_data: &[u8]) -> Option { + fn find_id(&self, filter_reliability: Reliability, filter_data: &[u8]) -> Option { self.0.iter().find_map(|incomming| match incomming { - Incomming::Data { reliable, id, data } => { - if *reliable == filter_reliable && data == filter_data { + Incomming::Data { + reliability, + id, + data, + } => { + if *reliability == filter_reliability && data == filter_data { Some(*id) } else { None @@ -68,7 +72,16 @@ impl ReceivedBuffer { self.0.push(Incomming::Confirm(id)); } } else { - let reliable = buf[0] & 64 > 0; + let reliability = (buf[0] & 96) >> 5; + let reliability = if reliability == 0 { + Reliability::Unreliable + } else if reliability == 1 { + Reliability::Unordered + } else if reliability == 2 { + Reliability::SemiOrdered + } else { + panic!("Invalid reliability bits"); + }; id_bytes[0] = 0; id_bytes[1] = buf[1]; @@ -77,7 +90,7 @@ impl ReceivedBuffer { let id = u32::from_be_bytes(id_bytes); self.0.push(Incomming::Data { - reliable, + reliability, id, data: buf[4..n].to_vec(), }); @@ -89,7 +102,7 @@ impl ReceivedBuffer { enum Incomming { Confirm(u32), Data { - reliable: bool, + reliability: Reliability, id: u32, data: Vec, }, @@ -110,17 +123,22 @@ fn test() { received.load(&mut client, &mut buffer).await; // [5, 2] -> FromGame::PeerJoined(1) - let id = received.find_id(true, &[5, 2]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::SemiOrdered, &[5, 2]) + .unwrap() + .to_be_bytes(); // And send a confirmation client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await .unwrap(); - let first_id = received.find_id(true, &[5, 6, 7, 8]).unwrap(); + let first_id = received + .find_id(Reliability::Unordered, &[5, 6, 7, 8]) + .unwrap(); let mut data = [22; 412]; - data[0] = 64; // Reliable + data[0] = 32; // Unordered data[1] = 0; data[2] = 0; data[3] = 22; @@ -130,7 +148,9 @@ fn test() { received.load(&mut client, &mut buffer).await; received.load(&mut client, &mut buffer).await; received.assert_confirmed(22); - received.find_id(false, &[82, 83, 84]).unwrap(); + received + .find_id(Reliability::Unreliable, &[82, 83, 84]) + .unwrap(); // Try to send invalid data -- wrong header client @@ -146,10 +166,20 @@ fn test() { // Two retries before we confirm. let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - assert_eq!(received.find_id(true, &[5, 6, 7, 8]).unwrap(), first_id); + assert_eq!( + received + .find_id(Reliability::Unordered, &[5, 6, 7, 8]) + .unwrap(), + first_id + ); let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - assert_eq!(received.find_id(true, &[5, 6, 7, 8]).unwrap(), first_id); + assert_eq!( + received + .find_id(Reliability::Unordered, &[5, 6, 7, 8]) + .unwrap(), + first_id + ); let id = first_id.to_be_bytes(); // And send a confirmation @@ -158,8 +188,8 @@ fn test() { .await .unwrap(); - client.send(server, &[64, 0, 0, 92, 16]).await.unwrap(); - client.send(server, &[64, 0, 0, 86, 23]).await.unwrap(); + client.send(server, &[32, 0, 0, 92, 16]).await.unwrap(); + client.send(server, &[32, 0, 0, 86, 23]).await.unwrap(); let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; received.assert_confirmed(92); @@ -179,8 +209,8 @@ fn test() { let mut buffer = [0u8; 1024]; client - // Reliable - .send(server, &[64, 0, 0, 14, 5, 6, 7, 8]) + // unordered + .send(server, &[32, 0, 0, 14, 5, 6, 7, 8]) .await .unwrap(); @@ -188,7 +218,10 @@ fn test() { received.load(&mut client, &mut buffer).await; received.load(&mut client, &mut buffer).await; received.assert_confirmed(14); - let id = received.find_id(true, &[22; 408]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::Unordered, &[22; 408]) + .unwrap() + .to_be_bytes(); // Sending confirmation client @@ -207,7 +240,10 @@ fn test() { let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - let id = received.find_id(true, &[16]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::Unordered, &[16]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await @@ -215,7 +251,10 @@ fn test() { let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; - let id = received.find_id(true, &[23]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::Unordered, &[23]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await @@ -245,11 +284,11 @@ async fn create_game() -> (Socket, u16) { let mut client = Socket::bind(None).await.unwrap(); - // [64 + 32] -> reliable + Peers::Server + // [32 + 16] -> unordered + Peers::Server // [0, 0, 7] -> datagram ID = 7 // [1 3] -> ToGame::OpenGame { max_players: 3 } client - .send(SERVER_ADDR, &[64 + 32, 0, 0, 7, 1, 3]) + .send(SERVER_ADDR, &[32 + 16, 0, 0, 7, 1, 3]) .await .unwrap(); @@ -259,11 +298,16 @@ async fn create_game() -> (Socket, u16) { assert_eq!(received.0.len(), 1); let port = { - let Incomming::Data { reliable, id, data } = &(received.0)[0] else { + let Incomming::Data { + reliability, + id, + data, + } = &(received.0)[0] + else { panic!("Unexpected data received: {:?}", received); }; - assert!(reliable); + assert!(reliability.is_reliable()); // Confirm let id = id.to_be_bytes(); @@ -295,7 +339,10 @@ async fn create_game() -> (Socket, u16) { received.load(&mut client, &mut buffer).await; // [2, 1] -> FromGame::Joined(1) - let id = received.find_id(true, &[2, 1]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::SemiOrdered, &[2, 1]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await @@ -310,10 +357,10 @@ async fn join_game(game_port: u16) -> Socket { let server = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, game_port)); let mut client = Socket::bind(None).await.unwrap(); - // [64 + 32] -> reliable + Peers::Server + // [32 + 16] -> unordered + Peers::Server // [0, 0, 3] -> datagram ID = 3 // [1] -> ToGame::Join - client.send(server, &[64 + 32, 0, 0, 3, 1]).await.unwrap(); + client.send(server, &[32 + 16, 0, 0, 3, 1]).await.unwrap(); let mut received = ReceivedBuffer::new(); received.load(&mut client, &mut buffer).await; @@ -321,7 +368,10 @@ async fn join_game(game_port: u16) -> Socket { received.assert_confirmed(3); // [2, 2] -> FromGame::Joined(2) - let id = received.find_id(true, &[2, 2]).unwrap().to_be_bytes(); + let id = received + .find_id(Reliability::SemiOrdered, &[2, 2]) + .unwrap() + .to_be_bytes(); client .send(server, &[128, 0, 0, 0, id[1], id[2], id[3]]) .await diff --git a/crates/multiplayer/src/messages.rs b/crates/multiplayer/src/messages.rs index 99577646..0ff03ea6 100644 --- a/crates/multiplayer/src/messages.rs +++ b/crates/multiplayer/src/messages.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, time::Instant}; use bevy::prelude::*; use de_core::schedule::PreMovement; use de_messages::{FromGame, FromServer, ToGame, ToServer}; -use de_net::{InPackage, PackageBuilder, Peers}; +use de_net::{InPackage, PackageBuilder, Peers, Reliability}; use crate::{ config::ConnectionType, @@ -255,7 +255,12 @@ fn message_sender( return; }; let addr = SocketAddr::new(conf.server_host(), port); - let mut builder = PackageBuilder::new(E::RELIABLE, Peers::Server, addr); + let reliability = if E::RELIABLE { + Reliability::Unordered + } else { + Reliability::Unreliable + }; + let mut builder = PackageBuilder::new(reliability, Peers::Server, addr); for event in inputs.iter() { builder.push(event.message()).unwrap(); diff --git a/crates/net/src/connection/databuf.rs b/crates/net/src/connection/databuf.rs index 87842551..eb658130 100644 --- a/crates/net/src/connection/databuf.rs +++ b/crates/net/src/connection/databuf.rs @@ -51,6 +51,19 @@ impl DataBuf { self.data.extend(data); } + /// See [`Self::get`] and [`Self::remove`]. + /// + /// # Panics + /// + /// Panics if `buf` len is smaller than length of found data. + pub(super) fn get_and_remove(&mut self, id: PackageId, buf: &mut [u8]) -> Option { + let result = self.get(id, buf); + if result.is_some() { + self.remove(id); + } + result + } + /// Searches for data stored under ID `id`, and if found, writes the data /// to `buf` and returns length of the data. /// diff --git a/crates/net/src/connection/delivery/deliveries.rs b/crates/net/src/connection/delivery/deliveries.rs new file mode 100644 index 00000000..ea9a73ae --- /dev/null +++ b/crates/net/src/connection/delivery/deliveries.rs @@ -0,0 +1,200 @@ +use std::mem; + +use super::pending::Pending; +use crate::{header::PackageId, record::DeliveryRecord}; + +/// Iterator over packages ready to be delivered to the user. It may include +/// just received package and postponed out-of-order packages received in the +/// past. +/// +/// The packages are yielded in the order of their IDs. +pub(crate) struct Deliveries<'a, 'b> { + pending: Option>, + current: Option<(DeliveryRecord, Vec)>, + buf: &'b mut [u8], +} + +impl<'a, 'b> Deliveries<'a, 'b> { + /// Creates iterator which yields 0 items. + pub(super) fn empty(buf: &'b mut [u8]) -> Self { + Self { + pending: None, + current: None, + buf, + } + } + + /// Create iterator yielding solely one package. + pub(super) fn current( + current_record: DeliveryRecord, + current_data: Vec, + buf: &'b mut [u8], + ) -> Self { + Self { + pending: None, + current: Some((current_record, current_data)), + buf, + } + } + + /// Constructs deliveries of pending packages and the current package. + pub(super) fn drain( + pending: PendingDeliveries<'a>, + current_record: DeliveryRecord, + current_data: Vec, + buf: &'b mut [u8], + ) -> Self { + Self { + pending: Some(pending), + current: Some((current_record, current_data)), + buf, + } + } +} + +impl<'a, 'b> Iterator for Deliveries<'a, 'b> { + type Item = (DeliveryRecord, Vec); + + fn next(&mut self) -> Option { + let buf = &mut self.buf; + + let mut item = self.pending.as_mut().and_then(|p| p.pop(buf)); + + let current_id = self.current.as_ref().map(|c| c.0.header().id()); + let item_id = item.as_ref().map(|c| c.0.header().id()); + let current_first = match (current_id, item_id) { + (Some(current), Some(item)) => { + assert!(current != item); + current < item + } + (Some(_), None) => true, + _ => false, + }; + + if current_first { + mem::swap(&mut item, &mut self.current); + } + + item + } +} + +pub(super) struct PendingDeliveries<'a> { + bound: PackageId, + pending: &'a mut Pending, +} + +impl<'a> PendingDeliveries<'a> { + pub(super) fn new(bound: PackageId, pending: &'a mut Pending) -> Self { + Self { bound, pending } + } + + fn pop(&mut self, buf: &mut [u8]) -> Option<(DeliveryRecord, Vec)> { + self.pending + .pop_first(self.bound, buf) + .map(|(record, len)| (record, (buf[..len]).to_vec())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{header::PackageHeader, Peers, Reliability}; + + #[test] + fn test_empty() { + let mut buf = [0u8; 4]; + let mut iter = Deliveries::empty(&mut buf); + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + } + + #[test] + fn test_current() { + let current_record = DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 5]), + )); + let current_data = vec![11, 7]; + + let mut buf = [0u8; 4]; + let mut iter = Deliveries::current(current_record, current_data, &mut buf); + + let (first_record, first_data) = iter.next().unwrap(); + assert_eq!( + first_record.header().reliability(), + Reliability::SemiOrdered + ); + assert_eq!( + first_record.header().id(), + PackageId::from_bytes(&[0, 0, 5]) + ); + assert_eq!(first_data, vec![11, 7]); + + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + } + + #[test] + fn test_drain() { + let mut pending = Pending::new(); + pending.store( + DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 6]), + )), + &[12], + ); + pending.store( + DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 7]), + )), + &[13, 14], + ); + pending.store( + DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 4]), + )), + &[10, 7, 3], + ); + + let current_record = DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 5]), + )); + let current_data = vec![11, 7]; + + let mut buf = [0u8; 4]; + let mut iter = Deliveries::drain( + PendingDeliveries::new(PackageId::from_bytes(&[0, 0, 7]), &mut pending), + current_record, + current_data, + &mut buf, + ); + + let (next_record, next_data) = iter.next().unwrap(); + assert_eq!(next_record.header().id(), PackageId::from_bytes(&[0, 0, 4])); + assert_eq!(next_data, vec![10, 7, 3]); + + let (next_record, next_data) = iter.next().unwrap(); + assert_eq!(next_record.header().id(), PackageId::from_bytes(&[0, 0, 5])); + assert_eq!(next_data, vec![11, 7]); + + let (next_record, next_data) = iter.next().unwrap(); + assert_eq!(next_record.header().id(), PackageId::from_bytes(&[0, 0, 6])); + assert_eq!(next_data, vec![12]); + + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + } +} diff --git a/crates/net/src/connection/delivery/mod.rs b/crates/net/src/connection/delivery/mod.rs index 46c19b0c..ef51a766 100644 --- a/crates/net/src/connection/delivery/mod.rs +++ b/crates/net/src/connection/delivery/mod.rs @@ -2,17 +2,22 @@ use std::{net::SocketAddr, time::Instant}; use async_std::{ channel::{SendError, Sender}, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, MutexGuard}, }; +pub(crate) use self::received::ReceivedIdError; use self::{ confirms::{ConfirmsBuffer, MAX_BUFF_AGE}, - received::{Received, ReceivedIdError}, + deliveries::{Deliveries, PendingDeliveries}, + pending::Pending, + received::{IdContinuity, Received}, }; use super::book::{Connection, ConnectionBook}; -use crate::{header::PackageId, tasks::OutDatagram}; +use crate::{record::DeliveryRecord, tasks::OutDatagram, Reliability}; mod confirms; +mod deliveries; +mod pending; mod received; #[derive(Clone)] @@ -27,23 +32,10 @@ impl DeliveryHandler { } } - /// This method checks whether a package with `id` from `addr` was already - /// marked as received in the past. If so it returns true. Otherwise, it - /// marks the package as received and returns false. - /// - /// This method should be called exactly once after each reliable package - /// is delivered and in order. - pub(crate) async fn received( - &mut self, - time: Instant, - addr: SocketAddr, - id: PackageId, - ) -> Result { - self.book - .lock() - .await - .update(time, addr, ConnDeliveryHandler::new) - .push(time, id) + pub(crate) async fn lock(&mut self) -> ReceiveHandlerGuard { + ReceiveHandlerGuard { + guard: self.book.lock().await, + } } /// Send package confirmation datagrams. @@ -88,8 +80,33 @@ impl DeliveryHandler { } } +/// The lock is unlocked once this guard is dropped. +pub(crate) struct ReceiveHandlerGuard<'a> { + guard: MutexGuard<'a, ConnectionBook>, +} + +impl<'a> ReceiveHandlerGuard<'a> { + /// Validate input package and return an iterator of to be delivered + /// packages on success. + /// + /// All reliable sent packages are not to be delivered to the user + /// directly but via the returned iterator. + pub(crate) fn received<'buf>( + &mut self, + addr: SocketAddr, + record: DeliveryRecord, + data: Vec, + buf: &'buf mut [u8], + ) -> Result, ReceivedIdError> { + self.guard + .update(record.time(), addr, ConnDeliveryHandler::new) + .push(record, data, buf) + } +} + struct ConnDeliveryHandler { received: Received, + pending: Pending, confirms: ConfirmsBuffer, } @@ -97,19 +114,50 @@ impl ConnDeliveryHandler { fn new() -> Self { Self { received: Received::new(), + pending: Pending::new(), confirms: ConfirmsBuffer::new(), } } - /// Registers a package as received and returns whether the it was a - /// duplicate delivery. - fn push(&mut self, time: Instant, id: PackageId) -> Result { - // Return early on error to avoid confirmation of erroneous datagrams. - let duplicate = self.received.process(id)?; - // Push to the buffer even duplicate packages, because the reason - // behind the re-delivery might be loss of the confirmation datagram. - self.confirms.push(time, id); - Ok(duplicate) + /// Registers package as received and returns an iterator of the to be + /// delivered packages. + /// + /// # Panics + /// + /// Panics if `buf` len is smaller than length of any of the drained + /// buffered pending package. + fn push<'b>( + &mut self, + record: DeliveryRecord, + data: Vec, + buf: &'b mut [u8], + ) -> Result, ReceivedIdError> { + let result = self.received.process(record.header().id()); + if let Ok(_) | Err(ReceivedIdError::Duplicate) = result { + // Push to the buffer even duplicate packages, because the reason + // behind the re-delivery might be loss of the confirmation + // datagram. + self.confirms.push(record.time(), record.header().id()); + } + + Ok(match result? { + IdContinuity::Continuous(bound) => Deliveries::drain( + PendingDeliveries::new(bound, &mut self.pending), + record, + data, + buf, + ), + IdContinuity::Sparse => match record.header().reliability() { + Reliability::SemiOrdered => { + self.pending.store(record, &data); + Deliveries::empty(buf) + } + Reliability::Unordered => Deliveries::current(record, data, buf), + Reliability::Unreliable => { + unreachable!("Unreliable packages cannot be processed by receive handler.") + } + }, + }) } } diff --git a/crates/net/src/connection/delivery/pending.rs b/crates/net/src/connection/delivery/pending.rs new file mode 100644 index 00000000..4363df54 --- /dev/null +++ b/crates/net/src/connection/delivery/pending.rs @@ -0,0 +1,137 @@ +use std::collections::BTreeMap; + +use crate::{connection::databuf::DataBuf, header::PackageId, record::DeliveryRecord}; + +/// Buffer for packages delivered out-of-order and thus awaiting the right +/// moment to be delivered. +pub(super) struct Pending { + ids: BTreeMap, + buf: DataBuf, +} + +impl Pending { + pub(super) fn new() -> Self { + Self { + ids: BTreeMap::new(), + buf: DataBuf::new(), + } + } + + /// Stores a package as pending (i.e. cannot be delivered right away). + /// + /// # Panics + /// + /// * When there already is a pending package with the given `id`. + /// + /// * It is not a (semi-)ordered package. + pub(super) fn store(&mut self, record: DeliveryRecord, data: &[u8]) { + assert!(record.header().reliability().is_ordered()); + let id = record.header().id(); + let result = self.ids.insert(id, record); + assert!(result.is_none()); + self.buf.push(id, data); + } + + /// Finds oldest (smallest) pending package older (smaller) than the given + /// bound. Returns None if there is no such package. Otherwise, stores the + /// package to the given buffer and returns original package delivery + /// record and length of the package data (as stored to the buffer). + /// + /// # Arguments + /// + /// * `bound` - exclusive ID bound. + /// + /// # Panics + /// + /// Panics if `buf` len is smaller than length of found data. + pub(super) fn pop_first( + &mut self, + bound: PackageId, + buf: &mut [u8], + ) -> Option<(DeliveryRecord, usize)> { + match self.ids.first_entry() { + Some(entry) => { + if entry.key().cmp(&bound).is_lt() { + let id = *entry.key(); + let record = entry.remove(); + Some((record, self.buf.get_and_remove(id, buf).unwrap())) + } else { + None + } + } + None => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{header::PackageHeader, Peers, Reliability, MAX_PACKAGE_SIZE}; + + #[test] + fn test_pending() { + let pkg_a = DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 18]), + )); + let pkg_b = DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 14]), + )); + let pkg_c = DeliveryRecord::now(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 22]), + )); + + let mut buf = [0u8; MAX_PACKAGE_SIZE]; + let mut pending = Pending::new(); + + assert!(pending + .pop_first(PackageId::from_bytes(&[0, 0, 10]), &mut buf) + .is_none()); + + pending.store(pkg_a.clone(), &[4, 5, 6, 7]); + pending.store(pkg_b.clone(), &[1, 2, 3]); + pending.store(pkg_c.clone(), &[92, 86]); + + assert!(pending + .pop_first(PackageId::from_bytes(&[0, 0, 10]), &mut buf) + .is_none()); + + assert_eq!( + pending + .pop_first(PackageId::from_bytes(&[0, 0, 20]), &mut buf) + .unwrap(), + (pkg_b, 3) + ); + assert_eq!(&buf[..3], &[1, 2, 3]); + + assert_eq!( + pending + .pop_first(PackageId::from_bytes(&[0, 0, 20]), &mut buf) + .unwrap(), + (pkg_a, 4) + ); + assert_eq!(&buf[..4], &[4, 5, 6, 7]); + + assert!(pending + .pop_first(PackageId::from_bytes(&[0, 0, 20]), &mut buf) + .is_none()); + + assert_eq!( + pending + .pop_first(PackageId::from_bytes(&[0, 0, 30]), &mut buf) + .unwrap(), + (pkg_c, 2) + ); + assert_eq!(&buf[..2], &[92, 86]); + + assert!(pending + .pop_first(PackageId::from_bytes(&[0, 0, 30]), &mut buf) + .is_none()); + } +} diff --git a/crates/net/src/connection/delivery/received.rs b/crates/net/src/connection/delivery/received.rs index cbb33e03..4fe0c3a7 100644 --- a/crates/net/src/connection/delivery/received.rs +++ b/crates/net/src/connection/delivery/received.rs @@ -1,37 +1,51 @@ -use std::cmp::Ordering; +use std::{cmp::Ordering, collections::BTreeSet}; -use ahash::AHashSet; use thiserror::Error; use crate::header::{PackageId, PackageIdRange}; -pub(super) const MAX_SKIPPED: usize = 1024; +// This must be less than `u32::MAX / 2` due to ID ordering circularity issues. +const MAX_SKIPPED: usize = 1024; -/// Database of already received packages. +/// Database of already received packages. It servers for duplicate and +/// out-of-order delivery detection. pub(super) struct Received { highest_id: Option, - holes: AHashSet, + holes: BTreeSet, } impl Received { pub(super) fn new() -> Self { Self { highest_id: None, - holes: AHashSet::new(), + holes: BTreeSet::new(), } } - /// Registers package as delivered and returns true if it was already - /// delivered in the past. - pub(super) fn process(&mut self, id: PackageId) -> Result { + /// Registers package as received and returns delivery order continuity in + /// respect with earlier sent packages. + pub(super) fn process(&mut self, id: PackageId) -> Result { let range_start = match self.highest_id { - Some(highest) => match highest.ordering(id) { + Some(highest) => match highest.cmp(&id) { Ordering::Less => highest.incremented(), Ordering::Greater => { - return Ok(!self.holes.remove(&id)); + if self.holes.remove(&id) { + return Ok(match self.holes.first() { + Some(first) => { + if first.cmp(&id).is_ge() { + IdContinuity::Continuous(*first) + } else { + IdContinuity::Sparse + } + } + None => IdContinuity::Continuous(highest.incremented()), + }); + } else { + return Err(ReceivedIdError::Duplicate); + } } Ordering::Equal => { - return Ok(true); + return Err(ReceivedIdError::Duplicate); } }, None => PackageId::zero(), @@ -48,12 +62,27 @@ impl Received { self.holes.insert(hole); } - Ok(false) + Ok(if skipped == 0 { + IdContinuity::Continuous(id.incremented()) + } else { + IdContinuity::Sparse + }) } } -#[derive(Error, Debug)] +#[derive(Debug, PartialEq, Eq)] +pub(super) enum IdContinuity { + /// Some of the earlier sent packages has not yet been delivered. + Sparse, + /// All packages up to the given ID (exclusive) has been delivered. Just + /// delivered package is included in that range. + Continuous(PackageId), +} + +#[derive(Error, Debug, PartialEq, Eq)] pub(crate) enum ReceivedIdError { + #[error("Duplicate package")] + Duplicate, #[error("Too many packages skipped: {0}")] TooManySkipped(usize), } @@ -66,20 +95,47 @@ mod tests { fn test_received() { let mut received = Received::new(); - assert!(!received.process(PackageId::from_bytes(&[0, 0, 2])).unwrap()); - assert!(!received.process(PackageId::from_bytes(&[0, 0, 1])).unwrap()); - assert!(received.process(PackageId::from_bytes(&[0, 0, 1])).unwrap()); - assert!(!received.process(PackageId::from_bytes(&[0, 0, 0])).unwrap()); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 2])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 1])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 1])), + Err(ReceivedIdError::Duplicate) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 0])), + Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 3]))) + ); - assert!(!received.process(PackageId::from_bytes(&[0, 0, 5])).unwrap()); - assert!(!received.process(PackageId::from_bytes(&[0, 0, 3])).unwrap()); - assert!(received.process(PackageId::from_bytes(&[0, 0, 5])).unwrap()); - assert!(!received.process(PackageId::from_bytes(&[0, 0, 6])).unwrap()); - assert!(received.process(PackageId::from_bytes(&[0, 0, 3])).unwrap()); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 5])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 3])), + Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 4]))) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 5])), + Err(ReceivedIdError::Duplicate) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 6])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 3])), + Err(ReceivedIdError::Duplicate) + ); - assert!(matches!( + assert_eq!( received.process(PackageId::from_bytes(&[50, 0, 6])), Err(ReceivedIdError::TooManySkipped(3276800)) - )); + ); } } diff --git a/crates/net/src/connection/dispatch/mod.rs b/crates/net/src/connection/dispatch/mod.rs index 8fcf58e9..7375c547 100644 --- a/crates/net/src/connection/dispatch/mod.rs +++ b/crates/net/src/connection/dispatch/mod.rs @@ -11,9 +11,8 @@ use async_std::{ use self::resends::{RescheduleResult, Resends, START_BACKOFF_MS}; use super::book::{Connection, ConnectionBook}; use crate::{ - header::{DatagramHeader, PackageId, PackageIdRange}, + header::{DatagramHeader, PackageHeader, PackageId, PackageIdRange}, tasks::OutDatagram, - Peers, }; mod resends; @@ -44,13 +43,12 @@ impl DispatchHandler { &mut self, time: Instant, addr: SocketAddr, - id: PackageId, - peers: Peers, + header: PackageHeader, data: &[u8], ) { let mut book = self.book.lock().await; let handler = book.update(time, addr, ConnDispatchHandler::new); - handler.resends.push(id, peers, data, time); + handler.resends.push(header, data, time); } /// Processes data with package confirmations. @@ -86,10 +84,10 @@ impl DispatchHandler { while let Some((addr, handler)) = book.next() { let failure = loop { match handler.resends.reschedule(buf, time) { - RescheduleResult::Resend { len, id, peers } => { + RescheduleResult::Resend { len, header } => { datagrams .send(OutDatagram::new( - DatagramHeader::new_package(true, peers, id), + DatagramHeader::Package(header), buf[..len].to_vec(), addr, )) diff --git a/crates/net/src/connection/dispatch/resends.rs b/crates/net/src/connection/dispatch/resends.rs index d9f212b8..a9efd1b8 100644 --- a/crates/net/src/connection/dispatch/resends.rs +++ b/crates/net/src/connection/dispatch/resends.rs @@ -8,8 +8,7 @@ use priority_queue::PriorityQueue; use crate::{ connection::{book::MAX_CONN_AGE, databuf::DataBuf}, - header::PackageId, - Peers, + header::{PackageHeader, PackageId}, }; pub(super) const START_BACKOFF_MS: u64 = 220; @@ -20,7 +19,7 @@ const MAX_BASE_RESEND_INTERVAL_MS: u64 = (MAX_CONN_AGE.as_millis() / 2) as u64; /// confirmed). pub(super) struct Resends { queue: PriorityQueue, - meta: AHashMap, + headers: AHashMap, data: DataBuf, } @@ -28,7 +27,7 @@ impl Resends { pub(super) fn new() -> Self { Self { queue: PriorityQueue::new(), - meta: AHashMap::new(), + headers: AHashMap::new(), data: DataBuf::new(), } } @@ -44,10 +43,10 @@ impl Resends { } /// Registers new package for re-sending until it is resolved. - pub(super) fn push(&mut self, id: PackageId, peers: Peers, data: &[u8], now: Instant) { - self.queue.push(id, Timing::new(now)); - self.meta.insert(id, peers); - self.data.push(id, data); + pub(super) fn push(&mut self, header: PackageHeader, data: &[u8], now: Instant) { + self.queue.push(header.id(), Timing::new(now)); + self.headers.insert(header.id(), header); + self.data.push(header.id(), data); } /// Marks a package as delivered. No more re-sends will be scheduled and @@ -55,7 +54,7 @@ impl Resends { pub(super) fn resolve(&mut self, id: PackageId) { let result = self.queue.remove(&id); if result.is_some() { - self.meta.remove(&id); + self.headers.remove(&id); self.data.remove(id); } } @@ -85,8 +84,8 @@ impl Resends { Some(backoff) => { self.queue.change_priority(&id, backoff); let len = self.data.get(id, buf).unwrap(); - let peers = *self.meta.get(&id).unwrap(); - RescheduleResult::Resend { len, id, peers } + let header = *self.headers.get(&id).unwrap(); + RescheduleResult::Resend { len, header } } None => { self.queue.remove(&id).unwrap(); @@ -109,8 +108,7 @@ pub(crate) enum RescheduleResult { Resend { /// Length of the datagram data (written to a buffer) in bytes. len: usize, - id: PackageId, - peers: Peers, + header: PackageHeader, }, /// No datagram is currently scheduled for an immediate resent. This /// variant holds soonest possible time of a next resend. @@ -191,7 +189,7 @@ impl PartialEq for Timing { #[cfg(test)] mod tests { use super::*; - use crate::MAX_PACKAGE_SIZE; + use crate::{Peers, Reliability, MAX_PACKAGE_SIZE}; #[test] fn test_resends() { @@ -202,20 +200,29 @@ mod tests { assert!(resends.is_empty()); resends.push( - PackageId::from_bytes(&[0, 0, 0]), - Peers::Server, + PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 0]), + ), &[4, 5, 8], time, ); resends.push( - PackageId::from_bytes(&[0, 0, 1]), - Peers::Players, + PackageHeader::new( + Reliability::Unordered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 1]), + ), &[4, 5, 8, 9], time + Duration::from_millis(10_010), ); resends.push( - PackageId::from_bytes(&[0, 0, 2]), - Peers::Server, + PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 2]), + ), &[4, 5, 8, 9, 10], time + Duration::from_millis(50_020), ); @@ -225,8 +232,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(20)), RescheduleResult::Resend { len: 3, - id: PackageId::from_bytes(&[0, 0, 0]), - peers: Peers::Server, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 0]), + ) } ); assert_eq!(&buf[..3], &[4, 5, 8]); @@ -236,8 +246,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(20)), RescheduleResult::Resend { len: 4, - id: PackageId::from_bytes(&[0, 0, 1]), - peers: Peers::Players, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Players, + PackageId::from_bytes(&[0, 0, 1]) + ) } ); assert_eq!(&buf[..4], &[4, 5, 8, 9]); @@ -253,8 +266,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(1000)), RescheduleResult::Resend { len: 5, - id: PackageId::from_bytes(&[0, 0, 2]), - peers: Peers::Server, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 2]) + ) } ); // 2nd resend @@ -262,8 +278,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(2000)), RescheduleResult::Resend { len: 5, - id: PackageId::from_bytes(&[0, 0, 2]), - peers: Peers::Server, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 2]) + ) } ); // 3rd resend @@ -271,8 +290,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(3000)), RescheduleResult::Resend { len: 5, - id: PackageId::from_bytes(&[0, 0, 2]), - peers: Peers::Server, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 2]) + ) } ); // 4th resend @@ -280,8 +302,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(4000)), RescheduleResult::Resend { len: 5, - id: PackageId::from_bytes(&[0, 0, 2]), - peers: Peers::Server, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 2]) + ) } ); // 5th resend @@ -289,8 +314,11 @@ mod tests { resends.reschedule(&mut buf, time + Duration::from_secs(5000)), RescheduleResult::Resend { len: 5, - id: PackageId::from_bytes(&[0, 0, 2]), - peers: Peers::Server, + header: PackageHeader::new( + Reliability::Unordered, + Peers::Server, + PackageId::from_bytes(&[0, 0, 2]) + ) } ); // 6th resend (7th try) => failure diff --git a/crates/net/src/connection/mod.rs b/crates/net/src/connection/mod.rs index 9d6f2f83..78f20108 100644 --- a/crates/net/src/connection/mod.rs +++ b/crates/net/src/connection/mod.rs @@ -1,4 +1,4 @@ -pub(crate) use delivery::DeliveryHandler; +pub(crate) use delivery::{DeliveryHandler, ReceivedIdError}; pub(crate) use dispatch::DispatchHandler; mod book; diff --git a/crates/net/src/header.rs b/crates/net/src/header.rs index dde20021..6477466e 100644 --- a/crates/net/src/header.rs +++ b/crates/net/src/header.rs @@ -7,11 +7,9 @@ pub(crate) const HEADER_SIZE: usize = 4; /// This bit is set in protocol control datagrams. const CONTROL_BIT: u8 = 0b1000_0000; -/// This bit is set on datagrams which must be delivered reliably. -const RELIABLE_BIT: u8 = 0b0100_0000; /// This bit is set on datagrams which are sent to the server instead of other /// players. -const SERVER_PEER_BIT: u8 = 0b0010_0000; +const SERVER_PEER_BIT: u8 = 0b0001_0000; #[derive(Clone, Copy, Debug, PartialEq)] pub(crate) enum DatagramHeader { @@ -20,14 +18,6 @@ pub(crate) enum DatagramHeader { } impl DatagramHeader { - pub(crate) fn new_package(reliable: bool, peers: Peers, id: PackageId) -> Self { - Self::Package(PackageHeader { - reliable, - peers, - id, - }) - } - /// Writes the header to the beginning of a bytes buffer. /// /// # Panics @@ -38,10 +28,7 @@ impl DatagramHeader { let (mask, id) = match self { Self::Confirmation => (CONTROL_BIT, [0, 0, 0]), Self::Package(package_header) => { - let mut mask = 0; - if package_header.reliable { - mask |= RELIABLE_BIT; - } + let mut mask = package_header.reliability().to_bits(); if matches!(package_header.peers, Peers::Server) { mask |= SERVER_PEER_BIT; } @@ -71,14 +58,14 @@ impl DatagramHeader { Err(HeaderError::Invalid) } } else { - let reliable = mask & RELIABLE_BIT > 0; + let reliability = Reliability::from_bits(mask)?; let peers = if mask & SERVER_PEER_BIT > 0 { Peers::Server } else { Peers::Players }; Ok(Self::Package(PackageHeader { - reliable, + reliability, peers, id: PackageId::from_bytes(&data[1..HEADER_SIZE]), })) @@ -93,8 +80,8 @@ impl fmt::Display for DatagramHeader { Self::Package(header) => { write!( f, - "Package {{ reliable: {}, peers: {}, id: {} }}", - header.reliable, header.peers, header.id + "Package {{ reliability: {}, peers: {}, id: {} }}", + header.reliability, header.peers, header.id ) } } @@ -104,14 +91,22 @@ impl fmt::Display for DatagramHeader { #[derive(Clone, Copy, Debug, PartialEq)] pub(crate) struct PackageHeader { /// True if the package is delivered reliably. - reliable: bool, + reliability: Reliability, peers: Peers, id: PackageId, } impl PackageHeader { - pub(crate) fn reliable(&self) -> bool { - self.reliable + pub(crate) fn new(reliability: Reliability, peers: Peers, id: PackageId) -> Self { + Self { + reliability, + peers, + id, + } + } + + pub(crate) fn reliability(&self) -> Reliability { + self.reliability } pub(crate) fn peers(&self) -> Peers { @@ -123,6 +118,69 @@ impl PackageHeader { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Reliability { + /// There are no guarantees on reliability, ordering or duplicate delivery + /// of the package. + Unreliable, + /// Non-duplicate delivery of the package is guaranteed. There are no + /// guarantees on ordering of the package with respect to other packages. + Unordered, + /// Non-duplicate delivery of the package is guaranteed. The package is + /// guaranteed to be delivered after all other previously reliably sent + /// packages. There are no guarantees on ordering of the package with + /// respect to other packages sent after this one. + SemiOrdered, +} + +impl Reliability { + fn to_bits(self) -> u8 { + let bits = match self { + Self::Unreliable => 0, + Self::Unordered => 1, + Self::SemiOrdered => 2, + }; + bits << 5 + } + + fn from_bits(bits: u8) -> Result { + let bits = (bits >> 5) & 3; + match bits { + 0 => Ok(Self::Unreliable), + 1 => Ok(Self::Unordered), + 2 => Ok(Self::SemiOrdered), + _ => Err(HeaderError::Invalid), + } + } + + /// Returns true if the package is delivered reliably, independently on + /// ordering. + pub fn is_reliable(&self) -> bool { + match self { + Self::SemiOrdered | Self::Unordered => true, + Self::Unreliable => false, + } + } + + /// Returns true if there are any guarantees on ordering of the package. + pub fn is_ordered(&self) -> bool { + match self { + Self::SemiOrdered => true, + Self::Unordered | Self::Unreliable => false, + } + } +} + +impl fmt::Display for Reliability { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Unreliable => write!(f, "unreliable"), + Self::Unordered => write!(f, "unordered"), + Self::SemiOrdered => write!(f, "semi-ordered"), + } + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Peers { /// Communication between networking server and a player/client. @@ -166,11 +224,32 @@ impl PackageId { } } + /// # Panics + /// + /// If not exactly 3 bytes are passed. + pub(crate) fn from_bytes(bytes: &[u8]) -> Self { + assert_eq!(bytes.len(), 3); + let a = (bytes[0] as u32) << 16; + let b = (bytes[1] as u32) << 8; + let c = bytes[2] as u32; + Self(a + b + c) + } + + pub(crate) fn to_bytes(self) -> [u8; 3] { + [ + ((self.0 >> 16) & 0xff) as u8, + ((self.0 >> 8) & 0xff) as u8, + (self.0 & 0xff) as u8, + ] + } +} + +impl Ord for PackageId { /// Returns probable relative ordering of two package IDs. /// /// Note that the implementation is circular due to wrapping around maximum /// value and thus the ordering is not transitive. - pub(crate) fn ordering(self, other: PackageId) -> Ordering { + fn cmp(&self, other: &Self) -> Ordering { match self.0.cmp(&other.0) { Ordering::Greater => { if self.0.abs_diff(other.0) < Self::MAX / 2 { @@ -189,24 +268,11 @@ impl PackageId { Ordering::Equal => Ordering::Equal, } } +} - /// # Panics - /// - /// If not exactly 3 bytes are passed. - pub(crate) fn from_bytes(bytes: &[u8]) -> Self { - assert_eq!(bytes.len(), 3); - let a = (bytes[0] as u32) << 16; - let b = (bytes[1] as u32) << 8; - let c = bytes[2] as u32; - Self(a + b + c) - } - - pub(crate) fn to_bytes(self) -> [u8; 3] { - [ - ((self.0 >> 16) & 0xff) as u8, - ((self.0 >> 8) & 0xff) as u8, - (self.0 & 0xff) as u8, - ] +impl PartialOrd for PackageId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } @@ -290,15 +356,31 @@ mod tests { fn test_write_header() { let mut buf = [0u8; 256]; - DatagramHeader::new_package(false, Peers::Server, PackageId::zero()).write(&mut buf); - assert_eq![&buf[0..4], &[0b0010_0000, 0, 0, 0]]; + DatagramHeader::Package(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Server, + PackageId::zero(), + )) + .write(&mut buf); + assert_eq![&buf[0..4], &[0b0101_0000, 0, 0, 0]]; assert_eq![&buf[4..], &[0; 252]]; - DatagramHeader::new_package(true, Peers::Server, 256.try_into().unwrap()).write(&mut buf); - assert_eq![&buf[0..4], &[0b0110_0000, 0, 1, 0]]; + + DatagramHeader::Package(PackageHeader::new( + Reliability::Unordered, + Peers::Server, + 256.try_into().unwrap(), + )) + .write(&mut buf); + assert_eq![&buf[0..4], &[0b0011_0000, 0, 1, 0]]; assert_eq![&buf[4..], &[0; 252]]; - DatagramHeader::new_package(true, Peers::Players, 1033.try_into().unwrap()).write(&mut buf); - assert_eq![&buf[0..4], &[0b0100_0000, 0, 4, 9]]; + DatagramHeader::Package(PackageHeader::new( + Reliability::Unreliable, + Peers::Players, + 1033.try_into().unwrap(), + )) + .write(&mut buf); + assert_eq![&buf[0..4], &[0b0000_0000, 0, 4, 9]]; assert_eq![&buf[4..], &[0; 252]]; } @@ -309,19 +391,31 @@ mod tests { buf[0..4].copy_from_slice(&[64, 0, 0, 0]); assert_eq!( DatagramHeader::read(&buf).unwrap(), - DatagramHeader::new_package(true, Peers::Players, 0.try_into().unwrap()) + DatagramHeader::Package(PackageHeader::new( + Reliability::SemiOrdered, + Peers::Players, + 0.try_into().unwrap() + )) ); - buf[0..4].copy_from_slice(&[64, 1, 0, 3]); + buf[0..4].copy_from_slice(&[32, 1, 0, 3]); assert_eq!( DatagramHeader::read(&buf).unwrap(), - DatagramHeader::new_package(true, Peers::Players, 65539.try_into().unwrap()) + DatagramHeader::Package(PackageHeader::new( + Reliability::Unordered, + Peers::Players, + 65539.try_into().unwrap() + )) ); - buf[0..4].copy_from_slice(&[32, 0, 0, 2]); + buf[0..4].copy_from_slice(&[16, 0, 0, 2]); assert_eq!( DatagramHeader::read(&buf).unwrap(), - DatagramHeader::new_package(false, Peers::Server, 2.try_into().unwrap()) + DatagramHeader::Package(PackageHeader::new( + Reliability::Unreliable, + Peers::Server, + 2.try_into().unwrap() + )) ); } @@ -337,24 +431,24 @@ mod tests { #[test] fn test_ordering() { assert_eq!( - PackageId::from_bytes(&[0, 1, 1]).ordering(PackageId::from_bytes(&[0, 1, 2])), + PackageId::from_bytes(&[0, 1, 1]).cmp(&PackageId::from_bytes(&[0, 1, 2])), Ordering::Less ); assert_eq!( - PackageId::from_bytes(&[0, 1, 1]).ordering(PackageId::from_bytes(&[0, 1, 0])), + PackageId::from_bytes(&[0, 1, 1]).cmp(&PackageId::from_bytes(&[0, 1, 0])), Ordering::Greater ); assert_eq!( - PackageId::from_bytes(&[0, 1, 1]).ordering(PackageId::from_bytes(&[0, 1, 1])), + PackageId::from_bytes(&[0, 1, 1]).cmp(&PackageId::from_bytes(&[0, 1, 1])), Ordering::Equal ); assert_eq!( - PackageId::from_bytes(&[0, 1, 2]).ordering(PackageId::from_bytes(&[255, 255, 1])), + PackageId::from_bytes(&[0, 1, 2]).cmp(&PackageId::from_bytes(&[255, 255, 1])), Ordering::Greater ); assert_eq!( - PackageId::from_bytes(&[255, 255, 1]).ordering(PackageId::from_bytes(&[0, 1, 2])), + PackageId::from_bytes(&[255, 255, 1]).cmp(&PackageId::from_bytes(&[0, 1, 2])), Ordering::Less ); } diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index fb34e7d2..5170a99a 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -1,4 +1,4 @@ -pub use header::Peers; +pub use header::{Peers, Reliability}; pub use protocol::MAX_PACKAGE_SIZE; pub use socket::{RecvError, SendError, Socket, MAX_DATAGRAM_SIZE}; pub use tasks::{ @@ -9,5 +9,6 @@ pub use tasks::{ mod connection; mod header; mod protocol; +mod record; mod socket; mod tasks; diff --git a/crates/net/src/record.rs b/crates/net/src/record.rs new file mode 100644 index 00000000..8d1b46ee --- /dev/null +++ b/crates/net/src/record.rs @@ -0,0 +1,28 @@ +use std::time::Instant; + +use crate::header::PackageHeader; + +#[derive(Debug, PartialEq, Clone)] +pub(super) struct DeliveryRecord { + time: Instant, + header: PackageHeader, +} + +impl DeliveryRecord { + pub(crate) fn now(header: PackageHeader) -> Self { + Self { + time: Instant::now(), + header, + } + } + + /// Original package receive time. + pub(crate) fn time(&self) -> Instant { + self.time + } + + /// Package header. + pub(crate) fn header(&self) -> PackageHeader { + self.header + } +} diff --git a/crates/net/src/tasks/communicator.rs b/crates/net/src/tasks/communicator.rs index afa3d002..c87b1ee8 100644 --- a/crates/net/src/tasks/communicator.rs +++ b/crates/net/src/tasks/communicator.rs @@ -7,7 +7,10 @@ use bincode::{ error::{DecodeError, EncodeError}, }; -use crate::{header::Peers, protocol::MAX_PACKAGE_SIZE}; +use crate::{ + header::{Peers, Reliability}, + protocol::MAX_PACKAGE_SIZE, +}; const BINCODE_CONF: Configuration> = bincode::config::standard() @@ -17,7 +20,7 @@ const BINCODE_CONF: Configuration> = /// It cumulatively builds output packages from individual messages. pub struct PackageBuilder { - reliable: bool, + reliability: Reliability, peers: Peers, target: SocketAddr, buffer: Vec, @@ -26,9 +29,9 @@ pub struct PackageBuilder { } impl PackageBuilder { - pub fn new(reliable: bool, peers: Peers, target: SocketAddr) -> Self { + pub fn new(reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { Self { - reliable, + reliability, peers, target, buffer: vec![0; MAX_PACKAGE_SIZE], @@ -46,7 +49,7 @@ impl PackageBuilder { if self.used > 0 { self.buffer.truncate(self.used); - let package = OutPackage::new(self.buffer, self.reliable, self.peers, self.target); + let package = OutPackage::new(self.buffer, self.reliability, self.peers, self.target); packages.push(package); } @@ -66,7 +69,7 @@ impl PackageBuilder { data.truncate(self.used); self.used = 0; - let package = OutPackage::new(data, self.reliable, self.peers, self.target); + let package = OutPackage::new(data, self.reliability, self.peers, self.target); self.packages.push(package); self.push_inner(message) @@ -89,7 +92,7 @@ impl PackageBuilder { /// A package to be send. pub struct OutPackage { pub(super) data: Vec, - reliable: bool, + reliability: Reliability, peers: Peers, pub(super) target: SocketAddr, } @@ -100,7 +103,7 @@ impl OutPackage { /// See also [`Self::new`]. pub fn encode_single( message: &E, - reliable: bool, + reliability: Reliability, peers: Peers, target: SocketAddr, ) -> Result @@ -108,32 +111,32 @@ impl OutPackage { E: bincode::Encode, { let data = encode_to_vec(message, BINCODE_CONF)?; - Ok(Self::new(data, reliable, peers, target)) + Ok(Self::new(data, reliability, peers, target)) } /// # Arguments /// /// * `data` - data to be send. /// - /// * `reliable` - whether to deliver the data reliably. + /// * `reliability` - package delivery reliability mode. /// /// * `target` - package recipient. /// /// # Panics /// /// Panics if data is longer than [`MAX_PACKAGE_SIZE`]. - pub fn new(data: Vec, reliable: bool, peers: Peers, target: SocketAddr) -> Self { + pub fn new(data: Vec, reliability: Reliability, peers: Peers, target: SocketAddr) -> Self { assert!(data.len() < MAX_PACKAGE_SIZE); Self { data, - reliable, + reliability, peers, target, } } - pub(super) fn reliable(&self) -> bool { - self.reliable + pub(super) fn reliability(&self) -> Reliability { + self.reliability } pub(super) fn peers(&self) -> Peers { @@ -144,7 +147,7 @@ impl OutPackage { /// A received message / datagram. pub struct InPackage { data: Vec, - reliable: bool, + reliability: Reliability, peers: Peers, source: SocketAddr, time: Instant, @@ -153,14 +156,14 @@ pub struct InPackage { impl InPackage { pub(super) fn new( data: Vec, - reliable: bool, + reliability: Reliability, peers: Peers, source: SocketAddr, time: Instant, ) -> Self { Self { data, - reliable, + reliability, peers, source, time, @@ -183,9 +186,8 @@ impl InPackage { } } - /// Whether the datagram was delivered reliably. - pub fn reliable(&self) -> bool { - self.reliable + pub fn reliability(&self) -> Reliability { + self.reliability } pub fn source(&self) -> SocketAddr { @@ -312,7 +314,7 @@ mod tests { } let mut builder = PackageBuilder::new( - true, + Reliability::Unordered, Peers::Players, "127.0.0.1:1111".parse::().unwrap(), ); @@ -353,7 +355,7 @@ mod tests { let package = InPackage { // Message::Two([3, 4]), Message::One(1286) data: vec![1, 3, 4, 0, 251, 5, 6], - reliable: false, + reliability: Reliability::Unreliable, peers: Peers::Players, source: "127.0.0.1:1111".parse().unwrap(), time: Instant::now(), diff --git a/crates/net/src/tasks/ureceiver.rs b/crates/net/src/tasks/ureceiver.rs index fc49444d..cd7dabbd 100644 --- a/crates/net/src/tasks/ureceiver.rs +++ b/crates/net/src/tasks/ureceiver.rs @@ -1,4 +1,4 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; use async_std::{ channel::{Receiver, Sender}, @@ -7,7 +7,11 @@ use async_std::{ use tracing::{error, info, trace, warn}; use super::{cancellation::CancellationSender, dreceiver::InPackageDatagram}; -use crate::{connection::DeliveryHandler, InPackage}; +use crate::{ + connection::{DeliveryHandler, ReceivedIdError}, + record::DeliveryRecord, + InPackage, MAX_PACKAGE_SIZE, +}; /// Handler of user datagrams, i.e. datagrams with user data targeted to /// higher-level users of the network protocol. @@ -23,7 +27,9 @@ pub(super) async fn run( ) { info!("Starting package receiver on port {port}..."); - loop { + let mut buf = vec![0; MAX_PACKAGE_SIZE]; + + 'main: loop { let Ok(result) = timeout(Duration::from_millis(500), datagrams.recv()).await else { if packages.is_closed() { // This must be here in case of no incoming packages to ensure @@ -45,40 +51,51 @@ pub(super) async fn run( error!("Datagram receiver channel is unexpectedly closed."); break; }; + let record = DeliveryRecord::now(datagram.header); - let time = Instant::now(); - if datagram.header.reliable() { - match delivery_handler - .received(time, datagram.source, datagram.header.id()) - .await - { - Ok(true) => { + if datagram.header.reliability().is_reliable() { + let mut guard = delivery_handler.lock().await; + match guard.received(datagram.source, record, datagram.data, &mut buf) { + Ok(deliveries) => { + for (record, data) in deliveries { + let result = packages + .send(InPackage::new( + data, + record.header().reliability(), + record.header().peers(), + datagram.source, + record.time(), + )) + .await; + if result.is_err() { + break 'main; + } + } + } + Err(ReceivedIdError::Duplicate) => { trace!( "Duplicate delivery of package {:?} from {:?}.", datagram.header.id(), datagram.source ); - continue; } - Ok(false) => (), Err(err) => { - warn!("Package ID error: {err:?}"); + warn!("Received package error: {err:?}"); } } - } - - let result = packages - .send(InPackage::new( - datagram.data, - datagram.header.reliable(), - datagram.header.peers(), - datagram.source, - time, - )) - .await; - - if result.is_err() { - break; + } else { + let result = packages + .send(InPackage::new( + datagram.data, + datagram.header.reliability(), + datagram.header.peers(), + datagram.source, + record.time(), + )) + .await; + if result.is_err() { + break 'main; + } } } diff --git a/crates/net/src/tasks/usender.rs b/crates/net/src/tasks/usender.rs index 330e5475..4937b3d3 100644 --- a/crates/net/src/tasks/usender.rs +++ b/crates/net/src/tasks/usender.rs @@ -6,7 +6,7 @@ use tracing::{error, info}; use super::{cancellation::CancellationSender, dsender::OutDatagram}; use crate::{ connection::DispatchHandler, - header::{DatagramHeader, PackageIdRange}, + header::{DatagramHeader, PackageHeader, PackageIdRange}, OutPackage, }; @@ -29,26 +29,19 @@ pub(super) async fn run( let time = Instant::now(); - let package_id = if package.reliable() { + let package_id = if package.reliability().is_reliable() { dispatch_handler.next_package_id(time, package.target).await } else { counter_unreliable.next().unwrap() }; - let header = DatagramHeader::new_package(package.reliable(), package.peers(), package_id); + let package_header = PackageHeader::new(package.reliability(), package.peers(), package_id); + let header = DatagramHeader::Package(package_header); - if let DatagramHeader::Package(package_header) = header { - if package_header.reliable() { - dispatch_handler - .sent( - time, - package.target, - package_header.id(), - package_header.peers(), - &package.data, - ) - .await; - } + if package_header.reliability().is_reliable() { + dispatch_handler + .sent(time, package.target, package_header, &package.data) + .await; } let closed = datagrams