diff --git a/lib/explorer/polars_backend/data_frame.ex b/lib/explorer/polars_backend/data_frame.ex index cf5babc5c..7867f7ede 100644 --- a/lib/explorer/polars_backend/data_frame.ex +++ b/lib/explorer/polars_backend/data_frame.ex @@ -2,6 +2,7 @@ defmodule Explorer.PolarsBackend.DataFrame do @moduledoc false alias Explorer.DataFrame, as: DataFrame + alias Explorer.PolarsBackend.LazyFrame alias Explorer.PolarsBackend.Native alias Explorer.PolarsBackend.Series, as: PolarsSeries alias Explorer.PolarsBackend.Shared @@ -11,7 +12,7 @@ defmodule Explorer.PolarsBackend.DataFrame do alias FSS.Local alias FSS.S3 - import Explorer.PolarsBackend.Expression, only: [to_expr: 1, alias_expr: 2] + import Explorer.PolarsBackend.Expression, only: [to_expr: 1] defstruct resource: nil @@ -604,16 +605,28 @@ defmodule Explorer.PolarsBackend.DataFrame do # Single table verbs @impl true - def head(%DataFrame{} = df, rows), - do: Shared.apply_dataframe(df, df, :df_head, [rows, df.groups]) + def head(%DataFrame{} = df, rows) do + df + |> lazy() + |> LazyFrame.head(rows) + |> LazyFrame.collect() + end @impl true - def tail(%DataFrame{} = df, rows), - do: Shared.apply_dataframe(df, df, :df_tail, [rows, df.groups]) + def tail(%DataFrame{} = df, rows) do + df + |> lazy() + |> LazyFrame.tail(rows) + |> LazyFrame.collect() + end @impl true - def select(df, out_df), - do: Shared.apply_dataframe(df, out_df, :df_select, [out_df.names]) + def select(df, out_df) do + df + |> lazy() + |> LazyFrame.select(out_df) + |> LazyFrame.collect() + end @impl true def mask(df, %Series{} = mask), @@ -621,20 +634,18 @@ defmodule Explorer.PolarsBackend.DataFrame do @impl true def filter_with(df, out_df, %Explorer.Backend.LazySeries{} = lseries) do - expressions = to_expr(lseries) - Shared.apply_dataframe(df, out_df, :df_filter_with, [expressions, df.groups]) + df + |> lazy() + |> LazyFrame.filter_with(out_df, lseries) + |> LazyFrame.collect() end @impl true def mutate_with(%DataFrame{} = df, %DataFrame{} = out_df, 
column_pairs) do - exprs = - for {name, lazy_series} <- column_pairs do - lazy_series - |> to_expr() - |> alias_expr(name) - end - - Shared.apply_dataframe(df, out_df, :df_mutate_with_exprs, [exprs, df.groups]) + df + |> lazy() + |> LazyFrame.mutate_with(out_df, column_pairs) + |> LazyFrame.collect() end @impl true @@ -693,14 +704,19 @@ defmodule Explorer.PolarsBackend.DataFrame do @impl true def distinct(%DataFrame{} = df, %DataFrame{} = out_df, columns) do - maybe_columns_to_keep = if df.names != out_df.names, do: out_df.names - - Shared.apply_dataframe(df, out_df, :df_distinct, [columns, maybe_columns_to_keep]) + df + |> lazy() + |> LazyFrame.distinct(out_df, columns) + |> LazyFrame.collect() end @impl true - def rename(%DataFrame{} = df, %DataFrame{} = out_df, pairs), - do: Shared.apply_dataframe(df, out_df, :df_rename_columns, [pairs]) + def rename(%DataFrame{} = df, %DataFrame{} = out_df, pairs) do + df + |> lazy() + |> LazyFrame.rename(out_df, pairs) + |> LazyFrame.collect() + end @impl true def dummies(df, out_df, names), @@ -758,16 +774,19 @@ defmodule Explorer.PolarsBackend.DataFrame do do: Shared.apply_dataframe(df, df, :df_slice, [offset, length, df.groups]) @impl true - def drop_nil(df, columns), do: Shared.apply_dataframe(df, df, :df_drop_nils, [columns]) + def drop_nil(df, columns) do + df + |> lazy() + |> LazyFrame.drop_nil(columns) + |> LazyFrame.collect() + end @impl true - def pivot_longer(df, out_df, columns_to_pivot, columns_to_keep, names_to, values_to) do - Shared.apply_dataframe(df, out_df, :df_pivot_longer, [ - columns_to_keep, - columns_to_pivot, - names_to, - values_to - ]) + def pivot_longer(df, out_df, cols_to_pivot, cols_to_keep, names_to, values_to) do + df + |> lazy() + |> LazyFrame.pivot_longer(out_df, cols_to_pivot, cols_to_keep, names_to, values_to) + |> LazyFrame.collect() end @impl true @@ -791,12 +810,18 @@ defmodule Explorer.PolarsBackend.DataFrame do @impl true def explode(df, out_df, columns) do - Shared.apply_dataframe(df, 
out_df, :df_explode, [columns]) + df + |> lazy() + |> LazyFrame.explode(out_df, columns) + |> LazyFrame.collect() end @impl true def unnest(df, out_df, columns) do - Shared.apply_dataframe(df, out_df, :df_unnest, [columns]) + df + |> lazy() + |> LazyFrame.unnest(out_df, columns) + |> LazyFrame.collect() end @impl true @@ -813,33 +838,22 @@ defmodule Explorer.PolarsBackend.DataFrame do # Two or more table verbs - @impl true - def join(left, right, out_df, on, :right) do - # Join right is just the "join left" with inverted DFs and swapped "on" instructions. - # If columns on left have the same names from right, and they are not in "on" instructions, - # then we add a suffix "_left". - {left_on, right_on} = - on - |> Enum.reverse() - |> Enum.map(fn {left, right} -> {right, left} end) - |> Enum.unzip() - - args = [left.data, left_on, right_on, "left", "_left"] - Shared.apply_dataframe(right, out_df, :df_join, args) - end - @impl true def join(left, right, out_df, on, how) do - how = Atom.to_string(how) - {left_on, right_on} = Enum.unzip(on) + left = lazy(left) + right = lazy(right) - args = [right.data, left_on, right_on, how, "_right"] - Shared.apply_dataframe(left, out_df, :df_join, args) + ldf = LazyFrame.join(left, right, out_df, on, how) + LazyFrame.collect(ldf) end @impl true - def concat_rows([head | tail], out_df) do - Shared.apply_dataframe(head, out_df, :df_concat_rows, [Enum.map(tail, & &1.data)]) + def concat_rows([_head | _tail] = dfs, out_df) do + lazy_dfs = Enum.map(dfs, &lazy/1) + + lazy_dfs + |> LazyFrame.concat_rows(out_df) + |> LazyFrame.collect() end @impl true @@ -856,18 +870,13 @@ defmodule Explorer.PolarsBackend.DataFrame do # Groups @impl true - def summarise_with(%DataFrame{groups: groups} = df, %DataFrame{} = out_df, column_pairs) do - exprs = - for {name, lazy_series} <- column_pairs do - original_expr = to_expr(lazy_series) - alias_expr(original_expr, name) - end - - Shared.apply_dataframe(df, out_df, :df_summarise_with_exprs, 
[groups_exprs(groups), exprs]) + def summarise_with(%DataFrame{} = df, %DataFrame{} = out_df, column_pairs) do + df + |> lazy() + |> LazyFrame.summarise_with(out_df, column_pairs) + |> LazyFrame.collect() end - defp groups_exprs(groups), do: Enum.map(groups, &Native.expr_column/1) - # Inspect @impl true diff --git a/lib/explorer/polars_backend/lazy_frame.ex b/lib/explorer/polars_backend/lazy_frame.ex index b2e9a62b6..b0a3a92fd 100644 --- a/lib/explorer/polars_backend/lazy_frame.ex +++ b/lib/explorer/polars_backend/lazy_frame.ex @@ -41,6 +41,16 @@ defmodule Explorer.PolarsBackend.LazyFrame do # Introspection @impl true + def inspect(ldf, opts) when node(ldf.data.resource) != node() do + Explorer.Backend.DataFrame.inspect( + ldf, + "LazyPolars (node: #{node(ldf.data.resource)})", + nil, + opts, + elide_columns: true + ) + end + def inspect(ldf, opts) do case Native.lf_fetch(ldf.data, opts.limit) do {:ok, df} -> @@ -417,6 +427,8 @@ defmodule Explorer.PolarsBackend.LazyFrame do ] ) else + # TODO: log that we cannot set `maintain_order?` nor `nulls_last?` + # This is a limitation of the grouped lazy frame. 
Shared.apply_dataframe( df, out_df, diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex index 32b1df0d3..016402274 100644 --- a/lib/explorer/polars_backend/native.ex +++ b/lib/explorer/polars_backend/native.ex @@ -73,17 +73,13 @@ defmodule Explorer.PolarsBackend.Native do do: err() def df_concat_columns(_df, _others), do: err() - def df_concat_rows(_df, _others), do: err() - def df_distinct(_df, _subset, _selection), do: err() def df_drop(_df, _name), do: err() - def df_drop_nils(_df, _subset), do: err() def df_dtypes(_df), do: err() def df_dump_csv(_df, _has_headers, _delimiter), do: err() def df_dump_ndjson(_df), do: err() def df_dump_parquet(_df, _compression), do: err() def df_dump_ipc(_df, _compression), do: err() def df_dump_ipc_stream(_df, _compression), do: err() - def df_filter_with(_df, _operation, _groups), do: err() def df_from_csv( _filename, @@ -119,8 +115,6 @@ defmodule Explorer.PolarsBackend.Native do def df_from_series(_columns), do: err() def df_group_indices(_df, _column_names), do: err() def df_groups(_df, _column_names), do: err() - def df_head(_df, _length, _groups), do: err() - def df_join(_df, _other, _left_on, _right_on, _how, _suffix), do: err() def df_load_csv( _binary, @@ -147,24 +141,18 @@ defmodule Explorer.PolarsBackend.Native do def df_load_parquet(_binary), do: err() def df_mask(_df, _mask), do: err() - def df_mutate_with_exprs(_df, _exprs, _groups), do: err() def df_n_rows(_df), do: err() def df_names(_df), do: err() - def df_pivot_longer(_df, _id_vars, _value_vars, _names_to, _values_to), do: err() def df_pivot_wider(_df, _id_columns, _pivot_column, _values_column, _names_prefix), do: err() def df_pull(_df, _name), do: err() def df_put_column(_df, _series), do: err() - def df_rename_columns(_df, _old_new_pairs), do: err() def df_sample_frac(_df, _frac, _with_replacement, _shuffle, _seed, _groups), do: err() def df_sample_n(_df, _n, _with_replacement, _shuffle, _seed, _groups), do: err() - def 
df_select(_df, _selection), do: err() def df_select_at_idx(_df, _idx), do: err() def df_shape(_df), do: err() def df_slice(_df, _offset, _length, _groups), do: err() def df_slice_by_indices(_df, _indices, _groups), do: err() def df_slice_by_series(_df, _series, _groups), do: err() - def df_summarise_with_exprs(_df, _groups_exprs, _aggs_pairs), do: err() - def df_tail(_df, _length, _groups), do: err() def df_transpose(_df, _keep_names_as, _new_col_names), do: err() def df_to_csv(_df, _filename, _has_headers, _delimiter), do: err() def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter), do: err() @@ -180,8 +168,6 @@ defmodule Explorer.PolarsBackend.Native do def df_to_parquet_cloud(_df, _ex_entry, _compression), do: err() def df_width(_df), do: err() def df_nil_count(_df), do: err() - def df_explode(_df, _columns), do: err() - def df_unnest(_df, _columns), do: err() # Expressions (for lazy queries) @multi_arity_expressions [slice: 2, slice: 3, log: 1, log: 2] diff --git a/lib/explorer/polars_backend/shared.ex b/lib/explorer/polars_backend/shared.ex index 304c1ae1a..cab2f2615 100644 --- a/lib/explorer/polars_backend/shared.ex +++ b/lib/explorer/polars_backend/shared.ex @@ -47,8 +47,10 @@ defmodule Explorer.PolarsBackend.Shared do # the full picture of the result yet. 
check_df = if match?(%PolarsLazyFrame{}, new_df) do - {:ok, new_df} = Native.lf_collect(new_df) - create_dataframe(new_df) + case Native.lf_collect(new_df) do + {:ok, new_df} -> create_dataframe(new_df) + {:error, error} -> raise runtime_error(error) + end else create_dataframe(new_df) end diff --git a/native/explorer/src/dataframe.rs b/native/explorer/src/dataframe.rs index 7170d61c0..dd264569c 100644 --- a/native/explorer/src/dataframe.rs +++ b/native/explorer/src/dataframe.rs @@ -32,35 +32,6 @@ pub fn df_transpose( Ok(ExDataFrame::new(new_df)) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_join( - df: ExDataFrame, - other: ExDataFrame, - left_on: Vec<&str>, - right_on: Vec<&str>, - how: &str, - suffix: Option<String>, -) -> Result<ExDataFrame, ExplorerError> { - let how = match how { - "left" => JoinType::Left, - "inner" => JoinType::Inner, - "outer" => JoinType::Outer { coalesce: false }, - "cross" => JoinType::Cross, - _ => { - return Err(ExplorerError::Other(format!( - "Join method {how} not supported" - ))) - } - }; - - let mut join_args = JoinArgs::new(how); - join_args.suffix = suffix; - - let new_df = df.join(&other, left_on, right_on, join_args)?; - - Ok(ExDataFrame::new(new_df)) -} - #[rustler::nif] pub fn df_names(df: ExDataFrame) -> Result<Vec<String>, ExplorerError> { let names = to_string_names(df.get_column_names()); @@ -93,26 +64,6 @@ pub fn df_width(df: ExDataFrame) -> Result<usize, ExplorerError> { Ok(df.width()) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_concat_rows( - data: ExDataFrame, - others: Vec<ExDataFrame>, -) -> Result<ExDataFrame, ExplorerError> { - let mut out_df = data.clone(); - let names = out_df.get_column_names(); - let dfs = others - .into_iter() - .map(|ex_df| ex_df.select(&names)) - .collect::<Result<Vec<_>, _>>()?; - - for df in dfs { - out_df.vstack_mut(&df)?; - } - // Follows recommendation from docs and rechunk after many vstacks. 
- out_df.as_single_chunk_par(); - Ok(ExDataFrame::new(out_df)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_concat_columns( data: ExDataFrame, @@ -143,15 +94,6 @@ Ok(ExDataFrame::new(out_df.drop([id_column]).collect()?)) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_drop_nils( - df: ExDataFrame, - subset: Option<Vec<&str>>, -) -> Result<ExDataFrame, ExplorerError> { - let new_df = df.drop_nulls(subset.as_deref())?; - Ok(ExDataFrame::new(new_df)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_drop(df: ExDataFrame, name: &str) -> Result<ExDataFrame, ExplorerError> { let new_df = df.drop(name)?; @@ -170,12 +112,6 @@ pub fn df_pull(df: ExDataFrame, name: &str) -> Result<ExSeries, ExplorerError> { Ok(series) } -#[rustler::nif] -pub fn df_select(df: ExDataFrame, selection: Vec<&str>) -> Result<ExDataFrame, ExplorerError> { - let new_df = df.select(selection)?; - Ok(ExDataFrame::new(new_df)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_mask(df: ExDataFrame, mask: ExSeries) -> Result<ExDataFrame, ExplorerError> { if let Ok(ca) = mask.bool() { @@ -186,24 +122,6 @@ pub fn df_mask(df: ExDataFrame, mask: ExSeries) -> Result<ExDataFrame, ExplorerError> -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_filter_with( - df: ExDataFrame, - ex_expr: ExExpr, - groups: Vec<&str>, -) -> Result<ExDataFrame, ExplorerError> { - let exp = ex_expr.clone_inner(); - - let new_df = if groups.is_empty() { - df.clone_inner().lazy().filter(exp).collect()? - } else { - df.group_by_stable(groups)? - .apply(|df| df.lazy().filter(exp.clone()).collect())? - }; - - Ok(ExDataFrame::new(new_df)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_slice_by_indices( df: ExDataFrame, @@ -413,69 +331,6 @@ pub fn df_slice( Ok(ExDataFrame::new(new_df)) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_head( - df: ExDataFrame, - length: Option<usize>, - groups: Vec<&str>, -) -> Result<ExDataFrame, ExplorerError> { - let new_df = if groups.is_empty() { - df.head(length) - } else { - df.group_by_stable(groups)? - .apply(|df| Ok(df.head(length)))? 
- }; - Ok(ExDataFrame::new(new_df)) -} - -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_tail( - df: ExDataFrame, - length: Option<usize>, - groups: Vec<&str>, -) -> Result<ExDataFrame, ExplorerError> { - let new_df = if groups.is_empty() { - df.tail(length) - } else { - df.group_by_stable(groups)? - .apply(|df| Ok(df.tail(length)))? - }; - Ok(ExDataFrame::new(new_df)) -} - -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_pivot_longer( - df: ExDataFrame, - id_vars: Vec<&str>, - value_vars: Vec<&str>, - names_to: String, - values_to: String, -) -> Result<ExDataFrame, ExplorerError> { - let melt_opts = MeltArgs { - id_vars: to_smart_strings(id_vars), - value_vars: to_smart_strings(value_vars), - variable_name: Some(names_to.into()), - value_name: Some(values_to.into()), - streamable: true, - }; - let new_df = df.melt2(melt_opts)?; - Ok(ExDataFrame::new(new_df)) -} - -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_distinct( - df: ExDataFrame, - subset: Vec<String>, - columns_to_keep: Option<Vec<String>>, -) -> Result<ExDataFrame, ExplorerError> { - let new_df = df.unique_stable(Some(&subset), UniqueKeepStrategy::First, None)?; - - match columns_to_keep { - Some(columns) => Ok(ExDataFrame::new(new_df.select(columns)?)), - None => Ok(ExDataFrame::new(new_df)), - } -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_to_dummies(df: ExDataFrame, selection: Vec<&str>) -> Result<ExDataFrame, ExplorerError> { let drop_first = false; @@ -501,27 +356,6 @@ pub fn df_nil_count(df: ExDataFrame) -> Result<ExDataFrame, ExplorerError> { Ok(ExDataFrame::new(new_df)) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_mutate_with_exprs( - df: ExDataFrame, - columns: Vec<ExExpr>, - groups: Vec<&str>, -) -> Result<ExDataFrame, ExplorerError> { - let mutations = ex_expr_to_exprs(columns); - let new_df = if groups.is_empty() { - df.clone_inner() - .lazy() - .with_comm_subexpr_elim(false) - .with_columns(mutations) - .collect()? - } else { - df.group_by_stable(groups)? - .apply(|df| df.lazy().with_columns(&mutations).collect())? 
- }; - - Ok(ExDataFrame::new(new_df)) -} - #[rustler::nif] pub fn df_from_series(columns: Vec<ExSeries>) -> Result<ExDataFrame, ExplorerError> { let columns = columns.into_iter().map(|c| c.clone_inner()).collect(); @@ -531,19 +365,6 @@ -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_rename_columns( - df: ExDataFrame, - renames: Vec<(&str, &str)>, -) -> Result<ExDataFrame, ExplorerError> { - let mut df = df.clone(); - for (original, new_name) in renames { - df.rename(original, new_name)?; - } - - Ok(ExDataFrame::new(df)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_groups(df: ExDataFrame, groups: Vec<&str>) -> Result<ExDataFrame, ExplorerError> { let groups = df.group_by(groups)?.groups()?; @@ -567,33 +388,6 @@ pub fn df_group_indices( Ok(series) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_summarise_with_exprs( - df: ExDataFrame, - groups: Vec<ExExpr>, - aggs: Vec<ExExpr>, -) -> Result<ExDataFrame, ExplorerError> { - let groups = ex_expr_to_exprs(groups); - let aggs = ex_expr_to_exprs(aggs); - - let lf = df.clone_inner().lazy(); - - let new_lf = if groups.is_empty() { - // We do add a "shadow" column to be able to group by it. - // This is going to force some aggregations like "mode" to be always inside - // a "list". 
- let s = Series::new_null("__explorer_null_for_group__", 1); - lf.with_column(s.lit()) - .group_by_stable(["__explorer_null_for_group__"]) - .agg(aggs) - .select(&[col("*").exclude(["__explorer_null_for_group__"])]) - } else { - lf.group_by_stable(groups).agg(aggs) - }; - - Ok(ExDataFrame::new(new_lf.collect()?)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_pivot_wider( df: ExDataFrame, @@ -675,18 +469,6 @@ Ok(ExDataFrame::new(new_df)) } -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_explode(df: ExDataFrame, columns: Vec<&str>) -> Result<ExDataFrame, ExplorerError> { - let new_df = df.explode(columns)?; - Ok(ExDataFrame::new(new_df)) -} - -#[rustler::nif(schedule = "DirtyCpu")] -pub fn df_unnest(df: ExDataFrame, columns: Vec<&str>) -> Result<ExDataFrame, ExplorerError> { - let new_df = df.clone_inner().unnest(columns)?; - Ok(ExDataFrame::new(new_df)) -} - #[rustler::nif(schedule = "DirtyCpu")] pub fn df_lazy(df: ExDataFrame) -> Result<ExLazyFrame, ExplorerError> { let new_lf = df.clone_inner().lazy(); diff --git a/native/explorer/src/lazyframe.rs b/native/explorer/src/lazyframe.rs index f3f5d545f..7ed529ab9 100644 --- a/native/explorer/src/lazyframe.rs +++ b/native/explorer/src/lazyframe.rs @@ -190,10 +190,15 @@ pub fn lf_mutate_with( data: ExLazyFrame, columns: Vec<ExExpr>, ) -> Result<ExLazyFrame, ExplorerError> { - let ldf = data.clone_inner(); let mutations = ex_expr_to_exprs(columns); + // Maybe there is a bug when some expressions are in use without this "comm_subexpr_elim" + // turned off. 
+ let ldf = data + .clone_inner() + .with_comm_subexpr_elim(false) + .with_columns(mutations); - Ok(ExLazyFrame::new(ldf.with_columns(mutations))) + Ok(ExLazyFrame::new(ldf)) } #[rustler::nif] diff --git a/native/explorer/src/lib.rs b/native/explorer/src/lib.rs index 2d2c30fb3..b0bee4796 100644 --- a/native/explorer/src/lib.rs +++ b/native/explorer/src/lib.rs @@ -78,19 +78,14 @@ rustler::init!( df_sort_by, df_sort_with, df_concat_columns, - df_concat_rows, df_nil_count, - df_distinct, df_drop, - df_drop_nils, df_dtypes, df_dump_csv, df_dump_ndjson, df_dump_parquet, df_dump_ipc, df_dump_ipc_stream, - df_explode, - df_filter_with, df_from_csv, df_from_ipc, df_from_ipc_stream, @@ -99,32 +94,24 @@ rustler::init!( df_from_series, df_group_indices, df_groups, - df_head, - df_join, df_load_csv, df_load_ndjson, df_load_parquet, df_load_ipc, df_load_ipc_stream, df_mask, - df_mutate_with_exprs, df_n_rows, df_names, - df_pivot_longer, df_pivot_wider, df_pull, df_put_column, - df_rename_columns, df_sample_frac, df_sample_n, - df_select, df_select_at_idx, df_shape, df_slice, df_slice_by_indices, df_slice_by_series, - df_summarise_with_exprs, - df_tail, df_transpose, df_to_csv, df_to_csv_cloud, @@ -138,7 +125,6 @@ rustler::init!( df_to_ndjson_cloud, df_to_parquet, df_to_parquet_cloud, - df_unnest, df_width, // expressions expr_nil, diff --git a/test/explorer/data_frame_test.exs b/test/explorer/data_frame_test.exs index 09475ebf9..3851e1639 100644 --- a/test/explorer/data_frame_test.exs +++ b/test/explorer/data_frame_test.exs @@ -2943,7 +2943,7 @@ defmodule Explorer.DataFrameTest do test "with string column names and a target that is duplicated" do df = DF.new(a: [1, 2, 3], b: ["a", "b", "c"]) - assert_raise RuntimeError, ~r"duplicate column names found", fn -> + assert_raise RuntimeError, ~r"column with name 'first' has more than one occurrences", fn -> DF.rename(df, [{"a", "first"}, {"b", "first"}]) end end