Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jan 17, 2025
1 parent 7f25825 commit b8d8e94
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ clap = { workspace = true, features = ["std","derive"] }
derive_builder = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
hdrhistogram = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
rayon = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/fluvio-benchmark/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use bytesize::ByteSize;

use crate::utils;

const DEFAULT_BATCH_SIZE: ByteSize = ByteSize(16_384);
const DEFAULT_BATCH_SIZE: ByteSize = ByteSize::kib(16);
const DEFAULT_QUEUE_SIZE: u64 = 10;
const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize(33_554_432);
const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize::mib(32);
const DEFAULT_LINGER: &str = "0ms";
const DEFAULT_SERVER_TIMEOUT: &str = "5000ms";
const DEFAULT_COMPRESSION: Compression = Compression::None;
Expand All @@ -19,7 +19,7 @@ const DEFAULT_WORKER_TIMEOUT: &str = "3000s";
const DEFAULT_RECORD_KEY_ALLOCATION_STRATEGY: RecordKeyAllocationStrategy =
RecordKeyAllocationStrategy::NoKey;
const DEFAULT_NUM_PRODUCERS: u64 = 1;
const DEFAULT_RECORD_SIZE: ByteSize = ByteSize(5120);
const DEFAULT_RECORD_SIZE: ByteSize = ByteSize::kib(5);
const DEFAULT_NUM_RECORDS: u64 = 10_000;
const DEFAULT_PARTITIONS: u32 = 1;
const DEFAULT_REPLICAS: u32 = 1;
Expand Down
11 changes: 6 additions & 5 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use async_channel::{unbounded, Receiver};

use bytesize::ByteSize;
use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector};
Expand Down Expand Up @@ -67,14 +68,14 @@ impl ProducerBenchmark {
println!("benchmark started");

// delay 1 seconds, so produce can start
sleep(std::time::Duration::from_secs(1)).await;
//sleep(std::time::Duration::from_secs(1)).await;

while let Ok(stat) = stat_receiver.recv().await {
let human_readable_bytes = format!("{:9.1}mb/s", stat.bytes_per_sec / 1000000.0);
let human_readable_bytes = ByteSize(stat.bytes_per_sec as u64).to_string();
println!(
"total bytes send: {} | total message send: {} | message: per second: {}, bytes per sec: {}, ",
stat.total_bytes_send, stat.total_message_send, stat.message_per_sec,human_readable_bytes
);
"Total bytes sent: {} | Total messages sent: {} | Messages per second: {} | Bytes per second: {}",
stat.total_bytes_send, stat.total_message_send, stat.message_per_sec, human_readable_bytes
);
}

// Wait for all producers to finish
Expand Down
46 changes: 43 additions & 3 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};

use anyhow::Result;

use fluvio::{
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey,
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, ProduceOutput, RecordKey,
TopicProducerConfigBuilder, TopicProducerPool,
};
use fluvio_future::{task::spawn, timer::sleep};

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
Expand Down Expand Up @@ -66,15 +72,49 @@ impl ProducerWorker {
pub async fn send_batch(mut self) -> Result<()> {
println!("producer is sending batch");

let (tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>();

spawn(async move {
let hist = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(2).unwrap()));

loop {
let (send_out, time) = rx.recv().await.unwrap();
let hist = hist.clone();
spawn(async move {
let _o = send_out.wait().await.unwrap();
let duration = time.elapsed();
let mut hist = hist.lock().unwrap();
hist.record(duration.as_nanos() as u64).expect("record");

if hist.len() >= 100000 {
let mut latency_yaml = "- Variable: Latency\n".to_string();
for percentile in [0.0, 0.5, 0.95, 0.99, 1.0] {
latency_yaml.push_str(&format!(
" p{percentile:4.2}: {:?}\n",
Duration::from_nanos(hist.value_at_quantile(percentile))
));
}
println!("{}", latency_yaml);
}
});
}
});

for record in self.records_to_send.into_iter() {
self.stat.start();
self.fluvio_producer
let time = std::time::Instant::now();
let send_out = self
.fluvio_producer
.send(record.key, record.data.clone())
.await?;

tx.send((send_out, time)).await?;

self.stat.record_record_send(record.data.len() as u64).await;
}

self.fluvio_producer.flush().await?;

sleep(Duration::from_secs(10)).await;
println!("producer is done sending batch");

Ok(())
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-benchmark/src/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl ProducerStat {

pub(crate) fn calcuate(&mut self) -> Stat {
let elapse = self.start_time.elapsed().as_millis();

let message_per_sec = ((self.message_send as f64 / elapse as f64) * 1000.0).round();
let bytes_per_sec = (self.message_bytes as f64 / elapse as f64) * 1000.0;

Expand All @@ -34,6 +35,9 @@ impl ProducerStat {
pub(crate) fn set_current_time(&mut self) {
self.start_time = Instant::now();
}

pub(crate) fn set_out(&mut self, bytes: u64) {
}
}

pub(crate) struct Stat {
Expand Down

0 comments on commit b8d8e94

Please sign in to comment.