"""Detect which dataframe implementation (polars/pandas/cudf/modin) an object uses.

Each optional backend is imported defensively; when a backend is missing its
module name is rebound to ``object`` so that ``isinstance`` checks still have
a valid (never-matching) target, guarded by the ``*_AVAILABLE`` flags.
"""
try:
    import polars as pl
except ModuleNotFoundError:
    POLARS_AVAILABLE = False
    pl = object  # type: ignore[assignment]
else:
    POLARS_AVAILABLE = True
try:
    import pandas as pd
except ModuleNotFoundError:
    PANDAS_AVAILABLE = False
    pd = object
else:
    PANDAS_AVAILABLE = True
try:
    import cudf
except ModuleNotFoundError:
    CUDF_AVAILABLE = False
    cudf = object
else:
    CUDF_AVAILABLE = True
try:
    import modin.pandas as mpd
except ModuleNotFoundError:
    MODIN_AVAILABLE = False
    mpd = object
else:
    MODIN_AVAILABLE = True


from typing import Any


def _is_native_polars(obj: Any) -> bool:
    """Return True if *obj* is a native polars object (frame, lazyframe, expr or series).

    Factored out: this exact check was previously duplicated in five functions.
    """
    return POLARS_AVAILABLE and isinstance(
        obj, (pl.DataFrame, pl.LazyFrame, pl.Expr, pl.Series)
    )


def is_lazyframe(obj: Any) -> bool:
    """Return True if *obj* is a lazyframe (duck-typed via namespace hook, or polars)."""
    if hasattr(obj, "__lazyframe_namespace__"):
        return True
    if _is_native_polars(obj):
        return isinstance(obj, pl.LazyFrame)
    return False


def is_dataframe(obj: Any) -> bool:
    """Return True if *obj* is an eager dataframe (namespace hook, or polars DataFrame)."""
    if hasattr(obj, "__dataframe_namespace__"):
        return True
    if _is_native_polars(obj):
        return isinstance(obj, pl.DataFrame)
    return False


def is_expr(obj: Any) -> bool:
    """Return True if *obj* is an expression (namespace hook, or polars Expr)."""
    if hasattr(obj, "__expr_namespace__"):
        return True
    if _is_native_polars(obj):
        return isinstance(obj, pl.Expr)
    return False


def is_series(obj: Any) -> bool:
    """Return True if *obj* is a series (namespace hook, or polars Series)."""
    if hasattr(obj, "__series_namespace__"):
        return True
    if _is_native_polars(obj):
        return isinstance(obj, pl.Series)
    return False


def get_implementation(obj: Any) -> str:
    """Return the backing implementation name for *obj*.

    Returns
    -------
    str
        One of ``"polars"``, ``"pandas"``, ``"cudf"``, ``"modin"``.

    Raises
    ------
    TypeError
        If *obj* belongs to none of the known backends.
    """
    if _is_native_polars(obj):
        return "polars"
    if PANDAS_AVAILABLE and isinstance(obj, (pd.DataFrame, pd.Series)):
        return "pandas"
    if CUDF_AVAILABLE and isinstance(obj, (cudf.DataFrame, cudf.Series)):
        return "cudf"
    # NOTE: only modin DataFrames (not Series) are recognised here, as in the
    # original code — TODO confirm whether modin Series should match too.
    if MODIN_AVAILABLE and isinstance(obj, mpd.DataFrame):
        return "modin"
    msg = f"Unknown implementation: {obj}"
    raise TypeError(msg)


def is_pandas(obj: Any) -> bool:
    """Return True if *obj* is backed by pandas."""
    return get_implementation(obj) == "pandas"


def is_polars(obj: Any) -> bool:
    """Return True if *obj* is backed by polars."""
    return get_implementation(obj) == "polars"


def is_cudf(obj: Any) -> bool:
    """Return True if *obj* is backed by cudf."""
    return get_implementation(obj) == "cudf"


def is_modin(obj: Any) -> bool:
    """Return True if *obj* is backed by modin."""
    return get_implementation(obj) == "modin"
def translate(
    df: Any,
    implementation: str,
    api_version: str,
) -> tuple[LazyFrameT, NamespaceT]:
    """Wrap a native dataframe in a :class:`LazyFrame` and return it with its namespace."""
    df = LazyFrame(
        df,
        api_version=api_version,
        implementation=implementation,
    )
    return df, df.__lazyframe_namespace__()


class Namespace(NamespaceT):
    """Expression-construction entry point for the pandas-like backends."""

    def __init__(self, *, api_version: str, implementation: str) -> None:
        self.__dataframeapi_version__ = api_version
        self.api_version = api_version
        self._implementation = implementation

    # --- horizontal reductions
    def sum_horizontal(self, *exprs: IntoExpr | Iterable[IntoExpr]) -> ExprT:
        """Element-wise sum across the given expressions."""
        return reduce(lambda x, y: x + y, parse_into_exprs(self, *exprs))

    def all_horizontal(self, *exprs: IntoExpr | Iterable[IntoExpr]) -> ExprT:
        """Element-wise logical AND across the given expressions."""
        return reduce(lambda x, y: x & y, parse_into_exprs(self, *exprs))

    def any_horizontal(self, *exprs: IntoExpr | Iterable[IntoExpr]) -> ExprT:
        """Element-wise logical OR across the given expressions."""
        return reduce(lambda x, y: x | y, parse_into_exprs(self, *exprs))

    def concat(self, items: Iterable[AnyDataFrame], *, how: str) -> AnyDataFrame:
        """Concatenate frames horizontally.

        Raises
        ------
        TypeError
            If DataFrames and LazyFrames are mixed, or ``how`` is not
            ``"horizontal"``.
        """
        dfs: list[Any] = []
        # BUG FIX: this was `kind: Any = {}` (a dict) followed by
        # `kind.append(...)`, which raises AttributeError on the first item;
        # additionally `len(kind) > 1` would have rejected two frames of the
        # *same* kind. Collect types in a list and compare distinct kinds.
        kinds: list[type] = []
        for df in items:
            dfs.append(df.dataframe)  # type: ignore[union-attr, attr-defined]
            kinds.append(type(df))
        if len(set(kinds)) > 1:
            msg = "Can only concat DataFrames or LazyFrames, not mixtures of the two"
            raise TypeError(msg)
        if how != "horizontal":
            msg = "Only horizontal concatenation is supported for now"
            raise TypeError(msg)
        if kinds[0] is DataFrame:
            return DataFrame(  # type: ignore[return-value]
                horizontal_concat(dfs, implementation=self._implementation),
                api_version=self.api_version,
                implementation=self._implementation,
            )
        return LazyFrame(  # type: ignore[return-value]
            horizontal_concat(dfs, implementation=self._implementation),
            api_version=self.api_version,
            implementation=self._implementation,
        )

    def col(self, *column_names: str | Iterable[str]) -> ExprT:
        """Create an expression selecting the given column(s)."""
        return Expr.from_column_names(
            *flatten_str(*column_names), implementation=self._implementation
        )

    def sum(self, *column_names: str) -> ExprT:
        """Shorthand for ``col(*names).sum()``."""
        return Expr.from_column_names(
            *column_names, implementation=self._implementation
        ).sum()

    def mean(self, *column_names: str) -> ExprT:
        """Shorthand for ``col(*names).mean()``."""
        return Expr.from_column_names(
            *column_names, implementation=self._implementation
        ).mean()

    def max(self, *column_names: str) -> ExprT:
        """Shorthand for ``col(*names).max()``."""
        return Expr.from_column_names(
            *column_names, implementation=self._implementation
        ).max()

    def min(self, *column_names: str) -> ExprT:
        """Shorthand for ``col(*names).min()``."""
        return Expr.from_column_names(
            *column_names, implementation=self._implementation
        ).min()

    def len(self) -> ExprT:
        """Expression producing a single-row series holding the frame's length."""
        return Expr(
            lambda df: [
                Series(
                    series_from_iterable(
                        [len(df.dataframe)],  # type: ignore[union-attr]
                        name="len",
                        index=[0],
                        implementation=self._implementation,
                    ),
                    api_version=df.api_version,  # type: ignore[union-attr]
                    implementation=self._implementation,
                ),
            ],
            depth=0,
            function_name="len",
            root_names=None,
            output_names=["len"],  # todo: check this
            implementation=self._implementation,
        )

    def _create_expr_from_callable(  # noqa: PLR0913
        self,
        func: Callable[[DataFrameT | LazyFrameT], list[SeriesT]],
        *,
        depth: int,
        function_name: str | None,
        root_names: list[str] | None,
        output_names: list[str] | None,
    ) -> ExprT:
        """Internal: wrap an evaluation callable into an :class:`Expr`."""
        return Expr(
            func,
            depth=depth,
            function_name=function_name,
            root_names=root_names,
            output_names=output_names,
            implementation=self._implementation,
        )

    def _create_series_from_scalar(self, value: Any, series: SeriesT) -> SeriesT:
        """Internal: broadcast a scalar into a one-row series aligned with *series*."""
        return Series(
            series_from_iterable(
                [value],
                name=series.series.name,  # type: ignore[attr-defined]
                index=series.series.index[0:1],  # type: ignore[attr-defined]
                implementation=self._implementation,
            ),
            api_version=self.api_version,
            implementation=self._implementation,
        )

    def _create_expr_from_series(self, series: SeriesT) -> ExprT:
        """Internal: lift an already-evaluated series into an expression."""
        return Expr(
            lambda _df: [series],
            depth=0,
            function_name="from_series",
            root_names=None,
            output_names=None,
            implementation=self._implementation,
        )

    def all(self) -> ExprT:
        """Expression selecting every column of the frame it is evaluated against."""
        return Expr(
            lambda df: [
                Series(
                    df.dataframe.loc[:, column_name],  # type: ignore[union-attr]
                    api_version=df.api_version,  # type: ignore[union-attr]
                    implementation=self._implementation,
                )
                for column_name in df.columns
            ],
            depth=0,
            function_name="all",
            root_names=None,
            output_names=None,
            implementation=self._implementation,
        )
class Expr(ExprT):
    """Lazy column expression: a callable producing a list of Series from a frame.

    Metadata (`depth`, `function_name`, `root_names`, `output_names`) mirrors
    polars' expression bookkeeping and is used for group-by resolution.
    """

    def __init__(  # noqa: PLR0913
        self,
        call: Callable[[DataFrameT | LazyFrameT], list[SeriesT]],
        *,
        depth: int | None,
        function_name: str | None,
        root_names: list[str] | None,
        output_names: list[str] | None,
        implementation: str,
    ) -> None:
        self.call = call
        self.api_version = "0.20.0"  # todo
        # BUG FIX: `self._depth = depth` was assigned twice in the original.
        self._depth = depth
        self._function_name = function_name
        self._root_names = root_names
        self._output_names = output_names
        self._implementation = implementation

    def __repr__(self) -> str:
        # BUG FIX: the original f-string never closed the "Expr(" parenthesis.
        return (
            f"Expr("
            f"depth={self._depth}, "
            f"function_name={self._function_name}, "
            f"root_names={self._root_names}, "
            f"output_names={self._output_names}"
            ")"
        )

    @classmethod
    def from_column_names(
        cls: type[Expr], *column_names: str, implementation: str
    ) -> ExprT:
        """Build a root expression selecting the named columns."""
        return cls(
            lambda df: [
                Series(
                    df.dataframe.loc[:, column_name],  # type: ignore[union-attr]
                    api_version=df.api_version,  # type: ignore[union-attr]
                    implementation=implementation,
                )
                for column_name in column_names
            ],
            depth=0,
            function_name=None,
            root_names=list(column_names),
            output_names=list(column_names),
            implementation=implementation,
        )

    def __expr_namespace__(self) -> Namespace:
        return Namespace(
            api_version="todo",
            implementation=self._implementation,  # type: ignore[attr-defined]
        )

    # --- binary comparisons (each delegates to the shared dispatch helper)
    def __eq__(self, other: Expr | Any) -> ExprT:  # type: ignore[override]
        return register_expression_call(self, "__eq__", other)

    def __ne__(self, other: Expr | Any) -> ExprT:  # type: ignore[override]
        return register_expression_call(self, "__ne__", other)

    def __ge__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__ge__", other)

    def __gt__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__gt__", other)

    def __le__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__le__", other)

    def __lt__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__lt__", other)

    def __and__(self, other: Expr | bool | Any) -> ExprT:
        return register_expression_call(self, "__and__", other)

    def __rand__(self, other: Any) -> ExprT:
        return register_expression_call(self, "__rand__", other)

    def __or__(self, other: Expr | bool | Any) -> ExprT:
        return register_expression_call(self, "__or__", other)

    def __ror__(self, other: Any) -> ExprT:
        return register_expression_call(self, "__ror__", other)

    # --- arithmetic
    def __add__(self, other: Expr | Any) -> ExprT:  # type: ignore[override]
        return register_expression_call(self, "__add__", other)

    def __radd__(self, other: Any) -> ExprT:
        return register_expression_call(self, "__radd__", other)

    def __sub__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__sub__", other)

    def __rsub__(self, other: Any) -> ExprT:
        return register_expression_call(self, "__rsub__", other)

    def __mul__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__mul__", other)

    def __rmul__(self, other: Any) -> ExprT:
        # Multiplication commutes, so reuse __mul__.
        return self.__mul__(other)

    def __truediv__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__truediv__", other)

    def __rtruediv__(self, other: Any) -> ExprT:
        raise NotImplementedError

    def __floordiv__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__floordiv__", other)

    def __rfloordiv__(self, other: Any) -> ExprT:
        raise NotImplementedError

    def __pow__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__pow__", other)

    def __rpow__(self, other: Any) -> ExprT:  # pragma: no cover
        raise NotImplementedError

    def __mod__(self, other: Expr | Any) -> ExprT:
        return register_expression_call(self, "__mod__", other)

    def __rmod__(self, other: Any) -> ExprT:  # pragma: no cover
        raise NotImplementedError

    # Unary

    def __invert__(self) -> ExprT:
        return register_expression_call(self, "__invert__")

    # Reductions

    def sum(self) -> ExprT:
        return register_expression_call(self, "sum")

    def mean(self) -> ExprT:
        return register_expression_call(self, "mean")

    def max(self) -> ExprT:
        return register_expression_call(self, "max")

    def min(self) -> ExprT:
        return register_expression_call(self, "min")

    # Other
    def is_between(
        self, lower_bound: Any, upper_bound: Any, closed: str = "both"
    ) -> ExprT:
        return register_expression_call(
            self, "is_between", lower_bound, upper_bound, closed
        )

    def is_null(self) -> ExprT:
        return register_expression_call(self, "is_null")

    def is_in(self, other: Any) -> ExprT:
        return register_expression_call(self, "is_in", other)

    def drop_nulls(self) -> ExprT:
        return register_expression_call(self, "drop_nulls")

    def n_unique(self) -> ExprT:
        return register_expression_call(self, "n_unique")

    def unique(self) -> ExprT:
        return register_expression_call(self, "unique")

    def sample(self, n: int, fraction: float, *, with_replacement: bool) -> ExprT:
        # NOTE(review): `with_replacement` is forwarded positionally here but is
        # keyword-only on Series.sample — confirm register_expression_call's
        # forwarding convention handles this.
        return register_expression_call(self, "sample", n, fraction, with_replacement)

    def alias(self, name: str) -> ExprT:
        """Rename the expression's output column(s) to *name*.

        Defined manually (not via register_expression_call) so that
        ``output_names`` can be overridden.
        """
        if self._depth is None:
            msg = "Unreachable code, please report a bug"
            raise AssertionError(msg)
        return Expr(
            lambda df: [series.alias(name) for series in self.call(df)],
            depth=self._depth + 1,
            function_name=self._function_name,
            root_names=self._root_names,
            output_names=[name],
            implementation=self._implementation,
        )

    @property
    def str(self) -> ExprStringNamespaceT:
        """Namespace of string-specific expression methods."""
        return ExprStringNamespace(self)


class ExprStringNamespace(ExprStringNamespaceT):
    """String methods attached to an expression via ``Expr.str``."""

    def __init__(self, expr: ExprT) -> None:
        self._expr = expr

    def ends_with(self, suffix: str) -> ExprT:
        """Boolean expression: does each string end with *suffix*?"""
        # TODO make a register_expression_call for namespaces
        return Expr(
            lambda df: [
                Series(
                    series.series.str.endswith(suffix),
                    api_version=df.api_version,  # type: ignore[union-attr]
                    implementation=df._implementation,  # type: ignore[union-attr]
                )
                for series in self._expr.call(df)  # type: ignore[attr-defined]
            ],
            depth=self._expr._depth + 1,  # type: ignore[attr-defined]
            function_name=self._expr._function_name,  # type: ignore[attr-defined]
            root_names=self._expr._root_names,  # type: ignore[attr-defined]
            output_names=self._expr._output_names,  # type: ignore[attr-defined]
            implementation=self._expr._implementation,  # type: ignore[attr-defined]
        )

    def strip_chars(self, characters: str = " ") -> ExprT:
        """Strip the given *characters* from both ends of each string."""
        return Expr(
            lambda df: [
                Series(
                    series.series.str.strip(characters),  # type: ignore[attr-defined]
                    api_version=df.api_version,  # type: ignore[union-attr]
                    implementation=df._implementation,  # type: ignore[union-attr]
                )
                for series in self._expr.call(df)  # type: ignore[attr-defined]
            ],
            depth=self._expr._depth + 1,  # type: ignore[attr-defined]
            function_name=self._expr._function_name,  # type: ignore[attr-defined]
            root_names=self._expr._root_names,  # type: ignore[attr-defined]
            output_names=self._expr._output_names,  # type: ignore[attr-defined]
            implementation=self._expr._implementation,  # type: ignore[attr-defined]
        )
class Series(SeriesT):
    """Wrapper giving a pandas-like series a uniform, index-free API."""

    def __init__(
        self,
        series: Any,
        *,
        api_version: str,
        implementation: str,
    ) -> None:
        """Parameters
        ----------
        series
            Native (pandas/modin/cudf) series to wrap; must be named.
        api_version
            Dataframe-API version string this wrapper advertises.
        implementation
            Backend name, e.g. ``"pandas"``.
        """
        self._name = series.name
        # Invariant relied on throughout: every wrapped series is named.
        # (Plain `assert` is stripped under -O; kept for parity with the rest
        # of the codebase.)
        assert self._name is not None
        # Drop the incoming index so all operations are positional.
        self._series = series.reset_index(drop=True)
        self.api_version = api_version
        self._implementation = implementation

    def __repr__(self) -> str:  # pragma: no cover
        header = f" Standard Column (api_version={self.api_version}) "
        length = len(header)
        return (
            "┌"
            + "─" * length
            + "┐\n"
            + f"|{header}|\n"
            + "| Add `.column` to see native output |\n"
            + "└"
            + "─" * length
            + "┘\n"
        )

    def _from_series(self, series: Any) -> Series:
        """Re-wrap a native series, preserving api_version/implementation."""
        return Series(
            series.rename(series.name, copy=False),
            api_version=self.api_version,
            implementation=self._implementation,
        )

    def __series_namespace__(
        self,
    ) -> puffin.pandas_like.Namespace:
        return puffin.pandas_like.Namespace(
            api_version=self.api_version,
            implementation=self._implementation,
        )

    @property
    def name(self) -> str:
        return self._name  # type: ignore[no-any-return]

    @property
    def series(self) -> Any:
        """The underlying native series."""
        return self._series

    def filter(self, mask: Series) -> Series:
        ser = self.series
        return self._from_series(ser.loc[validate_column_comparand(mask)])

    def item(self) -> Any:
        return item(self.series)

    def is_between(
        self, lower_bound: Any, upper_bound: Any, closed: str = "both"
    ) -> Series:
        ser = self.series
        return self._from_series(ser.between(lower_bound, upper_bound, inclusive=closed))

    def is_in(self, other: Any) -> Series:
        ser = self.series
        return self._from_series(ser.isin(other))

    # Binary comparisons

    def __eq__(self, other: object) -> Series:  # type: ignore[override]
        other = validate_column_comparand(other)
        ser = self.series
        return self._from_series((ser == other).rename(ser.name, copy=False))

    def __ne__(self, other: object) -> Series:  # type: ignore[override]
        other = validate_column_comparand(other)
        ser = self.series
        return self._from_series((ser != other).rename(ser.name, copy=False))

    def __ge__(self, other: Any) -> Series:
        other = validate_column_comparand(other)
        ser = self.series
        return self._from_series((ser >= other).rename(ser.name, copy=False))

    def __gt__(self, other: Any) -> Series:
        other = validate_column_comparand(other)
        ser = self.series
        return self._from_series((ser > other).rename(ser.name, copy=False))

    def __le__(self, other: Any) -> Series:
        other = validate_column_comparand(other)
        ser = self.series
        return self._from_series((ser <= other).rename(ser.name, copy=False))

    def __lt__(self, other: Any) -> Series:
        other = validate_column_comparand(other)
        ser = self.series
        return self._from_series((ser < other).rename(ser.name, copy=False))

    def __and__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser & other).rename(ser.name, copy=False))

    def __rand__(self, other: Any) -> Series:
        return self.__and__(other)

    def __or__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser | other).rename(ser.name, copy=False))

    def __ror__(self, other: Any) -> Series:
        return self.__or__(other)

    def __add__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser + other).rename(ser.name, copy=False))

    def __radd__(self, other: Any) -> Series:
        return self.__add__(other)

    def __sub__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser - other).rename(ser.name, copy=False))

    def __rsub__(self, other: Any) -> Series:
        # other - self == -(self - other)
        return -1 * self.__sub__(other)

    def __mul__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser * other).rename(ser.name, copy=False))

    def __rmul__(self, other: Any) -> Series:
        return self.__mul__(other)

    def __truediv__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser / other).rename(ser.name, copy=False))

    def __rtruediv__(self, other: Any) -> Series:
        raise NotImplementedError

    def __floordiv__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser // other).rename(ser.name, copy=False))

    def __rfloordiv__(self, other: Any) -> Series:
        raise NotImplementedError

    def __pow__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser**other).rename(ser.name, copy=False))

    def __rpow__(self, other: Any) -> Series:  # pragma: no cover
        raise NotImplementedError

    def __mod__(self, other: Any) -> Series:
        ser = self.series
        other = validate_column_comparand(other)
        return self._from_series((ser % other).rename(ser.name, copy=False))

    def __rmod__(self, other: Any) -> Series:  # pragma: no cover
        raise NotImplementedError

    # Unary

    def __invert__(self: Series) -> Series:
        ser = self.series
        return self._from_series(~ser)

    # Reductions

    def any(self) -> Any:
        ser = self.series
        return ser.any()

    def all(self) -> Any:
        ser = self.series
        return ser.all()

    def min(self) -> Any:
        ser = self.series
        return ser.min()

    def max(self) -> Any:
        ser = self.series
        return ser.max()

    def sum(self) -> Any:
        ser = self.series
        return ser.sum()

    def prod(self) -> Any:
        ser = self.series
        return ser.prod()

    def median(self) -> Any:
        ser = self.series
        return ser.median()

    def mean(self) -> Any:
        ser = self.series
        return ser.mean()

    def std(
        self,
        *,
        correction: float = 1.0,
    ) -> Any:
        # `correction` maps onto pandas' Bessel-correction parameter `ddof`.
        ser = self.series
        return ser.std(ddof=correction)

    def var(
        self,
        *,
        correction: float = 1.0,
    ) -> Any:
        ser = self.series
        return ser.var(ddof=correction)

    def len(self) -> Any:
        return len(self._series)

    # Transformations

    def is_null(self) -> Series:
        ser = self.series
        return self._from_series(ser.isna())

    def drop_nulls(self) -> Series:
        ser = self.series
        return self._from_series(ser.dropna())

    def n_unique(self) -> int:
        ser = self.series
        return ser.nunique()

    def zip_with(self, mask: SeriesT, other: SeriesT) -> SeriesT:
        # NOTE(review): `mask`/`other` are passed to `.where` without unwrapping
        # `.series` (unlike every other method, which goes through
        # validate_column_comparand) — confirm this is intentional.
        ser = self.series
        return self._from_series(ser.where(mask, other))

    def sample(self, n: int, fraction: float, *, with_replacement: bool) -> Series:
        ser = self.series
        # BUG FIX: pandas' Series.sample keyword is `replace=`, not
        # `with_replacement=`; the original call raised TypeError.
        return self._from_series(
            ser.sample(n=n, frac=fraction, replace=with_replacement)
        )

    def unique(self) -> SeriesT:
        # NOTE(review): returns the raw `.unique()` result (an array), not a
        # wrapped Series, despite the SeriesT annotation — confirm against spec.
        ser = self.series
        return ser.unique()

    def is_nan(self) -> Series:
        ser = self.series
        if is_extension_array_dtype(ser.dtype):
            # For nullable dtypes, `x != x` is True only for NaN; fill the
            # missing-value slots with False.
            return self._from_series((ser != ser).fillna(False))  # noqa: PLR0124
        return self._from_series(ser.isna())

    def sort(
        self,
        *,
        descending: bool = True,
    ) -> Series:
        # NOTE(review): default `descending=True` differs from the polars
        # convention (ascending by default) — confirm intended.
        ser = self.series
        return self._from_series(
            ser.sort_values(ascending=not descending).rename(self.name)
        )

    def alias(self, name: str) -> Series:
        ser = self.series
        return self._from_series(ser.rename(name, copy=False))

    def to_numpy(self) -> Any:
        return self.series.to_numpy()

    def to_pandas(self) -> Any:
        if self._implementation == "pandas":
            return self.series
        elif self._implementation == "cudf":
            return self.series.to_pandas()
        elif self._implementation == "modin":
            return self.series._to_pandas()
        msg = f"Unknown implementation: {self._implementation}"
        raise TypeError(msg)
TypeError(msg) diff --git a/puffin/pandas_like/dataframe_object.py b/puffin/pandas_like/dataframe_object.py new file mode 100644 index 000000000..4b7cbf19c --- /dev/null +++ b/puffin/pandas_like/dataframe_object.py @@ -0,0 +1,350 @@ +from __future__ import annotations + +import collections +from typing import TYPE_CHECKING +from typing import Any +from typing import Iterable +from typing import Literal + +import puffin +from puffin.spec import DataFrame as DataFrameT +from puffin.spec import GroupBy as GroupByT +from puffin.spec import IntoExpr +from puffin.spec import LazyFrame as LazyFrameT +from puffin.spec import LazyGroupBy as LazyGroupByT +from puffin.spec import Namespace as NamespaceT +from puffin.utils import evaluate_into_exprs +from puffin.utils import flatten_str +from puffin.utils import horizontal_concat +from puffin.utils import validate_dataframe_comparand + +if TYPE_CHECKING: + from collections.abc import Sequence + + +class DataFrame(DataFrameT): + """dataframe object""" + + def __init__( + self, + dataframe: Any, + *, + api_version: str, + implementation: str, + ) -> None: + self._validate_columns(dataframe.columns) + self._dataframe = dataframe.reset_index(drop=True) + self.api_version = api_version + self._implementation = implementation + + @property + def columns(self) -> list[str]: + return self.dataframe.columns.tolist() + + def __repr__(self) -> str: # pragma: no cover + header = f" Standard DataFrame (api_version={self.api_version}) " + length = len(header) + return ( + "┌" + + "─" * length + + "┐\n" + + f"|{header}|\n" + + "| Add `.dataframe` to see native output |\n" + + "└" + + "─" * length + + "┘\n" + ) + + def _validate_columns(self, columns: Sequence[str]) -> None: + counter = collections.Counter(columns) + for col, count in counter.items(): + if count > 1: + msg = f"Expected unique column names, got {col} {count} time(s)" + raise ValueError( + msg, + ) + + def _validate_booleanness(self) -> None: + if not ( + (self.dataframe.dtypes 
== "bool") | (self.dataframe.dtypes == "boolean") + ).all(): + msg = "'any' can only be called on DataFrame where all dtypes are 'bool'" + raise TypeError( + msg, + ) + + @property + def dataframe(self) -> Any: + return self._dataframe + + def __dataframe_namespace__( + self, + ) -> NamespaceT: + return puffin.pandas_like.Namespace( + api_version=self.api_version, + implementation=self._implementation, # type: ignore[attr-defined] + ) + + @property + def shape(self) -> tuple[int, int]: + return self.dataframe.shape # type: ignore[no-any-return] + + def group_by(self, *keys: str | Iterable[str]) -> GroupByT: + from puffin.pandas_like.group_by_object import GroupBy + + return GroupBy(self, flatten_str(*keys), api_version=self.api_version) + + def select( + self, + *exprs: IntoExpr | Iterable[IntoExpr], + **named_exprs: IntoExpr, + ) -> DataFrameT: + return self.lazy().select(*exprs, **named_exprs).collect() + + def filter( + self, + *predicates: IntoExpr | Iterable[IntoExpr], + ) -> DataFrameT: + return self.lazy().filter(*predicates).collect() + + def with_columns( + self, + *exprs: IntoExpr | Iterable[IntoExpr], + **named_exprs: IntoExpr, + ) -> DataFrameT: + return self.lazy().with_columns(*exprs, **named_exprs).collect() + + def sort( + self, + by: str | Iterable[str], + *more_by: str, + descending: bool | Iterable[bool] = False, + ) -> DataFrameT: + return self.lazy().sort(by, *more_by, descending=descending).collect() + + def join( + self, + other: DataFrameT, + *, + how: Literal["left", "inner", "outer"] = "inner", + left_on: str | list[str], + right_on: str | list[str], + ) -> DataFrameT: + return ( + self.lazy() + .join(other.lazy(), how=how, left_on=left_on, right_on=right_on) + .collect() + ) + + def lazy(self) -> LazyFrameT: + return LazyFrame( + self.dataframe, + api_version=self.api_version, + implementation=self._implementation, + ) + + def head(self, n: int) -> DataFrameT: + return self.lazy().head(n).collect() + + def unique(self, subset: list[str]) 
class LazyFrame(LazyFrameT):
    """Lazy dataframe wrapper: the actual workhorse of the pandas-like backend.

    "Lazy" here is nominal — operations execute eagerly on the wrapped native
    frame; the class exists to mirror the lazy half of the target API.
    """

    def __init__(
        self,
        dataframe: Any,
        *,
        api_version: str,
        implementation: str,
    ) -> None:
        self._validate_columns(dataframe.columns)
        # Positional semantics: the incoming index is discarded.
        self._df = dataframe.reset_index(drop=True)
        self.api_version = api_version
        self._implementation = implementation

    @property
    def columns(self) -> list[str]:
        return self.dataframe.columns.tolist()

    def __repr__(self) -> str:  # pragma: no cover
        header = f" Standard DataFrame (api_version={self.api_version}) "
        length = len(header)
        top = "┌" + "─" * length + "┐\n"
        bottom = "└" + "─" * length + "┘\n"
        middle = f"|{header}|\n" + "| Add `.dataframe` to see native output |\n"
        return top + middle + bottom

    def _validate_columns(self, columns: Sequence[str]) -> None:
        """Reject duplicate column names up front."""
        for col, count in collections.Counter(columns).items():
            if count <= 1:
                continue
            msg = f"Expected unique column names, got {col} {count} time(s)"
            raise ValueError(
                msg,
            )

    def _validate_booleanness(self) -> None:
        """Require every column to have a boolean dtype."""
        dtypes = self.dataframe.dtypes
        all_bool = ((dtypes == "bool") | (dtypes == "boolean")).all()
        if not all_bool:
            msg = "'any' can only be called on DataFrame where all dtypes are 'bool'"
            raise TypeError(
                msg,
            )

    def _from_dataframe(self, df: Any) -> LazyFrameT:
        """Wrap a native frame, carrying over api_version/implementation."""
        return LazyFrame(
            df,
            api_version=self.api_version,
            implementation=self._implementation,
        )

    @property
    def dataframe(self) -> Any:
        """The underlying native dataframe."""
        return self._df

    def __lazyframe_namespace__(
        self,
    ) -> NamespaceT:
        return puffin.pandas_like.Namespace(
            api_version=self.api_version,
            implementation=self._implementation,  # type: ignore[attr-defined]
        )

    def group_by(self, *keys: str | Iterable[str]) -> LazyGroupByT:
        # Imported lazily to avoid a circular import with the group-by module.
        from puffin.pandas_like.group_by_object import LazyGroupBy

        return LazyGroupBy(self, flatten_str(*keys), api_version=self.api_version)

    def select(
        self,
        *exprs: IntoExpr | Iterable[IntoExpr],
        **named_exprs: IntoExpr,
    ) -> LazyFrameT:
        """Evaluate the expressions and keep only their output columns."""
        evaluated = evaluate_into_exprs(self, *exprs, **named_exprs)
        native = horizontal_concat(
            [s.series for s in evaluated],  # type: ignore[attr-defined]
            implementation=self._implementation,
        )
        return self._from_dataframe(native)

    def filter(
        self,
        *predicates: IntoExpr | Iterable[IntoExpr],
    ) -> LazyFrameT:
        """Keep rows where every predicate holds."""
        plx = self.__lazyframe_namespace__()
        combined = plx.all_horizontal(*predicates)
        # Safety: all_horizontal's expression only returns a single column.
        mask = combined.call(self)[0]  # type: ignore[attr-defined]
        native_mask = validate_dataframe_comparand(mask)
        return self._from_dataframe(self.dataframe.loc[native_mask])

    def with_columns(
        self,
        *exprs: IntoExpr | Iterable[IntoExpr],
        **named_exprs: IntoExpr,
    ) -> LazyFrameT:
        """Add or replace columns computed from the given expressions."""
        evaluated = evaluate_into_exprs(self, *exprs, **named_exprs)
        updates = {
            s.name: s.series  # type: ignore[attr-defined]
            for s in evaluated
        }
        return self._from_dataframe(self.dataframe.assign(**updates))

    def sort(
        self,
        by: str | Iterable[str],
        *more_by: str,
        descending: bool | Iterable[bool] = False,
    ) -> LazyFrameT:
        """Sort by the given columns; no keys means sort by all columns."""
        flat_keys = flatten_str([*flatten_str(by), *more_by])
        if not flat_keys:
            flat_keys = self.dataframe.columns.tolist()
        # pandas wants `ascending`, the API speaks `descending` — invert.
        if isinstance(descending, bool):
            ascending: bool | list[bool] = not descending
        else:
            ascending = [not d for d in descending]
        return self._from_dataframe(
            self.dataframe.sort_values(flat_keys, ascending=ascending),
        )

    # Other
    def join(
        self,
        other: LazyFrameT,
        *,
        how: Literal["left", "inner", "outer"] = "inner",
        left_on: str | list[str],
        right_on: str | list[str],
    ) -> LazyFrameT:
        """Inner-join against another lazy frame on the given key columns."""
        if how not in ["inner"]:
            msg = "Only inner join supported for now, others coming soon"
            raise ValueError(msg)

        left_keys = [left_on] if isinstance(left_on, str) else left_on
        right_keys = [right_on] if isinstance(right_on, str) else right_on

        # Non-key columns shared by both sides would be silently suffixed by
        # the native merge; refuse instead.
        if overlap := (set(self.columns) - set(left_keys)).intersection(
            set(other.columns) - set(right_keys),
        ):
            msg = f"Found overlapping columns in join: {overlap}. Please rename columns to avoid this."
            raise ValueError(msg)

        merged = self.dataframe.merge(
            other.dataframe,  # type: ignore[attr-defined]
            left_on=left_keys,
            right_on=right_keys,
            how=how,
        )
        return self._from_dataframe(merged)

    # Conversion
    def collect(self) -> DataFrameT:
        """Materialise into the eager DataFrame wrapper."""
        return DataFrame(
            self.dataframe,
            api_version=self.api_version,
            implementation=self._implementation,
        )

    def cache(self) -> LazyFrameT:
        # Execution is already eager, so caching is a no-op.
        return self

    def head(self, n: int) -> LazyFrameT:
        return self._from_dataframe(self.dataframe.head(n))

    def unique(self, subset: list[str]) -> LazyFrameT:
        return self._from_dataframe(self.dataframe.drop_duplicates(subset=subset))

    def rename(self, mapping: dict[str, str]) -> LazyFrameT:
        return self._from_dataframe(self.dataframe.rename(columns=mapping))
from puffin.spec import LazyFrame as LazyFrameT
from puffin.spec import LazyGroupBy as LazyGroupByT
from puffin.utils import dataframe_from_dict
from puffin.utils import evaluate_simple_aggregation
from puffin.utils import get_namespace
from puffin.utils import horizontal_concat
from puffin.utils import is_simple_aggregation
from puffin.utils import parse_into_exprs


class GroupBy(GroupByT):
    """Eager group-by: delegates to the lazy implementation and collects."""

    def __init__(self, df: DataFrameT, keys: list[str], api_version: str) -> None:
        self._df = df
        self._keys = list(keys)
        self.api_version = api_version

    def agg(
        self,
        *aggs: IntoExpr | Iterable[IntoExpr],
        **named_aggs: IntoExpr,
    ) -> DataFrameT:
        """Aggregate per group; forwards to ``LazyGroupBy.agg`` and collects."""
        return (
            LazyGroupBy(self._df.lazy(), self._keys, self.api_version)
            .agg(*aggs, **named_aggs)
            .collect()
        )


class LazyGroupBy(LazyGroupByT):
    """Lazy group-by over a pandas-like LazyFrame."""

    def __init__(self, df: LazyFrameT, keys: list[str], api_version: str) -> None:
        self._df = df
        self._keys = list(keys)
        self.api_version = api_version

    def agg(
        self,
        *aggs: IntoExpr | Iterable[IntoExpr],
        **named_aggs: IntoExpr,
    ) -> LazyFrameT:
        """Evaluate the aggregation expressions per group.

        Simple named aggregations (e.g. ``pl.col('a').mean()``) use the native
        groupby fastpath; everything else is evaluated per-group by calling the
        expression on each group's sub-frame.

        Raises:
            ValueError: if an expression has no output names (anonymous).
        """
        df = self._df.dataframe  # type: ignore[attr-defined]
        exprs = parse_into_exprs(
            get_namespace(self._df),
            *aggs,
            **named_aggs,
        )
        grouped = df.groupby(
            list(self._keys),
            sort=False,
            as_index=False,
        )
        implementation: str = self._df._implementation  # type: ignore[attr-defined]
        # BUG FIX: copy the key list. The original assigned
        # `output_names = self._keys`, aliasing the instance attribute, so the
        # `extend` below mutated `self._keys` and corrupted any subsequent
        # `agg` call on the same group-by object.
        output_names: list[str] = list(self._keys)
        for expr in exprs:
            expr_output_names = expr._output_names  # type: ignore[attr-defined]
            if expr_output_names is None:
                msg = (
                    "Anonymous expressions are not supported in group_by.agg.\n"
                    "Instead of `pl.all()`, try using a named expression, such as "
                    "`pl.col('a', 'b')`\n"
                )
                raise ValueError(msg)
            output_names.extend(expr_output_names)

        # Split off expressions that qualify for the native fastpath.
        fastpath_frames: list[Any] = []
        fastpath_indices: set[int] = set()  # set: O(1) membership below
        for i, expr in enumerate(exprs):
            if is_simple_aggregation(expr):
                fastpath_frames.append(evaluate_simple_aggregation(expr, grouped))
                fastpath_indices.add(i)
        exprs = [expr for i, expr in enumerate(exprs) if i not in fastpath_indices]

        out: dict[str, list[Any]] = collections.defaultdict(list)
        for keys, df_keys in grouped:
            # NOTE(review): assumes the group key is iterable (a tuple) here;
            # for a single grouping key some pandas versions yield a scalar,
            # which would break this zip -- confirm against supported versions.
            for key, name in zip(keys, self._keys):
                out[name].append(key)
            for expr in exprs:
                # TODO: it might be better to use groupby(...).apply
                # in this case, but I couldn't get the multi-output
                # case to work for cuDF.
                results_keys = expr.call(  # type: ignore[attr-defined]
                    LazyFrame(
                        df_keys,
                        api_version=self.api_version,
                        implementation=implementation,
                    )
                )
                for result_keys in results_keys:
                    out[result_keys.name].append(result_keys.item())

        result = dataframe_from_dict(out, implementation=implementation)
        result = horizontal_concat(
            [result, *fastpath_frames], implementation=implementation
        ).loc[:, output_names]
        return LazyFrame(
            result,
            api_version=self.api_version,
            implementation=implementation,
        )
    def mean(self) -> Expr:
        ...

    def sum(self) -> Expr:
        ...

    def min(self) -> Expr:
        ...

    def max(self) -> Expr:
        ...

    def is_between(
        self, lower_bound: Any, upper_bound: Any, closed: str = "both"
    ) -> Expr:
        ...

    def is_in(self, other: Any) -> Expr:
        ...

    def is_null(self) -> Expr:
        ...

    def drop_nulls(self) -> Expr:
        ...

    def n_unique(self) -> Expr:
        ...

    def sample(self, n: int, fraction: float, *, with_replacement: bool) -> Expr:
        ...


class ExprStringNamespace(Protocol):
    """String methods reachable from an expression (e.g. ``expr.str``)."""

    def ends_with(self, other: str) -> Expr:
        ...


class ExprNameNamespace(Protocol):
    """Output-name methods reachable from an expression (e.g. ``expr.name``)."""

    def keep(self) -> Expr:
        ...


class Namespace(Protocol):
    """Top-level namespace: expression constructors and horizontal reductions."""

    def col(self, *names: str | Iterable[str]) -> Expr:
        ...

    def all(self) -> Expr:
        ...

    def sum(self, *columns: str) -> Expr:
        ...

    def mean(self, *columns: str) -> Expr:
        ...

    def len(self) -> Expr:
        ...

    def all_horizontal(self, *exprs: IntoExpr | Iterable[IntoExpr]) -> Expr:
        ...

    def any_horizontal(self, *exprs: IntoExpr | Iterable[IntoExpr]) -> Expr:
        ...

    def sum_horizontal(self, *exprs: IntoExpr | Iterable[IntoExpr]) -> Expr:
        ...

    def concat(self, items: Iterable[AnyDataFrame], *, how: str) -> AnyDataFrame:
        ...


class Series(Protocol):
    """A single named column of values."""

    def alias(self, name: str) -> Self:
        ...

    @property
    def name(self) -> str:
        ...

    def item(self) -> Any:
        ...

    def is_between(
        self, lower_bound: Any, upper_bound: Any, closed: str = "both"
    ) -> Series:
        ...

    def is_in(self, other: Any) -> Series:
        ...

    def is_null(self) -> Series:
        ...

    def drop_nulls(self) -> Series:
        ...

    def n_unique(self) -> int:
        ...

    def zip_with(self, mask: Series, other: Series) -> Series:
        ...

    def sample(self, n: int, fraction: float, *, with_replacement: bool) -> Series:
        ...

    def to_numpy(self) -> Any:
        ...

    def to_pandas(self) -> Any:
        ...


class DataFrame(Protocol):
    """Eager frame interface; each method returns a new frame."""

    def with_columns(
        self, *exprs: IntoExpr | Iterable[IntoExpr], **named_exprs: IntoExpr
    ) -> DataFrame:
        ...

    def filter(self, *predicates: IntoExpr | Iterable[IntoExpr]) -> DataFrame:
        ...

    def select(
        self, *exprs: IntoExpr | Iterable[IntoExpr], **named_exprs: IntoExpr
    ) -> DataFrame:
        ...

    def sort(
        self,
        by: str | Iterable[str],
        *more_by: str,
        descending: bool | Iterable[bool] = False,
    ) -> DataFrame:
        ...

    def group_by(self, *keys: str | Iterable[str]) -> GroupBy:
        ...

    def lazy(self) -> LazyFrame:
        ...

    def join(
        self,
        other: DataFrame,
        *,
        how: Literal["inner"] = "inner",
        left_on: str | list[str],
        right_on: str | list[str],
    ) -> DataFrame:
        ...

    @property
    def columns(self) -> list[str]:
        ...

    def head(self, n: int) -> DataFrame:
        ...

    def unique(self, subset: list[str]) -> DataFrame:
        ...

    @property
    def shape(self) -> tuple[int, int]:
        ...

    def rename(self, mapping: dict[str, str]) -> DataFrame:
        ...

    def to_numpy(self) -> Any:
        ...

    def to_pandas(self) -> Any:
        ...


class LazyFrame(Protocol):
    """Lazy frame interface; results are materialised via ``collect``."""

    @property
    def columns(self) -> list[str]:
        ...

    def with_columns(
        self, *exprs: IntoExpr | Iterable[IntoExpr], **named_exprs: IntoExpr
    ) -> LazyFrame:
        ...

    def filter(self, *predicates: IntoExpr | Iterable[IntoExpr]) -> LazyFrame:
        ...

    def select(
        self, *exprs: IntoExpr | Iterable[IntoExpr], **named_exprs: IntoExpr
    ) -> LazyFrame:
        ...

    def sort(
        self,
        by: str | Iterable[str],
        *more_by: str,
        descending: bool | Iterable[bool] = False,
    ) -> LazyFrame:
        ...

    def collect(self) -> DataFrame:
        ...

    def group_by(self, *keys: str | Iterable[str]) -> LazyGroupBy:
        ...

    def join(
        self,
        other: LazyFrame,
        *,
        how: Literal["left", "inner", "outer"] = "inner",
        left_on: str | list[str],
        right_on: str | list[str],
    ) -> LazyFrame:
        ...

    def cache(self) -> LazyFrame:
        ...

    def head(self, n: int) -> LazyFrame:
        ...

    def unique(self, subset: list[str]) -> LazyFrame:
        ...

    def rename(self, mapping: dict[str, str]) -> LazyFrame:
        ...


class GroupBy(Protocol):
    """Eager group-by; ``agg`` returns a DataFrame."""

    def agg(
        self, *aggs: IntoExpr | Iterable[IntoExpr], **named_aggs: IntoExpr
    ) -> DataFrame:
        ...


class LazyGroupBy(Protocol):
    """Lazy group-by; ``agg`` returns a LazyFrame."""

    def agg(
        self, *aggs: IntoExpr | Iterable[IntoExpr], **named_aggs: IntoExpr
    ) -> LazyFrame:
        ...


# Anything convertible to an expression.
# NOTE(review): this alias is evaluated at runtime, so the `|` syntax here
# requires Python >= 3.10 even though annotations themselves are postponed --
# confirm against the package's minimum supported Python version.
IntoExpr = Expr | str | int | float | Series

# Constrained TypeVar: preserves the eager/lazy flavour through e.g. `concat`.
AnyDataFrame = TypeVar("AnyDataFrame", DataFrame, LazyFrame)
f"Could not translate DataFrame {type(df)}, please open a feature request." + raise TypeError(msg) + + +def quick_translate(df: Any, version: str, implementation: str) -> DataFrame: + """Translate to Polars API, if implementation is already known.""" + if implementation in ("pandas", "cudf"): + from puffin.pandas_like import translate + + df, _pl = translate(df, api_version=version, implementation=implementation) + return df + msg = f"Unknown implementation: {implementation}" + raise TypeError(msg) + + +def to_original_object(df: DataFrame | LazyFrame) -> Any: + try: + import polars as pl + except ModuleNotFoundError: + pass + else: + if isinstance(df, (pl.DataFrame, pl.LazyFrame)): + return df + return df.dataframe # type: ignore[union-attr] + + +def get_namespace(obj: Any, implementation: str | None = None) -> Namespace: + if implementation == "polars": + import polars as pl + + return pl # type: ignore[return-value] + try: + import polars as pl + except ModuleNotFoundError: + pass + else: + if isinstance(obj, (pl.DataFrame, pl.LazyFrame, pl.Series)): + return pl # type: ignore[return-value] + if hasattr(obj, "__dataframe_namespace__"): + return obj.__dataframe_namespace__() + if hasattr(obj, "__series_namespace__"): + return obj.__series_namespace__() + if hasattr(obj, "__lazyframe_namespace__"): + return obj.__lazyframe_namespace__() + if hasattr(obj, "__expr_namespace__"): + return obj.__expr_namespace__() + msg = f"Could not find namespace for object {obj}" + raise TypeError(msg) diff --git a/puffin/utils.py b/puffin/utils.py new file mode 100644 index 000000000..24ca89fbe --- /dev/null +++ b/puffin/utils.py @@ -0,0 +1,321 @@ +from __future__ import annotations + +from typing import Any +from typing import Iterable +from typing import TypeVar +from typing import cast + +from puffin.spec import DataFrame +from puffin.spec import Expr +from puffin.spec import IntoExpr +from puffin.spec import LazyFrame +from puffin.spec import Namespace +from puffin.spec import 
Series + +ExprT = TypeVar("ExprT", bound=Expr) + +T = TypeVar("T") + + +def validate_column_comparand(other: Any) -> Any: + """Validate RHS of binary operation. + + If the comparison isn't supported, return `NotImplemented` so that the + "right-hand-side" operation (e.g. `__radd__`) can be tried. + + If RHS is length 1, return the scalar value, so that the underlying + library can broadcast it. + """ + if isinstance(other, list): + if len(other) > 1: + # e.g. `plx.all() + plx.all()` + msg = "Multi-output expressions are not supported in this context" + raise ValueError(msg) + other = other[0] + if hasattr( + other, + "__dataframe_namespace__", + ): + return NotImplemented + if hasattr(other, "__series_namespace__"): + if other.len() == 1: + # broadcast + return other.item() + return other.series + return other + + +def validate_dataframe_comparand(other: Any) -> Any: + """Validate RHS of binary operation. + + If the comparison isn't supported, return `NotImplemented` so that the + "right-hand-side" operation (e.g. `__radd__`) can be tried. + """ + if isinstance(other, list) and len(other) > 1: + # e.g. 
`plx.all() + plx.all()` + msg = "Multi-output expressions are not supported in this context" + raise ValueError(msg) + if isinstance(other, list): + other = other[0] + if hasattr( + other, + "__dataframe_namespace__", + ): + return NotImplemented + if hasattr(other, "__series_namespace__"): + if other.len() == 1: + # broadcast + return other.get_value(0) + return other.series + return other + + +def maybe_evaluate_expr(df: DataFrame | LazyFrame, arg: Any) -> Any: + """Evaluate expression if it's an expression, otherwise return it as is.""" + if hasattr(arg, "__expr_namespace__"): + return arg.call(df) + return arg + + +def get_namespace(obj: Any) -> Namespace: + if hasattr(obj, "__dataframe_namespace__"): + return obj.__dataframe_namespace__() + if hasattr(obj, "__lazyframe_namespace__"): + return obj.__lazyframe_namespace__() + if hasattr(obj, "__series_namespace__"): + return obj.__series_namespace__() + if hasattr(obj, "__expr_namespace__"): + return obj.__expr_namespace__() + msg = f"Expected DataFrame or LazyFrame, got {type(obj)}" + raise TypeError(msg) + + +def parse_into_exprs( + plx: Namespace, *exprs: IntoExpr | Iterable[IntoExpr], **named_exprs: IntoExpr +) -> list[Expr]: + out = [parse_into_expr(plx, into_expr) for into_expr in flatten_into_expr(*exprs)] + for name, expr in named_exprs.items(): + out.append(parse_into_expr(plx, expr).alias(name)) + return out + + +def parse_into_expr(plx: Namespace, into_expr: IntoExpr) -> Expr: + if isinstance(into_expr, str): + return plx.col(into_expr) + if hasattr(into_expr, "__expr_namespace__"): + return cast(Expr, into_expr) + if hasattr(into_expr, "__series_namespace__"): + into_expr = cast(Series, into_expr) + return plx._create_expr_from_series(into_expr) # type: ignore[attr-defined] + msg = f"Expected IntoExpr, got {type(into_expr)}" + raise TypeError(msg) + + +def evaluate_into_expr(df: DataFrame | LazyFrame, into_expr: IntoExpr) -> list[Series]: + """ + Return list of raw columns. 
+ """ + expr = parse_into_expr(get_namespace(df), into_expr) + return expr.call(df) # type: ignore[attr-defined] + + +def flatten_str(*args: str | Iterable[str]) -> list[str]: + out: list[str] = [] + for arg in args: + if isinstance(arg, str): + out.append(arg) + else: + out.extend(arg) + return out + + +def flatten_bool(*args: bool | Iterable[bool]) -> list[bool]: + out: list[bool] = [] + for arg in args: + if isinstance(arg, bool): + out.append(arg) + else: + out.extend(arg) + return out + + +def flatten_into_expr(*args: IntoExpr | Iterable[IntoExpr]) -> list[IntoExpr]: + out: list[IntoExpr] = [] + for arg in args: + if isinstance(arg, (list, tuple)): + out.extend(arg) + else: + out.append(arg) # type: ignore[arg-type] + return out + + +# in filter, I want to: +# - flatten the into exprs +# - convert all to exprs +# - pass these to all_horizontal + + +def evaluate_into_exprs( + df: DataFrame | LazyFrame, + *exprs: IntoExpr | Iterable[IntoExpr], + **named_exprs: IntoExpr, +) -> list[Series]: + """Evaluate each expr into Series.""" + series: list[Series] = [ + item + for sublist in [ + evaluate_into_expr(df, into_expr) for into_expr in flatten_into_expr(*exprs) + ] + for item in sublist + ] + for name, expr in named_exprs.items(): + evaluated_expr = evaluate_into_expr(df, expr) + if len(evaluated_expr) > 1: + msg = "Named expressions must return a single column" + raise ValueError(msg) + series.append(evaluated_expr[0].alias(name)) + return series + + +def register_expression_call(expr: Expr, attr: str, *args: Any, **kwargs: Any) -> Expr: + plx = get_namespace(expr) + + def func(df: DataFrame | LazyFrame) -> list[Series]: + out: list[Series] = [] + for column in expr.call(df): # type: ignore[attr-defined] + # should be enough to just evaluate? + # validation should happen within column methods? 
+ _out = getattr(column, attr)( # type: ignore[no-any-return] + *[maybe_evaluate_expr(df, arg) for arg in args], + **{ + arg_name: maybe_evaluate_expr(df, arg_value) + for arg_name, arg_value in kwargs.items() + }, + ) + if hasattr(_out, "__series_namespace__"): + _out = cast(Series, _out) # help mypy + out.append(_out) + else: + out.append( + plx._create_series_from_scalar(_out, column) # type: ignore[attr-defined] + ) + return out + + if expr._depth is None: # type: ignore[attr-defined] + msg = "Unreachable code, please report a bug" + raise AssertionError(msg) + if expr._function_name is not None: # type: ignore[attr-defined] + function_name: str = f"{expr._function_name}->{attr}" # type: ignore[attr-defined] + else: + function_name = attr + return plx._create_expr_from_callable( # type: ignore[attr-defined] + func, + depth=expr._depth + 1, # type: ignore[attr-defined] + function_name=function_name, + root_names=expr._root_names, # type: ignore[attr-defined] + output_names=expr._output_names, # type: ignore[attr-defined] + ) + + +def item(s: Any) -> Any: + # cuDF doesn't have Series.item(). + if len(s) != 1: + msg = "Can only convert a Series of length 1 to a scalar" + raise ValueError(msg) + return s.iloc[0] + + +def is_simple_aggregation(expr: Expr) -> bool: + return ( + expr._function_name is not None # type: ignore[attr-defined] + and expr._depth is not None # type: ignore[attr-defined] + and expr._depth <= 2 # type: ignore[attr-defined] + # todo: avoid this one? + and expr._root_names is not None # type: ignore[attr-defined] + ) + + +def evaluate_simple_aggregation(expr: Expr, grouped: Any) -> Any: + """ + Use fastpath for simple aggregations if possible. + + If an aggregation is simple (e.g. `pl.col('a').mean()`), then pandas-like + implementations have a fastpath we can use. 
+ + For example, `df.group_by('a').agg(pl.col('b').mean())` can be evaluated + as `df.groupby('a')['b'].mean()`, whereas + `df.group_by('a').agg(mean=(pl.col('b') - pl.col('c').mean()).mean())` + requires a lambda function, which is slower. + + Returns naive DataFrame. + """ + if expr._root_names is None or expr._output_names is None: # type: ignore[attr-defined] + msg = "Expected expr to have root_names and output_names set, but they are None. Please report a bug." + raise AssertionError(msg) + if len(expr._root_names) != len(expr._output_names): # type: ignore[attr-defined] + msg = "Expected expr to have same number of root_names and output_names, but they are different. Please report a bug." + raise AssertionError(msg) + new_names = dict(zip(expr._root_names, expr._output_names)) # type: ignore[attr-defined] + return getattr(grouped[expr._root_names], expr._function_name)()[ # type: ignore[attr-defined] + expr._root_names # type: ignore[attr-defined] + ].rename(columns=new_names) + + +def horizontal_concat(dfs: list[Any], implementation: str) -> Any: + """ + Concatenate (native) DataFrames. + + Should be in namespace. 
+ """ + if implementation == "pandas": + import pandas as pd + + return pd.concat(dfs, axis=1, copy=False) + if implementation == "cudf": + import cudf + + return cudf.concat(dfs, axis=1) + if implementation == "modin": + import modin.pandas as mpd + + return mpd.concat(dfs, axis=1) + msg = f"Unknown implementation: {implementation}" + raise TypeError(msg) + + +def dataframe_from_dict(data: dict[str, Any], implementation: str) -> Any: + """Return native dataframe.""" + if implementation == "pandas": + import pandas as pd + + return pd.DataFrame(data, copy=False) + if implementation == "cudf": + import cudf + + return cudf.DataFrame(data) + if implementation == "modin": + import modin.pandas as mpd + + return mpd.DataFrame(data) + msg = f"Unknown implementation: {implementation}" + raise TypeError(msg) + + +def series_from_iterable( + data: Iterable[Any], name: str, index: Any, implementation: str +) -> Any: + """Return native series.""" + if implementation == "pandas": + import pandas as pd + + return pd.Series(data, name=name, index=index, copy=False) + if implementation == "cudf": + import cudf + + return cudf.Series(data, name=name, index=index) + if implementation == "modin": + import modin.pandas as mpd + + return mpd.Series(data, name=name, index=index) + msg = f"Unknown implementation: {implementation}" + raise TypeError(msg)