Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Aug 20, 2023
1 parent 0bfd718 commit cdecb39
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 218 deletions.
243 changes: 25 additions & 218 deletions crates/net/src/connection/confirms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use tracing::warn;
use super::{
book::{Connection, ConnectionBook},
databuf::DataBuf,
pending::{Pending, PendingDrain},
received::{IdContinuity, Received, ReceivedIdError},
};
use crate::{
header::{DatagramHeader, PackageId, PackageIdRange},
Expand All @@ -27,12 +29,11 @@ use crate::{
const MAX_BUFF_SIZE: usize = 96;
/// The buffer is flushed after the oldest part is older than this.
const MAX_BUFF_AGE: Duration = Duration::from_millis(100);
// This must be less than `u32::MAX / 2` due to ID ordering circularity issues.
const MAX_SKIPPED: usize = 1024;

// TODO rename
#[derive(Clone)]
pub(crate) struct Confirmations {
book: Arc<Mutex<ConnectionBook<IdReceiver>>>,
book: Arc<Mutex<ConnectionBook<ReceiveHandler>>>,
}

impl Confirmations {
Expand Down Expand Up @@ -86,10 +87,10 @@ impl Confirmations {
let mut next = Instant::now() + MAX_BUFF_AGE;
let mut book = self.book.lock().await;

while let Some((addr, id_receiver)) = book.next() {
if let Some(expiration) = id_receiver.buffer.expiration() {
if force || expiration <= time || id_receiver.buffer.full() {
while let Some(data) = id_receiver.buffer.flush(MAX_PACKAGE_SIZE) {
while let Some((addr, handler)) = book.next() {
if let Some(expiration) = handler.confirms.expiration() {
if force || expiration <= time || handler.confirms.full() {
while let Some(data) = handler.confirms.flush(MAX_PACKAGE_SIZE) {
datagrams
.send(OutDatagram::new(
DatagramHeader::Confirmation,
Expand All @@ -112,17 +113,18 @@ impl Confirmations {
}
}

// TODO rename
struct IdReceiver {
struct ReceiveHandler {
received: Received,
buffer: Buffer,
pending: Pending,
confirms: ConfirmsBuffer,
}

impl IdReceiver {
impl ReceiveHandler {
fn new() -> Self {
Self {
received: Received::new(),
buffer: Buffer::new(),
pending: Pending::new(),
confirms: ConfirmsBuffer::new(),
}
}

Expand All @@ -133,181 +135,35 @@ impl IdReceiver {
time: Instant,
id: PackageId,
buf: &mut [u8],
) -> Result<Option<PendingIterator>, ReceivedIdError> {
) -> Result<Option<PendingDrain>, ReceivedIdError> {
let result = self.received.process(id, buf);
if let Ok(_) | Err(ReceivedIdError::Duplicate) = result {
warn!("AA: {id:?}");
// Push to the buffer even duplicate packages, because the reason
// behind the re-delivery might be loss of the confirmation
// datagram.
self.buffer.push(time, id);
}
result
}
}

impl Connection for IdReceiver {
fn pending(&self) -> bool {
!self.buffer.is_empty()
}
}

struct Received {
highest_id: Option<PackageId>,
holes: BTreeSet<PackageId>,
pending: Pending,
}

impl Received {
fn new() -> Self {
Self {
highest_id: None,
holes: BTreeSet::new(),
pending: Pending::new(),
}
}

/// Registers package as received and returns delivery order continuity in
/// respect with earlier sent packages.
// TODO update docs
fn process(
&mut self,
id: PackageId,
buf: &mut [u8],
) -> Result<Option<PendingIterator>, ReceivedIdError> {
let range_start = match self.highest_id {
Some(highest) => match highest.cmp(&id) {
Ordering::Less => highest.incremented(),
Ordering::Greater => {
if self.holes.remove(&id) {
return Ok(match self.holes.first() {
Some(first) => {
if first.cmp(&id).is_ge() {
Some(PendingIterator::new(*first, &mut self.pending, buf))
} else {
None
}
}
None => Some(PendingIterator::new(
highest.incremented(),
&mut self.pending,
buf,
)),
});
} else {
warn!("D: {:?}", id);
return Err(ReceivedIdError::Duplicate);
}
}
Ordering::Equal => {
warn!("E: {:?}", id);
return Err(ReceivedIdError::Duplicate);
}
},
None => PackageId::zero(),
};

let range = PackageIdRange::range(range_start, id);
let skipped = range.size_hint().1.unwrap() + self.holes.len();
if skipped > MAX_SKIPPED {
return Err(ReceivedIdError::TooManySkipped(skipped));
self.confirms.push(time, id);
}

self.highest_id = Some(id);
for hole in range {
self.holes.insert(hole);
}

Ok(if skipped == 0 {
Some(PendingIterator::new(
id.incremented(),
&mut self.pending,
buf,
))
} else {
None
result.map(|continuity| match continuity {
IdContinuity::Continuous(bound) => Some(self.pending.drain(bound, buf)),
IdContinuity::Sparse => None,
})
}
}

#[derive(Error, Debug, PartialEq, Eq)]
pub(crate) enum ReceivedIdError {
#[error("Duplicate package")]
Duplicate,
#[error("Too many packages skipped: {0}")]
TooManySkipped(usize),
}

// TODO docs
// TODO rename
pub(crate) struct PendingIterator<'a, 'b> {
bound: PackageId,
pending: &'a mut Pending,
buf: &'b mut [u8],
}

impl<'a, 'b> PendingIterator<'a, 'b> {
// TODO docs
fn new(bound: PackageId, pending: &'a mut Pending, buf: &'b mut [u8]) -> Self {
Self {
bound,
pending,
buf,
}
}
}

impl<'a, 'b> Iterator for PendingIterator<'a, 'b> {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
self.pending.pop_first(self.bound, self.buf)
}
}

struct Pending {
ids: BTreeSet<PackageId>,
buf: DataBuf,
}

impl Pending {
fn new() -> Self {
Self {
ids: BTreeSet::new(),
buf: DataBuf::new(),
}
}

fn store(&mut self, id: PackageId, data: &[u8]) {
if self.ids.insert(id) {
self.buf.push(id, data);
}
}

// TODO docs (incl panics)
fn pop_first(&mut self, bound: PackageId, buf: &mut [u8]) -> Option<usize> {
match self.ids.first().copied() {
Some(first) => {
if first.cmp(&bound).is_ge() {
assert_eq!(first, self.ids.pop_first().unwrap());
self.buf.pop(first, buf)
} else {
None
}
}
None => None,
}
impl Connection for ReceiveHandler {
fn pending(&self) -> bool {
!self.confirms.is_empty()
}
}

/// Buffer with datagram confirmations.
struct Buffer {
struct ConfirmsBuffer {
oldest: Instant,
buffer: Vec<u8>,
flushed: usize,
}

impl Buffer {
impl ConfirmsBuffer {
fn new() -> Self {
Self {
oldest: Instant::now(),
Expand Down Expand Up @@ -365,59 +221,10 @@ impl Buffer {
mod tests {
use super::*;

#[test]
fn test_received() {
let mut received = Received::new();

// TODO
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 2])),
// Ok(IdContinuity::Sparse)
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 1])),
// Ok(IdContinuity::Sparse)
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 1])),
// Err(ReceivedIdError::Duplicate)
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 0])),
// Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 1])))
// );

// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 5])),
// Ok(IdContinuity::Sparse)
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 3])),
// Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 4])))
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 5])),
// Err(ReceivedIdError::Duplicate)
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 6])),
// Ok(IdContinuity::Sparse)
// );
// assert_eq!(
// received.process(PackageId::from_bytes(&[0, 0, 3])),
// Err(ReceivedIdError::Duplicate)
// );

// assert_eq!(
// received.process(PackageId::from_bytes(&[50, 0, 6])),
// Err(ReceivedIdError::TooManySkipped(3276800))
// );
}

#[test]
fn test_buffer() {
let now = Instant::now();
let mut buf = Buffer::new();
let mut buf = ConfirmsBuffer::new();

assert!(buf.flush(13).is_none());
assert!(buf.expiration().is_none());
Expand Down
2 changes: 2 additions & 0 deletions crates/net/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ pub(crate) use resend::Resends;
mod book;
mod confirms;
mod databuf;
mod pending;
mod received;
mod resend;
66 changes: 66 additions & 0 deletions crates/net/src/connection/pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::collections::BTreeSet;

use super::databuf::DataBuf;
use crate::header::PackageId;

/// Buffer for packages delivered out-of-order and thus awaiting the right
/// moment to be delivered.
pub(super) struct Pending {
ids: BTreeSet<PackageId>,
buf: DataBuf,
}

impl Pending {
pub(super) fn new() -> Self {
Self {
ids: BTreeSet::new(),
buf: DataBuf::new(),
}
}

pub(super) fn store(&mut self, id: PackageId, data: &[u8]) {
if self.ids.insert(id) {
self.buf.push(id, data);
}
}

// TODO docs
// TODO test
pub(super) fn drain(&mut self, bound: PackageId, buf: &mut [u8]) -> PendingDrain {
PendingDrain {
bound,
pending: self,
buf,
}
}

// TODO docs (incl panics)
fn pop_first(&mut self, bound: PackageId, buf: &mut [u8]) -> Option<usize> {
match self.ids.first().copied() {
Some(first) => {
if first.cmp(&bound).is_lt() {
assert_eq!(first, self.ids.pop_first().unwrap());
self.buf.pop(first, buf)
} else {
None
}
}
None => None,
}
}
}

// TODO docs
pub(crate) struct PendingDrain<'a, 'b> {
bound: PackageId,
pending: &'a mut Pending,
buf: &'b mut [u8],
}

impl<'a, 'b> Iterator for PendingDrain<'a, 'b> {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
self.pending.pop_first(self.bound, self.buf)
}
}
Loading

0 comments on commit cdecb39

Please sign in to comment.