Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio committed Oct 7, 2024
1 parent 88d1432 commit 5ac621b
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 16 deletions.
25 changes: 9 additions & 16 deletions datafusion/physical-optimizer/src/topk_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
use std::sync::Arc;

use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::ExecutionPlan;

Expand All @@ -33,6 +30,7 @@ use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;

use crate::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::projection::ProjectionExec;
use itertools::Itertools;

Expand Down Expand Up @@ -67,7 +65,6 @@ impl TopKAggregation {
}

// ensure the sort is on the same field as the aggregate output
println!("col={} field={}", order_by, field.name());
if order_by != field.name() {
return None;
}
Expand Down Expand Up @@ -98,15 +95,6 @@ impl TopKAggregation {
let mut cur_col_name = order.name().to_string();
let limit = sort.fetch()?;

let is_cardinality_preserving = |plan: Arc<dyn ExecutionPlan>| {
plan.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.is_some()
|| plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan.as_any().downcast_ref::<FilterExec>().is_some()
|| plan.as_any().downcast_ref::<ProjectionExec>().is_some()
};

let mut cardinality_preserved = true;
let closure = |plan: Arc<dyn ExecutionPlan>| {
if !cardinality_preserved {
Expand All @@ -129,9 +117,14 @@ impl TopKAggregation {
}
}
} else {
// or we continue down whitelisted nodes of other types
if !is_cardinality_preserving(Arc::clone(&plan)) {
cardinality_preserved = false;
// or we continue down through types that don't reduce cardinality
match plan.cardinality_effect() {
CardinalityEffect::NoEffect
| CardinalityEffect::IncreaseOrNoEffect => {}
CardinalityEffect::Unknown
| CardinalityEffect::DecreaseOrNoEffect => {
cardinality_preserved = false;
}
}
}
Ok(Transformed::no(plan))
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use datafusion_physical_expr::{
PhysicalExpr, PhysicalSortRequirement,
};

use crate::execution_plan::CardinalityEffect;
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use itertools::Itertools;

Expand Down Expand Up @@ -785,6 +786,10 @@ impl ExecutionPlan for AggregateExec {
}
}
}

/// Reports this operator's effect on cardinality as
/// [`CardinalityEffect::DecreaseOrNoEffect`].
// NOTE(review): presumably because grouping can collapse several input rows
// into a single output row — confirm against the aggregate modes.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::DecreaseOrNoEffect
}
}

fn create_schema(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion_common::Result;
use datafusion_execution::TaskContext;

use crate::coalesce::{BatchCoalescer, CoalescerState};
use crate::execution_plan::CardinalityEffect;
use futures::ready;
use futures::stream::{Stream, StreamExt};

Expand Down Expand Up @@ -199,6 +200,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn fetch(&self) -> Option<usize> {
self.fetch
}

/// Reports this operator's effect on cardinality.
///
/// Without a `fetch` the operator reports [`CardinalityEffect::NoEffect`].
/// When `fetch` is set it reports [`CardinalityEffect::DecreaseOrNoEffect`],
/// since a row limit may cut the output short.
fn cardinality_effect(&self) -> CardinalityEffect {
    match self.fetch {
        // A fetch limit can cut the output short of the input.
        Some(_) => CardinalityEffect::DecreaseOrNoEffect,
        None => CardinalityEffect::NoEffect,
    }
}
}

/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::{

use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use crate::execution_plan::CardinalityEffect;
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -178,6 +179,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn supports_limit_pushdown(&self) -> bool {
true
}

/// Reports this operator's effect on cardinality as
/// [`CardinalityEffect::NoEffect`].
// NOTE(review): presumably merging partitions forwards every input row
// unchanged — confirm.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::NoEffect
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn fetch(&self) -> Option<usize> {
None
}

/// Gets the effect this operator has on cardinality, if known.
///
/// The default implementation returns [`CardinalityEffect::Unknown`].
/// Implementors override this method to report a more specific effect.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::Unknown
}
}

/// Extension trait provides an easy API to fetch various properties of
Expand Down Expand Up @@ -898,6 +903,13 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
actual.iter().map(|elem| elem.to_string()).collect()
}

/// The effect an operator reports having on cardinality, as returned by
/// `ExecutionPlan::cardinality_effect`.
///
/// Optimizer rules can use this to decide whether it is safe to look
/// "through" an operator when reasoning about row counts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CardinalityEffect {
    /// The effect is not known. This is what the default
    /// `ExecutionPlan::cardinality_effect` implementation returns.
    Unknown,
    /// The operator does not change the cardinality of its input.
    NoEffect,
    /// The operator may decrease the cardinality of its input, or leave it
    /// unchanged.
    DecreaseOrNoEffect,
    /// The operator may increase the cardinality of its input, or leave it
    /// unchanged.
    IncreaseOrNoEffect,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
};

use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -372,6 +373,10 @@ impl ExecutionPlan for FilterExec {
fn statistics(&self) -> Result<Statistics> {
Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
}

/// Reports this operator's effect on cardinality as
/// [`CardinalityEffect::DecreaseOrNoEffect`].
// NOTE(review): presumably because rows failing the predicate are dropped
// and no rows are ever added — confirm.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::DecreaseOrNoEffect
}
}

/// This function ensures that all bounds in the `ExprBoundaries` vector are
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;

use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -336,6 +337,10 @@ impl ExecutionPlan for LocalLimitExec {
fn supports_limit_pushdown(&self) -> bool {
true
}

/// Reports this operator's effect on cardinality as
/// [`CardinalityEffect::DecreaseOrNoEffect`].
// NOTE(review): presumably because the limit can truncate each partition's
// output — confirm.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::DecreaseOrNoEffect
}
}

/// A Limit stream skips `skip` rows, and then fetch up to `fetch` rows.
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Literal;

use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -233,6 +234,10 @@ impl ExecutionPlan for ProjectionExec {
fn supports_limit_pushdown(&self) -> bool {
true
}

/// Reports this operator's effect on cardinality as
/// [`CardinalityEffect::NoEffect`].
// NOTE(review): presumably a projection emits one output row per input
// row — confirm.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::NoEffect
}
}

/// If e is a direct column reference, returns the field level
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};

use crate::execution_plan::CardinalityEffect;
use futures::stream::Stream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use hashbrown::HashMap;
Expand Down Expand Up @@ -669,6 +670,10 @@ impl ExecutionPlan for RepartitionExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

/// Reports this operator's effect on cardinality as
/// [`CardinalityEffect::NoEffect`].
// NOTE(review): presumably repartitioning only redistributes rows across
// partitions without adding or dropping any — confirm.
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::NoEffect
}
}

impl RepartitionExec {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;

use crate::execution_plan::CardinalityEffect;
use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};

Expand Down Expand Up @@ -972,6 +973,10 @@ impl ExecutionPlan for SortExec {
fn fetch(&self) -> Option<usize> {
self.fetch
}

/// Reports this operator's effect on cardinality.
///
/// Without a `fetch` the operator reports [`CardinalityEffect::NoEffect`].
/// When `fetch` is set it reports [`CardinalityEffect::DecreaseOrNoEffect`],
/// since a row limit may cut the output short. Callers that look through
/// cardinality-preserving operators must not treat such a sort as
/// transparent.
fn cardinality_effect(&self) -> CardinalityEffect {
    match self.fetch {
        // A fetch limit can cut the output short of the input.
        Some(_) => CardinalityEffect::DecreaseOrNoEffect,
        None => CardinalityEffect::NoEffect,
    }
}
}

#[cfg(test)]
Expand Down

0 comments on commit 5ac621b

Please sign in to comment.