From 3bf5c5784c53b90b2da1f788b4d30615a31b498a Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Sat, 25 Jan 2025 01:12:46 -0300 Subject: [PATCH] feat: add benchmark table again (#4363) * feat: add benchmark table again * chore: init batch timestamp at record push --- Cargo.lock | 205 +++++++++++++++--- crates/fluvio-benchmark/Cargo.toml | 1 + .../src/producer_benchmark.rs | 32 ++- crates/fluvio/src/producer/accumulator.rs | 37 +++- crates/fluvio/src/producer/record.rs | 11 +- 5 files changed, 244 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6936171f41..e48608b8b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -778,6 +778,21 @@ dependencies = [ "serde", ] +[[package]] +name = "calamine" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a3a315226fdc5b1c3e33521073e1712a05944bc0664d665ff1f6ff0396334da" +dependencies = [ + "byteorder", + "codepage", + "encoding_rs", + "log", + "quick-xml", + "serde", + "zip", +] + [[package]] name = "camino" version = "1.1.9" @@ -880,7 +895,7 @@ dependencies = [ "heck", "home", "ignore", - "indexmap", + "indexmap 2.7.1", "indicatif", "liquid", "liquid-core", @@ -1114,7 +1129,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", "terminal_size", ] @@ -1160,6 +1175,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" +[[package]] +name = "codepage" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48f68d061bc2828ae826206326e61251aca94c1e4a5305cf52d9138639c918b4" +dependencies = [ + "encoding_rs", +] + [[package]] name = "color-eyre" version = "0.6.3" @@ -1626,6 +1650,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctrlc" version = "3.4.5" @@ -1698,7 +1743,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.96", ] @@ -1938,6 +1983,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "docopt" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f3f119846c823f9eafcf953a8f6ffb6ed69bf6240883261a7f13b634579a51f" +dependencies = [ + "lazy_static", + "regex", + "serde", + "strsim 0.10.0", +] + [[package]] name = "duct" version = "0.13.7" @@ -2345,12 +2402,13 @@ dependencies = [ "futures-util", "hdrhistogram", "humantime", + "madato", "once_cell", "rand", "rand_xoshiro", "rayon", "serde", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "thiserror 2.0.11", "tokio", "tracing", @@ -2439,7 +2497,7 @@ dependencies = [ "semver 1.0.25", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "sha2", "thiserror 2.0.11", "tokio", @@ -2522,7 +2580,7 @@ dependencies = [ "semver 1.0.25", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "sysinfo", "tar", "tempfile", @@ -2575,7 +2633,7 @@ dependencies = [ "futures-util", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "tokio", "tracing", "trybuild 1.0.103", @@ -2617,7 +2675,7 @@ dependencies = [ "openapiv3", "pretty_assertions", "serde", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "tempfile", "toml 0.8.19", "tracing", @@ -2653,7 +2711,7 @@ dependencies = [ "semver 1.0.25", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "thiserror 2.0.11", "toml 0.8.19", "tracing", @@ -2673,7 +2731,7 @@ dependencies = [ "semver 1.0.25", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "thiserror 2.0.11", "timeago", "toml 0.8.19", @@ -2749,7 +2807,7 @@ dependencies = [ "fluvio-types", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "thiserror 2.0.11", "toml 0.8.19", "tracing", @@ -2786,7 +2844,7 @@ dependencies = [ "semver 1.0.25", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "sha2", "ssh-key", "tar", @@ -2953,7 +3011,7 @@ dependencies = [ "humantime-serde", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "thiserror 2.0.11", "tracing", "wasi-common", @@ -3122,7 +3180,7 @@ dependencies = [ "parking_lot 0.12.3", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "tempfile", "tokio", "tracing", @@ -3595,7 +3653,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" dependencies = [ "fallible-iterator", - "indexmap", + "indexmap 2.7.1", "stable_deref_trait", ] @@ -3892,7 +3950,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.7.1", "slab", "tokio", "tokio-util", @@ -3911,7 +3969,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.2.0", - "indexmap", + "indexmap 2.7.1", "slab", "tokio", "tokio-util", @@ -4423,6 +4481,16 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.7.1" @@ -4643,7 +4711,7 @@ dependencies = [ "serde", "serde_json", "serde_qs", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "tokio", "tracing", ] @@ -4658,7 +4726,7 @@ dependencies = [ "hostfile", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.34+deprecated", "thiserror 1.0.69", "tracing", ] @@ -4870,6 +4938,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +dependencies = [ + "serde", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -4976,6 +5053,25 @@ dependencies = [ "libc", ] +[[package]] +name = "madato" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d656b04c44587180ab2c2bc1ba16327d7fc81e31a7818af22b57ddffa8272f" +dependencies = [ + "calamine", + "csv", + "docopt", + "indexmap 1.9.3", + "linked-hash-map", + "regex", + "serde", + "serde_derive", + "serde_json", + "serde_yaml 0.8.26", + "thiserror 1.0.69", +] + [[package]] name = "matchers" version = "0.1.0" @@ -5300,7 +5396,7 @@ checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "crc32fast", "hashbrown 0.15.2", - "indexmap", + "indexmap 2.7.1", "memchr", ] @@ -5331,7 +5427,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc02deea53ffe807708244e5914f6b099ad7015a207ee24317c22112e17d9c5c" dependencies = [ - "indexmap", + "indexmap 2.7.1", "serde", "serde_json", ] @@ -5951,6 +6047,16 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1190fd18ae6ce9e137184f207593877e70f39b015040156b1e05081cdfe3733a" +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "encoding_rs", + "memchr", +] + [[package]] name = "quinn" version = "0.11.6" @@ -6711,13 +6817,25 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578a7433b776b56a35785ed5ce9a7e777ac0598aac5a6dd1b4b18a307c7fc71b" +dependencies = [ + "indexmap 1.9.3", + "ryu", + "serde", + "yaml-rust", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.7.1", "itoa", "ryu", "serde", @@ -6992,6 +7110,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -7409,7 +7533,7 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" dependencies = [ - "indexmap", + "indexmap 2.7.1", "serde", "serde_spanned", "toml_datetime", @@ -7438,7 +7562,7 @@ version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap", + "indexmap 2.7.1", "serde", "serde_spanned", "toml_datetime", @@ -7942,7 +8066,7 @@ checksum = "9845c470a2e10b61dd42c385839cdd6496363ed63b5c9e420b5488b77bd22083" dependencies = [ "bitflags 2.8.0", "hashbrown 0.15.2", - "indexmap", + "indexmap 2.7.1", "semver 1.0.25", "serde", ] @@ -7955,7 +8079,7 @@ checksum = "65881a664fdd43646b647bb27bf186ab09c05bf56779d40aed4c6dce47d423f5" dependencies = [ "bitflags 2.8.0", "hashbrown 0.15.2", - "indexmap", + "indexmap 2.7.1", "semver 1.0.25", "serde", ] @@ -7988,7 +8112,7 @@ dependencies = [ "fxprof-processed-profile", "gimli 0.31.1", "hashbrown 0.14.5", - "indexmap", + "indexmap 2.7.1", "ittapi", "libc", "log", @@ -8115,7 +8239,7 @@ dependencies = [ "cranelift-bitset", "cranelift-entity", "gimli 0.31.1", - "indexmap", + "indexmap 2.7.1", "log", "object 0.36.7", "postcard", @@ -8220,7 +8344,7 @@ checksum = "8358319c2dd1e4db79e3c1c5d3a5af84956615343f9f89f4e4996a36816e06e6" dependencies = [ "anyhow", "heck", - "indexmap", + "indexmap 2.7.1", "wit-parser", ] @@ -8672,7 +8796,7 @@ checksum = "fbe1538eea6ea5ddbe5defd0dc82539ad7ba751e1631e9185d24a931f0a5adc8" dependencies = [ "anyhow", "id-arena", - "indexmap", + "indexmap 2.7.1", "log", "semver 1.0.25", "serde", @@ -8751,6 +8875,15 @@ dependencies = [ "time", ] +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "yansi" version = "1.0.1" @@ -8851,6 +8984,18 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "zip" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" +dependencies = [ + "byteorder", + "crc32fast", + "crossbeam-utils", + "flate2", +] + [[package]] name = "zstd" version = "0.13.2" diff --git a/crates/fluvio-benchmark/Cargo.toml b/crates/fluvio-benchmark/Cargo.toml index d2a9524995..957b2cad65 100644 --- a/crates/fluvio-benchmark/Cargo.toml +++ b/crates/fluvio-benchmark/Cargo.toml @@ -22,6 +22,7 @@ rand = { workspace = true } rand_xoshiro = { workspace = true } rayon = { workspace = true } tokio = { workspace = true, features = ['sync', 'macros'] } +madato = { workspace = true } serde = { workspace = true , features = ['derive'] } serde_yaml = { workspace = true } thiserror = { workspace = true } diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index 50e092d064..10760798e8 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -7,6 +7,7 @@ use bytesize::ByteSize; use fluvio_future::{future::timeout, task::spawn, timer::sleep}; use fluvio::{metadata::topic::TopicSpec, FluvioAdmin}; use futures_util::{stream::FuturesUnordered, StreamExt}; +use madato::yaml::mk_md_table_from_yaml; use tokio::sync::broadcast; use tracing::debug; @@ -115,9 +116,10 @@ impl ProducerBenchmark { sleep(std::time::Duration::from_secs(1)).await; let mut latency_yaml = String::new(); latency_yaml.push_str(&format!( - "{} avg latency, {} max latency", + "latencies: {} min, {} avg, {} max", + utils::nanos_to_ms_pritable(end.latencies_histogram.min()), utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64), - utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0)) + utils::nanos_to_ms_pritable(end.latencies_histogram.max()) )); for percentile in [0.5, 0.95, 0.99] { latency_yaml.push_str(&format!( @@ -138,7 +140,33 @@ impl ProducerBenchmark { human_readable_bytes, utils::pretty_duration(end.elapsed) ); + + println!("{}", Self::to_markdown_table(&end)); + } + } + + pub fn to_markdown_table(end: &EndProducerStat) -> String { + let mut md = String::new(); + md.push('\n'); + let mut latency_yaml = "- Variable: Latency\n".to_string(); + for percentile in [0.0, 0.5, 0.95, 0.99, 1.0] { + latency_yaml.push_str(&format!( + " p{percentile:4.2}: {}\n", + utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(percentile)), + )); } + md.push_str("**Per Record E2E Latency**\n\n"); + md.push_str(&mk_md_table_from_yaml(&latency_yaml, &None)); + md.push_str("\n\n**Throughput (Total Produced Bytes / Time)**\n\n"); + let mut throughput_yaml = String::new(); + throughput_yaml.push_str("- Variable: Produced Throughput\n"); + throughput_yaml.push_str(&format!( + " Speed: \"{}/sec\"\n", + ByteSize(end.bytes_per_sec) + )); + md.push_str(&mk_md_table_from_yaml(&throughput_yaml, &None)); + md.push('\n'); + md } } diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs index e8187b096c..fac7a67870 100644 --- a/crates/fluvio/src/producer/accumulator.rs +++ b/crates/fluvio/src/producer/accumulator.rs @@ -100,6 +100,8 @@ impl RecordAccumulator { record: Record, partition_id: PartitionId, ) -> Result { + let created_at = Instant::now(); + let batches_lock = self.batches.read().await; let (batch_events, batches_lock) = batches_lock .get(&partition_id) @@ -126,7 +128,7 @@ impl RecordAccumulator { // Create and push a new batch if needed let push_record = self - .create_and_new_batch(batch_events, &mut batches, record, 1) + .create_and_new_batch(batch_events, &mut batches, record, 1, created_at) .await?; return Ok(PushRecord::new( @@ -143,7 +145,7 @@ impl RecordAccumulator { // Create and push a new batch if needed let push_record = self - .create_and_new_batch(batch_events, &mut batches, record, 1) + .create_and_new_batch(batch_events, &mut batches, record, 1, created_at) .await?; Ok(PushRecord::new( @@ -177,6 +179,7 @@ impl RecordAccumulator { batches: &mut VecDeque, record: Record, attempts: usize, + created_at: Instant, ) -> Result { if attempts > 2 { // This should never happen, but if it does, we should stop the recursion @@ -185,8 +188,12 @@ impl RecordAccumulator { )); } - let mut batch = - ProducerBatch::new(self.max_request_size, self.batch_size, self.compression); + let mut batch = ProducerBatch::new( + self.max_request_size, + self.batch_size, + self.compression, + created_at, + ); match batch.push_record(record) { Ok(ProduceBatchStatus::Added(push_record)) => { @@ -206,8 +213,14 @@ impl RecordAccumulator { batches.push_back(batch); // Box the future to avoid infinite size due to recursion - Box::pin(self.create_and_new_batch(batch_events, batches, record, attempts + 1)) - .await + Box::pin(self.create_and_new_batch( + batch_events, + batches, + record, + attempts + 1, + created_at, + )) + .await } Err(err) => Err(err), } @@ -254,9 +267,14 @@ pub(crate) struct ProducerBatch { batch: MemoryBatch, } impl ProducerBatch { - fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self { + fn new( + write_limit: usize, + batch_limit: usize, + compression: Compression, + created_at: Instant, + ) -> Self { let (sender, receiver) = async_channel::bounded(1); - let batch_metadata = Arc::new(BatchMetadata::new(receiver)); + let batch_metadata = Arc::new(BatchMetadata::new(receiver, Some(created_at))); let batch = MemoryBatch::new(write_limit, batch_limit, compression); Self { @@ -415,6 +433,7 @@ mod test { + Batch::::default().write_size(0) + Vec::::default().write_size(0), Compression::None, + Instant::now(), ); assert!(matches!( @@ -450,6 +469,7 @@ mod test { + Batch::::default().write_size(0) + Vec::::default().write_size(0), Compression::None, + Instant::now(), ); assert!(matches!( @@ -487,6 +507,7 @@ mod test { + Batch::::default().write_size(0) + Vec::::default().write_size(0), Compression::None, + Instant::now(), ); assert!(matches!( diff --git a/crates/fluvio/src/producer/record.rs b/crates/fluvio/src/producer/record.rs index 82f7780d7a..b3b9cd03b6 100644 --- a/crates/fluvio/src/producer/record.rs +++ b/crates/fluvio/src/producer/record.rs @@ -50,10 +50,17 @@ pub(crate) struct BatchMetadata { } impl BatchMetadata { - pub(crate) fn new(receiver: Receiver) -> Self { + pub(crate) fn new( + receiver: Receiver, + created_at: Option, + ) -> Self { Self { state: RwLock::new(BatchMetadataState::Buffered(receiver)), - created_at: Instant::now(), + created_at: if let Some(created_at) = created_at { + created_at + } else { + Instant::now() + }, } }