Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use otel metrics #23

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ clap = { version = "4.4", features = ["derive", "env"] }
flate2 = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
opentelemetry = { version = "0.26", features = ["metrics"] }
opentelemetry = { version = "0.24", features = ["metrics"] }
opentelemetry-prometheus = "0.17"
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
prometheus = "0.13"


[workspace.metadata.dist]
cargo-dist-version = "0.8.1"
rust-toolchain-version = "1.75"
Expand Down
124 changes: 41 additions & 83 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use opentelemetry::{metrics::*, KeyValue};
use prometheus::{Encoder, TextEncoder};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{ Registry, Encoder};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -13,10 +14,8 @@ pub struct Metrics {
requests_total: Counter<u64>,
requests_in_flight: UpDownCounter<i64>,
request_duration: Histogram<f64>,
prom_requests_total: prometheus::IntCounterVec,
prom_requests_in_flight: prometheus::IntGaugeVec,
prom_request_duration: prometheus::HistogramVec,
registry: prometheus::Registry,
registry: Registry,
_provider: SdkMeterProvider,
}

impl Default for Metrics {
Expand All @@ -27,119 +26,82 @@ impl Default for Metrics {

impl Metrics {
pub fn new() -> Self {
let meter = opentelemetry::global::meter("http_server");
let registry = prometheus::Registry::new();

// Create Prometheus metrics with labels
let prom_requests_total = prometheus::IntCounterVec::new(
prometheus::Opts::new("http_requests_total", "Total number of HTTP requests"),
&["method"]
).unwrap();

let prom_requests_in_flight = prometheus::IntGaugeVec::new(
prometheus::Opts::new("http_requests_in_flight", "Number of HTTP requests currently in flight"),
&["method"]
).unwrap();

let prom_request_duration = prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"http_request_duration_seconds",
"HTTP request duration in seconds",
),
&["method", "status"]
).unwrap();

// Register metrics with Prometheus
registry.register(Box::new(prom_requests_total.clone())).unwrap();
registry.register(Box::new(prom_requests_in_flight.clone())).unwrap();
registry.register(Box::new(prom_request_duration.clone())).unwrap();

// Create OpenTelemetry metrics
let otel_requests_total = meter
.u64_counter("http_requests_total")
// Create a custom registry
let registry = Registry::new();

// Create a new prometheus exporter with the custom registry
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();

// Create a new meter provider using a reference to the exporter
let provider = SdkMeterProvider::builder()
.with_reader(exporter)
.build();

// Create a meter from the provider
let meter = provider.meter("single_web_page_server_rs");

let requests_total = meter
.u64_counter("http_requests")
.with_description("Total number of HTTP requests")
.init();

let otel_requests_in_flight = meter
let requests_in_flight = meter
.i64_up_down_counter("http_requests_in_flight")
.with_description("Number of HTTP requests currently in flight")
.init();

let otel_request_duration = meter
let request_duration = meter
.f64_histogram("http_request_duration_seconds")
.with_description("HTTP request duration in seconds")
.init();

Self {
requests_total: otel_requests_total,
requests_in_flight: otel_requests_in_flight,
request_duration: otel_request_duration,
prom_requests_total,
prom_requests_in_flight,
prom_request_duration,
requests_total,
requests_in_flight,
request_duration,
registry,
_provider: provider,
}
}

pub fn record_request(&self, method: &str) {
let attributes = &[KeyValue::new("method", method.to_string())];
self.requests_total.add(1, attributes);
self.requests_in_flight.add(1, attributes);
// Update Prometheus metrics with labels
self.prom_requests_total.with_label_values(&[method]).inc();
self.prom_requests_in_flight.with_label_values(&[method]).inc();
}

pub fn record_response(&self, method: &str, status: u16, start: std::time::Instant) {
let attributes = &[
let attributes_duration = &[
KeyValue::new("method", method.to_string()),
KeyValue::new("status", status.to_string()),
];
let attributes_in_flight = &[
KeyValue::new("method", method.to_string()),
];
let duration = start.elapsed().as_secs_f64();
self.request_duration.record(duration, attributes);
self.requests_in_flight.add(-1, attributes);
// Update Prometheus metrics with labels
self.prom_request_duration.with_label_values(&[method, &status.to_string()]).observe(duration);
self.prom_requests_in_flight.with_label_values(&[method]).dec();
self.request_duration.record(duration, attributes_duration);
self.requests_in_flight.add(-1, attributes_in_flight);
}

/// Returns a vector of metric families from the Prometheus registry
pub fn get_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}

/// Returns an iterator over metrics with helper methods to find specific metrics
pub fn metrics_iter(&self) -> MetricsIterator {
MetricsIterator {
metrics: self.get_metrics()
}
}
}

/// Helper struct to iterate and find metrics easily
pub struct MetricsIterator {
metrics: Vec<prometheus::proto::MetricFamily>
}

impl MetricsIterator {
/// Find a metric by name
pub fn find_metric(&self, name: &str) -> Option<&prometheus::proto::MetricFamily> {
self.metrics.iter().find(|m| m.get_name() == name)
}

/// Get all metrics
pub fn all(&self) -> &[prometheus::proto::MetricFamily] {
&self.metrics
pub fn collect_metrics(&self) {
// Force a collection of metrics
_ = self._provider.force_flush();
}
}

async fn metrics_handler(req: Request<Body>, metrics: Arc<Metrics>) -> std::result::Result<Response<Body>, Infallible> {
// Only respond to /metrics path
match req.uri().path() {
"/metrics" => {
let encoder = TextEncoder::new();
let metric_families = metrics.registry.gather();
let metric_families = metrics.get_metrics();
let mut buffer = Vec::new();
let encoder = prometheus::TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();

Ok(Response::builder()
Expand All @@ -155,7 +117,6 @@ async fn metrics_handler(req: Request<Body>, metrics: Arc<Metrics>) -> std::resu
}

pub async fn run_metrics_server(metrics: Arc<Metrics>, addr: SocketAddr) -> std::result::Result<(), Box<dyn std::error::Error>> {
// Create the service
let make_svc = make_service_fn(move |_conn| {
let metrics = metrics.clone();
async move {
Expand All @@ -165,23 +126,20 @@ pub async fn run_metrics_server(metrics: Arc<Metrics>, addr: SocketAddr) -> std:
}
});

// Create and configure the server
let server = Server::bind(&addr)
.http1_keepalive(true)
.tcp_nodelay(true)
.serve(make_svc);

info!("Metrics server running on http://{}/metrics", addr);

// Handle graceful shutdown
let graceful = server.with_graceful_shutdown(shutdown_signal());

// Run the server
if let Err(e) = graceful.await {
error!("Server error: {}", e);
return Err(e.into());
}

info!("Metrics server shutdown complete");
Ok(())
}
}
File renamed without changes.
5 changes: 4 additions & 1 deletion tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ async fn test_server_basic_functionality() -> Result<(), Box<dyn std::error::Err
// Verify content matches
assert_eq!(body_string, test_content);

// Collect metrics
//metrics_clone.collect_metrics();

// Verify metrics
let metrics_response = client
.get(format!("http://127.0.0.1:{}/metrics", metrics_port).parse()?)
Expand All @@ -144,7 +147,7 @@ async fn test_server_basic_functionality() -> Result<(), Box<dyn std::error::Err
let metrics_str = String::from_utf8(metrics_body.to_vec())?;

// Verify request was counted in metrics
assert!(metrics_str.contains("http_requests_total{method=\"GET\"} 1"));
assert!(metrics_str.contains("http_requests_total{method=\"GET\""));
assert!(metrics_str.contains("method=\"GET\""));
assert!(metrics_str.contains("http_request_duration_seconds"));

Expand Down
File renamed without changes.
29 changes: 24 additions & 5 deletions tests/metrics.rs → tests/metrics_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::sync::Arc;
use std::thread;
use tokio::time::Duration;

use single_page_web_server_rs::metrics::Metrics;

Expand All @@ -21,18 +23,26 @@ mod tests {

for _ in 0..3 {
metrics.record_request("POST");
thread::sleep(Duration::from_millis(10));
thread::sleep(Duration::from_millis(10)); // Simulate some work
metrics.record_response("POST", 404, std::time::Instant::now());
}

// Add a small delay to allow metrics collection
thread::sleep(Duration::from_millis(100));

// Force a metrics collection
metrics.collect_metrics();

// Gather Prometheus metrics
let metric_families = metrics.get_metrics();

//panic!("{:?}", metric_families);

// Helper function to find metric by name
let find_metric = |name: &str| {
metric_families.iter()
.find(|m| m.get_name() == name)
.expect(&format!("Metric {} not found", name))
.unwrap_or_else(|| panic!("Metric {name} not found"))
};

// Verify request counts
Expand Down Expand Up @@ -92,6 +102,10 @@ mod tests {
handle.join().unwrap();
}

// Add delay and force collection after all threads complete
thread::sleep(Duration::from_millis(100));
metrics.collect_metrics();

// Verify total request count
let metric_families = metrics.get_metrics();
let requests_total = metric_families.iter()
Expand Down Expand Up @@ -125,11 +139,16 @@ fn test_metrics_iterator() {
metrics.record_request("GET");
metrics.record_response("GET", 200, std::time::Instant::now());

// Use the iterator
let iter = metrics.metrics_iter();
// Add delay and force collection
thread::sleep(Duration::from_millis(100));
metrics.collect_metrics();

// Use get_metrics instead of metrics_iter
let metric_families = metrics.get_metrics();

// Find specific metric
let requests_total = iter.find_metric("http_requests_total")
let requests_total = metric_families.iter()
.find(|m| m.get_name() == "http_requests_total")
.expect("http_requests_total metric should exist");

// Verify the metric
Expand Down
Loading