Skip to content

Commit

Permalink
Update Polars to v0.42 and enable to_parquet/ipc cloud (#961)
Browse files Browse the repository at this point in the history
With this, we can re-enable streaming `to_parquet/2` writes to the cloud, and add cloud support to `to_ipc/2`.

Closes #705
  • Loading branch information
philss authored Aug 15, 2024
1 parent efb7b1c commit a4963f3
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 310 deletions.
22 changes: 18 additions & 4 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end

@impl true
def to_parquet(%DF{} = _ldf, %S3.Entry{} = _entry, {_compression, _level}, _streaming = true) do
raise "streaming of a lazy frame to the cloud using parquet is currently unavailable. Please try again disabling the `:streaming` option."
def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, {compression, level}, _streaming = true) do
case Native.lf_to_parquet_cloud(
ldf.data,
entry,
Shared.parquet_compression(compression, level)
) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
Expand All @@ -384,8 +391,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end

@impl true
def to_ipc(_df, %S3.Entry{}, _compression, _streaming = true) do
{:error, ArgumentError.exception("streaming is not supported for writes to AWS S3")}
def to_ipc(%DF{} = ldf, %S3.Entry{} = entry, {compression, _level}, _streaming = true) do
case Native.lf_to_ipc_cloud(
ldf.data,
entry,
Atom.to_string(compression)
) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
Expand Down
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ defmodule Explorer.PolarsBackend.Native do
def lf_to_parquet(_df, _filename, _compression, _streaming), do: err()
def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
def lf_to_ipc_cloud(_df, _cloud_entry, _compression), do: err()
def lf_to_csv(_df, _filename, _header, _delimiter, _streaming), do: err()
def lf_sql(_df, _sql_string, _table_name), do: err()

Expand Down
2 changes: 1 addition & 1 deletion lib/explorer/series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6031,7 +6031,7 @@ defmodule Explorer.Series do
iex> Explorer.Series.re_named_captures(s, ~S/(b|d)/)
#Explorer.Series<
Polars[4]
struct[1] [%{"1" => "b"}, %{"1" => "d"}, %{"1" => "b"}, %{"1" => nil}]
struct[1] [%{"1" => "b"}, %{"1" => "d"}, %{"1" => "b"}, nil]
>
iex> s = Explorer.Series.from_list(["[email protected]", "[email protected]"])
Expand Down
Loading

0 comments on commit a4963f3

Please sign in to comment.