diff --git a/Cargo.toml b/Cargo.toml
index bd72a79..38ec908 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,7 @@ name = "blocking"
 version = "1.5.1"
 authors = ["Stjepan Glavina "]
 edition = "2021"
-rust-version = "1.60"
+rust-version = "1.63"
 description = "A thread pool for isolating blocking I/O in async programs"
 license = "Apache-2.0 OR MIT"
 repository = "https://github.com/smol-rs/blocking"
@@ -22,8 +22,5 @@ futures-lite = { version = "2.0.0", default-features = false }
 piper = "0.2.0"
 tracing = { version = "0.1.37", default-features = false, optional = true }
 
-[target.'cfg(not(target_family = "wasm"))'.dependencies]
-async-lock = "3.0.0"
-
 [dev-dependencies]
 futures-lite = "2.0.0"
diff --git a/src/lib.rs b/src/lib.rs
index 6fd29c5..66f6f52 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -115,7 +115,13 @@ pub use async_task::Task;
 
 /// Default value for max threads that Executor can grow to
 #[cfg(not(target_family = "wasm"))]
-const DEFAULT_MAX_THREADS: usize = 500;
+const DEFAULT_MAX_THREADS: NonZeroUsize = {
+    if let Some(size) = NonZeroUsize::new(500) {
+        size
+    } else {
+        panic!("DEFAULT_MAX_THREADS is non-zero");
+    }
+};
 
 /// Minimum value for max threads config
 #[cfg(not(target_family = "wasm"))]
@@ -150,46 +156,56 @@ struct Inner {
     /// This is the number of idle threads + the number of active threads.
     thread_count: usize,
 
+    // TODO: The option is only used for const-initialization. This can be replaced with
+    // a normal VecDeque once the MSRV is bumped past 1.68, where `VecDeque::new` is `const`.
     /// The queue of blocking tasks.
-    queue: VecDeque<Runnable>,
+    queue: Option<VecDeque<Runnable>>,
 
     /// Maximum number of threads in the pool
-    thread_limit: NonZeroUsize,
+    thread_limit: Option<NonZeroUsize>,
+}
+
+impl Inner {
+    #[inline]
+    fn queue(&mut self) -> &mut VecDeque<Runnable> {
+        self.queue.get_or_insert_with(VecDeque::new)
+    }
 }
 
 impl Executor {
     #[cfg(not(target_family = "wasm"))]
-    fn max_threads() -> usize {
+    fn max_threads() -> NonZeroUsize {
         match env::var(MAX_THREADS_ENV) {
             Ok(v) => v
                 .parse::<usize>()
-                .map(|v| v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))
+                .ok()
+                .and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS)))
                 .unwrap_or(DEFAULT_MAX_THREADS),
             Err(_) => DEFAULT_MAX_THREADS,
         }
     }
 
+    #[cfg(target_family = "wasm")]
+    fn max_threads() -> NonZeroUsize {
+        NonZeroUsize::new(1).unwrap()
+    }
+
     /// Get a reference to the global executor.
     #[inline]
     fn get() -> &'static Self {
         #[cfg(not(target_family = "wasm"))]
         {
-            use async_lock::OnceCell;
-
-            static EXECUTOR: OnceCell<Executor> = OnceCell::new();
-
-            return EXECUTOR.get_or_init_blocking(|| {
-                let thread_limit = Self::max_threads();
-                Executor {
-                    inner: Mutex::new(Inner {
-                        idle_count: 0,
-                        thread_count: 0,
-                        queue: VecDeque::new(),
-                        thread_limit: NonZeroUsize::new(thread_limit).unwrap(),
-                    }),
-                    cvar: Condvar::new(),
-                }
-            });
+            static EXECUTOR: Executor = Executor {
+                inner: Mutex::new(Inner {
+                    idle_count: 0,
+                    thread_count: 0,
+                    queue: None,
+                    thread_limit: None,
+                }),
+                cvar: Condvar::new(),
+            };
+
+            &EXECUTOR
         }
 
         #[cfg(target_family = "wasm")]
@@ -227,7 +243,7 @@ impl Executor {
             inner.idle_count -= 1;
 
             // Run tasks in the queue.
-            while let Some(runnable) = inner.queue.pop_front() {
+            while let Some(runnable) = inner.queue().pop_front() {
                 // We have found a task - grow the pool if needed.
                 self.grow_pool(inner);
 
@@ -249,7 +265,7 @@ impl Executor {
             inner = lock;
 
             // If there are no tasks after a while, stop this thread.
-            if res.timed_out() && inner.queue.is_empty() {
+            if res.timed_out() && inner.queue().is_empty() {
                 inner.idle_count -= 1;
                 inner.thread_count -= 1;
                 break;
@@ -266,7 +282,7 @@ impl Executor {
    /// Schedules a runnable task for execution.
    fn schedule(&'static self, runnable: Runnable) {
        let mut inner = self.inner.lock().unwrap();
-        inner.queue.push_back(runnable);
+        inner.queue().push_back(runnable);
 
        // Notify a sleeping thread and spawn more threads if needed.
        self.cvar.notify_one();
@@ -278,17 +294,20 @@ impl Executor {
         #[cfg(feature = "tracing")]
         let _span = tracing::trace_span!(
             "grow_pool",
-            queue_len = inner.queue.len(),
+            queue_len = inner.queue().len(),
             idle_count = inner.idle_count,
             thread_count = inner.thread_count,
         )
         .entered();
 
+        let thread_limit = inner
+            .thread_limit
+            .get_or_insert_with(Self::max_threads)
+            .get();
+
         // If runnable tasks greatly outnumber idle threads and there aren't too many threads
         // already, then be aggressive: wake all idle threads and spawn one more thread.
-        while inner.queue.len() > inner.idle_count * 5
-            && inner.thread_count < inner.thread_limit.get()
-        {
+        while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit {
             #[cfg(feature = "tracing")]
             tracing::trace!("spawning a new thread to handle blocking tasks");
 
@@ -321,13 +340,13 @@ impl Executor {
             // If the limit is about to be set to zero, set it to one instead so that if,
             // in the future, we are able to spawn more threads, we will be able to do so.
-            NonZeroUsize::new(new_limit).unwrap_or_else(|| {
+            Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| {
                 #[cfg(feature = "tracing")]
                 tracing::warn!(
                     "attempted to lower thread_limit to zero; setting to one instead"
                 );
                 NonZeroUsize::new(1).unwrap()
-            })
+            }))
         };
     }
 }
 
@@ -985,22 +1004,22 @@ mod tests {
     fn test_max_threads() {
         // properly set env var
         env::set_var(MAX_THREADS_ENV, "100");
-        assert_eq!(100, Executor::max_threads());
+        assert_eq!(100, Executor::max_threads().get());
 
         // passed value below minimum, so we set it to minimum
         env::set_var(MAX_THREADS_ENV, "0");
-        assert_eq!(1, Executor::max_threads());
+        assert_eq!(1, Executor::max_threads().get());
 
         // passed value above maximum, so we set to allowed maximum
         env::set_var(MAX_THREADS_ENV, "50000");
-        assert_eq!(10000, Executor::max_threads());
+        assert_eq!(10000, Executor::max_threads().get());
 
         // no env var, use default
         env::set_var(MAX_THREADS_ENV, "");
-        assert_eq!(500, Executor::max_threads());
+        assert_eq!(500, Executor::max_threads().get());
 
         // not a number, use default
         env::set_var(MAX_THREADS_ENV, "NOTINT");
-        assert_eq!(500, Executor::max_threads());
+        assert_eq!(500, Executor::max_threads().get());
     }
 }
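
Note on the pattern: the patch works because `Mutex::new` and `Condvar::new` became `const fn` in Rust 1.63 (hence the MSRV bump), while `VecDeque::new` is not `const` on that toolchain, so the queue and the lazily computed thread limit are wrapped in `Option` and filled on first use. Below is a minimal standalone sketch of the same trick; the `Pool`/`PoolState` names are illustrative only, not taken from the crate.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// Hypothetical pool state mirroring the pattern above: every field has a
// `const`-constructible default, so the whole static needs no lazy-init
// helper such as `async_lock::OnceCell`.
struct PoolState {
    // `None` until first use; `VecDeque::new` is not `const` on a 1.63 MSRV,
    // so the queue cannot be built directly inside the static initializer.
    queue: Option<VecDeque<u32>>,
}

impl PoolState {
    // Lazily create the queue on first access; subsequent calls reuse it.
    fn queue(&mut self) -> &mut VecDeque<u32> {
        self.queue.get_or_insert_with(VecDeque::new)
    }
}

struct Pool {
    inner: Mutex<PoolState>,
    cvar: Condvar,
}

// `Mutex::new` and `Condvar::new` are `const fn` since Rust 1.63, so this is
// a plain static with no runtime initialization at all.
static POOL: Pool = Pool {
    inner: Mutex::new(PoolState { queue: None }),
    cvar: Condvar::new(),
};

fn main() {
    let mut state = POOL.inner.lock().unwrap();
    state.queue().push_back(1);
    assert_eq!(state.queue().pop_front(), Some(1));
    drop(state);
    POOL.cvar.notify_one();
}
```

The trade-off is one extra branch per access through `get_or_insert_with`, in exchange for dropping the `async-lock` dependency from the non-wasm build entirely.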