From 395d01dbb41078de354e309ddb73304569003f44 Mon Sep 17 00:00:00 2001 From: datdenkikniet Date: Mon, 24 Oct 2022 19:43:00 +0200 Subject: [PATCH 1/2] Add an async implementation of `Queue` --- Cargo.toml | 5 ++ src/async_impl/mod.rs | 7 ++ src/async_impl/spsc/consumer.rs | 129 +++++++++++++++++++++++++++++++ src/async_impl/spsc/mod.rs | 97 +++++++++++++++++++++++ src/async_impl/spsc/producer.rs | 133 ++++++++++++++++++++++++++++++++ src/async_impl/ssq.rs | 89 +++++++++++++++++++++ src/lib.rs | 3 + 7 files changed, 463 insertions(+) create mode 100644 src/async_impl/mod.rs create mode 100644 src/async_impl/spsc/consumer.rs create mode 100644 src/async_impl/spsc/mod.rs create mode 100644 src/async_impl/spsc/producer.rs create mode 100644 src/async_impl/ssq.rs diff --git a/Cargo.toml b/Cargo.toml index 1292800e13..857ac6a755 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,11 @@ optional = true [dev-dependencies.ufmt] version = "0.1" +[dev-dependencies.tokio] +version = "1" +default-features = false +features = [ "macros", "rt", "time" ] + [dependencies.defmt] version = ">=0.2.0,<0.4" optional = true diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs new file mode 100644 index 0000000000..85567c2192 --- /dev/null +++ b/src/async_impl/mod.rs @@ -0,0 +1,7 @@ +//! This module contains an async variant of [`Queue`] +//! +//! [`Queue`]: crate::spsc::Queue + +mod ssq; + +pub mod spsc; diff --git a/src/async_impl/spsc/consumer.rs b/src/async_impl/spsc/consumer.rs new file mode 100644 index 0000000000..3bca79bf76 --- /dev/null +++ b/src/async_impl/spsc/consumer.rs @@ -0,0 +1,129 @@ +use core::{ + future::Future, + task::{Poll, Waker}, +}; + +use crate::{ + async_impl::ssq::{WakerConsumer, WakerProducer}, + spsc::Consumer as HConsumer, +}; + +/// An async consumer +pub struct Consumer<'queue, T, const N: usize> +where + T: Unpin, +{ + inner: HConsumer<'queue, T, N>, + producer_waker: WakerConsumer<'queue>, + consumer_waker: WakerProducer<'queue>, +} + +impl<'queue, T, const N: usize> Consumer<'queue, T, N> +where + T: Unpin, +{ + pub(crate) fn new( + consumer: HConsumer<'queue, T, N>, + producer_waker: WakerConsumer<'queue>, + consumer_waker: WakerProducer<'queue>, + ) -> Self { + Self { + inner: consumer, + producer_waker, + consumer_waker, + } + } + + /// Check if there are any items to dequeue. + /// + /// When this returns true, at least the first subsequent [`Self::dequeue`] will succeed immediately + pub fn ready(&self) -> bool { + self.inner.ready() + } + + /// Returns the maximum number of elements the queue can hold + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + /// Returns the amount of elements currently in the queue + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Dequeue an item from the backing queue. + /// + /// The returned future only resolves once an item was succesfully + /// dequeued. + pub fn dequeue<'me>(&'me mut self) -> ConsumerFuture<'me, 'queue, T, N> { + ConsumerFuture { + consumer: self, + dequeued_value: None, + } + } + + /// Attempt to dequeue an item from the backing queue. + pub fn try_dequeue(&mut self) -> Option { + self.try_wake_producer(); + + self.inner.dequeue() + } + + /// Try to wake the [`Producer`](super::Producer) associated with the backing queue if + /// it is waiting to be awoken. + fn try_wake_producer(&mut self) { + self.producer_waker.dequeue().map(|w| w.wake()); + } + + /// Register `waker` as the waker for this [`Consumer`] + fn register_waker<'v>(&mut self, waker: Waker) { + // We can safely overwrite the old waker, as we can only ever have 1 instance + // of `self` waiting to be awoken. + self.consumer_waker.enqueue(waker); + } +} + +pub struct ConsumerFuture<'consumer, 'queue, T, const N: usize> +where + T: Unpin, +{ + consumer: &'consumer mut Consumer<'queue, T, N>, + dequeued_value: Option, +} + +impl Future for ConsumerFuture<'_, '_, T, N> +where + T: Unpin, +{ + type Output = T; + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll { + let try_wake_producer = |me: &mut Self, value| { + me.consumer.try_wake_producer(); + return Poll::Ready(value); + }; + + let me = self.get_mut(); + let con = &mut me.consumer; + + if let Some(value) = me.dequeued_value.take() { + // Try to wake the producer because we managed to + // dequeue a value + return try_wake_producer(me, value); + } + + me.dequeued_value = con.inner.dequeue(); + if let Some(value) = me.dequeued_value.take() { + // Try to wake the producer because we managed to + // dequeue a value + try_wake_producer(me, value) + } else { + me.consumer.register_waker(cx.waker().clone()); + + Poll::Pending + } + } +} diff --git a/src/async_impl/spsc/mod.rs b/src/async_impl/spsc/mod.rs new file mode 100644 index 0000000000..48791aba88 --- /dev/null +++ b/src/async_impl/spsc/mod.rs @@ -0,0 +1,97 @@ +//! An async wrapper around [`Queue`] +use crate::spsc::Queue as HQueue; + +mod producer; +pub use producer::Producer; + +mod consumer; +pub use consumer::Consumer; + +use super::ssq::WakerQueue; + +/// An async queue +pub struct Queue +where + T: Unpin, +{ + inner: HQueue, + producer_waker: WakerQueue, + consumer_waker: WakerQueue, +} + +impl Queue +where + T: Unpin, +{ + /// Create a new Queue + pub const fn new() -> Self { + Self { + inner: HQueue::new(), + producer_waker: WakerQueue::new(), + consumer_waker: WakerQueue::new(), + } + } + + /// Split the queue into a producer and consumer + pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) { + let ((cwp, cwc), (pwp, pwc)) = (self.consumer_waker.split(), self.producer_waker.split()); + + let (producer, consumer) = self.inner.split(); + ( + Producer::new(producer, pwc, cwp), + Consumer::new(consumer, pwp, cwc), + ) + } +} + +#[cfg(test)] +mod test { + use std; + use std::boxed::Box; + use std::println; + use std::time::Duration; + use std::vec::Vec; + + use super::Queue; + + #[tokio::test] + async fn spsc() { + let queue: &'static mut Queue = Box::leak(Box::new(Queue::new())); + + let (mut tx, mut rx) = queue.split(); + const MAX: u32 = 100; + let mut data = Vec::new(); + for i in 0..=MAX { + data.push(i); + } + + let t1_data = data.clone(); + let t1 = tokio::task::spawn(async move { + println!("Dequeueing..."); + let mut rx_data = Vec::new(); + loop { + let value = rx.dequeue().await; + println!("Succesfully dequeued {}", value); + rx_data.push(value); + if value == MAX { + break; + } + } + assert_eq!(t1_data, rx_data); + }); + + let t2 = tokio::task::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(1)); + println!("Enqueing..."); + for i in data { + tx.enqueue(i).await; + interval.tick().await; + println!("Succesfully enqueued {}", i); + } + }); + + let (t1, t2) = tokio::join!(t1, t2); + t1.unwrap(); + t2.unwrap(); + } +} diff --git a/src/async_impl/spsc/producer.rs b/src/async_impl/spsc/producer.rs new file mode 100644 index 0000000000..397ddedea1 --- /dev/null +++ b/src/async_impl/spsc/producer.rs @@ -0,0 +1,133 @@ +use core::{ + future::Future, + task::{Poll, Waker}, +}; + +use crate::{ + async_impl::ssq::{WakerConsumer, WakerProducer}, + spsc::Producer as HProducer, +}; + +/// An async producer +pub struct Producer<'queue, T, const N: usize> +where + T: Unpin, +{ + inner: HProducer<'queue, T, N>, + producer_waker: WakerProducer<'queue>, + consumer_waker: WakerConsumer<'queue>, +} + +impl<'queue, T, const N: usize> Producer<'queue, T, N> +where + T: Unpin, +{ + pub(crate) fn new( + producer: HProducer<'queue, T, N>, + producer_waker: WakerProducer<'queue>, + consumer_waker: WakerConsumer<'queue>, + ) -> Self { + Self { + inner: producer, + producer_waker, + consumer_waker, + } + } + + /// Check if an item can be enqueued. + /// + /// If this returns true, at least the first subsequent [`Self::enqueue`] will succeed + /// immediately. + pub fn ready(&self) -> bool { + self.inner.ready() + } + + /// Returns the maximum number of elements the queue can hold. + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + /// Returns the amount of elements currently in the queue. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Enqueue `value` into the backing queue. + /// + /// The returned Future only resolves once the value was + /// succesfully enqueued. + pub fn enqueue<'me>(&'me mut self, value: T) -> ProducerFuture<'me, 'queue, T, N> { + let value = self.inner.enqueue(value).err(); + ProducerFuture { + producer: self, + value_to_enqueue: value, + } + } + + /// Try to enqueue `value` into the backing queue. + pub fn try_enqueue(&mut self, value: T) -> Result<(), T> { + self.inner.enqueue(value) + } + + /// Try to wake the [`Consumer`](super::Consumer) associated with the backing queue if + /// it is waiting to be awoken. + fn wake_consumer(&mut self) { + self.consumer_waker.dequeue().map(|v| v.wake()); + } + + /// Register `waker` as the waker for this [`Producer`] + fn register_waker<'v>(&mut self, waker: Waker) { + // We can safely overwrite the old waker, as we can only ever have 1 instance + // of `self` waiting to be awoken. + self.producer_waker.enqueue(waker); + } +} + +pub struct ProducerFuture<'producer, 'queue, T, const N: usize> +where + T: Unpin, +{ + producer: &'producer mut Producer<'queue, T, N>, + value_to_enqueue: Option, +} + +impl Future for ProducerFuture<'_, '_, T, N> +where + T: Unpin, +{ + type Output = (); + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll { + let try_wake_consumer = |me: &mut Self| { + me.producer.wake_consumer(); + Poll::Ready(()) + }; + + let me = self.get_mut(); + let prod = &mut me.producer; + let val_to_enqueue = &mut me.value_to_enqueue; + + let value = if let Some(value) = val_to_enqueue.take() { + value + } else { + // Try to wake the consumer because we've enqueued our value + return try_wake_consumer(me); + }; + + let failed_enqueue_value = if let Some(value) = prod.inner.enqueue(value).err() { + value + } else { + // Try to wake the consumer because we've enqueued our value + return try_wake_consumer(me); + }; + + me.value_to_enqueue = Some(failed_enqueue_value); + + me.producer.register_waker(cx.waker().clone()); + + Poll::Pending + } +} diff --git a/src/async_impl/ssq.rs b/src/async_impl/ssq.rs new file mode 100644 index 0000000000..7cdfc2b192 --- /dev/null +++ b/src/async_impl/ssq.rs @@ -0,0 +1,89 @@ +//! A Single Slot Queue + +#[cfg(cas_atomic_polyfill)] +use atomic_polyfill::{AtomicBool, Ordering}; +#[cfg(not(cas_atomic_polyfill))] +use core::sync::atomic::{AtomicBool, Ordering}; + +use core::{cell::UnsafeCell, mem::MaybeUninit, ptr}; + +pub type WakerQueue = SingleSlotQueue; +pub type WakerProducer<'a> = Producer<'a, core::task::Waker>; +pub type WakerConsumer<'a> = Consumer<'a, core::task::Waker>; + +/// Single slot queue. +pub struct SingleSlotQueue { + full: AtomicBool, + val: UnsafeCell>, +} + +impl SingleSlotQueue { + /// Create a new Single Slot Queue + pub const fn new() -> Self { + SingleSlotQueue { + full: AtomicBool::new(false), + val: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Split this Single Slot Queue into a Consumer and Producer + pub fn split<'a>(&'a mut self) -> (Consumer<'a, T>, Producer<'a, T>) { + (Consumer { ssq: self }, Producer { ssq: self }) + } +} + +impl Drop for SingleSlotQueue { + fn drop(&mut self) { + if self.full.load(Ordering::Relaxed) { + unsafe { + ptr::drop_in_place(self.val.get() as *mut T); + } + } + } +} + +/// Read handle to a single slot queue. +pub struct Consumer<'a, T> { + ssq: &'a SingleSlotQueue, +} + +impl<'a, T> Consumer<'a, T> { + /// Try reading a value from the queue. + #[inline] + pub fn dequeue(&mut self) -> Option { + if self.ssq.full.load(Ordering::Acquire) { + let r = Some(unsafe { ptr::read(self.ssq.val.get().cast()) }); + self.ssq.full.store(false, Ordering::Release); + r + } else { + None + } + } +} + +/// Safety: We gurarantee the safety using an `AtomicBool` to gate the read of the `UnsafeCell`. +unsafe impl<'a, T> Send for Consumer<'a, T> {} + +/// Write handle to a single slot queue. +pub struct Producer<'a, T> { + ssq: &'a SingleSlotQueue, +} + +impl<'a, T> Producer<'a, T> { + /// Write a value into the queue. If there is a value already in the queue this will + /// return the value given to this method. + #[inline] + pub fn enqueue(&mut self, val: T) -> Option { + if !self.ssq.full.load(Ordering::Acquire) { + unsafe { ptr::write(self.ssq.val.get().cast(), val) }; + self.ssq.full.store(true, Ordering::Release); + None + } else { + Some(val) + } + } +} + +/// Safety: We gurarantee the safety using an `AtomicBool` to gate the write of the +/// `UnsafeCell`. +unsafe impl<'a, T> Send for Producer<'a, T> {} diff --git a/src/lib.rs b/src/lib.rs index 79d208c212..055ae06bd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,6 +100,9 @@ mod linear_map; mod string; mod vec; +#[cfg(all(has_cas, feature = "cas"))] +pub mod async_impl; + #[cfg(feature = "serde")] mod de; #[cfg(feature = "serde")] From 4f27afc0531b9b5253c93ad37e1c98aa3c82a2e8 Mon Sep 17 00:00:00 2001 From: datdenkikniet Date: Mon, 24 Oct 2022 21:21:00 +0200 Subject: [PATCH 2/2] Failing to register the waker should cause an immediate re-wake instead of being ignored --- src/async_impl/spsc/consumer.rs | 12 +++++++----- src/async_impl/spsc/producer.rs | 11 +++++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/async_impl/spsc/consumer.rs b/src/async_impl/spsc/consumer.rs index 3bca79bf76..f0a9348015 100644 --- a/src/async_impl/spsc/consumer.rs +++ b/src/async_impl/spsc/consumer.rs @@ -76,10 +76,8 @@ where } /// Register `waker` as the waker for this [`Consumer`] - fn register_waker<'v>(&mut self, waker: Waker) { - // We can safely overwrite the old waker, as we can only ever have 1 instance - // of `self` waiting to be awoken. - self.consumer_waker.enqueue(waker); + fn register_waker<'v>(&mut self, waker: Waker) -> bool { + self.consumer_waker.enqueue(waker).is_none() } } @@ -121,7 +119,11 @@ where // dequeue a value try_wake_producer(me, value) } else { - me.consumer.register_waker(cx.waker().clone()); + if !me.consumer.register_waker(cx.waker().clone()) { + // We failed to register the waker for some reason, + // wake immediately. + cx.waker().wake_by_ref(); + } Poll::Pending } diff --git a/src/async_impl/spsc/producer.rs b/src/async_impl/spsc/producer.rs index 397ddedea1..dab61a54e6 100644 --- a/src/async_impl/spsc/producer.rs +++ b/src/async_impl/spsc/producer.rs @@ -76,10 +76,10 @@ where } /// Register `waker` as the waker for this [`Producer`] - fn register_waker<'v>(&mut self, waker: Waker) { + fn register_waker<'v>(&mut self, waker: Waker) -> bool { // We can safely overwrite the old waker, as we can only ever have 1 instance // of `self` waiting to be awoken. - self.producer_waker.enqueue(waker); + self.producer_waker.enqueue(waker).is_none() } } @@ -126,8 +126,11 @@ where me.value_to_enqueue = Some(failed_enqueue_value); - me.producer.register_waker(cx.waker().clone()); - + if !me.producer.register_waker(cx.waker().clone()) { + // We failed to enqueue the waker for some reason, + // re-wake immediately. + cx.waker().wake_by_ref(); + } Poll::Pending } }