Skip to content

Commit

Permalink
feat(core/budget): time-based budget
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Mar 19, 2024
1 parent 62cdc0e commit 1a2cc49
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 101 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Added
- coop: expose `coop::consume_budget()` to call in long computations. See documentation of the `coop` module for details.
- coop: prefer a time-based budgeting if the telemetry is enabled.

### Fixed
- telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system.

## [0.2.0-alpha.14] - 2024-02-27
### Fixed
Expand Down
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,3 @@ readme = "README.md"

[profile.release]
debug = 1

[patch.crates-io]
sharded-slab = { git = "https://github.com/loyd/sharded-slab", branch = "main" }
8 changes: 7 additions & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[package]
name = "benches"
name = "elfo-benches"
version = "0.0.0"
publish = false

repository.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true
readme.workspace = true

[dev-dependencies]
elfo = { path = "../elfo" }
Expand Down Expand Up @@ -37,3 +38,8 @@ harness = false
name = "messaging_tc"
path = "messaging_tc.rs"
harness = false

[[bench]]
name = "coop"
path = "coop.rs"
harness = false
46 changes: 46 additions & 0 deletions benches/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::{env, str::FromStr, thread::available_parallelism};

use tokio::runtime::{Builder, Runtime};

#[allow(dead_code)]
pub(crate) fn tokio_worker_threads() -> u32 {
env::var("TOKIO_WORKER_THREADS")
.ok()
.map(|s| u32::from_str(&s).expect("invalid value for TOKIO_WORKER_THREADS"))
.unwrap_or_else(|| {
usize::from(available_parallelism().expect("cannot get available parallelism")) as u32
})
}

#[allow(dead_code)]
pub(crate) fn max_parallelism() -> u32 {
env::var("ELFO_BENCH_MAX_PARALLELISM")
.ok()
.map(|s| u32::from_str(&s).expect("invalid value for ELFO_BENCH_MAX_PARALLELISM"))
.unwrap_or_else(|| {
usize::from(available_parallelism().expect("cannot get available parallelism")) as u32
})
}

#[allow(dead_code)]
pub(crate) fn make_mt_runtime(worker_threads: u32) -> Runtime {
use std::sync::atomic::{AtomicU32, Ordering};

// To make it easier to check in the profiler.
let test_no = {
static BENCH_NO: AtomicU32 = AtomicU32::new(0);
BENCH_NO.fetch_add(1, Ordering::Relaxed)
};

let next_worker_no = AtomicU32::new(0);

Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_threads as usize)
.thread_name_fn(move || {
let worker_no = next_worker_no.fetch_add(1, Ordering::Relaxed);
format!("b{test_no}-w{worker_no}")
})
.build()
.unwrap()
}
61 changes: 61 additions & 0 deletions benches/coop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, Criterion};

use elfo::{config::AnyConfig, prelude::*, topology::Topology};

mod common;

#[message(ret = Duration)]
struct Summarize;

fn make_yielder(iter_count: u32) -> Blueprint {
ActorGroup::new().exec(move |mut ctx| async move {
let token = msg!(match ctx.recv().await.unwrap() {
(Summarize, token) => token,
_ => unreachable!(),
});

let start = Instant::now();
for _ in 0..iter_count {
elfo::coop::consume_budget().await;
}

ctx.respond(token, start.elapsed());
})
}

async fn run(iter_count: u32) -> Duration {
let topology = Topology::empty();
let yielder = topology.local("yielder");
let configurers = topology.local("system.configurers").entrypoint();

let yielder_addr = yielder.addr();

yielder.mount(make_yielder(iter_count));
configurers.mount(elfo::batteries::configurer::fixture(
&topology,
AnyConfig::default(),
));

elfo::_priv::do_start(topology, false, |ctx, _| async move {
ctx.request_to(yielder_addr, Summarize)
.resolve()
.await
.unwrap()
})
.await
.unwrap()
}

fn by_count(c: &mut Criterion) {
c.bench_function("count", |b| {
b.iter_custom(|iter_count| {
let rt = common::make_mt_runtime(common::tokio_worker_threads());
rt.block_on(run(iter_count as u32))
})
});
}

criterion_group!(cases, by_count);
criterion_main!(cases);
57 changes: 7 additions & 50 deletions benches/messaging.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
use std::{
env,
str::FromStr,
thread::available_parallelism,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, BenchmarkId, Criterion, Throughput};
use derive_more::Display;
use tokio::runtime::{Builder, Runtime};

use elfo::{
config::AnyConfig,
Expand All @@ -18,6 +12,9 @@ use elfo::{
Addr, Local,
};

#[path = "common.rs"]
mod common;

// === Messages ===

#[message]
Expand Down Expand Up @@ -247,28 +244,6 @@ fn make_name<const FLAGS: Flags>() -> (&'static str, &'static str) {
(group_id, function_id)
}

fn make_runtime(worker_threads: u32) -> Runtime {
use std::sync::atomic::{AtomicU32, Ordering};

// To make it easier to check in the profiler.
let test_no = {
static BENCH_NO: AtomicU32 = AtomicU32::new(0);
BENCH_NO.fetch_add(1, Ordering::Relaxed)
};

let next_worker_no = AtomicU32::new(0);

Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_threads as usize)
.thread_name_fn(move || {
let worker_no = next_worker_no.fetch_add(1, Ordering::Relaxed);
format!("b{test_no}-w{worker_no}")
})
.build()
.unwrap()
}

fn case<const FLAGS: Flags>(c: &mut Criterion) {
let params = CaseParams::new::<FLAGS>();

Expand All @@ -280,7 +255,7 @@ fn case<const FLAGS: Flags>(c: &mut Criterion) {

group.bench_with_input(BenchmarkId::new(function_id, &p), &p.producers, |b, _| {
b.iter_custom(|iter_count| {
let rt = make_runtime(p.workers);
let rt = common::make_mt_runtime(p.workers);
let elapsed =
rt.block_on(run::<FLAGS>(p.producers, p.consumers, iter_count as u32));
rt.shutdown_timeout(Duration::from_secs(10));
Expand All @@ -301,8 +276,8 @@ struct CaseParams {

impl CaseParams {
fn new<const FLAGS: Flags>() -> Vec<Self> {
let max_parallelism = max_parallelism();
let workers = tokio_worker_threads();
let max_parallelism = common::max_parallelism();
let workers = common::tokio_worker_threads();

(1..=10)
.chain((12..=30).step_by(2))
Expand All @@ -323,24 +298,6 @@ impl CaseParams {
}
}

fn tokio_worker_threads() -> u32 {
env::var("TOKIO_WORKER_THREADS")
.ok()
.map(|s| u32::from_str(&s).expect("invalid value for TOKIO_WORKER_THREADS"))
.unwrap_or_else(|| {
usize::from(available_parallelism().expect("cannot get available parallelism")) as u32
})
}

fn max_parallelism() -> u32 {
env::var("ELFO_BENCH_MAX_PARALLELISM")
.ok()
.map(|s| u32::from_str(&s).expect("invalid value for ELFO_BENCH_MAX_PARALLELISM"))
.unwrap_or_else(|| {
usize::from(available_parallelism().expect("cannot get available parallelism")) as u32
})
}

// === Cases ===

// Sends messages using the routing subsystem.
Expand Down
29 changes: 9 additions & 20 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
addr::Addr,
address_book::AddressBook,
config::AnyConfig,
coop,
demux::Demux,
dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
Expand All @@ -26,9 +27,8 @@ use crate::{
source::{SourceHandle, Sources, UnattachedSource},
};

use self::{budget::Budget, stats::Stats};
use self::stats::Stats;

mod budget;
mod stats;

static DUMPER: Lazy<Dumper> = Lazy::new(|| Dumper::new(INTERNAL_CLASS));
Expand All @@ -46,7 +46,6 @@ pub struct Context<C = (), K = Singleton> {
sources: Sources,
stage: Stage,
stats: Stats,
budget: Budget,
}

#[derive(Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -457,7 +456,7 @@ impl<C, K> Context<C, K> {
///
/// The method returns the execution back to the runtime once the actor's
/// budget has been exhausted. It prevents the actor from blocking the
/// runtime for too long.
/// runtime for too long. See [`coop`] for details.
///
/// # Cancel safety
///
Expand Down Expand Up @@ -487,10 +486,7 @@ impl<C, K> Context<C, K> {
C: 'static,
{
'outer: loop {
// TODO: reset if the mailbox is empty.
self.budget.acquire().await;

self.pre_recv();
self.pre_recv().await;

let envelope = 'received: {
let mailbox_fut = self.actor.as_ref()?.as_actor()?.recv();
Expand Down Expand Up @@ -534,7 +530,7 @@ impl<C, K> Context<C, K> {
///
/// The method returns the execution back to the runtime once the actor's
/// budget has been exhausted. It prevents the actor from blocking the
/// runtime for too long.
/// runtime for too long. See [`coop`] for details.
///
/// # Cancel safety
///
Expand Down Expand Up @@ -580,9 +576,7 @@ impl<C, K> Context<C, K> {
{
#[allow(clippy::never_loop)] // false positive
loop {
self.budget.acquire().await;

self.pre_recv();
self.pre_recv().await;

let envelope = 'received: {
let actor = ward!(
Expand Down Expand Up @@ -658,9 +652,11 @@ impl<C, K> Context<C, K> {
.expect("start_info is not available for a group context")
}

fn pre_recv(&mut self) {
async fn pre_recv(&mut self) {
self.stats.on_recv();

coop::consume_budget().await;

if unlikely(self.stage == Stage::Closed) {
panic!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?");
}
Expand All @@ -678,8 +674,6 @@ impl<C, K> Context<C, K> {
where
C: 'static,
{
self.budget.decrement();

scope::set_trace_id(envelope.trace_id());

let envelope = msg!(match envelope {
Expand Down Expand Up @@ -760,7 +754,6 @@ impl<C, K> Context<C, K> {
sources: Sources::new(),
stage: self.stage,
stats: Stats::empty(),
budget: self.budget.clone(),
}
}

Expand All @@ -783,7 +776,6 @@ impl<C, K> Context<C, K> {
sources: self.sources,
stage: self.stage,
stats: self.stats,
budget: self.budget,
}
}

Expand Down Expand Up @@ -818,7 +810,6 @@ impl<C, K> Context<C, K> {
sources: self.sources,
stage: self.stage,
stats: self.stats,
budget: self.budget,
}
}
}
Expand Down Expand Up @@ -881,7 +872,6 @@ impl Context {
sources: Sources::new(),
stage: Stage::PreRecv,
stats: Stats::empty(),
budget: Budget::default(),
}
}
}
Expand All @@ -901,7 +891,6 @@ impl<C, K: Clone> Clone for Context<C, K> {
sources: Sources::new(),
stage: self.stage,
stats: Stats::empty(),
budget: self.budget.clone(),
}
}
}
Expand Down
Loading

0 comments on commit 1a2cc49

Please sign in to comment.