From 13d3c683758513f2af0d6b64a3a08ac256960712 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 25 Dec 2023 08:01:26 -0800 Subject: [PATCH 1/4] Port to event-listener v5.0.0 Signed-off-by: John Nunley --- Cargo.toml | 6 ++- src/lib.rs | 132 +++++++++++++++++++++++------------------------------ 2 files changed, 62 insertions(+), 76 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b78d0a0..a77d50b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ exclude = ["/.*"] [dependencies] concurrent-queue = { version = "2", default-features = false } -event-listener = { version = "4.0.0", default-features = false } +event-listener = { version = "4.0.1", default-features = false } event-listener-strategy = { version = "0.4.0", default-features = false } futures-core = { version = "0.3.5", default-features = false } pin-project-lite = "0.2.11" @@ -31,3 +31,7 @@ wasm-bindgen-test = "0.3.37" [features] default = ["std"] std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"] + +[patch.crates-io] +event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" } +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" } diff --git a/src/lib.rs b/src/lib.rs index 59045ba..9259ac9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,7 +132,7 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { - listener: EventListener::new(), + listener: None, channel, }; (s, r) @@ -172,7 +172,7 @@ pub fn unbounded() -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { - listener: EventListener::new(), + listener: None, channel, }; (s, r) @@ -247,7 +247,7 @@ impl Sender { Send::_new(SendInner { sender: self, msg: Some(msg), - listener: EventListener::new(), + listener: None, }) } @@ -477,32 +477,27 @@ impl Clone for Sender { } } -pin_project_lite::pin_project! { - /// The receiving side of a channel. - /// - /// Receivers can be cloned and shared among threads. When all receivers associated with a channel - /// are dropped, the channel becomes closed. - /// - /// The channel can also be closed manually by calling [`Receiver::close()`]. - /// - /// Receivers implement the [`Stream`] trait. - pub struct Receiver { - // Inner channel state. - channel: Arc>, - - // Listens for a send or close event to unblock this stream. - #[pin] - listener: EventListener, - } +/// The receiving side of a channel. +/// +/// Receivers can be cloned and shared among threads. When all receivers associated with a channel +/// are dropped, the channel becomes closed. +/// +/// The channel can also be closed manually by calling [`Receiver::close()`]. +/// +/// Receivers implement the [`Stream`] trait. +pub struct Receiver { + // Inner channel state. + channel: Arc>, - impl PinnedDrop for Receiver { - fn drop(this: Pin<&mut Self>) { - let this = this.project(); + // Listens for a send or close event to unblock this stream. + listener: Option, +} - // Decrement the receiver count and close the channel if it drops down to zero. - if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - this.channel.close(); - } +impl Drop for Receiver { + fn drop(&mut self) { + // Decrement the receiver count and close the channel if it drops down to zero. + if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + self.channel.close(); } } } @@ -567,7 +562,7 @@ impl Receiver { pub fn recv(&self) -> Recv<'_, T> { Recv::_new(RecvInner { receiver: self, - listener: EventListener::new(), + listener: None, }) } @@ -787,7 +782,7 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), - listener: EventListener::new(), + listener: None, } } } @@ -798,11 +793,9 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // If this stream is listening for events, first wait for a notification. - { - let this = self.as_mut().project(); - if this.listener.is_listening() { - ready!(this.listener.poll(cx)); - } + if let Some(listener) = self.listener.as_mut() { + ready!(Pin::new(listener).poll(cx)); + self.listener = None; } loop { @@ -810,26 +803,23 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - let mut this = self.project(); - this.listener.as_mut().set(EventListener::new()); + self.listener = None; return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - let mut this = self.project(); - this.listener.as_mut().set(EventListener::new()); + self.listener = None; return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - let mut this = self.as_mut().project(); - if this.listener.is_listening() { + if self.listener.is_some() { // Go back to the outer loop to wait for a notification. break; } else { - this.listener.as_mut().listen(&this.channel.stream_ops); + self.listener = Some(self.channel.stream_ops.listen()); } } } @@ -914,7 +904,7 @@ impl WeakReceiver { } Ok(_) => Some(Receiver { channel: self.channel.clone(), - listener: EventListener::new(), + listener: None, }), } } @@ -1084,42 +1074,39 @@ easy_wrapper! { pub(crate) wait(); } -pin_project_lite::pin_project! { - #[derive(Debug)] - struct SendInner<'a, T> { - sender: &'a Sender, - msg: Option, - #[pin] - listener: EventListener, - } +#[derive(Debug)] +struct SendInner<'a, T> { + sender: &'a Sender, + msg: Option, + listener: Option, } +impl<'a, T> Unpin for SendInner<'a, T> {} + impl<'a, T> EventListenerFuture for SendInner<'a, T> { type Output = Result<(), SendError>; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll>> { - let mut this = self.project(); - loop { - let msg = this.msg.take().unwrap(); + let msg = self.msg.take().unwrap(); // Attempt to send a message. - match this.sender.try_send(msg) { + match self.sender.try_send(msg) { Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => *this.msg = Some(m), + Err(TrySendError::Full(m)) => self.msg = Some(m), } // Sending failed - now start listening for notifications or wait for one. - if this.listener.is_listening() { + if self.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, this.listener.as_mut(), context)); + ready!(S::poll(strategy, &mut self.listener, context)); } else { - this.listener.as_mut().listen(&this.sender.channel.send_ops); + self.listener = Some(self.sender.channel.send_ops.listen()); } } } @@ -1134,42 +1121,37 @@ easy_wrapper! { pub(crate) wait(); } -pin_project_lite::pin_project! { - #[derive(Debug)] - struct RecvInner<'a, T> { - receiver: &'a Receiver, - #[pin] - listener: EventListener, - } +#[derive(Debug)] +struct RecvInner<'a, T> { + receiver: &'a Receiver, + listener: Option, } +impl<'a, T> Unpin for RecvInner<'a, T> {} + impl<'a, T> EventListenerFuture for RecvInner<'a, T> { type Output = Result; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, strategy: &mut S, cx: &mut S::Context, ) -> Poll> { - let mut this = self.project(); - loop { // Attempt to receive a message. - match this.receiver.try_recv() { + match self.receiver.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - if this.listener.is_listening() { + if self.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, this.listener.as_mut(), cx)); + ready!(S::poll(strategy, &mut self.listener, cx)); } else { - this.listener - .as_mut() - .listen(&this.receiver.channel.recv_ops); + self.listener = Some(self.receiver.channel.recv_ops.listen()); } } } From 239f96a36db6ac85de1ccd7e3e4b4c6ee7d94f37 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 25 Dec 2023 08:22:48 -0800 Subject: [PATCH 2/4] Make futures !Unpin If we figure out a good way to do the optimization, it will enable us to do that in the future. Signed-off-by: John Nunley --- src/lib.rs | 146 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 95 insertions(+), 51 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9259ac9..8cd3b2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,7 @@ extern crate alloc; use core::fmt; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll}; @@ -52,6 +53,7 @@ use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; use futures_core::ready; use futures_core::stream::Stream; +use pin_project_lite::pin_project; struct Channel { /// Inner message queue. @@ -134,6 +136,7 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { let r = Receiver { listener: None, channel, + _pin: PhantomPinned }; (s, r) } @@ -174,6 +177,7 @@ pub fn unbounded() -> (Sender, Receiver) { let r = Receiver { listener: None, channel, + _pin: PhantomPinned }; (s, r) } @@ -248,6 +252,7 @@ impl Sender { sender: self, msg: Some(msg), listener: None, + _pin: PhantomPinned }) } @@ -477,27 +482,35 @@ impl Clone for Sender { } } -/// The receiving side of a channel. -/// -/// Receivers can be cloned and shared among threads. When all receivers associated with a channel -/// are dropped, the channel becomes closed. -/// -/// The channel can also be closed manually by calling [`Receiver::close()`]. -/// -/// Receivers implement the [`Stream`] trait. -pub struct Receiver { - // Inner channel state. - channel: Arc>, +pin_project! { + /// The receiving side of a channel. + /// + /// Receivers can be cloned and shared among threads. When all receivers associated with a channel + /// are dropped, the channel becomes closed. + /// + /// The channel can also be closed manually by calling [`Receiver::close()`]. + /// + /// Receivers implement the [`Stream`] trait. + pub struct Receiver { + // Inner channel state. + channel: Arc>, - // Listens for a send or close event to unblock this stream. - listener: Option, -} + // Listens for a send or close event to unblock this stream. + listener: Option, -impl Drop for Receiver { - fn drop(&mut self) { - // Decrement the receiver count and close the channel if it drops down to zero. - if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - self.channel.close(); + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned + } + + impl PinnedDrop for Receiver { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + + // Decrement the receiver count and close the channel if it drops down to zero. + if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + this.channel.close(); + } } } } @@ -563,6 +576,7 @@ impl Receiver { Recv::_new(RecvInner { receiver: self, listener: None, + _pin: PhantomPinned }) } @@ -783,6 +797,7 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), listener: None, + _pin: PhantomPinned } } } @@ -793,9 +808,12 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // If this stream is listening for events, first wait for a notification. - if let Some(listener) = self.listener.as_mut() { - ready!(Pin::new(listener).poll(cx)); - self.listener = None; + { + let this = self.as_mut().project(); + if let Some(listener) = this.listener.as_mut() { + ready!(Pin::new(listener).poll(cx)); + *this.listener = None; + } } loop { @@ -803,23 +821,26 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let this = self.as_mut().project(); + *this.listener = None; return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let this = self.as_mut().project(); + *this.listener = None; return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - if self.listener.is_some() { + let this = self.as_mut().project(); + if this.listener.is_some() { // Go back to the outer loop to wait for a notification. break; } else { - self.listener = Some(self.channel.stream_ops.listen()); + *this.listener = Some(this.channel.stream_ops.listen()); } } } @@ -905,6 +926,7 @@ impl WeakReceiver { Ok(_) => Some(Receiver { channel: self.channel.clone(), listener: None, + _pin: PhantomPinned }), } } @@ -1074,39 +1096,51 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct SendInner<'a, T> { - sender: &'a Sender, - msg: Option, - listener: Option, -} +pin_project! { + #[derive(Debug)] + #[project(!Unpin)] + struct SendInner<'a, T> { + // Reference to the original sender. + sender: &'a Sender, + + // The message to send. + msg: Option, -impl<'a, T> Unpin for SendInner<'a, T> {} + // Listener waiting on the channel. + listener: Option, + + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned + } +} impl<'a, T> EventListenerFuture for SendInner<'a, T> { type Output = Result<(), SendError>; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll>> { + let this = self.project(); + loop { - let msg = self.msg.take().unwrap(); + let msg = this.msg.take().unwrap(); // Attempt to send a message. - match self.sender.try_send(msg) { + match this.sender.try_send(msg) { Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => self.msg = Some(m), + Err(TrySendError::Full(m)) => *this.msg = Some(m), } // Sending failed - now start listening for notifications or wait for one. - if self.listener.is_some() { + if this.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, &mut self.listener, context)); + ready!(S::poll(strategy, &mut *this.listener, context)); } else { - self.listener = Some(self.sender.channel.send_ops.listen()); + *this.listener = Some(this.sender.channel.send_ops.listen()); } } } @@ -1121,37 +1155,47 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct RecvInner<'a, T> { - receiver: &'a Receiver, - listener: Option, -} +pin_project! { + #[derive(Debug)] + #[project(!Unpin)] + struct RecvInner<'a, T> { + // Reference to the receiver. + receiver: &'a Receiver, -impl<'a, T> Unpin for RecvInner<'a, T> {} + // Listener waiting on the channel. + listener: Option, + + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned + } +} impl<'a, T> EventListenerFuture for RecvInner<'a, T> { type Output = Result; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, strategy: &mut S, cx: &mut S::Context, ) -> Poll> { + let this = self.project(); + loop { // Attempt to receive a message. - match self.receiver.try_recv() { + match this.receiver.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - if self.listener.is_some() { + if this.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, &mut self.listener, cx)); + ready!(S::poll(strategy, &mut *this.listener, cx)); } else { - self.listener = Some(self.receiver.channel.recv_ops.listen()); + *this.listener = Some(this.receiver.channel.recv_ops.listen()); } } } From b29fad1bd9647611be8dd33c2babcffec2c738aa Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 25 Dec 2023 08:53:59 -0800 Subject: [PATCH 3/4] rustfmt Signed-off-by: John Nunley --- src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8cd3b2e..9c9e3b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,7 +136,7 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { let r = Receiver { listener: None, channel, - _pin: PhantomPinned + _pin: PhantomPinned, }; (s, r) } @@ -177,7 +177,7 @@ pub fn unbounded() -> (Sender, Receiver) { let r = Receiver { listener: None, channel, - _pin: PhantomPinned + _pin: PhantomPinned, }; (s, r) } @@ -252,7 +252,7 @@ impl Sender { sender: self, msg: Some(msg), listener: None, - _pin: PhantomPinned + _pin: PhantomPinned, }) } @@ -576,7 +576,7 @@ impl Receiver { Recv::_new(RecvInner { receiver: self, listener: None, - _pin: PhantomPinned + _pin: PhantomPinned, }) } @@ -797,7 +797,7 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), listener: None, - _pin: PhantomPinned + _pin: PhantomPinned, } } } @@ -926,7 +926,7 @@ impl WeakReceiver { Ok(_) => Some(Receiver { channel: self.channel.clone(), listener: None, - _pin: PhantomPinned + _pin: PhantomPinned, }), } } From 66784ef45cbc2e17be99a6145c82b00da344f277 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 6 Feb 2024 19:15:45 -0800 Subject: [PATCH 4/4] chore: Update to released packages Signed-off-by: John Nunley --- Cargo.toml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a77d50b..3602b60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,8 @@ exclude = ["/.*"] [dependencies] concurrent-queue = { version = "2", default-features = false } -event-listener = { version = "4.0.1", default-features = false } -event-listener-strategy = { version = "0.4.0", default-features = false } +event-listener = { version = "5.0.0", default-features = false } +event-listener-strategy = { version = "0.5.0", default-features = false } futures-core = { version = "0.3.5", default-features = false } pin-project-lite = "0.2.11" @@ -31,7 +31,3 @@ wasm-bindgen-test = "0.3.37" [features] default = ["std"] std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"] - -[patch.crates-io] -event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" } -event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" }