Skip to content

Commit

Permalink
address comments by Brennan
Browse files Browse the repository at this point in the history
  • Loading branch information
alexpyattaev committed Jan 10, 2025
1 parent 185ad7f commit 020d88f
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 215 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ serde = { workspace = true, features = ["derive"] }
solana-metrics = { workspace = true }
thread-priority = "1.2.0"
tokio = { workspace = true, features = ["time", "rt-multi-thread"] }
tower = "0.5.2"

[target.'cfg(target_os = "linux")'.dependencies]
affinity = "0.1.2"

[dev-dependencies]
axum = "0.7.9"
env_logger = { workspace = true }
hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] }
serde_json = { workspace = true }
toml = { workspace = true }
23 changes: 13 additions & 10 deletions thread-manager/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# thread-manager
Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention
between different parts of the code that may
benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling
networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 hallocated for Rayon thread pool,
and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches
and context switches that would occur if Rayon was entirely unaware it was running side-by-side with
Balances machine resources between multiple threaded runtimes.
The purpose is to manage thread contention between different parts
of the code that may benefit from a diverse set of management options.
For example, we may want to have cores 1-4 handling networking via
Tokio, core 5 handling file IO via Tokio, cores 9-16 allocated for
Rayon thread pool, and cores 6-8 available for general use by std::thread.
This will minimize contention for CPU caches and context switches that
would occur if Rayon was entirely unaware it was running side-by-side with
tokio, and each was to spawn as many threads as there are cores.

# Supported threading models
Expand All @@ -22,7 +24,7 @@ If you want you can set thread scheduling policy and priority. Keep in mind that
```bash
sudo setcap cap_sys_nice+ep
```
or root priviledges to run the resulting process.
or root privileges to run the resulting process.
To see which policies are supported check (the sources)[./src/policy.rs]
If you use realtime policies, priority to values from 1 (lowest) to 99 (highest) are possible.

Expand All @@ -31,7 +33,7 @@ Multiple tokio runtimes can be created, and each may be assigned its own pool of
Number of worker and blocking threads is configurable, as are thread priorities for the pool.

## Native
Native threads (std::thread) can be spawned from managed pools, this allows them to inheirt a particular
Native threads (std::thread) can be spawned from managed pools, this allows them to inherit a particular
affinity from the pool, as well as to
control the total number of threads made in every pool.

Expand All @@ -44,7 +46,8 @@ one may want to spawn many rayon pools.

* Thread pools can only be created at process startup
* Once thread pool is created, its policy can not be modified at runtime
* Thread affinity not supported outside of linux
* Thread affinity & priority are not supported outside of linux
* Thread priority generally requires kernel level support and extra capabilities

# TODO:

Expand All @@ -55,7 +58,7 @@ one may want to spawn many rayon pools.


# Examples
All examples need wrk for workload generation. Please install it before running.
All examples need `wrk` HTTP behnchmarking tool for load generation. Please install it before running.

* core_contention_basics will demonstrate why core contention is bad, and how thread configs can help
* core_contention_sweep will sweep across a range of core counts to show how benefits scale with core counts
131 changes: 131 additions & 0 deletions thread-manager/examples/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use {
hyper::{Body, Request},
log::info,
std::{
future::IntoFuture,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
},
tokio::{net::TcpStream, sync::oneshot::Sender, time::timeout},
tower::ServiceExt,
};
const TEST_SECONDS: u64 = 10;

pub async fn axum_main(port: u16, ready: Sender<()>) {
use axum::{routing::get, Router};
// basic handler that responds with a static string
async fn root() -> &'static str {
tokio::time::sleep(Duration::from_millis(1)).await;
"Hello, World!"
}

// build our application with a route
let app = Router::new().route("/", get(root));

// run our app with hyper, listening globally on port 3000
let listener =
tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
.await
.unwrap();
info!("Server on port {port} ready");
ready.send(()).unwrap();
let timeout = tokio::time::timeout(
Duration::from_secs(TEST_SECONDS + 1),
axum::serve(listener, app).into_future(),
)
.await;
match timeout {
Ok(v) => {
v.unwrap();
}
Err(_) => {
info!("Terminating server on port {port}");
}
}
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct Stats {
pub latency_s: f32,
pub requests_per_second: f32,
}

pub async fn workload_main(ports: &[u16], tasks: usize) -> anyhow::Result<Stats> {
struct ControlBlock {
start_time: std::time::Instant,
requests: AtomicUsize,
cumulative_latency_us: AtomicUsize,
}

let cb = Arc::new(ControlBlock {
start_time: std::time::Instant::now(),
requests: AtomicUsize::new(0),
cumulative_latency_us: AtomicUsize::new(0),
});

async fn connection(port: u16, control_block: Arc<ControlBlock>) -> anyhow::Result<()> {
let sa = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
let stream = TcpStream::connect(sa).await?;

let (mut request_sender, connection) = hyper::client::conn::handshake(stream).await?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(_e) = connection.await {
//eprintln!("Error in connection: {}", e);
}
});

let path = "/";
while control_block.start_time.elapsed() < Duration::from_secs(TEST_SECONDS) {
let req = Request::builder()
.uri(path)
.method("GET")
.body(Body::from(""))?;
let start = Instant::now();
let res = timeout(Duration::from_millis(100), request_sender.send_request(req)).await;
let res = match res {
Ok(res) => res?,
Err(_) => {
anyhow::bail!("Timeout on request!")
}
};
let _ = res.body();
if res.status() != 200 {
anyhow::bail!("Got error from server");
}

control_block
.cumulative_latency_us
.fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed);
control_block.requests.fetch_add(1, Ordering::Relaxed);
// To send via the same connection again, it may not work as it may not be ready,
// so we have to wait until the request_sender becomes ready.
request_sender.ready().await?;
}
Ok(())
}

let mut join_handles = vec![];
for port in ports {
info!("Starting load generation on port {port}");
for _t in 0..tasks {
let jh = tokio::task::spawn(connection(*port, cb.clone()));
join_handles.push(jh);
}
}
for jh in join_handles {
let _ = jh.await?; //Ignore errors since we do not care about reasons here
}
let requests = cb.requests.load(Ordering::Relaxed);
let latency_accumulator_us = cb.cumulative_latency_us.load(Ordering::Relaxed);
Ok(Stats {
requests_per_second: requests as f32 / TEST_SECONDS as f32,
#[allow(clippy::arithmetic_side_effects)]
latency_s: (latency_accumulator_us / requests) as f32 / 1e6,
})
}
117 changes: 24 additions & 93 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,12 @@
use {
agave_thread_manager::*,
log::{debug, info},
std::{
future::IntoFuture,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
},
log::info,
std::{io::Read, path::PathBuf, time::Duration},
tokio::sync::oneshot,
};

async fn axum_main(port: u16) {
use axum::{routing::get, Router};

// basic handler that responds with a static string
async fn root() -> &'static str {
tokio::time::sleep(Duration::from_millis(1)).await;
"Hello, World!"
}

// build our application with a route
let app = Router::new().route("/", get(root));

// run our app with hyper, listening globally on port 3000
let listener =
tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
.await
.unwrap();
let timeout = tokio::time::timeout(
Duration::from_secs(11),
axum::serve(listener, app).into_future(),
)
.await;
match timeout {
Ok(v) => v.unwrap(),
Err(_) => {
info!("Terminating server on port {port}");
}
}
}
mod common;
use common::*;

fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
Expand All @@ -62,72 +30,35 @@ fn main() -> anyhow::Result<()> {
let tokio2 = manager.get_tokio("axum2");
tokio2.start_metrics_sampling(Duration::from_secs(1));

let wrk_cores: Vec<_> = (32..64).collect();
let workload_runtime = TokioRuntime::new(
"LoadGenerator".to_owned(),
TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 },
..Default::default()
},
)?;

let results = std::thread::scope(|scope| {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

scope.spawn(|| {
tokio1.tokio.block_on(axum_main(8888));
tokio1.tokio.block_on(axum_main(8888, tx1));
});
scope.spawn(|| {
tokio2.tokio.block_on(axum_main(8889));
tokio2.tokio.block_on(axum_main(8889, tx2));
});

// Wait for axum servers to start
rx1.blocking_recv().unwrap();
rx2.blocking_recv().unwrap();

let join_handle =
scope.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap());
scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000)));
join_handle.join().expect("WRK crashed!")
});
//print out the results of the bench run
println!("Results are: {:?}", results);
}
Ok(())
}

fn run_wrk(
ports: &[u16],
cpus: &[usize],
threads: usize,
connections: usize,
) -> anyhow::Result<(Vec<Duration>, Vec<f32>)> {
let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
script.push("examples/report.lua");
let cpus: Vec<String> = cpus.iter().map(|c| c.to_string()).collect();
let cpus = cpus.join(",");

let mut children: Vec<_> = ports
.iter()
.map(|p| {
std::process::Command::new("taskset")
.arg("-c")
.arg(&cpus)
.arg("wrk")
.arg(format!("http://localhost:{}", p))
.arg("-d10")
.arg(format!("-s{}", script.to_str().unwrap()))
.arg(format!("-t{threads}"))
.arg(format!("-c{connections}"))
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap()
})
.collect();

use std::str;
let outs = children.drain(..).map(|c| c.wait_with_output().unwrap());
let mut all_latencies = vec![];
let mut all_rps = vec![];
for (out, port) in outs.zip(ports.iter()) {
debug!("=========================");
std::io::stdout().write_all(&out.stderr)?;
let res = str::from_utf8(&out.stdout)?;
let mut res = res.lines().last().unwrap().split(' ');

let latency_us: u64 = res.next().unwrap().parse()?;
let latency = Duration::from_micros(latency_us);

let requests: usize = res.next().unwrap().parse()?;
let rps = requests as f32 / 10.0;
debug!("WRK results for port {port}: {latency:?} {rps}");
all_latencies.push(Duration::from_micros(latency_us));
all_rps.push(rps);
}
Ok((all_latencies, all_rps))
}
Loading

0 comments on commit 020d88f

Please sign in to comment.