
Add thread manager crate to agave #3890

Open

wants to merge 16 commits into base: master
291 changes: 248 additions & 43 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -207,6 +207,7 @@ members = [
"svm-transaction",
"test-validator",
"thin-client",
"thread-manager",
"timings",
"tls-utils",
"tokens",
@@ -457,6 +458,7 @@
solana-builtins = { path = "builtins", version = "=2.2.0" }
solana-builtins-default-costs = { path = "builtins-default-costs", version = "=2.2.0" }
agave-cargo-registry = { path = "cargo-registry", version = "=2.2.0" }
agave-thread-manager = { path = "thread-manager", version = "=2.2.0" }
solana-clap-utils = { path = "clap-utils", version = "=2.2.0" }
solana-clap-v3-utils = { path = "clap-v3-utils", version = "=2.2.0" }
solana-cli = { path = "cli", version = "=2.2.0" }
38 changes: 38 additions & 0 deletions thread-manager/Cargo.toml
@@ -0,0 +1,38 @@
[package]
name = "agave-thread-manager"
description = "Thread pool manager for agave"

version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

publish = false

[dependencies]
anyhow = { workspace = true }
cfg-if = "1.0.0"
log = { workspace = true }
num_cpus = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
solana-metrics = { workspace = true }
thread-priority = "1.2.0"
tokio = { workspace = true, features = ["time", "rt-multi-thread"] }
tower = "0.5.2"

[target.'cfg(target_os = "linux")'.dependencies]
affinity = "0.1.2"

[dev-dependencies]
agave-thread-manager = { path = ".", features = ["dev-context-only-utils"] }
axum = "0.7.9"
env_logger = { workspace = true }
hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] }
serde_json = { workspace = true }
toml = { workspace = true }

[features]
dev-context-only-utils = []
72 changes: 72 additions & 0 deletions thread-manager/README.md
@@ -0,0 +1,72 @@
# thread-manager
Balances machine resources between multiple threaded runtimes. The purpose is to manage thread
contention between different parts of the code that may benefit from a diverse set of management
options. For example, we may want to have cores 1-4 handling networking via Tokio, core 5 handling
file IO via Tokio, cores 9-16 allocated for a Rayon thread pool, and cores 6-8 available for
general use by std::thread. This will minimize contention for CPU caches and the context switches
that would occur if Rayon were entirely unaware it was running side-by-side with Tokio, and each
spawned as many threads as there are cores.

## Thread pool mapping
Thread manager will, by default, look up a pool by its name, e.g. "solGossip".
Matching is done independently for each type of runtime.
If no pool with the requested name is found, the lookup falls back to the "default"
thread pool of the same type (if one is specified in the config). If no default pool
is specified either, the lookup fails.

Multiple names can point to the same pool. For example, "solGossipConsume" and
"solSigverify" can both be executed on the same Rayon pool named "rayonSigverify".
This, in principle, allows some degree of runtime sharing between different crates
in the codebase without having to manually patch the pointers through.
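
A minimal sketch of the lookup behavior, assuming the `ThreadManager::new`/`get_tokio` API used in
`examples/core_contention_basics.rs`; the pool names and the `Default` impl for the config are
illustrative assumptions:
```rust
use agave_thread_manager::*;

fn main() {
    // Assumption: ThreadManagerConfig implements Default; the bundled
    // examples deserialize it from TOML instead.
    let manager = ThreadManager::new(ThreadManagerConfig::default()).unwrap();

    // Exact match: resolves the pool configured as [tokio_configs.solGossip].
    let _gossip = manager.get_tokio("solGossip");

    // No exact match: falls back to the "default" tokio pool if one is
    // configured; otherwise the lookup fails.
    let _misc = manager.get_tokio("solSomethingElse");
}
```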

# Supported threading models
## Affinity
All threading models allow setting core affinity, but only on Linux.

For core affinity, you can set, e.g.,
```toml
core_allocation.DedicatedCoreSet = { min = 16, max = 64 }
```
to pin the pool to cores 16-64.
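
The same allocation can be expressed in code, as the bundled `core_contention_basics` example does:
```rust
// Mirrors the TOML above, using the types from the example code.
let cfg = TokioConfig {
    core_allocation: CoreAllocation::DedicatedCoreSet { min: 16, max: 64 },
    ..Default::default()
};
```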

## Scheduling policy and priority
If you want, you can set thread scheduling policy and priority. Keep in mind that this will likely require
```bash
sudo setcap cap_sys_nice+ep
```
or root privileges to run the resulting process.
To see which policies are supported, check [the sources](./src/policy.rs).
If you use realtime policies, priority values from 1 (lowest) to 99 (highest) are possible.

Review comment: in the past we've had tuning scripts that could handle recommended configurations like this. If it makes sense, we could add this wherever relevant.

Author reply: I'm not sure about priority stuff at this point; it is super platform-specific and mostly a placeholder at the moment, as getting this past CI will be unfun (due to extra privileges necessary).

## Tokio
Multiple Tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on.
The number of worker and blocking threads is configurable, as are thread priorities for the pool.
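
For example, a config along the lines of the bundled example TOML files; the pool name `solNet` is
illustrative:
```toml
[tokio_configs.solNet]
worker_threads = 4
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 4 }
```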

## Native
Native threads (std::thread) can be spawned from managed pools. This allows them to inherit a
particular affinity from the pool, as well as to cap the total number of threads made in every pool.
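
A hypothetical usage sketch; `get_native` and the spawn/join signatures are assumptions made by
analogy with `get_tokio`, not confirmed API:
```rust
// Hypothetical: assumes a get_native() accessor analogous to get_tokio(),
// returning a pool whose spawn() applies the pool's core affinity and
// counts the thread against the pool's cap.
let native_pool = manager.get_native("solNetwork");
let handle = native_pool.spawn(|| {
    // Runs on a std::thread pinned according to the pool's core_allocation.
});
// Join semantics assumed to mirror std::thread::JoinHandle.
handle.join().expect("thread panicked");
```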

## Rayon
Rayon already manages thread pools well enough; all thread-manager adds on top is enforcement of
affinity and priority for Rayon threads. Normally one would only ever have one Rayon pool, but for
priority allocations one may want to spawn several.
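
A hypothetical sketch of running work on a managed Rayon pool; `get_rayon` and the `.rayon_pool`
field are assumptions mirroring how `get_tokio` exposes `.tokio` in the bundled examples:
```rust
use rayon::prelude::*;

// Hypothetical: assumes get_rayon() returns a wrapper exposing the underlying
// rayon::ThreadPool, mirroring how the Tokio wrapper exposes `.tokio`.
let rayon_pool = manager.get_rayon("rayonSigverify");
let sum: u64 = rayon_pool
    .rayon_pool
    .install(|| (0..1_000_000u64).into_par_iter().sum());
```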

# Limitations

* Thread pools can only be created at process startup
* Once a thread pool is created, its policy cannot be modified at runtime
* Thread affinity & priority are not supported outside of Linux
* Thread priority generally requires kernel-level support and extra capabilities

# TODO:

* even more tests
* better thread priority support


# Examples
* `core_contention_basics` demonstrates why core contention is bad and how thread configs can help
* `core_contention_sweep` sweeps across a range of core counts to show how the benefits scale
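
Both examples can be run with the standard Cargo example runner:
```bash
cargo run --example core_contention_basics
cargo run --example core_contention_sweep
```
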
131 changes: 131 additions & 0 deletions thread-manager/examples/common/mod.rs
@@ -0,0 +1,131 @@
use {
hyper::{Body, Request},
log::info,
std::{
future::IntoFuture,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
},
tokio::{net::TcpStream, sync::oneshot::Sender, time::timeout},
tower::ServiceExt,
};
const TEST_SECONDS: u64 = 10;

pub async fn axum_main(port: u16, ready: Sender<()>) {
use axum::{routing::get, Router};
// basic handler that responds with a static string
async fn root() -> &'static str {
tokio::time::sleep(Duration::from_millis(1)).await;
"Hello, World!"
}

// build our application with a route
let app = Router::new().route("/", get(root));

// run our app with hyper, listening globally on the requested port
let listener =
tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
.await
.unwrap();
info!("Server on port {port} ready");
ready.send(()).unwrap();
let timeout = tokio::time::timeout(
Duration::from_secs(TEST_SECONDS + 1),
axum::serve(listener, app).into_future(),
)
.await;
match timeout {
Ok(v) => {
v.unwrap();
}
Err(_) => {
info!("Terminating server on port {port}");
}
}
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct Stats {
pub latency_s: f32,
pub requests_per_second: f32,
}

pub async fn workload_main(ports: &[u16], tasks: usize) -> anyhow::Result<Stats> {
struct ControlBlock {
start_time: std::time::Instant,
requests: AtomicUsize,
cumulative_latency_us: AtomicUsize,
}

let cb = Arc::new(ControlBlock {
start_time: std::time::Instant::now(),
requests: AtomicUsize::new(0),
cumulative_latency_us: AtomicUsize::new(0),
});

async fn connection(port: u16, control_block: Arc<ControlBlock>) -> anyhow::Result<()> {
let sa = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
let stream = TcpStream::connect(sa).await?;

let (mut request_sender, connection) = hyper::client::conn::handshake(stream).await?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
// Connection errors are expected when the server shuts down at the end
// of the test window, so they are deliberately ignored here.
let _ = connection.await;
});

let path = "/";
while control_block.start_time.elapsed() < Duration::from_secs(TEST_SECONDS) {
let req = Request::builder()
.uri(path)
.method("GET")
.body(Body::from(""))?;
let start = Instant::now();
let res = timeout(Duration::from_millis(100), request_sender.send_request(req)).await;
let res = match res {
Ok(res) => res?,
Err(_) => {
anyhow::bail!("Timeout on request!")
}
};
let _ = res.body();
if res.status() != 200 {
anyhow::bail!("Got error from server");
}

control_block
.cumulative_latency_us
.fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed);
control_block.requests.fetch_add(1, Ordering::Relaxed);
// Before reusing the connection for the next request, wait until
// request_sender is ready to accept another request.
request_sender.ready().await?;
}
Ok(())
}

let mut join_handles = vec![];
for port in ports {
info!("Starting load generation on port {port}");
for _t in 0..tasks {
let jh = tokio::task::spawn(connection(*port, cb.clone()));
join_handles.push(jh);
}
}
for jh in join_handles {
let _ = jh.await?; // `?` propagates task panics; per-connection errors (e.g. timeouts) are ignored
}
let requests = cb.requests.load(Ordering::Relaxed);
let latency_accumulator_us = cb.cumulative_latency_us.load(Ordering::Relaxed);
Ok(Stats {
requests_per_second: requests as f32 / TEST_SECONDS as f32,
#[allow(clippy::arithmetic_side_effects)]
latency_s: (latency_accumulator_us / requests) as f32 / 1e6,
})
}
64 changes: 64 additions & 0 deletions thread-manager/examples/core_contention_basics.rs
@@ -0,0 +1,64 @@
use {
agave_thread_manager::*,
log::info,
std::{io::Read, path::PathBuf, time::Duration},
tokio::sync::oneshot,
};

mod common;
use common::*;

fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let experiments = [
"examples/core_contention_dedicated_set.toml",
"examples/core_contention_contending_set.toml",
];

for exp in experiments {
info!("===================");
info!("Running {exp}");
let mut conf_file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
conf_file.push(exp);
let mut buf = String::new();
std::fs::File::open(conf_file)?.read_to_string(&mut buf)?;
let cfg: ThreadManagerConfig = toml::from_str(&buf)?;

let manager = ThreadManager::new(cfg).unwrap();
let tokio1 = manager.get_tokio("axum1");
tokio1.start_metrics_sampling(Duration::from_secs(1));
let tokio2 = manager.get_tokio("axum2");
tokio2.start_metrics_sampling(Duration::from_secs(1));

let workload_runtime = TokioRuntime::new(
"LoadGenerator".to_owned(),
TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 },
..Default::default()
},
)?;

let results = std::thread::scope(|scope| {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

scope.spawn(|| {
tokio1.tokio.block_on(axum_main(8888, tx1));
});
scope.spawn(|| {
tokio2.tokio.block_on(axum_main(8889, tx2));
});

// Wait for axum servers to start
rx1.blocking_recv().unwrap();
rx2.blocking_recv().unwrap();

let join_handle =
scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000)));
join_handle.join().expect("Load generator crashed!")
});
// Print out the results of the bench run
println!("Results are: {results:?}");
}
Ok(())
}
13 changes: 13 additions & 0 deletions thread-manager/examples/core_contention_contending_set.toml
@@ -0,0 +1,13 @@
[native_configs]

[rayon_configs]

[tokio_configs.axum1]
worker_threads = 8
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 8 }

[tokio_configs.axum2]
worker_threads = 8
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 8 }
13 changes: 13 additions & 0 deletions thread-manager/examples/core_contention_dedicated_set.toml
@@ -0,0 +1,13 @@
[native_configs]

[rayon_configs]

[tokio_configs.axum1]
worker_threads = 4
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 4 }

[tokio_configs.axum2]
worker_threads = 4
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 4, max = 8 }