diff --git a/crates/net/src/connection/confirms.rs b/crates/net/src/connection/confirms.rs index 6666e317..7f072c1d 100644 --- a/crates/net/src/connection/confirms.rs +++ b/crates/net/src/connection/confirms.rs @@ -15,6 +15,8 @@ use tracing::warn; use super::{ book::{Connection, ConnectionBook}, databuf::DataBuf, + pending::{Pending, PendingDrain}, + received::{IdContinuity, Received, ReceivedIdError}, }; use crate::{ header::{DatagramHeader, PackageId, PackageIdRange}, @@ -27,9 +29,8 @@ use crate::{ const MAX_BUFF_SIZE: usize = 96; /// The buffer is flushed after the oldest part is older than this. const MAX_BUFF_AGE: Duration = Duration::from_millis(100); -// This must be less than `u32::MAX / 2` due to ID ordering circularity issues. -const MAX_SKIPPED: usize = 1024; +// TODO rename #[derive(Clone)] pub(crate) struct Confirmations { book: Arc>>, @@ -113,16 +114,18 @@ impl Confirmations { } // TODO rename -struct IdReceiver { +struct ReceiveHandler { received: Received, - buffer: Buffer, + pending: Pending, + confirms: ConfirmsBuffer, } -impl IdReceiver { +impl ReceiveHandler { fn new() -> Self { Self { received: Received::new(), - buffer: Buffer::new(), + pending: Pending::new(), + confirms: ConfirmsBuffer::new(), } } @@ -133,181 +136,35 @@ impl IdReceiver { time: Instant, id: PackageId, buf: &mut [u8], - ) -> Result, ReceivedIdError> { + ) -> Result, ReceivedIdError> { let result = self.received.process(id, buf); if let Ok(_) | Err(ReceivedIdError::Duplicate) = result { - warn!("AA: {id:?}"); // Push to the buffer even duplicate packages, because the reason // behind the re-delivery might be loss of the confirmation // datagram. - self.buffer.push(time, id); - } - result - } -} - -impl Connection for IdReceiver { - fn pending(&self) -> bool { - !self.buffer.is_empty() - } -} - -struct Received { - highest_id: Option, - holes: BTreeSet, - pending: Pending, -} - -impl Received { - fn new() -> Self { - Self { - highest_id: None, - holes: BTreeSet::new(), - pending: Pending::new(), + self.confirms.push(time, id); } - } - - /// Registers package as received and returns delivery order continuity in - /// respect with earlier sent packages. - // TODO update docs - fn process( - &mut self, - id: PackageId, - buf: &mut [u8], - ) -> Result, ReceivedIdError> { - let range_start = match self.highest_id { - Some(highest) => match highest.cmp(&id) { - Ordering::Less => highest.incremented(), - Ordering::Greater => { - if self.holes.remove(&id) { - return Ok(match self.holes.first() { - Some(first) => { - if first.cmp(&id).is_ge() { - Some(PendingIterator::new(*first, &mut self.pending, buf)) - } else { - None - } - } - None => Some(PendingIterator::new( - highest.incremented(), - &mut self.pending, - buf, - )), - }); - } else { - warn!("D: {:?}", id); - return Err(ReceivedIdError::Duplicate); - } - } - Ordering::Equal => { - warn!("E: {:?}", id); - return Err(ReceivedIdError::Duplicate); - } - }, - None => PackageId::zero(), - }; - - let range = PackageIdRange::range(range_start, id); - let skipped = range.size_hint().1.unwrap() + self.holes.len(); - if skipped > MAX_SKIPPED { - return Err(ReceivedIdError::TooManySkipped(skipped)); - } - - self.highest_id = Some(id); - for hole in range { - self.holes.insert(hole); - } - - Ok(if skipped == 0 { - Some(PendingIterator::new( - id.incremented(), - &mut self.pending, - buf, - )) - } else { - None + result.map(|continuity| match continuity { + IdContinuity::Continuous(bound) => Some(self.pending.drain(bound, buf)), + IdContinuity::Sparse => None, }) } } -#[derive(Error, Debug, PartialEq, Eq)] -pub(crate) enum ReceivedIdError { - #[error("Duplicate package")] - Duplicate, - #[error("Too many packages skipped: {0}")] - TooManySkipped(usize), -} - -// TODO docs -// TODO rename -pub(crate) struct PendingIterator<'a, 'b> { - bound: PackageId, - pending: &'a mut Pending, - buf: &'b mut [u8], -} - -impl<'a, 'b> PendingIterator<'a, 'b> { - // TODO docs - fn new(bound: PackageId, pending: &'a mut Pending, buf: &'b mut [u8]) -> Self { - Self { - bound, - pending, - buf, - } - } -} - -impl<'a, 'b> Iterator for PendingIterator<'a, 'b> { - type Item = usize; - - fn next(&mut self) -> Option { - self.pending.pop_first(self.bound, self.buf) - } -} - -struct Pending { - ids: BTreeSet, - buf: DataBuf, -} - -impl Pending { - fn new() -> Self { - Self { - ids: BTreeSet::new(), - buf: DataBuf::new(), - } - } - - fn store(&mut self, id: PackageId, data: &[u8]) { - if self.ids.insert(id) { - self.buf.push(id, data); - } - } - - // TODO docs (incl panics) - fn pop_first(&mut self, bound: PackageId, buf: &mut [u8]) -> Option { - match self.ids.first().copied() { - Some(first) => { - if first.cmp(&bound).is_ge() { - assert_eq!(first, self.ids.pop_first().unwrap()); - self.buf.pop(first, buf) - } else { - None - } - } - None => None, - } +impl Connection for ReceiveHandler { + fn pending(&self) -> bool { + !self.confirms.is_empty() } } /// Buffer with datagram confirmations. -struct Buffer { +struct ConfirmsBuffer { oldest: Instant, buffer: Vec, flushed: usize, } -impl Buffer { +impl ConfirmsBuffer { fn new() -> Self { Self { oldest: Instant::now(), @@ -365,59 +222,10 @@ impl Buffer { mod tests { use super::*; - #[test] - fn test_received() { - let mut received = Received::new(); - - // TODO - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 2])), - // Ok(IdContinuity::Sparse) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 1])), - // Ok(IdContinuity::Sparse) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 1])), - // Err(ReceivedIdError::Duplicate) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 0])), - // Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 1]))) - // ); - - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 5])), - // Ok(IdContinuity::Sparse) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 3])), - // Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 4]))) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 5])), - // Err(ReceivedIdError::Duplicate) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 6])), - // Ok(IdContinuity::Sparse) - // ); - // assert_eq!( - // received.process(PackageId::from_bytes(&[0, 0, 3])), - // Err(ReceivedIdError::Duplicate) - // ); - - // assert_eq!( - // received.process(PackageId::from_bytes(&[50, 0, 6])), - // Err(ReceivedIdError::TooManySkipped(3276800)) - // ); - } - #[test] fn test_buffer() { let now = Instant::now(); - let mut buf = Buffer::new(); + let mut buf = ConfirmsBuffer::new(); assert!(buf.flush(13).is_none()); assert!(buf.expiration().is_none()); diff --git a/crates/net/src/connection/mod.rs b/crates/net/src/connection/mod.rs index f8ff1c82..7020ff0f 100644 --- a/crates/net/src/connection/mod.rs +++ b/crates/net/src/connection/mod.rs @@ -4,4 +4,6 @@ pub(crate) use resend::Resends; mod book; mod confirms; mod databuf; +mod pending; +mod received; mod resend; diff --git a/crates/net/src/connection/pending.rs b/crates/net/src/connection/pending.rs new file mode 100644 index 00000000..265c566d --- /dev/null +++ b/crates/net/src/connection/pending.rs @@ -0,0 +1,66 @@ +use std::collections::BTreeSet; + +use super::databuf::DataBuf; +use crate::header::PackageId; + +/// Buffer for packages delivered out-of-order and thus awaiting the right +/// moment to be delivered. +pub(super) struct Pending { + ids: BTreeSet, + buf: DataBuf, +} + +impl Pending { + pub(super) fn new() -> Self { + Self { + ids: BTreeSet::new(), + buf: DataBuf::new(), + } + } + + pub(super) fn store(&mut self, id: PackageId, data: &[u8]) { + if self.ids.insert(id) { + self.buf.push(id, data); + } + } + + // TODO docs + // TODO test + pub(super) fn drain(&mut self, bound: PackageId, buf: &mut [u8]) -> PendingDrain { + PendingDrain { + bound, + pending: self, + buf, + } + } + + // TODO docs (incl panics) + fn pop_first(&mut self, bound: PackageId, buf: &mut [u8]) -> Option { + match self.ids.first().copied() { + Some(first) => { + if first.cmp(&bound).is_lt() { + assert_eq!(first, self.ids.pop_first().unwrap()); + self.buf.pop(first, buf) + } else { + None + } + } + None => None, + } + } +} + +// TODO docs +pub(crate) struct PendingDrain<'a, 'b> { + bound: PackageId, + pending: &'a mut Pending, + buf: &'b mut [u8], +} + +impl<'a, 'b> Iterator for PendingDrain<'a, 'b> { + type Item = usize; + + fn next(&mut self) -> Option { + self.pending.pop_first(self.bound, self.buf) + } +} diff --git a/crates/net/src/connection/received.rs b/crates/net/src/connection/received.rs new file mode 100644 index 00000000..51724ef3 --- /dev/null +++ b/crates/net/src/connection/received.rs @@ -0,0 +1,141 @@ +use std::{cmp::Ordering, collections::BTreeSet}; + +use thiserror::Error; + +use crate::header::{PackageId, PackageIdRange}; + +// This must be less than `u32::MAX / 2` due to ID ordering circularity issues. +const MAX_SKIPPED: usize = 1024; + +/// Database of already received packages. It servers for duplicate and +/// out-of-order delivery detection. +pub(super) struct Received { + highest_id: Option, + holes: BTreeSet, +} + +impl Received { + pub(super) fn new() -> Self { + Self { + highest_id: None, + holes: BTreeSet::new(), + } + } + + /// Registers package as received and returns delivery order continuity in + /// respect with earlier sent packages. + pub(super) fn process(&mut self, id: PackageId) -> Result { + let range_start = match self.highest_id { + Some(highest) => match highest.cmp(&id) { + Ordering::Less => highest.incremented(), + Ordering::Greater => { + if self.holes.remove(&id) { + return Ok(match self.holes.first() { + Some(first) => { + if first.cmp(&id).is_ge() { + IdContinuity::Continuous(*first) + } else { + IdContinuity::Sparse + } + } + None => IdContinuity::Continuous(highest.incremented()), + }); + } else { + return Err(ReceivedIdError::Duplicate); + } + } + Ordering::Equal => { + return Err(ReceivedIdError::Duplicate); + } + }, + None => PackageId::zero(), + }; + + let range = PackageIdRange::range(range_start, id); + let skipped = range.size_hint().1.unwrap() + self.holes.len(); + if skipped > MAX_SKIPPED { + return Err(ReceivedIdError::TooManySkipped(skipped)); + } + + self.highest_id = Some(id); + for hole in range { + self.holes.insert(hole); + } + + Ok(if skipped == 0 { + IdContinuity::Continuous(id.incremented()) + } else { + IdContinuity::Sparse + }) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub(super) enum IdContinuity { + /// Some of the earlier sent packages has not yet been delivered. + Sparse, + /// All packages up to the given ID (exclusive) has been delivered. Just + /// delivered package is included in that range. + Continuous(PackageId), +} + +#[derive(Error, Debug, PartialEq, Eq)] +pub(crate) enum ReceivedIdError { + #[error("Duplicate package")] + Duplicate, + #[error("Too many packages skipped: {0}")] + TooManySkipped(usize), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_received() { + let mut received = Received::new(); + + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 2])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 1])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 1])), + Err(ReceivedIdError::Duplicate) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 0])), + Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 1]))) + ); + + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 5])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 3])), + Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 4]))) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 5])), + Err(ReceivedIdError::Duplicate) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 6])), + Ok(IdContinuity::Sparse) + ); + assert_eq!( + received.process(PackageId::from_bytes(&[0, 0, 3])), + Err(ReceivedIdError::Duplicate) + ); + + assert_eq!( + received.process(PackageId::from_bytes(&[50, 0, 6])), + Err(ReceivedIdError::TooManySkipped(3276800)) + ); + } +}