xfail py38 pandas_pyarrow
FBruzzesi committed Nov 12, 2024
2 parents 28d329a + 626e5bf commit 46f7baa
Showing 69 changed files with 1,769 additions and 329 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/downstream_tests.yml
@@ -184,6 +184,8 @@ jobs:
run: |
uv pip uninstall narwhals --system
uv pip install -e . --system
# temporarily pin websockets to get CI green
uv pip install "websockets<14.0" --system
- name: show-deps
run: uv pip freeze
- name: Run `make narwhals-test-integration`
4 changes: 2 additions & 2 deletions .github/workflows/pytest.yml
@@ -86,9 +86,9 @@ jobs:
run: uv pip install ibis-framework[duckdb]>=6.0.0 --system
# Ibis puts upper bounds on dependencies, and requires Python 3.10+,
# which messes with other dependencies on lower Python versions
if: matrix.python-version == '3.12'
if: matrix.python-version == '3.11'
- name: Run pytest
run: pytest tests --cov=narwhals --cov=tests --cov-fail-under=100 --runslow
- name: Run doctests
if: matrix.python-version == '3.12'
if: matrix.python-version == '3.13'
run: pytest narwhals --doctest-modules
2 changes: 2 additions & 0 deletions docs/api-reference/expr.md
@@ -28,8 +28,10 @@
- is_null
- is_unique
- len
- map_batches
- max
- mean
- median
- min
- mode
- null_count
1 change: 1 addition & 0 deletions docs/api-reference/narwhals.md
@@ -30,6 +30,7 @@ Here are the top-level functions available in Narwhals.
- maybe_set_index
- mean
- mean_horizontal
- median
- min
- min_horizontal
- narwhalify
1 change: 1 addition & 0 deletions docs/api-reference/series.md
@@ -37,6 +37,7 @@
- len
- max
- mean
- median
- min
- mode
- name
2 changes: 1 addition & 1 deletion docs/installation.md
@@ -29,7 +29,7 @@ To verify the installation, start the Python REPL and execute:
```python
>>> import narwhals
>>> narwhals.__version__
'1.13.3'
'1.13.4'
```
If you see the version number, then the installation was successful!

4 changes: 3 additions & 1 deletion narwhals/__init__.py
@@ -40,6 +40,7 @@
from narwhals.expr import max_horizontal
from narwhals.expr import mean
from narwhals.expr import mean_horizontal
from narwhals.expr import median
from narwhals.expr import min
from narwhals.expr import min_horizontal
from narwhals.expr import nth
@@ -67,7 +68,7 @@
from narwhals.utils import maybe_reset_index
from narwhals.utils import maybe_set_index

__version__ = "1.13.3"
__version__ = "1.13.4"

__all__ = [
"dependencies",
@@ -99,6 +100,7 @@
"max_horizontal",
"mean",
"mean_horizontal",
"median",
"min",
"min_horizontal",
"nth",
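With `median` now exported at the top level, usage should mirror the existing `mean` helper. A minimal, hedged sketch (the pandas-backed frame and the multi-column call are assumptions for illustration; any supported backend should behave the same way):

```python
import narwhals as nw
import pandas as pd

df = nw.from_native(pd.DataFrame({"a": [4, 1, 3, 2], "b": [1.0, 2.0, None, 4.0]}))

# nw.median(...) builds an expression over one or more columns, like nw.mean(...)
print(df.select(nw.median("a", "b")).to_native())
```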
56 changes: 53 additions & 3 deletions narwhals/_arrow/expr.py
@@ -8,6 +8,8 @@

from narwhals._expression_parsing import reuse_series_implementation
from narwhals._expression_parsing import reuse_series_namespace_implementation
from narwhals.dependencies import get_numpy
from narwhals.dependencies import is_numpy_array
from narwhals.utils import Implementation

if TYPE_CHECKING:
@@ -205,6 +207,9 @@ def filter(self, *predicates: IntoArrowExpr) -> Self:
def mean(self) -> Self:
return reuse_series_implementation(self, "mean", returns_scalar=True)

def median(self) -> Self:
return reuse_series_implementation(self, "median", returns_scalar=True)

def count(self) -> Self:
return reuse_series_implementation(self, "count", returns_scalar=True)

@@ -303,8 +308,15 @@ def sample(
seed=seed,
)

def fill_null(self: Self, value: Any) -> Self:
return reuse_series_implementation(self, "fill_null", value=value)
def fill_null(
self: Self,
value: Any | None = None,
strategy: Literal["forward", "backward"] | None = None,
limit: int | None = None,
) -> Self:
return reuse_series_implementation(
self, "fill_null", value=value, strategy=strategy, limit=limit
)

def is_duplicated(self: Self) -> Self:
return reuse_series_implementation(self, "is_duplicated")
@@ -322,7 +334,7 @@ def unique(self: Self, *, maintain_order: bool = False) -> Self:
return reuse_series_implementation(self, "unique", maintain_order=maintain_order)

def replace_strict(
self: Self, old: Sequence[Any], new: Sequence[Any], *, return_dtype: DType
self: Self, old: Sequence[Any], new: Sequence[Any], *, return_dtype: DType | None
) -> Self:
return reuse_series_implementation(
self, "replace_strict", old, new, return_dtype=return_dtype
@@ -380,6 +392,44 @@ def func(df: ArrowDataFrame) -> list[ArrowSeries]:
def mode(self: Self) -> Self:
return reuse_series_implementation(self, "mode")

def map_batches(
self: Self,
function: Callable[[Any], Any],
return_dtype: DType | None = None,
) -> Self:
def func(df: ArrowDataFrame) -> list[ArrowSeries]:
input_series_list = self._call(df)
output_names = [input_series.name for input_series in input_series_list]
result = [function(series) for series in input_series_list]

if is_numpy_array(result[0]):
result = [
df.__narwhals_namespace__()
._create_compliant_series(array)
.alias(output_name)
for array, output_name in zip(result, output_names)
]
elif (np := get_numpy()) is not None and np.isscalar(result[0]):
result = [
df.__narwhals_namespace__()
._create_compliant_series([array])
.alias(output_name)
for array, output_name in zip(result, output_names)
]
if return_dtype is not None:
result = [series.cast(return_dtype) for series in result]
return result

return self.__class__(
func,
depth=self._depth + 1,
function_name=self._function_name + "->map_batches",
root_names=self._root_names,
output_names=self._output_names,
backend_version=self._backend_version,
dtypes=self._dtypes,
)

def is_finite(self: Self) -> Self:
return reuse_series_implementation(self, "is_finite")

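Both `map_batches` and the extended `fill_null` signature also surface in the public expression API (see the `docs/api-reference/expr.md` change above). A hedged usage sketch against a PyArrow-backed frame; it assumes the user-facing `Expr.fill_null` gained the same `strategy`/`limit` parameters in this release and that the function passed to `map_batches` receives a series exposing `to_numpy()`:

```python
import narwhals as nw
import pyarrow as pa

df = nw.from_native(pa.table({"a": [1, 2, 3, 4], "b": [1.0, None, None, 4.0]}))

result = df.select(
    # apply a user function to the whole column; a returned NumPy array
    # (or scalar) is wrapped back into a series by the branches above
    nw.col("a").map_batches(lambda s: s.to_numpy() * 2, return_dtype=nw.Int64),
    # forward-fill, but replace at most one consecutive null
    nw.col("b").fill_null(strategy="forward", limit=1),
)
print(result.to_native())
```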
1 change: 1 addition & 0 deletions narwhals/_arrow/group_by.py
@@ -18,6 +18,7 @@

POLARS_TO_ARROW_AGGREGATIONS = {
"len": "count",
"median": "approximate_median",
"n_unique": "count_distinct",
"std": "stddev",
"var": "variance", # currently unused, we don't have `var` yet
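The new mapping entry translates a Polars-style `median` aggregation into PyArrow's `approximate_median` kernel. For reference, this is the underlying PyArrow call a grouped median dispatches to (toy data, column names invented for the example):

```python
import pyarrow as pa

tbl = pa.table({"g": ["x", "x", "y"], "v": [1.0, 3.0, 10.0]})
# grouped medians via PyArrow's hash aggregation
print(tbl.group_by("g").aggregate([("v", "approximate_median")]))
```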
15 changes: 11 additions & 4 deletions narwhals/_arrow/namespace.py
@@ -62,14 +62,16 @@ def _create_expr_from_series(self, series: ArrowSeries) -> ArrowExpr:
dtypes=self._dtypes,
)

def _create_series_from_scalar(self, value: Any, series: ArrowSeries) -> ArrowSeries:
def _create_series_from_scalar(
self, value: Any, *, reference_series: ArrowSeries
) -> ArrowSeries:
from narwhals._arrow.series import ArrowSeries

if self._backend_version < (13,) and hasattr(value, "as_py"):
value = value.as_py()
return ArrowSeries._from_iterable(
[value],
name=series.name,
name=reference_series.name,
backend_version=self._backend_version,
dtypes=self._dtypes,
)
@@ -152,7 +154,7 @@ def lit(self, value: Any, dtype: DType | None) -> ArrowExpr:
def _lit_arrow_series(_: ArrowDataFrame) -> ArrowSeries:
arrow_series = ArrowSeries._from_iterable(
data=[value],
name="lit",
name="literal",
backend_version=self._backend_version,
dtypes=self._dtypes,
)
@@ -165,7 +167,7 @@ def _lit_arrow_series(_: ArrowDataFrame) -> ArrowSeries:
depth=0,
function_name="lit",
root_names=None,
output_names=["lit"],
output_names=[_lit_arrow_series.__name__],
backend_version=self._backend_version,
dtypes=self._dtypes,
)
@@ -325,6 +327,11 @@ def mean(self, *column_names: str) -> ArrowExpr:
*column_names, backend_version=self._backend_version, dtypes=self._dtypes
).mean()

def median(self, *column_names: str) -> ArrowExpr:
return ArrowExpr.from_column_names(
*column_names, backend_version=self._backend_version, dtypes=self._dtypes
).median()

def max(self, *column_names: str) -> ArrowExpr:
return ArrowExpr.from_column_names(
*column_names, backend_version=self._backend_version, dtypes=self._dtypes
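Besides the new `median` namespace method, this hunk renames the constant column produced by `lit` from "lit" to "literal", matching Polars. A hedged sketch of the user-visible effect (the exact output column name is the assumption being illustrated):

```python
import narwhals as nw
import pyarrow as pa

df = nw.from_native(pa.table({"a": [1, 2, 3]}))
# after this change the constant column should come back as "literal"
print(df.select(nw.lit(1)).columns)
```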
91 changes: 80 additions & 11 deletions narwhals/_arrow/series.py
@@ -261,6 +261,17 @@ def mean(self) -> int:

return pc.mean(self._native_series) # type: ignore[no-any-return]

def median(self) -> int:
import pyarrow.compute as pc # ignore-banned-import()

from narwhals._exceptions import InvalidOperationError

if not self.dtype.is_numeric():
msg = "`median` operation not supported for non-numeric input type."
raise InvalidOperationError(msg)

return pc.approximate_median(self._native_series) # type: ignore[no-any-return]

def min(self) -> int:
import pyarrow.compute as pc # ignore-banned-import()

@@ -569,14 +580,57 @@ def sample(

return self._from_native_series(pc.take(ser, mask))

def fill_null(self: Self, value: Any) -> Self:
def fill_null(
self: Self,
value: Any | None = None,
strategy: Literal["forward", "backward"] | None = None,
limit: int | None = None,
) -> Self:
import numpy as np # ignore-banned-import
import pyarrow as pa # ignore-banned-import()
import pyarrow.compute as pc # ignore-banned-import()

def fill_aux(
arr: pa.Array,
limit: int,
direction: Literal["forward", "backward"] | None = None,
) -> pa.Array:
# this algorithm first finds, for every position, the index of the nearest valid value in the fill direction
# then it computes the distance between that index and the original position
# a null is replaced only when the distance is less than or equal to the limit
valid_mask = pc.is_valid(arr)
indices = pa.array(np.arange(len(arr)), type=pa.int64())
if direction == "forward":
valid_index = np.maximum.accumulate(np.where(valid_mask, indices, -1))
distance = indices - valid_index
else:
valid_index = np.minimum.accumulate(
np.where(valid_mask[::-1], indices[::-1], len(arr))
)[::-1]
distance = valid_index - indices
return pc.if_else(
pc.and_(
pc.is_null(arr),
pc.less_equal(distance, pa.scalar(limit)),
),
arr.take(valid_index),
arr,
)

ser = self._native_series
dtype = ser.type

return self._from_native_series(pc.fill_null(ser, pa.scalar(value, dtype)))
if value is not None:
res_ser = self._from_native_series(pc.fill_null(ser, pa.scalar(value, dtype)))
elif limit is None:
fill_func = (
pc.fill_null_forward if strategy == "forward" else pc.fill_null_backward
)
res_ser = self._from_native_series(fill_func(ser))
else:
res_ser = self._from_native_series(fill_aux(ser, limit, strategy))

return res_ser

def to_frame(self: Self) -> ArrowDataFrame:
import pyarrow as pa # ignore-banned-import()
@@ -656,16 +710,16 @@ def unique(self: Self, *, maintain_order: bool = False) -> ArrowSeries:
return self._from_native_series(pc.unique(self._native_series))

def replace_strict(
self, old: Sequence[Any], new: Sequence[Any], *, return_dtype: DType
self, old: Sequence[Any], new: Sequence[Any], *, return_dtype: DType | None
) -> ArrowSeries:
import pyarrow as pa # ignore-banned-import
import pyarrow.compute as pc # ignore-banned-import

# https://stackoverflow.com/a/79111029/4451315
idxs = pc.index_in(self._native_series, pa.array(old))
result_native = pc.take(pa.array(new), idxs).cast(
narwhals_to_native_dtype(return_dtype, self._dtypes)
)
result_native = pc.take(pa.array(new), idxs)
if return_dtype is not None:
result_native = result_native.cast(narwhals_to_native_dtype(return_dtype, self._dtypes))
result = self._from_native_series(result_native)
if result.is_null().sum() != self.is_null().sum():
msg = (
@@ -698,17 +752,32 @@ def to_dummies(
from narwhals._arrow.dataframe import ArrowDataFrame

series = self._native_series
da = series.dictionary_encode().combine_chunks()
name = self._name
da = series.dictionary_encode(null_encoding="encode").combine_chunks()

columns = np.zeros((len(da.dictionary), len(da)), np.uint8)
columns = np.zeros((len(da.dictionary), len(da)), np.int8)
columns[da.indices, np.arange(len(da))] = 1
names = [f"{self._name}{separator}{v}" for v in da.dictionary]
null_col_pa, null_col_pl = f"{name}{separator}None", f"{name}{separator}null"
cols = [
{null_col_pa: null_col_pl}.get(
f"{name}{separator}{v}", f"{name}{separator}{v}"
)
for v in da.dictionary
]

output_order = (
[
null_col_pl,
*sorted([c for c in cols if c != null_col_pl])[int(drop_first) :],
]
if null_col_pl in cols
else sorted(cols)[int(drop_first) :]
)
return ArrowDataFrame(
pa.Table.from_arrays(columns, names=names),
pa.Table.from_arrays(columns, names=cols),
backend_version=self._backend_version,
dtypes=self._dtypes,
).select(*sorted(names)[int(drop_first) :])
).select(*output_order)

def quantile(
self: Self,
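The `limit` branch of `fill_null` works by locating, for each position, the nearest valid value in the fill direction and the distance to it; a null is replaced only when that distance is within the limit. A NumPy-only sketch of the forward case, using NaN in place of nulls (toy data, not part of the commit):

```python
import numpy as np

arr = np.array([1.0, np.nan, np.nan, 4.0, np.nan])
limit = 1

valid_mask = ~np.isnan(arr)
indices = np.arange(len(arr))
# index of the most recent valid value at or before each position (-1 if none)
valid_index = np.maximum.accumulate(np.where(valid_mask, indices, -1))
distance = indices - valid_index
# replace a null only when a valid value exists within `limit` positions
filled = np.where(
    np.isnan(arr) & (valid_index >= 0) & (distance <= limit),
    arr[valid_index],
    arr,
)
print(filled)  # [ 1.  1. nan  4.  4.]
```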