diff --git a/Cargo.toml b/Cargo.toml index b78d0a0..3602b60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,8 @@ exclude = ["/.*"] [dependencies] concurrent-queue = { version = "2", default-features = false } -event-listener = { version = "4.0.0", default-features = false } -event-listener-strategy = { version = "0.4.0", default-features = false } +event-listener = { version = "5.0.0", default-features = false } +event-listener-strategy = { version = "0.5.0", default-features = false } futures-core = { version = "0.3.5", default-features = false } pin-project-lite = "0.2.11" diff --git a/src/lib.rs b/src/lib.rs index 59045ba..9c9e3b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,7 @@ extern crate alloc; use core::fmt; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll}; @@ -52,6 +53,7 @@ use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; use futures_core::ready; use futures_core::stream::Stream; +use pin_project_lite::pin_project; struct Channel { /// Inner message queue. @@ -132,8 +134,9 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { - listener: EventListener::new(), + listener: None, channel, + _pin: PhantomPinned, }; (s, r) } @@ -172,8 +175,9 @@ pub fn unbounded() -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { - listener: EventListener::new(), + listener: None, channel, + _pin: PhantomPinned, }; (s, r) } @@ -247,7 +251,8 @@ impl Sender { Send::_new(SendInner { sender: self, msg: Some(msg), - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, }) } @@ -477,7 +482,7 @@ impl Clone for Sender { } } -pin_project_lite::pin_project! { +pin_project! { /// The receiving side of a channel. /// /// Receivers can be cloned and shared among threads. When all receivers associated with a channel @@ -491,8 +496,11 @@ pin_project_lite::pin_project! { channel: Arc>, // Listens for a send or close event to unblock this stream. + listener: Option, + + // Keeping this type `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } impl PinnedDrop for Receiver { @@ -567,7 +575,8 @@ impl Receiver { pub fn recv(&self) -> Recv<'_, T> { Recv::_new(RecvInner { receiver: self, - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, }) } @@ -787,7 +796,8 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, } } } @@ -800,8 +810,9 @@ impl Stream for Receiver { // If this stream is listening for events, first wait for a notification. { let this = self.as_mut().project(); - if this.listener.is_listening() { - ready!(this.listener.poll(cx)); + if let Some(listener) = this.listener.as_mut() { + ready!(Pin::new(listener).poll(cx)); + *this.listener = None; } } @@ -810,26 +821,26 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - let mut this = self.project(); - this.listener.as_mut().set(EventListener::new()); + let this = self.as_mut().project(); + *this.listener = None; return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - let mut this = self.project(); - this.listener.as_mut().set(EventListener::new()); + let this = self.as_mut().project(); + *this.listener = None; return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - let mut this = self.as_mut().project(); - if this.listener.is_listening() { + let this = self.as_mut().project(); + if this.listener.is_some() { // Go back to the outer loop to wait for a notification. break; } else { - this.listener.as_mut().listen(&this.channel.stream_ops); + *this.listener = Some(this.channel.stream_ops.listen()); } } } @@ -914,7 +925,8 @@ impl WeakReceiver { } Ok(_) => Some(Receiver { channel: self.channel.clone(), - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, }), } } @@ -1084,13 +1096,22 @@ easy_wrapper! { pub(crate) wait(); } -pin_project_lite::pin_project! { +pin_project! { #[derive(Debug)] + #[project(!Unpin)] struct SendInner<'a, T> { + // Reference to the original sender. sender: &'a Sender, + + // The message to send. msg: Option, + + // Listener waiting on the channel. + listener: Option, + + // Keeping this type `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } } @@ -1103,7 +1124,7 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> { strategy: &mut S, context: &mut S::Context, ) -> Poll>> { - let mut this = self.project(); + let this = self.project(); loop { let msg = this.msg.take().unwrap(); @@ -1115,11 +1136,11 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> { } // Sending failed - now start listening for notifications or wait for one. - if this.listener.is_listening() { + if this.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, this.listener.as_mut(), context)); + ready!(S::poll(strategy, &mut *this.listener, context)); } else { - this.listener.as_mut().listen(&this.sender.channel.send_ops); + *this.listener = Some(this.sender.channel.send_ops.listen()); } } } @@ -1134,12 +1155,19 @@ easy_wrapper! { pub(crate) wait(); } -pin_project_lite::pin_project! { +pin_project! { #[derive(Debug)] + #[project(!Unpin)] struct RecvInner<'a, T> { + // Reference to the receiver. receiver: &'a Receiver, + + // Listener waiting on the channel. + listener: Option, + + // Keeping this type `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } } @@ -1152,7 +1180,7 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> { strategy: &mut S, cx: &mut S::Context, ) -> Poll> { - let mut this = self.project(); + let this = self.project(); loop { // Attempt to receive a message. @@ -1163,13 +1191,11 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> { } // Receiving failed - now start listening for notifications or wait for one. - if this.listener.is_listening() { + if this.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, this.listener.as_mut(), cx)); + ready!(S::poll(strategy, &mut *this.listener, cx)); } else { - this.listener - .as_mut() - .listen(&this.receiver.channel.recv_ops); + *this.listener = Some(this.receiver.channel.recv_ops.listen()); } } }