Skip to content

Commit

Permalink
revert to RecvError, Errors section, example desc
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdw committed Feb 29, 2024
1 parent 4d44bcd commit 54d080f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 23 deletions.
37 changes: 16 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,22 +1303,28 @@ impl<T: Clone> Receiver<T> {
/// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
/// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
///
/// If the number of messages that have been sent has overflowed the channel capacity, an
/// [`OverflowError`] is returned containing the number of items that overflowed and were lost.
///
/// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
///
/// # Errors
///
/// If the number of messages that have been sent has overflowed the channel capacity, a
/// [`RecvError::Overflowed`] variant is returned containing the number of items that
/// overflowed and were lost.
///
/// # Examples
///
/// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
/// stream implementation to internally make use of a [`Receiver`].
///
/// ```
/// use futures_core::Stream;
/// use async_broadcast::{Receiver, OverflowError};
/// use async_broadcast::{Receiver, RecvError};
/// use std::{pin::Pin, task::{Poll, Context}};
///
/// struct MyStream(Receiver<i32>);
///
/// impl futures_core::Stream for MyStream {
/// type Item = Result<i32, OverflowError>;
/// type Item = Result<i32, RecvError>;
/// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
/// Pin::new(&mut self.0).poll_recv(cx)
/// }
Expand All @@ -1327,7 +1333,7 @@ impl<T: Clone> Receiver<T> {
pub fn poll_recv(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, OverflowError>>> {
) -> Poll<Option<Result<T, RecvError>>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
Expand All @@ -1351,7 +1357,7 @@ impl<T: Clone> Receiver<T> {
Err(TryRecvError::Overflowed(n)) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(Err(OverflowError(n))));
return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
}
Err(TryRecvError::Empty) => {}
}
Expand Down Expand Up @@ -1441,8 +1447,9 @@ impl<T: Clone> Stream for Receiver<T> {
match ready!(self.as_mut().poll_recv(cx)) {
Some(Ok(val)) => return Poll::Ready(Some(val)),
// If overflowed, we expect future operations to succeed so try again.
Some(Err(OverflowError(_))) => continue,
None => return Poll::Ready(None),
Some(Err(RecvError::Overflowed(_))) => continue,
// RecvError::Closed should never appear here, but handle it anyway.
None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
}
}
}
Expand Down Expand Up @@ -1578,18 +1585,6 @@ impl fmt::Display for RecvError {
}
}

/// An error returned from [`Receiver::poll_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct OverflowError(pub u64);

impl error::Error for OverflowError {}

impl fmt::Display for OverflowError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "receiving skipped {} messages", self.0)
}
}

/// An error returned from [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
Expand Down
4 changes: 2 additions & 2 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ fn poll_recv() {
// A quick custom stream impl to demonstrate/test `poll_recv`.
struct MyStream(Receiver<i32>);
impl futures_core::Stream for MyStream {
type Item = Result<i32, OverflowError>;
type Item = Result<i32, RecvError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
Expand All @@ -314,7 +314,7 @@ fn poll_recv() {
s.broadcast(3).await.unwrap();
s.broadcast(4).await.unwrap();

assert_eq!(stream.next().await.unwrap(), Err(OverflowError(2)));
assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2)));
assert_eq!(stream.next().await.unwrap(), Ok(3));
assert_eq!(stream.next().await.unwrap(), Ok(4));

Expand Down

0 comments on commit 54d080f

Please sign in to comment.