diff --git a/Cargo.lock b/Cargo.lock index 31699aa65f..c82e0f6ae1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3068,6 +3068,7 @@ name = "metric_unify" version = "0.1.0" dependencies = [ "clap", + "itertools 0.13.0", "num-format", "serde", "serde_json", diff --git a/ci/scripts/metric_unify/Cargo.toml b/ci/scripts/metric_unify/Cargo.toml index c57d5b6e8f..8c93d39fe9 100644 --- a/ci/scripts/metric_unify/Cargo.toml +++ b/ci/scripts/metric_unify/Cargo.toml @@ -8,3 +8,4 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } clap = { version = "4.0", features = ["derive"] } num-format = "0.4" +itertools = { workspace = true } diff --git a/ci/scripts/metric_unify/src/aggregate.rs b/ci/scripts/metric_unify/src/aggregate.rs new file mode 100644 index 0000000000..28176d9a26 --- /dev/null +++ b/ci/scripts/metric_unify/src/aggregate.rs @@ -0,0 +1,72 @@ +use std::{collections::HashSet, path::Path}; + +use crate::metric::{AggregationEntry, AggregationFile, AggregationOperation, Metric}; + +/// Load aggregation metrics from a file +pub fn load_aggregation_metrics>( + aggregation_file_path: P, +) -> Result, Box> { + let file = std::fs::File::open(aggregation_file_path)?; + let aggregation_file: AggregationFile = serde_json::from_reader(file)?; + Ok(aggregation_file.aggregations) +} + +/// Generate aggregation tables +pub fn aggregate_metrics(agg_entries: Vec, metrics: Vec) -> Vec { + let mut results = Vec::new(); + for agg_entry in agg_entries { + let group_by = &agg_entry.group_by; + let name = &agg_entry.name; + + // 1. Filter metrics by group_by(primary_labels) and name(metric_name) + let filtered_metrics_by_primary_labels: Vec<_> = metrics + .iter() + .filter(|m| group_by.iter().all(|g| m.primary_labels.contains(g)) && name == &m.name) + .collect(); + if filtered_metrics_by_primary_labels.is_empty() { + continue; + } + + // 2. Group filtered_metrics by secondary_labels + let secondary_labels_set: HashSet> = filtered_metrics_by_primary_labels + .iter() + .map(|m| m.secondary_labels.clone()) + .collect(); + let grouped_metrics_by_secondary_labels = + secondary_labels_set.into_iter().map(|secondary_labels| { + filtered_metrics_by_primary_labels + .iter() + .filter(|m| m.secondary_labels == secondary_labels) + .collect::>() + }); + + // 3. Aggregate metrics by secondary_labels and operation + let aggregated_metrics: Vec = grouped_metrics_by_secondary_labels + .map(|grouped_metrics| { + let secondary_labels = grouped_metrics[0].secondary_labels.clone(); + let aggregated_value: f64 = + grouped_metrics + .into_iter() + .fold(0.0, |acc, m| match agg_entry.operation { + AggregationOperation::Sum => acc + m.value, + AggregationOperation::Unique => { + assert!(acc == 0.0 || acc == m.value); + m.value + } + }); + Metric { + name: name.to_string(), + primary_labels: group_by.clone(), + secondary_labels, + value: aggregated_value, + ..Default::default() + } + }) + .collect(); + + // 4. Generate table + results.extend(aggregated_metrics); + } + + results +} diff --git a/ci/scripts/metric_unify/src/diff.rs b/ci/scripts/metric_unify/src/diff.rs new file mode 100644 index 0000000000..7034f7018e --- /dev/null +++ b/ci/scripts/metric_unify/src/diff.rs @@ -0,0 +1,21 @@ +use crate::metric::Metric; + +pub fn diff_metrics(news: Vec, olds: Vec) -> Vec { + let mut results = Vec::with_capacity(news.len()); + for new in news { + let old = olds + .iter() + .find(|old| **old == new && old.value != new.value); + if let Some(old) = old { + results.push(Metric { + diff: Some(new.value - old.value), + diff_percentage: Some((new.value - old.value) / old.value * 100.0), + ..new + }); + } else { + results.push(new.clone()); + } + } + + results +} diff --git a/ci/scripts/metric_unify/src/main.rs b/ci/scripts/metric_unify/src/main.rs index 338c71c883..177fe59e30 100644 --- a/ci/scripts/metric_unify/src/main.rs +++ b/ci/scripts/metric_unify/src/main.rs @@ -1,362 +1,87 @@ -mod types; - -use std::collections::HashMap; - -use clap::Parser; -use serde::{Deserialize, Serialize}; - -use crate::types::{Labels, Metric, MetricDb, MetricsFile}; +mod aggregate; +mod diff; +mod markdown; +mod metric; +mod summary; +mod util; + +use aggregate::{aggregate_metrics, load_aggregation_metrics}; +use clap::{Parser, Subcommand}; +use diff::diff_metrics; +use markdown::Tables; +use summary::load_summary_metrics; +use util::to_tables; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - /// Path to the metrics JSON file - #[arg(value_name = "METRICS_JSON")] - metrics_json: String, - - /// Path to the aggregation JSON file - #[arg(long, value_name = "AGGREGATION_JSON")] - aggregation_json: Option, + #[command(subcommand)] + command: Commands, } -#[derive(Debug, Serialize, Deserialize)] -struct AggregationFile { - aggregations: Vec, +#[derive(Subcommand, Debug)] +enum Commands { + /// Generate summary tables + Summary { + /// Path to the metrics JSON file + #[arg(long, value_name = "METRICS_JSON", value_parser = clap::value_parser!(std::path::PathBuf))] + metrics_json: std::path::PathBuf, + }, + /// Generate aggregated tables + Aggregate { + /// Path to the metrics JSON file + #[arg(long, value_name = "METRICS_JSON", value_parser = clap::value_parser!(std::path::PathBuf))] + metrics_json: std::path::PathBuf, + + /// Path to the aggregation JSON file + #[arg(long, value_name = "AGGREGATION_JSON", value_parser = clap::value_parser!(std::path::PathBuf))] + aggregation_json: std::path::PathBuf, + }, + /// Compare metrics with previous metrics and show differences + Diff { + /// Path to the current metrics JSON file + #[arg(long, value_name = "METRICS_JSON", value_parser = clap::value_parser!(std::path::PathBuf))] + metrics_json: std::path::PathBuf, + + /// Path to the previous metrics JSON file + #[arg(long, value_name = "PREV_METRICS_JSON", value_parser = clap::value_parser!(std::path::PathBuf))] + prev_metrics_json: std::path::PathBuf, + }, } -#[derive(Debug, Clone, Serialize, Deserialize)] -struct Aggregation { - name: String, - group_by: Vec, - metrics: Vec, - operation: String, -} - -impl MetricDb { - fn new(metrics_file: &str) -> Result> { - let file = std::fs::File::open(metrics_file)?; - let metrics: MetricsFile = serde_json::from_reader(file)?; - - let mut db = MetricDb::default(); - - // Process counters - for entry in metrics.counter { - if entry.value == 0.0 { - continue; - } - let labels = Labels::from(entry.labels); - db.add_to_flat_dict(labels, entry.metric, entry.value); - } +fn main() -> Result<(), Box> { + let args = Args::parse(); - // Process gauges - for entry in metrics.gauge { - let labels = Labels::from(entry.labels); - db.add_to_flat_dict(labels, entry.metric, entry.value); + match args.command { + Commands::Summary { metrics_json } => { + let summary = load_summary_metrics(&metrics_json)?; + let tables = to_tables(summary); + println!("\n
\nDetailed Metrics\n\n"); + println!("{}", Tables::from(tables)); + println!("
\n\n"); } - - db.separate_by_label_types(); - - Ok(db) - } - - fn add_to_flat_dict(&mut self, labels: Labels, metric: String, value: f64) { - self.flat_dict - .entry(labels) - .or_default() - .push(Metric::new(metric, value)); - } - - // Custom sorting function that ensures 'group' comes first. - // Other keys are sorted alphabetically. - fn custom_sort_label_keys(label_keys: &mut [String]) { - // Prioritize 'group' by giving it the lowest possible sort value - label_keys.sort_by_key(|key| { - if key == "group" { - (0, key.clone()) // Lowest priority for 'group' - } else { - (1, key.clone()) // Normal priority for other keys - } - }); - } - - fn separate_by_label_types(&mut self) { - self.dict_by_label_types.clear(); - - for (labels, metrics) in &self.flat_dict { - // Get sorted label keys - let mut label_keys: Vec = labels.0.iter().map(|(key, _)| key.clone()).collect(); - Self::custom_sort_label_keys(&mut label_keys); - - // Create label_values based on sorted keys - let label_dict: HashMap = labels.0.iter().cloned().collect(); - - let label_values: Vec = label_keys - .iter() - .map(|key| label_dict.get(key).unwrap().clone()) - .collect(); - - // Add to dict_by_label_types - self.dict_by_label_types - .entry(label_keys) - .or_default() - .entry(label_values) - .or_default() - .extend(metrics.clone()); + Commands::Aggregate { + metrics_json, + aggregation_json, + } => { + let summary = load_summary_metrics(&metrics_json)?; + let aggregations = load_aggregation_metrics(&aggregation_json)?; + let aggregated_metrics = aggregate_metrics(aggregations, summary); + let tables = to_tables(aggregated_metrics); + println!("{}", Tables::from(tables)); } - } - - fn generate_markdown_tables(&self) -> String { - let mut markdown_output = String::new(); - // Get sorted keys to iterate in consistent order - let mut sorted_keys: Vec<_> = self.dict_by_label_types.keys().cloned().collect(); - sorted_keys.sort(); - - for label_keys in sorted_keys { - let metrics_dict = &self.dict_by_label_types[&label_keys]; - let mut metric_names: Vec = metrics_dict - .values() - .flat_map(|metrics| metrics.iter().map(|m| m.name.clone())) - .collect::>() - .into_iter() - .collect(); - metric_names.sort_by(|a, b| b.cmp(a)); - - // Create table header - let header = format!( - "| {} | {} |", - label_keys.join(" | "), - metric_names.join(" | ") - ); - - let separator = "| ".to_string() - + &vec!["---"; label_keys.len() + metric_names.len()].join(" | ") - + " |"; - - markdown_output.push_str(&header); - markdown_output.push('\n'); - markdown_output.push_str(&separator); - markdown_output.push('\n'); - - // Fill table rows - for (label_values, metrics) in metrics_dict { - let mut row = String::new(); - row.push_str("| "); - row.push_str(&label_values.join(" | ")); - row.push_str(" | "); - - // Add metric values - for metric_name in &metric_names { - let metric_value = metrics - .iter() - .find(|m| &m.name == metric_name) - .map(|m| Self::format_number(m.value)) - .unwrap_or_default(); - - row.push_str(&format!("{} | ", metric_value)); - } - - markdown_output.push_str(&row); - markdown_output.push('\n'); - } - - markdown_output.push('\n'); + Commands::Diff { + metrics_json, + prev_metrics_json, + } => { + let summary = load_summary_metrics(&metrics_json)?; + let prev_summary = load_summary_metrics(&prev_metrics_json)?; + let diff_matrics = diff_metrics(summary, prev_summary); + let tables = to_tables(diff_matrics); + println!("\n
\nDetailed Metrics\n\n"); + println!("{}", Tables::from(tables)); + println!("
\n\n"); } - - markdown_output } - - fn generate_aggregation_tables(&self, aggregations: &[Aggregation]) -> String { - let mut markdown_output = String::new(); - let group_tuple = vec!["group".to_string()]; - - // Get metrics grouped by "group" label - if let Some(metrics_dict) = self.dict_by_label_types.get(&group_tuple) { - let mut group_to_metrics: HashMap> = HashMap::new(); - - // Collect metrics for each group - for (group_values, metrics) in metrics_dict { - let group_name = &group_values[0]; - let agg_metrics: Vec = metrics - .iter() - .filter(|metric| aggregations.iter().any(|a| a.name == metric.name)) - .cloned() - .collect(); - - if !agg_metrics.is_empty() { - group_to_metrics - .entry(group_name.clone()) - .or_default() - .extend(agg_metrics); - } - } - - if !group_to_metrics.is_empty() { - // Get all unique metric names - let mut metric_names: Vec = group_to_metrics - .values() - .flat_map(|metrics| metrics.iter().map(|m| m.name.clone())) - .collect::>() - .into_iter() - .collect(); - metric_names.sort(); - - // Create table header - let header = format!("| group | {} |", metric_names.join(" | ")); - let separator = - format!("| --- | {} |", vec!["---"; metric_names.len()].join(" | ")); - markdown_output.push_str(&header); - markdown_output.push('\n'); - markdown_output.push_str(&separator); - markdown_output.push('\n'); - - // Fill table rows - for (group_name, metrics) in group_to_metrics { - let mut row = format!("| {} |", group_name); - - for metric_name in &metric_names { - let metric_str = metrics - .iter() - .find(|m| &m.name == metric_name) - .map(|m| format!(" {} |", Self::format_number(m.value))) - .unwrap_or_default(); - - row.push_str(&metric_str); - } - - markdown_output.push_str(&row); - markdown_output.push('\n'); - } - markdown_output.push('\n'); - } - } - - markdown_output - } - - fn read_aggregations( - aggregation_file: &str, - ) -> Result, Box> { - let file = std::fs::File::open(aggregation_file)?; - let aggregation_data: AggregationFile = serde_json::from_reader(file)?; - Ok(aggregation_data.aggregations) - } - - fn apply_aggregations(&mut self, aggregations: &[Aggregation]) { - for aggregation in aggregations { - let mut group_by_dict: HashMap, f64> = HashMap::new(); - - if aggregation.operation == "sum" || aggregation.operation == "unique" { - for (tuple_keys, metrics_dict) in &self.dict_by_label_types { - // Skip if not all group_by keys are present in tuple_keys - if !aggregation - .group_by - .iter() - .all(|key| tuple_keys.contains(key)) - { - continue; - } - - for (tuple_values, metrics) in metrics_dict { - // Create a mapping from label keys to values - let label_dict: HashMap<_, _> = - tuple_keys.iter().zip(tuple_values.iter()).collect(); - - // Extract values for group_by keys - let group_by_values: Vec = aggregation - .group_by - .iter() - .map(|key| label_dict[key].clone()) - .collect(); - - // Process metrics - for metric in metrics { - if aggregation.metrics.contains(&metric.name) { - match aggregation.operation.as_str() { - "sum" => { - *group_by_dict - .entry(group_by_values.clone()) - .or_default() += metric.value; - } - "unique" => { - let entry = group_by_dict - .entry(group_by_values.clone()) - .or_default(); - if *entry != 0.0 && *entry != metric.value { - println!( - "[WARN] Overwriting {}: previous value = {}, new value = {}", - metric.name, entry, metric.value - ); - } - *entry = metric.value; - } - _ => unreachable!(), - } - } - } - } - } - - // Add aggregated metrics back to the database - for (group_by_values, agg_value) in group_by_dict { - let labels = Labels( - aggregation - .group_by - .iter() - .zip(group_by_values.iter()) - .map(|(k, v)| (k.clone(), v.clone())) - .collect(), - ); - - let metric = Metric::new(aggregation.name.clone(), agg_value); - - // Check if metric already exists - if let Some(metrics) = self.flat_dict.get_mut(&labels) { - if let Some(existing_metric) = - metrics.iter_mut().find(|m| m.name == aggregation.name) - { - if existing_metric.value != agg_value { - println!( - "[WARN] Overwriting {}: previous value = {}, new value = {}", - aggregation.name, existing_metric.value, agg_value - ); - } - existing_metric.value = agg_value; - } else { - metrics.push(metric); - } - } else { - self.flat_dict.insert(labels, vec![metric]); - } - } - } - } - - self.separate_by_label_types(); - } -} - -fn main() -> Result<(), Box> { - let args = Args::parse(); - let mut db = MetricDb::new(&args.metrics_json)?; - - let mut markdown_output = String::new(); - - if let Some(aggregation_file) = args.aggregation_json { - let aggregations = MetricDb::read_aggregations(&aggregation_file)?; - db.apply_aggregations(&aggregations); - - // Generate aggregation tables - let agg_tables = db.generate_aggregation_tables(&aggregations); - markdown_output.push_str(&agg_tables); - - // Add detailed metrics in a collapsible section - markdown_output.push_str("\n
\nDetailed Metrics\n\n"); - markdown_output.push_str(&db.generate_markdown_tables()); - markdown_output.push_str("
\n\n"); - } else { - markdown_output.push_str(&db.generate_markdown_tables()); - } - - println!("{}", markdown_output); Ok(()) } diff --git a/ci/scripts/metric_unify/src/markdown.rs b/ci/scripts/metric_unify/src/markdown.rs new file mode 100644 index 0000000000..6eb46568e5 --- /dev/null +++ b/ci/scripts/metric_unify/src/markdown.rs @@ -0,0 +1,146 @@ +use std::fmt::Display; + +use num_format::{Locale, ToFormattedString}; + +use crate::metric::Metric; + +const TABLE_SEPARATOR: &str = "|"; +const COLUMN_SEPARATOR: &str = "---"; + +pub struct Tables { + tables: Vec, +} + +impl From> for Tables { + fn from(tables: Vec
) -> Self { + Self { tables } + } +} + +impl Display for Tables { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + self.tables + .iter() + .map(|t| t.to_string()) + .collect::>() + .join("\n\n") + ) + } +} + +#[derive(Debug, Clone)] +pub struct Table { + header: TableHeader, + rows: Vec, +} + +impl Table { + pub fn new(header: TableHeader, rows: Vec) -> Self { + assert!(!header.cells.is_empty()); + assert!(!rows.is_empty()); + Self { header, rows } + } +} + +impl Display for Table { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let header = self.header.to_string(); + let separators = + vec![COLUMN_SEPARATOR; self.header.cells.len()].join(&format!(" {} ", TABLE_SEPARATOR)); + let rows = self + .rows + .iter() + .map(|row| row.to_string()) + .collect::>(); + write!( + f, + "{}\n{} {} {}\n{}", + header, + TABLE_SEPARATOR, + separators, + TABLE_SEPARATOR, + rows.join("\n") + ) + } +} + +type Cell = String; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableHeader { + cells: Vec, +} + +impl TableHeader { + pub fn new(cells: Vec) -> Self { + Self { cells } + } +} + +impl Display for TableHeader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "| {} |", self.cells.join(" | ")) + } +} + +/// Represents a row in the markdown table +#[derive(Debug, Clone)] +pub struct TableRow { + cells: Vec, + values: Vec, +} + +impl TableRow { + pub fn new(cells: Vec, values: Vec) -> Self { + Self { cells, values } + } +} + +impl Display for TableRow { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ", TABLE_SEPARATOR)?; + write!(f, "{}", &self.cells.join(&format!(" {} ", TABLE_SEPARATOR)))?; + write!(f, " {} ", TABLE_SEPARATOR)?; + write!( + f, + " {} ", + &self.values.join(&format!(" {} ", TABLE_SEPARATOR)) + )?; + write!(f, " {}", TABLE_SEPARATOR) + } +} + +/// Formats a number for display, showing integers with thousands separators and floats with 2 decimal places +pub fn format_metric_value(metric: &Metric) -> String { + if metric.diff.is_some() { + format!( + "{value_str} ({diff} [{diff_percentage}%]) ", + diff = format_number(metric.diff.unwrap()), + diff_percentage = format_number(metric.diff_percentage.unwrap()), + value_str = format_number(metric.value), + color = { + if metric.diff.unwrap() > 0.0 { + "red" + } else { + "green" + } + }, + ) + } else { + format_number(metric.value) + } +} + +fn format_number(value: f64) -> String { + if value.fract() == 0.0 { + let int_value = value as i64; + int_value.to_formatted_string(&Locale::en) + } else if value.is_nan() { + String::default() + } else { + format!("{:.2}", value) + } +} diff --git a/ci/scripts/metric_unify/src/metric.rs b/ci/scripts/metric_unify/src/metric.rs new file mode 100644 index 0000000000..fcbc390ee3 --- /dev/null +++ b/ci/scripts/metric_unify/src/metric.rs @@ -0,0 +1,208 @@ +use serde::{Deserialize, Serialize}; + +/// A file containing aggregation entries +/// +/// ```rust +/// let file: AggregationFile = serde_json::from_str("{\"aggregations\": [{\"name\": \"total_time\", \"group_by\": [\"group\"], \"metrics\": [\"time\"], \"operation\": \"sum\"}]}")?; +/// ``` +#[derive(Debug, Serialize, Deserialize)] +pub struct AggregationFile { + pub aggregations: Vec, +} + +/// A file containing metric entries +/// +/// ```rust +/// let file: MetricsFile = serde_json::from_str("{\"counter\": [{\"labels\": [\"group\", \"bench_program_inner\"], \"name\": \"metric_name\", \"value\": 1.0}], \"gauge\": [{\"labels\": [\"group\", \"bench_program_inner\"], \"name\": \"metric_name\", \"value\": "1.0"}]}")?; +/// ``` +#[derive(Debug, Serialize, Deserialize)] +pub struct MetricsFile { + pub counter: Vec, + pub gauge: Vec, +} + +/// A metric entry +/// +/// ```rust +/// let metric: RowMetric = serde_json::from_str("{\"labels\": [\"group\", \"bench_program_inner\"], \"name\": \"metric_name\", \"value\": 1.0}")?; +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetricEntry { + #[serde(alias = "metric")] + pub name: String, + pub labels: Labels, + #[serde(deserialize_with = "deserialize_f64_from_string")] + pub value: f64, +} + +/// Label identifies the type of metric +/// +/// Example: +/// +/// ```rust +/// let label: Label = serde_json::from_str("[\"group\", \"bench_program_innder\"]")?; +/// ``` +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(from = "[String; 2]")] +pub struct Label { + pub primary: String, + pub secondary: String, +} + +impl From<[String; 2]> for Label { + fn from([primary, secondary]: [String; 2]) -> Self { + Self { primary, secondary } + } +} + +#[derive(Debug, Clone, Eq, Serialize, Deserialize)] +pub struct Labels(pub Vec
{ + // Remove duplicate metrics + let mut metrics = metrics + .into_iter() + .collect::>() + .into_iter() + .collect::>(); + + // Sort metrics by primary labels for later grouping + metrics.sort_by_key(|m| m.primary_labels.clone()); + + metrics + .into_iter() + .chunk_by(|m| m.primary_labels.clone()) + .into_iter() + .map(|(_, group)| group.collect()) + .map(GroupedMetricsByPrimaryLabels::new) + .map(GroupedMetricsByPrimaryLabels::to_table) + .collect() +}