diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index 6b9dd32543..9fb9bc7aeb 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -57,8 +57,12 @@ impl ProducerBenchmark { for producer_id in 0..config.num_producers { let (event_sender, event_receiver) = unbounded(); println!("starting up producer {}", producer_id); - let stat_collector = - StatCollector::create(end_sender.clone(), stat_sender.clone(), event_receiver); + let stat_collector = StatCollector::create( + config.num_records, + end_sender.clone(), + stat_sender.clone(), + event_receiver, + ); let (tx_control, rx_control) = unbounded(); let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender) diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index 06aa0afccf..2fe1dd8d42 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -6,7 +6,11 @@ use std::{ use async_channel::{Receiver, Sender}; use fluvio::ProduceCompletionBatchEvent; -use fluvio_future::{sync::Mutex, task::spawn, timer::sleep}; +use fluvio_future::{ + sync::{Mutex, RwLock}, + task::spawn, + timer::sleep, +}; use hdrhistogram::Histogram; pub(crate) struct ProducerStat {} @@ -34,6 +38,7 @@ pub struct EndProducerStat { impl ProducerStat { pub(crate) fn new( + num_records: u64, end_sender: Sender, stats_sender: Sender, event_receiver: Receiver, @@ -45,7 +50,13 @@ impl ProducerStat { let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::::new(3).unwrap())); - ProducerStat::track_latency(end_sender, histogram.clone(), event_receiver, stats.clone()); + ProducerStat::track_latency( + num_records, + end_sender, + histogram.clone(), + event_receiver, + stats.clone(), + ); ProducerStat::send_stats(histogram.clone(), stats_sender, stats.clone()) .expect("send stats"); @@ -103,6 +114,7 @@ impl ProducerStat { } fn track_latency( + num_records: u64, end_sender: Sender, histogram: Arc>>, event_receiver: Receiver, @@ -110,45 +122,53 @@ impl ProducerStat { ) { spawn(async move { let hist = histogram.clone(); - let mut first_start_time = Option::None; + let first_start_time = Arc::new(RwLock::new(None)); while let Ok(event) = event_receiver.recv().await { - if first_start_time.is_none() { - first_start_time = Some(event.created_at); - } - let elapsed = event.created_at.elapsed(); - let mut hist = hist.lock().await; - hist.record(elapsed.as_nanos() as u64).expect("record"); - - stats - .record_send - .fetch_add(event.records_len, std::sync::atomic::Ordering::Relaxed); - stats - .record_bytes - .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed); + let hist = hist.clone(); + let stats = stats.clone(); + let first_start_time = first_start_time.clone(); + spawn(async move { + if first_start_time.read().await.is_none() { + first_start_time.write().await.replace(event.created_at); + } + let mut hist = hist.lock().await; + hist.record(event.elapsed.as_nanos() as u64) + .expect("record"); + + stats + .record_send + .fetch_add(event.records_len, std::sync::atomic::Ordering::Relaxed); + stats + .record_bytes + .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed); + }); } // send end - let hist = hist.lock().await; - let elapsed = first_start_time.expect("start time").elapsed(); - - let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0; - let records_per_sec = (stats.record_send.load(std::sync::atomic::Ordering::Relaxed) - as f64 - / elapsed_seconds) - .round() as u64; - let bytes_per_sec = (stats - .record_bytes - .load(std::sync::atomic::Ordering::Relaxed) as f64 - / elapsed_seconds) - .round() as u64; - - let end = EndProducerStat { - histogram: hist.clone(), - total_records: stats.record_send.load(std::sync::atomic::Ordering::Relaxed), - records_per_sec, - bytes_per_sec, - }; - end_sender.send(end).await.expect("send end"); + if stats.record_send.load(std::sync::atomic::Ordering::Relaxed) >= num_records { + let hist = hist.lock().await; + let elapsed = first_start_time.read().await.expect("start time").elapsed(); + + let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0; + let records_per_sec = (stats.record_send.load(std::sync::atomic::Ordering::Relaxed) + as f64 + / elapsed_seconds) + .round() as u64; + let bytes_per_sec = (stats + .record_bytes + .load(std::sync::atomic::Ordering::Relaxed) + as f64 + / elapsed_seconds) + .round() as u64; + + let end = EndProducerStat { + histogram: hist.clone(), + total_records: stats.record_send.load(std::sync::atomic::Ordering::Relaxed), + records_per_sec, + bytes_per_sec, + }; + end_sender.send(end).await.expect("send end"); + } }); } } @@ -159,12 +179,13 @@ pub(crate) struct StatCollector { impl StatCollector { pub(crate) fn create( + num_records: u64, end_sender: Sender, stat_sender: Sender, event_receiver: Receiver, ) -> Self { Self { - _current: ProducerStat::new(end_sender, stat_sender, event_receiver), + _current: ProducerStat::new(num_records, end_sender, stat_sender, event_receiver), } } } diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs index dd4545f7b2..7e1a00d0c7 100644 --- a/crates/fluvio/src/producer/accumulator.rs +++ b/crates/fluvio/src/producer/accumulator.rs @@ -225,6 +225,7 @@ pub struct ProduceCompletionBatchEvent { pub records_len: u64, pub partition: PartitionId, pub created_at: Instant, + pub elapsed: Duration, } pub type SharedProducerCallback = Arc; diff --git a/crates/fluvio/src/producer/partition_producer.rs b/crates/fluvio/src/producer/partition_producer.rs index bb08682a96..8d11772475 100644 --- a/crates/fluvio/src/producer/partition_producer.rs +++ b/crates/fluvio/src/producer/partition_producer.rs @@ -220,12 +220,14 @@ where if self.callback.is_some() { let created_at = metadata.created_at; + let elapsed = created_at.elapsed(); let event = ProduceCompletionBatchEvent { created_at, topic: self.replica.topic.clone(), partition: self.replica.partition, bytes_size, records_len, + elapsed, }; events_to_callback.push(event);