diff --git a/crates/polars-plan/src/dsl/mod.rs b/crates/polars-plan/src/dsl/mod.rs
index a16fd25358bbf..89553c3665d9f 100644
--- a/crates/polars-plan/src/dsl/mod.rs
+++ b/crates/polars-plan/src/dsl/mod.rs
@@ -1,7 +1,5 @@
 #![allow(ambiguous_glob_reexports)]
 //! Domain specific language for the Lazy API.
-#[cfg(feature = "rolling_window")]
-use polars_core::utils::ensure_sorted_arg;
 #[cfg(feature = "dtype-categorical")]
 pub mod cat;
 #[cfg(feature = "dtype-categorical")]
@@ -1230,19 +1228,49 @@ impl Expr {
             move |s| {
                 let mut by = s[1].clone();
                 by = by.rechunk();
-                let s = &s[0];
+                let series: Series;
                 polars_ensure!(
                     options.weights.is_none(),
                     ComputeError: "`weights` is not supported in 'rolling by' expression"
                 );
-                let (by, tz) = match by.dtype() {
+                let (mut by, tz) = match by.dtype() {
                     DataType::Datetime(tu, tz) => {
                         (by.cast(&DataType::Datetime(*tu, None))?, tz)
                     },
                     _ => (by.clone(), &None),
                 };
-                ensure_sorted_arg(&by, expr_name)?;
+                let sorting_indices;
+                let original_indices;
+                let by_flag = by.is_sorted_flag();
+                if matches!(by_flag, IsSorted::Ascending) {
+                    series = s[0].clone();
+                    original_indices = None;
+                } else if matches!(by_flag, IsSorted::Descending) {
+                    series = s[0].reverse();
+                    by = by.reverse();
+                    original_indices = None;
+                } else {
+                    if options.warn_if_unsorted {
+                        eprintln!(
+                            "PolarsPerformanceWarning: Series is not known to be \
+                            sorted by `by` column, so Polars is temporarily \
+                            sorting it for you.\n\
+                            You can silence this warning by:\n\
+                            - passing `warn_if_unsorted=False`;\n\
+                            - sorting your data by your `by` column beforehand;\n\
+                            - setting `.set_sorted()` if you already know your data is sorted\n\
+                            before passing it to the rolling aggregation function"
+                        );
+                    }
+                    sorting_indices = by.arg_sort(Default::default());
+                    unsafe { by = by.take_unchecked(&sorting_indices)? };
+                    unsafe { series = s[0].take_unchecked(&sorting_indices)? };
+                    let int_range =
+                        UInt32Chunked::from_iter_values("", 0..s[0].len() as u32).into_series();
+                    original_indices =
+                        unsafe { int_range.take_unchecked(&sorting_indices) }.ok()
+                };
                 let by = by.datetime().unwrap();
                 let by_values = by.cont_slice().map_err(|_| {
                     polars_err!(
@@ -1264,7 +1292,15 @@ impl Expr {
                     fn_params: options.fn_params.clone(),
                 };

-                rolling_fn(s, options).map(Some)
+                if matches!(by_flag, IsSorted::Ascending) {
+                    rolling_fn(&series, options).map(Some)
+                } else if matches!(by_flag, IsSorted::Descending) {
+                    Ok(rolling_fn(&series, options)?.reverse()).map(Some)
+                } else {
+                    let res = rolling_fn(&series, options)?;
+                    let indices = &original_indices.unwrap().arg_sort(Default::default());
+                    unsafe { res.take_unchecked(indices) }.map(Some)
+                }
             },
             &[col(by)],
             output_type,
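
Note on the hunk above: the hard `ensure_sorted_arg` error is replaced by a fallback. When the
`by` column is not flagged as sorted, the input is arg-sorted, the rolling window is computed on
the sorted data, and the result is scattered back into the caller's original row order (a
descending flag is handled by a cheap reverse instead of a full sort). The sketch below restates
that behaviour through the public Python API on illustrative data; it is not part of the patch,
but it is the same equivalence the new parametric test asserts.

    from datetime import datetime

    import polars as pl
    from polars.testing import assert_frame_equal

    df = pl.DataFrame(
        {
            "ts": [
                datetime(2020, 1, 4),
                datetime(2020, 1, 1),
                datetime(2020, 1, 3),
                datetime(2020, 1, 2),
            ],
            "value": [10, 20, 30, 40],
        }
    )

    # What the fallback does under the hood: remember the original order,
    # sort by the `by` column, run the rolling aggregation, restore the order.
    expected = (
        df.with_row_count("index")
        .sort("ts")
        .with_columns(roll=pl.col("value").rolling_sum("2d", by="ts", closed="right"))
        .sort("index")
        .drop("index")
    )

    # Calling directly on the unsorted frame now gives the same answer; a
    # PolarsPerformanceWarning is printed to stderr unless it is silenced.
    result = df.with_columns(
        roll=pl.col("value").rolling_sum("2d", by="ts", closed="right")
    )
    assert_frame_equal(result, expected)
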
diff --git a/crates/polars-time/src/chunkedarray/rolling_window/mod.rs b/crates/polars-time/src/chunkedarray/rolling_window/mod.rs
index dbb3e07d18e67..15208983ed982 100644
--- a/crates/polars-time/src/chunkedarray/rolling_window/mod.rs
+++ b/crates/polars-time/src/chunkedarray/rolling_window/mod.rs
@@ -38,6 +38,8 @@ pub struct RollingOptions {
     pub closed_window: Option<ClosedWindow>,
     /// Optional parameters for the rolling function
     pub fn_params: DynArgs,
+    /// Warn if data is not known to be sorted by `by` column (if passed)
+    pub warn_if_unsorted: bool,
 }

 #[cfg(feature = "rolling_window")]
@@ -51,6 +53,7 @@ impl Default for RollingOptions {
             by: None,
             closed_window: None,
             fn_params: None,
+            warn_if_unsorted: true,
         }
     }
 }
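
Note: `warn_if_unsorted` defaults to `true` in `RollingOptions`, so the fallback is opt-out:
unsorted input now works but prints a performance warning to stderr. Data that is already
ordered by the `by` column keeps the pre-existing fast path, with no temporary sort and no
warning. A hedged sketch (illustrative data, not from the patch) of the two ways to stay on
that path:

    from datetime import datetime

    import polars as pl

    df = pl.DataFrame(
        {
            "ts": [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)],
            "value": [2, 3, 1],
        }
    )

    # Option 1: sort by the `by` column up front; the sorted flag is set and
    # the rolling aggregation skips the temporary sort entirely.
    out = df.sort("ts").with_columns(
        roll=pl.col("value").rolling_max("1d", by="ts", closed="right")
    )

    # Option 2: the data here is already ordered by "ts", so marking it as
    # sorted avoids both the warning and the re-sort without touching the rows.
    out = df.with_columns(pl.col("ts").set_sorted()).with_columns(
        roll=pl.col("value").rolling_max("1d", by="ts", closed="right")
    )
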
diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py
index c1fc424b91bdf..5c7e315d1e879 100644
--- a/py-polars/polars/expr/expr.py
+++ b/py-polars/polars/expr/expr.py
@@ -5279,6 +5279,7 @@ def rolling_min(
         center: bool = False,
         by: str | None = None,
         closed: ClosedInterval = "left",
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Apply a rolling min (moving min) over the values in this array.
@@ -5350,6 +5351,8 @@ def rolling_min(
         closed : {'left', 'right', 'both', 'none'}
             Define which sides of the temporal interval are closed (inclusive); only
             applicable if `by` has been set.
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -5475,7 +5478,7 @@ def rolling_min(
         )
         return self._from_pyexpr(
             self._pyexpr.rolling_min(
-                window_size, weights, min_periods, center, by, closed
+                window_size, weights, min_periods, center, by, closed, warn_if_unsorted
             )
         )

@@ -5489,6 +5492,7 @@ def rolling_max(
         center: bool = False,
         by: str | None = None,
         closed: ClosedInterval = "left",
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Apply a rolling max (moving max) over the values in this array.
@@ -5556,6 +5560,8 @@ def rolling_max(
         closed : {'left', 'right', 'both', 'none'}
             Define which sides of the temporal interval are closed (inclusive); only
             applicable if `by` has been set.
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -5708,7 +5714,7 @@ def rolling_max(
         )
         return self._from_pyexpr(
             self._pyexpr.rolling_max(
-                window_size, weights, min_periods, center, by, closed
+                window_size, weights, min_periods, center, by, closed, warn_if_unsorted
             )
         )

@@ -5722,6 +5728,7 @@ def rolling_mean(
         center: bool = False,
         by: str | None = None,
         closed: ClosedInterval = "left",
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Apply a rolling mean (moving mean) over the values in this array.
@@ -5793,6 +5800,8 @@ def rolling_mean(
         closed : {'left', 'right', 'both', 'none'}
             Define which sides of the temporal interval are closed (inclusive); only
             applicable if `by` has been set.
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -5945,7 +5954,13 @@ def rolling_mean(
         )
         return self._from_pyexpr(
             self._pyexpr.rolling_mean(
-                window_size, weights, min_periods, center, by, closed
+                window_size,
+                weights,
+                min_periods,
+                center,
+                by,
+                closed,
+                warn_if_unsorted,
             )
         )

@@ -5959,6 +5974,7 @@ def rolling_sum(
         center: bool = False,
         by: str | None = None,
         closed: ClosedInterval = "left",
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Apply a rolling sum (moving sum) over the values in this array.
@@ -6026,6 +6042,8 @@ def rolling_sum(
         closed : {'left', 'right', 'both', 'none'}
             Define which sides of the temporal interval are closed (inclusive); only
             applicable if `by` has been set.
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -6178,7 +6196,7 @@ def rolling_sum(
         )
         return self._from_pyexpr(
             self._pyexpr.rolling_sum(
-                window_size, weights, min_periods, center, by, closed
+                window_size, weights, min_periods, center, by, closed, warn_if_unsorted
             )
         )

@@ -6193,6 +6211,7 @@ def rolling_std(
         by: str | None = None,
         closed: ClosedInterval = "left",
         ddof: int = 1,
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Compute a rolling standard deviation.
@@ -6262,6 +6281,8 @@ def rolling_std(
             applicable if `by` has been set.
         ddof
             "Delta Degrees of Freedom": The divisor for a length N window is N - ddof
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -6414,7 +6435,14 @@ def rolling_std(
         )
         return self._from_pyexpr(
             self._pyexpr.rolling_std(
-                window_size, weights, min_periods, center, by, closed, ddof
+                window_size,
+                weights,
+                min_periods,
+                center,
+                by,
+                closed,
+                ddof,
+                warn_if_unsorted,
             )
         )

@@ -6429,6 +6457,7 @@ def rolling_var(
         by: str | None = None,
         closed: ClosedInterval = "left",
         ddof: int = 1,
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Compute a rolling variance.
@@ -6498,6 +6527,8 @@ def rolling_var(
             applicable if `by` has been set.
         ddof
             "Delta Degrees of Freedom": The divisor for a length N window is N - ddof
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -6657,6 +6688,7 @@ def rolling_var(
                 by,
                 closed,
                 ddof,
+                warn_if_unsorted,
             )
         )

@@ -6670,6 +6702,7 @@ def rolling_median(
         center: bool = False,
         by: str | None = None,
         closed: ClosedInterval = "left",
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Compute a rolling median.
@@ -6737,6 +6770,8 @@ def rolling_median(
         closed : {'left', 'right', 'both', 'none'}
             Define which sides of the temporal interval are closed (inclusive); only
             applicable if `by` has been set.
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -6815,7 +6850,7 @@ def rolling_median(
         )
         return self._from_pyexpr(
             self._pyexpr.rolling_median(
-                window_size, weights, min_periods, center, by, closed
+                window_size, weights, min_periods, center, by, closed, warn_if_unsorted
             )
         )

@@ -6831,6 +6866,7 @@ def rolling_quantile(
         center: bool = False,
         by: str | None = None,
         closed: ClosedInterval = "left",
+        warn_if_unsorted: bool = True,
     ) -> Self:
         """
         Compute a rolling quantile.
@@ -6902,6 +6938,8 @@ def rolling_quantile(
         closed : {'left', 'right', 'both', 'none'}
             Define which sides of the temporal interval are closed (inclusive); only
             applicable if `by` has been set.
+        warn_if_unsorted
+            Warn if data is not known to be sorted by `by` column (if passed).

         Warnings
         --------
@@ -7016,6 +7054,7 @@ def rolling_quantile(
                 center,
                 by,
                 closed,
+                warn_if_unsorted,
             )
         )

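Note: every Python `rolling_*` method now accepts `warn_if_unsorted` (default `True`) and
forwards it to `RollingOptions`. Callers that knowingly pass unsorted data and do not want the
stderr message can disable it per call; the result still comes back aligned with the original
row order. A usage sketch on illustrative data, not taken from the patch:

    from datetime import datetime

    import polars as pl

    df = pl.DataFrame(
        {
            "ts": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)],
            "value": [1.0, 2.0, 3.0],
        }
    )

    # Polars sorts by "ts" internally, computes the rolling mean, and returns
    # the result aligned with the original (unsorted) rows; no warning printed.
    out = df.with_columns(
        roll=pl.col("value").rolling_mean(
            "2d", by="ts", closed="right", warn_if_unsorted=False
        )
    )
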
diff --git a/py-polars/src/expr/rolling.rs b/py-polars/src/expr/rolling.rs
index dbce0d294203b..5ee2055ba81d9 100644
--- a/py-polars/src/expr/rolling.rs
+++ b/py-polars/src/expr/rolling.rs
@@ -11,7 +11,8 @@ use crate::{PyExpr, PySeries};

 #[pymethods]
 impl PyExpr {
-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))]
+    #[allow(clippy::too_many_arguments)]
     fn rolling_sum(
         &self,
         window_size: &str,
@@ -20,6 +21,7 @@ impl PyExpr {
         center: bool,
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -28,12 +30,14 @@ impl PyExpr {
             center,
             by,
             closed_window: closed.map(|c| c.0),
+            warn_if_unsorted,
             ..Default::default()
         };
         self.inner.clone().rolling_sum(options).into()
     }

-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))]
+    #[allow(clippy::too_many_arguments)]
     fn rolling_min(
         &self,
         window_size: &str,
@@ -42,6 +46,7 @@ impl PyExpr {
         center: bool,
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -50,12 +55,14 @@ impl PyExpr {
             center,
             by,
             closed_window: closed.map(|c| c.0),
+            warn_if_unsorted,
             ..Default::default()
         };
         self.inner.clone().rolling_min(options).into()
     }

-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))]
+    #[allow(clippy::too_many_arguments)]
     fn rolling_max(
         &self,
         window_size: &str,
@@ -64,6 +71,7 @@ impl PyExpr {
         center: bool,
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -72,12 +80,14 @@ impl PyExpr {
             center,
             by,
             closed_window: closed.map(|c| c.0),
+            warn_if_unsorted,
             ..Default::default()
         };
         self.inner.clone().rolling_max(options).into()
     }

-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))]
+    #[allow(clippy::too_many_arguments)]
     fn rolling_mean(
         &self,
         window_size: &str,
@@ -86,6 +96,7 @@ impl PyExpr {
         center: bool,
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -94,13 +105,14 @@ impl PyExpr {
             center,
             by,
             closed_window: closed.map(|c| c.0),
+            warn_if_unsorted,
             ..Default::default()
         };
         self.inner.clone().rolling_mean(options).into()
     }

-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof, warn_if_unsorted))]
     #[allow(clippy::too_many_arguments)]
     fn rolling_std(
         &self,
@@ -111,6 +123,7 @@ impl PyExpr {
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
         ddof: u8,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -120,12 +133,13 @@ impl PyExpr {
             by,
             closed_window: closed.map(|c| c.0),
             fn_params: Some(Arc::new(RollingVarParams { ddof }) as Arc<dyn Any + Send + Sync>),
+            warn_if_unsorted,
         };
         self.inner.clone().rolling_std(options).into()
     }

-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof, warn_if_unsorted))]
     #[allow(clippy::too_many_arguments)]
     fn rolling_var(
         &self,
@@ -136,6 +150,7 @@ impl PyExpr {
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
         ddof: u8,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -145,12 +160,14 @@ impl PyExpr {
             by,
             closed_window: closed.map(|c| c.0),
             fn_params: Some(Arc::new(RollingVarParams { ddof }) as Arc<dyn Any + Send + Sync>),
+            warn_if_unsorted,
         };
         self.inner.clone().rolling_var(options).into()
     }

-    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))]
+    #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))]
+    #[allow(clippy::too_many_arguments)]
     fn rolling_median(
         &self,
         window_size: &str,
@@ -159,6 +176,7 @@ impl PyExpr {
         center: bool,
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -171,11 +189,12 @@ impl PyExpr {
                 prob: 0.5,
                 interpol: QuantileInterpolOptions::Linear,
             }) as Arc<dyn Any + Send + Sync>),
+            warn_if_unsorted,
         };
         self.inner.clone().rolling_quantile(options).into()
     }

-    #[pyo3(signature = (quantile, interpolation, window_size, weights, min_periods, center, by, closed))]
+    #[pyo3(signature = (quantile, interpolation, window_size, weights, min_periods, center, by, closed, warn_if_unsorted))]
     #[allow(clippy::too_many_arguments)]
     fn rolling_quantile(
         &self,
@@ -187,6 +206,7 @@ impl PyExpr {
         center: bool,
         by: Option<String>,
         closed: Option<Wrap<ClosedWindow>>,
+        warn_if_unsorted: bool,
     ) -> Self {
         let options = RollingOptions {
             window_size: Duration::parse(window_size),
@@ -199,6 +219,7 @@ impl PyExpr {
                 prob: quantile,
                 interpol: interpolation.0,
             }) as Arc<dyn Any + Send + Sync>),
+            warn_if_unsorted,
         };
         self.inner.clone().rolling_quantile(options).into()
diff --git a/py-polars/tests/parametric/test_groupby_rolling.py b/py-polars/tests/parametric/test_groupby_rolling.py
deleted file mode 100644
index c4c62b36a2500..0000000000000
--- a/py-polars/tests/parametric/test_groupby_rolling.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from __future__ import annotations
-
-from datetime import timedelta
-from typing import TYPE_CHECKING
-
-import hypothesis.strategies as st
-from hypothesis import assume, given, reject
-
-import polars as pl
-from polars.testing import assert_frame_equal
-from polars.testing.parametric.primitives import column, dataframes
-from polars.testing.parametric.strategies import strategy_closed, strategy_time_unit
-from polars.utils.convert import _timedelta_to_pl_duration
-
-if TYPE_CHECKING:
-    from polars.type_aliases import ClosedInterval, TimeUnit
-
-
-@given(
-    period=st.timedeltas(min_value=timedelta(microseconds=0)).map(
-        _timedelta_to_pl_duration
-    ),
-    offset=st.timedeltas().map(_timedelta_to_pl_duration),
-    closed=strategy_closed,
-    data=st.data(),
-    time_unit=strategy_time_unit,
-)
-def test_group_by_rolling(
-    period: str,
-    offset: str,
-    closed: ClosedInterval,
-    data: st.DataObject,
-    time_unit: TimeUnit,
-) -> None:
-    assume(period != "")
-    dataframe = data.draw(
-        dataframes(
-            [
-                column("ts", dtype=pl.Datetime(time_unit)),
-                column("value", dtype=pl.Int64),
-            ],
-        )
-    )
-    df = dataframe.sort("ts").unique("ts")
-    try:
-        result = df.group_by_rolling(
-            "ts", period=period, offset=offset, closed=closed
-        ).agg(pl.col("value"))
-    except pl.exceptions.PolarsPanicError as exc:
-        assert any(  # noqa: PT017
-            msg in str(exc)
-            for msg in (
-                "attempt to multiply with overflow",
-                "attempt to add with overflow",
-            )
-        )
-        reject()
-
-    expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
-    for ts, _ in df.iter_rows():
-        window = df.filter(
-            pl.col("ts").is_between(
-                pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset),
-                pl.lit(ts, dtype=pl.Datetime(time_unit))
-                .dt.offset_by(offset)
-                .dt.offset_by(period),
-                closed=closed,
-            )
-        )
-        value = window["value"].to_list()
-        expected_dict["ts"].append(ts)
-        expected_dict["value"].append(value)
-    expected = pl.DataFrame(expected_dict).select(
-        pl.col("ts").cast(pl.Datetime(time_unit)),
-        pl.col("value").cast(pl.List(pl.Int64)),
-    )
-    assert_frame_equal(result, expected)
diff --git a/py-polars/tests/parametric/test_rolling.py b/py-polars/tests/parametric/test_rolling.py
new file mode 100644
index 0000000000000..38536578ebe9a
--- /dev/null
+++ b/py-polars/tests/parametric/test_rolling.py
@@ -0,0 +1,175 @@
+from __future__ import annotations
+
+from datetime import timedelta
+from typing import TYPE_CHECKING
+
+import hypothesis.strategies as st
+from hypothesis import assume, given, reject
+
+import polars as pl
+from polars.testing import assert_frame_equal, assert_series_equal
+from polars.testing.parametric.primitives import column, dataframes
+from polars.testing.parametric.strategies import strategy_closed, strategy_time_unit
+from polars.utils.convert import _timedelta_to_pl_duration
+
+if TYPE_CHECKING:
+    from polars.type_aliases import ClosedInterval, TimeUnit
+
+
+@given(
+    period=st.timedeltas(min_value=timedelta(microseconds=0)).map(
+        _timedelta_to_pl_duration
+    ),
+    offset=st.timedeltas().map(_timedelta_to_pl_duration),
+    closed=strategy_closed,
+    data=st.data(),
+    time_unit=strategy_time_unit,
+)
+def test_group_by_rolling(
+    period: str,
+    offset: str,
+    closed: ClosedInterval,
+    data: st.DataObject,
+    time_unit: TimeUnit,
+) -> None:
+    assume(period != "")
+    dataframe = data.draw(
+        dataframes(
+            [
+                column("ts", dtype=pl.Datetime(time_unit)),
+                column("value", dtype=pl.Int64),
+            ],
+        )
+    )
+    df = dataframe.sort("ts").unique("ts")
+    try:
+        result = df.group_by_rolling(
+            "ts", period=period, offset=offset, closed=closed
+        ).agg(pl.col("value"))
+    except pl.exceptions.PolarsPanicError as exc:
+        assert any(  # noqa: PT017
+            msg in str(exc)
+            for msg in (
+                "attempt to multiply with overflow",
+                "attempt to add with overflow",
+            )
+        )
+        reject()
+
+    expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
+    for ts, _ in df.iter_rows():
+        window = df.filter(
+            pl.col("ts").is_between(
+                pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset),
+                pl.lit(ts, dtype=pl.Datetime(time_unit))
+                .dt.offset_by(offset)
+                .dt.offset_by(period),
+                closed=closed,
+            )
+        )
+        value = window["value"].to_list()
+        expected_dict["ts"].append(ts)
+        expected_dict["value"].append(value)
+    expected = pl.DataFrame(expected_dict).select(
+        pl.col("ts").cast(pl.Datetime(time_unit)),
+        pl.col("value").cast(pl.List(pl.Int64)),
+    )
+    assert_frame_equal(result, expected)
+
+
+@given(
+    window_size=st.timedeltas(min_value=timedelta(microseconds=0)).map(
+        _timedelta_to_pl_duration
+    ),
+    closed=strategy_closed,
+    data=st.data(),
+    time_unit=strategy_time_unit,
+    aggregation=st.sampled_from(
+        [
+            "min",
+            "max",
"mean", + "sum", + # "std", blocked by https://github.com/pola-rs/polars/issues/11140 + # "var", blocked by https://github.com/pola-rs/polars/issues/11140 + "median", + ] + ), +) +def test_rolling_aggs( + window_size: str, + closed: ClosedInterval, + data: st.DataObject, + time_unit: TimeUnit, + aggregation: str, +) -> None: + # Check: + # - that we get the same results whether we sort the data beforehand, + # or whether polars sorts it for us under-the-hood + # - that even if polars temporarily sorts the data under-the-hood, the + # order that the user passed the data in is restored + assume(window_size != "") + dataframe = data.draw( + dataframes( + [ + column("ts", dtype=pl.Datetime(time_unit)), + column("value", dtype=pl.Int64), + ], + ) + ) + # take unique because of https://github.com/pola-rs/polars/issues/11150 + df = dataframe.unique("ts") + func = f"rolling_{aggregation}" + try: + result = df.with_columns( + getattr(pl.col("value"), func)( + window_size=window_size, by="ts", closed=closed, warn_if_unsorted=False + ) + ) + except pl.exceptions.PolarsPanicError as exc: + assert any( # noqa: PT017 + msg in str(exc) + for msg in ( + "attempt to multiply with overflow", + "attempt to add with overflow", + ) + ) + reject() + + expected = ( + df.with_row_count("index") + .sort("ts") + .with_columns( + getattr(pl.col("value"), func)( + window_size=window_size, by="ts", closed=closed + ), + "index", + ) + .sort("index") + .drop("index") + ) + assert_frame_equal(result, expected) + assert_series_equal(result["ts"], df["ts"]) + + expected_dict: dict[str, list[object]] = {"ts": [], "value": []} + for ts, _ in df.iter_rows(): + window = df.filter( + pl.col("ts").is_between( + pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by( + f"-{window_size}" + ), + pl.lit(ts, dtype=pl.Datetime(time_unit)), + closed=closed, + ) + ) + expected_dict["ts"].append(ts) + if window.is_empty(): + expected_dict["value"].append(None) + else: + value = getattr(window["value"], aggregation)() + expected_dict["value"].append(value) + expected = pl.DataFrame(expected_dict).select( + pl.col("ts").cast(pl.Datetime(time_unit)), + pl.col("value").cast(result["value"].dtype), + ) + assert_frame_equal(result, expected) diff --git a/py-polars/tests/unit/operations/rolling/test_rolling.py b/py-polars/tests/unit/operations/rolling/test_rolling.py index 27f57e660a7c1..cdf6d845de581 100644 --- a/py-polars/tests/unit/operations/rolling/test_rolling.py +++ b/py-polars/tests/unit/operations/rolling/test_rolling.py @@ -835,18 +835,18 @@ def test_rolling_weighted_quantile_10031() -> None: ) -def test_rolling_aggregations_unsorted_raise_10991() -> None: +def test_rolling_aggregations_unsorted_temporarily_sorts_10991() -> None: df = pl.DataFrame( { "dt": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)], "val": [1, 2, 3], } ) - with pytest.raises( - pl.InvalidOperationError, - match="argument in operation 'rolling_sum' is not explicitly sorted", - ): - df.with_columns(roll=pl.col("val").rolling_sum("2d", by="dt", closed="right")) + result = df.with_columns( + roll=pl.col("val").rolling_sum("2d", by="dt", closed="right") + ) + expected = df.with_columns(roll=pl.Series([4, 2, 5])) + assert_frame_equal(result, expected) def test_rolling() -> None: