diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index 0b0a836d8..570614ee9 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -89,7 +89,7 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L loop { thread::sleep(SEND_INTERVAL); - let (total_counters, total_distributions) = aggregate_all(&aggregators); + let (total_counters, total_distributions, total_sets) = aggregate_all(&aggregators); // send all the aggregated "counter like" metrics for (AggregationKey { ty, name, tags }, value) in total_counters { @@ -143,6 +143,33 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L suffix.clear(); } + + for (AggregationKey { ty, name, tags }, value) in total_sets { + suffix.push_str(&formatted_global_tags); + if let Some(tags) = tags { + if formatted_global_tags.is_empty() { + suffix.push_str("|#"); + } else { + suffix.push(','); + } + suffix.push_str(&tags); + } + + for value in value { + formatted_metric.push_str(&prefix); + formatted_metric.push_str(name); + + let _ = write!(&mut formatted_metric, ":{value}"); + + formatted_metric.push_str(ty); + formatted_metric.push_str(&suffix); + + let _ = sink.emit(&formatted_metric); + formatted_metric.clear(); + } + + suffix.clear(); + } } }; @@ -154,16 +181,20 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L local_aggregators } -fn aggregate_all(aggregators: &LocalAggregators) -> (AggregatedCounters, AggregatedDistributions) { +fn aggregate_all( + aggregators: &LocalAggregators, +) -> (AggregatedCounters, AggregatedDistributions, AggregatedSets) { let mut total_counters = AggregatedCounters::default(); let mut total_distributions = AggregatedDistributions::default(); + let mut total_sets = AggregatedSets::default(); for local_aggregator in aggregators.iter() { - let (local_counters, local_distributions) = { + let (local_counters, local_distributions, local_sets) = { let mut local_aggregator = local_aggregator.lock().unwrap(); ( std::mem::take(&mut local_aggregator.aggregated_counters), std::mem::take(&mut local_aggregator.aggregated_distributions), + std::mem::take(&mut local_aggregator.aggregated_sets), ) }; @@ -194,9 +225,19 @@ fn aggregate_all(aggregators: &LocalAggregators) -> (AggregatedCounters, Aggrega aggregated_value.extend(value); } } + + // aggregate all the "set like" metrics + if total_sets.is_empty() { + total_sets = local_sets; + } else { + for (key, value) in local_sets { + let aggregated_value = total_sets.entry(key).or_default(); + aggregated_value.extend(value); + } + } } - (total_counters, total_distributions) + (total_counters, total_distributions, total_sets) } /// The key by which we group/aggregate metrics.