From a9a34db26dc276febb5e3d0d7b3992b61e00e4f9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Jul 2024 09:32:43 -0400 Subject: [PATCH] Add metrics for skipped rows --- .../physical-plan/src/aggregates/row_hash.rs | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index aec475bd801cf..9a1e7c57e0b6d 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -28,12 +28,12 @@ use crate::aggregates::{ PhysicalGroupBy, }; use crate::common::IPCWriter; -use crate::metrics::{BaselineMetrics, RecordOutput}; +use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use crate::sorts::sort::sort_batch; use crate::sorts::streaming_merge; use crate::spill::read_spill_as_stream; use crate::stream::RecordBatchStreamAdapter; -use crate::{aggregates, ExecutionPlan, PhysicalExpr}; +use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; @@ -117,10 +117,17 @@ struct SkipAggregationProbe { /// Flag indicating that further updates of `SkipAggregationProbe` /// state won't make any effect is_locked: bool, + + /// Number of rows where state was output directly without aggregation + skipped_aggregation_rows: metrics::Count, } impl SkipAggregationProbe { - fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self { + fn new( + probe_rows_threshold: usize, + probe_ratio_threshold: f64, + skipped_aggregation_rows: metrics::Count, + ) -> Self { Self { input_rows: 0, num_groups: 0, @@ -128,6 +135,7 @@ impl SkipAggregationProbe { probe_ratio_threshold, should_skip: false, is_locked: false, + skipped_aggregation_rows, } } @@ -160,6 +168,12 @@ impl SkipAggregationProbe { self.should_skip = false; self.is_locked = true; } + + /// Record the number of rows that were output directly without aggregation + /// in the metrics + fn record_skipped(&mut self, batch: &RecordBatch) { + self.skipped_aggregation_rows.add(batch.num_rows()); + } } /// HashTable based Grouping Aggregator @@ -473,17 +487,17 @@ impl GroupedHashAggregateStream { .all(|acc| acc.convert_to_state_supported()) && agg_group_by.is_single() { + let options = &context.session_config().options().execution; + let probe_rows_threshold = + options.skip_partial_aggregation_probe_rows_threshold; + let probe_ratio_threshold = + options.skip_partial_aggregation_probe_ratio_threshold; + let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) + .counter("skipped_aggregation_rows", partition); Some(SkipAggregationProbe::new( - context - .session_config() - .options() - .execution - .skip_partial_aggregation_probe_rows_threshold, - context - .session_config() - .options() - .execution - .skip_partial_aggregation_probe_ratio_threshold, + probe_rows_threshold, + probe_ratio_threshold, + skipped_aggregation_rows, )) } else { None @@ -611,6 +625,9 @@ impl Stream for GroupedHashAggregateStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let _timer = elapsed_compute.timer(); + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + probe.record_skipped(&batch); + } let states = self.transform_to_states(batch)?; return Poll::Ready(Some(Ok( states.record_output(&self.baseline_metrics)