From 54d080f09c58f902155197cf9417ceae46bcf6fe Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 29 Feb 2024 18:08:27 +0000 Subject: [PATCH] revert to RecvError, Errors section, example desc --- src/lib.rs | 37 ++++++++++++++++--------------------- tests/test.rs | 4 ++-- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ef273eb..a62079d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1303,22 +1303,28 @@ impl Receiver { /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which /// use a [`Receiver`] under the hood and want to know if the stream has overflowed. /// - /// If the number of messages that have been sent has overflowed the channel capacity, an - /// [`OverflowError`] is returned containing the number of items that overflowed and were lost. - /// /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible. /// + /// # Errors + /// + /// If the number of messages that have been sent has overflowed the channel capacity, a + /// [`RecvError::Overflowed`] variant is returned containing the number of items that + /// overflowed and were lost. + /// /// # Examples /// + /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom + /// stream implementation to internally make use of a [`Receiver`]. + /// /// ``` /// use futures_core::Stream; - /// use async_broadcast::{Receiver, OverflowError}; + /// use async_broadcast::{Receiver, RecvError}; /// use std::{pin::Pin, task::{Poll, Context}}; /// /// struct MyStream(Receiver); /// /// impl futures_core::Stream for MyStream { - /// type Item = Result; + /// type Item = Result; /// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { /// Pin::new(&mut self.0).poll_recv(cx) /// } @@ -1327,7 +1333,7 @@ impl Receiver { pub fn poll_recv( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { loop { // If this stream is listening for events, first wait for a notification. if let Some(listener) = self.listener.as_mut() { @@ -1351,7 +1357,7 @@ impl Receiver { Err(TryRecvError::Overflowed(n)) => { // The stream is not blocked on an event - drop the listener. self.listener = None; - return Poll::Ready(Some(Err(OverflowError(n)))); + return Poll::Ready(Some(Err(RecvError::Overflowed(n)))); } Err(TryRecvError::Empty) => {} } @@ -1441,8 +1447,9 @@ impl Stream for Receiver { match ready!(self.as_mut().poll_recv(cx)) { Some(Ok(val)) => return Poll::Ready(Some(val)), // If overflowed, we expect future operations to succeed so try again. - Some(Err(OverflowError(_))) => continue, - None => return Poll::Ready(None), + Some(Err(RecvError::Overflowed(_))) => continue, + // RecvError::Closed should never appear here, but handle it anyway. + None | Some(Err(RecvError::Closed)) => return Poll::Ready(None), } } } @@ -1578,18 +1585,6 @@ impl fmt::Display for RecvError { } } -/// An error returned from [`Receiver::poll_recv()`]. -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub struct OverflowError(pub u64); - -impl error::Error for OverflowError {} - -impl fmt::Display for OverflowError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "receiving skipped {} messages", self.0) - } -} - /// An error returned from [`Receiver::try_recv()`]. #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum TryRecvError { diff --git a/tests/test.rs b/tests/test.rs index ed8daa4..79718cb 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -297,7 +297,7 @@ fn poll_recv() { // A quick custom stream impl to demonstrate/test `poll_recv`. struct MyStream(Receiver); impl futures_core::Stream for MyStream { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -314,7 +314,7 @@ fn poll_recv() { s.broadcast(3).await.unwrap(); s.broadcast(4).await.unwrap(); - assert_eq!(stream.next().await.unwrap(), Err(OverflowError(2))); + assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2))); assert_eq!(stream.next().await.unwrap(), Ok(3)); assert_eq!(stream.next().await.unwrap(), Ok(4));