Skip to content

Commit

Permalink
equivalence classes: fix projection
Browse files Browse the repository at this point in the history
This patch fixes the logic that projects equivalence classes:
when iterating over the projection mapping to find new equivalent expressions,
we need to normalize each source expression first.
  • Loading branch information
askalt committed Jan 28, 2025
1 parent 09a0844 commit a777890
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 13 deletions.
19 changes: 18 additions & 1 deletion datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,17 @@ impl EquivalenceGroup {
.collapse()
}

/// Builds a new [`ProjectionMapping`] from `mapping` in which every source
/// expression has been normalized via [`Self::normalize_expr`].
///
/// Target expressions are carried over unchanged (only their `Arc` handles
/// are cloned), and the order of the pairs is preserved.
pub fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
    let normalized_pairs = mapping.map.iter().map(|(source, target)| {
        let normalized_source = self.normalize_expr(Arc::clone(source));
        (normalized_source, Arc::clone(target))
    });
    ProjectionMapping {
        map: normalized_pairs.collect(),
    }
}

/// Projects `expr` according to the given projection mapping.
/// If the resulting expression is invalid after projection, returns `None`.
pub fn project_expr(
Expand Down Expand Up @@ -584,12 +595,18 @@ impl EquivalenceGroup {
.collect::<Vec<_>>();
(new_class.len() > 1).then_some(EquivalenceClass::new(new_class))
});

// The key is the normalized source expression and the value is the EquivalenceClass that collects the target expressions of every source that normalizes to that key.
let mut new_classes: IndexMap<Arc<dyn PhysicalExpr>, EquivalenceClass> =
IndexMap::new();
mapping.iter().for_each(|(source, target)| {
// We need to find equivalent projected expressions.
// e.g. table with columns [a,b,c] and a == b, projection: [a+c, b+c].
// To conclude that a + c == b + c we first normalize all source expressions
// in the mapping, then merge all equivalent expressions into the classes.
let normalized_expr = self.normalize_expr(Arc::clone(source));
new_classes
.entry(Arc::clone(source))
.entry(normalized_expr)
.or_insert_with(EquivalenceClass::new_empty)
.push(Arc::clone(target));
});
Expand Down
11 changes: 1 addition & 10 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,16 +920,7 @@ impl EquivalenceProperties {
fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
// Construct the mapping where source expressions are normalized. This way,
// the algorithms below can work on exact equalities.
// NOTE(review): this span appears to interleave the removed inline
// construction with the added delegation to `eq_group.normalize_mapping`
// from the diff view; confirm which lines are live.
ProjectionMapping {
map: mapping
.iter()
.map(|(source, target)| {
let normalized_source =
self.eq_group.normalize_expr(Arc::clone(source));
(normalized_source, Arc::clone(target))
})
.collect(),
}
self.eq_group.normalize_mapping(mapping)
}

/// Computes projected orderings based on a given projection mapping.
Expand Down
123 changes: 121 additions & 2 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1000,13 +1000,19 @@ mod tests {
use super::*;
use std::sync::Arc;

use crate::common::collect;
use crate::execution_plan::Boundedness;
use crate::test;
use crate::{common::collect, execution_plan::EmissionType};
use datafusion_expr::Operator;
use datafusion_physical_expr::{
expressions::{binary, col},
Partitioning,
};

use arrow_schema::DataType;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::EquivalenceProperties;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};

#[test]
Expand Down Expand Up @@ -1197,4 +1203,117 @@ mod tests {

assert_eq!(result, expected);
}

/// Test-only plan node that carries a fixed set of plan properties and a schema.
#[derive(Debug)]
struct PropertiesExec {
// Returned as-is by `ExecutionPlan::properties`.
properties: PlanProperties,
// Printed in the `DisplayAs` output of this node.
schema: SchemaRef,
}

impl DisplayAs for PropertiesExec {
    /// Writes a one-line description containing the schema (via `Display`)
    /// and the plan properties (via `Debug`). The display format type is ignored.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { properties, schema } = self;
        write!(f, "PropertiesExec, schema={}, properties={:?}", schema, properties)
    }
}

impl ExecutionPlan for PropertiesExec {
    /// Static name of this plan node.
    fn name(&self) -> &str {
        "PropertiesExec"
    }

    /// Exposes the node as `Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the properties supplied when the node was constructed.
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// This node is a leaf: it reports no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Ignores the provided children and hands back the same node.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let unchanged: Arc<dyn ExecutionPlan> = self;
        Ok(unchanged)
    }

    /// Always panics; this node only exists to carry properties.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("not supposed to be executed")
    }
}

#[tokio::test]
async fn test_stat_projection_equivalent_classes() -> Result<()> {
// - columns: [a, b, c].
// - "a" and "b" in the same equivalence class.
// - then after a+c, b+c projection col(0) and col(1) must be
// in the same class too.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
eq_properties.add_equal_conditions(&col("a", &schema)?, &col("b", &schema)?)?;

let properties = PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
EmissionType::Both,
Boundedness::Bounded,
);

let projection = ProjectionExec::try_new(
vec![
(
binary(
col("a", &schema)?,
Operator::Plus,
col("c", &schema)?,
&schema,
)?,
"a+c".to_owned(),
),
(
binary(
col("b", &schema)?,
Operator::Plus,
col("c", &schema)?,
&schema,
)?,
"b+c".to_owned(),
),
],
Arc::new(PropertiesExec {
properties,
schema: Arc::clone(&schema),
}),
)?;

let actual_properties = projection.properties();
let eq_group = actual_properties.eq_properties.eq_group();

assert!(!eq_group.is_empty());
let first_normalized = eq_group.normalize_expr(col("a+c", &projection.schema)?);
let second_normalized = eq_group.normalize_expr(col("b+c", &projection.schema)?);

assert!(first_normalized.eq(&second_normalized));

Ok(())
}
}

0 comments on commit a777890

Please sign in to comment.