From 19d2b8a1ecead389a15726cf74a6dfb3816f3619 Mon Sep 17 00:00:00 2001
From: Abrar Quazi
Date: Tue, 25 Feb 2025 15:13:28 -0800
Subject: [PATCH] Added endpoint to see caching metrics from grafana/prometheus

---
 rust/rsc/Cargo.lock              | 23 ++++++++++++
 rust/rsc/Cargo.toml              |  2 +
 rust/rsc/src/bin/rsc/main.rs     | 27 +++++++++++++
 rust/rsc/src/bin/rsc/read_job.rs | 76 +++++++++++++++++++++++++++++++++++-
 4 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/rust/rsc/Cargo.lock b/rust/rsc/Cargo.lock
index e132ba1be..1f417de08 100644
--- a/rust/rsc/Cargo.lock
+++ b/rust/rsc/Cargo.lock
@@ -2174,6 +2174,27 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "prometheus"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "memchr",
+ "parking_lot",
+ "protobuf",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
 [[package]]
 name = "ptr_meta"
 version = "0.1.4"
@@ -2417,9 +2438,11 @@ dependencies = [
  "inquire",
  "is-terminal",
  "itertools",
+ "lazy_static",
  "migration",
  "mime",
  "papergrid",
+ "prometheus",
  "rand",
  "rlimit",
  "sea-orm",
diff --git a/rust/rsc/Cargo.toml b/rust/rsc/Cargo.toml
index bffb8ba31..6498b2bc2 100644
--- a/rust/rsc/Cargo.toml
+++ b/rust/rsc/Cargo.toml
@@ -35,3 +35,5 @@ mime = "0.3.17"
 rlimit = "0.10.1"
 tracing-appender = "0.2.3"
 sysinfo = "0.31.2"
+prometheus = "0.13.4"
+lazy_static = "1.5.0"
diff --git a/rust/rsc/src/bin/rsc/main.rs b/rust/rsc/src/bin/rsc/main.rs
index a16cc173f..ffa40e065 100644
--- a/rust/rsc/src/bin/rsc/main.rs
+++ b/rust/rsc/src/bin/rsc/main.rs
@@ -2,11 +2,13 @@ use axum::{
     extract::{DefaultBodyLimit, Multipart},
     routing::{get, post},
     Router,
+    response::IntoResponse,
 };
 use chrono::Utc;
 use clap::Parser;
 use itertools::Itertools;
 use migration::{Migrator, MigratorTrait};
+use prometheus::{Encoder, TextEncoder};
 use rlimit::Resource;
 use rsc::database;
 use sea_orm::{prelude::Uuid, ConnectOptions, Database, DatabaseConnection};
@@ -106,6 +108,30 @@ async fn check_version(check: axum::extract::Query) -> axum::http:
     return axum::http::StatusCode::OK;
 }
 
+/// Serves the metrics in the default Prometheus registry in the text exposition format.
+///
+/// Responds with `500 Internal Server Error` when encoding fails, so a scraper sees the
+/// failure instead of an empty `200 OK` body.
+async fn metrics_handler() -> impl IntoResponse {
+    let encoder = TextEncoder::new();
+    let metric_families = prometheus::gather();
+    let mut buffer = Vec::new();
+    if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
+        tracing::error! {%err, "Failed to encode metrics"};
+        return axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response();
+    }
+
+    // Send the encoder's own content type so scrapers can identify the format version.
+    (
+        [(
+            axum::http::header::CONTENT_TYPE,
+            encoder.format_type().to_string(),
+        )],
+        buffer,
+    )
+        .into_response()
+}
+
 fn create_router(
     conn: Arc,
     config: Arc,
@@ -169,6 +195,7 @@ fn create_router(
                 move || dashboard::stats(conn)
             }),
         )
+        .route("/metrics", get(metrics_handler))
         .route(
             "/job/matching",
             post({
diff --git a/rust/rsc/src/bin/rsc/read_job.rs b/rust/rsc/src/bin/rsc/read_job.rs
index ffd42e914..e59b92eca 100644
--- a/rust/rsc/src/bin/rsc/read_job.rs
+++ b/rust/rsc/src/bin/rsc/read_job.rs
@@ -17,6 +17,67 @@ use sea_orm::{
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
 use tracing;
+use std::time::Instant;
+use lazy_static::lazy_static;
+use prometheus::{register_counter, register_histogram, Counter, Histogram};
+
+/// Upper bucket bounds, in milliseconds, shared by the hit and miss latency histograms.
+///
+/// The `prometheus` crate's default buckets top out at 10 and are sized for values
+/// measured in seconds, so millisecond observations need explicit bounds; otherwise
+/// every sample above 10 ms would land in the `+Inf` bucket.
+fn latency_buckets_ms() -> Vec<f64> {
+    vec![
+        1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0,
+    ]
+}
+
+lazy_static! {
+    /// Counts how many cache hits we've had.
+    pub static ref CACHE_HITS: Counter = register_counter!(
+        "cache_hits",
+        "Number of cache hits"
+    ).expect("failed to register the cache_hits counter");
+
+    /// Counts how many cache misses we've had.
+    pub static ref CACHE_MISSES: Counter = register_counter!(
+        "cache_misses",
+        "Number of cache misses"
+    ).expect("failed to register the cache_misses counter");
+
+    /// Tracks latencies (in milliseconds) of read_job requests resolved as cache hits.
+    pub static ref HIT_LATENCY_MS: Histogram = register_histogram!(
+        "hit_latency_ms",
+        "Histogram of cache hit latencies in milliseconds",
+        latency_buckets_ms()
+    ).expect("failed to register the hit_latency_ms histogram");
+
+    /// Tracks latencies (in milliseconds) of read_job requests resolved as cache misses.
+    pub static ref MISS_LATENCY_MS: Histogram = register_histogram!(
+        "miss_latency_ms",
+        "Histogram of cache miss latencies in milliseconds",
+        latency_buckets_ms()
+    ).expect("failed to register the miss_latency_ms histogram");
+}
+
+/// Milliseconds elapsed since `start_time`, keeping sub-millisecond precision.
+fn elapsed_ms(start_time: Instant) -> f64 {
+    start_time.elapsed().as_secs_f64() * 1000.0
+}
+
+/// Records one cache hit and the time taken since `start_time`.
+#[tracing::instrument(skip_all)]
+fn update_hit_counters(start_time: Instant) {
+    CACHE_HITS.inc();
+    HIT_LATENCY_MS.observe(elapsed_ms(start_time));
+}
+
+/// Records one cache miss and the time taken since `start_time`.
+#[tracing::instrument(skip_all)]
+fn update_miss_counters(start_time: Instant) {
+    CACHE_MISSES.inc();
+    MISS_LATENCY_MS.observe(elapsed_ms(start_time));
+}
 
 #[tracing::instrument(skip(hash, conn))]
 async fn record_hit(job_id: Uuid, hash: String, conn: Arc) {
@@ -92,6 +153,8 @@ pub async fn read_job(
     conn: Arc,
     blob_stores: HashMap>,
 ) -> (StatusCode, Json) {
+    let start = Instant::now();
+
     let hash = payload.hash();
     let hash_for_spawns = hash.clone();
 
@@ -123,6 +186,9 @@ pub async fn read_job(
         tokio::spawn(async move {
             record_miss(hash_copy, conn.clone()).await;
         });
+
+        update_miss_counters(start);
+
         return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
     };
 
@@ -136,6 +202,9 @@ pub async fn read_job(
         Ok(map) => map,
         Err(err) => {
             tracing::error!(%err, "Failed to resolve blobs. Resolving job as a cache miss.");
+
+            update_miss_counters(start);
+
             return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
     };
@@ -161,6 +230,9 @@ pub async fn read_job(
         Ok(files) => files,
         Err(err) => {
             tracing::error!(%err, "Failed to resolve all output files. Resolving job as a cache miss.");
+
+            update_miss_counters(start);
+
             return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
     };
@@ -188,6 +260,7 @@ pub async fn read_job(
         Some(blob) => blob.clone(),
         None => {
             tracing::error!("Failed to resolve stdout blob. Resolving job as a cache miss.");
+            update_miss_counters(start);
             return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
     };
@@ -196,6 +269,7 @@ pub async fn read_job(
         Some(blob) => blob.clone(),
         None => {
             tracing::error!("Failed to resolve stderr blob. Resolving job as a cache miss.");
+            update_miss_counters(start);
             return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
     };
@@ -221,7 +295,7 @@ pub async fn read_job(
     tokio::spawn(async move {
         record_hit(job_id, hash_copy, conn.clone()).await;
     });
-
+    update_hit_counters(start);
 
     (StatusCode::OK, Json(response))
 }