Skip to content

Commit

Permalink
Refactor eager DF implementation to make use of lazy backend (#893)
Browse files Browse the repository at this point in the history
The idea is to have less code to maintain.
  • Loading branch information
philss authored Apr 12, 2024
1 parent 5167342 commit c3ad65a
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 314 deletions.
135 changes: 72 additions & 63 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@moduledoc false

alias Explorer.DataFrame, as: DataFrame
alias Explorer.PolarsBackend.LazyFrame
alias Explorer.PolarsBackend.Native
alias Explorer.PolarsBackend.Series, as: PolarsSeries
alias Explorer.PolarsBackend.Shared
Expand All @@ -11,7 +12,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
alias FSS.Local
alias FSS.S3

import Explorer.PolarsBackend.Expression, only: [to_expr: 1, alias_expr: 2]
import Explorer.PolarsBackend.Expression, only: [to_expr: 1]

defstruct resource: nil

Expand Down Expand Up @@ -604,37 +605,47 @@ defmodule Explorer.PolarsBackend.DataFrame do
# Single table verbs

@impl true
def head(%DataFrame{} = df, rows),
do: Shared.apply_dataframe(df, df, :df_head, [rows, df.groups])
def head(%DataFrame{} = df, rows) do
df
|> lazy()
|> LazyFrame.head(rows)
|> LazyFrame.collect()
end

@impl true
def tail(%DataFrame{} = df, rows),
do: Shared.apply_dataframe(df, df, :df_tail, [rows, df.groups])
def tail(%DataFrame{} = df, rows) do
df
|> lazy()
|> LazyFrame.tail(rows)
|> LazyFrame.collect()
end

@impl true
def select(df, out_df),
do: Shared.apply_dataframe(df, out_df, :df_select, [out_df.names])
def select(df, out_df) do
df
|> lazy()
|> LazyFrame.select(out_df)
|> LazyFrame.collect()
end

@impl true
def mask(df, %Series{} = mask),
do: Shared.apply_dataframe(df, df, :df_mask, [mask.data])

@impl true
def filter_with(df, out_df, %Explorer.Backend.LazySeries{} = lseries) do
expressions = to_expr(lseries)
Shared.apply_dataframe(df, out_df, :df_filter_with, [expressions, df.groups])
df
|> lazy()
|> LazyFrame.filter_with(out_df, lseries)
|> LazyFrame.collect()
end

@impl true
def mutate_with(%DataFrame{} = df, %DataFrame{} = out_df, column_pairs) do
exprs =
for {name, lazy_series} <- column_pairs do
lazy_series
|> to_expr()
|> alias_expr(name)
end

Shared.apply_dataframe(df, out_df, :df_mutate_with_exprs, [exprs, df.groups])
df
|> lazy()
|> LazyFrame.mutate_with(out_df, column_pairs)
|> LazyFrame.collect()
end

@impl true
Expand Down Expand Up @@ -693,14 +704,19 @@ defmodule Explorer.PolarsBackend.DataFrame do

@impl true
def distinct(%DataFrame{} = df, %DataFrame{} = out_df, columns) do
maybe_columns_to_keep = if df.names != out_df.names, do: out_df.names

Shared.apply_dataframe(df, out_df, :df_distinct, [columns, maybe_columns_to_keep])
df
|> lazy()
|> LazyFrame.distinct(out_df, columns)
|> LazyFrame.collect()
end

@impl true
def rename(%DataFrame{} = df, %DataFrame{} = out_df, pairs),
do: Shared.apply_dataframe(df, out_df, :df_rename_columns, [pairs])
def rename(%DataFrame{} = df, %DataFrame{} = out_df, pairs) do
df
|> lazy()
|> LazyFrame.rename(out_df, pairs)
|> LazyFrame.collect()
end

@impl true
def dummies(df, out_df, names),
Expand Down Expand Up @@ -758,16 +774,19 @@ defmodule Explorer.PolarsBackend.DataFrame do
do: Shared.apply_dataframe(df, df, :df_slice, [offset, length, df.groups])

@impl true
def drop_nil(df, columns), do: Shared.apply_dataframe(df, df, :df_drop_nils, [columns])
def drop_nil(df, columns) do
df
|> lazy()
|> LazyFrame.drop_nil(columns)
|> LazyFrame.collect()
end

@impl true
def pivot_longer(df, out_df, columns_to_pivot, columns_to_keep, names_to, values_to) do
Shared.apply_dataframe(df, out_df, :df_pivot_longer, [
columns_to_keep,
columns_to_pivot,
names_to,
values_to
])
def pivot_longer(df, out_df, cols_to_pivot, cols_to_keep, names_to, values_to) do
df
|> lazy()
|> LazyFrame.pivot_longer(out_df, cols_to_pivot, cols_to_keep, names_to, values_to)
|> LazyFrame.collect()
end

@impl true
Expand All @@ -791,12 +810,18 @@ defmodule Explorer.PolarsBackend.DataFrame do

@impl true
def explode(df, out_df, columns) do
Shared.apply_dataframe(df, out_df, :df_explode, [columns])
df
|> lazy()
|> LazyFrame.explode(out_df, columns)
|> LazyFrame.collect()
end

@impl true
def unnest(df, out_df, columns) do
Shared.apply_dataframe(df, out_df, :df_unnest, [columns])
df
|> lazy()
|> LazyFrame.unnest(out_df, columns)
|> LazyFrame.collect()
end

@impl true
Expand All @@ -813,33 +838,22 @@ defmodule Explorer.PolarsBackend.DataFrame do

# Two or more table verbs

@impl true
def join(left, right, out_df, on, :right) do
# Join right is just the "join left" with inverted DFs and swapped "on" instructions.
# If columns on left have the same names from right, and they are not in "on" instructions,
# then we add a suffix "_left".
{left_on, right_on} =
on
|> Enum.reverse()
|> Enum.map(fn {left, right} -> {right, left} end)
|> Enum.unzip()

args = [left.data, left_on, right_on, "left", "_left"]
Shared.apply_dataframe(right, out_df, :df_join, args)
end

@impl true
def join(left, right, out_df, on, how) do
how = Atom.to_string(how)
{left_on, right_on} = Enum.unzip(on)
left = lazy(left)
right = lazy(right)

args = [right.data, left_on, right_on, how, "_right"]
Shared.apply_dataframe(left, out_df, :df_join, args)
ldf = LazyFrame.join(left, right, out_df, on, how)
LazyFrame.collect(ldf)
end

@impl true
def concat_rows([head | tail], out_df) do
Shared.apply_dataframe(head, out_df, :df_concat_rows, [Enum.map(tail, & &1.data)])
def concat_rows([_head | _tail] = dfs, out_df) do
lazy_dfs = Enum.map(dfs, &lazy/1)

lazy_dfs
|> LazyFrame.concat_rows(out_df)
|> LazyFrame.collect()
end

@impl true
Expand All @@ -856,18 +870,13 @@ defmodule Explorer.PolarsBackend.DataFrame do
# Groups

@impl true
def summarise_with(%DataFrame{groups: groups} = df, %DataFrame{} = out_df, column_pairs) do
exprs =
for {name, lazy_series} <- column_pairs do
original_expr = to_expr(lazy_series)
alias_expr(original_expr, name)
end

Shared.apply_dataframe(df, out_df, :df_summarise_with_exprs, [groups_exprs(groups), exprs])
def summarise_with(%DataFrame{} = df, %DataFrame{} = out_df, column_pairs) do
df
|> lazy()
|> LazyFrame.summarise_with(out_df, column_pairs)
|> LazyFrame.collect()
end

defp groups_exprs(groups), do: Enum.map(groups, &Native.expr_column/1)

# Inspect

@impl true
Expand Down
12 changes: 12 additions & 0 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ defmodule Explorer.PolarsBackend.LazyFrame do
# Introspection

@impl true
def inspect(ldf, opts) when node(ldf.data.resource) != node() do
Explorer.Backend.DataFrame.inspect(
ldf,
"LazyPolars (node: #{node(ldf.data.resource)})",
nil,
opts,
elide_columns: true
)
end

def inspect(ldf, opts) do
case Native.lf_fetch(ldf.data, opts.limit) do
{:ok, df} ->
Expand Down Expand Up @@ -417,6 +427,8 @@ defmodule Explorer.PolarsBackend.LazyFrame do
]
)
else
# TODO: log that we can set neither `maintain_order?` nor `nulls_last?` here.
# This is a limitation of the grouped lazy frame.
Shared.apply_dataframe(
df,
out_df,
Expand Down
14 changes: 0 additions & 14 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ defmodule Explorer.PolarsBackend.Native do
do: err()

def df_concat_columns(_df, _others), do: err()
def df_concat_rows(_df, _others), do: err()
def df_distinct(_df, _subset, _selection), do: err()
def df_drop(_df, _name), do: err()
def df_drop_nils(_df, _subset), do: err()
def df_dtypes(_df), do: err()
def df_dump_csv(_df, _has_headers, _delimiter), do: err()
def df_dump_ndjson(_df), do: err()
def df_dump_parquet(_df, _compression), do: err()
def df_dump_ipc(_df, _compression), do: err()
def df_dump_ipc_stream(_df, _compression), do: err()
def df_filter_with(_df, _operation, _groups), do: err()

def df_from_csv(
_filename,
Expand Down Expand Up @@ -119,8 +115,6 @@ defmodule Explorer.PolarsBackend.Native do
def df_from_series(_columns), do: err()
def df_group_indices(_df, _column_names), do: err()
def df_groups(_df, _column_names), do: err()
def df_head(_df, _length, _groups), do: err()
def df_join(_df, _other, _left_on, _right_on, _how, _suffix), do: err()

def df_load_csv(
_binary,
Expand All @@ -147,24 +141,18 @@ defmodule Explorer.PolarsBackend.Native do
def df_load_parquet(_binary), do: err()

def df_mask(_df, _mask), do: err()
def df_mutate_with_exprs(_df, _exprs, _groups), do: err()
def df_n_rows(_df), do: err()
def df_names(_df), do: err()
def df_pivot_longer(_df, _id_vars, _value_vars, _names_to, _values_to), do: err()
def df_pivot_wider(_df, _id_columns, _pivot_column, _values_column, _names_prefix), do: err()
def df_pull(_df, _name), do: err()
def df_put_column(_df, _series), do: err()
def df_rename_columns(_df, _old_new_pairs), do: err()
def df_sample_frac(_df, _frac, _with_replacement, _shuffle, _seed, _groups), do: err()
def df_sample_n(_df, _n, _with_replacement, _shuffle, _seed, _groups), do: err()
def df_select(_df, _selection), do: err()
def df_select_at_idx(_df, _idx), do: err()
def df_shape(_df), do: err()
def df_slice(_df, _offset, _length, _groups), do: err()
def df_slice_by_indices(_df, _indices, _groups), do: err()
def df_slice_by_series(_df, _series, _groups), do: err()
def df_summarise_with_exprs(_df, _groups_exprs, _aggs_pairs), do: err()
def df_tail(_df, _length, _groups), do: err()
def df_transpose(_df, _keep_names_as, _new_col_names), do: err()
def df_to_csv(_df, _filename, _has_headers, _delimiter), do: err()
def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter), do: err()
Expand All @@ -180,8 +168,6 @@ defmodule Explorer.PolarsBackend.Native do
def df_to_parquet_cloud(_df, _ex_entry, _compression), do: err()
def df_width(_df), do: err()
def df_nil_count(_df), do: err()
def df_explode(_df, _columns), do: err()
def df_unnest(_df, _columns), do: err()

# Expressions (for lazy queries)
@multi_arity_expressions [slice: 2, slice: 3, log: 1, log: 2]
Expand Down
6 changes: 4 additions & 2 deletions lib/explorer/polars_backend/shared.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ defmodule Explorer.PolarsBackend.Shared do
# the full picture of the result yet.
check_df =
if match?(%PolarsLazyFrame{}, new_df) do
{:ok, new_df} = Native.lf_collect(new_df)
create_dataframe(new_df)
case Native.lf_collect(new_df) do
{:ok, new_df} -> create_dataframe(new_df)
{:error, error} -> raise runtime_error(error)
end
else
create_dataframe(new_df)
end
Expand Down
Loading

0 comments on commit c3ad65a

Please sign in to comment.