diff --git a/Cargo.toml b/Cargo.toml index 939f52a..5507ef1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ exclude = ["/.*"] [dependencies] concurrent-queue = { version = "2", default-features = false } -event-listener = { version = "4.0.0", default-features = false } +event-listener = { version = "4.0.1", default-features = false } event-listener-strategy = { version = "0.4.0", default-features = false } futures-core = { version = "0.3.5", default-features = false } pin-project-lite = "0.2.11" @@ -31,3 +31,7 @@ wasm-bindgen-test = "0.3.37" [features] default = ["std"] std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"] + +[patch.crates-io] +event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" } +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" } diff --git a/src/lib.rs b/src/lib.rs index 59045ba..9259ac9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,7 +132,7 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { - listener: EventListener::new(), + listener: None, channel, }; (s, r) @@ -172,7 +172,7 @@ pub fn unbounded() -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { - listener: EventListener::new(), + listener: None, channel, }; (s, r) @@ -247,7 +247,7 @@ impl Sender { Send::_new(SendInner { sender: self, msg: Some(msg), - listener: EventListener::new(), + listener: None, }) } @@ -477,32 +477,27 @@ impl Clone for Sender { } } -pin_project_lite::pin_project! { - /// The receiving side of a channel. - /// - /// Receivers can be cloned and shared among threads. When all receivers associated with a channel - /// are dropped, the channel becomes closed. - /// - /// The channel can also be closed manually by calling [`Receiver::close()`]. - /// - /// Receivers implement the [`Stream`] trait. - pub struct Receiver { - // Inner channel state. - channel: Arc>, - - // Listens for a send or close event to unblock this stream. - #[pin] - listener: EventListener, - } +/// The receiving side of a channel. +/// +/// Receivers can be cloned and shared among threads. When all receivers associated with a channel +/// are dropped, the channel becomes closed. +/// +/// The channel can also be closed manually by calling [`Receiver::close()`]. +/// +/// Receivers implement the [`Stream`] trait. +pub struct Receiver { + // Inner channel state. + channel: Arc>, - impl PinnedDrop for Receiver { - fn drop(this: Pin<&mut Self>) { - let this = this.project(); + // Listens for a send or close event to unblock this stream. + listener: Option, +} - // Decrement the receiver count and close the channel if it drops down to zero. - if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - this.channel.close(); - } +impl Drop for Receiver { + fn drop(&mut self) { + // Decrement the receiver count and close the channel if it drops down to zero. + if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + self.channel.close(); } } } @@ -567,7 +562,7 @@ impl Receiver { pub fn recv(&self) -> Recv<'_, T> { Recv::_new(RecvInner { receiver: self, - listener: EventListener::new(), + listener: None, }) } @@ -787,7 +782,7 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), - listener: EventListener::new(), + listener: None, } } } @@ -798,11 +793,9 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // If this stream is listening for events, first wait for a notification. - { - let this = self.as_mut().project(); - if this.listener.is_listening() { - ready!(this.listener.poll(cx)); - } + if let Some(listener) = self.listener.as_mut() { + ready!(Pin::new(listener).poll(cx)); + self.listener = None; } loop { @@ -810,26 +803,23 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - let mut this = self.project(); - this.listener.as_mut().set(EventListener::new()); + self.listener = None; return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - let mut this = self.project(); - this.listener.as_mut().set(EventListener::new()); + self.listener = None; return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - let mut this = self.as_mut().project(); - if this.listener.is_listening() { + if self.listener.is_some() { // Go back to the outer loop to wait for a notification. break; } else { - this.listener.as_mut().listen(&this.channel.stream_ops); + self.listener = Some(self.channel.stream_ops.listen()); } } } @@ -914,7 +904,7 @@ impl WeakReceiver { } Ok(_) => Some(Receiver { channel: self.channel.clone(), - listener: EventListener::new(), + listener: None, }), } } @@ -1084,42 +1074,39 @@ easy_wrapper! { pub(crate) wait(); } -pin_project_lite::pin_project! { - #[derive(Debug)] - struct SendInner<'a, T> { - sender: &'a Sender, - msg: Option, - #[pin] - listener: EventListener, - } +#[derive(Debug)] +struct SendInner<'a, T> { + sender: &'a Sender, + msg: Option, + listener: Option, } +impl<'a, T> Unpin for SendInner<'a, T> {} + impl<'a, T> EventListenerFuture for SendInner<'a, T> { type Output = Result<(), SendError>; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll>> { - let mut this = self.project(); - loop { - let msg = this.msg.take().unwrap(); + let msg = self.msg.take().unwrap(); // Attempt to send a message. - match this.sender.try_send(msg) { + match self.sender.try_send(msg) { Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => *this.msg = Some(m), + Err(TrySendError::Full(m)) => self.msg = Some(m), } // Sending failed - now start listening for notifications or wait for one. - if this.listener.is_listening() { + if self.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, this.listener.as_mut(), context)); + ready!(S::poll(strategy, &mut self.listener, context)); } else { - this.listener.as_mut().listen(&this.sender.channel.send_ops); + self.listener = Some(self.sender.channel.send_ops.listen()); } } } @@ -1134,42 +1121,37 @@ easy_wrapper! { pub(crate) wait(); } -pin_project_lite::pin_project! { - #[derive(Debug)] - struct RecvInner<'a, T> { - receiver: &'a Receiver, - #[pin] - listener: EventListener, - } +#[derive(Debug)] +struct RecvInner<'a, T> { + receiver: &'a Receiver, + listener: Option, } +impl<'a, T> Unpin for RecvInner<'a, T> {} + impl<'a, T> EventListenerFuture for RecvInner<'a, T> { type Output = Result; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, strategy: &mut S, cx: &mut S::Context, ) -> Poll> { - let mut this = self.project(); - loop { // Attempt to receive a message. - match this.receiver.try_recv() { + match self.receiver.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - if this.listener.is_listening() { + if self.listener.is_some() { // Poll using the given strategy - ready!(S::poll(strategy, this.listener.as_mut(), cx)); + ready!(S::poll(strategy, &mut self.listener, cx)); } else { - this.listener - .as_mut() - .listen(&this.receiver.channel.recv_ops); + self.listener = Some(self.receiver.channel.recv_ops.listen()); } } }