Skip to content

Commit

Permalink
feat: async higher order fn
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandervantrijffel committed Oct 27, 2024
1 parent fdc86e6 commit 5b6d823
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 7 deletions.
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ path = "src/lib.rs"
doctest = false

[dependencies]
futures = "0.3.31"
moro-local = "0.4.0"
tokio = { version = "1", default-features = false, features = ["rt", "time", "macros", "rt-multi-thread"] }
unicode-segmentation = "1"

[profile.release]
# debug = true # Enable debug symbols
strip = true # Strip symbols from binary
lto = true # Enable link-time optimization
codegen-units = 64 # Improve compile time
strip = true # Strip symbols from binary
lto = true # Enable link-time optimization
codegen-units = 64 # Improve compile time
panic = 'abort'
opt-level = 1 # Basic optimizations, improved compile time
incremental = true # shorten hot local builds duration
opt-level = 1 # Basic optimizations, improved compile time
incremental = true # shorten hot local builds duration
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Rust gems

This repository offers some Rust snippets that might be useful when studying the language.
This repository offers some Rust snippets that can be useful when studying the language.

- [vec_any](src/vec_any.rs) Collect trait objects in a vector and use downcast_ref to access the concrete type instances of the items
- [mutate_in_closure](src/mutate_in_closure.rs) Mutate a value in a closure without copy and clone
- [mutate_in_closure](src/mutate_in_closure.rs) Mutate a value in a closure without copy and clone by using Rc
- [async structured concurrency)[src/async_structured_concurrency.rs] Improve developer UX while maintaining a decent performance with async scopes
- [async_higher_order_fn](src/async_higher_order_fn.rs) Implement an async higher order function that accept an async closures and returns an async result
- [from_str](src/from_str.rs) Thou shall not implement From\<str'> but instead implement the FromStr trait
- [graphemes](src/graphemes.rs) Trim an unicode string to a maximum length with

Expand Down
6 changes: 6 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ test-all *ARGS: check-dependencies
test-watch-all *ARGS: check-dependencies
cargo watch -c -w . -x "nextest run --verbose {{ ARGS }}"

# run all tests with nextest in watch mode, use the cranelift compiler to reduce incremental build times
test-watch-all-cranelift *ARGS: check-dependencies
rustup override set nightly
export RUSTFLAGS="${RUSTFLAGS} -Zthreads=8"
CARGO_PROFILE_DEV_CODEGEN_BACKEND=cranelift cargo watch -q -c --ignore '**/generated_at_build.rs' -w . -x "+nightly nextest run -Zcodegen-backend --all-features --verbose {{ARGS}}"

# build the project in release mode
build-release:
cargo build --release --verbose
Expand Down
46 changes: 46 additions & 0 deletions src/async_higher_order_fn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#![allow(dead_code, unused)]

use std::{error::Error, future::Future, pin::Pin};

/// function that accepts an async function handler
fn higher_order_function(f: FunctionHandler, data: Data) -> Pin<Box<dyn Future<Output = Response>>> {
f(data)
}

pub type FunctionHandler = &'static (dyn Fn(Data) -> Pin<Box<dyn Future<Output = Response>>>);
pub type Response = Result<ResponseData, Box<dyn Error>>;

fn my_function_handler(data: Data) -> Pin<Box<dyn Future<Output = Response>>> {

Check warning on line 13 in src/async_higher_order_fn.rs

View workflow job for this annotation

GitHub Actions / validate

this argument is passed by value, but not consumed in the function body
Box::pin(async move { async_dummy().await })
}

/// Dummy async function to demonstrate calling a async function from a async higher order function
async fn async_dummy() -> Result<ResponseData, Box<dyn Error>> {

Check warning on line 18 in src/async_higher_order_fn.rs

View workflow job for this annotation

GitHub Actions / validate

unused `async` for function with no await statements
Ok(ResponseData { processed: true })
}

pub struct Data {
title: String,
}

pub struct ResponseData {
processed: bool,
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_call_it() {
let response = higher_order_function(
&my_function_handler,
Data {
title: "Hello".to_string(),
},
)
.await;
assert!(response.is_ok());
assert!(response.unwrap().processed);
}
}
105 changes: 105 additions & 0 deletions src/async_structured_concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use futures::Stream;
use futures::{pin_mut, StreamExt};
use std::error::Error;
use std::sync::RwLock;

/// Minimal executable example of structured concurrency in Rust as eloquently described in this [blog
/// post](https://emschwartz.me/async-rust-can-be-a-pleasure-to-work-with-without-send-sync-static)
/// of Evan Schwartz.
/// This example proceses 10 incoming demo requests. Each incoming webrequest is processed in a
/// separate scope from the moro crate, similar to an std::thread::scope.

Check warning on line 10 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

item in documentation is missing backticks
/// With this approach, we don't need types that are Send like Arc to share the database and
/// service dependencies, and no async move is needed. This greatly improves the developer UX.
/// The test_demo test is configured to use the Tokio current thread runtime to emulate thread-per-core.

Check warning on line 13 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

item in documentation is missing backticks
/// The spawned tasks within the inner moro scope are executed within the same thread, these tasks are
/// not moved between threads and therefore the called future doees not need to be Send.
pub async fn structured_concurrency_demo(incoming: impl Stream<Item = Request>) -> Result<(), Box<dyn Error>> {

Check warning on line 16 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

docs for function which may panic missing `# Panics` section

Check warning on line 16 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

docs for function returning `Result` missing `# Errors` section

Check warning on line 16 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

future cannot be sent between threads safely
let context = Context::default();
pin_mut!(incoming);

moro_local::async_scope!(|scope| {
while let Some(request) = incoming.next().await {
let _response = scope
.spawn(async {
let request = request;
moro_local::async_scope!(|scope| {
let Ok(two_things) = context.db.load_two_things().await else {
return Err(());
};
for _ in two_things {
// these two tasks are executed within the scope and are awaited at the end of the
// inner scope
scope.spawn(context.service_a.do_something(&request, &context.requests_processed));
}
let result_b = context.service_b.do_something(&request, &context.requests_processed);
let result_c = context.service_c.do_something(&request, &context.requests_processed);
let (b, c) = futures::try_join!(result_b, result_c).map_err(|_| ())?;
Ok::<_, ()>(Response { b, c })
})
.await
})
.await;
}
})
.await;

println!("do_something executed {} times", *context.requests_processed.read().unwrap());

Ok(())
}

#[cfg(test)]
mod tests {
pub use super::*;

/// View the test output with `just test-watch-all test_demo --success-output immediate`
#[tokio::test(flavor = "current_thread")]
async fn test_demo() -> Result<(), Box<dyn Error>> {
let incoming = futures::stream::iter((1..=10).map(|_| Request));
structured_concurrency_demo(incoming).await
}
}

#[derive(Default)]
pub struct Thing;
#[derive(Default)]
pub struct Request;
#[derive(Default)]
pub struct Database;
#[derive(Default)]
#[allow(dead_code)]
pub struct Response {
b: ServiceResult,
c: ServiceResult,
}

impl Database {
async fn load_two_things(&self) -> Result<Vec<Thing>, Box<dyn Error>> {

Check warning on line 77 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

unused `async` for function with no await statements
Ok((1..=2).map(|_| Thing).collect())
}
}

#[derive(Default)]
pub struct Service;
impl Service {
/// simulate a task that takes 200ms
async fn do_something(&self, _request: &Request, requests_processed: &RwLock<usize>) -> Result<ServiceResult, Box<dyn Error>> {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;

let mut count = requests_processed.write().unwrap();

Check warning on line 89 in src/async_structured_concurrency.rs

View workflow job for this annotation

GitHub Actions / validate

temporary with significant `Drop` can be early dropped
*count += 1;

Ok(ServiceResult)
}
}
#[derive(Default)]
pub struct ServiceResult;

#[derive(Default)]
struct Context {
db: Database,
service_a: Service,
service_b: Service,
service_c: Service,
requests_processed: RwLock<usize>,
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod async_higher_order_fn;
pub mod async_structured_concurrency;
pub mod from_str;
pub mod graphemes;
pub mod mutate_in_closure;
Expand Down

0 comments on commit 5b6d823

Please sign in to comment.