From 1a2cc490d1f401e99172971d7ab8e8fe70dbcf9a Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Tue, 19 Mar 2024 14:28:31 +0100 Subject: [PATCH] feat(core/budget): time-based budget --- CHANGELOG.md | 6 + Cargo.toml | 3 - benches/Cargo.toml | 8 +- benches/common.rs | 46 ++++++ benches/coop.rs | 61 +++++++ benches/messaging.rs | 57 +------ elfo-core/src/context.rs | 29 ++-- elfo-core/src/context/budget.rs | 25 --- elfo-core/src/coop.rs | 196 +++++++++++++++++++++++ elfo-core/src/lib.rs | 1 + elfo-core/src/supervisor/measure_poll.rs | 2 + elfo-utils/src/time.rs | 2 +- examples/Cargo.toml | 2 +- 13 files changed, 337 insertions(+), 101 deletions(-) create mode 100644 benches/common.rs create mode 100644 benches/coop.rs delete mode 100644 elfo-core/src/context/budget.rs create mode 100644 elfo-core/src/coop.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3395f8d1..f0026fa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Added +- coop: expose `coop::consume_budget()` to call in long computations. See documentation of the `coop` module for details. +- coop: prefer a time-based budgeting if the telemetry is enabled. + +### Fixed +- telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system. ## [0.2.0-alpha.14] - 2024-02-27 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index e7ae5f18..f1b9f5dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,3 @@ readme = "README.md" [profile.release] debug = 1 - -[patch.crates-io] -sharded-slab = { git = "https://github.com/loyd/sharded-slab", branch = "main" } diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 669ceddb..3504d65f 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "benches" +name = "elfo-benches" version = "0.0.0" publish = false @@ -7,6 +7,7 @@ repository.workspace = true authors.workspace = true license.workspace = true edition.workspace = true +readme.workspace = true [dev-dependencies] elfo = { path = "../elfo" } @@ -37,3 +38,8 @@ harness = false name = "messaging_tc" path = "messaging_tc.rs" harness = false + +[[bench]] +name = "coop" +path = "coop.rs" +harness = false diff --git a/benches/common.rs b/benches/common.rs new file mode 100644 index 00000000..352e5965 --- /dev/null +++ b/benches/common.rs @@ -0,0 +1,46 @@ +use std::{env, str::FromStr, thread::available_parallelism}; + +use tokio::runtime::{Builder, Runtime}; + +#[allow(dead_code)] +pub(crate) fn tokio_worker_threads() -> u32 { + env::var("TOKIO_WORKER_THREADS") + .ok() + .map(|s| u32::from_str(&s).expect("invalid value for TOKIO_WORKER_THREADS")) + .unwrap_or_else(|| { + usize::from(available_parallelism().expect("cannot get available parallelism")) as u32 + }) +} + +#[allow(dead_code)] +pub(crate) fn max_parallelism() -> u32 { + env::var("ELFO_BENCH_MAX_PARALLELISM") + .ok() + .map(|s| u32::from_str(&s).expect("invalid value for ELFO_BENCH_MAX_PARALLELISM")) + .unwrap_or_else(|| { + usize::from(available_parallelism().expect("cannot get available parallelism")) as u32 + }) +} + +#[allow(dead_code)] +pub(crate) fn make_mt_runtime(worker_threads: u32) -> Runtime { + use std::sync::atomic::{AtomicU32, Ordering}; + + // To make it easier to check in the profiler. + let test_no = { + static BENCH_NO: AtomicU32 = AtomicU32::new(0); + BENCH_NO.fetch_add(1, Ordering::Relaxed) + }; + + let next_worker_no = AtomicU32::new(0); + + Builder::new_multi_thread() + .enable_all() + .worker_threads(worker_threads as usize) + .thread_name_fn(move || { + let worker_no = next_worker_no.fetch_add(1, Ordering::Relaxed); + format!("b{test_no}-w{worker_no}") + }) + .build() + .unwrap() +} diff --git a/benches/coop.rs b/benches/coop.rs new file mode 100644 index 00000000..08440cf8 --- /dev/null +++ b/benches/coop.rs @@ -0,0 +1,61 @@ +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, Criterion}; + +use elfo::{config::AnyConfig, prelude::*, topology::Topology}; + +mod common; + +#[message(ret = Duration)] +struct Summarize; + +fn make_yielder(iter_count: u32) -> Blueprint { + ActorGroup::new().exec(move |mut ctx| async move { + let token = msg!(match ctx.recv().await.unwrap() { + (Summarize, token) => token, + _ => unreachable!(), + }); + + let start = Instant::now(); + for _ in 0..iter_count { + elfo::coop::consume_budget().await; + } + + ctx.respond(token, start.elapsed()); + }) +} + +async fn run(iter_count: u32) -> Duration { + let topology = Topology::empty(); + let yielder = topology.local("yielder"); + let configurers = topology.local("system.configurers").entrypoint(); + + let yielder_addr = yielder.addr(); + + yielder.mount(make_yielder(iter_count)); + configurers.mount(elfo::batteries::configurer::fixture( + &topology, + AnyConfig::default(), + )); + + elfo::_priv::do_start(topology, false, |ctx, _| async move { + ctx.request_to(yielder_addr, Summarize) + .resolve() + .await + .unwrap() + }) + .await + .unwrap() +} + +fn by_count(c: &mut Criterion) { + c.bench_function("count", |b| { + b.iter_custom(|iter_count| { + let rt = common::make_mt_runtime(common::tokio_worker_threads()); + rt.block_on(run(iter_count as u32)) + }) + }); +} + +criterion_group!(cases, by_count); +criterion_main!(cases); diff --git a/benches/messaging.rs b/benches/messaging.rs index 1b2442f4..70ee285b 100644 --- a/benches/messaging.rs +++ b/benches/messaging.rs @@ -1,13 +1,7 @@ -use std::{ - env, - str::FromStr, - thread::available_parallelism, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, BenchmarkId, Criterion, Throughput}; use derive_more::Display; -use tokio::runtime::{Builder, Runtime}; use elfo::{ config::AnyConfig, @@ -18,6 +12,9 @@ use elfo::{ Addr, Local, }; +#[path = "common.rs"] +mod common; + // === Messages === #[message] @@ -247,28 +244,6 @@ fn make_name() -> (&'static str, &'static str) { (group_id, function_id) } -fn make_runtime(worker_threads: u32) -> Runtime { - use std::sync::atomic::{AtomicU32, Ordering}; - - // To make it easier to check in the profiler. - let test_no = { - static BENCH_NO: AtomicU32 = AtomicU32::new(0); - BENCH_NO.fetch_add(1, Ordering::Relaxed) - }; - - let next_worker_no = AtomicU32::new(0); - - Builder::new_multi_thread() - .enable_all() - .worker_threads(worker_threads as usize) - .thread_name_fn(move || { - let worker_no = next_worker_no.fetch_add(1, Ordering::Relaxed); - format!("b{test_no}-w{worker_no}") - }) - .build() - .unwrap() -} - fn case(c: &mut Criterion) { let params = CaseParams::new::(); @@ -280,7 +255,7 @@ fn case(c: &mut Criterion) { group.bench_with_input(BenchmarkId::new(function_id, &p), &p.producers, |b, _| { b.iter_custom(|iter_count| { - let rt = make_runtime(p.workers); + let rt = common::make_mt_runtime(p.workers); let elapsed = rt.block_on(run::(p.producers, p.consumers, iter_count as u32)); rt.shutdown_timeout(Duration::from_secs(10)); @@ -301,8 +276,8 @@ struct CaseParams { impl CaseParams { fn new() -> Vec { - let max_parallelism = max_parallelism(); - let workers = tokio_worker_threads(); + let max_parallelism = common::max_parallelism(); + let workers = common::tokio_worker_threads(); (1..=10) .chain((12..=30).step_by(2)) @@ -323,24 +298,6 @@ impl CaseParams { } } -fn tokio_worker_threads() -> u32 { - env::var("TOKIO_WORKER_THREADS") - .ok() - .map(|s| u32::from_str(&s).expect("invalid value for TOKIO_WORKER_THREADS")) - .unwrap_or_else(|| { - usize::from(available_parallelism().expect("cannot get available parallelism")) as u32 - }) -} - -fn max_parallelism() -> u32 { - env::var("ELFO_BENCH_MAX_PARALLELISM") - .ok() - .map(|s| u32::from_str(&s).expect("invalid value for ELFO_BENCH_MAX_PARALLELISM")) - .unwrap_or_else(|| { - usize::from(available_parallelism().expect("cannot get available parallelism")) as u32 - }) -} - // === Cases === // Sends messages using the routing subsystem. diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index ac1e8e0a..9754427c 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -11,6 +11,7 @@ use crate::{ addr::Addr, address_book::AddressBook, config::AnyConfig, + coop, demux::Demux, dumping::{Direction, Dump, Dumper, INTERNAL_CLASS}, envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind}, @@ -26,9 +27,8 @@ use crate::{ source::{SourceHandle, Sources, UnattachedSource}, }; -use self::{budget::Budget, stats::Stats}; +use self::stats::Stats; -mod budget; mod stats; static DUMPER: Lazy = Lazy::new(|| Dumper::new(INTERNAL_CLASS)); @@ -46,7 +46,6 @@ pub struct Context { sources: Sources, stage: Stage, stats: Stats, - budget: Budget, } #[derive(Clone, Copy, PartialEq)] @@ -457,7 +456,7 @@ impl Context { /// /// The method returns the execution back to the runtime once the actor's /// budget has been exhausted. It prevents the actor from blocking the - /// runtime for too long. + /// runtime for too long. See [`coop`] for details. /// /// # Cancel safety /// @@ -487,10 +486,7 @@ impl Context { C: 'static, { 'outer: loop { - // TODO: reset if the mailbox is empty. - self.budget.acquire().await; - - self.pre_recv(); + self.pre_recv().await; let envelope = 'received: { let mailbox_fut = self.actor.as_ref()?.as_actor()?.recv(); @@ -534,7 +530,7 @@ impl Context { /// /// The method returns the execution back to the runtime once the actor's /// budget has been exhausted. It prevents the actor from blocking the - /// runtime for too long. + /// runtime for too long. See [`coop`] for details. /// /// # Cancel safety /// @@ -580,9 +576,7 @@ impl Context { { #[allow(clippy::never_loop)] // false positive loop { - self.budget.acquire().await; - - self.pre_recv(); + self.pre_recv().await; let envelope = 'received: { let actor = ward!( @@ -658,9 +652,11 @@ impl Context { .expect("start_info is not available for a group context") } - fn pre_recv(&mut self) { + async fn pre_recv(&mut self) { self.stats.on_recv(); + coop::consume_budget().await; + if unlikely(self.stage == Stage::Closed) { panic!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?"); } @@ -678,8 +674,6 @@ impl Context { where C: 'static, { - self.budget.decrement(); - scope::set_trace_id(envelope.trace_id()); let envelope = msg!(match envelope { @@ -760,7 +754,6 @@ impl Context { sources: Sources::new(), stage: self.stage, stats: Stats::empty(), - budget: self.budget.clone(), } } @@ -783,7 +776,6 @@ impl Context { sources: self.sources, stage: self.stage, stats: self.stats, - budget: self.budget, } } @@ -818,7 +810,6 @@ impl Context { sources: self.sources, stage: self.stage, stats: self.stats, - budget: self.budget, } } } @@ -881,7 +872,6 @@ impl Context { sources: Sources::new(), stage: Stage::PreRecv, stats: Stats::empty(), - budget: Budget::default(), } } } @@ -901,7 +891,6 @@ impl Clone for Context { sources: Sources::new(), stage: self.stage, stats: Stats::empty(), - budget: self.budget.clone(), } } } diff --git a/elfo-core/src/context/budget.rs b/elfo-core/src/context/budget.rs deleted file mode 100644 index 99676d80..00000000 --- a/elfo-core/src/context/budget.rs +++ /dev/null @@ -1,25 +0,0 @@ -#[derive(Clone)] -pub(crate) struct Budget(u8); - -impl Default for Budget { - fn default() -> Self { - Self(64) - } -} - -impl Budget { - pub(crate) async fn acquire(&mut self) { - if self.0 == 0 { - // We should reset the budget before `yield_now()` because - // `select! { _ => ctx.recv() .. }` above can lock the branch forever. - *self = Self::default(); - tokio::task::yield_now().await; - } - } - - pub(crate) fn decrement(&mut self) { - // We use a saturating operation here because `try_recv()` - // can be called many times without calling `Budget::acquire()`. - self.0 = self.0.saturating_sub(1); - } -} diff --git a/elfo-core/src/coop.rs b/elfo-core/src/coop.rs new file mode 100644 index 00000000..568ca51b --- /dev/null +++ b/elfo-core/src/coop.rs @@ -0,0 +1,196 @@ +//! Provides a cooperative budget for actors. +//! +//! # The Problem +//! +//! A single call to an actor's `poll` may potentially do a lot of work before +//! it returns `Poll::Pending`. If an actor runs for a long period of time +//! without yielding back to the executor, it can starve other actors waiting on +//! that executor to execute them, or drive underlying resources. Since Rust +//! does not have a runtime, it is difficult to forcibly preempt a long-running +//! task. Instead, this module provides an opt-in mechanism for actors to +//! collaborate with the executor to avoid starvation. +//! +//! This approach is similar to the one used in tokio. +//! +//! # Supported budgets +//! +//! Elfo supports two kinds of budgeting for actors: +//! * by time: the actor yields to the executor after a certain amount of time, +//! 5s by default. +//! * by count: the actor yields to the executor after a certain number of +//! [`consume_budget()`] calls, 64 by default. +//! +//! A used kind is determined by the presence of telemetry. It's a controversial +//! decision, but the reason is that telemetry requires fast time source +//! (usually, TSC), which is provided by the `quanta` crate. In this case, time +//! measurements are negligible and a time-based budget is preferred as a more +//! reliable way to prevent starvation. +//! +//! Note that methods [`Context::recv()`] and [`Context::try_recv()`] call +//! [`consume_budget()`] already, so you don't need to think about budgeting +//! in most cases. +//! +//! These limits cannot be configured for now. +//! +//! # Coordination with tokio's budget system +//! +//! Tokio has its own budget system, which is unstable and cannot be used by +//! elfo. Thus, elfo's budget system is independent of tokio's and work +//! simultaneously. +//! +//! However, since both budgets are reset at start of each actor's poll, no +//! matter which budget is exhausted first, both system work coordinately. +//! +//! [`Context::recv()`]: crate::context::Context::recv +//! [`Context::try_recv()`]: crate::context::Context::try_recv + +use std::cell::Cell; + +use elfo_utils::time::Instant; + +// TODO: make it configurable as `system.budget = "5ms" | 64 | "Unlimited"` +const MAX_TIME_NS: u64 = 5_000_000; // 5ms +const MAX_COUNT: u32 = 64; + +thread_local! { + static BUDGET: Cell = Cell::new(Budget::ByCount(0)); +} + +#[derive(Debug, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq))] +enum Budget { + /// Used when telemetry is enabled. + /// We already measure time using `quanta` which is fast. + ByTime(/* busy_since */ Instant), + /// Otherwise, limit the number of `recv()` calls. + ByCount(u32), +} + +#[inline] +pub(crate) fn reset(busy_since: Option) { + BUDGET.with(|budget| budget.set(busy_since.map_or(Budget::ByCount(MAX_COUNT), Budget::ByTime))); +} + +/// Consumes a unit of budget and returns the execution back to the executor, +/// but only if the actor's coop budget has been exhausted. +/// +/// This function can be used in order to insert optional yield points into long +/// computations that do not use `Context::recv()`, `Context::try_recv()` or +/// tokio resources for a long time without redundantly yielding to the executor +/// each time. +/// +/// # Example +/// +/// Make sure that a function which returns a sum of (potentially lots of) +/// iterated values is cooperative. +/// +/// ``` +/// # use elfo_core as elfo; +/// async fn sum_iterator(input: &mut impl std::iter::Iterator) -> i64 { +/// let mut sum: i64 = 0; +/// while let Some(i) = input.next() { +/// sum += i; +/// elfo::coop::consume_budget().await; +/// } +/// sum +/// } +/// ``` +#[inline] +pub async fn consume_budget() { + let to_preempt = BUDGET.with(|cell| { + let budget = cell.get(); + + match budget { + Budget::ByTime(busy_since) => Instant::now().nanos_since(busy_since) >= MAX_TIME_NS, + Budget::ByCount(0) => true, + Budget::ByCount(left) => { + cell.set(Budget::ByCount(left - 1)); + false + } + } + }); + + if to_preempt { + tokio::task::yield_now().await; + } +} + +#[cfg(test)] +mod tests { + use std::{ + future::Future, + pin::{pin, Pin}, + task::{Context, Poll}, + time::Duration, + }; + + use pin_project::pin_project; + use tokio::runtime::Builder; + + use elfo_utils::time::with_instant_mock; + + use super::*; + + fn current_budget() -> Budget { + BUDGET.with(Cell::get) + } + + #[pin_project] + struct ResetOnPoll(/* use time */ bool, #[pin] F); + + impl Future for ResetOnPoll { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + reset(if *this.0 { Some(Instant::now()) } else { None }); + this.1.poll(cx) + } + } + + #[test] + fn by_count() { + let rt = Builder::new_current_thread().build().unwrap(); + + let task = async { + assert_eq!(current_budget(), current_budget()); + + for _ in 0..10 { + for i in (0..=MAX_COUNT).rev() { + assert_eq!(current_budget(), Budget::ByCount(i)); + consume_budget().await; + } + } + }; + + rt.block_on(ResetOnPoll(false, task)); + } + + #[test] + fn by_time() { + let rt = Builder::new_current_thread().build().unwrap(); + let steps = 10; + let timestep = Duration::from_nanos(MAX_TIME_NS / steps); + + with_instant_mock(|mock| { + let task = async move { + assert_eq!(current_budget(), current_budget()); + + for _ in 0..10 { + let before = current_budget(); + assert!(matches!(before, Budget::ByTime(_))); + + for _ in 0..steps { + mock.advance(timestep); + assert_eq!(current_budget(), before); + consume_budget().await; + } + + assert_ne!(current_budget(), before); + } + }; + + rt.block_on(ResetOnPoll(true, task)); + }) + } +} diff --git a/elfo-core/src/lib.rs b/elfo-core/src/lib.rs index e58a5def..38695b2d 100644 --- a/elfo-core/src/lib.rs +++ b/elfo-core/src/lib.rs @@ -30,6 +30,7 @@ pub use elfo_macros::{message_core as message, msg_core as msg}; mod macros; pub mod config; +pub mod coop; pub mod dumping; pub mod errors; pub mod init; diff --git a/elfo-core/src/supervisor/measure_poll.rs b/elfo-core/src/supervisor/measure_poll.rs index 2b2b4fa1..1a4e7c56 100644 --- a/elfo-core/src/supervisor/measure_poll.rs +++ b/elfo-core/src/supervisor/measure_poll.rs @@ -50,6 +50,7 @@ impl Future for MeasurePoll { let result = if let Some(recorder) = metrics::try_recorder() { let start_time = Instant::now(); + crate::coop::reset(Some(start_time)); let res = this.inner.poll(cx); let elapsed = Instant::now().secs_f64_since(start_time); recorder.record_histogram(&BUSY_TIME_SECONDS, elapsed); @@ -60,6 +61,7 @@ impl Future for MeasurePoll { }); res } else { + crate::coop::reset(None); this.inner.poll(cx) }; diff --git a/elfo-utils/src/time.rs b/elfo-utils/src/time.rs index 6598d018..95c52133 100644 --- a/elfo-utils/src/time.rs +++ b/elfo-utils/src/time.rs @@ -8,7 +8,7 @@ use quanta::Clock; /// A measurement of a monotonically nondecreasing clock. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct Instant(u64); +pub struct Instant(u64); // TODO: make it `NonZeroU64`? impl Instant { /// Returns the current time. diff --git a/examples/Cargo.toml b/examples/Cargo.toml index fb521c1d..ea0a6d76 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -10,7 +10,7 @@ edition.workspace = true readme.workspace = true [dev-dependencies] -elfo = { version = "0.2.0-alpha.14", path = "../elfo", features = ["full", "network", "test-util"] } +elfo = { path = "../elfo", features = ["full", "network", "test-util"] } anyhow = "1.0.40" futures = "0.3.12"