Skip to content

Commit

Permalink
add parallel benchmarking for chunk read/write operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Mili committed Mar 5, 2025
1 parent 4d42753 commit 366e732
Showing 1 changed file with 136 additions and 29 deletions.
165 changes: 136 additions & 29 deletions pumpkin-world/benches/chunk_noise_populate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pumpkin_world::{
GlobalProtoNoiseRouter, GlobalRandomConfig, NOISE_ROUTER_ASTS, bench_create_and_populate_noise,
chunk::ChunkData, global_path, level::Level,
};
use tokio::sync::RwLock;
use tokio::{runtime::Runtime, sync::RwLock, task::JoinSet};

fn bench_populate_noise(c: &mut Criterion) {
let seed = 0;
Expand All @@ -19,44 +19,72 @@ fn bench_populate_noise(c: &mut Criterion) {
});
}

async fn test_reads(root_dir: PathBuf, positions: Vec<Vector2<i32>>) {
let level = Arc::new(Level::from_root_folder(root_dir));

async fn test_reads(level: &Arc<Level>, positions: Vec<Vector2<i32>>) {
let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();
let level = level.clone();
tokio::spawn(async move { level.fetch_chunks(&positions, send).await });

while let Some(x) = recv.recv().await {
// Don't compile me away!
let _ = x;
}
}
async fn test_reads_parallel(level: &Arc<Level>, positions: Vec<Vector2<i32>>, threads: usize) {
let mut tasks = JoinSet::new();

// we write non overlapping chunks to avoid conflicts or level cache
// also we use `.rev()` to get the external radius first, avoiding
// multiple files on the same request.
for positions in positions.chunks(CHUNKS_ON_PARALLEL).rev().take(threads) {
let level = level.clone();
let positions = positions.to_vec();
tasks.spawn(async move {
test_reads(&level, positions.clone()).await;
});
}

async fn test_writes(root_dir: PathBuf, chunks: Vec<(Vector2<i32>, Arc<RwLock<ChunkData>>)>) {
let level = Arc::new(Level::from_root_folder(root_dir));
let _ = tasks.join_all().await;
}

async fn test_writes(level: &Arc<Level>, chunks: Vec<(Vector2<i32>, Arc<RwLock<ChunkData>>)>) {
level.write_chunks(chunks).await;
}

async fn test_writes_parallel(
level: &Arc<Level>,
chunks: Vec<(Vector2<i32>, Arc<RwLock<ChunkData>>)>,
threads: usize,
) {
let mut tasks = JoinSet::new();

// we write non overlapping chunks to avoid conflicts or level cache
// also we use `.rev()` to get the external radius first, avoiding
// multiple files on the same request.
for chunks in chunks.chunks(CHUNKS_ON_PARALLEL).rev().take(threads) {
let level = level.clone();
let chunks = chunks.to_vec();
tasks.spawn(async move {
test_writes(&level, chunks).await;
});
}

let _ = tasks.join_all().await;
}

// -16..16 == 32 chunks, 32*32 == 1024 chunks
const MIN_CHUNK: i32 = -16;
const MAX_CHUNK: i32 = 16;

// Depends on config options from `./config`
fn bench_chunk_io(c: &mut Criterion) {
// System temp dirs are in-memory, so we cant use temp_dir
let root_dir = global_path!("./bench_root_tmp");
let _ = fs::remove_dir_all(&root_dir); // delete if it exists
fs::create_dir(&root_dir).unwrap(); // create the directory

let async_handler = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
/// How many chunks to use on parallel tests
const CHUNKS_ON_PARALLEL: usize = 32;

fn initialize_level(
async_handler: &Runtime,
root_dir: PathBuf,
) -> Vec<(Vector2<i32>, Arc<RwLock<ChunkData>>)> {
println!("Initializing data...");

// Initial writes
let mut chunks = Vec::new();
let mut positions = Vec::new();
async_handler.block_on(async {
let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();

Expand All @@ -75,33 +103,104 @@ fn bench_chunk_io(c: &mut Criterion) {
while let Some((chunk, _)) = recv.recv().await {
let pos = chunk.read().await.position;
chunks.push((pos, chunk));
positions.push(pos);
}
level_to_save.write_chunks(chunks.clone()).await;
});

// Sort by distance from origin to ensure a fair selection
// when using a subset of the total chunks for the benchmarks
chunks.sort_unstable_by_key(|chunk| (chunk.0.x * chunk.0.x) + (chunk.0.z * chunk.0.z));
positions.sort_unstable_by_key(|pos| (pos.x * pos.x) + (pos.z * pos.z));
return chunks;
}

// Depends on config options from `./config`
fn bench_chunk_io_parallel(c: &mut Criterion) {
// System temp dirs are in-memory, so we cant use temp_dir
let root_dir = global_path!("./bench_root_tmp");
let _ = fs::remove_dir_all(&root_dir); // delete if it exists
fs::create_dir(&root_dir).unwrap(); // create the directory

let async_handler = tokio::runtime::Builder::new_multi_thread().build().unwrap();

let chunks = initialize_level(&async_handler, root_dir.clone());
let positions = chunks.iter().map(|(pos, _)| *pos).collect::<Vec<_>>();

let iters = [1, 2, 8, 32];

let mut write_group_parallel = c.benchmark_group("write_chunks");
for n_requests in iters {
let root_dir = root_dir.clone();

write_group_parallel.bench_with_input(
BenchmarkId::new("Parallel", n_requests),
&chunks,
|b, paralel_chunks| {

Check warning on line 137 in pumpkin-world/benches/chunk_noise_populate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"paralel" should be "parallel".
let chunks = paralel_chunks.to_vec();

Check warning on line 138 in pumpkin-world/benches/chunk_noise_populate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"paralel" should be "parallel".
b.to_async(&async_handler).iter(async || {
let level = Arc::new(Level::from_root_folder(root_dir.clone()));
test_writes_parallel(&level, chunks.clone(), n_requests).await
})
},
);
}
write_group_parallel.finish();

let mut read_group = c.benchmark_group("read_chunks");
for n_requests in iters {
let root_dir = root_dir.clone();

read_group.bench_with_input(
BenchmarkId::new("Parallel", n_requests),
&positions,
|b, positions| {
let positions = positions.to_vec();
b.to_async(&async_handler).iter(async || {
let level = Arc::new(Level::from_root_folder(root_dir.clone()));
test_reads_parallel(&level, positions.clone(), n_requests).await
})
},
);
}
read_group.finish();

fs::remove_dir_all(&root_dir).unwrap(); // cleanup
}

// Depends on config options from `./config`
fn bench_chunk_io(c: &mut Criterion) {
// System temp dirs are in-memory, so we cant use temp_dir
let root_dir = global_path!("./bench_root_tmp");
let _ = fs::remove_dir_all(&root_dir); // delete if it exists
fs::create_dir(&root_dir).unwrap(); // create the directory

let async_handler = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let chunks = initialize_level(&async_handler, root_dir.clone());
let positions = chunks.iter().map(|(pos, _)| *pos).collect::<Vec<_>>();

let iters = [16, 64, 256, 512];
// These test worst case: no caching done by `Level`
// testing with 16, 64, 256 chunks
let mut write_group = c.benchmark_group("write_chunks");
for n_chunks in [16, 64, 256, 512] {
for n_chunks in iters {
let chunks = &chunks[..n_chunks];
let root_dir = root_dir.clone();
assert!(
chunks.len() == n_chunks,
"Expected {} chunks, got {}",
n_chunks,
chunks.len()
);
write_group.bench_with_input(
BenchmarkId::from_parameter(n_chunks),
BenchmarkId::new("Single", n_chunks),
&chunks,
|b, chunks| {
b.to_async(&async_handler)
.iter(|| test_writes(root_dir.clone(), chunks.to_vec()))
b.to_async(&async_handler).iter(async || {
let level = Arc::new(Level::from_root_folder(root_dir.clone()));
test_writes(&level, chunks.to_vec()).await
})
},
);
}
Expand All @@ -110,8 +209,9 @@ fn bench_chunk_io(c: &mut Criterion) {
// These test worst case: no caching done by `Level`
// testing with 16, 64, 256 chunks
let mut read_group = c.benchmark_group("read_chunks");
for n_chunks in [16, 64, 256, 512] {
for n_chunks in iters {
let positions = &positions[..n_chunks];
let root_dir = root_dir.clone();
assert!(
positions.len() == n_chunks,
"Expected {} chunks, got {}",
Expand All @@ -120,11 +220,13 @@ fn bench_chunk_io(c: &mut Criterion) {
);

read_group.bench_with_input(
BenchmarkId::from_parameter(n_chunks),
BenchmarkId::new("Single", n_chunks),
&positions,
|b, positions| {
b.to_async(&async_handler)
.iter(|| test_reads(root_dir.clone(), positions.to_vec()))
b.to_async(&async_handler).iter(async || {
let level = Arc::new(Level::from_root_folder(root_dir.clone()));
test_reads(&level, positions.to_vec()).await
})
},
);
}
Expand All @@ -133,5 +235,10 @@ fn bench_chunk_io(c: &mut Criterion) {
fs::remove_dir_all(&root_dir).unwrap(); // cleanup
}

criterion_group!(benches, bench_populate_noise, bench_chunk_io);
criterion_group!(
benches,
bench_populate_noise,
bench_chunk_io_parallel,
bench_chunk_io
);
criterion_main!(benches);

0 comments on commit 366e732

Please sign in to comment.