Skip to content

Commit

Permalink
move collection to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sehz committed Jan 17, 2025
1 parent de9339a commit aae3cea
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 46 deletions.
42 changes: 12 additions & 30 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::sync::{atomic::Ordering, Arc};

use anyhow::Result;
use async_channel::{unbounded, Receiver};

use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::ProduceStat};
use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector};

pub struct ProducerBenchmark {}

Expand Down Expand Up @@ -50,13 +48,13 @@ impl ProducerBenchmark {
let mut tx_controls = Vec::new();
let mut workers_jh = Vec::new();

let stat = Arc::new(ProduceStat::default());

let (stat_sender, stat_receiver) = unbounded();
// Set up producers
for producer_id in 0..config.shared_config.load_config.num_producers {
println!("starting up producer {}", producer_id);
let stat_collector = StatCollector::create(10000, stat_sender.clone());
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), stat.clone()).await?;
let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?;
let jh = spawn(timeout(
config.shared_config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
Expand All @@ -71,31 +69,16 @@ impl ProducerBenchmark {
// delay 1 seconds, so produce can start
sleep(std::time::Duration::from_secs(1)).await;

let stats = stat.clone();
spawn(async move {
loop {
let stat = stats.clone();
let run_start_num_messages = stat.message_send.load(Ordering::Relaxed);
let run_start_bytes = stat.message_bytes.load(Ordering::Relaxed);

let start_time = std::time::Instant::now();
sleep(std::time::Duration::from_millis(500)).await;
let elapse = start_time.elapsed().as_millis();

let run_end_bytes = stat.message_bytes.load(Ordering::Relaxed);
let run_end_num_messages = stat.message_send.load(Ordering::Relaxed);
let bytes_send = run_end_bytes - run_start_bytes;
let message_send = run_end_num_messages - run_start_num_messages;

let bytes_per_sec = (bytes_send as f64 / elapse as f64) * 1000.0;
let human_readable_bytes = format!("{:9.1}mb/s", bytes_per_sec / 1000000.0);
let message_per_sec = ((message_send as f64 / elapse as f64) * 1000.0).round();
println!(
while let Ok(stat) = stat_receiver.recv().await {
if stat.end {
break;
}
let human_readable_bytes = format!("{:9.1}mb/s", stat.bytes_per_sec / 1000000.0);
println!(
"total bytes send: {} | total message send: {} | message: per second: {}, bytes per sec: {}, ",
bytes_send, message_send, message_per_sec, human_readable_bytes
stat.total_bytes_send, stat.total_message_send, stat.message_per_sec,human_readable_bytes
);
}
});
}

// Wait for all producers to finish
for jh in workers_jh {
Expand All @@ -104,7 +87,6 @@ impl ProducerBenchmark {

// Print stats
println!("Benchmark completed");
println!("Stats: {:#?}", stat);

Ok(())
}
Expand Down
19 changes: 8 additions & 11 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::{atomic::Ordering, Arc};

use anyhow::Result;

use fluvio::{
Expand All @@ -9,19 +7,19 @@ use fluvio::{

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
stats_collector::ProduceStat,
stats_collector::StatCollector,
utils,
};

const SHARED_KEY: &str = "shared_key";

pub struct ProducerWorker {
pub(crate) struct ProducerWorker {
fluvio_producer: TopicProducerPool,
records_to_send: Vec<BenchmarkRecord>,
stat: Arc<ProduceStat>,
stat: StatCollector,
}
impl ProducerWorker {
pub async fn new(id: u64, config: ProducerConfig, stat: Arc<ProduceStat>) -> Result<Self> {
pub(crate) async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result<Self> {
let fluvio = Fluvio::connect().await?;

let fluvio_config = TopicProducerConfigBuilder::default()
Expand Down Expand Up @@ -65,20 +63,19 @@ impl ProducerWorker {
})
}

pub async fn send_batch(self) -> Result<()> {
pub async fn send_batch(mut self) -> Result<()> {
println!("producer is sending batch");

for record in self.records_to_send.into_iter() {
self.stat.start();
self.fluvio_producer
.send(record.key, record.data.clone())
.await?;
self.stat
.message_bytes
.fetch_add(record.data.len() as u64, Ordering::Relaxed);
self.stat.message_send.fetch_add(1, Ordering::Relaxed);
self.stat.record_record_send(record.data.len() as u64).await;
}

self.fluvio_producer.flush().await?;
self.stat.finish();
println!("producer is done sending batch");

Ok(())
Expand Down
99 changes: 94 additions & 5 deletions crates/fluvio-benchmark/src/stats_collector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,96 @@
use std::sync::atomic::AtomicU64;
use std::time::Instant;

#[derive(Default, Debug)]
pub struct ProduceStat {
pub message_send: AtomicU64,
pub message_bytes: AtomicU64,
use async_channel::Sender;

/// Counters for one measurement window of a single producer.
///
/// Holds how many records and bytes were counted since `start_time`;
/// [`ProducerStat::calcuate`] converts them into per-second rates.
#[derive(Debug)]
pub(crate) struct ProducerStat {
    /// Number of records counted in the current window.
    record_send: u64,
    /// Number of bytes counted in the current window.
    record_bytes: u64,
    /// Instant at which the current window started.
    start_time: Instant,
}

impl ProducerStat {
    /// Creates an empty window whose clock starts now.
    pub(crate) fn new() -> Self {
        Self {
            record_send: 0,
            record_bytes: 0,
            start_time: Instant::now(),
        }
    }

    /// Builds a [`Stat`] snapshot of the current window.
    ///
    /// Rates are computed against the time elapsed since `start_time`.
    /// `message_per_sec` is rounded to a whole number; `bytes_per_sec` is not.
    /// The returned snapshot always has `end == false`.
    ///
    /// The method name (sic) is kept as-is because callers reference it.
    pub(crate) fn calcuate(&self) -> Stat {
        // Use fractional seconds rather than whole milliseconds: a window that
        // completes in under 1 ms would otherwise divide by zero and report
        // `inf` (or `NaN` when nothing was sent).
        let elapsed_secs = self.start_time.elapsed().as_secs_f64();

        let (message_per_sec, bytes_per_sec) = if elapsed_secs > 0.0 {
            (
                (self.record_send as f64 / elapsed_secs).round(),
                self.record_bytes as f64 / elapsed_secs,
            )
        } else {
            // No measurable time has passed, so no meaningful rate exists.
            (0.0, 0.0)
        };

        Stat {
            message_per_sec,
            bytes_per_sec,
            total_bytes_send: self.record_bytes,
            total_message_send: self.record_send,
            end: false,
        }
    }

    /// Restarts the window clock at the current instant (counters are untouched).
    pub(crate) fn set_current_time(&mut self) {
        self.start_time = Instant::now();
    }
}

/// One stats message passed over the stats channel.
///
/// Either a snapshot of a finished measurement window (`end == false`) or the
/// marker that signals the sender is done (`end == true`, all other fields zero).
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Stat {
    /// Records per second over the window.
    pub message_per_sec: f64,
    /// Bytes per second over the window.
    pub bytes_per_sec: f64,
    /// Bytes counted in the window.
    pub total_bytes_send: u64,
    /// Records counted in the window.
    pub total_message_send: u64,
    /// `true` for the end-of-stream marker, `false` for a regular snapshot.
    pub end: bool,
}

/// Aggregates send counters for one producer and publishes a [`Stat`] on the
/// stats channel every `batch_size` records.
pub(crate) struct StatCollector {
    /// Counters and clock of the window currently being filled.
    current: ProducerStat,
    /// Number of records before we calculate stats.
    batch_size: u64,
    /// How many records we have sent in the current cycle.
    current_record: u64,
    /// Channel on which snapshots and the end marker are published.
    sender: Sender<Stat>,
}

impl StatCollector {
    /// Creates a collector that publishes on `sender` after every `batch_size` records.
    pub(crate) fn create(batch_size: u64, sender: Sender<Stat>) -> Self {
        Self {
            current: ProducerStat::new(),
            batch_size,
            current_record: 0,
            sender,
        }
    }

    /// Marks the start of a send; restarts the window clock only when the
    /// window is empty, so the clock covers the whole batch.
    pub(crate) fn start(&mut self) {
        if self.current_record == 0 {
            self.current.set_current_time();
        }
    }

    /// Counts one sent record of `bytes` bytes and publishes a snapshot once
    /// the window holds `batch_size` records.
    ///
    /// This is `async` only for caller compatibility; it never awaits.
    pub(crate) async fn record_record_send(&mut self, bytes: u64) {
        self.current.record_send += 1;
        self.current.record_bytes += bytes;
        self.current_record += 1;

        if self.current_record >= self.batch_size {
            self.flush_window();
        }
    }

    /// Publishes any partially filled window, then the end-of-stream marker.
    ///
    /// Flushing first ensures the trailing `< batch_size` records are reported
    /// instead of being silently dropped (which would leave short runs with no
    /// stats at all).
    pub(crate) fn finish(&mut self) {
        if self.current_record > 0 {
            self.flush_window();
        }

        self.emit(Stat {
            message_per_sec: 0.0,
            bytes_per_sec: 0.0,
            total_bytes_send: 0,
            total_message_send: 0,
            end: true,
        });
    }

    /// Publishes a snapshot of the current window and resets its counters.
    fn flush_window(&mut self) {
        let stat = self.current.calcuate();
        self.current_record = 0;
        self.current.record_bytes = 0;
        self.current.record_send = 0;
        self.emit(stat);
    }

    /// Sends `stat` without blocking.
    ///
    /// A closed channel means nobody is listening any more, so the message is
    /// dropped rather than panicking the producer. A full channel would lose
    /// data (possibly the end marker), so that case still panics.
    fn emit(&self, stat: Stat) {
        if let Err(err) = self.sender.try_send(stat) {
            assert!(err.is_closed(), "send stats: channel is full");
        }
    }
}

0 comments on commit aae3cea

Please sign in to comment.