Skip to content

Commit

Permalink
Refactor eager DF implementation to make use of lazy backend
Browse files Browse the repository at this point in the history
The idea is to have less code to maintain.
  • Loading branch information
philss committed Apr 11, 2024
1 parent 5167342 commit b58622f
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 314 deletions.
135 changes: 72 additions & 63 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@moduledoc false

alias Explorer.DataFrame, as: DataFrame
alias Explorer.PolarsBackend.LazyFrame
alias Explorer.PolarsBackend.Native
alias Explorer.PolarsBackend.Series, as: PolarsSeries
alias Explorer.PolarsBackend.Shared
Expand All @@ -11,7 +12,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
alias FSS.Local
alias FSS.S3

import Explorer.PolarsBackend.Expression, only: [to_expr: 1, alias_expr: 2]
import Explorer.PolarsBackend.Expression, only: [to_expr: 1]

defstruct resource: nil

Expand Down Expand Up @@ -604,37 +605,47 @@ defmodule Explorer.PolarsBackend.DataFrame do
# Single table verbs

@impl true
def head(%DataFrame{} = df, rows),
do: Shared.apply_dataframe(df, df, :df_head, [rows, df.groups])
# Runs `head` through the lazy backend: wraps `df` with `lazy/1`, applies
# `LazyFrame.head/2` with `rows`, then calls `LazyFrame.collect/1` on the result.
def head(%DataFrame{} = df, rows) do
  lazy_df = lazy(df)
  limited = LazyFrame.head(lazy_df, rows)

  LazyFrame.collect(limited)
end

@impl true
def tail(%DataFrame{} = df, rows),
do: Shared.apply_dataframe(df, df, :df_tail, [rows, df.groups])
# Runs `tail` through the lazy backend: wraps `df` with `lazy/1`, applies
# `LazyFrame.tail/2` with `rows`, then calls `LazyFrame.collect/1` on the result.
def tail(%DataFrame{} = df, rows) do
  lazy_df = lazy(df)
  trailing = LazyFrame.tail(lazy_df, rows)

  LazyFrame.collect(trailing)
end

@impl true
def select(df, out_df),
do: Shared.apply_dataframe(df, out_df, :df_select, [out_df.names])
# Runs `select` through the lazy backend: wraps `df` with `lazy/1`, hands the
# lazy frame and `out_df` to `LazyFrame.select/2`, then collects the result.
def select(df, out_df) do
  lazy_df = lazy(df)
  selected = LazyFrame.select(lazy_df, out_df)

  LazyFrame.collect(selected)
end

@impl true
# Applies the `:df_mask` operation via `Shared.apply_dataframe/4`, passing the
# mask series' underlying `data`. `df` is given in both dataframe positions.
def mask(df, %Series{} = mask) do
  Shared.apply_dataframe(df, df, :df_mask, [mask.data])
end

@impl true
def filter_with(df, out_df, %Explorer.Backend.LazySeries{} = lseries) do
expressions = to_expr(lseries)
Shared.apply_dataframe(df, out_df, :df_filter_with, [expressions, df.groups])
df
|> lazy()
|> LazyFrame.filter_with(out_df, lseries)
|> LazyFrame.collect()
end

@impl true
def mutate_with(%DataFrame{} = df, %DataFrame{} = out_df, column_pairs) do
exprs =
for {name, lazy_series} <- column_pairs do
lazy_series
|> to_expr()
|> alias_expr(name)
end

Shared.apply_dataframe(df, out_df, :df_mutate_with_exprs, [exprs, df.groups])
df
|> lazy()
|> LazyFrame.mutate_with(out_df, column_pairs)
|> LazyFrame.collect()
end

@impl true
Expand Down Expand Up @@ -693,14 +704,19 @@ defmodule Explorer.PolarsBackend.DataFrame do

@impl true
def distinct(%DataFrame{} = df, %DataFrame{} = out_df, columns) do
maybe_columns_to_keep = if df.names != out_df.names, do: out_df.names

Shared.apply_dataframe(df, out_df, :df_distinct, [columns, maybe_columns_to_keep])
df
|> lazy()
|> LazyFrame.distinct(out_df, columns)
|> LazyFrame.collect()
end

@impl true
def rename(%DataFrame{} = df, %DataFrame{} = out_df, pairs),
do: Shared.apply_dataframe(df, out_df, :df_rename_columns, [pairs])
# Runs `rename` through the lazy backend: wraps `df` with `lazy/1`, applies
# `LazyFrame.rename/3` with `out_df` and `pairs`, then collects the result.
def rename(%DataFrame{} = df, %DataFrame{} = out_df, pairs) do
  lazy_df = lazy(df)
  renamed = LazyFrame.rename(lazy_df, out_df, pairs)

  LazyFrame.collect(renamed)
end

@impl true
def dummies(df, out_df, names),
Expand Down Expand Up @@ -758,16 +774,19 @@ defmodule Explorer.PolarsBackend.DataFrame do
do: Shared.apply_dataframe(df, df, :df_slice, [offset, length, df.groups])

@impl true
def drop_nil(df, columns), do: Shared.apply_dataframe(df, df, :df_drop_nils, [columns])
# Runs `drop_nil` through the lazy backend: wraps `df` with `lazy/1`, applies
# `LazyFrame.drop_nil/2` with `columns`, then collects the result.
def drop_nil(df, columns) do
  lazy_df = lazy(df)
  without_nils = LazyFrame.drop_nil(lazy_df, columns)

  LazyFrame.collect(without_nils)
end

@impl true
def pivot_longer(df, out_df, columns_to_pivot, columns_to_keep, names_to, values_to) do
Shared.apply_dataframe(df, out_df, :df_pivot_longer, [
columns_to_keep,
columns_to_pivot,
names_to,
values_to
])
# Runs `pivot_longer` through the lazy backend: wraps `df` with `lazy/1`,
# forwards every remaining argument unchanged to `LazyFrame.pivot_longer/6`,
# then collects the result.
def pivot_longer(df, out_df, cols_to_pivot, cols_to_keep, names_to, values_to) do
  lazy_df = lazy(df)

  pivoted =
    LazyFrame.pivot_longer(lazy_df, out_df, cols_to_pivot, cols_to_keep, names_to, values_to)

  LazyFrame.collect(pivoted)
end

@impl true
Expand All @@ -791,12 +810,18 @@ defmodule Explorer.PolarsBackend.DataFrame do

@impl true
def explode(df, out_df, columns) do
Shared.apply_dataframe(df, out_df, :df_explode, [columns])
df
|> lazy()
|> LazyFrame.explode(out_df, columns)
|> LazyFrame.collect()
end

@impl true
def unnest(df, out_df, columns) do
Shared.apply_dataframe(df, out_df, :df_unnest, [columns])
df
|> lazy()
|> LazyFrame.unnest(out_df, columns)
|> LazyFrame.collect()
end

@impl true
Expand All @@ -813,33 +838,22 @@ defmodule Explorer.PolarsBackend.DataFrame do

# Two or more table verbs

@impl true
def join(left, right, out_df, on, :right) do
# A right join is just a left join with the DFs inverted and the "on" pairs swapped.
# If columns on the left have the same names as columns on the right, and they are
# not in the "on" instructions, then we add the suffix "_left".
{left_on, right_on} =
on
|> Enum.reverse()
|> Enum.map(fn {left, right} -> {right, left} end)
|> Enum.unzip()

args = [left.data, left_on, right_on, "left", "_left"]
Shared.apply_dataframe(right, out_df, :df_join, args)
end

@impl true
def join(left, right, out_df, on, how) do
how = Atom.to_string(how)
{left_on, right_on} = Enum.unzip(on)
left = lazy(left)
right = lazy(right)

args = [right.data, left_on, right_on, how, "_right"]
Shared.apply_dataframe(left, out_df, :df_join, args)
ldf = LazyFrame.join(left, right, out_df, on, how)
LazyFrame.collect(ldf)
end

@impl true
def concat_rows([head | tail], out_df) do
Shared.apply_dataframe(head, out_df, :df_concat_rows, [Enum.map(tail, & &1.data)])
# Runs `concat_rows` through the lazy backend. The head pattern only accepts a
# non-empty list; every frame in it is wrapped with `lazy/1` before the list is
# passed to `LazyFrame.concat_rows/2` and the result is collected.
def concat_rows([_head | _tail] = dfs, out_df) do
  dfs
  |> Enum.map(fn df -> lazy(df) end)
  |> LazyFrame.concat_rows(out_df)
  |> LazyFrame.collect()
end

@impl true
Expand All @@ -856,18 +870,13 @@ defmodule Explorer.PolarsBackend.DataFrame do
# Groups

@impl true
def summarise_with(%DataFrame{groups: groups} = df, %DataFrame{} = out_df, column_pairs) do
exprs =
for {name, lazy_series} <- column_pairs do
original_expr = to_expr(lazy_series)
alias_expr(original_expr, name)
end

Shared.apply_dataframe(df, out_df, :df_summarise_with_exprs, [groups_exprs(groups), exprs])
# Runs `summarise_with` through the lazy backend: wraps `df` with `lazy/1`,
# applies `LazyFrame.summarise_with/3` with `out_df` and `column_pairs`,
# then collects the result.
def summarise_with(%DataFrame{} = df, %DataFrame{} = out_df, column_pairs) do
  lazy_df = lazy(df)
  summarised = LazyFrame.summarise_with(lazy_df, out_df, column_pairs)

  LazyFrame.collect(summarised)
end

defp groups_exprs(groups), do: Enum.map(groups, &Native.expr_column/1)

# Inspect

@impl true
Expand Down
12 changes: 12 additions & 0 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ defmodule Explorer.PolarsBackend.LazyFrame do
# Introspection

@impl true
# Clause for lazy frames whose resource belongs to a different node than the
# current one (see the guard). It delegates to `Explorer.Backend.DataFrame.inspect/5`
# with a label that names the owning node.
def inspect(ldf, opts) when node(ldf.data.resource) != node() do
  backend_label = "LazyPolars (node: #{node(ldf.data.resource)})"

  Explorer.Backend.DataFrame.inspect(ldf, backend_label, nil, opts, elide_columns: true)
end

def inspect(ldf, opts) do
case Native.lf_fetch(ldf.data, opts.limit) do
{:ok, df} ->
Expand Down Expand Up @@ -417,6 +427,8 @@ defmodule Explorer.PolarsBackend.LazyFrame do
]
)
else
# TODO: log that neither `maintain_order?` nor `nulls_last?` can be set here.
# This is a limitation of the grouped lazy frame.
Shared.apply_dataframe(
df,
out_df,
Expand Down
14 changes: 0 additions & 14 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ defmodule Explorer.PolarsBackend.Native do
do: err()

def df_concat_columns(_df, _others), do: err()
def df_concat_rows(_df, _others), do: err()
def df_distinct(_df, _subset, _selection), do: err()
def df_drop(_df, _name), do: err()
def df_drop_nils(_df, _subset), do: err()
def df_dtypes(_df), do: err()
def df_dump_csv(_df, _has_headers, _delimiter), do: err()
def df_dump_ndjson(_df), do: err()
def df_dump_parquet(_df, _compression), do: err()
def df_dump_ipc(_df, _compression), do: err()
def df_dump_ipc_stream(_df, _compression), do: err()
def df_filter_with(_df, _operation, _groups), do: err()

def df_from_csv(
_filename,
Expand Down Expand Up @@ -119,8 +115,6 @@ defmodule Explorer.PolarsBackend.Native do
def df_from_series(_columns), do: err()
def df_group_indices(_df, _column_names), do: err()
def df_groups(_df, _column_names), do: err()
def df_head(_df, _length, _groups), do: err()
def df_join(_df, _other, _left_on, _right_on, _how, _suffix), do: err()

def df_load_csv(
_binary,
Expand All @@ -147,24 +141,18 @@ defmodule Explorer.PolarsBackend.Native do
def df_load_parquet(_binary), do: err()

def df_mask(_df, _mask), do: err()
def df_mutate_with_exprs(_df, _exprs, _groups), do: err()
def df_n_rows(_df), do: err()
def df_names(_df), do: err()
def df_pivot_longer(_df, _id_vars, _value_vars, _names_to, _values_to), do: err()
def df_pivot_wider(_df, _id_columns, _pivot_column, _values_column, _names_prefix), do: err()
def df_pull(_df, _name), do: err()
def df_put_column(_df, _series), do: err()
def df_rename_columns(_df, _old_new_pairs), do: err()
def df_sample_frac(_df, _frac, _with_replacement, _shuffle, _seed, _groups), do: err()
def df_sample_n(_df, _n, _with_replacement, _shuffle, _seed, _groups), do: err()
def df_select(_df, _selection), do: err()
def df_select_at_idx(_df, _idx), do: err()
def df_shape(_df), do: err()
def df_slice(_df, _offset, _length, _groups), do: err()
def df_slice_by_indices(_df, _indices, _groups), do: err()
def df_slice_by_series(_df, _series, _groups), do: err()
def df_summarise_with_exprs(_df, _groups_exprs, _aggs_pairs), do: err()
def df_tail(_df, _length, _groups), do: err()
def df_transpose(_df, _keep_names_as, _new_col_names), do: err()
def df_to_csv(_df, _filename, _has_headers, _delimiter), do: err()
def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter), do: err()
Expand All @@ -180,8 +168,6 @@ defmodule Explorer.PolarsBackend.Native do
def df_to_parquet_cloud(_df, _ex_entry, _compression), do: err()
def df_width(_df), do: err()
def df_nil_count(_df), do: err()
def df_explode(_df, _columns), do: err()
def df_unnest(_df, _columns), do: err()

# Expressions (for lazy queries)
@multi_arity_expressions [slice: 2, slice: 3, log: 1, log: 2]
Expand Down
6 changes: 4 additions & 2 deletions lib/explorer/polars_backend/shared.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ defmodule Explorer.PolarsBackend.Shared do
# the full picture of the result yet.
check_df =
if match?(%PolarsLazyFrame{}, new_df) do
{:ok, new_df} = Native.lf_collect(new_df)
create_dataframe(new_df)
case Native.lf_collect(new_df) do
{:ok, new_df} -> create_dataframe(new_df)
{:error, error} -> raise runtime_error(error)
end
else
create_dataframe(new_df)
end
Expand Down
Loading

0 comments on commit b58622f

Please sign in to comment.