From b8d8e9449c159554283f607b5ba58c372624a759 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Fri, 17 Jan 2025 18:06:11 -0300 Subject: [PATCH] wip --- Cargo.lock | 1 + crates/fluvio-benchmark/Cargo.toml | 1 + crates/fluvio-benchmark/src/config.rs | 6 +-- .../src/producer_benchmark.rs | 11 +++-- .../fluvio-benchmark/src/producer_worker.rs | 46 +++++++++++++++++-- .../fluvio-benchmark/src/stats_collector.rs | 4 ++ 6 files changed, 58 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbd2e763d6b..e51261eb182 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2328,6 +2328,7 @@ dependencies = [ "fluvio", "fluvio-future", "futures-util", + "hdrhistogram", "humantime", "rand", "rand_xoshiro", diff --git a/crates/fluvio-benchmark/Cargo.toml b/crates/fluvio-benchmark/Cargo.toml index fa94c1c1b65..a6144b84308 100644 --- a/crates/fluvio-benchmark/Cargo.toml +++ b/crates/fluvio-benchmark/Cargo.toml @@ -17,6 +17,7 @@ clap = { workspace = true, features = ["std","derive"] } derive_builder = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } +hdrhistogram = { workspace = true } rand = { workspace = true } rand_xoshiro = { workspace = true } rayon = { workspace = true } diff --git a/crates/fluvio-benchmark/src/config.rs b/crates/fluvio-benchmark/src/config.rs index f9d11d1f57c..d3b824e8260 100644 --- a/crates/fluvio-benchmark/src/config.rs +++ b/crates/fluvio-benchmark/src/config.rs @@ -7,9 +7,9 @@ use bytesize::ByteSize; use crate::utils; -const DEFAULT_BATCH_SIZE: ByteSize = ByteSize(16_384); +const DEFAULT_BATCH_SIZE: ByteSize = ByteSize::kib(16); const DEFAULT_QUEUE_SIZE: u64 = 10; -const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize(33_554_432); +const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize::mib(32); const DEFAULT_LINGER: &str = "0ms"; const DEFAULT_SERVER_TIMEOUT: &str = "5000ms"; const DEFAULT_COMPRESSION: Compression = Compression::None; @@ -19,7 +19,7 @@ const DEFAULT_WORKER_TIMEOUT: &str = "3000s"; const 
DEFAULT_RECORD_KEY_ALLOCATION_STRATEGY: RecordKeyAllocationStrategy = RecordKeyAllocationStrategy::NoKey; const DEFAULT_NUM_PRODUCERS: u64 = 1; -const DEFAULT_RECORD_SIZE: ByteSize = ByteSize(5120); +const DEFAULT_RECORD_SIZE: ByteSize = ByteSize::kib(5); const DEFAULT_NUM_RECORDS: u64 = 10_000; const DEFAULT_PARTITIONS: u32 = 1; const DEFAULT_REPLICAS: u32 = 1; diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index ea96326e66d..91c720ff3d3 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -1,6 +1,7 @@ use anyhow::Result; use async_channel::{unbounded, Receiver}; +use bytesize::ByteSize; use fluvio_future::{task::spawn, future::timeout, timer::sleep}; use fluvio::{metadata::topic::TopicSpec, FluvioAdmin}; use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector}; @@ -67,14 +68,14 @@ impl ProducerBenchmark { println!("benchmark started"); // delay 1 seconds, so produce can start - sleep(std::time::Duration::from_secs(1)).await; + //sleep(std::time::Duration::from_secs(1)).await; while let Ok(stat) = stat_receiver.recv().await { - let human_readable_bytes = format!("{:9.1}mb/s", stat.bytes_per_sec / 1000000.0); + let human_readable_bytes = ByteSize(stat.bytes_per_sec as u64).to_string(); println!( - "total bytes send: {} | total message send: {} | message: per second: {}, bytes per sec: {}, ", - stat.total_bytes_send, stat.total_message_send, stat.message_per_sec,human_readable_bytes - ); + "Total bytes sent: {} | Total messages sent: {} | Messages per second: {} | Bytes per second: {}", + stat.total_bytes_send, stat.total_message_send, stat.message_per_sec, human_readable_bytes + ); } // Wait for all producers to finish diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs index d5658c9050a..15255e50d73 100644 --- 
a/crates/fluvio-benchmark/src/producer_worker.rs +++ b/crates/fluvio-benchmark/src/producer_worker.rs @@ -1,9 +1,15 @@ +use std::{ + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + use anyhow::Result; use fluvio::{ - dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey, + dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, ProduceOutput, RecordKey, TopicProducerConfigBuilder, TopicProducerPool, }; +use fluvio_future::{task::spawn, timer::sleep}; use crate::{ config::{ProducerConfig, RecordKeyAllocationStrategy}, @@ -66,15 +72,49 @@ impl ProducerWorker { pub async fn send_batch(mut self) -> Result<()> { println!("producer is sending batch"); + let (tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>(); + + spawn(async move { + let hist = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(2).unwrap())); + + loop { + let (send_out, time) = rx.recv().await.unwrap(); + let hist = hist.clone(); + spawn(async move { + let _o = send_out.wait().await.unwrap(); + let duration = time.elapsed(); + let mut hist = hist.lock().unwrap(); + hist.record(duration.as_nanos() as u64).expect("record"); + + if hist.len() >= 100000 { + let mut latency_yaml = "- Variable: Latency\n".to_string(); + for percentile in [0.0, 0.5, 0.95, 0.99, 1.0] { + latency_yaml.push_str(&format!( + " p{percentile:4.2}: {:?}\n", + Duration::from_nanos(hist.value_at_quantile(percentile)) + )); + } + println!("{}", latency_yaml); + } + }); + } + }); + for record in self.records_to_send.into_iter() { self.stat.start(); - self.fluvio_producer + let time = std::time::Instant::now(); + let send_out = self + .fluvio_producer .send(record.key, record.data.clone()) .await?; + + tx.send((send_out, time)).await?; + self.stat.record_record_send(record.data.len() as u64).await; } - self.fluvio_producer.flush().await?; + + sleep(Duration::from_secs(10)).await; println!("producer is done sending batch"); Ok(()) diff --git 
a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index 99cec5eed1f..8430dc98b5b 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -20,6 +20,7 @@ impl ProducerStat { pub(crate) fn calcuate(&mut self) -> Stat { let elapse = self.start_time.elapsed().as_millis(); + let message_per_sec = ((self.message_send as f64 / elapse as f64) * 1000.0).round(); let bytes_per_sec = (self.message_bytes as f64 / elapse as f64) * 1000.0; @@ -34,6 +35,9 @@ impl ProducerStat { pub(crate) fn set_current_time(&mut self) { self.start_time = Instant::now(); } + + pub(crate) fn set_out(&mut self, bytes: u64) { + } } pub(crate) struct Stat {