Skip to content

Commit

Permalink
Port to event-listener v5.0.0
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Dec 25, 2023
1 parent e243d55 commit e5ac3bf
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 76 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exclude = ["/.*"]

[dependencies]
concurrent-queue = { version = "2", default-features = false }
event-listener = { version = "4.0.0", default-features = false }
event-listener = { version = "4.0.1", default-features = false }
event-listener-strategy = { version = "0.4.0", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
pin-project-lite = "0.2.11"
Expand All @@ -31,3 +31,7 @@ wasm-bindgen-test = "0.3.37"
[features]
default = ["std"]
std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"]

[patch.crates-io]
event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" }
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" }
132 changes: 57 additions & 75 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(),
listener: None,
channel,
};
(s, r)
Expand Down Expand Up @@ -172,7 +172,7 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(),
listener: None,
channel,
};
(s, r)
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<T> Sender<T> {
Send::_new(SendInner {
sender: self,
msg: Some(msg),
listener: EventListener::new(),
listener: None,
})
}

Expand Down Expand Up @@ -477,32 +477,27 @@ impl<T> Clone for Sender<T> {
}
}

pin_project_lite::pin_project! {
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
// Inner channel state.
channel: Arc<Channel<T>>,

// Listens for a send or close event to unblock this stream.
#[pin]
listener: EventListener,
}
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
// Inner channel state.
channel: Arc<Channel<T>>,

impl<T> PinnedDrop for Receiver<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
}

// Decrement the receiver count and close the channel if it drops down to zero.
if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
this.channel.close();
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Decrement the receiver count and close the channel if it drops down to zero.
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
}
}
Expand Down Expand Up @@ -567,7 +562,7 @@ impl<T> Receiver<T> {
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: EventListener::new(),
listener: None,
})
}

Expand Down Expand Up @@ -787,7 +782,7 @@ impl<T> Clone for Receiver<T> {

Receiver {
channel: self.channel.clone(),
listener: EventListener::new(),
listener: None,
}
}
}
Expand All @@ -798,38 +793,33 @@ impl<T> Stream for Receiver<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
{
let this = self.as_mut().project();
if this.listener.is_listening() {
ready!(this.listener.poll(cx));
}
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener.as_mut().set(EventListener::new());
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener.as_mut().set(EventListener::new());
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
let mut this = self.as_mut().project();
if this.listener.is_listening() {
if self.listener.is_some() {
// Go back to the outer loop to wait for a notification.
break;
} else {
this.listener.as_mut().listen(&this.channel.stream_ops);
self.listener = Some(self.channel.stream_ops.listen());
}
}
}
Expand Down Expand Up @@ -914,7 +904,7 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: EventListener::new(),
listener: None,
}),
}
}
Expand Down Expand Up @@ -1084,42 +1074,39 @@ easy_wrapper! {
pub(crate) wait();
}

pin_project_lite::pin_project! {
#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
msg: Option<T>,
#[pin]
listener: EventListener,
}
#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
msg: Option<T>,
listener: Option<EventListener>,
}

impl<'a, T> Unpin for SendInner<'a, T> {}

impl<'a, T> EventListenerFuture for SendInner<'a, T> {
type Output = Result<(), SendError<T>>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
let mut this = self.project();

loop {
let msg = this.msg.take().unwrap();
let msg = self.msg.take().unwrap();
// Attempt to send a message.
match this.sender.try_send(msg) {
match self.sender.try_send(msg) {
Ok(()) => return Poll::Ready(Ok(())),
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => *this.msg = Some(m),
Err(TrySendError::Full(m)) => self.msg = Some(m),
}

// Sending failed - now start listening for notifications or wait for one.
if this.listener.is_listening() {
if self.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), context));
ready!(S::poll(strategy, &mut self.listener, context));
} else {
this.listener.as_mut().listen(&this.sender.channel.send_ops);
self.listener = Some(self.sender.channel.send_ops.listen());
}
}
}
Expand All @@ -1134,42 +1121,37 @@ easy_wrapper! {
pub(crate) wait();
}

pin_project_lite::pin_project! {
#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
#[pin]
listener: EventListener,
}
#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
}

impl<'a, T> Unpin for RecvInner<'a, T> {}

impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let mut this = self.project();

loop {
// Attempt to receive a message.
match this.receiver.try_recv() {
match self.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
if this.listener.is_listening() {
if self.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), cx));
ready!(S::poll(strategy, &mut self.listener, cx));
} else {
this.listener
.as_mut()
.listen(&this.receiver.channel.recv_ops);
self.listener = Some(self.receiver.channel.recv_ops.listen());
}
}
}
Expand Down

0 comments on commit e5ac3bf

Please sign in to comment.