From e82dd85cf74e826805b0e147122e22732254f38d Mon Sep 17 00:00:00 2001 From: glendc Date: Thu, 14 Sep 2023 13:12:20 +0200 Subject: [PATCH 1/7] prepare for loom: declare and use internal sync mod --- CHANGELOG.md | 4 ++++ Cargo.toml | 2 +- src/guard.rs | 27 +++++++++------------------ src/lib.rs | 1 + src/shutdown.rs | 3 ++- src/sync.rs | 11 +++++++++++ src/trigger.rs | 7 ++++--- 7 files changed, 32 insertions(+), 23 deletions(-) create mode 100644 src/sync.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index e477ed3..e92f2fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# 0.1.5 (13. September, 2023) + +- Support and use Loom for testing... (TODO); + # 0.1.4 (08. September, 2023) - Add example regarding ensuring you do catch exits and document it; diff --git a/Cargo.toml b/Cargo.toml index c71a608..b90b0a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ categories = ["asynchronous", "network-programming"] edition = "2021" name = "tokio-graceful" -version = "0.1.4" +version = "0.1.5" description = "util for graceful shutdown of tokio applications" homepage = "https://github.com/plabayo/tokio-graceful" readme = "README.md" diff --git a/src/guard.rs b/src/guard.rs index fa80acf..527015e 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -1,12 +1,11 @@ -use std::{ - future::Future, - mem::ManuallyDrop, - sync::{atomic::AtomicUsize, Arc}, -}; +use std::{future::Future, mem::ManuallyDrop}; use tokio::task::JoinHandle; -use crate::trigger::{Receiver, Sender}; +use crate::{ + sync::{Arc, AtomicUsize, Ordering}, + trigger::{Receiver, Sender}, +}; /// A guard, linked to a [`Shutdown`] struct, /// prevents the [`Shutdown::shutdown`] future from completing. @@ -35,7 +34,7 @@ pub struct WeakShutdownGuard { impl ShutdownGuard { pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc) -> Self { - let value = ref_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let value = ref_count.fetch_add(1, Ordering::SeqCst); tracing::trace!("new shutdown guard: ref_count+1: {}", value + 1); Self(ManuallyDrop::new(WeakShutdownGuard::new( trigger_rx, zero_tx, ref_count, @@ -156,10 +155,7 @@ impl ShutdownGuard { impl Clone for ShutdownGuard { fn clone(&self) -> Self { - let value = &self - .0 - .ref_count - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let value = &self.0.ref_count.fetch_add(1, Ordering::SeqCst); tracing::trace!("clone shutdown guard: ref_count+1: {}", value + 1); Self(self.0.clone()) } @@ -167,9 +163,7 @@ impl Clone for ShutdownGuard { impl From for ShutdownGuard { fn from(weak_guard: WeakShutdownGuard) -> ShutdownGuard { - let value = weak_guard - .ref_count - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let value = weak_guard.ref_count.fetch_add(1, Ordering::SeqCst); tracing::trace!("from weak shutdown guard: ref_count+1: {}", value + 1); Self(ManuallyDrop::new(weak_guard)) } @@ -177,10 +171,7 @@ impl From for ShutdownGuard { impl Drop for ShutdownGuard { fn drop(&mut self) { - let cnt = self - .0 - .ref_count - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + let cnt = self.0.ref_count.fetch_sub(1, Ordering::SeqCst); tracing::trace!("drop shutdown guard: ref_count-1: {}", cnt - 1); if cnt == 1 { self.0.zero_tx.trigger(); diff --git a/src/lib.rs b/src/lib.rs index f439090..102c54a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,7 @@ pub use guard::{ShutdownGuard, WeakShutdownGuard}; mod shutdown; pub use shutdown::{default_signal, Shutdown}; +pub(crate) mod sync; pub(crate) mod trigger; #[doc = include_str!("../README.md")] diff --git a/src/shutdown.rs b/src/shutdown.rs index b9aee84..1770c24 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,6 +1,7 @@ -use std::{future::Future, sync::Arc, time}; +use std::{future::Future, time}; use crate::{ + sync::Arc, trigger::{trigger, Receiver}, ShutdownGuard, WeakShutdownGuard, }; diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..54a5ed6 --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,11 @@ +#[cfg(loom)] +pub use loom::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, Ordering, +}; + +#[cfg(not(loom))] +pub use std::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, +}; diff --git a/src/trigger.rs b/src/trigger.rs index 59dbb59..1f675eb 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -18,13 +18,14 @@ use std::{ future::Future, pin::Pin, - sync::{atomic::AtomicBool, Arc, Mutex}, task::{Context, Poll, Waker}, }; use pin_project_lite::pin_project; use slab::Slab; +use crate::sync::{Arc, AtomicBool, Mutex, Ordering}; + type WakerList = Arc>>>; type TriggerState = Arc; @@ -58,7 +59,7 @@ impl Subscriber { /// and we can update the waker with the new waker. Otherwise we insert the waker /// into the waker list as a new waker. Either way, we return the key of the waker. pub fn state(&self, cx: &mut Context, key: Option) -> SubscriberState { - if self.state.load(std::sync::atomic::Ordering::SeqCst) { + if self.state.load(Ordering::SeqCst) { return SubscriberState::Triggered; } @@ -181,7 +182,7 @@ impl Sender { /// Triggers the Receiver, with a short circuit if the trigger has already been triggered. pub fn trigger(&self) { - if self.state.swap(true, std::sync::atomic::Ordering::SeqCst) { + if self.state.swap(true, Ordering::SeqCst) { return; } From 10d96c8db5cb26d782fbb50ae34df6a756da05e3 Mon Sep 17 00:00:00 2001 From: glendc Date: Thu, 14 Sep 2023 14:10:12 +0200 Subject: [PATCH 2/7] whole codebase is now ready for loom testing --- Cargo.toml | 9 ++++++--- examples/waitgroup.rs | 2 +- src/guard.rs | 40 +++++++++++++--------------------------- src/lib.rs | 4 ++-- src/shutdown.rs | 16 +++++----------- src/sync.rs | 11 ----------- src/sync/default.rs | 6 ++++++ src/sync/loom.rs | 6 ++++++ src/sync/mod.rs | 9 +++++++++ 9 files changed, 48 insertions(+), 55 deletions(-) delete mode 100644 src/sync.rs create mode 100644 src/sync/default.rs create mode 100644 src/sync/loom.rs create mode 100644 src/sync/mod.rs diff --git a/Cargo.toml b/Cargo.toml index b90b0a4..54c94c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,14 +10,17 @@ keywords = ["io", "async", "non-blocking", "futures"] license = "MIT OR Apache-2.0" repository = "https://github.com/plabayo/tokio-graceful" +[target.'cfg(loom)'.dependencies] +loom = "0.7" + [dependencies] -pin-project-lite = "0.2.13" -slab = "0.4.9" +pin-project-lite = "0.2" +slab = "0.4" tokio = { version = "1", features = ["rt", "signal", "sync", "macros", "time"] } tracing = "0.1" [dev-dependencies] hyper = { version = "0.14", features = [ "server", "tcp", "http1", "http2" ] } -rand = "0.8.5" +rand = "0.8" tokio = { version = "1", features = ["net", "rt-multi-thread", "io-util", "test-util"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/waitgroup.rs b/examples/waitgroup.rs index 8cecd90..7c9b650 100644 --- a/examples/waitgroup.rs +++ b/examples/waitgroup.rs @@ -23,7 +23,7 @@ async fn main() { // NOTE: you can also manually create // a guard using `shutdown.guard()` and spawn // you async tasks manually in case you do not wish to run these - // using `tokio::spawn`. + // using `tokio_graceful::sync::spawn` (Tokio by default). let sleep = tokio::time::sleep(Duration::from_secs(countdown)); shutdown.spawn_task(async move { sleep.await; diff --git a/src/guard.rs b/src/guard.rs index 527015e..f8cc248 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -1,9 +1,7 @@ use std::{future::Future, mem::ManuallyDrop}; -use tokio::task::JoinHandle; - use crate::{ - sync::{Arc, AtomicUsize, Ordering}, + sync::{Arc, AtomicUsize, JoinHandle, Ordering}, trigger::{Receiver, Sender}, }; @@ -58,53 +56,44 @@ impl ShutdownGuard { self.0.cancelled().await } - /// Returns a Tokio [`JoinHandle`] that can be awaited on + /// Returns a [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task to complete. See - /// [`tokio::spawn`] for more information. - /// - /// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html - /// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html + /// [`crate::sync::spawn`] for more information. pub fn spawn_task(&self, task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { let guard = self.clone(); - tokio::spawn(async move { + crate::sync::spawn(async move { let output = task.await; drop(guard); output }) } - /// Returns a Tokio [`JoinHandle`] that can be awaited on + /// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task (future) to complete. See - /// [`tokio::spawn`] for more information. + /// [`crate::sync::spawn`] for more information. /// /// In contrast to [`ShutdownGuard::spawn_task`] this method consumes the guard, /// ensuring the guard is dropped once the task future is fulfilled. - /// - /// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html - /// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html /// [`ShutdownGuard::spawn_task`]: crate::ShutdownGuard::spawn_task pub fn into_spawn_task(self, task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - tokio::spawn(async move { + crate::sync::spawn(async move { let output = task.await; drop(self); output }) } - /// Returns a Tokio [`JoinHandle`] that can be awaited on + /// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task (fn) to complete. See - /// [`tokio::spawn`] for more information. - /// - /// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html - /// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html + /// [`crate::sync::spawn`] for more information. pub fn spawn_task_fn(&self, task: F) -> JoinHandle where F: FnOnce(ShutdownGuard) -> T + Send + 'static, @@ -112,18 +101,15 @@ impl ShutdownGuard { T::Output: Send + 'static, { let guard = self.clone(); - tokio::spawn(async move { task(guard).await }) + crate::sync::spawn(async move { task(guard).await }) } - /// Returns a Tokio [`JoinHandle`] that can be awaited on + /// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task (fn) to complete. See - /// [`tokio::spawn`] for more information. + /// [`crate::sync::spawn`] for more information. /// /// In contrast to [`ShutdownGuard::spawn_task_fn`] this method consumes the guard, /// ensuring the guard is dropped once the task future is fulfilled. - /// - /// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html - /// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html /// [`ShutdownGuard::spawn_task_fn`]: crate::ShutdownGuard::spawn_task_fn pub fn into_spawn_task_fn(self, task: F) -> JoinHandle where @@ -131,7 +117,7 @@ impl ShutdownGuard { T: Future + Send + 'static, T::Output: Send + 'static, { - tokio::spawn(async move { task(self).await }) + crate::sync::spawn(async move { task(self).await }) } /// Downgrades the guard to a [`WeakShutdownGuard`], diff --git a/src/lib.rs b/src/lib.rs index 102c54a..8e847c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,7 @@ mod tests { let shutdown = Shutdown::new(async { rx.await.unwrap(); }); - tokio::spawn(async move { + crate::sync::spawn(async move { tx.send(()).unwrap(); }); shutdown.shutdown().await; @@ -75,7 +75,7 @@ mod tests { let shutdown = Shutdown::new(async { rx.await.unwrap(); }); - tokio::spawn(async move { + crate::sync::spawn(async move { tx.send(()).unwrap(); }); shutdown diff --git a/src/shutdown.rs b/src/shutdown.rs index 1770c24..0fd09ff 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -34,7 +34,7 @@ impl Shutdown { let guard = ShutdownGuard::new(signal_rx, zero_tx, Arc::new(0usize.into())); - tokio::spawn(async move { + crate::sync::spawn(async move { signal.await; signal_tx.trigger(); }); @@ -81,12 +81,9 @@ impl Shutdown { self.guard.clone_weak() } - /// Returns a Tokio [`JoinHandle`] that can be awaited on + /// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task to complete. See - /// [`tokio::spawn`] for more information. - /// - /// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html - /// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html + /// [`crate::sync::spawn`] for more information. #[inline] pub fn spawn_task(&self, task: T) -> tokio::task::JoinHandle where @@ -96,12 +93,9 @@ impl Shutdown { self.guard.spawn_task(task) } - /// Returns a Tokio [`JoinHandle`] that can be awaited on + /// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task (fn) to complete. See - /// [`tokio::spawn`] for more information. - /// - /// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html - /// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html + /// [`crate::sync::spawn`] for more information. #[inline] pub fn spawn_task_fn(&self, task: F) -> tokio::task::JoinHandle where diff --git a/src/sync.rs b/src/sync.rs deleted file mode 100644 index 54a5ed6..0000000 --- a/src/sync.rs +++ /dev/null @@ -1,11 +0,0 @@ -#[cfg(loom)] -pub use loom::sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, Ordering, -}; - -#[cfg(not(loom))] -pub use std::sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, -}; diff --git a/src/sync/default.rs b/src/sync/default.rs new file mode 100644 index 0000000..7ccb73d --- /dev/null +++ b/src/sync/default.rs @@ -0,0 +1,6 @@ +pub use std::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, +}; + +pub use tokio::task::{spawn, JoinHandle}; diff --git a/src/sync/loom.rs b/src/sync/loom.rs new file mode 100644 index 0000000..f79713c --- /dev/null +++ b/src/sync/loom.rs @@ -0,0 +1,6 @@ +pub use loom::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, Ordering, +}; + +pub use loom::task::{spawn, JoinHandle}; diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..d3ac3e2 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,9 @@ +#[cfg(loom)] +mod loom; +#[cfg(loom)] +pub use loom::*; + +#[cfg(not(loom))] +mod default; +#[cfg(not(loom))] +pub use default::*; From ffc7f90fcdbe8ce23e0fbca4e53c4a6faa2cbb32 Mon Sep 17 00:00:00 2001 From: glendc Date: Thu, 14 Sep 2023 15:09:56 +0200 Subject: [PATCH 3/7] prepare trigger channel code for loom test and ensure rest of code can still just run fine as-is --- .github/workflows/CI.yml | 19 +++++++++++++++++++ Cargo.toml | 4 +++- src/lib.rs | 4 +++- src/shutdown.rs | 10 +++++++--- src/sync/default.rs | 2 -- src/sync/loom.rs | 4 +--- src/sync/mod.rs | 4 +++- src/trigger.rs | 8 ++++++++ 8 files changed, 44 insertions(+), 11 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index e08f02b..ceb607c 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -98,6 +98,25 @@ jobs: command: test args: --all-features --examples + test-loom: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions-rs/toolchain@v1 + with: + toolchain: ${{env.RUST_TOOLCHAIN}} + override: true + profile: minimal + - uses: Swatinem/rust-cache@v1 + - name: Run tests + uses: actions-rs/cargo@v1 + with: + command: test + args: test_loom --release + env: + RUSTFLAGS: --cfg loom + cargo-hack: needs: check runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 54c94c7..14cfa7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,9 @@ tokio = { version = "1", features = ["rt", "signal", "sync", "macros", "time"] } tracing = "0.1" [dev-dependencies] -hyper = { version = "0.14", features = [ "server", "tcp", "http1", "http2" ] } rand = "0.8" tokio = { version = "1", features = ["net", "rt-multi-thread", "io-util", "test-util"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[target.'cfg(not(loom))'.dev-dependencies] +hyper = { version = "0.14", features = [ "server", "tcp", "http1", "http2" ] } diff --git a/src/lib.rs b/src/lib.rs index 8e847c3..2a00dbf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,7 +40,9 @@ mod guard; pub use guard::{ShutdownGuard, WeakShutdownGuard}; mod shutdown; -pub use shutdown::{default_signal, Shutdown}; +#[cfg(not(loom))] +pub use shutdown::default_signal; +pub use shutdown::Shutdown; pub(crate) mod sync; pub(crate) mod trigger; diff --git a/src/shutdown.rs b/src/shutdown.rs index 0fd09ff..1258be4 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,7 +1,7 @@ use std::{future::Future, time}; use crate::{ - sync::Arc, + sync::{Arc, JoinHandle}, trigger::{trigger, Receiver}, ShutdownGuard, WeakShutdownGuard, }; @@ -13,6 +13,8 @@ use crate::{ /// create a [`Shutdown`] with [`Shutdown::default`], which uses the default /// signal handler to trigger shutdown. See [`default_signal`] for more info. /// +/// > (NOTE: that these defaults are not available when compiling with --cfg loom) +/// /// See the [README] for more info on how to use this crate. /// /// [`Future`]: std::future::Future @@ -85,7 +87,7 @@ impl Shutdown { /// to wait for the spawned task to complete. See /// [`crate::sync::spawn`] for more information. #[inline] - pub fn spawn_task(&self, task: T) -> tokio::task::JoinHandle + pub fn spawn_task(&self, task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, @@ -97,7 +99,7 @@ impl Shutdown { /// to wait for the spawned task (fn) to complete. See /// [`crate::sync::spawn`] for more information. #[inline] - pub fn spawn_task_fn(&self, task: F) -> tokio::task::JoinHandle + pub fn spawn_task_fn(&self, task: F) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, @@ -182,6 +184,7 @@ impl Shutdown { /// /// [`Future`]: std::future::Future /// [`tokio::time::sleep`]: https://docs.rs/tokio/*/tokio/time/fn.sleep.html +#[cfg(not(loom))] pub async fn default_signal() { let ctrl_c = tokio::signal::ctrl_c(); let signal = async { @@ -196,6 +199,7 @@ pub async fn default_signal() { } } +#[cfg(not(loom))] impl Default for Shutdown { fn default() -> Self { Self::new(default_signal()) diff --git a/src/sync/default.rs b/src/sync/default.rs index 7ccb73d..b4dd310 100644 --- a/src/sync/default.rs +++ b/src/sync/default.rs @@ -2,5 +2,3 @@ pub use std::sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, }; - -pub use tokio::task::{spawn, JoinHandle}; diff --git a/src/sync/loom.rs b/src/sync/loom.rs index f79713c..d6b2a92 100644 --- a/src/sync/loom.rs +++ b/src/sync/loom.rs @@ -1,6 +1,4 @@ pub use loom::sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, Ordering, + Arc, Mutex, }; - -pub use loom::task::{spawn, JoinHandle}; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d3ac3e2..34c3590 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,9 +1,11 @@ #[cfg(loom)] mod loom; #[cfg(loom)] -pub use loom::*; +pub use self::loom::*; #[cfg(not(loom))] mod default; #[cfg(not(loom))] pub use default::*; + +pub use tokio::task::{spawn, JoinHandle}; diff --git a/src/trigger.rs b/src/trigger.rs index 1f675eb..313c788 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -213,3 +213,11 @@ pub fn trigger() -> (Sender, Receiver) { (sender, receiver) } + +#[cfg(all(test, loom))] +mod loom_tests { + use super::*; + + #[test] + fn test_loom_concurrent() {} +} From 41b77187f9fed59fc48074a7256e6f078477f2d4 Mon Sep 17 00:00:00 2001 From: glendc Date: Fri, 15 Sep 2023 09:04:01 +0200 Subject: [PATCH 4/7] implement first 'real' loom test --- Cargo.toml | 2 +- src/trigger.rs | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 14cfa7e..e461f71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT OR Apache-2.0" repository = "https://github.com/plabayo/tokio-graceful" [target.'cfg(loom)'.dependencies] -loom = "0.7" +loom = { version = "0.7", features = ["futures", "checkpoint"] } [dependencies] pin-project-lite = "0.2" diff --git a/src/trigger.rs b/src/trigger.rs index 313c788..67899f4 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -218,6 +218,22 @@ pub fn trigger() -> (Sender, Receiver) { mod loom_tests { use super::*; + use loom::{future::block_on, thread}; + #[test] - fn test_loom_concurrent() {} + fn test_loom_sender_trigger() { + loom::model(|| { + let (sender, receiver) = trigger(); + + let th = thread::spawn(move || { + sender.trigger(); + }); + + block_on(async move { + receiver.await; + }); + + th.join().unwrap(); + }); + } } From 7a9c299339e94169035ac7509d91186bed30bf77 Mon Sep 17 00:00:00 2001 From: glendc Date: Wed, 20 Sep 2023 15:10:00 +0200 Subject: [PATCH 5/7] switch CI environment to stable (same as production/expected code) --- .github/workflows/CI.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index ceb607c..ef02523 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -2,7 +2,7 @@ name: CI env: CARGO_TERM_COLOR: always - RUST_TOOLCHAIN: nightly-2023-07-08 + RUST_TOOLCHAIN: stable on: push: From aff9488b9ea1baf6a7e2278cdb15251f23c6e1f4 Mon Sep 17 00:00:00 2001 From: glendc Date: Wed, 20 Sep 2023 16:31:31 +0200 Subject: [PATCH 6/7] add non-loom tokio trigger (unit) tests --- src/trigger.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/trigger.rs b/src/trigger.rs index 67899f4..7a75634 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -214,6 +214,32 @@ pub fn trigger() -> (Sender, Receiver) { (sender, receiver) } +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + + #[tokio::test] + async fn test_sender_trigger() { + let (sender, receiver) = trigger(); + + let th = tokio::spawn(async move { + sender.trigger(); + }); + + receiver.await; + + th.await.unwrap(); + } + + #[tokio::test] + async fn test_sender_never_trigger() { + let (_, receiver) = trigger(); + tokio::time::timeout(std::time::Duration::from_millis(100), receiver) + .await + .unwrap_err(); + } +} + #[cfg(all(test, loom))] mod loom_tests { use super::*; From d8e9da8d671b6c8e2a77d67ab1373eedd7219cc7 Mon Sep 17 00:00:00 2001 From: glendc Date: Wed, 20 Sep 2023 17:34:23 +0200 Subject: [PATCH 7/7] ensure to check trigger prior to registering waker fix deadlock detected by loom --- src/trigger.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/trigger.rs b/src/trigger.rs index 7a75634..29647c0 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -65,6 +65,14 @@ impl Subscriber { let mut wakers = self.wakers.lock().unwrap(); + // check again after locking the wakers + // if we didn't miss this for some reason... + // (without this, we could miss a trigger, and never wake up...) + // (this was a bug detected by loom) + if self.state.load(Ordering::SeqCst) { + return SubscriberState::Triggered; + } + let waker = Some(cx.waker().clone()); SubscriberState::Waiting(if let Some(key) = key {