Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the dependency on async-lock #59

Merged
merged 9 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "blocking"
version = "1.5.1"
authors = ["Stjepan Glavina <[email protected]>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.63"
description = "A thread pool for isolating blocking I/O in async programs"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/blocking"
Expand All @@ -22,8 +22,5 @@ futures-lite = { version = "2.0.0", default-features = false }
piper = "0.2.0"
tracing = { version = "0.1.37", default-features = false, optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
async-lock = "3.0.0"

[dev-dependencies]
futures-lite = "2.0.0"
89 changes: 54 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ pub use async_task::Task;

/// Default value for max threads that Executor can grow to
#[cfg(not(target_family = "wasm"))]
const DEFAULT_MAX_THREADS: usize = 500;
const DEFAULT_MAX_THREADS: NonZeroUsize = {
if let Some(size) = NonZeroUsize::new(500) {
size
} else {
panic!("DEFAULT_MAX_THREADS is non-zero");
}
};

/// Minimum value for max threads config
#[cfg(not(target_family = "wasm"))]
Expand Down Expand Up @@ -150,46 +156,56 @@ struct Inner {
/// This is the number of idle threads + the number of active threads.
thread_count: usize,

// TODO: The option is only used for const-initialization. This can be replaced with
// a normal VecDeque when the MSRV can be bumped passed
/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
queue: Option<VecDeque<Runnable>>,

/// Maximum number of threads in the pool
thread_limit: NonZeroUsize,
thread_limit: Option<NonZeroUsize>,
}

impl Inner {
#[inline]
fn queue(&mut self) -> &mut VecDeque<Runnable> {
self.queue.get_or_insert_with(VecDeque::new)
}
}

impl Executor {
#[cfg(not(target_family = "wasm"))]
fn max_threads() -> usize {
fn max_threads() -> NonZeroUsize {
match env::var(MAX_THREADS_ENV) {
Ok(v) => v
.parse::<usize>()
.map(|v| v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))
.ok()
.and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS)))
.unwrap_or(DEFAULT_MAX_THREADS),
Err(_) => DEFAULT_MAX_THREADS,
}
}

#[cfg(target_family = "wasm")]
fn max_threads() -> NonZeroUsize {
NonZeroUsize::new(1).unwrap()
}

/// Get a reference to the global executor.
#[inline]
fn get() -> &'static Self {
#[cfg(not(target_family = "wasm"))]
{
use async_lock::OnceCell;

static EXECUTOR: OnceCell<Executor> = OnceCell::new();

return EXECUTOR.get_or_init_blocking(|| {
let thread_limit = Self::max_threads();
Executor {
inner: Mutex::new(Inner {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
thread_limit: NonZeroUsize::new(thread_limit).unwrap(),
}),
cvar: Condvar::new(),
}
});
static EXECUTOR: Executor = Executor {
inner: Mutex::new(Inner {
idle_count: 0,
thread_count: 0,
queue: None,
thread_limit: None,
}),
cvar: Condvar::new(),
};

&EXECUTOR
}

#[cfg(target_family = "wasm")]
Expand Down Expand Up @@ -227,7 +243,7 @@ impl Executor {
inner.idle_count -= 1;

// Run tasks in the queue.
while let Some(runnable) = inner.queue.pop_front() {
while let Some(runnable) = inner.queue().pop_front() {
// We have found a task - grow the pool if needed.
self.grow_pool(inner);

Expand All @@ -249,7 +265,7 @@ impl Executor {
inner = lock;

// If there are no tasks after a while, stop this thread.
if res.timed_out() && inner.queue.is_empty() {
if res.timed_out() && inner.queue().is_empty() {
inner.idle_count -= 1;
inner.thread_count -= 1;
break;
Expand All @@ -266,7 +282,7 @@ impl Executor {
/// Schedules a runnable task for execution.
fn schedule(&'static self, runnable: Runnable) {
let mut inner = self.inner.lock().unwrap();
inner.queue.push_back(runnable);
inner.queue().push_back(runnable);

// Notify a sleeping thread and spawn more threads if needed.
self.cvar.notify_one();
Expand All @@ -278,17 +294,20 @@ impl Executor {
#[cfg(feature = "tracing")]
let _span = tracing::trace_span!(
"grow_pool",
queue_len = inner.queue.len(),
queue_len = inner.queue().len(),
idle_count = inner.idle_count,
thread_count = inner.thread_count,
)
.entered();

let thread_limit = inner
.thread_limit
.get_or_insert_with(Self::max_threads)
.get();

// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while inner.queue.len() > inner.idle_count * 5
&& inner.thread_count < inner.thread_limit.get()
{
while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit {
#[cfg(feature = "tracing")]
tracing::trace!("spawning a new thread to handle blocking tasks");

Expand Down Expand Up @@ -321,13 +340,13 @@ impl Executor {

// If the limit is about to be set to zero, set it to one instead so that if,
// in the future, we are able to spawn more threads, we will be able to do so.
NonZeroUsize::new(new_limit).unwrap_or_else(|| {
Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| {
#[cfg(feature = "tracing")]
tracing::warn!(
"attempted to lower thread_limit to zero; setting to one instead"
);
NonZeroUsize::new(1).unwrap()
})
}))
};
}
}
Expand Down Expand Up @@ -985,22 +1004,22 @@ mod tests {
fn test_max_threads() {
// properly set env var
env::set_var(MAX_THREADS_ENV, "100");
assert_eq!(100, Executor::max_threads());
assert_eq!(100, Executor::max_threads().get());

// passed value below minimum, so we set it to minimum
env::set_var(MAX_THREADS_ENV, "0");
assert_eq!(1, Executor::max_threads());
assert_eq!(1, Executor::max_threads().get());

// passed value above maximum, so we set to allowed maximum
env::set_var(MAX_THREADS_ENV, "50000");
assert_eq!(10000, Executor::max_threads());
assert_eq!(10000, Executor::max_threads().get());

// no env var, use default
env::set_var(MAX_THREADS_ENV, "");
assert_eq!(500, Executor::max_threads());
assert_eq!(500, Executor::max_threads().get());

// not a number, use default
env::set_var(MAX_THREADS_ENV, "NOTINT");
assert_eq!(500, Executor::max_threads());
assert_eq!(500, Executor::max_threads().get());
}
}
Loading