Skip to content

Commit

Permalink
feat(telemeter): emit storage metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 9, 2024
1 parent 2893a42 commit f1c28bf
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- proxy: add `Proxy::try_send_to()` and `Proxy::request_to()`.
- telemeter: support gzip.
- telemeter/openmetrics: expose units of registered metrics.
- telemeter: the `elfo_metrics_storage_shards` gauge metric.

### Changed
- telemeter: a new sharded-by-threads storage, it increases perf and dramatically reduces contention.
Expand Down
6 changes: 4 additions & 2 deletions elfo-telemeter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod hyper;
mod metrics;
mod recorder;
mod render;
mod stats;
mod storage;

#[cfg(feature = "unstable")]
Expand All @@ -43,8 +44,9 @@ pub fn init() -> Blueprint {
let recorder = Recorder::new(storage.clone());
let blueprint = actor::new(storage);

if let Err(err) = ::metrics::set_boxed_recorder(Box::new(recorder)) {
error!(error = %err, "failed to set a metric recorder");
match ::metrics::set_boxed_recorder(Box::new(recorder)) {
Ok(_) => stats::register(),
Err(err) => error!(error = %err, "failed to set a metric recorder"),
}

blueprint
Expand Down
3 changes: 2 additions & 1 deletion elfo-telemeter/src/metrics/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ impl MetricKind for Counter {
self.0 += value;
}

fn merge(self, out: &mut Self::Output) {
fn merge(self, out: &mut Self::Output) -> usize {
*out += self.0;
0
}
}
5 changes: 4 additions & 1 deletion elfo-telemeter/src/metrics/gauge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl MetricKind for Gauge {
// are still available for updates, the same as the shared state (origin).
//
// However, all shards are merged consecutively.
fn merge(self, (out_value, out_epoch): &mut Self::Output) {
fn merge(self, (out_value, out_epoch): &mut Self::Output) -> usize {
let (last_absolute, current_epoch) = self.origin.get();

// The epoch is always monotonically increasing.
Expand All @@ -80,6 +80,9 @@ impl MetricKind for Gauge {
if current_epoch == self.epoch {
*out_value += self.delta;
}

// It's inaccurate because the same origin can be accounted multiple times.
std::mem::size_of::<GaugeOrigin>()
}
}

Expand Down
10 changes: 8 additions & 2 deletions elfo-telemeter/src/metrics/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ impl MetricKind for Histogram {
self.0.push(value);
}

fn merge(self, out: &mut Self::Output) {
for segment in self.0.into_segments() {
fn merge(self, out: &mut Self::Output) -> usize {
let segments = self.0.into_segments();
let mut additional_size = segments.capacity() * mem::size_of::<Vec<f64>>();

for segment in segments {
out.add(&segment);
additional_size += segment.capacity() * mem::size_of::<f64>();
}

additional_size
}
}

Expand Down
4 changes: 3 additions & 1 deletion elfo-telemeter/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ pub(crate) trait MetricKind: Sized {

fn new(shared: Self::Shared) -> Self;
fn update(&mut self, value: Self::Value);
fn merge(self, out: &mut Self::Output);

/// Returns additional size to add to metrics.
fn merge(self, out: &mut Self::Output) -> usize;
}
96 changes: 96 additions & 0 deletions elfo-telemeter/src/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::mem;

use fxhash::FxHashMap;
use metrics::{gauge, register_gauge, Unit};

// Registers the gauges that describe the telemeter's own storage:
// `elfo_metrics_usage_bytes` (unit: bytes) and
// `elfo_metrics_storage_shards` (unit: count).
//
// Values for these gauges are published by `StorageStats::emit()`.
pub(crate) fn register() {
// Estimated memory footprint of stored metrics.
register_gauge!(
"elfo_metrics_usage_bytes",
Unit::Bytes,
"Total size occupied by metrics"
);
// Number of storage shards, reported per `status` label by `emit()`.
register_gauge!(
"elfo_metrics_storage_shards",
Unit::Count,
"The number of storage shards"
);
}

// The total size estimator.
//
// Accumulates per-shard statistics (see `add_shard`) and the size of the
// descriptions table (see `add_descriptions`), then publishes them as gauges
// via `emit`.
pub(crate) struct StorageStats {
// Number of shards passed to `add_shard`.
shards_total: u32,
// Number of those shards whose `ShardStats` reported at least one metric.
shards_active: u32,
// Running size estimate in bytes; starts at `size_of::<T>()` for the type
// given to `new::<T>()`.
total_size: usize,
}

impl StorageStats {
    /// Creates an estimator with no shards accounted yet.
    ///
    /// The size estimate starts from the inline size of `T`.
    pub(crate) fn new<T>() -> Self {
        let total_size = mem::size_of::<T>();

        Self {
            shards_total: 0,
            shards_active: 0,
            total_size,
        }
    }

    /// Accounts one shard: bumps the shard counters and adds the shard's
    /// estimated size to the total.
    pub(crate) fn add_shard(&mut self, stats: &ShardStats) {
        self.shards_total += 1;

        // A shard counts as active only if it had at least one metric.
        if stats.has_metrics {
            self.shards_active += 1;
        }

        self.total_size += stats.size;
    }

    /// Adds the estimated size of the descriptions table, based on the
    /// capacity of the map.
    pub(crate) fn add_descriptions<K, V>(&mut self, registry: &FxHashMap<K, V>) {
        let table_size = estimate_hashbrown_size::<(K, V)>(registry.capacity());
        self.total_size += table_size;
    }

    /// Publishes the collected statistics as gauge values.
    pub(crate) fn emit(&self) {
        let active = self.shards_active;
        let inactive = self.shards_total - active;
        let usage_bytes = self.total_size as f64;

        gauge!("elfo_metrics_usage_bytes", usage_bytes, "object" => "Storage");
        gauge!("elfo_metrics_storage_shards", active as f64, "status" => "Active");
        gauge!("elfo_metrics_storage_shards", inactive as f64, "status" => "Inactive");
    }
}

// Statistics accumulated for a single shard before being folded into
// `StorageStats` via `StorageStats::add_shard`.
pub(crate) struct ShardStats {
// Set once any registry passed to `add_registry` is non-empty.
has_metrics: bool,
// Running size estimate in bytes; starts at `size_of::<T>()` for the type
// given to `new::<T>()`.
size: usize,
}

impl ShardStats {
    /// Creates statistics for a shard that has no metrics accounted yet.
    ///
    /// The size estimate starts from the inline size of `T`.
    pub(crate) fn new<T>() -> Self {
        let size = mem::size_of::<T>();

        Self {
            has_metrics: false,
            size,
        }
    }

    /// Accounts one registry: marks the shard as having metrics if the
    /// registry is non-empty and adds the estimated size of its table.
    pub(crate) fn add_registry<K, V>(&mut self, registry: &FxHashMap<K, V>) {
        // The flag is sticky: once set, it is never cleared.
        if !registry.is_empty() {
            self.has_metrics = true;
        }

        let table_size = estimate_hashbrown_size::<(K, V)>(registry.capacity());
        self.size += table_size;
    }

    /// Adds extra bytes reported by individual metrics during merging.
    pub(crate) fn add_additional_size(&mut self, size: usize) {
        self.size += size;
    }
}

// Estimates the heap size (in bytes) of a hashbrown-based table that stores
// elements of type `T` and reports the given `capacity`.
//
// The result is a lower bound: one slot of `size_of::<T>()` plus one control
// byte per bucket, ignoring constant alignment/group padding.
//
// Adapted from the `datasize` crate, with one deviation: a table with zero
// capacity is reported as 0 bytes, because such a map has not allocated
// anything yet.
fn estimate_hashbrown_size<T>(capacity: usize) -> usize {
    // An unallocated (zero-capacity) table owns no heap memory. Without this
    // check every empty registry would be counted as a 4-bucket allocation.
    if capacity == 0 {
        return 0;
    }

    // https://github.com/rust-lang/hashbrown/blob/v0.12.3/src/raw/mod.rs#L185
    let buckets = if capacity < 4 {
        4
    } else if capacity < 8 {
        8
    } else {
        (capacity * 8 / 7).next_power_of_two()
    };

    // https://github.com/rust-lang/hashbrown/blob/v0.12.3/src/raw/mod.rs#L242
    let size = mem::size_of::<T>();
    // `Group` is u32, u64, or __m128i depending on the CPU architecture.
    // Return a lower bound, ignoring its constant contributions
    // (through ctrl_align and Group::WIDTH, at most 31 bytes).
    let ctrl_offset = size * buckets;
    // Add one byte of "control" metadata per bucket
    ctrl_offset + buckets
}
40 changes: 29 additions & 11 deletions elfo-telemeter/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use elfo_core::{coop, scope::Scope, ActorMeta, Addr};
use crate::{
metrics::{Counter, Gauge, GaugeOrigin, Histogram, MetricKind},
protocol::{Description, Metrics, Snapshot},
stats::{ShardStats, StorageStats},
};

// === Scopes ===
Expand Down Expand Up @@ -237,13 +238,27 @@ impl Storage {
}

pub(crate) async fn merge(&self, snapshot: &mut Snapshot, only_compact: bool) {
let mut storage_stats = StorageStats::new::<Self>();

if !only_compact {
storage_stats.add_descriptions(&*self.descriptions.lock());
}

for shard in self.shards.iter() {
self.merge_registries::<GlobalScope>(shard, snapshot, only_compact)
let mut stats = ShardStats::new::<Shard>();

self.merge_registries::<GlobalScope>(shard, snapshot, only_compact, &mut stats)
.await;
self.merge_registries::<GroupScope>(shard, snapshot, only_compact)
self.merge_registries::<GroupScope>(shard, snapshot, only_compact, &mut stats)
.await;
self.merge_registries::<ActorScope>(shard, snapshot, only_compact)
self.merge_registries::<ActorScope>(shard, snapshot, only_compact, &mut stats)
.await;

storage_stats.add_shard(&stats);
}

if !only_compact {
storage_stats.emit();
}
}

Expand All @@ -252,22 +267,25 @@ impl Storage {
shard: &Shard,
snapshot: &mut Snapshot,
only_compact: bool,
stats: &mut ShardStats,
) {
let registries = S::registries(shard);

if !only_compact {
self.merge_registry::<S, Counter>(registries, snapshot)
self.merge_registry::<S, Counter>(registries, snapshot, stats)
.await;
self.merge_registry::<S, Gauge>(registries, snapshot, stats)
.await;
self.merge_registry::<S, Gauge>(registries, snapshot).await;
}
self.merge_registry::<S, Histogram>(registries, snapshot)
self.merge_registry::<S, Histogram>(registries, snapshot, stats)
.await;
}

async fn merge_registry<S: ScopeKind, M: Storable>(
&self,
registries: &Registries<S>,
snapshot: &mut Snapshot,
stats: &mut ShardStats,
) {
let registry = M::registry(registries);

Expand All @@ -281,22 +299,22 @@ impl Storage {
mem::replace(&mut *registry, empty)
};

// TODO: stats
// elfo_metrics_usage_bytes{object="Storage|Snapshot"}
// elfo_metrics_storage_shards{status="Inactive|Active"} GAUGE
// elfo_metrics{kind="Counter|Gauge|Histogram"} GAUGE
stats.add_registry(&registry);

for (_, entry) in registry.into_iter() {
let metrics = S::snapshot(snapshot, &entry.meta);
let out = M::snapshot(metrics, &entry.key);
entry.data.merge(out);
let additional_size = entry.data.merge(out);
stats.add_additional_size(additional_size);
}

// The merge process can be quite long, so we should be preemptive.
coop::consume_budget().await;
}
}

// === Storable ===

pub(crate) trait Storable: MetricKind {
fn registry<S: ScopeKind>(registries: &Registries<S>) -> &Mutex<Registry<S, Self>>;
fn shared<S: ScopeKind>(storage: &Storage, key: S::Key) -> Self::Shared;
Expand Down

0 comments on commit f1c28bf

Please sign in to comment.