Implements :streaming option for DataFrame.to_csv/3 (#889)
* Implements streaming option for DataFrame.to_csv/3

* Adds more lazy tests for streaming with S3 enabled

* Rust linting errors fixed

* Ignores streaming option on unsupported backends

* Update lib/explorer/data_frame.ex
ryancurtin authored Mar 27, 2024
1 parent 8c5e04b commit 5b6eaf3
Showing 8 changed files with 122 additions and 10 deletions.
8 changes: 7 additions & 1 deletion lib/explorer/backend/data_frame.ex
@@ -59,7 +59,13 @@ defmodule Explorer.Backend.DataFrame do
parse_dates :: boolean(),
eol_delimiter :: option(String.t())
) :: io_result(df)
@callback to_csv(df, entry :: fs_entry(), header? :: boolean(), delimiter :: String.t()) ::
@callback to_csv(
df,
entry :: fs_entry(),
header? :: boolean(),
delimiter :: String.t(),
streaming :: boolean()
) ::
ok_result()
@callback dump_csv(df, header? :: boolean(), delimiter :: String.t()) :: io_result(binary())

12 changes: 8 additions & 4 deletions lib/explorer/data_frame.ex
@@ -1327,15 +1327,20 @@ defmodule Explorer.DataFrame do
* `:config` - An optional struct, keyword list or map, normally associated with remote
file systems. See [IO section](#module-io-operations) for more details. (default: `nil`)
* `:streaming` - Tells the backend whether it should use streaming, which means
  the dataframe is not loaded into memory all at once; instead it is written
  in chunks from a lazy dataframe. Defaults to `true` for local filesystems
  and is ignored on all others.
"""
@doc type: :io
@spec to_csv(df :: DataFrame.t(), filename :: fs_entry() | String.t(), opts :: Keyword.t()) ::
:ok | {:error, Exception.t()}
def to_csv(df, filename, opts \\ []) do
opts = Keyword.validate!(opts, header: true, delimiter: ",", config: nil)
opts = Keyword.validate!(opts, header: true, delimiter: ",", streaming: true, config: nil)

with {:ok, entry} <- normalise_entry(filename, opts[:config]) do
Shared.apply_impl(df, :to_csv, [entry, opts[:header], opts[:delimiter]])
Shared.apply_impl(df, :to_csv, [entry, opts[:header], opts[:delimiter], opts[:streaming]])
end
end
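
For reference, a minimal usage sketch of the new option; the path and data below are illustrative and not part of the commit:

alias Explorer.DataFrame, as: DF

ldf =
  DF.new(total: [1, 2, 3], country: ["BR", "US", "PL"])
  |> DF.lazy()

# Local writes stream by default, so the lazy frame is written in chunks.
:ok = DF.to_csv(ldf, "/tmp/example.csv")

# Opting out collects the frame into memory before writing.
:ok = DF.to_csv(ldf, "/tmp/example.csv", streaming: false)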

@@ -4261,8 +4266,7 @@ defmodule Explorer.DataFrame do
@doc type: :single
@spec transpose(df :: DataFrame.t(), opts :: Keyword.t()) :: DataFrame.t()
def transpose(df, opts \\ []) do
opts =
Keyword.validate!(opts, header: false, columns: nil)
opts = Keyword.validate!(opts, header: false, columns: nil)

header =
case opts[:header] do
4 changes: 2 additions & 2 deletions lib/explorer/polars_backend/data_frame.ex
@@ -144,7 +144,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
end

@impl true
def to_csv(%DataFrame{data: df}, %Local.Entry{} = entry, header?, delimiter) do
def to_csv(%DataFrame{data: df}, %Local.Entry{} = entry, header?, delimiter, _streaming) do
<<delimiter::utf8>> = delimiter

case Native.df_to_csv(df, entry.path, header?, delimiter) do
@@ -154,7 +154,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
end

@impl true
def to_csv(%DataFrame{data: df}, %S3.Entry{} = entry, header?, delimiter) do
def to_csv(%DataFrame{data: df}, %S3.Entry{} = entry, header?, delimiter, _streaming) do
<<delimiter::utf8>> = delimiter

case Native.df_to_csv_cloud(df, entry, header?, delimiter) do
23 changes: 20 additions & 3 deletions lib/explorer/polars_backend/lazy_frame.ex
@@ -353,6 +353,25 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end
end

@impl true
def to_csv(%DF{} = ldf, %Local.Entry{} = entry, header?, delimiter, streaming) do
<<delimiter::utf8>> = delimiter

polars_df = apply_operations(ldf)

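# When streaming is enabled, the native lf_to_csv sinks the query straight
# to disk; otherwise it collects the frame and writes it in one pass.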
case Native.lf_to_csv(polars_df, entry.path, header?, delimiter, streaming) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
def to_csv(%DF{} = ldf, %S3.Entry{} = entry, header?, delimiter, _streaming) do
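# Streaming CSV writes to S3 are not supported, so the :streaming option
# is ignored: collect the lazy frame and delegate to the eager writer.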
eager_df = collect(ldf)

Eager.to_csv(eager_df, entry, header?, delimiter, false)
end

@impl true
def to_parquet(%DF{} = ldf, %Local.Entry{} = entry, {compression, level}, streaming) do
polars_df = apply_operations(ldf)
@@ -573,8 +592,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do

@impl true
def concat_rows([%DF{} | _tail] = dfs, %DF{} = out_df) do
polars_dfs =
Enum.map(dfs, &apply_operations/1)
polars_dfs = Enum.map(dfs, &apply_operations/1)

# Since we apply operations in all DFs, and out_df is pointing to the `head`,
# we need to reset the operations for `out_df` (they were applied already).
@@ -608,7 +626,6 @@ defmodule Explorer.PolarsBackend.LazyFrame do
put: 4,
sample: 5,
slice: 2,
to_csv: 4,
to_ipc_stream: 3,
to_ndjson: 2,
to_rows: 2,
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
@@ -273,6 +273,7 @@ defmodule Explorer.PolarsBackend.Native do
def lf_to_parquet(_df, _filename, _compression, _streaming), do: err()
def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
def lf_to_csv(_df, _filename, _header, _delimiter, _streaming), do: err()

# Series
def s_as_str(_s), do: err()
37 changes: 37 additions & 0 deletions native/explorer/src/lazyframe/io.rs
@@ -225,6 +225,43 @@ pub fn lf_from_csv(
Ok(ExLazyFrame::new(df))
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn lf_to_csv(
data: ExLazyFrame,
filename: &str,
include_headers: bool,
delimiter: u8,
streaming: bool,
) -> Result<(), ExplorerError> {
let lf = data.clone_inner();
if streaming {
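// Let Polars' streaming engine sink the query results straight to the
// file without materializing the whole frame in memory.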
let serialize_options = SerializeOptions {
separator: delimiter,
..Default::default()
};

let options = CsvWriterOptions {
include_header: include_headers,
maintain_order: true,
serialize_options,
..Default::default()
};

lf.sink_csv(filename.into(), options)?;
Ok(())
} else {
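// Without streaming, collect the whole frame into memory and write it
// with the eager CsvWriter.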
let df = lf.collect()?;
let file = File::create(filename)?;
let mut buf_writer = BufWriter::new(file);

CsvWriter::new(&mut buf_writer)
.include_header(include_headers)
.with_separator(delimiter)
.finish(&mut df.clone())?;
Ok(())
}
}

#[cfg(feature = "ndjson")]
#[rustler::nif]
pub fn lf_from_ndjson(
1 change: 1 addition & 0 deletions native/explorer/src/lib.rs
@@ -319,6 +319,7 @@ rustler::init!(
lf_to_parquet,
lf_to_parquet_cloud,
lf_to_ipc,
lf_to_csv,
// series
s_as_str,
s_abs,
46 changes: 46 additions & 0 deletions test/explorer/data_frame/lazy_test.exs
@@ -249,6 +249,52 @@ defmodule Explorer.DataFrame.LazyTest do
end
end

@tag :tmp_dir
test "to_csv/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
path = Path.join([tmp_dir, "fossil_fuels.csv"])

ldf = DF.head(ldf, 15)
assert :ok = DF.to_csv(ldf, path)

df = DF.collect(ldf)
df1 = DF.from_csv!(path)

assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
end

@tag :tmp_dir
test "to_csv/2 - with streaming disabled", %{ldf: ldf, tmp_dir: tmp_dir} do
path = Path.join([tmp_dir, "fossil_fuels.csv"])

ldf = DF.head(ldf, 15)
assert :ok = DF.to_csv(ldf, path, streaming: false)

df = DF.collect(ldf)
df1 = DF.from_csv!(path)

assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
end

@tag :cloud_integration
test "to_csv/3 - cloud with streaming enabled - ignores streaming option", %{ldf: ldf} do
config = %FSS.S3.Config{
access_key_id: "test",
secret_access_key: "test",
endpoint: "http://localhost:4566",
region: "us-east-1"
}

path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.csv"

ldf = DF.head(ldf, 15)
assert :ok = DF.to_csv(ldf, path, streaming: true, config: config)

df = DF.collect(ldf)
df1 = DF.from_csv!(path, config: config)

assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
end

@tag :tmp_dir
test "to_ipc/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
path = Path.join([tmp_dir, "fossil_fuels.ipc"])