Skip to content

Commit

Permalink
Add poll_recv method to Receiver (#56)
Browse files Browse the repository at this point in the history
This PR Adds a `poll_recv()` method to the `Receiver` type. It returns
the same `Result<T,RecvError>` type that the `receiver.recv()` future
returns (hence the name).

This method can be used when defining custom streams that internally
make use of `async_broadcast` and want to know about whether the
`async_broadcast` stream has overflowed or not.
  • Loading branch information
jsdw authored Mar 4, 2024
1 parent 1803434 commit 74d5889
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 37 deletions.
126 changes: 89 additions & 37 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,89 @@ impl<T: Clone> Receiver<T> {
listener: None,
}
}

/// A low level poll method that is similar to [`Receiver::recv()`] or
/// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
/// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
///
/// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
///
/// # Errors
///
/// If the number of messages that have been sent has overflowed the channel capacity, a
/// [`RecvError::Overflowed`] variant is returned containing the number of items that
/// overflowed and were lost.
///
/// # Examples
///
/// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
/// stream implementation to internally make use of a [`Receiver`]. This example implementation
/// differs from the stream implementation of [`Receiver`] because it returns an error if
/// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do.
///
/// ```
/// use futures_core::Stream;
/// use async_broadcast::{Receiver, RecvError};
/// use std::{pin::Pin, task::{Poll, Context}};
///
/// struct MyStream(Receiver<i32>);
///
/// impl futures_core::Stream for MyStream {
/// type Item = Result<i32, RecvError>;
/// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
/// Pin::new(&mut self.0).poll_recv(cx)
/// }
/// }
/// ```
pub fn poll_recv(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, RecvError>>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(Ok(msg)));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Overflowed(n)) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
}
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = {
let inner = self.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}

impl<T> Drop for Receiver<T> {
Expand Down Expand Up @@ -1363,43 +1446,12 @@ impl<T: Clone> Stream for Receiver<T> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Overflowed(_)) => continue,
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = {
let inner = self.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
match ready!(self.as_mut().poll_recv(cx)) {
Some(Ok(val)) => return Poll::Ready(Some(val)),
// If overflowed, we expect future operations to succeed so try again.
Some(Err(RecvError::Overflowed(_))) => continue,
// RecvError::Closed should never appear here, but handle it anyway.
None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,38 @@ fn inactive_drop() {

assert!(s.is_closed())
}

#[test]
fn poll_recv() {
let (s, mut r) = broadcast::<i32>(2);
r.set_overflow(true);

// A quick custom stream impl to demonstrate/test `poll_recv`.
struct MyStream(Receiver<i32>);
impl futures_core::Stream for MyStream {
type Item = Result<i32, RecvError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_recv(cx)
}
}

block_on(async move {
let mut stream = MyStream(r);

s.broadcast(1).await.unwrap();
s.broadcast(2).await.unwrap();
s.broadcast(3).await.unwrap();
s.broadcast(4).await.unwrap();

assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2)));
assert_eq!(stream.next().await.unwrap(), Ok(3));
assert_eq!(stream.next().await.unwrap(), Ok(4));

drop(s);

assert_eq!(stream.next().await, None);
})
}

0 comments on commit 74d5889

Please sign in to comment.