Skip to content

Commit

Permalink
fix(metrics): Actually emit set metrics (#1606)
Browse files Browse the repository at this point in the history
#1600 added aggregation for set metrics
but neglected to actually emit them.
  • Loading branch information
loewenheim authored Jan 30, 2025
1 parent 9886b44 commit 1fd7ec6
Showing 1 changed file with 45 additions and 4 deletions.
49 changes: 45 additions & 4 deletions crates/symbolicator-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
loop {
thread::sleep(SEND_INTERVAL);

let (total_counters, total_distributions) = aggregate_all(&aggregators);
let (total_counters, total_distributions, total_sets) = aggregate_all(&aggregators);

// send all the aggregated "counter like" metrics
for (AggregationKey { ty, name, tags }, value) in total_counters {
Expand Down Expand Up @@ -143,6 +143,33 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L

suffix.clear();
}

for (AggregationKey { ty, name, tags }, value) in total_sets {
suffix.push_str(&formatted_global_tags);
if let Some(tags) = tags {
if formatted_global_tags.is_empty() {
suffix.push_str("|#");
} else {
suffix.push(',');
}
suffix.push_str(&tags);
}

for value in value {
formatted_metric.push_str(&prefix);
formatted_metric.push_str(name);

let _ = write!(&mut formatted_metric, ":{value}");

formatted_metric.push_str(ty);
formatted_metric.push_str(&suffix);

let _ = sink.emit(&formatted_metric);
formatted_metric.clear();
}

suffix.clear();
}
}
};

Expand All @@ -154,16 +181,20 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
local_aggregators
}

fn aggregate_all(aggregators: &LocalAggregators) -> (AggregatedCounters, AggregatedDistributions) {
fn aggregate_all(
aggregators: &LocalAggregators,
) -> (AggregatedCounters, AggregatedDistributions, AggregatedSets) {
let mut total_counters = AggregatedCounters::default();
let mut total_distributions = AggregatedDistributions::default();
let mut total_sets = AggregatedSets::default();

for local_aggregator in aggregators.iter() {
let (local_counters, local_distributions) = {
let (local_counters, local_distributions, local_sets) = {
let mut local_aggregator = local_aggregator.lock().unwrap();
(
std::mem::take(&mut local_aggregator.aggregated_counters),
std::mem::take(&mut local_aggregator.aggregated_distributions),
std::mem::take(&mut local_aggregator.aggregated_sets),
)
};

Expand Down Expand Up @@ -194,9 +225,19 @@ fn aggregate_all(aggregators: &LocalAggregators) -> (AggregatedCounters, Aggrega
aggregated_value.extend(value);
}
}

// aggregate all the "set like" metrics
if total_sets.is_empty() {
total_sets = local_sets;
} else {
for (key, value) in local_sets {
let aggregated_value = total_sets.entry(key).or_default();
aggregated_value.extend(value);
}
}
}

(total_counters, total_distributions)
(total_counters, total_distributions, total_sets)
}

/// The key by which we group/aggregate metrics.
Expand Down

0 comments on commit 1fd7ec6

Please sign in to comment.