diff --git a/lib/explorer/backend/data_frame.ex b/lib/explorer/backend/data_frame.ex
index 6dffaeebd..2df85c510 100644
--- a/lib/explorer/backend/data_frame.ex
+++ b/lib/explorer/backend/data_frame.ex
@@ -59,7 +59,13 @@ defmodule Explorer.Backend.DataFrame do
               parse_dates :: boolean(),
               eol_delimiter :: option(String.t())
             ) :: io_result(df)
-  @callback to_csv(df, entry :: fs_entry(), header? :: boolean(), delimiter :: String.t()) ::
+  @callback to_csv(
+              df,
+              entry :: fs_entry(),
+              header? :: boolean(),
+              delimiter :: String.t(),
+              streaming :: boolean()
+            ) ::
               ok_result()
   @callback dump_csv(df, header? :: boolean(), delimiter :: String.t()) :: io_result(binary())
 
diff --git a/lib/explorer/data_frame.ex b/lib/explorer/data_frame.ex
index ce33d4fd0..4fa5a25ca 100644
--- a/lib/explorer/data_frame.ex
+++ b/lib/explorer/data_frame.ex
@@ -1327,15 +1327,20 @@ defmodule Explorer.DataFrame do
     * `:config` - An optional struct, keyword list or map, normally associated with remote
       file systems. See [IO section](#module-io-operations) for more details. (default: `nil`)
 
+    * `:streaming` - Tells the backend whether it should use streaming, which
+      means the dataframe is not loaded into memory at once, and is instead
+      written in chunks from a lazy dataframe. Defaults to `true` for local
+      filesystems and is ignored for all others.
+
   """
   @doc type: :io
   @spec to_csv(df :: DataFrame.t(), filename :: fs_entry() | String.t(), opts :: Keyword.t()) ::
           :ok | {:error, Exception.t()}
   def to_csv(df, filename, opts \\ []) do
-    opts = Keyword.validate!(opts, header: true, delimiter: ",", config: nil)
+    opts = Keyword.validate!(opts, header: true, delimiter: ",", streaming: true, config: nil)
 
     with {:ok, entry} <- normalise_entry(filename, opts[:config]) do
-      Shared.apply_impl(df, :to_csv, [entry, opts[:header], opts[:delimiter]])
+      Shared.apply_impl(df, :to_csv, [entry, opts[:header], opts[:delimiter], opts[:streaming]])
     end
   end
 
@@ -4261,8 +4266,7 @@ defmodule Explorer.DataFrame do
   @doc type: :single
   @spec transpose(df :: DataFrame.t(), opts :: Keyword.t()) :: DataFrame.t()
   def transpose(df, opts \\ []) do
-    opts =
-      Keyword.validate!(opts, header: false, columns: nil)
+    opts = Keyword.validate!(opts, header: false, columns: nil)
 
     header =
       case opts[:header] do
diff --git a/lib/explorer/polars_backend/data_frame.ex b/lib/explorer/polars_backend/data_frame.ex
index f3b4eb5f6..cd5a33093 100644
--- a/lib/explorer/polars_backend/data_frame.ex
+++ b/lib/explorer/polars_backend/data_frame.ex
@@ -144,7 +144,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
   end
 
   @impl true
-  def to_csv(%DataFrame{data: df}, %Local.Entry{} = entry, header?, delimiter) do
+  def to_csv(%DataFrame{data: df}, %Local.Entry{} = entry, header?, delimiter, _streaming) do
     <<delimiter::utf8>> = delimiter
 
     case Native.df_to_csv(df, entry.path, header?, delimiter) do
@@ -154,7 +154,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
   end
 
   @impl true
-  def to_csv(%DataFrame{data: df}, %S3.Entry{} = entry, header?, delimiter) do
+  def to_csv(%DataFrame{data: df}, %S3.Entry{} = entry, header?, delimiter, _streaming) do
     <<delimiter::utf8>> = delimiter
 
     case Native.df_to_csv_cloud(df, entry, header?, delimiter) do
diff --git a/lib/explorer/polars_backend/lazy_frame.ex b/lib/explorer/polars_backend/lazy_frame.ex
index 2ca088a03..7c979a780 100644
--- a/lib/explorer/polars_backend/lazy_frame.ex
+++ b/lib/explorer/polars_backend/lazy_frame.ex
@@ -353,6 +353,25 @@ defmodule Explorer.PolarsBackend.LazyFrame do
     end
   end
 
+  @impl true
+  def to_csv(%DF{} = ldf, %Local.Entry{} = entry, header?, delimiter, streaming) do
+    <<delimiter::utf8>> = delimiter
+
+    polars_df = apply_operations(ldf)
+
+    case Native.lf_to_csv(polars_df, entry.path, header?, delimiter, streaming) do
+      {:ok, _} -> :ok
+      {:error, error} -> {:error, RuntimeError.exception(error)}
+    end
+  end
+
+  @impl true
+  def to_csv(%DF{} = ldf, %S3.Entry{} = entry, header?, delimiter, _streaming) do
+    eager_df = collect(ldf)
+
+    Eager.to_csv(eager_df, entry, header?, delimiter, false)
+  end
+
   @impl true
   def to_parquet(%DF{} = ldf, %Local.Entry{} = entry, {compression, level}, streaming) do
     polars_df = apply_operations(ldf)
@@ -573,8 +592,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
 
   @impl true
   def concat_rows([%DF{} | _tail] = dfs, %DF{} = out_df) do
-    polars_dfs =
-      Enum.map(dfs, &apply_operations/1)
+    polars_dfs = Enum.map(dfs, &apply_operations/1)
 
     # Since we apply operations in all DFs, and out_df is pointing to the `head`,
     # we need to reset the operations for `out_df` (they were applied already).
@@ -608,7 +626,6 @@ defmodule Explorer.PolarsBackend.LazyFrame do
     put: 4,
     sample: 5,
     slice: 2,
-    to_csv: 4,
     to_ipc_stream: 3,
     to_ndjson: 2,
     to_rows: 2,
diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex
index 06e293406..709d1887f 100644
--- a/lib/explorer/polars_backend/native.ex
+++ b/lib/explorer/polars_backend/native.ex
@@ -273,6 +273,7 @@ defmodule Explorer.PolarsBackend.Native do
   def lf_to_parquet(_df, _filename, _compression, _streaming), do: err()
   def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
   def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
+  def lf_to_csv(_df, _filename, _header, _delimiter, _streaming), do: err()
 
   # Series
   def s_as_str(_s), do: err()
diff --git a/native/explorer/src/lazyframe/io.rs b/native/explorer/src/lazyframe/io.rs
index 735613eaf..8fef0e769 100644
--- a/native/explorer/src/lazyframe/io.rs
+++ b/native/explorer/src/lazyframe/io.rs
@@ -225,6 +225,43 @@ pub fn lf_from_csv(
     Ok(ExLazyFrame::new(df))
 }
 
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_csv(
+    data: ExLazyFrame,
+    filename: &str,
+    include_headers: bool,
+    delimiter: u8,
+    streaming: bool,
+) -> Result<(), ExplorerError> {
+    let lf = data.clone_inner();
+    if streaming {
+        let serialize_options = SerializeOptions {
+            separator: delimiter,
+            ..Default::default()
+        };
+
+        let options = CsvWriterOptions {
+            include_header: include_headers,
+            maintain_order: true,
+            serialize_options,
+            ..Default::default()
+        };
+
+        lf.sink_csv(filename.into(), options)?;
+        Ok(())
+    } else {
+        let df = lf.collect()?;
+        let file = File::create(filename)?;
+        let mut buf_writer = BufWriter::new(file);
+
+        CsvWriter::new(&mut buf_writer)
+            .include_header(include_headers)
+            .with_separator(delimiter)
+            .finish(&mut df.clone())?;
+        Ok(())
+    }
+}
+
 #[cfg(feature = "ndjson")]
 #[rustler::nif]
 pub fn lf_from_ndjson(
diff --git a/native/explorer/src/lib.rs b/native/explorer/src/lib.rs
index 0f77c2430..2f8963789 100644
--- a/native/explorer/src/lib.rs
+++ b/native/explorer/src/lib.rs
@@ -319,6 +319,7 @@ rustler::init!(
         lf_to_parquet,
         lf_to_parquet_cloud,
         lf_to_ipc,
+        lf_to_csv,
         // series
         s_as_str,
         s_abs,
diff --git a/test/explorer/data_frame/lazy_test.exs b/test/explorer/data_frame/lazy_test.exs
index aa4071c28..c73bdda51 100644
--- a/test/explorer/data_frame/lazy_test.exs
+++ b/test/explorer/data_frame/lazy_test.exs
@@ -249,6 +249,52 @@ defmodule Explorer.DataFrame.LazyTest do
     end
   end
 
+  @tag :tmp_dir
+  test "to_csv/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
+    path = Path.join([tmp_dir, "fossil_fuels.csv"])
+
+    ldf = DF.head(ldf, 15)
+    assert :ok = DF.to_csv(ldf, path)
+
+    df = DF.collect(ldf)
+    df1 = DF.from_csv!(path)
+
+    assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
+  end
+
+  @tag :tmp_dir
+  test "to_csv/2 - with streaming disabled", %{ldf: ldf, tmp_dir: tmp_dir} do
+    path = Path.join([tmp_dir, "fossil_fuels.csv"])
+
+    ldf = DF.head(ldf, 15)
+    assert :ok = DF.to_csv(ldf, path, streaming: false)
+
+    df = DF.collect(ldf)
+    df1 = DF.from_csv!(path)
+
+    assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
+  end
+
+  @tag :cloud_integration
+  test "to_csv/3 - cloud with streaming enabled - ignores streaming option", %{ldf: ldf} do
+    config = %FSS.S3.Config{
+      access_key_id: "test",
+      secret_access_key: "test",
+      endpoint: "http://localhost:4566",
+      region: "us-east-1"
+    }
+
+    path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.csv"
+
+    ldf = DF.head(ldf, 15)
+    assert :ok = DF.to_csv(ldf, path, streaming: true, config: config)
+
+    df = DF.collect(ldf)
+    df1 = DF.from_csv!(path, config: config)
+
+    assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
+  end
+
   @tag :tmp_dir
   test "to_ipc/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
     path = Path.join([tmp_dir, "fossil_fuels.ipc"])