From 12928269d8055e723673bc1fa1c3414439114746 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 14:45:07 -0700 Subject: [PATCH 1/8] Remove the dependency on async-lock --- Cargo.toml | 5 +---- src/lib.rs | 42 ++++++++++++++++++++---------------------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25bd243..ccdca08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "blocking" version = "1.5.1" authors = ["Stjepan Glavina "] edition = "2021" -rust-version = "1.60" +rust-version = "1.63" description = "A thread pool for isolating blocking I/O in async programs" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/blocking" @@ -23,8 +23,5 @@ futures-lite = { version = "2.0.0", default-features = false } piper = "0.2.0" tracing = { version = "0.1.37", default-features = false } -[target.'cfg(not(target_family = "wasm"))'.dependencies] -async-lock = "3.0.0" - [dev-dependencies] futures-lite = "2.0.0" diff --git a/src/lib.rs b/src/lib.rs index 1d0ff6b..5d8fc65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -154,7 +154,7 @@ struct Inner { queue: VecDeque, /// Maximum number of threads in the pool - thread_limit: NonZeroUsize, + thread_limit: Option, } impl Executor { @@ -174,22 +174,17 @@ impl Executor { fn get() -> &'static Self { #[cfg(not(target_family = "wasm"))] { - use async_lock::OnceCell; - - static EXECUTOR: OnceCell = OnceCell::new(); - - return EXECUTOR.get_or_init_blocking(|| { - let thread_limit = Self::max_threads(); - Executor { - inner: Mutex::new(Inner { - idle_count: 0, - thread_count: 0, - queue: VecDeque::new(), - thread_limit: NonZeroUsize::new(thread_limit).unwrap(), - }), - cvar: Condvar::new(), - } - }); + static EXECUTOR: Executor = Executor { + inner: Mutex::new(Inner { + idle_count: 0, + thread_count: 0, + queue: VecDeque::new(), + thread_limit: None, + }), + cvar: Condvar::new(), + }; + + return &EXECUTOR; } #[cfg(target_family = "wasm")] @@ -280,11 +275,14 @@ impl Executor { ); let _enter = span.enter(); + let thread_limit = inner + .thread_limit + .get_or_insert_with(|| NonZeroUsize::new(Self::max_threads()).unwrap()) + .get(); + // If runnable tasks greatly outnumber idle threads and there aren't too many threads // already, then be aggressive: wake all idle threads and spawn one more thread. - while inner.queue.len() > inner.idle_count * 5 - && inner.thread_count < inner.thread_limit.get() - { + while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < thread_limit { tracing::trace!("spawning a new thread to handle blocking tasks"); // The new thread starts in idle state. @@ -315,12 +313,12 @@ impl Executor { // If the limit is about to be set to zero, set it to one instead so that if, // in the future, we are able to spawn more threads, we will be able to do so. - NonZeroUsize::new(new_limit).unwrap_or_else(|| { + Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| { tracing::warn!( "attempted to lower thread_limit to zero; setting to one instead" ); NonZeroUsize::new(1).unwrap() - }) + })) }; } } From 852b3dc9898db967ba212c9289478bc2d8612207 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 14:58:59 -0700 Subject: [PATCH 2/8] Fix WASM builds --- src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 5d8fc65..09bb3de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,6 +169,11 @@ impl Executor { } } + #[cfg(target_family = "wasm")] + fn max_threads() -> usize { + DEFAULT_MAX_THREADS + } + /// Get a reference to the global executor. #[inline] fn get() -> &'static Self { @@ -184,7 +189,7 @@ impl Executor { cvar: Condvar::new(), }; - return &EXECUTOR; + &EXECUTOR } #[cfg(target_family = "wasm")] From 27fd77f39ff78ebccbd213576185e9d98d3d9a00 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 15:00:42 -0700 Subject: [PATCH 3/8] Atttempt 2 at fixing wasm builds --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 09bb3de..f16a0cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -171,7 +171,7 @@ impl Executor { #[cfg(target_family = "wasm")] fn max_threads() -> usize { - DEFAULT_MAX_THREADS + 1 } /// Get a reference to the global executor. From bffdffa7113f2aeaad3b7a1897e5bcdbe629f587 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 15:03:15 -0700 Subject: [PATCH 4/8] VecDeque::new is const only in 1.68+ --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ccdca08..05e541f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "blocking" version = "1.5.1" authors = ["Stjepan Glavina "] edition = "2021" -rust-version = "1.63" +rust-version = "1.68" description = "A thread pool for isolating blocking I/O in async programs" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/blocking" From c1816209963ced955be8a8f5f4ebcbf45de18966 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 16:51:26 -0700 Subject: [PATCH 5/8] Return NonZeroUsize instead --- src/lib.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f16a0cf..4018045 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,7 +115,13 @@ pub use async_task::Task; /// Default value for max threads that Executor can grow to #[cfg(not(target_family = "wasm"))] -const DEFAULT_MAX_THREADS: usize = 500; +const DEFAULT_MAX_THREADS: NonZeroUsize = { + if let Some(size) = NonZeroUsize::new(500) { + size + } else { + panic!("DEFAULT_MAX_THREADS is non-zero"); + } +}; /// Minimum value for max threads config #[cfg(not(target_family = "wasm"))] @@ -159,19 +165,20 @@ struct Inner { impl Executor { #[cfg(not(target_family = "wasm"))] - fn max_threads() -> usize { + fn max_threads() -> NonZeroUsize { match env::var(MAX_THREADS_ENV) { Ok(v) => v .parse::() - .map(|v| v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS)) + .ok() + .and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))) .unwrap_or(DEFAULT_MAX_THREADS), Err(_) => DEFAULT_MAX_THREADS, } } #[cfg(target_family = "wasm")] - fn max_threads() -> usize { - 1 + fn max_threads() -> NonZeroUsize { + NonZeroUsize::new(1).unwrap() } /// Get a reference to the global executor. @@ -282,7 +289,7 @@ impl Executor { let thread_limit = inner .thread_limit - .get_or_insert_with(|| NonZeroUsize::new(Self::max_threads()).unwrap()) + .get_or_insert_with(|| Self::max_threads()) .get(); // If runnable tasks greatly outnumber idle threads and there aren't too many threads From f7cfaf2b0c5ca0d00b6f9913257d0c394fd0f7af Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 16:52:52 -0700 Subject: [PATCH 6/8] Fix tests --- src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4018045..80bce4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -988,22 +988,22 @@ mod tests { fn test_max_threads() { // properly set env var env::set_var(MAX_THREADS_ENV, "100"); - assert_eq!(100, Executor::max_threads()); + assert_eq!(100, Executor::max_threads().get()); // passed value below minimum, so we set it to minimum env::set_var(MAX_THREADS_ENV, "0"); - assert_eq!(1, Executor::max_threads()); + assert_eq!(1, Executor::max_threads().get()); // passed value above maximum, so we set to allowed maximum env::set_var(MAX_THREADS_ENV, "50000"); - assert_eq!(10000, Executor::max_threads()); + assert_eq!(10000, Executor::max_threads().get()); // no env var, use default env::set_var(MAX_THREADS_ENV, ""); - assert_eq!(500, Executor::max_threads()); + assert_eq!(500, Executor::max_threads().get()); // not a number, use default env::set_var(MAX_THREADS_ENV, "NOTINT"); - assert_eq!(500, Executor::max_threads()); + assert_eq!(500, Executor::max_threads().get()); } } From 14677bad4f1814b27854be65f443d64618f072eb Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 16:54:59 -0700 Subject: [PATCH 7/8] Clippy --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 80bce4b..a8929f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -289,7 +289,7 @@ impl Executor { let thread_limit = inner .thread_limit - .get_or_insert_with(|| Self::max_threads()) + .get_or_insert_with(Self::max_threads) .get(); // If runnable tasks greatly outnumber idle threads and there aren't too many threads From aa778a11c96ecd75ec8acb262578f076eaf3ae30 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 13 May 2024 00:03:42 -0700 Subject: [PATCH 8/8] Lower MSRV to 1.63 using an Option --- Cargo.toml | 2 +- src/lib.rs | 23 ++++++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1e456c1..38ec908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "blocking" version = "1.5.1" authors = ["Stjepan Glavina "] edition = "2021" -rust-version = "1.68" +rust-version = "1.63" description = "A thread pool for isolating blocking I/O in async programs" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/blocking" diff --git a/src/lib.rs b/src/lib.rs index 0ad993a..66f6f52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,13 +156,22 @@ struct Inner { /// This is the number of idle threads + the number of active threads. thread_count: usize, + // TODO: The option is only used for const-initialization. This can be replaced with + // a normal VecDeque when the MSRV can be bumped passed /// The queue of blocking tasks. - queue: VecDeque, + queue: Option>, /// Maximum number of threads in the pool thread_limit: Option, } +impl Inner { + #[inline] + fn queue(&mut self) -> &mut VecDeque { + self.queue.get_or_insert_with(VecDeque::new) + } +} + impl Executor { #[cfg(not(target_family = "wasm"))] fn max_threads() -> NonZeroUsize { @@ -190,7 +199,7 @@ impl Executor { inner: Mutex::new(Inner { idle_count: 0, thread_count: 0, - queue: VecDeque::new(), + queue: None, thread_limit: None, }), cvar: Condvar::new(), @@ -234,7 +243,7 @@ impl Executor { inner.idle_count -= 1; // Run tasks in the queue. - while let Some(runnable) = inner.queue.pop_front() { + while let Some(runnable) = inner.queue().pop_front() { // We have found a task - grow the pool if needed. self.grow_pool(inner); @@ -256,7 +265,7 @@ impl Executor { inner = lock; // If there are no tasks after a while, stop this thread. - if res.timed_out() && inner.queue.is_empty() { + if res.timed_out() && inner.queue().is_empty() { inner.idle_count -= 1; inner.thread_count -= 1; break; @@ -273,7 +282,7 @@ impl Executor { /// Schedules a runnable task for execution. fn schedule(&'static self, runnable: Runnable) { let mut inner = self.inner.lock().unwrap(); - inner.queue.push_back(runnable); + inner.queue().push_back(runnable); // Notify a sleeping thread and spawn more threads if needed. self.cvar.notify_one(); @@ -285,7 +294,7 @@ impl Executor { #[cfg(feature = "tracing")] let _span = tracing::trace_span!( "grow_pool", - queue_len = inner.queue.len(), + queue_len = inner.queue().len(), idle_count = inner.idle_count, thread_count = inner.thread_count, ) @@ -298,7 +307,7 @@ impl Executor { // If runnable tasks greatly outnumber idle threads and there aren't too many threads // already, then be aggressive: wake all idle threads and spawn one more thread. - while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < thread_limit { + while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit { #[cfg(feature = "tracing")] tracing::trace!("spawning a new thread to handle blocking tasks");