Skip to content

Commit

Permalink
fix(python): add cluster_with_columns optimization toggle in python (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored May 23, 2024
1 parent c1c672b commit 2357125
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
12 changes: 12 additions & 0 deletions py-polars/polars/functions/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,7 @@ def collect_all(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
) -> list[DataFrame]:
"""
Expand Down Expand Up @@ -1774,6 +1775,8 @@ def collect_all(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
streaming
Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
Expand All @@ -1798,6 +1801,7 @@ def collect_all(
slice_pushdown = False
comm_subplan_elim = False
comm_subexpr_elim = False
cluster_with_columns = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
Expand All @@ -1814,6 +1818,7 @@ def collect_all(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand All @@ -1840,6 +1845,7 @@ def collect_all_async(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = True,
) -> _GeventDataFrameResult[list[DataFrame]]: ...

Expand All @@ -1857,6 +1863,7 @@ def collect_all_async(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
) -> Awaitable[list[DataFrame]]: ...

Expand All @@ -1874,6 +1881,7 @@ def collect_all_async(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
) -> Awaitable[list[DataFrame]] | _GeventDataFrameResult[list[DataFrame]]:
"""
Expand Down Expand Up @@ -1912,6 +1920,8 @@ def collect_all_async(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
streaming
Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
Expand Down Expand Up @@ -1948,6 +1958,7 @@ def collect_all_async(
slice_pushdown = False
comm_subplan_elim = False
comm_subexpr_elim = False
cluster_with_columns = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
Expand All @@ -1964,6 +1975,7 @@ def collect_all_async(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand Down
33 changes: 33 additions & 0 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ def explain(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
tree_format: bool = False,
) -> str:
Expand Down Expand Up @@ -918,6 +919,8 @@ def explain(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
tree_format
Expand Down Expand Up @@ -945,6 +948,7 @@ def explain(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand All @@ -971,6 +975,7 @@ def show_graph(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
) -> str | None:
"""
Expand Down Expand Up @@ -1005,6 +1010,8 @@ def show_graph(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Expand All @@ -1029,6 +1036,7 @@ def show_graph(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand Down Expand Up @@ -1513,6 +1521,7 @@ def profile(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
show_plot: bool = False,
truncate_nodes: int = 0,
figsize: tuple[int, int] = (18, 8),
Expand Down Expand Up @@ -1545,6 +1554,8 @@ def profile(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
show_plot
Show a gantt chart of the profiling result
truncate_nodes
Expand Down Expand Up @@ -1593,6 +1604,7 @@ def profile(
projection_pushdown = False
comm_subplan_elim = False
comm_subexpr_elim = False
cluster_with_columns = False

ldf = self._ldf.optimization_toggle(
type_coercion,
Expand All @@ -1602,6 +1614,7 @@ def profile(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand Down Expand Up @@ -1658,6 +1671,7 @@ def collect(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
no_optimization: bool = False,
streaming: bool = False,
background: Literal[True],
Expand All @@ -1675,6 +1689,7 @@ def collect(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
no_optimization: bool = False,
streaming: bool = False,
background: Literal[False] = False,
Expand All @@ -1691,6 +1706,7 @@ def collect(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
no_optimization: bool = False,
streaming: bool = False,
background: bool = False,
Expand Down Expand Up @@ -1719,6 +1735,8 @@ def collect(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
no_optimization
Turn off (certain) optimizations.
streaming
Expand Down Expand Up @@ -1792,6 +1810,7 @@ def collect(
slice_pushdown = False
comm_subplan_elim = False
comm_subexpr_elim = False
cluster_with_columns = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
Expand All @@ -1805,6 +1824,7 @@ def collect(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager,
)
Expand All @@ -1829,6 +1849,7 @@ def collect_async(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = True,
) -> _GeventDataFrameResult[DataFrame]: ...

Expand All @@ -1845,6 +1866,7 @@ def collect_async(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = True,
) -> Awaitable[DataFrame]: ...

Expand All @@ -1860,6 +1882,7 @@ def collect_async(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
) -> Awaitable[DataFrame] | _GeventDataFrameResult[DataFrame]:
"""
Expand Down Expand Up @@ -1896,6 +1919,8 @@ def collect_async(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
streaming
Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
Expand Down Expand Up @@ -1960,6 +1985,7 @@ def collect_async(
slice_pushdown = False
comm_subplan_elim = False
comm_subexpr_elim = False
cluster_with_columns = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
Expand All @@ -1973,6 +1999,7 @@ def collect_async(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand Down Expand Up @@ -2379,6 +2406,7 @@ def _set_sink_optimizations(
slice_pushdown,
comm_subplan_elim=False,
comm_subexpr_elim=False,
cluster_with_columns=False,
streaming=True,
_eager=False,
)
Expand All @@ -2395,6 +2423,7 @@ def fetch(
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
cluster_with_columns: bool = True,
streaming: bool = False,
) -> DataFrame:
"""
Expand All @@ -2420,6 +2449,8 @@ def fetch(
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
cluster_with_columns
Combine sequential independent calls to with_columns
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Expand Down Expand Up @@ -2467,6 +2498,7 @@ def fetch(
slice_pushdown = False
comm_subplan_elim = False
comm_subexpr_elim = False
cluster_with_columns = False

lf = self._ldf.optimization_toggle(
type_coercion,
Expand All @@ -2476,6 +2508,7 @@ def fetch(
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
cluster_with_columns,
streaming,
_eager=False,
)
Expand Down
2 changes: 2 additions & 0 deletions py-polars/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ impl PyLazyFrame {
slice_pushdown: bool,
comm_subplan_elim: bool,
comm_subexpr_elim: bool,
cluster_with_columns: bool,
streaming: bool,
_eager: bool,
) -> Self {
Expand All @@ -466,6 +467,7 @@ impl PyLazyFrame {
.with_predicate_pushdown(predicate_pushdown)
.with_simplify_expr(simplify_expr)
.with_slice_pushdown(slice_pushdown)
.with_cluster_with_columns(cluster_with_columns)
.with_streaming(streaming)
._with_eager(_eager)
.with_projection_pushdown(projection_pushdown);
Expand Down
15 changes: 15 additions & 0 deletions py-polars/tests/unit/test_cwc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ def test_basic_cwc() -> None:
)


def test_disable_cwc() -> None:
df = (
pl.LazyFrame({"a": [1, 2]})
.with_columns(pl.col("a").alias("b") * 2)
.with_columns(pl.col("a").alias("c") * 3)
.with_columns(pl.col("a").alias("d") * 4)
)

explain = df.explain(cluster_with_columns=False)

assert """[[(col("a")) * (2)].alias("b")]""" in explain
assert """[[(col("a")) * (3)].alias("c")]""" in explain
assert """[[(col("a")) * (4)].alias("d")]""" in explain


def test_refuse_with_deps() -> None:
df = (
pl.LazyFrame({"a": [1, 2]})
Expand Down

0 comments on commit 2357125

Please sign in to comment.