Skip to content

Commit

Permalink
Introduce named global executor for choice in execution
Browse files Browse the repository at this point in the history
It is a good for binary crate to active at most one global spawner
compat. But it could be relatively hard if dependency graph is large.
In this case, it would be good to offer choices in execution.

This commit uses environment variable SPAWNS_GLOBAL_EXECUTOR to choose
one in absent of thread context spawners.
  • Loading branch information
kezhuw committed May 6, 2024
1 parent 3f38b2d commit 13c8567
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 33 deletions.
1 change: 1 addition & 0 deletions spawns-compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async-global-executor = { version = "2", optional = true }

[dev-dependencies]
async-std = "1.12.0"
futures-lite = "2.3.0"
tokio = { version = "1.37.0", features = ["full"] }

[package.metadata.docs.rs]
Expand Down
14 changes: 9 additions & 5 deletions spawns-compat/src/async_global_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use spawns_core::{Compat, Task, COMPATS};
use std::boxed::Box;

#[distributed_slice(COMPATS)]
pub static ASYNC_GLOBAL_EXECUTOR: Compat = Compat::Global(async_global);
pub static ASYNC_GLOBAL_EXECUTOR: Compat = Compat::NamedGlobal {
name: "async-global-executor",
spawn: async_global,
};

fn async_global(task: Task) {
let Task { future, .. } = task;
Expand All @@ -15,10 +18,11 @@ fn async_global(task: Task) {
#[cfg(not(feature = "smol"))]
mod tests {
use spawns_core::*;
use futures_lite::future;

#[test]
fn spawn_one() {
async_std::task::block_on(async {
future::block_on(async {
let handle = spawn(async { id() });
let id = handle.id();
assert_eq!(handle.await.unwrap(), id);
Expand All @@ -27,7 +31,7 @@ mod tests {

#[test]
fn spawn_cascading() {
async_std::task::block_on(async {
future::block_on(async {
let handle = spawn(async { spawn(async { id() }) });
let handle = handle.await.unwrap();
let id = handle.id();
Expand All @@ -37,7 +41,7 @@ mod tests {

#[test]
fn spawn_interleaving() {
async_std::task::block_on(async move {
future::block_on(async move {
let handle = spawn(async { async_std::task::spawn(async { spawn(async { id() }) }) });
let handle = handle.await.unwrap().await;
let id = handle.id();
Expand All @@ -47,7 +51,7 @@ mod tests {

#[test]
fn spawn_into_smol() {
async_std::task::block_on(async move {
future::block_on(async move {
let handle = spawn(async { async_std::task::spawn(async { try_id() }) });
let handle = handle.await.unwrap();
assert_eq!(handle.await, None);
Expand Down
14 changes: 9 additions & 5 deletions spawns-compat/src/smol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use spawns_core::{Compat, Task, COMPATS};
use std::boxed::Box;

#[distributed_slice(COMPATS)]
pub static SMOL: Compat = Compat::Global(smol_global);
pub static SMOL: Compat = Compat::NamedGlobal {
name: "smol",
spawn: smol_global,
};

fn smol_global(task: Task) {
let Task { future, .. } = task;
Expand All @@ -15,10 +18,11 @@ fn smol_global(task: Task) {
#[cfg(not(feature = "async-global-executor"))]
mod tests {
use spawns_core::*;
use futures_lite::future;

#[test]
fn spawn_one() {
smol::block_on(async {
future::block_on(async {
let handle = spawn(async { id() });
let id = handle.id();
assert_eq!(handle.await.unwrap(), id);
Expand All @@ -27,7 +31,7 @@ mod tests {

#[test]
fn spawn_cascading() {
smol::block_on(async {
future::block_on(async {
let handle = spawn(async { spawn(async { id() }) });
let handle = handle.await.unwrap();
let id = handle.id();
Expand All @@ -37,7 +41,7 @@ mod tests {

#[test]
fn spawn_interleaving() {
smol::block_on(async move {
future::block_on(async move {
let handle = spawn(async { smol::spawn(async { spawn(async { id() }) }) });
let handle = handle.await.unwrap().await;
let id = handle.id();
Expand All @@ -47,7 +51,7 @@ mod tests {

#[test]
fn spawn_into_smol() {
smol::block_on(async move {
future::block_on(async move {
let handle = spawn(async { smol::spawn(async { try_id() }) });
let handle = handle.await.unwrap();
assert_eq!(handle.await, None);
Expand Down
1 change: 1 addition & 0 deletions spawns-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ compat = ["linkme"]
panic-multiple-global-spawners = []
test-compat-global1 = ["compat"]
test-compat-global2 = ["compat", "test-compat-global1"]
test-named-global = []

[dependencies]
linkme = { version = "0.3.25", optional = true }
Expand Down
91 changes: 73 additions & 18 deletions spawns-core/src/compat.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::Task;
use linkme::distributed_slice;
use std::sync::OnceLock;

/// Compat encapsulate functions to find async runtimes to spawn task.
pub enum Compat {
/// Named global function to spawn task.
NamedGlobal { name: &'static str, spawn: fn(Task) },
/// Global function to spawn task.
///
/// [spawn](`crate::spawn()`) will panic if there is no local spawners but multiple global spawners.
#[doc(hidden)]
#[deprecated(since = "1.0.3", note = "use NamedGlobal instead")]
Global(fn(Task)),
#[allow(clippy::type_complexity)]
/// Local function to detect async runtimes.
Expand All @@ -16,33 +19,85 @@ pub enum Compat {
#[distributed_slice]
pub static COMPATS: [Compat] = [..];

pub(crate) fn find_spawn() -> Option<fn(Task)> {
match COMPATS.len() {
0 => return None,
1 => match COMPATS[0] {
Compat::Global(inject) => return Some(inject),
Compat::Local(detect) => return detect(),
},
_ => {}
}
#[derive(Clone, Copy)]
pub(crate) enum Failure {
NotFound,
#[allow(dead_code)]
MultipleGlobals,
}

let mut last_global = None;
fn pick_global(choose: Option<&str>) -> Result<fn(Task), Failure> {
let mut globals = 0;
match COMPATS.iter().find_map(|injection| match injection {
Compat::Local(local) => local(),
let mut last_named = None;
let mut last_unnamed = None;
match COMPATS.iter().find_map(|compat| match compat {
Compat::Local(_) => None,
#[allow(deprecated)]
Compat::Global(global) => {
globals += 1;
last_global = Some(global);
last_unnamed = Some(global);
None
}
Compat::NamedGlobal { spawn, name } => {
if choose == Some(name) {
Some(spawn)
} else {
globals += 1;
last_named = Some(spawn);
None
}
}
}) {
Some(spawn) => Some(spawn),
Some(spawn) => Ok(*spawn),
None => {
#[cfg(feature = "panic-multiple-global-spawners")]
if globals > 1 {
panic!("multiple global spawners")
return Err(Failure::MultipleGlobals);
}
last_global.copied()
last_named
.or(last_unnamed)
.ok_or(Failure::NotFound)
.copied()
}
}
}

fn find_global() -> Result<fn(Task), Failure> {
static FOUND: OnceLock<Result<fn(Task), Failure>> = OnceLock::new();
if let Some(found) = FOUND.get() {
return *found;
}
let choose = std::env::var("SPAWNS_GLOBAL_SPAWNER").ok();
let result = pick_global(choose.as_deref());
*FOUND.get_or_init(|| result)
}

fn find_local() -> Option<fn(Task)> {
COMPATS.iter().find_map(|compat| match compat {
Compat::Local(local) => local(),
#[allow(deprecated)]
Compat::Global(_) => None,
Compat::NamedGlobal { .. } => None,
})
}

pub(crate) fn find_spawn() -> Option<fn(Task)> {
match COMPATS.len() {
0 => return None,

Check warning on line 86 in spawns-core/src/compat.rs

View check run for this annotation

Codecov / codecov/patch

spawns-core/src/compat.rs#L86

Added line #L86 was not covered by tests
1 => match COMPATS[0] {
Compat::NamedGlobal { spawn, .. } => return Some(spawn),
#[allow(deprecated)]
Compat::Global(spawn) => return Some(spawn),

Check warning on line 90 in spawns-core/src/compat.rs

View check run for this annotation

Codecov / codecov/patch

spawns-core/src/compat.rs#L90

Added line #L90 was not covered by tests
Compat::Local(local) => return local(),
},
_ => {}
}
match find_local()
.ok_or(Failure::NotFound)
.or_else(|_| find_global())
{
Ok(spawn) => Some(spawn),
Err(Failure::NotFound) => None,
Err(Failure::MultipleGlobals) => panic!("multiple global spawners"),
}
}
24 changes: 22 additions & 2 deletions spawns-core/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,15 @@ mod tests {

#[cfg(feature = "test-compat-global1")]
#[distributed_slice(COMPATS)]
#[allow(deprecated)]
pub static THREAD_GLOBAL: Compat = Compat::Global(thread_global);

#[cfg(feature = "test-compat-global2")]
#[distributed_slice(COMPATS)]
pub static DROP_GLOBAL: Compat = Compat::Global(drop_global);
pub static DROP_GLOBAL: Compat = Compat::NamedGlobal {
name: "drop",
spawn: drop_global,
};

#[cfg(feature = "test-compat-global2")]
fn drop_global(task: Task) {
Expand Down Expand Up @@ -255,6 +259,7 @@ mod tests {
}

#[cfg(feature = "test-compat-global2")]
#[cfg(not(feature = "test-named-global"))]
#[cfg(feature = "panic-multiple-global-spawners")]
#[test]
#[should_panic(expected = "multiple global spawners")]
Expand All @@ -263,10 +268,25 @@ mod tests {
}

#[cfg(feature = "test-compat-global2")]
#[cfg(not(feature = "test-named-global"))]
#[cfg(not(feature = "panic-multiple-global-spawners"))]
#[test]
fn multiple_globals() {
block_on(spawn(ready(()))).unwrap();
// The one chosen is indeterminate.
spawn(ready(()));
}

// Rust runs all tests in one process for given features, so it is crucial to keep features
// set unique for this test as it setup environment variable SPAWNS_GLOBAL_SPAWNER.
#[cfg(feature = "test-compat-global2")]
#[cfg(feature = "test-named-global")]
#[cfg(feature = "panic-multiple-global-spawners")]
#[test]
fn multiple_globals_choose_named() {
std::env::set_var("SPAWNS_GLOBAL_SPAWNER", "drop");
let handle = spawn(ready(()));
let err = block_on(handle).unwrap_err();
assert!(err.is_cancelled());
}
}
}
21 changes: 18 additions & 3 deletions spawns/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,28 @@
//! }
//! ```
//!
//! To cooperate with existing async runtimes, it provides features to inject spawners for them.
//! ## Compatibility with existing async runtimes
//!
//! This is an open world, there might be tens async runtimes. `spawns` provides features to inject
//! spawners for few.
//!
//! * `tokio`: uses `tokio::runtime::Handle::try_current()` to detect thread local `tokio` runtime handle.
//! * `smol`: uses `smol::spawn` to spawn task in absent of thread local spawners.
//! * `async-global-executor`: uses `async_global_executor::spawn` to spawn task in absent of thread local spawners.
//!
//! Since `smol` and `async-global-executor` both blindly spawn tasks, it is unknown which one is
//! chosen. Feature "panic-multiple-global-spawners" is provided to panic on this situation.
//! For other async runtimes, one could inject [Compat]s to [static@COMPATS] themselves.
//!
//! Noted that, all those compatibility features, injections should only active on tests and
//! binaries. Otherwise, they will be propagated to dependents with unnecessary dependencies.
//!
//! ## Dealing with multiple global executors
//! Global executor cloud spawn task with no help from thread context. But this exposes us an
//! dilemma to us, which one to use if there are multiple global executors present ? By default,
//! `spawns` randomly chooses one and stick to it to spawn tasks in absent of thread context
//! spawners. Generally, this should be safe as global executors should be designed to spawn
//! everywhere. If this is not the case, one could use environment variable `SPAWNS_GLOBAL_SPAWNER`
//! to specify one. As a safety net, feature `panic-multiple-global-spawners` is provided to panic
//! if there are multiple global candidates.
pub use spawns_core::*;

Expand Down

0 comments on commit 13c8567

Please sign in to comment.