Skip to content

Commit

Permalink
Merge pull request #3 from plabayo/v0.1.5/loom-tests
Browse files Browse the repository at this point in the history
V0.1.5: loom tests
  • Loading branch information
GlenDC authored Sep 20, 2023
2 parents 7cd0418 + d8e9da8 commit 3e49135
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 71 deletions.
21 changes: 20 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: CI

env:
CARGO_TERM_COLOR: always
RUST_TOOLCHAIN: nightly-2023-07-08
RUST_TOOLCHAIN: stable

on:
push:
Expand Down Expand Up @@ -98,6 +98,25 @@ jobs:
command: test
args: --all-features --examples

test-loom:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{env.RUST_TOOLCHAIN}}
override: true
profile: minimal
- uses: Swatinem/rust-cache@v1
- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: test_loom --release
env:
RUSTFLAGS: --cfg loom

cargo-hack:
needs: check
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

# 0.1.5 (13. September, 2023)

- Support and use Loom for testing... (TODO);

# 0.1.4 (08. September, 2023)

- Add example regarding ensuring you do catch exits and document it;
Expand Down
15 changes: 10 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@
categories = ["asynchronous", "network-programming"]
edition = "2021"
name = "tokio-graceful"
version = "0.1.4"
version = "0.1.5"
description = "util for graceful shutdown of tokio applications"
homepage = "https://github.com/plabayo/tokio-graceful"
readme = "README.md"
keywords = ["io", "async", "non-blocking", "futures"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/plabayo/tokio-graceful"

[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", features = ["futures", "checkpoint"] }

[dependencies]
pin-project-lite = "0.2.13"
slab = "0.4.9"
pin-project-lite = "0.2"
slab = "0.4"
tokio = { version = "1", features = ["rt", "signal", "sync", "macros", "time"] }
tracing = "0.1"

[dev-dependencies]
hyper = { version = "0.14", features = [ "server", "tcp", "http1", "http2" ] }
rand = "0.8.5"
rand = "0.8"
tokio = { version = "1", features = ["net", "rt-multi-thread", "io-util", "test-util"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[target.'cfg(not(loom))'.dev-dependencies]
hyper = { version = "0.14", features = [ "server", "tcp", "http1", "http2" ] }
2 changes: 1 addition & 1 deletion examples/waitgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() {
// NOTE: you can also manually create
// a guard using `shutdown.guard()` and spawn
// you async tasks manually in case you do not wish to run these
// using `tokio::spawn`.
// using `tokio_graceful::sync::spawn` (Tokio by default).
let sleep = tokio::time::sleep(Duration::from_secs(countdown));
shutdown.spawn_task(async move {
sleep.await;
Expand Down
65 changes: 21 additions & 44 deletions src/guard.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::{
future::Future,
mem::ManuallyDrop,
sync::{atomic::AtomicUsize, Arc},
};

use tokio::task::JoinHandle;
use std::{future::Future, mem::ManuallyDrop};

use crate::trigger::{Receiver, Sender};
use crate::{
sync::{Arc, AtomicUsize, JoinHandle, Ordering},
trigger::{Receiver, Sender},
};

/// A guard, linked to a [`Shutdown`] struct,
/// prevents the [`Shutdown::shutdown`] future from completing.
Expand Down Expand Up @@ -35,7 +32,7 @@ pub struct WeakShutdownGuard {

impl ShutdownGuard {
pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc<AtomicUsize>) -> Self {
let value = ref_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let value = ref_count.fetch_add(1, Ordering::SeqCst);
tracing::trace!("new shutdown guard: ref_count+1: {}", value + 1);
Self(ManuallyDrop::new(WeakShutdownGuard::new(
trigger_rx, zero_tx, ref_count,
Expand All @@ -59,80 +56,68 @@ impl ShutdownGuard {
self.0.cancelled().await
}

/// Returns a Tokio [`JoinHandle`] that can be awaited on
/// Returns a [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task to complete. See
/// [`tokio::spawn`] for more information.
///
/// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html
/// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
/// [`crate::sync::spawn`] for more information.
pub fn spawn_task<T>(&self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let guard = self.clone();
tokio::spawn(async move {
crate::sync::spawn(async move {
let output = task.await;
drop(guard);
output
})
}

/// Returns a Tokio [`JoinHandle`] that can be awaited on
/// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task (future) to complete. See
/// [`tokio::spawn`] for more information.
/// [`crate::sync::spawn`] for more information.
///
/// In contrast to [`ShutdownGuard::spawn_task`] this method consumes the guard,
/// ensuring the guard is dropped once the task future is fulfilled.
///
/// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html
/// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
/// [`ShutdownGuard::spawn_task`]: crate::ShutdownGuard::spawn_task
pub fn into_spawn_task<T>(self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(async move {
crate::sync::spawn(async move {
let output = task.await;
drop(self);
output
})
}

/// Returns a Tokio [`JoinHandle`] that can be awaited on
/// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task (fn) to complete. See
/// [`tokio::spawn`] for more information.
///
/// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html
/// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
/// [`crate::sync::spawn`] for more information.
pub fn spawn_task_fn<F, T>(&self, task: F) -> JoinHandle<T::Output>
where
F: FnOnce(ShutdownGuard) -> T + Send + 'static,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let guard = self.clone();
tokio::spawn(async move { task(guard).await })
crate::sync::spawn(async move { task(guard).await })
}

/// Returns a Tokio [`JoinHandle`] that can be awaited on
/// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task (fn) to complete. See
/// [`tokio::spawn`] for more information.
/// [`crate::sync::spawn`] for more information.
///
/// In contrast to [`ShutdownGuard::spawn_task_fn`] this method consumes the guard,
/// ensuring the guard is dropped once the task future is fulfilled.
///
/// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html
/// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
/// [`ShutdownGuard::spawn_task_fn`]: crate::ShutdownGuard::spawn_task_fn
pub fn into_spawn_task_fn<F, T>(self, task: F) -> JoinHandle<T::Output>
where
F: FnOnce(ShutdownGuard) -> T + Send + 'static,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(async move { task(self).await })
crate::sync::spawn(async move { task(self).await })
}

/// Downgrades the guard to a [`WeakShutdownGuard`],
Expand All @@ -156,31 +141,23 @@ impl ShutdownGuard {

impl Clone for ShutdownGuard {
fn clone(&self) -> Self {
let value = &self
.0
.ref_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let value = &self.0.ref_count.fetch_add(1, Ordering::SeqCst);
tracing::trace!("clone shutdown guard: ref_count+1: {}", value + 1);
Self(self.0.clone())
}
}

impl From<WeakShutdownGuard> for ShutdownGuard {
fn from(weak_guard: WeakShutdownGuard) -> ShutdownGuard {
let value = weak_guard
.ref_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let value = weak_guard.ref_count.fetch_add(1, Ordering::SeqCst);
tracing::trace!("from weak shutdown guard: ref_count+1: {}", value + 1);
Self(ManuallyDrop::new(weak_guard))
}
}

impl Drop for ShutdownGuard {
fn drop(&mut self) {
let cnt = self
.0
.ref_count
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
let cnt = self.0.ref_count.fetch_sub(1, Ordering::SeqCst);
tracing::trace!("drop shutdown guard: ref_count-1: {}", cnt - 1);
if cnt == 1 {
self.0.zero_tx.trigger();
Expand Down
9 changes: 6 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ mod guard;
pub use guard::{ShutdownGuard, WeakShutdownGuard};

mod shutdown;
pub use shutdown::{default_signal, Shutdown};
#[cfg(not(loom))]
pub use shutdown::default_signal;
pub use shutdown::Shutdown;

pub(crate) mod sync;
pub(crate) mod trigger;

#[doc = include_str!("../README.md")]
Expand All @@ -62,7 +65,7 @@ mod tests {
let shutdown = Shutdown::new(async {
rx.await.unwrap();
});
tokio::spawn(async move {
crate::sync::spawn(async move {
tx.send(()).unwrap();
});
shutdown.shutdown().await;
Expand All @@ -74,7 +77,7 @@ mod tests {
let shutdown = Shutdown::new(async {
rx.await.unwrap();
});
tokio::spawn(async move {
crate::sync::spawn(async move {
tx.send(()).unwrap();
});
shutdown
Expand Down
27 changes: 13 additions & 14 deletions src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{future::Future, sync::Arc, time};
use std::{future::Future, time};

use crate::{
sync::{Arc, JoinHandle},
trigger::{trigger, Receiver},
ShutdownGuard, WeakShutdownGuard,
};
Expand All @@ -12,6 +13,8 @@ use crate::{
/// create a [`Shutdown`] with [`Shutdown::default`], which uses the default
/// signal handler to trigger shutdown. See [`default_signal`] for more info.
///
/// > (NOTE: that these defaults are not available when compiling with --cfg loom)
///
/// See the [README] for more info on how to use this crate.
///
/// [`Future`]: std::future::Future
Expand All @@ -33,7 +36,7 @@ impl Shutdown {

let guard = ShutdownGuard::new(signal_rx, zero_tx, Arc::new(0usize.into()));

tokio::spawn(async move {
crate::sync::spawn(async move {
signal.await;
signal_tx.trigger();
});
Expand Down Expand Up @@ -80,29 +83,23 @@ impl Shutdown {
self.guard.clone_weak()
}

/// Returns a Tokio [`JoinHandle`] that can be awaited on
/// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task to complete. See
/// [`tokio::spawn`] for more information.
///
/// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html
/// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
/// [`crate::sync::spawn`] for more information.
#[inline]
pub fn spawn_task<T>(&self, task: T) -> tokio::task::JoinHandle<T::Output>
pub fn spawn_task<T>(&self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.guard.spawn_task(task)
}

/// Returns a Tokio [`JoinHandle`] that can be awaited on
/// Returns a Tokio [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task (fn) to complete. See
/// [`tokio::spawn`] for more information.
///
/// [`JoinHandle`]: https://docs.rs/tokio/*/tokio/task/struct.JoinHandle.html
/// [`tokio::spawn`]: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
/// [`crate::sync::spawn`] for more information.
#[inline]
pub fn spawn_task_fn<T, F>(&self, task: F) -> tokio::task::JoinHandle<T::Output>
pub fn spawn_task_fn<T, F>(&self, task: F) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
Expand Down Expand Up @@ -187,6 +184,7 @@ impl Shutdown {
///
/// [`Future`]: std::future::Future
/// [`tokio::time::sleep`]: https://docs.rs/tokio/*/tokio/time/fn.sleep.html
#[cfg(not(loom))]
pub async fn default_signal() {
let ctrl_c = tokio::signal::ctrl_c();
let signal = async {
Expand All @@ -201,6 +199,7 @@ pub async fn default_signal() {
}
}

#[cfg(not(loom))]
impl Default for Shutdown {
fn default() -> Self {
Self::new(default_signal())
Expand Down
4 changes: 4 additions & 0 deletions src/sync/default.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
};
4 changes: 4 additions & 0 deletions src/sync/loom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub use loom::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
};
11 changes: 11 additions & 0 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[cfg(loom)]
mod loom;
#[cfg(loom)]
pub use self::loom::*;

#[cfg(not(loom))]
mod default;
#[cfg(not(loom))]
pub use default::*;

pub use tokio::task::{spawn, JoinHandle};
Loading

0 comments on commit 3e49135

Please sign in to comment.