fix: benchmark dead channels (#4359)
* fix: benchmark dead channels

* fix: last benchmark progress slow
fraidev authored Jan 25, 2025
1 parent 2e67240 commit 4b46d63
Showing 8 changed files with 327 additions and 257 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions crates/fluvio-benchmark/Cargo.toml
@@ -12,16 +12,16 @@ publish = false
anyhow = { workspace = true }
async-channel = { workspace = true }
bytesize = { workspace = true, features = ['serde'] }
chrono = { workspace = true, features = ['serde']}
clap = { workspace = true, features = ["std","derive"] }
derive_builder = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
hdrhistogram = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
tokio = { workspace = true, features = ['macros'] }
rayon = { workspace = true }
tokio = { workspace = true, features = ['sync', 'macros'] }
serde = { workspace = true , features = ['derive'] }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
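The added `sync` feature matches the new use of `tokio::sync::broadcast` in producer_benchmark.rs below. A minimal sketch of that end-of-run signalling pattern, using an illustrative payload type rather than the crate's real `EndProducerStat`:

```rust
// Sketch only: assumes tokio with the "sync", "macros" and "rt-multi-thread"
// features; the benchmark crate itself enables only "sync" and "macros" and
// runs on fluvio_future's executor rather than #[tokio::main].
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct EndStat {
    total_records: u64, // illustrative payload, not the real EndProducerStat
}

#[tokio::main]
async fn main() {
    // Capacity 2, as in the diff: only one end-of-run message is ever sent.
    let (end_sender, mut end_receiver) = broadcast::channel::<EndStat>(2);

    let sender = end_sender.clone();
    tokio::spawn(async move {
        // The stat-collector side broadcasts once all records are accounted for.
        sender.send(EndStat { total_records: 1_000 }).ok();
    });

    // recv() only fails once every sender has been dropped without sending,
    // so the receiver cannot silently wait on a channel whose senders are gone.
    if let Ok(end) = end_receiver.recv().await {
        println!("{} total records sent", end.total_records);
    }
}
```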
2 changes: 0 additions & 2 deletions crates/fluvio-benchmark/src/cli.rs
@@ -33,8 +33,6 @@ pub enum BenchmarkMode {
}

pub async fn run_benchmarks(opt: BenchmarkOpt) -> Result<()> {
println!("# Fluvio Benchmark Results");

if let Some(mode) = opt.benchmark {
BenchmarkDriver::run_benchmark(mode).await?;
} else {
2 changes: 1 addition & 1 deletion crates/fluvio-benchmark/src/config/mod.rs
@@ -12,7 +12,7 @@ use bytesize::ByteSize;
use crate::utils;

const DEFAULT_BATCH_SIZE: &str = "16kib";
const DEFAULT_QUEUE_SIZE: u64 = 10;
const DEFAULT_QUEUE_SIZE: u64 = 100;
const DEFAULT_MAX_REQUEST_SIZE: &str = "32mib";
const DEFAULT_LINGER: &str = "0ms";
const DEFAULT_SERVER_TIMEOUT: &str = "5000ms";
177 changes: 91 additions & 86 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
@@ -1,13 +1,20 @@
use std::sync::Arc;

use anyhow::Result;
use async_channel::{unbounded, Receiver};
use async_channel::unbounded;

use bytesize::ByteSize;
use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio_future::{future::timeout, task::spawn, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use tokio::select;
use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
use tracing::debug;

use crate::{
config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils,
config::ProducerConfig,
producer_worker::ProducerWorker,
stats_collector::{EndProducerStat, StatCollector, Stats},
utils,
};

pub struct ProducerBenchmark {}
@@ -28,7 +35,7 @@ impl ProducerBenchmark {
admin.create(topic_name.clone(), false, new_topic).await?;
}

println!("created topic {}", topic_name);
debug!("created topic {}", topic_name);
let result = ProducerBenchmark::run_samples(config.clone()).await;

sleep(std::time::Duration::from_millis(100)).await;
@@ -40,108 +47,106 @@ impl ProducerBenchmark {
// Clean up topic
if config.delete_topic {
admin.delete::<TopicSpec>(topic_name.clone()).await?;
print!("Topic deleted successfully {}", topic_name.clone());
debug!("Topic deleted successfully {}", topic_name.clone());
}

Ok(())
}

async fn run_samples(config: ProducerConfig) -> Result<()> {
let mut tx_controls = Vec::new();
let mut workers_jh = Vec::new();

let (stats_sender, stats_receiver) = unbounded();
let (end_sender, end_receiver) = unbounded();
let (end_sender, mut end_receiver) = broadcast::channel(2);
let end_sender = Arc::new(end_sender);
let stat_collector =
StatCollector::create(config.num_records, end_sender.clone(), stats_sender.clone());

// Set up producers
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
println!("starting up producer {}", producer_id);
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), event_sender).await?;
let jh = spawn(timeout(
config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
));
StatCollector::create(config.num_records, stats_sender.clone(), end_sender.clone());

tx_control.send(ControlMessage::SendBatch).await?;
tx_controls.push(tx_control);
workers_jh.push(jh);
}
Self::setup_producers(config.clone(), stat_collector).await;
println!("Benchmark started");
Self::print_progress_on_backgroud(stats_receiver).await;
Self::print_benchmark_on_end(&mut end_receiver).await;
println!("Benchmark completed");

loop {
select! {
stat_rx = stats_receiver.recv() => {
if let Ok(stat) = stat_rx {
let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
println!(
"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
stat.record_send, stat.records_per_sec, human_readable_bytes,
utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
);
}
}
end = end_receiver.recv() => {
if let Ok(end) = end {
let mut latency_yaml = String::new();
latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency",
utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64),
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0))));
for percentile in [0.5, 0.95, 0.99] {
latency_yaml.push_str(&format!(
", {:.2}ms p{percentile:4.2}",
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(percentile)),
));
}
println!();
println!("{}", latency_yaml);

let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string();
println!(
"{} total records sent, {} records/sec: ({}/sec) ",
end.total_records, end.records_per_sec, human_readable_bytes
);
}
break;
}
Ok(())
}

async fn setup_producers(config: ProducerConfig, stat_collector: StatCollector) {
spawn(async move {
let worker_futures = FuturesUnordered::new();
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
let config = config.clone();
let jh = spawn(timeout(config.worker_timeout, async move {
debug!("starting up producer {}", producer_id);
let worker = ProducerWorker::new(producer_id, config, event_sender)
.await
.expect("create producer worker");
ProducerDriver::main_loop(worker).await.expect("main loop");
}));

worker_futures.push(jh);
}
}

// Wait for all producers to finish
for jh in workers_jh {
jh.await??;
}
for worker in worker_futures.collect::<Vec<_>>().await {
worker.expect("producer worker failed");
}
});
}

// Print stats
println!("Benchmark completed");
async fn print_progress_on_backgroud(stats_receiver: async_channel::Receiver<Stats>) {
spawn(async move {
while let Ok(stat) = stats_receiver.recv().await {
let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
println!(
"{} records sent, {} records/sec: ({}/sec), {} avg latency, {} max latency",
stat.record_send,
stat.records_per_sec,
human_readable_bytes,
utils::nanos_to_ms_pritable(stat.latency_avg),
utils::nanos_to_ms_pritable(stat.latency_max)
);
}
});
}

Ok(())
async fn print_benchmark_on_end(end_receiver: &mut broadcast::Receiver<EndProducerStat>) {
if let Ok(end) = end_receiver.recv().await {
// sleep enough time to make sure all stats are printed
sleep(std::time::Duration::from_secs(1)).await;
let mut latency_yaml = String::new();
latency_yaml.push_str(&format!(
"{} avg latency, {} max latency",
utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64),
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0))
));
for percentile in [0.5, 0.95, 0.99] {
latency_yaml.push_str(&format!(
", {} p{percentile:4.2}",
utils::nanos_to_ms_pritable(
end.latencies_histogram.value_at_quantile(percentile)
),
));
}
println!();
println!("{}", latency_yaml);

let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string();
println!(
"{} total records sent, {} records/sec: ({}/sec), total time: {}",
end.total_records,
end.records_per_sec,
human_readable_bytes,
utils::pretty_duration(end.elapsed)
);
}
}
}

struct ProducerDriver;

impl ProducerDriver {
async fn main_loop(rx: Receiver<ControlMessage>, worker: ProducerWorker) -> Result<()> {
//loop {
match rx.recv().await? {
ControlMessage::SendBatch => {
println!("producer send batch");
if let Err(err) = worker.send_batch().await {
println!("producer send batch error: {:#?}", err);
}
}
};
//}
async fn main_loop(worker: ProducerWorker) -> Result<()> {
worker.send_batch().await?;
Ok(())
}
}

#[derive(Clone, Copy, Debug)]
enum ControlMessage {
SendBatch,
}
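
The core of the fix is visible above: the control-message channel (which could go dead once its sender side was dropped) is removed, progress printing moves into a background task, worker completion is awaited through a `FuturesUnordered`, and the final stat arrives over a broadcast channel. A condensed, self-contained sketch of that shape, with `tokio::spawn` and `#[tokio::main]` standing in for `fluvio_future::task::spawn`, and `send_batch`/`EndStat` as simplified stand-ins for `ProducerWorker::send_batch` and `EndProducerStat`:

```rust
use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

#[derive(Clone, Debug)]
struct EndStat {
    total_records: u64,
}

async fn send_batch(producer_id: u64, records: u64) -> anyhow::Result<u64> {
    // ... a real worker would produce `records` records here ...
    let _ = producer_id;
    Ok(records)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let num_producers = 3u64;
    let (end_sender, mut end_receiver) = broadcast::channel::<EndStat>(2);

    // Progress printing runs in its own task, so a slow final sample no
    // longer holds up the loop that used to multiplex progress and end stats.
    let (progress_tx, progress_rx) = async_channel::unbounded::<u64>();
    tokio::spawn(async move {
        while let Ok(sent) = progress_rx.recv().await {
            println!("{sent} records sent so far");
        }
    });

    // Spawn every worker and keep the join handles in a FuturesUnordered,
    // mirroring setup_producers in the diff.
    let workers: FuturesUnordered<JoinHandle<anyhow::Result<u64>>> = (0..num_producers)
        .map(|id| {
            let progress_tx = progress_tx.clone();
            tokio::spawn(async move {
                let sent = send_batch(id, 100).await?;
                progress_tx.send(sent).await.ok();
                Ok::<u64, anyhow::Error>(sent)
            })
        })
        .collect();

    // Wait for every producer, then broadcast the end-of-run stat once.
    let mut total = 0;
    for joined in workers.collect::<Vec<_>>().await {
        total += joined.expect("producer task panicked")?;
    }
    end_sender.send(EndStat { total_records: total }).ok();

    if let Ok(end) = end_receiver.recv().await {
        println!("{} total records sent", end.total_records);
    }
    Ok(())
}
```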
35 changes: 3 additions & 32 deletions crates/fluvio-benchmark/src/producer_worker.rs
@@ -9,6 +9,7 @@ use fluvio::{
TopicProducerConfigBuilder, TopicProducerPool,
};
use futures_util::future::BoxFuture;
use tracing::debug;

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
@@ -68,7 +69,7 @@ impl ProducerWorker {
.topic_producer_with_config(config.topic_name.clone(), fluvio_config)
.await?;

let num_records = records_per_producer(id, config.num_producers, config.num_records);
let num_records = utils::records_per_producer(id, config.num_producers, config.num_records);

let records_to_send = create_records(config.clone(), num_records, id);

@@ -79,7 +80,7 @@ impl ProducerWorker {
}

pub async fn send_batch(self) -> Result<()> {
println!("producer is sending batch");
debug!("producer is sending batch");

for record in self.records_to_send.into_iter() {
let _ = self
@@ -123,33 +124,3 @@ impl BenchmarkRecord {
Self { key, data }
}
}

/// Calculate the number of records each producer should send
fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 {
if id == 0 {
num_records / num_producers + num_records % num_producers
} else {
num_records / num_producers
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_get_num_records_per_producer() {
let num_producers = 3;
let num_records = 10;

assert_eq!(records_per_producer(0, num_producers, num_records), 4);
assert_eq!(records_per_producer(1, num_producers, num_records), 3);
assert_eq!(records_per_producer(2, num_producers, num_records), 3);

let num_producers = 3;
let num_records = 12;
assert_eq!(records_per_producer(0, num_producers, num_records), 4);
assert_eq!(records_per_producer(1, num_producers, num_records), 4);
assert_eq!(records_per_producer(2, num_producers, num_records), 4);
}
}

