diff --git a/Cargo.toml b/Cargo.toml index 35c7076..ac5cd10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,13 +10,16 @@ path = "src/lib.rs" doctest = false [dependencies] +futures = "0.3.31" +moro-local = "0.4.0" +tokio = { version = "1", default-features = false, features = ["rt", "time", "macros", "rt-multi-thread"] } unicode-segmentation = "1" [profile.release] # debug = true # Enable debug symbols -strip = true # Strip symbols from binary -lto = true # Enable link-time optimization -codegen-units = 64 # Improve compile time +strip = true # Strip symbols from binary +lto = true # Enable link-time optimization +codegen-units = 64 # Improve compile time panic = 'abort' -opt-level = 1 # Basic optimizations, improved compile time -incremental = true # shorten hot local builds duration +opt-level = 1 # Basic optimizations, improved compile time +incremental = true # shorten hot local builds duration diff --git a/README.md b/README.md index 08a88e0..1205427 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ # Rust gems -This repository offers some Rust snippets that might be useful when studying the language. +This repository offers some Rust snippets that can be useful when studying the language. - [vec_any](src/vec_any.rs) Collect trait objects in a vector and use downcast_ref to access the concrete type instances of the items -- [mutate_in_closure](src/mutate_in_closure.rs) Mutate a value in a closure without copy and clone +- [mutate_in_closure](src/mutate_in_closure.rs) Mutate a value in a closure without copy and clone by using Rc +- [async structured concurrency)[src/async_structured_concurrency.rs] Improve developer UX while maintaining a decent performance with async scopes +- [async_higher_order_fn](src/async_higher_order_fn.rs) Implement an async higher order function that accept an async closures and returns an async result - [from_str](src/from_str.rs) Thou shall not implement From\ but instead implement the FromStr trait - [graphemes](src/graphemes.rs) Trim an unicode string to a maximum length with diff --git a/justfile b/justfile index 5bd25be..aa945e7 100644 --- a/justfile +++ b/justfile @@ -14,6 +14,12 @@ test-all *ARGS: check-dependencies test-watch-all *ARGS: check-dependencies cargo watch -c -w . -x "nextest run --verbose {{ ARGS }}" +# run all tests with nextest in watch mode, use the cranelift compiler to reduce incremental build times +test-watch-all-cranelift *ARGS: check-dependencies + rustup override set nightly + export RUSTFLAGS="${RUSTFLAGS} -Zthreads=8" + CARGO_PROFILE_DEV_CODEGEN_BACKEND=cranelift cargo watch -q -c --ignore '**/generated_at_build.rs' -w . -x "+nightly nextest run -Zcodegen-backend --all-features --verbose {{ARGS}}" + # build the project in release mode build-release: cargo build --release --verbose diff --git a/src/async_higher_order_fn.rs b/src/async_higher_order_fn.rs new file mode 100644 index 0000000..a791c7c --- /dev/null +++ b/src/async_higher_order_fn.rs @@ -0,0 +1,46 @@ +#![allow(dead_code, unused)] + +use std::{error::Error, future::Future, pin::Pin}; + +/// function that accepts an async function handler +fn higher_order_function(f: FunctionHandler, data: Data) -> Pin>> { + f(data) +} + +pub type FunctionHandler = &'static (dyn Fn(Data) -> Pin>>); +pub type Response = Result>; + +fn my_function_handler(data: Data) -> Pin>> { + Box::pin(async move { async_dummy().await }) +} + +/// Dummy async function to demonstrate calling a async function from a async higher order function +async fn async_dummy() -> Result> { + Ok(ResponseData { processed: true }) +} + +pub struct Data { + title: String, +} + +pub struct ResponseData { + processed: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_call_it() { + let response = higher_order_function( + &my_function_handler, + Data { + title: "Hello".to_string(), + }, + ) + .await; + assert!(response.is_ok()); + assert!(response.unwrap().processed); + } +} diff --git a/src/async_structured_concurrency.rs b/src/async_structured_concurrency.rs new file mode 100644 index 0000000..bc26d5d --- /dev/null +++ b/src/async_structured_concurrency.rs @@ -0,0 +1,105 @@ +use futures::Stream; +use futures::{pin_mut, StreamExt}; +use std::error::Error; +use std::sync::RwLock; + +/// Minimal executable example of structured concurrency in Rust as eloquently described in this [blog +/// post](https://emschwartz.me/async-rust-can-be-a-pleasure-to-work-with-without-send-sync-static) +/// of Evan Schwartz. +/// This example proceses 10 incoming demo requests. Each incoming webrequest is processed in a +/// separate scope from the moro crate, similar to an std::thread::scope. +/// With this approach, we don't need types that are Send like Arc to share the database and +/// service dependencies, and no async move is needed. This greatly improves the developer UX. +/// The test_demo test is configured to use the Tokio current thread runtime to emulate thread-per-core. +/// The spawned tasks within the inner moro scope are executed within the same thread, these tasks are +/// not moved between threads and therefore the called future doees not need to be Send. +pub async fn structured_concurrency_demo(incoming: impl Stream) -> Result<(), Box> { + let context = Context::default(); + pin_mut!(incoming); + + moro_local::async_scope!(|scope| { + while let Some(request) = incoming.next().await { + let _response = scope + .spawn(async { + let request = request; + moro_local::async_scope!(|scope| { + let Ok(two_things) = context.db.load_two_things().await else { + return Err(()); + }; + for _ in two_things { + // these two tasks are executed within the scope and are awaited at the end of the + // inner scope + scope.spawn(context.service_a.do_something(&request, &context.requests_processed)); + } + let result_b = context.service_b.do_something(&request, &context.requests_processed); + let result_c = context.service_c.do_something(&request, &context.requests_processed); + let (b, c) = futures::try_join!(result_b, result_c).map_err(|_| ())?; + Ok::<_, ()>(Response { b, c }) + }) + .await + }) + .await; + } + }) + .await; + + println!("do_something executed {} times", *context.requests_processed.read().unwrap()); + + Ok(()) +} + +#[cfg(test)] +mod tests { + pub use super::*; + + /// View the test output with `just test-watch-all test_demo --success-output immediate` + #[tokio::test(flavor = "current_thread")] + async fn test_demo() -> Result<(), Box> { + let incoming = futures::stream::iter((1..=10).map(|_| Request)); + structured_concurrency_demo(incoming).await + } +} + +#[derive(Default)] +pub struct Thing; +#[derive(Default)] +pub struct Request; +#[derive(Default)] +pub struct Database; +#[derive(Default)] +#[allow(dead_code)] +pub struct Response { + b: ServiceResult, + c: ServiceResult, +} + +impl Database { + async fn load_two_things(&self) -> Result, Box> { + Ok((1..=2).map(|_| Thing).collect()) + } +} + +#[derive(Default)] +pub struct Service; +impl Service { + /// simulate a task that takes 200ms + async fn do_something(&self, _request: &Request, requests_processed: &RwLock) -> Result> { + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + let mut count = requests_processed.write().unwrap(); + *count += 1; + + Ok(ServiceResult) + } +} +#[derive(Default)] +pub struct ServiceResult; + +#[derive(Default)] +struct Context { + db: Database, + service_a: Service, + service_b: Service, + service_c: Service, + requests_processed: RwLock, +} diff --git a/src/lib.rs b/src/lib.rs index bf79e15..d31374d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +mod async_higher_order_fn; +pub mod async_structured_concurrency; pub mod from_str; pub mod graphemes; pub mod mutate_in_closure;