Skip to content

Commit

Permalink
Prometheus pull metrics exporter (closes #275)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Aug 8, 2024
1 parent 0330743 commit bba371c
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 16 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ opentelemetry = { version = "0.24" }
opentelemetry_sdk = { version = "0.24" }
opentelemetry-otlp = { version = "0.17", features = ["http-proto", "reqwest-client"] }
opentelemetry-semantic-conventions = { version = "0.16.0" }
prometheus = { version = "0.13.4", default-features = false }
imagesize = "0.13"
sha1 = "0.10"
sha2 = "0.10.6"
Expand Down
26 changes: 22 additions & 4 deletions crates/common/src/config/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,15 @@ pub struct Tracers {

#[derive(Debug, Clone, Default)]
pub struct Metrics {
pub prometheus: bool,
pub prometheus: Option<PrometheusMetrics>,
pub otel: Option<Arc<OtelMetrics>>,
}

#[derive(Debug, Clone, Default)]
pub struct PrometheusMetrics {
pub auth: Option<String>,
}

impl Telemetry {
pub fn parse(config: &mut Config) -> Self {
let mut telemetry = Telemetry {
Expand Down Expand Up @@ -553,12 +558,25 @@ impl Tracers {
impl Metrics {
pub fn parse(config: &mut Config) -> Self {
let mut metrics = Metrics {
prometheus: config
.property_or_default("metrics.prometheus.enable", "true")
.unwrap_or(true),
prometheus: None,
otel: None,
};

if config
.property_or_default("metrics.prometheus.enable", "false")
.unwrap_or(false)
{
metrics.prometheus = Some(PrometheusMetrics {
auth: config
.value("metrics.prometheus.auth.username")
.and_then(|user| {
config
.value("metrics.prometheus.auth.secret")
.map(|secret| STANDARD.encode(format!("{user}:{secret}")))
}),
});
}

if config
.property_or_default("metrics.open-telemetry.enable", "false")
.unwrap_or(false)
Expand Down
1 change: 1 addition & 0 deletions crates/common/src/telemetry/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
*/

pub mod otel;
pub mod prometheus;
124 changes: 124 additions & 0 deletions crates/common/src/telemetry/metrics/prometheus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <[email protected]>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use prometheus::{
proto::{Bucket, Counter, Gauge, Histogram, Metric, MetricFamily, MetricType},
TextEncoder,
};
use trc::{atomic::AtomicHistogram, collector::Collector};

use crate::Core;

impl Core {
pub async fn export_prometheus_metrics(&self) -> trc::Result<String> {
let mut metrics = Vec::new();

#[cfg(feature = "enterprise")]
let is_enterprise = self.is_enterprise_edition();

#[cfg(not(feature = "enterprise"))]
let is_enterprise = false;

// Add counters
for counter in Collector::collect_counters(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(counter.id()));
metric.set_help(counter.description().into());
metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.get())]);
metrics.push(metric);
}

// Add event counters
for counter in Collector::collect_event_counters(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(counter.id()));
metric.set_help(counter.description().into());
metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.value())]);
metrics.push(metric);
}

// Add gauges
for gauge in Collector::collect_gauges(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(gauge.id()));
metric.set_help(gauge.description().into());
metric.set_field_type(MetricType::GAUGE);
metric.set_metric(vec![new_gauge(gauge.get())]);
metrics.push(metric);
}

// Add histograms
for histogram in Collector::collect_histograms(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(histogram.id()));
metric.set_help(histogram.description().into());
metric.set_field_type(MetricType::HISTOGRAM);
metric.set_metric(vec![new_histogram(histogram)]);
metrics.push(metric);
}

TextEncoder::new()
.encode_to_string(&metrics)
.map_err(|e| trc::EventType::Telemetry(trc::TelemetryEvent::OtelExpoterError).reason(e))
}
}

fn metric_name(id: impl AsRef<str>) -> String {
let id = id.as_ref();
let mut name = String::with_capacity(id.len());
for c in id.chars() {
if c.is_ascii_alphanumeric() {
name.push(c);
} else {
name.push('_');
}
}
name
}

fn new_counter(value: u64) -> Metric {
let mut m = Metric::default();
let mut counter = Counter::default();
counter.set_value(value as f64);
m.set_counter(counter);
m
}

fn new_gauge(value: u64) -> Metric {
let mut m = Metric::default();
let mut gauge = Gauge::default();
gauge.set_value(value as f64);
m.set_gauge(gauge);
m
}

fn new_histogram(histogram: &AtomicHistogram<12>) -> Metric {
let mut m = Metric::default();
let mut h = Histogram::default();
h.set_sample_count(histogram.count());
h.set_sample_sum(histogram.sum() as f64);
h.set_bucket(
histogram
.buckets_iter()
.into_iter()
.zip(histogram.upper_bounds_iter())
.map(|(count, upper_bound)| {
let mut b = Bucket::default();
b.set_cumulative_count(count);
b.set_upper_bound(if upper_bound != u64::MAX {
upper_bound as f64
} else {
f64::INFINITY
});
b
})
.collect(),
);
m.set_histogram(h);
m
}
29 changes: 28 additions & 1 deletion crates/jmap/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use jmap_proto::{
};

use crate::{
auth::oauth::OAuthMetadata,
auth::{authenticate::HttpHeaders, oauth::OAuthMetadata},
blob::{DownloadResponse, UploadResponse},
services::state,
JmapInstance, JMAP,
Expand Down Expand Up @@ -322,6 +322,33 @@ impl JMAP {
}
_ => (),
},
"metrics" => match path.next().unwrap_or_default() {
"prometheus" => {
if let Some(prometheus) = &self.core.metrics.prometheus {
if let Some(auth) = &prometheus.auth {
if req
.authorization_basic()
.map_or(true, |secret| secret != auth)
{
return Err(trc::AuthEvent::Failed
.into_err()
.details("Invalid or missing credentials.")
.caused_by(trc::location!()));
}
}

return Ok(Resource {
content_type: "text/plain; version=0.0.4",
contents: self.core.export_prometheus_metrics().await?.into_bytes(),
}
.into_http_response());
}
}
"otel" => {
// Reserved for future use
}
_ => (),
},
_ => {
let path = req.uri().path();
let resource = self
Expand Down
41 changes: 30 additions & 11 deletions crates/jmap/src/auth/authenticate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,8 @@ impl JMAP {
req: &hyper::Request<hyper::body::Incoming>,
session: &HttpSessionData,
) -> trc::Result<(InFlight, Arc<AccessToken>)> {
if let Some((mechanism, token)) = req
.headers()
.get(header::AUTHORIZATION)
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split_once(' ').map(|(l, t)| (l, t.trim().to_string())))
{
let access_token = if let Some(account_id) = self.inner.sessions.get_with_ttl(&token) {
if let Some((mechanism, token)) = req.authorization() {
let access_token = if let Some(account_id) = self.inner.sessions.get_with_ttl(token) {
self.get_cached_access_token(account_id).await?
} else {
let access_token = if mechanism.eq_ignore_ascii_case("basic") {
Expand All @@ -56,15 +51,15 @@ impl JMAP {
return Err(trc::AuthEvent::Error
.into_err()
.details("Failed to decode Basic auth request.")
.id(token)
.id(token.to_string())
.caused_by(trc::location!()));
}
} else if mechanism.eq_ignore_ascii_case("bearer") {
// Enforce anonymous rate limit for bearer auth requests
self.is_anonymous_allowed(&session.remote_ip).await?;

let (account_id, _, _) =
self.validate_access_token("access_token", &token).await?;
self.validate_access_token("access_token", token).await?;

self.get_access_token(account_id).await?
} else {
Expand All @@ -73,13 +68,13 @@ impl JMAP {
return Err(trc::AuthEvent::Error
.into_err()
.reason("Unsupported authentication mechanism.")
.details(token)
.details(token.to_string())
.caused_by(trc::location!()));
};

// Cache session
let access_token = Arc::new(access_token);
self.cache_session(token, &access_token);
self.cache_session(token.to_string(), &access_token);
self.cache_access_token(access_token.clone());
access_token
};
Expand Down Expand Up @@ -186,3 +181,27 @@ impl JMAP {
}
}
}

pub trait HttpHeaders {
fn authorization(&self) -> Option<(&str, &str)>;
fn authorization_basic(&self) -> Option<&str>;
}

impl HttpHeaders for hyper::Request<hyper::body::Incoming> {
fn authorization(&self) -> Option<(&str, &str)> {
self.headers()
.get(header::AUTHORIZATION)
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split_once(' ').map(|(l, t)| (l, t.trim())))
}

fn authorization_basic(&self) -> Option<&str> {
self.authorization().and_then(|(l, t)| {
if l.eq_ignore_ascii_case("basic") {
Some(t)
} else {
None
}
})
}
}
4 changes: 4 additions & 0 deletions crates/trc/src/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ impl<const N: usize> AtomicHistogram<N> {
vec
}

pub fn buckets_len(&self) -> usize {
N
}

pub fn upper_bounds_iter(&self) -> impl IntoIterator<Item = u64> + '_ {
self.upper_bounds.iter().copied()
}
Expand Down
1 change: 1 addition & 0 deletions crates/trc/src/imple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,7 @@ impl TelemetryEvent {
TelemetryEvent::JournalError => "Journal collector error",
TelemetryEvent::OtelExpoterError => "OpenTelemetry exporter error",
TelemetryEvent::OtelMetricsExporterError => "OpenTelemetry metrics exporter error",
TelemetryEvent::PrometheusExporterError => "Prometheus exporter error",
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/trc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ pub enum TelemetryEvent {
WebhookError,
OtelExpoterError,
OtelMetricsExporterError,
PrometheusExporterError,
JournalError,
}

Expand Down

0 comments on commit bba371c

Please sign in to comment.