Skip to content

Commit

Permalink
feat(telemeter/openmetrics): expose UNITs of registered metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 9, 2024
1 parent 00d246b commit 2893a42
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- coop: prefer a time-based budgeting if the telemetry is enabled.
- proxy: add `Proxy::try_send_to()` and `Proxy::request_to()`.
- telemeter: support gzip.
- telemeter/openmetrics: expose units of registered metrics.

### Changed
- telemeter: a new sharded-by-threads storage, it increases perf and dramatically reduces contention.
Expand Down
7 changes: 6 additions & 1 deletion elfo-telemeter/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::sync::Arc;

use fxhash::FxHashMap;
use metrics::Key;
use metrics::{Key, Unit};
use sketches_ddsketch::{Config as DDSketchConfig, DDSketch};
use tracing::warn;

Expand All @@ -26,6 +26,11 @@ pub(crate) struct GetSnapshot;

pub(crate) type GaugeEpoch = u64;

pub(crate) struct Description {
pub(crate) details: Option<&'static str>,
pub(crate) unit: Option<Unit>,
}

/// Actual values of all metrics.
#[derive(Default, Clone)]
pub struct Snapshot {
Expand Down
9 changes: 4 additions & 5 deletions elfo-telemeter/src/render.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use fxhash::FxHashMap;
use metrics::Label;

use self::openmetrics::OpenMetricsRenderer;
use crate::{
config::{Config, Quantile},
protocol::Snapshot,
protocol::{Description, Snapshot},
};

use self::openmetrics::OpenMetricsRenderer;

mod openmetrics;

#[derive(Default)]
Expand All @@ -19,7 +18,7 @@ pub(crate) struct Renderer {

struct RenderOptions<'a> {
quantiles: &'a [(Quantile, Label)],
descriptions: &'a FxHashMap<String, &'static str>,
descriptions: &'a FxHashMap<String, Description>,
global_labels: &'a [Label],
}

Expand All @@ -45,7 +44,7 @@ impl Renderer {
pub(crate) fn render(
&mut self,
snapshot: &Snapshot,
descriptions: &FxHashMap<String, &'static str>,
descriptions: &FxHashMap<String, Description>,
) -> String {
let options = RenderOptions {
quantiles: &self.quantiles,
Expand Down
42 changes: 27 additions & 15 deletions elfo-telemeter/src/render/openmetrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use fxhash::FxHashSet;
use metrics::{Key, Label};

use super::RenderOptions;
use crate::protocol::{Distribution, Metrics, Snapshot};
use crate::protocol::{Description, Distribution, Metrics, Snapshot};

#[derive(Default)]
pub(super) struct OpenMetricsRenderer {
Expand Down Expand Up @@ -47,12 +47,12 @@ fn render(
for ((kind, original_name), by_labels) in group_by_name(snapshot) {
let name = &*sanitize_name(original_name);

write_type_line(buffer, name, kind);

if let Some(desc) = options.descriptions.get(original_name) {
write_help_line(buffer, name, desc);
}

write_type_line(buffer, name, kind);

for (meta, value) in by_labels {
let actor_group_label = meta
.actor_group
Expand Down Expand Up @@ -114,6 +114,8 @@ fn render(

buffer.push('\n');
}

buffer.push_str("# EOF\n");
}

type GroupedData<'a> = BTreeMap<(MetricKind, &'a str), BTreeMap<MetricMeta<'a>, MetricValue<'a>>>;
Expand Down Expand Up @@ -145,8 +147,8 @@ fn group_by_name(snapshot: &Snapshot) -> GroupedData<'_> {
);
}

for (group, per_group) in &snapshot.groupwise {
for (key, value, kind) in iter_metrics(per_group) {
for (group, groupwise) in &snapshot.groupwise {
for (key, value, kind) in iter_metrics(groupwise) {
data.entry((kind, key.name())).or_default().insert(
MetricMeta {
actor_group: Some(group),
Expand All @@ -158,8 +160,8 @@ fn group_by_name(snapshot: &Snapshot) -> GroupedData<'_> {
}
}

for (actor_meta, per_actor) in &snapshot.actorwise {
for (key, value, kind) in iter_metrics(per_actor) {
for (actor_meta, actorwise) in &snapshot.actorwise {
for (key, value, kind) in iter_metrics(actorwise) {
data.entry((kind, key.name())).or_default().insert(
MetricMeta {
actor_group: Some(&actor_meta.group),
Expand Down Expand Up @@ -191,14 +193,6 @@ fn iter_metrics(metrics: &Metrics) -> impl Iterator<Item = (&Key, MetricValue<'_
c.chain(g).chain(d)
}

fn write_help_line(buffer: &mut String, name: &str, desc: &str) {
buffer.push_str("# HELP ");
buffer.push_str(name);
buffer.push(' ');
buffer.push_str(desc);
buffer.push('\n');
}

fn write_type_line(buffer: &mut String, name: &str, kind: MetricKind) {
buffer.push_str("# TYPE ");
buffer.push_str(name);
Expand All @@ -211,6 +205,24 @@ fn write_type_line(buffer: &mut String, name: &str, kind: MetricKind) {
buffer.push('\n');
}

fn write_help_line(buffer: &mut String, name: &str, desc: &Description) {
if let Some(unit) = &desc.unit {
buffer.push_str("# UNIT ");
buffer.push_str(name);
buffer.push(' ');
buffer.push_str(unit.as_str());
buffer.push('\n');
}

if let Some(details) = &desc.details {
buffer.push_str("# HELP ");
buffer.push_str(name);
buffer.push(' ');
buffer.push_str(details); // TODO: escape
buffer.push('\n');
}
}

fn write_metric_line<'a, V: Display>(
buffer: &mut String,
name: &'a str,
Expand Down
24 changes: 9 additions & 15 deletions elfo-telemeter/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use elfo_core::{coop, scope::Scope, ActorMeta, Addr};

use crate::{
metrics::{Counter, Gauge, GaugeOrigin, Histogram, MetricKind},
protocol::{Metrics, Snapshot},
protocol::{Description, Metrics, Snapshot},
};

// === Scopes ===
Expand Down Expand Up @@ -137,7 +137,7 @@ pub(crate) struct Storage {
shards: ThreadLocal<Shard>,
// Shared gauge origins between shards. See `Gauge` for more details.
gauge_shared: GaugeShared,
descriptions: Mutex<FxHashMap<String, &'static str>>,
descriptions: Mutex<FxHashMap<String, Description>>,
}

#[derive(Default)]
Expand Down Expand Up @@ -205,23 +205,17 @@ impl Default for Storage {
}

impl Storage {
pub(crate) fn descriptions(&self) -> MutexGuard<'_, FxHashMap<String, &'static str>> {
pub(crate) fn descriptions(&self) -> MutexGuard<'_, FxHashMap<String, Description>> {
self.descriptions.lock()
}

// TODO: use `unit`
pub(crate) fn describe(
&self,
key: &Key,
_unit: Option<Unit>,
description: Option<&'static str>,
) {
if let Some(description) = description {
let mut descriptions = self.descriptions.lock();
if !descriptions.contains_key(key.name().to_string().as_str()) {
descriptions.insert(key.name().to_string(), description);
}
pub(crate) fn describe(&self, key: &Key, unit: Option<Unit>, details: Option<&'static str>) {
if unit.is_none() && details.is_none() {
return;
}

let mut descriptions = self.descriptions.lock();
descriptions.insert(key.name().to_string(), Description { details, unit });
}

pub(crate) fn upsert<S, M>(&self, scope: &S::Scope, key: &Key, value: M::Value)
Expand Down
6 changes: 5 additions & 1 deletion examples/examples/usage/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod reporter {
prelude::*,
time::Interval,
};
use metrics::increment_counter;
use metrics::{increment_counter, register_counter, Unit};
use serde::{Deserialize, Serialize};

use crate::protocol::*;
Expand All @@ -203,6 +203,10 @@ mod reporter {
struct SummarizeTick;

pub fn new() -> Blueprint {
// Optionally, register metrics to provide additional information.
// It's not required, unregistered metrics are published anyway.
register_counter!("ticks_total", Unit::Count, "Total number of ticks");

ActorGroup::new().config::<Config>().exec(reporter)
}

Expand Down

0 comments on commit 2893a42

Please sign in to comment.