Skip to content

Commit

Permalink
fix(rust, python): fix cse profile (#10239)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Aug 2, 2023
1 parent 22a501d commit 9bde433
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 226 deletions.
202 changes: 2 additions & 200 deletions crates/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod groupby_partitioned;
mod groupby_rolling;
mod join;
mod projection;
mod projection_utils;
#[cfg(feature = "python")]
mod python_scan;
mod scan;
Expand All @@ -24,6 +25,7 @@ pub use executor::*;
use polars_core::POOL;
use polars_plan::global::FETCH_ROWS;
use polars_plan::utils::*;
use projection_utils::*;
use rayon::prelude::*;

pub(super) use self::cache::*;
Expand All @@ -47,203 +49,3 @@ pub(super) use self::udf::*;
pub(super) use self::union::*;
pub(super) use self::unique::*;
use super::*;

fn execute_projection_cached_window_fns(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
) -> PolarsResult<Vec<Series>> {
// We partition by normal expression and window expression
// - the normal expressions can run in parallel
// - the window expression take more memory and often use the same groupby keys and join tuples
// so they are cached and run sequential

// the partitioning messes with column order, so we also store the idx
// and use those to restore the original projection order
#[allow(clippy::type_complexity)]
// String: partition_name,
// u32: index,
let mut windows: Vec<(String, Vec<(u32, Arc<dyn PhysicalExpr>)>)> = vec![];
let mut other = Vec::with_capacity(exprs.len());

// first we partition the window function by the values they group over.
// the groupby values should be cached
let mut index = 0u32;
exprs.iter().for_each(|phys| {
index += 1;
let e = phys.as_expression().unwrap();

let mut is_window = false;
for e in e.into_iter() {
if let Expr::Window { partition_by, .. } = e {
let groupby = format!("{:?}", partition_by.as_slice());
if let Some(tpl) = windows.iter_mut().find(|tpl| tpl.0 == groupby) {
tpl.1.push((index, phys.clone()))
} else {
windows.push((groupby, vec![(index, phys.clone())]))
}
is_window = true;
break;
}
}
if !is_window {
other.push((index, phys))
}
});

let mut selected_columns = POOL.install(|| {
other
.par_iter()
.map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
.collect::<PolarsResult<Vec<_>>>()
})?;

for partition in windows {
// clear the cache for every partitioned group
let mut state = state.split();
// inform the expression it has window functions.
state.insert_has_window_function_flag();

// don't bother caching if we only have a single window function in this partition
if partition.1.len() == 1 {
state.remove_cache_window_flag();
} else {
state.insert_cache_window_flag();
}

for (index, e) in partition.1 {
if e.as_expression()
.unwrap()
.into_iter()
.filter(|e| matches!(e, Expr::Window { .. }))
.count()
== 1
{
state.insert_cache_window_flag();
}
// caching more than one window expression is a complicated topic for another day
// see issue #2523
else {
state.remove_cache_window_flag();
}

let s = e.evaluate(df, &state)?;
selected_columns.push((index, s));
}
}

selected_columns.sort_unstable_by_key(|tpl| tpl.0);
let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect();
Ok(selected_columns)
}

fn run_exprs_par(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
) -> PolarsResult<Vec<Series>> {
POOL.install(|| {
exprs
.par_iter()
.map(|expr| expr.evaluate(df, state))
.collect()
})
}

pub(super) fn evaluate_physical_expressions(
df: &mut DataFrame,
cse_exprs: &[Arc<dyn PhysicalExpr>],
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
has_windows: bool,
) -> PolarsResult<Vec<Series>> {
let runner = if has_windows {
execute_projection_cached_window_fns
} else {
run_exprs_par
};

let selected_columns = if !cse_exprs.is_empty() {
let tmp_cols = runner(df, cse_exprs, state)?;
let width = df.width();

// put the cse expressions at the end
unsafe {
df.hstack_mut_unchecked(&tmp_cols);
}
let mut result = run_exprs_par(df, exprs, state)?;
// restore original df
unsafe {
df.get_columns_mut().truncate(width);
}

// the replace CSE has a temporary name
// we don't want this name in the result
for s in result.iter_mut() {
rename_cse_tmp_series(s);
}

result
} else {
runner(df, exprs, state)?
};

state.clear_window_expr_cache();

Ok(selected_columns)
}

pub(super) fn check_expand_literals(
mut selected_columns: Vec<Series>,
zero_length: bool,
) -> PolarsResult<DataFrame> {
let first_len = selected_columns[0].len();
let mut df_height = 0;
let mut all_equal_len = true;
{
let mut names = PlHashSet::with_capacity(selected_columns.len());
for s in &selected_columns {
let len = s.len();
df_height = std::cmp::max(df_height, len);
if len != first_len {
all_equal_len = false;
}
let name = s.name();
polars_ensure!(names.insert(name), duplicate = name);
}
}
// If all series are the same length it is ok. If not we can broadcast Series of length one.
if !all_equal_len {
selected_columns = selected_columns
.into_iter()
.map(|series| {
Ok(if series.len() == 1 && df_height > 1 {
series.new_from_index(0, df_height)
} else if series.len() == df_height || series.len() == 0 {
series
} else {
polars_bail!(
ComputeError: "series length {} doesn't match the dataframe height of {}",
series.len(), df_height
);
})
})
.collect::<PolarsResult<_>>()?
}

let df = DataFrame::new_no_checks(selected_columns);

// a literal could be projected to a zero length dataframe.
// This prevents a panic.
let df = if zero_length {
let min = df.get_columns().iter().map(|s| s.len()).min();
if min.is_some() {
df.head(min)
} else {
df
}
} else {
df
};
Ok(df)
}
16 changes: 11 additions & 5 deletions crates/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::*;
/// and a multiple PhysicalExpressions (create the output Series)
pub struct ProjectionExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) cse_expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) cse_exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) has_windows: bool,
pub(crate) input_schema: SchemaRef,
Expand All @@ -21,7 +21,7 @@ impl ProjectionExec {
#[allow(clippy::let_and_return)]
let selected_cols = evaluate_physical_expressions(
&mut df,
&self.cse_expr,
&self.cse_exprs,
&self.expr,
state,
self.has_windows,
Expand All @@ -48,10 +48,10 @@ impl Executor for ProjectionExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
if self.cse_expr.is_empty() {
if self.cse_exprs.is_empty() {
println!("run ProjectionExec");
} else {
println!("run ProjectionExec with {} CSE", self.cse_expr.len())
println!("run ProjectionExec with {} CSE", self.cse_exprs.len())
};
}
}
Expand All @@ -61,7 +61,13 @@ impl Executor for ProjectionExec {
let by = self
.expr
.iter()
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
.map(|s| {
profile_name(
s.as_ref(),
self.input_schema.as_ref(),
!self.cse_exprs.is_empty(),
)
})
.collect::<PolarsResult<Vec<_>>>()?;
let name = comma_delimited("projection".to_string(), &by);
Cow::Owned(name)
Expand Down
Loading

0 comments on commit 9bde433

Please sign in to comment.