Skip to content

Commit

Permalink
Cleanup / write some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Aug 5, 2023
1 parent 046bf40 commit c4e0497
Show file tree
Hide file tree
Showing 12 changed files with 333 additions and 105 deletions.
2 changes: 1 addition & 1 deletion py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
when,
zeros,
)
from polars.interchange import from_dataframe
from polars.interchange.from_dataframe import from_dataframe
from polars.io import (
read_avro,
read_csv,
Expand Down
8 changes: 4 additions & 4 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
from xlsxwriter import Workbook

from polars import Expr, LazyFrame, Series
from polars.internals.interchange import PolarsDataFrameXchg
from polars.interchange.dataframe import PolarsDataFrame
from polars.type_aliases import (
AsofJoinStrategy,
AvroCompression,
Expand Down Expand Up @@ -1204,7 +1204,7 @@ def __array__(self, dtype: Any = None) -> np.ndarray[Any, Any]:

def __dataframe__(
self, nan_as_null: bool = False, allow_copy: bool = True
) -> PolarsDataFrameXchg:
) -> PolarsDataFrame:
"""
Convert to a dataframe object implementing the dataframe interchange protocol.
Expand Down Expand Up @@ -1237,9 +1237,9 @@ def __dataframe__(
contains categorical data.
"""
from polars.internals.interchange.dataframe import PolarsDataFrameXchg
from polars.interchange.dataframe import PolarsDataFrame

return PolarsDataFrameXchg(self, nan_as_null, allow_copy)
return PolarsDataFrame(self, nan_as_null, allow_copy)

def __dataframe_consortium_standard__(
self, *, api_version: str | None = None
Expand Down
7 changes: 0 additions & 7 deletions py-polars/polars/interchange/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +0,0 @@
from polars.interchange.dataframe import PolarsDataFrameXchg
from polars.interchange.from_dataframe import from_dataframe

__all__ = [
"PolarsDataFrameXchg",
"from_dataframe",
]
19 changes: 11 additions & 8 deletions py-polars/polars/interchange/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

from typing import TYPE_CHECKING

from polars.internals.interchange.dataframe_protocol import (
from polars.interchange.dataframe_protocol import (
Buffer,
DlpackDeviceType,
DtypeKind,
)
from polars.internals.interchange.utils import polars_dtype_to_dtype
from polars.interchange.utils import polars_dtype_to_dtype

if TYPE_CHECKING:
import polars as pl
from typing import NoReturn

from polars import Series


class PolarsBuffer(Buffer):
"""A buffer represented by a Polars Series consisting of a single chunk."""
"""A buffer backed by a Polars Series consisting of a single chunk."""

def __init__(self, data: pl.Series, allow_copy: bool = True) -> None:
def __init__(self, data: Series, allow_copy: bool = True):
if data.n_chunks() > 1:
if allow_copy:
data = data.rechunk()
Expand All @@ -34,7 +36,8 @@ def bufsize(self) -> int:
bytes_per_element = dtype[1] // 8

if dtype[0] == DtypeKind.STRING:
return self._data.str.lengths().sum() * bytes_per_element
n_bytes: int = self._data.str.lengths().sum() # type: ignore[assignment]
return n_bytes * bytes_per_element
else:
return len(self._data) * bytes_per_element

Expand All @@ -43,11 +46,11 @@ def ptr(self) -> int:
"""Pointer to start of the buffer as an integer."""
return self._data._s.get_ptr()

def __dlpack__(self):
def __dlpack__(self) -> NoReturn:
"""Represent this structure as DLPack interface."""
raise NotImplementedError("__dlpack__")

def __dlpack_device__(self) -> tuple[DlpackDeviceType, int | None]:
def __dlpack_device__(self) -> tuple[DlpackDeviceType, None]:
"""Device type and device ID for where the data in the buffer resides."""
return (DlpackDeviceType.CPU, None)

Expand Down
39 changes: 27 additions & 12 deletions py-polars/polars/interchange/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@
from typing import TYPE_CHECKING

import polars as pl
from polars.internals.interchange.buffer import PolarsBuffer
from polars.internals.interchange.dataframe_protocol import (
from polars.interchange.buffer import PolarsBuffer
from polars.interchange.dataframe_protocol import (
Column,
ColumnNullType,
DtypeKind,
)
from polars.internals.interchange.utils import polars_dtype_to_dtype
from polars.interchange.utils import polars_dtype_to_dtype

if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any

from polars.internals.interchange.dataframe_protocol import (
from polars.interchange.dataframe_protocol import (
CategoricalDescription,
ColumnBuffers,
Dtype,
)


class PolarsColumn(Column):
"""A column represented by a Polars Series."""
"""A column backed by a Polars Series."""

def __init__(self, column: pl.Series, allow_copy: bool = True):
self._col = column
Expand Down Expand Up @@ -52,33 +52,48 @@ def dtype(self) -> Dtype:

@property
def describe_categorical(self) -> CategoricalDescription:
"""
Return information on the categorical column.
Raises
------
TypeError
If the dtype is not categorical.
"""
if self.dtype[0] != DtypeKind.CATEGORICAL:
raise TypeError(
"describe_categorical only works on a column with categorical dtype!"
)

categories = self._col.unique().cat.set_ordering("physical").sort()
return {
"is_ordered": self._col.cat.ordered, # TODO: Implement
"is_ordered": False, # self._col.cat.ordered, # TODO: Implement
"is_dictionary": True,
"categories": PolarsColumn(categories),
}

@property
def describe_null(self) -> tuple[ColumnNullType, int]:
"""Return the null representation the column dtype uses."""
return ColumnNullType.USE_BITMASK, 0

def null_count(self) -> int:
@property
def null_count(self) -> int | None:
"""The number of null elements."""
return self._col.null_count()

@property
def metadata(self) -> dict[str, Any]:
"""The metadata for the column."""
return {}

def num_chunks(self) -> int:
"""Return the number of chunks the column consists of."""
return self._col.n_chunks()

def get_chunks(self, n_chunks: int | None = None) -> Iterable[PolarsColumn]:
"""Return an iterator yielding the chunks."""
total_n_chunks = self.num_chunks()
chunks = self._col.get_chunks() # TODO: Implement

Expand All @@ -103,6 +118,7 @@ def get_chunks(self, n_chunks: int | None = None) -> Iterable[PolarsColumn]:
yield PolarsColumn(chunk[start : start + step], self._allow_copy)

def get_buffers(self) -> ColumnBuffers:
"""Return a dictionary containing the underlying buffers."""
buffers: ColumnBuffers = {
"data": self._get_data_buffer(),
"validity": self._get_validity_buffer(),
Expand Down Expand Up @@ -139,7 +155,7 @@ def _get_validity_buffer(self) -> tuple[PolarsBuffer, Dtype]:
"""
Return the buffer containing the mask values indicating missing data and
the buffer's associated dtype.
"""
""" # noqa: D205
buffer = PolarsBuffer(self._col.is_not_null(), self._allow_copy)
dtype = polars_dtype_to_dtype(pl.Boolean)
return buffer, dtype
Expand All @@ -150,19 +166,18 @@ def _get_offsets_buffer(self) -> tuple[PolarsBuffer, Dtype]:
data (e.g., variable-length strings) and the buffer's associated dtype.
Raises NoBufferPresent if the data buffer does not have an associated
offsets buffer.
"""
""" # noqa: D205
if self.dtype[0] != DtypeKind.STRING:
raise TypeError(
"This column has a fixed-length dtype so "
"it does not have an offsets buffer"
"This column has a fixed-length dtype so it does not have an offsets buffer"
)

offsets = (
self._col.str.n_chars()
.fill_null(0)
.cumsum()
.extend_constant(None, 1)
.shift_and_fill(1, 0)
.shift_and_fill(0)
.rechunk()
)

Expand Down
81 changes: 50 additions & 31 deletions py-polars/polars/interchange/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,44 @@
from itertools import accumulate
from typing import TYPE_CHECKING

from polars.internals.interchange.column import PolarsColumn
from polars.internals.interchange.dataframe_protocol import DataFrame as DataFrameXchg
from polars.interchange.column import PolarsColumn
from polars.interchange.dataframe_protocol import DataFrame as DataFrameObject

if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Iterable, Iterator
from typing import Any

import polars as pl
from polars import DataFrame


class PolarsDataFrameXchg(DataFrameXchg):
"""A dataframe represented by a Polars DataFrame."""
class PolarsDataFrame(DataFrameObject):
"""A dataframe object backed by a Polars DataFrame."""

def __init__(
self, df: pl.DataFrame, nan_as_null: bool = False, allow_copy: bool = True
self, df: DataFrame, nan_as_null: bool = False, allow_copy: bool = True
):
self._df = df
self._nan_as_null = nan_as_null # Has no effect for now
self._allow_copy = allow_copy

def __dataframe__(
self, nan_as_null: bool = False, allow_copy: bool = True
) -> PolarsDataFrameXchg:
return PolarsDataFrameXchg(self._df, nan_as_null, allow_copy)
) -> PolarsDataFrame:
"""Construct a new exchange object, potentially changing the parameters."""
return PolarsDataFrame(self._df, nan_as_null=nan_as_null, allow_copy=allow_copy)

@property
def metadata(self) -> dict[str, Any]:
"""The metadata for the dataframe."""
return {}

def num_columns(self) -> int:
return self._df.shape[1]
"""Return the number of columns in the DataFrame."""
return self._df.width

def num_rows(self) -> int:
return self._df.shape[0]
"""Return the number of rows in the DataFrame."""
return self._df.height

def num_chunks(self) -> int:
"""
Expand All @@ -49,50 +53,62 @@ def num_chunks(self) -> int:
See Also
--------
polars.internals.dataframe.frame.DataFrame.n_chunks.
polars.dataframe.frame.DataFrame.n_chunks
"""
return self._df.n_chunks()
return self._df.n_chunks("first")

def column_names(self) -> list[str]:
"""Return the column names."""
return self._df.columns

def get_column(self, i: int) -> PolarsColumn:
"""Return the column at the indicated position."""
return PolarsColumn(self._df.to_series(i), allow_copy=self._allow_copy)

def get_column_by_name(self, name: str) -> PolarsColumn:
"""Return the column whose name is the indicated name."""
return PolarsColumn(self._df[name], allow_copy=self._allow_copy)

def get_columns(self) -> list[PolarsColumn]:
return [
PolarsColumn(column, allow_copy=self._allow_copy)
for column in self._df.get_columns()
]
def get_columns(self) -> Iterable[PolarsColumn]:
"""Return an iterator yielding the columns."""
for column in self._df.get_columns():
yield PolarsColumn(column, allow_copy=self._allow_copy)

def select_columns(self, indices: Sequence[int]) -> PolarsDataFrameXchg:
def select_columns(self, indices: Sequence[int]) -> PolarsDataFrame:
"""Create a new DataFrame by selecting a subset of columns by index."""
if not isinstance(indices, Sequence):
raise ValueError("`indices` is not a sequence")
raise TypeError("`indices` is not a sequence")
if not isinstance(indices, list):
indices = list(indices)

return PolarsDataFrameXchg(
self._df[:, indices], self._nan_as_null, self._allow_copy
return PolarsDataFrame(
self._df[:, indices],
nan_as_null=self._nan_as_null,
allow_copy=self._allow_copy,
)

def select_columns_by_name(self, names: Sequence[str]) -> PolarsDataFrameXchg:
def select_columns_by_name(self, names: Sequence[str]) -> PolarsDataFrame:
"""Create a new DataFrame by selecting a subset of columns by name."""
if not isinstance(names, Sequence):
raise ValueError("`names` is not a sequence")

return PolarsDataFrameXchg(
self._df.select(names), self._nan_as_null, self._allow_copy
return PolarsDataFrame(
self._df.select(names),
nan_as_null=self._nan_as_null,
allow_copy=self._allow_copy,
)

def get_chunks(self, n_chunks: int | None = None) -> Iterable[PolarsDataFrameXchg]:
def get_chunks(self, n_chunks: int | None = None) -> Iterator[PolarsDataFrame]:
"""Return an iterator yielding the chunks."""
total_n_chunks = self.num_chunks()
chunks = self._get_chunks_from_col_chunks()

if (n_chunks is None) or (n_chunks == total_n_chunks):
for chunk in chunks:
yield PolarsDataFrameXchg(chunk, self._allow_copy)
yield PolarsDataFrame(
chunk, nan_as_null=self._nan_as_null, allow_copy=self._allow_copy
)

elif (n_chunks <= 0) or (n_chunks % total_n_chunks != 0):
raise ValueError(
Expand All @@ -108,18 +124,21 @@ def get_chunks(self, n_chunks: int | None = None) -> Iterable[PolarsDataFrameXch
if size % subchunks_per_chunk != 0:
step += 1
for start in range(0, step * subchunks_per_chunk, step):
yield PolarsDataFrameXchg(
chunk[start : start + step, :], self._allow_copy
yield PolarsDataFrame(
chunk[start : start + step, :],
nan_as_null=self._nan_as_null,
allow_copy=self._allow_copy,
)

def _get_chunks_from_col_chunks(self) -> Iterable[pl.DataFrame]:
def _get_chunks_from_col_chunks(self) -> Iterator[DataFrame]:
"""
Return chunks of this dataframe according to the chunks of the first column.
If columns are not all chunked identically, they will be rechunked like the
first column. If copy is not allowed, raises RuntimeError.
"""
col_chunks = self.get_column(0).get_chunks()
chunk_sizes = [len(chunk) for chunk in col_chunks]
chunk_sizes = [chunk.size() for chunk in col_chunks]
starts = [0] + list(accumulate(chunk_sizes))

for i in range(len(starts) - 1):
Expand Down
Loading

0 comments on commit c4e0497

Please sign in to comment.