chore: getting response from the callback concurrently
fraidev committed Jan 21, 2025
1 parent 59341e0 commit e8abcb5
Showing 4 changed files with 68 additions and 40 deletions.
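The substance of the change is in stats_collector.rs: completion events from the producer callback are no longer recorded inline in the stats receive loop. Each ProduceCompletionBatchEvent is handed off to its own spawned task, and the collector now receives the expected record count so it can tell when the run is complete. A minimal sketch of the new receive-loop pattern, with the event and helper names (CompletionEvent, track) invented here and the stats bookkeeping left out; the real code lives in track_latency below:

use std::sync::Arc;
use std::time::Duration;

use async_channel::Receiver;
use fluvio_future::{sync::Mutex, task::spawn};
use hdrhistogram::Histogram;

// Reduced stand-in for ProduceCompletionBatchEvent.
struct CompletionEvent {
    elapsed: Duration,
}

fn track(event_receiver: Receiver<CompletionEvent>, histogram: Arc<Mutex<Histogram<u64>>>) {
    spawn(async move {
        while let Ok(event) = event_receiver.recv().await {
            // Record on a separate task so the loop goes straight back to
            // recv() instead of waiting on the histogram lock.
            let histogram = histogram.clone();
            spawn(async move {
                let mut hist = histogram.lock().await;
                hist.record(event.elapsed.as_nanos() as u64).expect("record");
            });
        }
    });
}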
8 changes: 6 additions & 2 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
@@ -57,8 +57,12 @@ impl ProducerBenchmark {
         for producer_id in 0..config.num_producers {
             let (event_sender, event_receiver) = unbounded();
             println!("starting up producer {}", producer_id);
-            let stat_collector =
-                StatCollector::create(end_sender.clone(), stat_sender.clone(), event_receiver);
+            let stat_collector = StatCollector::create(
+                config.num_records,
+                end_sender.clone(),
+                stat_sender.clone(),
+                event_receiver,
+            );
             let (tx_control, rx_control) = unbounded();
             let worker =
                 ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender)
97 changes: 59 additions & 38 deletions crates/fluvio-benchmark/src/stats_collector.rs
@@ -6,7 +6,11 @@ use std::{

 use async_channel::{Receiver, Sender};
 use fluvio::ProduceCompletionBatchEvent;
-use fluvio_future::{sync::Mutex, task::spawn, timer::sleep};
+use fluvio_future::{
+    sync::{Mutex, RwLock},
+    task::spawn,
+    timer::sleep,
+};
 use hdrhistogram::Histogram;
 
 pub(crate) struct ProducerStat {}
@@ -34,6 +38,7 @@ pub struct EndProducerStat {

 impl ProducerStat {
     pub(crate) fn new(
+        num_records: u64,
         end_sender: Sender<EndProducerStat>,
         stats_sender: Sender<Stats>,
         event_receiver: Receiver<ProduceCompletionBatchEvent>,
@@ -45,7 +50,13 @@ impl ProducerStat {

         let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(3).unwrap()));
 
-        ProducerStat::track_latency(end_sender, histogram.clone(), event_receiver, stats.clone());
+        ProducerStat::track_latency(
+            num_records,
+            end_sender,
+            histogram.clone(),
+            event_receiver,
+            stats.clone(),
+        );
 
         ProducerStat::send_stats(histogram.clone(), stats_sender, stats.clone())
             .expect("send stats");
@@ -103,52 +114,61 @@ impl ProducerStat {
     }
 
     fn track_latency(
+        num_records: u64,
         end_sender: Sender<EndProducerStat>,
         histogram: Arc<Mutex<Histogram<u64>>>,
         event_receiver: Receiver<ProduceCompletionBatchEvent>,
         stats: Arc<AtomicStats>,
     ) {
         spawn(async move {
             let hist = histogram.clone();
-            let mut first_start_time = Option::None;
+            let first_start_time = Arc::new(RwLock::new(None));
             while let Ok(event) = event_receiver.recv().await {
-                if first_start_time.is_none() {
-                    first_start_time = Some(event.created_at);
-                }
-                let elapsed = event.created_at.elapsed();
-                let mut hist = hist.lock().await;
-                hist.record(elapsed.as_nanos() as u64).expect("record");
-
-                stats
-                    .record_send
-                    .fetch_add(event.records_len, std::sync::atomic::Ordering::Relaxed);
-                stats
-                    .record_bytes
-                    .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed);
+                let hist = hist.clone();
+                let stats = stats.clone();
+                let first_start_time = first_start_time.clone();
+                spawn(async move {
+                    if first_start_time.read().await.is_none() {
+                        first_start_time.write().await.replace(event.created_at);
+                    }
+                    let mut hist = hist.lock().await;
+                    hist.record(event.elapsed.as_nanos() as u64)
+                        .expect("record");
+
+                    stats
+                        .record_send
+                        .fetch_add(event.records_len, std::sync::atomic::Ordering::Relaxed);
+                    stats
+                        .record_bytes
+                        .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed);
+                });
             }
 
             // send end
-            let hist = hist.lock().await;
-            let elapsed = first_start_time.expect("start time").elapsed();
-
-            let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0;
-            let records_per_sec = (stats.record_send.load(std::sync::atomic::Ordering::Relaxed)
-                as f64
-                / elapsed_seconds)
-                .round() as u64;
-            let bytes_per_sec = (stats
-                .record_bytes
-                .load(std::sync::atomic::Ordering::Relaxed) as f64
-                / elapsed_seconds)
-                .round() as u64;
-
-            let end = EndProducerStat {
-                histogram: hist.clone(),
-                total_records: stats.record_send.load(std::sync::atomic::Ordering::Relaxed),
-                records_per_sec,
-                bytes_per_sec,
-            };
-            end_sender.send(end).await.expect("send end");
+            if stats.record_send.load(std::sync::atomic::Ordering::Relaxed) >= num_records {
+                let hist = hist.lock().await;
+                let elapsed = first_start_time.read().await.expect("start time").elapsed();
+
+                let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0;
+                let records_per_sec = (stats.record_send.load(std::sync::atomic::Ordering::Relaxed)
+                    as f64
+                    / elapsed_seconds)
+                    .round() as u64;
+                let bytes_per_sec = (stats
+                    .record_bytes
+                    .load(std::sync::atomic::Ordering::Relaxed)
+                    as f64
+                    / elapsed_seconds)
+                    .round() as u64;
+
+                let end = EndProducerStat {
+                    histogram: hist.clone(),
+                    total_records: stats.record_send.load(std::sync::atomic::Ordering::Relaxed),
+                    records_per_sec,
+                    bytes_per_sec,
+                };
+                end_sender.send(end).await.expect("send end");
+            }
         });
     }
 }
@@ -159,12 +179,13 @@ pub(crate) struct StatCollector {

 impl StatCollector {
     pub(crate) fn create(
+        num_records: u64,
         end_sender: Sender<EndProducerStat>,
         stat_sender: Sender<Stats>,
         event_receiver: Receiver<ProduceCompletionBatchEvent>,
     ) -> Self {
         Self {
-            _current: ProducerStat::new(end_sender, stat_sender, event_receiver),
+            _current: ProducerStat::new(num_records, end_sender, stat_sender, event_receiver),
         }
     }
 }
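A side effect of recording on separate tasks is that the first event's timestamp can no longer live in a plain local Option: several tasks may touch it concurrently, so it moves behind a shared RwLock. A reduced sketch of that set-once pattern as used above (mark_first_event is a hypothetical helper name); the check and the write are separate lock acquisitions, which the benchmark tolerates because only the earliest completions ever observe None:

use std::sync::Arc;
use std::time::Instant;

use fluvio_future::sync::RwLock;

async fn mark_first_event(first_start_time: &Arc<RwLock<Option<Instant>>>, created_at: Instant) {
    // Only tasks that still observe None attempt the write, so the stored
    // value always comes from one of the earliest completions.
    if first_start_time.read().await.is_none() {
        first_start_time.write().await.replace(created_at);
    }
}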
1 change: 1 addition & 0 deletions crates/fluvio/src/producer/accumulator.rs
@@ -225,6 +225,7 @@ pub struct ProduceCompletionBatchEvent {
     pub records_len: u64,
     pub partition: PartitionId,
     pub created_at: Instant,
+    pub elapsed: Duration,
 }
 
 pub type SharedProducerCallback = Arc<dyn ProducerCallback + Send + Sync>;
2 changes: 2 additions & 0 deletions crates/fluvio/src/producer/partition_producer.rs
@@ -220,12 +220,14 @@ where

         if self.callback.is_some() {
             let created_at = metadata.created_at;
+            let elapsed = created_at.elapsed();
             let event = ProduceCompletionBatchEvent {
                 created_at,
                 topic: self.replica.topic.clone(),
                 partition: self.replica.partition,
                 bytes_size,
                 records_len,
+                elapsed,
             };
 
             events_to_callback.push(event);

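The last two files carry the measurement change that makes the concurrent recording meaningful: latency is now captured in the partition producer at the moment the callback event is built and travels on the event as elapsed, so the stats task records event.elapsed instead of calling created_at.elapsed() later, when time spent sitting in the channel would already have inflated the figure. A reduced sketch of the event construction (CompletionEvent and completion_event are illustrative names; the real ProduceCompletionBatchEvent also carries topic, partition, bytes_size, and records_len):

use std::time::{Duration, Instant};

// Shortened stand-in for ProduceCompletionBatchEvent.
struct CompletionEvent {
    created_at: Instant,
    elapsed: Duration,
}

fn completion_event(created_at: Instant) -> CompletionEvent {
    // Measure at completion time, not when the stats task finally sees the event.
    let elapsed = created_at.elapsed();
    CompletionEvent { created_at, elapsed }
}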