Skip to content

Commit

Permalink
feat: structured demo
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandervantrijffel committed Oct 27, 2024
1 parent 76d9c4c commit 24d2528
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 6 deletions.
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ path = "src/lib.rs"
doctest = false

[dependencies]
tokio = { version = "1.40", default-features= false, features = ["macros", "rt-multi-thread" ] }
futures = "0.3.31"
moro-local = "0.4.0"
tokio = { version = "1", default-features = false, features = ["rt", "time", "macros", "rt-multi-thread"] }
unicode-segmentation = "1"

[profile.release]
# debug = true # Enable debug symbols
strip = true # Strip symbols from binary
lto = true # Enable link-time optimization
codegen-units = 64 # Improve compile time
strip = true # Strip symbols from binary
lto = true # Enable link-time optimization
codegen-units = 64 # Improve compile time
panic = 'abort'
opt-level = 1 # Basic optimizations, improved compile time
incremental = true # shorten hot local builds duration
opt-level = 1 # Basic optimizations, improved compile time
incremental = true # shorten hot local builds duration
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ This repository offers some Rust snippets that can be useful when studying the l

- [vec_any](src/vec_any.rs) Collect trait objects in a vector and use downcast_ref to access the concrete type instances of the items
- [mutate_in_closure](src/mutate_in_closure.rs) Mutate a value in a closure without copy and clone by using Rc
- [async structured concurrency)[src/async_structured_concurrency.rs] Improve developer UX while maintaining a decent performance with async scopes
- [async_higher_order_fn](src/async_higher_order_fn.rs) Implement an async higher order function that accept an async closures and returns an async result
- [from_str](src/from_str.rs) Thou shall not implement From\<str'> but instead implement the FromStr trait
- [graphemes](src/graphemes.rs) Trim an unicode string to a maximum length with
Expand Down
6 changes: 6 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ test-all *ARGS: check-dependencies
test-watch-all *ARGS: check-dependencies
cargo watch -c -w . -x "nextest run --verbose {{ ARGS }}"

# run all tests with nextest in watch mode, use the cranelift compiler to reduce incremental build times
test-watch-all-cranelift *ARGS: check-dependencies
rustup override set nightly
export RUSTFLAGS="${RUSTFLAGS} -Zthreads=8"
CARGO_PROFILE_DEV_CODEGEN_BACKEND=cranelift cargo watch -q -c --ignore '**/generated_at_build.rs' -w . -x "+nightly nextest run -Zcodegen-backend --all-features --verbose {{ARGS}}"

# build the project in release mode
build-release:
cargo build --release --verbose
Expand Down
105 changes: 105 additions & 0 deletions src/async_structured_concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use futures::Stream;
use futures::{pin_mut, StreamExt};
use std::error::Error;
use std::sync::RwLock;

/// Minimal executable example of structured concurrency in Rust as eloquently described in this [blog
/// post](https://emschwartz.me/async-rust-can-be-a-pleasure-to-work-with-without-send-sync-static)
/// of Evan Schwartz.
/// This example proceses 10 incoming demo requests. Each incoming webrequest is processed in a
/// separate scope from the moro crate, similar to an std::thread::scope.

Check warning on line 10 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

item in documentation is missing backticks
/// With this approach, we don't need types that are Send like Arc to share the database and
/// service dependencies, and no async move is needed. This greatly improves the developer UX.
/// The test_demo test is configured to use the Tokio current thread runtime to emulate thread-per-core.

Check warning on line 13 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

item in documentation is missing backticks
/// The spawned tasks within the inner moro scope are executed within the same thread, these tasks are
/// not moved between threads and therefore the called future doees not need to be Send.
pub async fn structured_concurrency_demo(incoming: impl Stream<Item = Request>) -> Result<(), Box<dyn Error>> {

Check warning on line 16 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

docs for function which may panic missing `# Panics` section

Check warning on line 16 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

docs for function returning `Result` missing `# Errors` section

Check warning on line 16 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

future cannot be sent between threads safely
let context = Context::default();
pin_mut!(incoming);

moro_local::async_scope!(|scope| {
while let Some(request) = incoming.next().await {
let _response: Result<Response, ()> = scope
.spawn(async {
let request = request;
moro_local::async_scope!(|scope| {
let Ok(two_things) = context.db.load_two_things().await else {
return Err(());
};
for _ in two_things {
// these two tasks are executed within the scope and are awaited at the end of the
// inner scope
scope.spawn(context.service_a.do_something(&request, &context.requests_processed));
}
let result_b = context.service_b.do_something(&request, &context.requests_processed);
let result_c = context.service_c.do_something(&request, &context.requests_processed);
let (b, c) = futures::try_join!(result_b, result_c).map_err(|_| ())?;
Ok::<_, ()>(Response { b, c })
})
.await
})
.await;
}
})
.await;

println!("do_something executed {} times", *context.requests_processed.read().unwrap());

Ok(())
}

#[cfg(test)]
mod tests {
pub use super::*;

/// View the test output with `just test-watch-all test_demo --success-output immediate`
#[tokio::test(flavor = "current_thread")]
async fn test_demo() -> Result<(), Box<dyn Error>> {
let incoming = futures::stream::iter((1..=10).map(|_| Request));
structured_concurrency_demo(incoming).await
}
}

#[derive(Default)]
pub struct Thing;
#[derive(Default)]
pub struct Request;
#[derive(Default)]
pub struct Database;
#[derive(Default)]
#[allow(dead_code)]
pub struct Response {
b: ServiceResult,
c: ServiceResult,
}

impl Database {
async fn load_two_things(&self) -> Result<Vec<Thing>, Box<dyn Error>> {

Check warning on line 77 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

unused `async` for function with no await statements
Ok((1..=2).map(|_| Thing).collect())
}
}

#[derive(Default)]
pub struct Service;
impl Service {
/// simulate a task that takes 200ms
async fn do_something(&self, _request: &Request, requests_processed: &RwLock<usize>) -> Result<ServiceResult, Box<dyn Error>> {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;

let mut count = requests_processed.write().unwrap();

Check warning on line 89 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

temporary with significant `Drop` can be early dropped
*count += 1;

Ok(ServiceResult)
}
}
#[derive(Default)]
pub struct ServiceResult;

#[derive(Default)]
struct Context {
db: Database,
service_a: Service,
service_b: Service,
service_c: Service,
requests_processed: RwLock<usize>,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod async_higher_order_fn;
pub mod async_structured_concurrency;
pub mod from_str;
pub mod graphemes;
pub mod mutate_in_closure;
Expand Down

0 comments on commit 24d2528

Please sign in to comment.