Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sehz committed Jan 17, 2025
1 parent 8a34d49 commit 7f25825
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 18 deletions.
12 changes: 3 additions & 9 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
use std::sync::{atomic::Ordering, Arc};

use anyhow::Result;
use async_channel::{unbounded, Receiver};

use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use crate::{
config::ProducerConfig,
producer_worker::ProducerWorker,
stats_collector::StatCollector,
};
use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector};

pub struct ProducerBenchmark {}

Expand Down Expand Up @@ -58,7 +52,7 @@ impl ProducerBenchmark {
// Set up producers
for producer_id in 0..config.shared_config.load_config.num_producers {
println!("starting up producer {}", producer_id);
let stat_collector = StatCollector::create(10000,stat_sender.clone());
let stat_collector = StatCollector::create(10000, stat_sender.clone());
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?;
let jh = spawn(timeout(
Expand All @@ -79,7 +73,7 @@ impl ProducerBenchmark {
let human_readable_bytes = format!("{:9.1}mb/s", stat.bytes_per_sec / 1000000.0);
println!(
"total bytes send: {} | total message send: {} | message: per second: {}, bytes per sec: {}, ",
stat.total_bytes_send, stat.total_bytes_send, stat.message_per_sec,human_readable_bytes
stat.total_bytes_send, stat.total_message_send, stat.message_per_sec,human_readable_bytes
);
}

Expand Down
12 changes: 5 additions & 7 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::{atomic::Ordering, Arc};

use anyhow::Result;

use fluvio::{
Expand All @@ -9,19 +7,19 @@ use fluvio::{

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
stats_collector::{ProducerStat, StatCollector},
stats_collector::StatCollector,
utils,
};

const SHARED_KEY: &str = "shared_key";

pub struct ProducerWorker {
pub(crate) struct ProducerWorker {
fluvio_producer: TopicProducerPool,
records_to_send: Vec<BenchmarkRecord>,
stat: StatCollector
stat: StatCollector,
}
impl ProducerWorker {
pub async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result<Self> {
pub(crate) async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result<Self> {
let fluvio = Fluvio::connect().await?;

let fluvio_config = TopicProducerConfigBuilder::default()
Expand Down Expand Up @@ -73,7 +71,7 @@ impl ProducerWorker {
self.fluvio_producer
.send(record.key, record.data.clone())
.await?;
self.stat.record_record_send(record.data.len() as u64);
self.stat.record_record_send(record.data.len() as u64).await;
}

self.fluvio_producer.flush().await?;
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-benchmark/src/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ pub(crate) struct StatCollector {

impl StatCollector {
pub(crate) fn create(batch_size: u64, sender: Sender<Stat>) -> Self {
(Self {
Self {
current: ProducerStat::new(),
batch_size,
current_record: 0,
sender,
})
}
}

pub(crate) fn start(&mut self) {
Expand Down

0 comments on commit 7f25825

Please sign in to comment.