From 81052d1c56b26710107f6f4d95ba1c06b5426670 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 3 Oct 2024 18:34:40 -0400 Subject: [PATCH] dekaf: Expose more useful api histogram buckets --- Cargo.lock | 110 ++++++++++++++++++++++------- Cargo.toml | 2 +- crates/dekaf/Cargo.toml | 2 +- crates/dekaf/src/main.rs | 4 +- crates/dekaf/src/metrics_server.rs | 26 ++++--- 5 files changed, 103 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa5709896f..4f43ff7060 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1836,7 +1836,8 @@ dependencies = [ "lz4_flex", "md5", "metrics", - "metrics-prometheus", + "metrics-exporter-prometheus", + "models", "ops", "percent-encoding", "postgrest", @@ -1847,7 +1848,7 @@ dependencies = [ "regex", "rsasl", "rustls 0.23.10", - "rustls-native-certs", + "rustls-native-certs 0.7.2", "rustls-pemfile 2.1.3", "serde", "serde_json", @@ -2944,6 +2945,25 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "log", + "rustls 0.23.10", + "rustls-native-certs 0.8.0", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", +] + [[package]] name = "hyper-timeout" version = "0.5.1" @@ -3439,7 +3459,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -3655,19 +3675,24 @@ dependencies = [ ] [[package]] -name = "metrics-prometheus" -version = "0.7.0" +name = "metrics-exporter-prometheus" +version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51557a875fdbd5b953b698ecd6cd06efef47618e02d95ad912e2392e5b5617ff" +checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" dependencies = [ - "arc-swap", + "base64 0.22.1", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls 0.27.3", + "hyper-util", + "indexmap 2.3.0", + "ipnet", "metrics", "metrics-util", - "once_cell", - "prometheus", - "sealed", - "smallvec", + "quanta", "thiserror", + "tokio", + "tracing", ] [[package]] @@ -3681,6 +3706,8 @@ dependencies = [ "hashbrown 0.14.5", "metrics", "num_cpus", + "quanta", + "sketches-ddsketch", ] [[package]] @@ -4754,6 +4781,21 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quickcheck" version = "1.0.3" @@ -4916,6 +4958,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -5066,7 +5117,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-rustls", + "hyper-rustls 0.24.2", "ipnet", "js-sys", "log", @@ -5313,6 +5364,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -5494,18 +5558,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "sealed" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "syn 2.0.74", -] - [[package]] name = "security-framework" version = "2.11.1" @@ -5801,6 +5853,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.9" @@ -6459,7 +6517,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls-native-certs", + "rustls-native-certs 0.7.2", "rustls-pemfile 2.1.3", "socket2", "tokio", @@ -6671,7 +6729,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 61dcd1dfe7..f7aa5b2dc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,7 +86,7 @@ lz4_flex = "0.11.0" mime = "0.3" memchr = "2.5" metrics = "0.23.0" -metrics-prometheus = "0.7.0" +metrics-exporter-prometheus = "0.15.3" prometheus = "0.13.4" md5 = "0.7.0" num-bigint = "0.4" diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index c94317aa81..11073f23bc 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -41,7 +41,7 @@ kafka-protocol = { workspace = true } lz4_flex = { workspace = true } md5 = { workspace = true } metrics = { workspace = true } -metrics-prometheus = { workspace = true } +metrics-exporter-prometheus = { workspace = true } percent-encoding = { workspace = true } postgrest = { workspace = true } prometheus = { workspace = true } diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 2c40f7fcfd..df999d6bf6 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -106,8 +106,6 @@ async fn main() -> anyhow::Result<()> { .with_writer(std::io::stderr) .init(); - metrics_prometheus::install(); - let cli = Cli::parse(); tracing::info!("Starting dekaf"); @@ -159,7 +157,7 @@ async fn main() -> anyhow::Result<()> { .await .context("failed to bind server port")?; - let metrics_router = dekaf::metrics_server::build_router(app.clone()); + let metrics_router = dekaf::metrics_server::build_router(); let metrics_server_task = axum_server::bind(metrics_addr).serve(metrics_router.into_make_service()); tokio::spawn(async move { metrics_server_task.await.unwrap() }); diff --git a/crates/dekaf/src/metrics_server.rs b/crates/dekaf/src/metrics_server.rs index 607e44d17c..fffefa5354 100644 --- a/crates/dekaf/src/metrics_server.rs +++ b/crates/dekaf/src/metrics_server.rs @@ -1,13 +1,21 @@ -use super::App; -use std::sync::Arc; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; -pub fn build_router(app: Arc) -> axum::Router<()> { +pub fn build_router() -> axum::Router<()> { use axum::routing::get; + let prom = PrometheusBuilder::new() + .set_buckets( + &prometheus::exponential_buckets(0.00001, 2.5, 15) + .expect("calculating histogram buckets"), + ) + .expect("calculating histogram buckets") + .install_recorder() + .expect("failed to install prometheus recorder"); + let schema_router = axum::Router::new() .route("/metrics", get(prometheus_metrics)) .layer(tower_http::trace::TraceLayer::new_for_http()) - .with_state(app); + .with_state(prom); schema_router } @@ -23,12 +31,10 @@ fn record_jemalloc_stats() { } #[tracing::instrument(skip_all)] -async fn prometheus_metrics() -> (axum::http::StatusCode, String) { +async fn prometheus_metrics( + axum::extract::State(prom_handle): axum::extract::State, +) -> (axum::http::StatusCode, String) { record_jemalloc_stats(); - match prometheus::TextEncoder::new().encode_to_string(&prometheus::default_registry().gather()) - { - Err(e) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), - Ok(result) => (axum::http::StatusCode::OK, result), - } + (axum::http::StatusCode::OK, prom_handle.render()) }