Skip to content

Commit

Permalink
support bounded_window_agg_exec
Browse files Browse the repository at this point in the history
  • Loading branch information
irenjj committed Feb 7, 2025
1 parent f91739e commit 3ee7f5d
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 74 deletions.
25 changes: 1 addition & 24 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;

use async_trait::async_trait;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
Expand Down Expand Up @@ -558,34 +557,13 @@ impl DefaultPhysicalPlanner {
return exec_err!("Table '{table_name}' does not exist");
}
}
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
LogicalPlan::Window(Window { window_expr, .. }) => {
if window_expr.is_empty() {
return internal_err!("Impossibly got empty window expression");
}

let input_exec = children.one()?;

// at this moment we are guaranteed by the logical planner
// to have all the window_expr to have equal sort key
let partition_keys = window_expr_common_partition_keys(window_expr)?;

let can_repartition = !partition_keys.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_window_functions();

let physical_partition_keys = if can_repartition {
partition_keys
.iter()
.map(|e| {
self.create_physical_expr(e, input.schema(), session_state)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
} else {
vec![]
};

let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction(WindowFunction {
ref partition_by,
Expand Down Expand Up @@ -635,7 +613,6 @@ impl DefaultPhysicalPlanner {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_partition_keys,
InputOrderMode::Sorted,
)?)
} else {
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
vec![window_expr],
memory_exec.clone(),
vec![],
Linear,
)?);
let task_ctx = ctx.task_ctx();
Expand Down Expand Up @@ -679,7 +678,6 @@ async fn run_window_test(
false,
)?],
exec2,
vec![],
search_mode.clone(),
)?) as _;
let task_ctx = ctx.task_ctx();
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ pub fn bounded_window_exec_with_partition(
BoundedWindowAggExec::try_new(
vec![window_expr],
Arc::clone(&input),
vec![],
InputOrderMode::Sorted,
)
.unwrap(),
Expand Down Expand Up @@ -261,7 +260,6 @@ pub fn bounded_window_exec_non_set_monotonic(
)
.unwrap()],
Arc::clone(&input),
vec![],
InputOrderMode::Sorted,
)
.unwrap(),
Expand Down
16 changes: 6 additions & 10 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1193,19 +1193,15 @@ pub fn ensure_distribution(
} = remove_dist_changing_operators(dist_context)?;

if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
if let Some(updated_window) = get_best_fitting_window(
exec.window_expr(),
exec.input(),
&exec.partition_keys(),
)? {
if let Some(updated_window) =
get_best_fitting_window(exec.window_expr(), exec.input())?
{
plan = updated_window;
}
} else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
if let Some(updated_window) = get_best_fitting_window(
exec.window_expr(),
exec.input(),
&exec.partition_keys,
)? {
if let Some(updated_window) =
get_best_fitting_window(exec.window_expr(), exec.input())?
{
plan = updated_window;
}
};
Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,11 @@ fn adjust_window_sort_removal(
let (window_expr, new_window) =
if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
let window_expr = exec.window_expr();
let new_window =
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
let new_window = get_best_fitting_window(window_expr, child_plan)?;
(window_expr, new_window)
} else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
let window_expr = exec.window_expr();
let new_window =
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?;
let new_window = get_best_fitting_window(window_expr, child_plan)?;
(window_expr, new_window)
} else {
return plan_err!("Expected WindowAggExec or BoundedWindowAggExec");
Expand Down Expand Up @@ -493,7 +491,6 @@ fn adjust_window_sort_removal(
Arc::new(BoundedWindowAggExec::try_new(
window_expr.to_vec(),
child_plan,
window_expr[0].partition_by().to_vec(),
InputOrderMode::Sorted,
)?) as _
} else {
Expand Down
18 changes: 9 additions & 9 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ pub struct BoundedWindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Describes how the input is ordered relative to the partition keys
Expand All @@ -100,7 +98,6 @@ impl BoundedWindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
input_order_mode: InputOrderMode,
) -> Result<Self> {
let schema = create_schema(&input.schema(), &window_expr)?;
Expand Down Expand Up @@ -128,7 +125,6 @@ impl BoundedWindowAggExec {
input,
window_expr,
schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
input_order_mode,
ordered_partition_by_indices,
Expand Down Expand Up @@ -209,6 +205,13 @@ impl BoundedWindowAggExec {
input.boundedness(),
)
}

pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.window_expr
.iter()
.flat_map(|expr| expr.partition_by().to_vec())
.collect()
}
}

impl DisplayAs for BoundedWindowAggExec {
Expand Down Expand Up @@ -269,11 +272,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
}

fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
if self.partition_keys().is_empty() {
debug!("No partition defined for BoundedWindowAggExec!!!");
vec![Distribution::SinglePartition]
} else {
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
vec![Distribution::HashPartitioned(self.partition_keys().clone())]
}
}

Expand All @@ -288,7 +291,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
Ok(Arc::new(BoundedWindowAggExec::try_new(
self.window_expr.clone(),
Arc::clone(&children[0]),
self.partition_keys.clone(),
self.input_order_mode.clone(),
)?))
}
Expand Down Expand Up @@ -1327,7 +1329,6 @@ mod tests {
false,
)?],
input,
partitionby_exprs,
input_order_mode,
)?))
}
Expand Down Expand Up @@ -1609,7 +1610,6 @@ mod tests {
let physical_plan = BoundedWindowAggExec::try_new(
window_exprs,
memory_exec,
vec![],
InputOrderMode::Sorted,
)
.map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
Expand Down
5 changes: 0 additions & 5 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,6 @@ pub(crate) fn window_equivalence_properties(
pub fn get_best_fitting_window(
window_exprs: &[Arc<dyn WindowExpr>],
input: &Arc<dyn ExecutionPlan>,
// These are the partition keys used during repartitioning.
// They are either the same with `window_expr`'s PARTITION BY columns,
// or it is empty if partitioning is not desirable for this windowing operator.
physical_partition_keys: &[Arc<dyn PhysicalExpr>],
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// Contains at least one window expr and all of the partition by and order by sections
// of the window_exprs are same.
Expand Down Expand Up @@ -425,7 +421,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
window_expr,
Arc::clone(input),
physical_partition_keys.to_vec(),
input_order_mode,
)?) as _))
} else if input_order_mode != InputOrderMode::Sorted {
Expand Down
16 changes: 1 addition & 15 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,6 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
})
.collect::<Result<Vec<_>, _>>()?;

let partition_keys = window_agg
.partition_keys
.iter()
.map(|expr| {
parse_physical_expr(
expr,
registry,
input.schema().as_ref(),
extension_codec,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
let input_order_mode = match input_order_mode {
window_agg_exec_node::InputOrderMode::Linear(_) => {
Expand All @@ -388,7 +375,6 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
Ok(Arc::new(BoundedWindowAggExec::try_new(
physical_window_expr,
input,
partition_keys,
input_order_mode,
)?))
} else {
Expand Down Expand Up @@ -1905,7 +1891,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
.partition_keys()
.iter()
.map(|e| serialize_physical_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
Expand Down
2 changes: 0 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ fn roundtrip_udwf() -> Result<()> {
roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
vec![udwf_expr],
input,
vec![col("a", &schema)?],
InputOrderMode::Sorted,
)?))
}
Expand Down Expand Up @@ -1149,7 +1148,6 @@ fn roundtrip_udwf_extension_codec() -> Result<()> {
let window = Arc::new(BoundedWindowAggExec::try_new(
vec![udwf_expr],
input,
vec![col("b", &schema)?],
InputOrderMode::Sorted,
)?);

Expand Down

0 comments on commit 3ee7f5d

Please sign in to comment.