Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 20, 2023
1 parent 0bfd718 commit 2cc3b26
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 382 deletions.
383 changes: 41 additions & 342 deletions crates/net/src/connection/confirms.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/net/src/connection/databuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl DataBuf {
self.data.extend(data);
}

// TODO docs
// TODO docs (incl panics)
pub(super) fn pop(&mut self, id: PackageId, buf: &mut [u8]) -> Option<usize> {
let result = self.get(id, buf);
if result.is_some() {
Expand Down
228 changes: 228 additions & 0 deletions crates/net/src/connection/deliveries.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
use std::{mem, net::SocketAddr, time::Instant};

use async_std::{
channel::{SendError, Sender},
sync::{Arc, Mutex, MutexGuard},
};

use super::{
book::{Connection, ConnectionBook},
confirms::{ConfirmsBuffer, MAX_BUFF_AGE},
pending::Pending,
received::{IdContinuity, Received, ReceivedIdError},
};
use crate::{header::PackageId, record::DeliveryRecord, tasks::OutDatagram, Reliability};

// TODO rename
#[derive(Clone)]
pub(crate) struct Confirmations {
book: Arc<Mutex<ConnectionBook<ReceiveHandler>>>,
}

impl Confirmations {
pub(crate) fn new() -> Self {
Self {
book: Arc::new(Mutex::new(ConnectionBook::new())),
}
}

// TODO document
pub(crate) async fn lock(&mut self) -> ReceiveHandlerGuard {
ReceiveHandlerGuard {
guard: self.book.lock().await,
}
}

/// Send package confirmation datagrams.
///
/// Not all confirmations are sent because there is a small delay to enable
/// grouping.
///
/// # Arguments
///
/// * `time` - current time.
///
/// * `force` - if true, all pending confirmations will be sent.
///
/// * `datagrams` - output datagrams with the confirmations will be send to
/// this channel.
///
/// # Returns
///
/// On success, it returns an estimation of the next resend schedule time.
pub(crate) async fn send_confirms(
&mut self,
time: Instant,
force: bool,
datagrams: &mut Sender<OutDatagram>,
) -> Result<Instant, SendError<OutDatagram>> {
let mut next = time + MAX_BUFF_AGE;
let mut book = self.book.lock().await;

while let Some((addr, handler)) = book.next() {
let expiration = handler
.confirms
.send_confirms(time, force, addr, datagrams)
.await?;
next = next.min(expiration);
}

Ok(next)
}

pub(crate) async fn clean(&mut self, time: Instant) {
self.book.lock().await.clean(time);
}
}

pub(crate) struct ReceiveHandlerGuard<'a> {
guard: MutexGuard<'a, ConnectionBook<ReceiveHandler>>,
}

impl<'a> ReceiveHandlerGuard<'a> {
/// This method checks package ID is not corrupt, not duplicate and whether
/// all previously sent ID have already been delivered.
///
/// This method should be called exactly once after each reliable package
/// is delivered and in order.
// TODO update docs
pub(crate) fn received<'buf>(
&mut self,
addr: SocketAddr,
record: DeliveryRecord,
data: Vec<u8>,
buf: &'buf mut [u8],
) -> Result<Deliveries<'_, 'buf>, ReceivedIdError> {
self.guard
.update(record.time(), addr, ReceiveHandler::new)
.push(record, data, buf)
}
}

struct ReceiveHandler {
received: Received,
pending: Pending,
confirms: ConfirmsBuffer,
}

impl ReceiveHandler {
fn new() -> Self {
Self {
received: Received::new(),
pending: Pending::new(),
confirms: ConfirmsBuffer::new(),
}
}

/// Registers package as received and returns an iterator of the to be
/// delivered packages.
///
/// # Arguments
///
/// * `time` - package delivery time
///
/// * `id` - ID of the delivered package.
///
/// * `data` - package data to be buffered in case the package cannot be
/// delivered right away.
///
/// * `buf` - buffer used for pending package buffer draining.
///
/// # Panics
///
/// Panics if `buf` len is smaller than length of any of the drained
/// buffered pending package.
// TODO double check docs
fn push<'b>(
&mut self,
record: DeliveryRecord,
data: Vec<u8>,
buf: &'b mut [u8],
) -> Result<Deliveries<'_, 'b>, ReceivedIdError> {
let result = self.received.process(record.header().id());
if let Ok(_) | Err(ReceivedIdError::Duplicate) = result {
// Push to the buffer even duplicate packages, because the reason
// behind the re-delivery might be loss of the confirmation
// datagram.
self.confirms.push(record.time(), record.header().id());
}

Ok(match result? {
IdContinuity::Continuous(bound) => {
Deliveries::drain(bound, &mut self.pending, record, data, buf)
}
IdContinuity::Sparse => match record.header().reliability() {
Reliability::SemiOrdered => {
self.pending.store(record, &data);
Deliveries::empty(buf)
}
Reliability::Unordered => Deliveries::current(record, data, buf),
Reliability::Unreliable => {
unreachable!("Unreliable packages cannot be processed by receive handler.")
}
},
})
}
}

impl Connection for ReceiveHandler {
fn pending(&self) -> bool {
!self.confirms.is_empty()
}
}

pub(crate) struct Deliveries<'a, 'b> {
pending: Option<(PackageId, &'a mut Pending)>,
current: Option<(DeliveryRecord, Vec<u8>)>,
buf: &'b mut [u8],
}

impl<'a, 'b> Deliveries<'a, 'b> {
fn empty(buf: &'b mut [u8]) -> Self {
Self {
pending: None,
current: None,
buf,
}
}

fn current(current_record: DeliveryRecord, current_data: Vec<u8>, buf: &'b mut [u8]) -> Self {
Self {
pending: None,
current: Some((current_record, current_data)),
buf,
}
}

fn drain(
bound: PackageId,
pending: &'a mut Pending,
current_record: DeliveryRecord,
current_data: Vec<u8>,
buf: &'b mut [u8],
) -> Self {
Self {
pending: Some((bound, pending)),
current: Some((current_record, current_data)),
buf,
}
}
}

impl<'a, 'b> Iterator for Deliveries<'a, 'b> {
type Item = (DeliveryRecord, Vec<u8>);

fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.pending.and_then(|(bound, pending)| {
pending
.pop_first(bound, self.buf)
.map(|(record, len)| (record, (&self.buf[..len]).to_vec()))
}) {
return Some(item);
}

let mut item = None;
mem::swap(&mut item, &mut self.current);
item
}
}
6 changes: 5 additions & 1 deletion crates/net/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
pub(crate) use confirms::{Confirmations, ReceivedIdError};
pub(crate) use deliveries::Confirmations;
pub(crate) use received::ReceivedIdError;
pub(crate) use resend::Resends;

mod book;
mod confirms;
mod databuf;
mod deliveries;
mod pending;
mod received;
mod resend;
49 changes: 49 additions & 0 deletions crates/net/src/connection/pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::collections::BTreeMap;

use super::databuf::DataBuf;
use crate::{header::PackageId, record::DeliveryRecord};

/// Buffer for packages delivered out-of-order and thus awaiting the right
/// moment to be delivered.
pub(super) struct Pending {
ids: BTreeMap<PackageId, DeliveryRecord>,
buf: DataBuf,
}

impl Pending {
pub(super) fn new() -> Self {
Self {
ids: BTreeMap::new(),
buf: DataBuf::new(),
}
}

// TODO docs (incl panics)
pub(super) fn store(&mut self, record: DeliveryRecord, data: &[u8]) {
let id = record.header().id();
let result = self.ids.insert(id, record);
assert!(result.is_none());
self.buf.push(id, data);
}

// TODO docs (incl panics)
// TODO tests
pub(super) fn pop_first(
&mut self,
bound: PackageId,
buf: &mut [u8],
) -> Option<(DeliveryRecord, usize)> {
match self.ids.first_entry() {
Some(entry) => {
if entry.key().cmp(&bound).is_lt() {
let id = *entry.key();
let record = entry.remove();
Some((record, self.buf.pop(id, buf).unwrap()))
} else {
None
}
}
None => None,
}
}
}
Loading

0 comments on commit 2cc3b26

Please sign in to comment.