Skip to content

Commit

Permalink
Bump Polars to v0.41.3 (#917)
Browse files Browse the repository at this point in the history
  • Loading branch information
lkarthee authored Jul 24, 2024
1 parent 3124d32 commit 59504fa
Show file tree
Hide file tree
Showing 23 changed files with 1,188 additions and 677 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- `Explorer.Series.pow/2` no longer casts to float when the exponent is a signed
integer. We are following the way Polars works now, which is to try to execute
the operation or raise an exception in case the exponent is negative.

- `Explorer.Series.pivot_wider/4` no longer includes the `names_from` column
name in the new columns when `values_from` is a list of columns. This is more
consistent with its behaviour when `values_from` is a single column.

- `Explorer.Series.substring/3` no longer cycles to the end of the string if the
negative offset surpasses the beginning of that string. In that case, an empty
string is returned.

- The `Explorer.Series.ewm_*` functions no longer replace `nil` values with the
  value at the previous index. They now propagate `nil` values through to the
  result series.

- Saving a dataframe as a Parquet file to S3 services no longer works when
  streaming is enabled. This is temporary, due to a bug in Polars. An exception
  is raised instead.

## [v0.8.3] - 2024-06-10

### Added
Expand Down
23 changes: 11 additions & 12 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2788,7 +2788,7 @@ defmodule Explorer.DataFrame do
#Explorer.DataFrame<
Polars[3 x 2]
a string ["a", "b", "c"]
b f64 [1.0, 4.0, 9.0]
b s64 [1, 4, 9]
>
It's possible to "reuse" a variable for different computations:
Expand Down Expand Up @@ -4755,8 +4755,7 @@ defmodule Explorer.DataFrame do
Multiple columns are accepted for the `values_from` parameter, but the behaviour is slightly
different for the naming of new columns in the resultant dataframe. The new columns are going
to be prefixed by the name of the original value column, followed by an underscore and the
original column name, followed by the name of the variable.
to be prefixed by the name of the original value column, followed by the name of the variable.
iex> df = Explorer.DataFrame.new(
iex> product_id: [1, 1, 1, 1, 2, 2, 2, 2],
Expand All @@ -4768,14 +4767,14 @@ defmodule Explorer.DataFrame do
#Explorer.DataFrame<
Polars[2 x 9]
product_id s64 [1, 2]
property_value_property_product_id s64 [1, 2]
property_value_property_width_cm s64 [42, 35]
property_value_property_height_cm s64 [40, 20]
property_value_property_length_cm s64 [64, 40]
another_value_property_product_id s64 [1, 2]
another_value_property_width_cm s64 [43, 36]
another_value_property_height_cm s64 [41, 21]
another_value_property_length_cm s64 [65, 42]
property_value_product_id s64 [1, 2]
property_value_width_cm s64 [42, 35]
property_value_height_cm s64 [40, 20]
property_value_length_cm s64 [64, 40]
another_value_product_id s64 [1, 2]
another_value_width_cm s64 [43, 36]
another_value_height_cm s64 [41, 21]
another_value_length_cm s64 [65, 42]
>
## Grouped examples
Expand Down Expand Up @@ -6073,7 +6072,7 @@ defmodule Explorer.DataFrame do
Basic example:
iex> df = Explorer.DataFrame.new(a: [1, 2, 3], b: ["x", "y", "y"])
iex> Explorer.DataFrame.sql(df, "select a, b from df group by b order by b")
iex> Explorer.DataFrame.sql(df, "select ARRAY_AGG(a), b from df group by b order by b")
#Explorer.DataFrame<
Polars[2 x 2]
a list[s64] [[1], [2, 3]]
Expand Down
38 changes: 22 additions & 16 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Explorer.PolarsBackend.DataFrame do

with {:ok, df_result} <- adbc_result,
{:ok, df} <- df_result,
do: {:ok, Shared.create_dataframe(df)}
do: Shared.create_dataframe(df)
end

@impl true
Expand Down Expand Up @@ -122,7 +122,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
)

case df do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -195,6 +195,11 @@ defmodule Explorer.PolarsBackend.DataFrame do

{columns, with_projection} = column_names_or_projection(columns)

dtypes_list =
if not Enum.empty?(dtypes) do
Map.to_list(dtypes)
end

df =
Native.df_load_csv(
contents,
Expand All @@ -207,15 +212,15 @@ defmodule Explorer.PolarsBackend.DataFrame do
delimiter,
true,
columns,
Map.to_list(dtypes),
dtypes_list,
encoding,
nil_values,
parse_dates,
char_byte(eol_delimiter)
)

case df do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand All @@ -242,7 +247,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def from_ndjson(%Local.Entry{} = entry, infer_schema_length, batch_size) do
case Native.df_from_ndjson(entry.path, infer_schema_length, batch_size) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -274,7 +279,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def load_ndjson(contents, infer_schema_length, batch_size) when is_binary(contents) do
case Native.df_load_ndjson(contents, infer_schema_length, batch_size) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand All @@ -284,7 +289,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
# We first read using a lazy dataframe, then we collect.
with {:ok, ldf} <- Native.lf_from_parquet_cloud(entry, max_rows, columns),
{:ok, df} <- Native.lf_compute(ldf) do
{:ok, Shared.create_dataframe(df)}
Shared.create_dataframe(df)
end
end

Expand Down Expand Up @@ -316,7 +321,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
)

case df do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -370,7 +375,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def load_parquet(contents) when is_binary(contents) do
case Native.df_load_parquet(contents) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand All @@ -394,7 +399,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
{columns, projection} = column_names_or_projection(columns)

case Native.df_from_ipc(entry.path, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -428,7 +433,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
{columns, projection} = column_names_or_projection(columns)

case Native.df_load_ipc(contents, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand All @@ -452,7 +457,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
{columns, projection} = column_names_or_projection(columns)

case Native.df_from_ipc_stream(entry.path, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -486,7 +491,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
{columns, projection} = column_names_or_projection(columns)

case Native.df_load_ipc_stream(contents, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -556,7 +561,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
list = Enum.map(list, & &1.data)

Shared.apply(:df_from_series, [list])
|> Shared.create_dataframe()
|> Shared.create_dataframe!()
end

defp to_column_name!(column_name) when is_binary(column_name), do: column_name
Expand Down Expand Up @@ -669,7 +674,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def nil_count(%DataFrame{} = df) do
Shared.apply(:df_nil_count, [df.data])
|> Shared.create_dataframe()
|> Shared.create_dataframe!()
end

@impl true
Expand Down Expand Up @@ -707,6 +712,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
expressions,
directions,
maintain_order?,
multithreaded?,
nulls_last?,
df.groups
])
Expand Down Expand Up @@ -811,7 +817,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
values_from,
names_prefix_optional
])
|> Shared.create_dataframe()
|> Shared.create_dataframe!()
end

@impl true
Expand Down
21 changes: 7 additions & 14 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
)

case result do
{:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
{:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -204,15 +204,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
@impl true
def from_parquet(%S3.Entry{} = entry, max_rows, columns, _rechunk) do
case Native.lf_from_parquet_cloud(entry, max_rows, columns) do
{:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
{:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
def from_parquet(%Local.Entry{} = entry, max_rows, columns, _rechunk) do
case Native.lf_from_parquet(entry.path, max_rows, columns) do
{:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
{:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand All @@ -226,7 +226,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
@impl true
def from_ndjson(%Local.Entry{} = entry, infer_schema_length, batch_size) do
case Native.lf_from_ndjson(entry.path, infer_schema_length, batch_size) do
{:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
{:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand All @@ -240,7 +240,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
@impl true
def from_ipc(%Local.Entry{} = entry, columns) when is_nil(columns) do
case Native.lf_from_ipc(entry.path) do
{:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
{:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
Expand Down Expand Up @@ -364,15 +364,8 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end

@impl true
def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, {compression, level}, _streaming = true) do
case Native.lf_to_parquet_cloud(
ldf.data,
entry,
Shared.parquet_compression(compression, level)
) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
def to_parquet(%DF{} = _ldf, %S3.Entry{} = _entry, {_compression, _level}, _streaming = true) do
raise "streaming of a lazy frame to the cloud using parquet is currently unavailable. Please try again disabling the `:streaming` option."
end

@impl true
Expand Down
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ defmodule Explorer.PolarsBackend.Native do
_expressions,
_directions,
_maintain_order?,
_multithreaded?,
_nulls_last?,
_groups
),
Expand Down
13 changes: 10 additions & 3 deletions lib/explorer/polars_backend/series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ defmodule Explorer.PolarsBackend.Series do

def frequencies(%Series{} = series) do
Shared.apply(:s_frequencies, [series.data])
|> Shared.create_dataframe()
|> Shared.create_dataframe!()
|> DataFrame.rename(["values", "counts"])
end

Expand All @@ -534,7 +534,7 @@ defmodule Explorer.PolarsBackend.Series do
category_label
) do
{:ok, polars_df} ->
Shared.create_dataframe(polars_df)
Shared.create_dataframe!(polars_df)

{:error, "Polars Error: lengths don't match: " <> _rest} ->
raise ArgumentError, "lengths don't match: labels count must equal bins count"
Expand All @@ -553,7 +553,7 @@ defmodule Explorer.PolarsBackend.Series do
break_point_label,
category_label
])
|> Shared.create_dataframe()
|> Shared.create_dataframe!()
end

# Window
Expand Down Expand Up @@ -596,6 +596,13 @@ defmodule Explorer.PolarsBackend.Series do
end

defp window_function(operation, series, window_size, weights, min_periods, center) do
series =
if List.wrap(weights) == [] do
series
else
cast(series, {:f, 64})
end

Shared.apply_series(series, operation, [window_size, weights, min_periods, center])
end

Expand Down
Loading

0 comments on commit 59504fa

Please sign in to comment.