From 9ca4c0fb3da1d4fdbd64c252ff94a85a32eab526 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Fri, 10 Jan 2025 03:02:26 +0300 Subject: [PATCH] address comments by Brennan --- Cargo.lock | 10 +- thread-manager/Cargo.toml | 2 + thread-manager/README.md | 23 +-- thread-manager/examples/common/mod.rs | 131 +++++++++++++++++ .../examples/core_contention_basics.rs | 117 ++++----------- .../examples/core_contention_sweep.rs | 133 ++++-------------- thread-manager/src/lib.rs | 8 +- thread-manager/src/tokio_runtime.rs | 4 +- 8 files changed, 209 insertions(+), 219 deletions(-) create mode 100644 thread-manager/examples/common/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 4b95422bb332f3..6803e7c568f22e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -265,6 +265,7 @@ dependencies = [ "anyhow", "axum 0.7.9", "env_logger", + "hyper 0.14.32", "log", "num_cpus", "rayon", @@ -274,6 +275,7 @@ dependencies = [ "thread-priority", "tokio", "toml 0.8.12", + "tower 0.5.2", ] [[package]] @@ -880,7 +882,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tower 0.5.1", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -11631,14 +11633,14 @@ dependencies = [ [[package]] name = "tower" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper 0.1.2", + "sync_wrapper 1.0.2", "tokio", "tower-layer", "tower-service", diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml index 0c760371ddfe0e..c104a215a5530d 100644 --- a/thread-manager/Cargo.toml +++ b/thread-manager/Cargo.toml @@ -20,6 +20,7 @@ serde = { workspace = true, features = ["derive"] } solana-metrics = { workspace = true } thread-priority = "1.2.0" tokio = { workspace = true, features = ["time", "rt-multi-thread"] } +tower = "0.5.2" [target.'cfg(target_os = "linux")'.dependencies] affinity = "0.1.2" @@ -27,5 +28,6 @@ affinity = "0.1.2" [dev-dependencies] axum = "0.7.9" env_logger = { workspace = true } +hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] } serde_json = { workspace = true } toml = { workspace = true } diff --git a/thread-manager/README.md b/thread-manager/README.md index 4e206870dafdc0..f6ac21d434ea69 100644 --- a/thread-manager/README.md +++ b/thread-manager/README.md @@ -1,10 +1,12 @@ # thread-manager -Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention -between different parts of the code that may -benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling -networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 hallocated for Rayon thread pool, -and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches -and context switches that would occur if Rayon was entirely unaware it was running side-by-side with +Balances machine resources between multiple threaded runtimes. +The purpose is to manage thread contention between different parts +of the code that may benefit from a diverse set of management options. +For example, we may want to have cores 1-4 handling networking via +Tokio, core 5 handling file IO via Tokio, cores 9-16 allocated for +Rayon thread pool, and cores 6-8 available for general use by std::thread. +This will minimize contention for CPU caches and context switches that +would occur if Rayon was entirely unaware it was running side-by-side with tokio, and each was to spawn as many threads as there are cores. # Supported threading models @@ -22,7 +24,7 @@ If you want you can set thread scheduling policy and priority. Keep in mind that ```bash sudo setcap cap_sys_nice+ep ``` -or root priviledges to run the resulting process. +or root privileges to run the resulting process. To see which policies are supported check (the sources)[./src/policy.rs] If you use realtime policies, priority to values from 1 (lowest) to 99 (highest) are possible. @@ -31,7 +33,7 @@ Multiple tokio runtimes can be created, and each may be assigned its own pool of Number of worker and blocking threads is configurable, as are thread priorities for the pool. ## Native -Native threads (std::thread) can be spawned from managed pools, this allows them to inheirt a particular +Native threads (std::thread) can be spawned from managed pools, this allows them to inherit a particular affinity from the pool, as well as to control the total number of threads made in every pool. @@ -44,7 +46,8 @@ one may want to spawn many rayon pools. * Thread pools can only be created at process startup * Once thread pool is created, its policy can not be modified at runtime - * Thread affinity not supported outside of linux + * Thread affinity & priority are not supported outside of linux + * Thread priority generally requires kernel level support and extra capabilities # TODO: @@ -55,7 +58,7 @@ one may want to spawn many rayon pools. # Examples -All examples need wrk for workload generation. Please install it before running. +All examples need `wrk` HTTP behnchmarking tool for load generation. Please install it before running. * core_contention_basics will demonstrate why core contention is bad, and how thread configs can help * core_contention_sweep will sweep across a range of core counts to show how benefits scale with core counts diff --git a/thread-manager/examples/common/mod.rs b/thread-manager/examples/common/mod.rs new file mode 100644 index 00000000000000..7b3608a9e7432e --- /dev/null +++ b/thread-manager/examples/common/mod.rs @@ -0,0 +1,131 @@ +use { + hyper::{Body, Request}, + log::info, + std::{ + future::IntoFuture, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, + }, + tokio::{net::TcpStream, sync::oneshot::Sender, time::timeout}, + tower::ServiceExt, +}; +const TEST_SECONDS: u64 = 10; + +pub async fn axum_main(port: u16, ready: Sender<()>) { + use axum::{routing::get, Router}; + // basic handler that responds with a static string + async fn root() -> &'static str { + tokio::time::sleep(Duration::from_millis(1)).await; + "Hello, World!" + } + + // build our application with a route + let app = Router::new().route("/", get(root)); + + // run our app with hyper, listening globally on port 3000 + let listener = + tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) + .await + .unwrap(); + info!("Server on port {port} ready"); + ready.send(()).unwrap(); + let timeout = tokio::time::timeout( + Duration::from_secs(TEST_SECONDS + 1), + axum::serve(listener, app).into_future(), + ) + .await; + match timeout { + Ok(v) => { + v.unwrap(); + } + Err(_) => { + info!("Terminating server on port {port}"); + } + } +} + +#[allow(dead_code)] +#[derive(Debug)] +pub struct Stats { + pub latency_s: f32, + pub requests_per_second: f32, +} + +pub async fn workload_main(ports: &[u16], tasks: usize) -> anyhow::Result { + struct ControlBlock { + start_time: std::time::Instant, + requests: AtomicUsize, + cumulative_latency_us: AtomicUsize, + } + + let cb = Arc::new(ControlBlock { + start_time: std::time::Instant::now(), + requests: AtomicUsize::new(0), + cumulative_latency_us: AtomicUsize::new(0), + }); + + async fn connection(port: u16, control_block: Arc) -> anyhow::Result<()> { + let sa = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port); + let stream = TcpStream::connect(sa).await?; + + let (mut request_sender, connection) = hyper::client::conn::handshake(stream).await?; + // spawn a task to poll the connection and drive the HTTP state + tokio::spawn(async move { + if let Err(_e) = connection.await { + //eprintln!("Error in connection: {}", e); + } + }); + + let path = "/"; + while control_block.start_time.elapsed() < Duration::from_secs(TEST_SECONDS) { + let req = Request::builder() + .uri(path) + .method("GET") + .body(Body::from(""))?; + let start = Instant::now(); + let res = timeout(Duration::from_millis(100), request_sender.send_request(req)).await; + let res = match res { + Ok(res) => res?, + Err(_) => { + anyhow::bail!("Timeout on request!") + } + }; + let _ = res.body(); + if res.status() != 200 { + anyhow::bail!("Got error from server"); + } + + control_block + .cumulative_latency_us + .fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed); + control_block.requests.fetch_add(1, Ordering::Relaxed); + // To send via the same connection again, it may not work as it may not be ready, + // so we have to wait until the request_sender becomes ready. + request_sender.ready().await?; + } + Ok(()) + } + + let mut join_handles = vec![]; + for port in ports { + info!("Starting load generation on port {port}"); + for _t in 0..tasks { + let jh = tokio::task::spawn(connection(*port, cb.clone())); + join_handles.push(jh); + } + } + for jh in join_handles { + let _ = jh.await?; //Ignore errors since we do not care about reasons here + } + let requests = cb.requests.load(Ordering::Relaxed); + let latency_accumulator_us = cb.cumulative_latency_us.load(Ordering::Relaxed); + Ok(Stats { + requests_per_second: requests as f32 / TEST_SECONDS as f32, + #[allow(clippy::arithmetic_side_effects)] + latency_s: (latency_accumulator_us / requests) as f32 / 1e6, + }) +} diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs index 241866ee0c32a3..a28026b8962b0b 100644 --- a/thread-manager/examples/core_contention_basics.rs +++ b/thread-manager/examples/core_contention_basics.rs @@ -1,44 +1,12 @@ use { agave_thread_manager::*, - log::{debug, info}, - std::{ - future::IntoFuture, - io::{Read, Write}, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, - }, + log::info, + std::{io::Read, path::PathBuf, time::Duration}, + tokio::sync::oneshot, }; -async fn axum_main(port: u16) { - use axum::{routing::get, Router}; - - // basic handler that responds with a static string - async fn root() -> &'static str { - tokio::time::sleep(Duration::from_millis(1)).await; - "Hello, World!" - } - - // build our application with a route - let app = Router::new().route("/", get(root)); - - // run our app with hyper, listening globally on port 3000 - let listener = - tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) - .await - .unwrap(); - let timeout = tokio::time::timeout( - Duration::from_secs(11), - axum::serve(listener, app).into_future(), - ) - .await; - match timeout { - Ok(v) => v.unwrap(), - Err(_) => { - info!("Terminating server on port {port}"); - } - } -} +mod common; +use common::*; fn main() -> anyhow::Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -62,16 +30,31 @@ fn main() -> anyhow::Result<()> { let tokio2 = manager.get_tokio("axum2"); tokio2.start_metrics_sampling(Duration::from_secs(1)); - let wrk_cores: Vec<_> = (32..64).collect(); + let workload_runtime = TokioRuntime::new( + "LoadGenerator".to_owned(), + TokioConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 }, + ..Default::default() + }, + )?; + let results = std::thread::scope(|scope| { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + scope.spawn(|| { - tokio1.tokio.block_on(axum_main(8888)); + tokio1.tokio.block_on(axum_main(8888, tx1)); }); scope.spawn(|| { - tokio2.tokio.block_on(axum_main(8889)); + tokio2.tokio.block_on(axum_main(8889, tx2)); }); + + // Wait for axum servers to start + rx1.blocking_recv().unwrap(); + rx2.blocking_recv().unwrap(); + let join_handle = - scope.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()); + scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000))); join_handle.join().expect("WRK crashed!") }); //print out the results of the bench run @@ -79,55 +62,3 @@ fn main() -> anyhow::Result<()> { } Ok(()) } - -fn run_wrk( - ports: &[u16], - cpus: &[usize], - threads: usize, - connections: usize, -) -> anyhow::Result<(Vec, Vec)> { - let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - script.push("examples/report.lua"); - let cpus: Vec = cpus.iter().map(|c| c.to_string()).collect(); - let cpus = cpus.join(","); - - let mut children: Vec<_> = ports - .iter() - .map(|p| { - std::process::Command::new("taskset") - .arg("-c") - .arg(&cpus) - .arg("wrk") - .arg(format!("http://localhost:{}", p)) - .arg("-d10") - .arg(format!("-s{}", script.to_str().unwrap())) - .arg(format!("-t{threads}")) - .arg(format!("-c{connections}")) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .unwrap() - }) - .collect(); - - use std::str; - let outs = children.drain(..).map(|c| c.wait_with_output().unwrap()); - let mut all_latencies = vec![]; - let mut all_rps = vec![]; - for (out, port) in outs.zip(ports.iter()) { - debug!("========================="); - std::io::stdout().write_all(&out.stderr)?; - let res = str::from_utf8(&out.stdout)?; - let mut res = res.lines().last().unwrap().split(' '); - - let latency_us: u64 = res.next().unwrap().parse()?; - let latency = Duration::from_micros(latency_us); - - let requests: usize = res.next().unwrap().parse()?; - let rps = requests as f32 / 10.0; - debug!("WRK results for port {port}: {latency:?} {rps}"); - all_latencies.push(Duration::from_micros(latency_us)); - all_rps.push(rps); - } - Ok((all_latencies, all_rps)) -} diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs index 30b8011217e940..db3ee496f5b137 100644 --- a/thread-manager/examples/core_contention_sweep.rs +++ b/thread-manager/examples/core_contention_sweep.rs @@ -1,45 +1,13 @@ use { agave_thread_manager::*, - log::{debug, info}, - std::{ - collections::HashMap, - future::IntoFuture, - io::Write, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, - }, + log::info, + std::{collections::HashMap, time::Duration}, + tokio::sync::oneshot, }; -async fn axum_main(port: u16) { - use axum::{routing::get, Router}; - // basic handler that responds with a static string - async fn root() -> &'static str { - tokio::time::sleep(Duration::from_millis(1)).await; - "Hello, World!" - } - - // build our application with a route - let app = Router::new().route("/", get(root)); +mod common; +use common::*; - // run our app with hyper, listening globally on port 3000 - let listener = - tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) - .await - .unwrap(); - info!("Server on port {port} ready"); - let timeout = tokio::time::timeout( - Duration::from_secs(11), - axum::serve(listener, app).into_future(), - ) - .await; - match timeout { - Ok(v) => v.unwrap(), - Err(_) => { - info!("Terminating server on port {port}"); - } - } -} fn make_config_shared(cc: usize) -> ThreadManagerConfig { let tokio_cfg_1 = TokioConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc }, @@ -94,7 +62,7 @@ impl Regime { #[derive(Debug, Default, serde::Serialize)] struct Results { latencies_s: Vec, - rps: Vec, + requests_per_second: Vec, } fn main() -> anyhow::Result<()> { @@ -121,34 +89,44 @@ fn main() -> anyhow::Result<()> { } }; - let wrk_cores: Vec<_> = (32..64).collect(); + let workload_runtime = TokioRuntime::new( + "LoadGenerator".to_owned(), + TokioConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 }, + ..Default::default() + }, + )?; let measurement = std::thread::scope(|s| { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); s.spawn(|| { tokio1.start_metrics_sampling(Duration::from_secs(1)); - tokio1.tokio.block_on(axum_main(8888)); + tokio1.tokio.block_on(axum_main(8888, tx1)); }); let jh = match regime { Regime::Single => s.spawn(|| { - run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 3000).unwrap() + rx1.blocking_recv().unwrap(); + workload_runtime.block_on(workload_main(&[8888, 8888], 3000)) }), _ => { s.spawn(|| { tokio2.start_metrics_sampling(Duration::from_secs(1)); - tokio2.tokio.block_on(axum_main(8889)); + tokio2.tokio.block_on(axum_main(8889, tx2)); }); s.spawn(|| { - run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 3000).unwrap() + rx1.blocking_recv().unwrap(); + rx2.blocking_recv().unwrap(); + workload_runtime.block_on(workload_main(&[8888, 8889], 3000)) }) } }; - jh.join().expect("WRK crashed!") - }); + jh.join().expect("Some of the threads crashed!") + })?; info!("Results are: {:?}", measurement); - results.latencies_s.push( - measurement.0.iter().map(|a| a.as_secs_f32()).sum::() - / measurement.0.len() as f32, - ); - results.rps.push(measurement.1.iter().sum()); + results.latencies_s.push(measurement.latency_s); + results + .requests_per_second + .push(measurement.requests_per_second); } all_results.insert(format!("{regime:?}"), results); std::thread::sleep(Duration::from_secs(3)); @@ -159,58 +137,3 @@ fn main() -> anyhow::Result<()> { Ok(()) } - -fn run_wrk( - ports: &[u16], - cpus: &[usize], - threads: usize, - connections: usize, -) -> anyhow::Result<(Vec, Vec)> { - //Sleep a bit to let axum start - std::thread::sleep(Duration::from_millis(500)); - - let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - script.push("examples/report.lua"); - let cpus: Vec = cpus.iter().map(|c| c.to_string()).collect(); - let cpus = cpus.join(","); - - let mut children: Vec<_> = ports - .iter() - .map(|p| { - std::process::Command::new("taskset") - .arg("-c") - .arg(&cpus) - .arg("wrk") - .arg(format!("http://localhost:{}", p)) - .arg("-d10") - .arg(format!("-s{}", script.to_str().unwrap())) - .arg(format!("-t{threads}")) - .arg(format!("-c{connections}")) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .unwrap() - }) - .collect(); - - use std::str; - let outs = children.drain(..).map(|c| c.wait_with_output().unwrap()); - let mut all_latencies = vec![]; - let mut all_rps = vec![]; - for (out, port) in outs.zip(ports.iter()) { - debug!("========================="); - std::io::stdout().write_all(&out.stderr)?; - let res = str::from_utf8(&out.stdout)?; - let mut res = res.lines().last().unwrap().split(' '); - - let latency_us: u64 = res.next().unwrap().parse()?; - let latency = Duration::from_micros(latency_us); - - let requests: usize = res.next().unwrap().parse()?; - let rps = requests as f32 / 10.0; - debug!("WRK results for port {port}: {latency:?} {rps}"); - all_latencies.push(Duration::from_micros(latency_us)); - all_rps.push(rps); - } - Ok((all_latencies, all_rps)) -} diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs index 8ad1a9e951e301..7537ed5030a411 100644 --- a/thread-manager/src/lib.rs +++ b/thread-manager/src/lib.rs @@ -210,7 +210,7 @@ mod tests { }; #[test] - fn configtest() { + fn test_config_files() { let experiments = [ "examples/core_contention_dedicated_set.toml", "examples/core_contention_contending_set.toml", @@ -237,7 +237,7 @@ mod tests { } /* #[test] - fn thread_priority() { + fn test_thread_priority() { let priority_high = 10; let priority_default = crate::policy::DEFAULT_PRIORITY; let priority_low = 1; @@ -313,7 +313,7 @@ mod tests { #[cfg(target_os = "linux")] #[test] - fn process_affinity() { + fn test_process_affinity() { let conf = ThreadManagerConfig { native_configs: HashMap::from([( "pool1".to_owned(), @@ -354,7 +354,7 @@ mod tests { #[cfg(target_os = "linux")] #[test] - fn rayon_affinity() { + fn test_rayon_affinity() { let conf = ThreadManagerConfig { rayon_configs: HashMap::from([( "test".to_owned(), diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs index 6e2101c6fde2f6..2ea48be127a527 100644 --- a/thread-manager/src/tokio_runtime.rs +++ b/thread-manager/src/tokio_runtime.rs @@ -116,9 +116,7 @@ impl TokioRuntime { let _tid = cur_thread .get_native_id() .expect("Can not get thread id for newly created thread"); - // todo - tracing - //let tname = cur_thread.name().unwrap(); - //println!("thread {tname} id {tid} started"); + apply_policy( &c.core_allocation, parse_policy(&c.policy),