Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add poll_recv method to Receiver #56

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 89 additions & 37 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
//!
#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)]
#![warn(rustdoc::missing_doc_code_examples, unreachable_pub)]

Check warning on line 96 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

unknown lint: `rustdoc::missing_doc_code_examples`

Check warning on line 96 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

unknown lint: `rustdoc::missing_doc_code_examples`

Check warning on line 96 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

unknown lint: `rustdoc::missing_doc_code_examples`
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
Expand Down Expand Up @@ -1298,6 +1298,89 @@
listener: None,
}
}

/// A low level poll method that is similar to [`Receiver::recv()`] or
/// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
/// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
///
/// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
///
/// # Errors
///
/// If the number of messages that have been sent has overflowed the channel capacity, a
/// [`RecvError::Overflowed`] variant is returned containing the number of items that
/// overflowed and were lost.
///
/// # Examples
zeenix marked this conversation as resolved.
Show resolved Hide resolved
///
/// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
/// stream implementation to internally make use of a [`Receiver`]. This example implementation
/// differs from the stream implementation of [`Receiver`] because it returns an error if
/// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do.
///
/// ```
/// use futures_core::Stream;
/// use async_broadcast::{Receiver, RecvError};
/// use std::{pin::Pin, task::{Poll, Context}};
///
/// struct MyStream(Receiver<i32>);
///
/// impl futures_core::Stream for MyStream {
/// type Item = Result<i32, RecvError>;
/// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
/// Pin::new(&mut self.0).poll_recv(cx)
/// }
/// }
/// ```
pub fn poll_recv(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, RecvError>>> {
zeenix marked this conversation as resolved.
Show resolved Hide resolved
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(Ok(msg)));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Overflowed(n)) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
}
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = {
let inner = self.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}

impl<T> Drop for Receiver<T> {
Expand Down Expand Up @@ -1363,43 +1446,12 @@

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Overflowed(_)) => continue,
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = {
let inner = self.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
match ready!(self.as_mut().poll_recv(cx)) {
Some(Ok(val)) => return Poll::Ready(Some(val)),
// If overflowed, we expect future operations to succeed so try again.
Some(Err(RecvError::Overflowed(_))) => continue,
zeenix marked this conversation as resolved.
Show resolved Hide resolved
// RecvError::Closed should never appear here, but handle it anyway.
None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,38 @@ fn inactive_drop() {

assert!(s.is_closed())
}

#[test]
fn poll_recv() {
let (s, mut r) = broadcast::<i32>(2);
r.set_overflow(true);

// A quick custom stream impl to demonstrate/test `poll_recv`.
struct MyStream(Receiver<i32>);
impl futures_core::Stream for MyStream {
type Item = Result<i32, RecvError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_recv(cx)
}
}

block_on(async move {
let mut stream = MyStream(r);

s.broadcast(1).await.unwrap();
s.broadcast(2).await.unwrap();
s.broadcast(3).await.unwrap();
s.broadcast(4).await.unwrap();

assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2)));
assert_eq!(stream.next().await.unwrap(), Ok(3));
assert_eq!(stream.next().await.unwrap(), Ok(4));

drop(s);

assert_eq!(stream.next().await, None);
})
}
Loading