Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jan 17, 2025
1 parent eae7c72 commit bbe5a64
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ clap = { workspace = true, features = ["std","derive"] }
derive_builder = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
hdrhistogram = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
rayon = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ProducerBenchmark {
println!("benchmark started");

// delay 1 seconds, so produce can start
sleep(std::time::Duration::from_secs(1)).await;
//sleep(std::time::Duration::from_secs(1)).await;

let stats = stat.clone();
spawn(async move {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl ProducerBenchmark {
struct ProducerDriver;

impl ProducerDriver {
async fn main_loop(rx: Receiver<ControlMessage>, worker: ProducerWorker) -> Result<()> {
async fn main_loop(rx: Receiver<ControlMessage>, mut worker: ProducerWorker) -> Result<()> {
//loop {
match rx.recv().await? {
ControlMessage::SendBatch => {
Expand Down
68 changes: 63 additions & 5 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::sync::{atomic::Ordering, Arc};
use std::{
sync::{atomic::Ordering, Arc, Mutex},
time::Duration,
};

use anyhow::Result;

use fluvio::{
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey,
TopicProducerConfigBuilder, TopicProducerPool,
};
use fluvio_future::{task::spawn, timer::sleep};

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
Expand Down Expand Up @@ -65,20 +69,74 @@ impl ProducerWorker {
})
}

pub async fn send_batch(self) -> Result<()> {
pub async fn send_batch(&mut self) -> Result<()> {
println!("producer is sending batch");

for record in self.records_to_send.into_iter() {
let metrics = self.fluvio_producer.metrics();
//let mut hist = hdrhistogram::Histogram::<u64>::new(3).unwrap();
let hist = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(2).unwrap()));
let hist2 = hist.clone();

spawn(async move {
let metrics = metrics.clone();
let rx = &metrics.producer_client().latency_rx;
let hist = hist.clone();
loop {
let hist = hist.clone();
let rx = rx.clone();
match rx.recv().await {
Ok(v) => {
let duration = std::time::Duration::from_nanos(v);
hist.lock()
.unwrap()
.record(duration.as_nanos() as u64)
.expect("record");
}
Err(e) => {
println!("latency error: {:#?}", e);
}
}
}
});
for record in self.records_to_send.iter() {
//let time = Instant::now();
self.fluvio_producer
.send(record.key, record.data.clone())
//.send(record.key, record.data.clone())
.send(RecordKey::NULL, record.data.clone())
.await?;
//self.fluvio_producer.flush().await?;

self.stat
.message_bytes
.fetch_add(record.data.len() as u64, Ordering::Relaxed);
self.stat.message_send.fetch_add(1, Ordering::Relaxed);
}

self.fluvio_producer.flush().await?;

//sleep(Duration::from_secs(5)).await;

let hist = hist2.lock().unwrap();
println!("latencies: {:#?}", hist.mean());
println!("len: {:#?}", hist.len());
let mut latency_yaml = "- Variable: Latency\n".to_string();
for percentile in [0.0, 0.5, 0.95, 0.99, 1.0] {
latency_yaml.push_str(&format!(
" p{percentile:4.2}: {:?}\n",
Duration::from_nanos(hist.value_at_quantile(percentile))
));
}

println!("{}", latency_yaml);

println!(
"total records sent: {}",
self.stat.message_send.load(Ordering::Relaxed)
);
println!(
"total bytes sent: {}",
self.stat.message_bytes.load(Ordering::Relaxed)
);

println!("producer is done sending batch");

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-benchmark/src/stats_collector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::AtomicU64;
use std::{sync::atomic::AtomicU64, time::Instant};

#[derive(Default, Debug)]
pub struct ProduceStat {
Expand Down
29 changes: 25 additions & 4 deletions crates/fluvio/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Serialize, Deserialize};
use serde::Serialize;

#[derive(Default, Debug, Deserialize, Serialize)]
#[derive(Default, Debug, Serialize)]
pub struct ClientMetrics {
consumer: RecordCounter,
producer_connector: RecordCounter,
Expand Down Expand Up @@ -40,7 +40,7 @@ impl ClientMetrics {
cfg_if::cfg_if! {
if #[cfg(any(target_arch = "wasm32", target_arch = "arm"))] {

#[derive(Default, Debug, Deserialize, Serialize)]
#[derive(Default, Debug, Deserialize)]
pub struct RecordCounter {

}
Expand All @@ -58,12 +58,27 @@ cfg_if::cfg_if! {
} else {
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default, Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
pub struct RecordCounter {
pub records: AtomicU64,
pub bytes: AtomicU64,
#[serde(skip)]
pub latency_rx: async_channel::Receiver<u64>,
#[serde(skip)]
latency_tx: async_channel::Sender<u64>,
}

impl Default for RecordCounter {
fn default() -> Self {
let channel = async_channel::unbounded();
Self {
records: AtomicU64::new(0),
bytes: AtomicU64::new(0),
latency_rx: channel.1,
latency_tx: channel.0,
}
}
}
impl RecordCounter {
#[inline]
pub(crate) fn add_records(&self, value: u64) {
Expand All @@ -74,6 +89,12 @@ cfg_if::cfg_if! {
pub(crate) fn add_bytes(&self, value: u64) {
self.bytes.fetch_add(value, Ordering::SeqCst);
}

#[inline]
pub fn add_latency(&self, value: u64) -> anyhow::Result<()> {
self.latency_tx.try_send(value)?;
Ok(())
}
}

}
Expand Down
7 changes: 5 additions & 2 deletions crates/fluvio/src/producer/memory_batch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{time::Instant, u128};

use chrono::Utc;

use fluvio_protocol::{
Expand All @@ -24,7 +26,7 @@ pub struct MemoryBatch {
}
impl MemoryBatch {
pub fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
let now = Utc::now().timestamp_millis();
let now = Utc::now().timestamp_nanos_opt().expect("timestamp");
Self {
compression,
is_full: false,
Expand Down Expand Up @@ -90,7 +92,8 @@ impl MemoryBatch {
}

pub fn elapsed(&self) -> Timestamp {
let now = Utc::now().timestamp_millis();
//let now = Utc::now().timestamp_millis();
let now = Utc::now().timestamp_nanos_opt().expect("timestamp");

std::cmp::max(0, now - self.create_time)
}
Expand Down
15 changes: 15 additions & 0 deletions crates/fluvio/src/producer/partition_producer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_lock::RwLock;
use chrono::Utc;
use tracing::{debug, info, instrument, error, trace};

use fluvio_protocol::record::ReplicaKey;
Expand Down Expand Up @@ -191,13 +193,17 @@ where

let mut batch_notifiers = vec![];

let mut created_batch = vec![];

for p_batch in batches_ready {
let mut partition_request = DefaultPartitionRequest {
partition_index: self.replica.partition,
..Default::default()
};
let notify = p_batch.notify.clone();
let batch = p_batch.batch();
let created = batch.get_base_timestamp();
created_batch.push(created);

let raw_batch: Batch<RawRecords> = batch.try_into()?;

Expand Down Expand Up @@ -225,6 +231,15 @@ where
}
}

let now = Utc::now().timestamp_nanos_opt().expect("timestamp");
for created in created_batch {
let producer_metrics = self.metrics.producer_client();
let delay = now - created;
if let Err(e) = producer_metrics.add_latency(delay as u64) {
error!("Failed to add latency metric: {}", e);
}
}

Ok(())
}

Expand Down

0 comments on commit bbe5a64

Please sign in to comment.