Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added endpoint to see caching metrics from grafana/prometheus #1678

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions rust/rsc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/rsc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ mime = "0.3.17"
rlimit = "0.10.1"
tracing-appender = "0.2.3"
sysinfo = "0.31.2"
prometheus = "0.13.4"
lazy_static = "1.5.0"
17 changes: 17 additions & 0 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use axum::{
extract::{DefaultBodyLimit, Multipart},
routing::{get, post},
Router,
response::IntoResponse,
};
use chrono::Utc;
use clap::Parser;
use itertools::Itertools;
use migration::{Migrator, MigratorTrait};
use prometheus::{Encoder, TextEncoder};
use rlimit::Resource;
use rsc::database;
use sea_orm::{prelude::Uuid, ConnectOptions, Database, DatabaseConnection};
Expand Down Expand Up @@ -106,6 +108,20 @@ async fn check_version(check: axum::extract::Query<VersionCheck>) -> axum::http:
return axum::http::StatusCode::OK;
}

/// Serves all registered Prometheus metrics in the text exposition format,
/// for scraping by a Prometheus server (and display in Grafana dashboards).
async fn metrics_handler() -> impl IntoResponse {
    let encoder = TextEncoder::new();
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
        tracing::error! {%err, "Failed to encode metrics"};
        // Surface the failure to the scraper instead of silently returning
        // an empty (or partially written) body with a 200 status.
        return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, String::new());
    }

    // The text encoder emits UTF-8; fall back to an empty body rather than
    // panicking if that invariant is ever violated.
    (
        axum::http::StatusCode::OK,
        String::from_utf8(buffer).unwrap_or_default(),
    )
}

fn create_router(
conn: Arc<DatabaseConnection>,
config: Arc<config::RSCConfig>,
Expand Down Expand Up @@ -169,6 +185,7 @@ fn create_router(
move || dashboard::stats(conn)
}),
)
.route("/metrics", get(metrics_handler))
.route(
"/job/matching",
post({
Expand Down
58 changes: 57 additions & 1 deletion rust/rsc/src/bin/rsc/read_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,49 @@ use sea_orm::{
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing;
use std::time::Instant;
use lazy_static::lazy_static;
use prometheus::{register_counter, register_histogram, Counter, Histogram};

lazy_static! {
    // Prometheus scrapes these via the /metrics endpoint. Useful PromQL
    // queries for Grafana dashboards built on them:
    //
    //   // average hits per second over a window
    //   rate(cache_hits[5m])
    //   // (or total hits over the window: increase(cache_hits[5m]))
    //
    //   // average misses per second over a window
    //   rate(cache_misses[5m])
    //
    //   // hit ratio over a window
    //   sum(increase(cache_hits[5m])) / sum(increase(cache_hits[5m]) + increase(cache_misses[5m]))
    //
    //   // rolling average latency
    //   rate(hit_latency_ms_sum[5m]) / rate(hit_latency_ms_count[5m])
    //   rate(miss_latency_ms_sum[5m]) / rate(miss_latency_ms_count[5m])

    /// Counts how many cache hits we've had.
    pub static ref CACHE_HITS: Counter = register_counter!(
        "cache_hits",
        "Number of cache hits"
    ).unwrap();

    /// Counts how many cache misses we've had.
    pub static ref CACHE_MISSES: Counter = register_counter!(
        "cache_misses",
        "Number of cache misses"
    ).unwrap();

    /// Tracks latencies (in milliseconds) of read_job requests that hit the cache.
    pub static ref HIT_LATENCY_MS: Histogram = register_histogram!(
        "hit_latency_ms",
        "Histogram of cache hit latencies in milliseconds"
    ).unwrap();

    /// Tracks latencies (in milliseconds) of read_job requests that miss the cache.
    pub static ref MISS_LATENCY_MS: Histogram = register_histogram!(
        "miss_latency_ms",
        "Histogram of cache miss latencies in milliseconds"
    ).unwrap();
}


/// Record a cache hit in the Prometheus metrics: bump the hit counter and
/// observe the request latency (milliseconds elapsed since `start_time`).
#[tracing::instrument(skip_all)]
fn update_hit_counters(start_time: Instant) {
    CACHE_HITS.inc();
    HIT_LATENCY_MS.observe(start_time.elapsed().as_millis() as f64);
}

/// Record a cache miss in the Prometheus metrics: bump the miss counter and
/// observe the request latency (milliseconds elapsed since `start_time`).
#[tracing::instrument(skip_all)]
fn update_miss_counters(start_time: Instant) {
    CACHE_MISSES.inc();
    MISS_LATENCY_MS.observe(start_time.elapsed().as_millis() as f64);
}

#[tracing::instrument(skip(hash, conn))]
async fn record_hit(job_id: Uuid, hash: String, conn: Arc<DatabaseConnection>) {
Expand Down Expand Up @@ -92,6 +135,8 @@ pub async fn read_job(
conn: Arc<DatabaseConnection>,
blob_stores: HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
) -> (StatusCode, Json<ReadJobResponse>) {
let start = Instant::now();

let hash = payload.hash();
let hash_for_spawns = hash.clone();

Expand Down Expand Up @@ -123,6 +168,9 @@ pub async fn read_job(
tokio::spawn(async move {
record_miss(hash_copy, conn.clone()).await;
});

update_miss_counters(start);

return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
};

Expand All @@ -136,6 +184,9 @@ pub async fn read_job(
Ok(map) => map,
Err(err) => {
tracing::error!(%err, "Failed to resolve blobs. Resolving job as a cache miss.");

update_miss_counters(start);

return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
};
Expand All @@ -161,6 +212,9 @@ pub async fn read_job(
Ok(files) => files,
Err(err) => {
tracing::error!(%err, "Failed to resolve all output files. Resolving job as a cache miss.");

update_miss_counters(start);

return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
};
Expand Down Expand Up @@ -188,6 +242,7 @@ pub async fn read_job(
Some(blob) => blob.clone(),
None => {
tracing::error!("Failed to resolve stdout blob. Resolving job as a cache miss.");
update_miss_counters(start);
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
};
Expand All @@ -196,6 +251,7 @@ pub async fn read_job(
Some(blob) => blob.clone(),
None => {
tracing::error!("Failed to resolve stderr blob. Resolving job as a cache miss.");
update_miss_counters(start);
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
};
Expand All @@ -221,7 +277,7 @@ pub async fn read_job(
tokio::spawn(async move {
record_hit(job_id, hash_copy, conn.clone()).await;
});

update_hit_counters(start);
(StatusCode::OK, Json(response))
}

Expand Down
Loading