diff --git a/crates/polars-plan/src/dsl/mod.rs b/crates/polars-plan/src/dsl/mod.rs index a16fd25358bbf..7fa540ddb249c 100644 --- a/crates/polars-plan/src/dsl/mod.rs +++ b/crates/polars-plan/src/dsl/mod.rs @@ -1230,19 +1230,48 @@ impl Expr { move |s| { let mut by = s[1].clone(); by = by.rechunk(); - let s = &s[0]; + let series: Series; polars_ensure!( options.weights.is_none(), ComputeError: "`weights` is not supported in 'rolling by' expression" ); - let (by, tz) = match by.dtype() { + let (mut by, tz) = match by.dtype() { DataType::Datetime(tu, tz) => { (by.cast(&DataType::Datetime(*tu, None))?, tz) }, _ => (by.clone(), &None), }; - ensure_sorted_arg(&by, expr_name)?; + let sorting_indices; + let original_indices; + let by_flag = by.is_sorted_flag(); + if matches!(by_flag, IsSorted::Ascending) { + series = s[0].clone(); + original_indices = None; + } + else if matches!(by_flag, IsSorted::Descending) { + series = s[0].reverse(); + by = by.reverse(); + original_indices = None; + } + else { + if options.warn_if_unsorted { + eprintln!( + "PolarsPerformanceWarning: Series is not known to be \ + sorted by `by` column, so Polars is temporarily \ + sorting it for you.\n\ + You can silence this warning by:\n\ + - passing `warn_if_unsorted=False`;\n\ + - sorting your data by your `by` column beforehand;\n\ + - setting `.set_sorted()` if you already know your data is sorted\n\ + before passing it to the rolling aggregation function" + ); + } + sorting_indices = by.arg_sort(Default::default()); + by = by.take(&sorting_indices)?; + series = s[0].take(&sorting_indices)?; + original_indices = UInt32Chunked::from_iter_values("", 0..s[0].len() as u32).into_series().take(&sorting_indices).ok(); + }; let by = by.datetime().unwrap(); let by_values = by.cont_slice().map_err(|_| { polars_err!( @@ -1264,7 +1293,13 @@ impl Expr { fn_params: options.fn_params.clone(), }; - rolling_fn(s, options).map(Some) + if matches!(by_flag, IsSorted::Ascending) { + rolling_fn(&series, options).map(Some) + } else if matches!(by_flag, IsSorted::Descending) { + Ok(rolling_fn(&series, options)?.reverse()).map(Some) + } else { + rolling_fn(&series, options)?.take(&original_indices.unwrap().arg_sort(Default::default())).map(Some) + } }, &[col(by)], output_type, diff --git a/crates/polars-time/src/chunkedarray/rolling_window/mod.rs b/crates/polars-time/src/chunkedarray/rolling_window/mod.rs index dbb3e07d18e67..15208983ed982 100644 --- a/crates/polars-time/src/chunkedarray/rolling_window/mod.rs +++ b/crates/polars-time/src/chunkedarray/rolling_window/mod.rs @@ -38,6 +38,8 @@ pub struct RollingOptions { pub closed_window: Option, /// Optional parameters for the rolling function pub fn_params: DynArgs, + /// Warn if data is not known to be sorted by `by` column (if passed) + pub warn_if_unsorted: bool, } #[cfg(feature = "rolling_window")] @@ -51,6 +53,7 @@ impl Default for RollingOptions { by: None, closed_window: None, fn_params: None, + warn_if_unsorted: true, } } } diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index c1fc424b91bdf..5c7e315d1e879 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -5279,6 +5279,7 @@ def rolling_min( center: bool = False, by: str | None = None, closed: ClosedInterval = "left", + warn_if_unsorted: bool = True, ) -> Self: """ Apply a rolling min (moving min) over the values in this array. @@ -5350,6 +5351,8 @@ def rolling_min( closed : {'left', 'right', 'both', 'none'} Define which sides of the temporal interval are closed (inclusive); only applicable if `by` has been set. + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -5475,7 +5478,7 @@ def rolling_min( ) return self._from_pyexpr( self._pyexpr.rolling_min( - window_size, weights, min_periods, center, by, closed + window_size, weights, min_periods, center, by, closed, warn_if_unsorted ) ) @@ -5489,6 +5492,7 @@ def rolling_max( center: bool = False, by: str | None = None, closed: ClosedInterval = "left", + warn_if_unsorted: bool = True, ) -> Self: """ Apply a rolling max (moving max) over the values in this array. @@ -5556,6 +5560,8 @@ def rolling_max( closed : {'left', 'right', 'both', 'none'} Define which sides of the temporal interval are closed (inclusive); only applicable if `by` has been set. + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -5708,7 +5714,7 @@ def rolling_max( ) return self._from_pyexpr( self._pyexpr.rolling_max( - window_size, weights, min_periods, center, by, closed + window_size, weights, min_periods, center, by, closed, warn_if_unsorted ) ) @@ -5722,6 +5728,7 @@ def rolling_mean( center: bool = False, by: str | None = None, closed: ClosedInterval = "left", + warn_if_unsorted: bool = True, ) -> Self: """ Apply a rolling mean (moving mean) over the values in this array. @@ -5793,6 +5800,8 @@ def rolling_mean( closed : {'left', 'right', 'both', 'none'} Define which sides of the temporal interval are closed (inclusive); only applicable if `by` has been set. + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -5945,7 +5954,13 @@ def rolling_mean( ) return self._from_pyexpr( self._pyexpr.rolling_mean( - window_size, weights, min_periods, center, by, closed + window_size, + weights, + min_periods, + center, + by, + closed, + warn_if_unsorted, ) ) @@ -5959,6 +5974,7 @@ def rolling_sum( center: bool = False, by: str | None = None, closed: ClosedInterval = "left", + warn_if_unsorted: bool = True, ) -> Self: """ Apply a rolling sum (moving sum) over the values in this array. @@ -6026,6 +6042,8 @@ def rolling_sum( closed : {'left', 'right', 'both', 'none'} Define which sides of the temporal interval are closed (inclusive); only applicable if `by` has been set. + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -6178,7 +6196,7 @@ def rolling_sum( ) return self._from_pyexpr( self._pyexpr.rolling_sum( - window_size, weights, min_periods, center, by, closed + window_size, weights, min_periods, center, by, closed, warn_if_unsorted ) ) @@ -6193,6 +6211,7 @@ def rolling_std( by: str | None = None, closed: ClosedInterval = "left", ddof: int = 1, + warn_if_unsorted: bool = True, ) -> Self: """ Compute a rolling standard deviation. @@ -6262,6 +6281,8 @@ def rolling_std( applicable if `by` has been set. ddof "Delta Degrees of Freedom": The divisor for a length N window is N - ddof + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -6414,7 +6435,14 @@ def rolling_std( ) return self._from_pyexpr( self._pyexpr.rolling_std( - window_size, weights, min_periods, center, by, closed, ddof + window_size, + weights, + min_periods, + center, + by, + closed, + ddof, + warn_if_unsorted, ) ) @@ -6429,6 +6457,7 @@ def rolling_var( by: str | None = None, closed: ClosedInterval = "left", ddof: int = 1, + warn_if_unsorted: bool = True, ) -> Self: """ Compute a rolling variance. @@ -6498,6 +6527,8 @@ def rolling_var( applicable if `by` has been set. ddof "Delta Degrees of Freedom": The divisor for a length N window is N - ddof + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -6657,6 +6688,7 @@ def rolling_var( by, closed, ddof, + warn_if_unsorted, ) ) @@ -6670,6 +6702,7 @@ def rolling_median( center: bool = False, by: str | None = None, closed: ClosedInterval = "left", + warn_if_unsorted: bool = True, ) -> Self: """ Compute a rolling median. @@ -6737,6 +6770,8 @@ def rolling_median( closed : {'left', 'right', 'both', 'none'} Define which sides of the temporal interval are closed (inclusive); only applicable if `by` has been set. + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -6815,7 +6850,7 @@ def rolling_median( ) return self._from_pyexpr( self._pyexpr.rolling_median( - window_size, weights, min_periods, center, by, closed + window_size, weights, min_periods, center, by, closed, warn_if_unsorted ) ) @@ -6831,6 +6866,7 @@ def rolling_quantile( center: bool = False, by: str | None = None, closed: ClosedInterval = "left", + warn_if_unsorted: bool = True, ) -> Self: """ Compute a rolling quantile. @@ -6902,6 +6938,8 @@ def rolling_quantile( closed : {'left', 'right', 'both', 'none'} Define which sides of the temporal interval are closed (inclusive); only applicable if `by` has been set. + warn_if_unsorted + Warn if data is not known to be sorted by `by` column (if passed). Warnings -------- @@ -7016,6 +7054,7 @@ def rolling_quantile( center, by, closed, + warn_if_unsorted, ) ) diff --git a/py-polars/src/expr/rolling.rs b/py-polars/src/expr/rolling.rs index dbce0d294203b..422ec97767eb6 100644 --- a/py-polars/src/expr/rolling.rs +++ b/py-polars/src/expr/rolling.rs @@ -11,7 +11,7 @@ use crate::{PyExpr, PySeries}; #[pymethods] impl PyExpr { - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))] fn rolling_sum( &self, window_size: &str, @@ -20,6 +20,7 @@ impl PyExpr { center: bool, by: Option, closed: Option>, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -28,12 +29,13 @@ impl PyExpr { center, by, closed_window: closed.map(|c| c.0), + warn_if_unsorted, ..Default::default() }; self.inner.clone().rolling_sum(options).into() } - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))] fn rolling_min( &self, window_size: &str, @@ -42,6 +44,7 @@ impl PyExpr { center: bool, by: Option, closed: Option>, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -50,12 +53,13 @@ impl PyExpr { center, by, closed_window: closed.map(|c| c.0), + warn_if_unsorted, ..Default::default() }; self.inner.clone().rolling_min(options).into() } - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))] fn rolling_max( &self, window_size: &str, @@ -64,6 +68,7 @@ impl PyExpr { center: bool, by: Option, closed: Option>, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -72,12 +77,13 @@ impl PyExpr { center, by, closed_window: closed.map(|c| c.0), + warn_if_unsorted, ..Default::default() }; self.inner.clone().rolling_max(options).into() } - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))] fn rolling_mean( &self, window_size: &str, @@ -86,6 +92,7 @@ impl PyExpr { center: bool, by: Option, closed: Option>, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -94,13 +101,14 @@ impl PyExpr { center, by, closed_window: closed.map(|c| c.0), + warn_if_unsorted, ..Default::default() }; self.inner.clone().rolling_mean(options).into() } - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof, warn_if_unsorted))] #[allow(clippy::too_many_arguments)] fn rolling_std( &self, @@ -111,6 +119,7 @@ impl PyExpr { by: Option, closed: Option>, ddof: u8, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -120,12 +129,14 @@ impl PyExpr { by, closed_window: closed.map(|c| c.0), fn_params: Some(Arc::new(RollingVarParams { ddof }) as Arc), + warn_if_unsorted, + ..Default::default() }; self.inner.clone().rolling_std(options).into() } - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, ddof, warn_if_unsorted))] #[allow(clippy::too_many_arguments)] fn rolling_var( &self, @@ -136,6 +147,7 @@ impl PyExpr { by: Option, closed: Option>, ddof: u8, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -145,12 +157,13 @@ impl PyExpr { by, closed_window: closed.map(|c| c.0), fn_params: Some(Arc::new(RollingVarParams { ddof }) as Arc), + warn_if_unsorted, }; self.inner.clone().rolling_var(options).into() } - #[pyo3(signature = (window_size, weights, min_periods, center, by, closed))] + #[pyo3(signature = (window_size, weights, min_periods, center, by, closed, warn_if_unsorted))] fn rolling_median( &self, window_size: &str, @@ -159,6 +172,7 @@ impl PyExpr { center: bool, by: Option, closed: Option>, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -171,11 +185,12 @@ impl PyExpr { prob: 0.5, interpol: QuantileInterpolOptions::Linear, }) as Arc), + warn_if_unsorted, }; self.inner.clone().rolling_quantile(options).into() } - #[pyo3(signature = (quantile, interpolation, window_size, weights, min_periods, center, by, closed))] + #[pyo3(signature = (quantile, interpolation, window_size, weights, min_periods, center, by, closed, warn_if_unsorted))] #[allow(clippy::too_many_arguments)] fn rolling_quantile( &self, @@ -187,6 +202,7 @@ impl PyExpr { center: bool, by: Option, closed: Option>, + warn_if_unsorted: bool, ) -> Self { let options = RollingOptions { window_size: Duration::parse(window_size), @@ -199,6 +215,7 @@ impl PyExpr { prob: quantile, interpol: interpolation.0, }) as Arc), + warn_if_unsorted, }; self.inner.clone().rolling_quantile(options).into() diff --git a/py-polars/tests/parametric/test_groupby_rolling.py b/py-polars/tests/parametric/test_groupby_rolling.py deleted file mode 100644 index c4c62b36a2500..0000000000000 --- a/py-polars/tests/parametric/test_groupby_rolling.py +++ /dev/null @@ -1,77 +0,0 @@ -from __future__ import annotations - -from datetime import timedelta -from typing import TYPE_CHECKING - -import hypothesis.strategies as st -from hypothesis import assume, given, reject - -import polars as pl -from polars.testing import assert_frame_equal -from polars.testing.parametric.primitives import column, dataframes -from polars.testing.parametric.strategies import strategy_closed, strategy_time_unit -from polars.utils.convert import _timedelta_to_pl_duration - -if TYPE_CHECKING: - from polars.type_aliases import ClosedInterval, TimeUnit - - -@given( - period=st.timedeltas(min_value=timedelta(microseconds=0)).map( - _timedelta_to_pl_duration - ), - offset=st.timedeltas().map(_timedelta_to_pl_duration), - closed=strategy_closed, - data=st.data(), - time_unit=strategy_time_unit, -) -def test_group_by_rolling( - period: str, - offset: str, - closed: ClosedInterval, - data: st.DataObject, - time_unit: TimeUnit, -) -> None: - assume(period != "") - dataframe = data.draw( - dataframes( - [ - column("ts", dtype=pl.Datetime(time_unit)), - column("value", dtype=pl.Int64), - ], - ) - ) - df = dataframe.sort("ts").unique("ts") - try: - result = df.group_by_rolling( - "ts", period=period, offset=offset, closed=closed - ).agg(pl.col("value")) - except pl.exceptions.PolarsPanicError as exc: - assert any( # noqa: PT017 - msg in str(exc) - for msg in ( - "attempt to multiply with overflow", - "attempt to add with overflow", - ) - ) - reject() - - expected_dict: dict[str, list[object]] = {"ts": [], "value": []} - for ts, _ in df.iter_rows(): - window = df.filter( - pl.col("ts").is_between( - pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset), - pl.lit(ts, dtype=pl.Datetime(time_unit)) - .dt.offset_by(offset) - .dt.offset_by(period), - closed=closed, - ) - ) - value = window["value"].to_list() - expected_dict["ts"].append(ts) - expected_dict["value"].append(value) - expected = pl.DataFrame(expected_dict).select( - pl.col("ts").cast(pl.Datetime(time_unit)), - pl.col("value").cast(pl.List(pl.Int64)), - ) - assert_frame_equal(result, expected) diff --git a/py-polars/tests/unit/operations/rolling/test_rolling.py b/py-polars/tests/unit/operations/rolling/test_rolling.py index 27f57e660a7c1..cdf6d845de581 100644 --- a/py-polars/tests/unit/operations/rolling/test_rolling.py +++ b/py-polars/tests/unit/operations/rolling/test_rolling.py @@ -835,18 +835,18 @@ def test_rolling_weighted_quantile_10031() -> None: ) -def test_rolling_aggregations_unsorted_raise_10991() -> None: +def test_rolling_aggregations_unsorted_temporarily_sorts_10991() -> None: df = pl.DataFrame( { "dt": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)], "val": [1, 2, 3], } ) - with pytest.raises( - pl.InvalidOperationError, - match="argument in operation 'rolling_sum' is not explicitly sorted", - ): - df.with_columns(roll=pl.col("val").rolling_sum("2d", by="dt", closed="right")) + result = df.with_columns( + roll=pl.col("val").rolling_sum("2d", by="dt", closed="right") + ) + expected = df.with_columns(roll=pl.Series([4, 2, 5])) + assert_frame_equal(result, expected) def test_rolling() -> None: