diff --git a/README.md b/README.md
index daf801d2c..871fccc75 100644
--- a/README.md
+++ b/README.md
@@ -240,6 +240,35 @@ mix localstack.setup
 mix test --only cloud_integration
 ```
 
+## Precompilation
+
+Explorer ships with the NIF code precompiled for the most popular architectures out there.
+We support the following:
+
+- `aarch64-apple-darwin` - macOS running on ARM 64-bit CPUs.
+- `aarch64-unknown-linux-gnu` - Linux running on ARM 64-bit CPUs, compiled with GCC.
+- `aarch64-unknown-linux-musl` - Linux running on ARM 64-bit CPUs, compiled with Musl.
+- `riscv64gc-unknown-linux-gnu` - Linux running on RISC-V 64-bit CPUs, compiled with GCC.
+- `x86_64-apple-darwin` - macOS running on Intel/AMD 64-bit CPUs.
+- `x86_64-pc-windows-msvc` - Windows running on Intel/AMD 64-bit CPUs, compiled with Visual C++.
+- `x86_64-pc-windows-gnu` - Windows running on Intel/AMD 64-bit CPUs, compiled with GCC.
+- `x86_64-unknown-linux-gnu` - Linux running on Intel/AMD 64-bit CPUs, compiled with GCC.
+- `x86_64-unknown-linux-musl` - Linux running on Intel/AMD 64-bit CPUs, compiled with Musl.
+- `x86_64-unknown-freebsd` - FreeBSD running on Intel/AMD 64-bit CPUs.
+
+This means that the package is going to work without the need to compile it from source.
+This currently **only works for Hex releases**. For more information on how it works, please
+check the [RustlerPrecompiled project](https://hexdocs.pm/rustler_precompiled).
+
+### Features disabled
+
+Some of the features cannot be compiled for some targets, because one of the dependencies
+doesn't work on them.
+
+This is the case for the **NDJSON** reads and writes, which don't work on the RISC-V target.
+We also disable the AWS S3 reads and writes for the RISC-V target, because one of the dependencies
+of `ObjectStore` does not compile on it.
+
 ## Sponsors
 
 Amplified
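If a target is missing from the list above, the NIF has to be built from source. A minimal sketch of opting out of the precompiled binaries (not part of this patch), assuming the standard RustlerPrecompiled `:force_build` switch and `:explorer` as the OTP application name:

```elixir
# config/config.exs -- hypothetical opt-out of the precompiled NIF.
# Building from source requires a Rust toolchain, since the crate in
# native/explorer is then compiled locally.
import Config

config :rustler_precompiled, :force_build, explorer: true
```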
diff --git a/lib/explorer/backend/data_frame.ex b/lib/explorer/backend/data_frame.ex
index df1dcfa20..b7e627fd8 100644
--- a/lib/explorer/backend/data_frame.ex
+++ b/lib/explorer/backend/data_frame.ex
@@ -43,7 +43,7 @@ defmodule Explorer.Backend.DataFrame do
              entry :: fs_entry(),
              dtypes,
              delimiter :: String.t(),
-             null_character :: String.t(),
+             nil_values :: list(String.t()),
              skip_rows :: integer(),
              header? :: boolean(),
              encoding :: String.t(),
@@ -61,7 +61,7 @@ defmodule Explorer.Backend.DataFrame do
              contents :: String.t(),
              dtypes,
              delimiter :: String.t(),
-             null_character :: String.t(),
+             nil_values :: list(String.t()),
              skip_rows :: integer(),
              header? :: boolean(),
              encoding :: String.t(),
diff --git a/lib/explorer/backend/lazy_series.ex b/lib/explorer/backend/lazy_series.ex
index c1618dbfe..772a11090 100644
--- a/lib/explorer/backend/lazy_series.ex
+++ b/lib/explorer/backend/lazy_series.ex
@@ -68,6 +68,7 @@ defmodule Explorer.Backend.LazySeries do
     cumulative_product: 2,
     window_max: 5,
     window_mean: 5,
+    window_median: 5,
     window_min: 5,
     window_sum: 5,
     window_standard_deviation: 5,
@@ -116,6 +117,7 @@ defmodule Explorer.Backend.LazySeries do
     trim: 2,
     upcase: 1,
     downcase: 1,
+    substring: 3,
     # Float round
     round: 2,
     floor: 1,
@@ -155,6 +157,7 @@ defmodule Explorer.Backend.LazySeries do
  @window_fun_operations [
    :window_max,
    :window_mean,
+   :window_median,
    :window_min,
    :window_sum
  ]
@@ -890,6 +893,13 @@ defmodule Explorer.Backend.LazySeries do
     Backend.Series.new(data, :string)
   end
 
+  @impl true
+  def substring(series, offset, length) do
+    data = new(:substring, [lazy_series!(series), offset, length])
+
+    Backend.Series.new(data, :string)
+  end
+
   @impl true
   def round(series, decimals) when is_integer(decimals) and decimals >= 0 do
     data = new(:round, [lazy_series!(series), decimals])
diff --git a/lib/explorer/backend/series.ex b/lib/explorer/backend/series.ex
index eb1434e1d..fb8a1cd67 100644
--- a/lib/explorer/backend/series.ex
+++ b/lib/explorer/backend/series.ex
@@ -199,6 +199,13 @@ defmodule Explorer.Backend.Series do
               min_periods :: integer() | nil,
               center :: boolean()
             ) :: s
+  @callback window_median(
+              s,
+              window_size :: integer(),
+              weights :: [float()] | nil,
+              min_periods :: integer() | nil,
+              center :: boolean()
+            ) :: s
   @callback window_standard_deviation(
               s,
               window_size :: integer(),
@@ -240,6 +247,7 @@ defmodule Explorer.Backend.Series do
   @callback trim(s, String.t() | nil) :: s
   @callback trim_leading(s, String.t() | nil) :: s
   @callback trim_trailing(s, String.t() | nil) :: s
+  @callback substring(s, integer(), non_neg_integer() | nil) :: s
 
   # Date / DateTime
 
diff --git a/lib/explorer/data_frame.ex b/lib/explorer/data_frame.ex
index 6880f3f97..fd695b2f7 100644
--- a/lib/explorer/data_frame.ex
+++ b/lib/explorer/data_frame.ex
@@ -516,7 +516,7 @@ defmodule Explorer.DataFrame do
 
     * `:max_rows` - Maximum number of lines to read. (default: `nil`)
 
-    * `:null_character` - The string that should be interpreted as a nil value. (default: `"NA"`)
+    * `:nil_values` - A list of strings that should be interpreted as nil values. (default: `[]`)
 
     * `:skip_rows` - The number of lines to skip at the beginning of the file. (default: `0`)
 
@@ -553,7 +553,7 @@ defmodule Explorer.DataFrame do
        encoding: "utf8",
        header: true,
        max_rows: nil,
-       null_character: "NA",
+       nil_values: [],
        skip_rows: 0,
        columns: nil,
        infer_schema_length: @default_infer_schema_length,
@@ -568,7 +568,7 @@ defmodule Explorer.DataFrame do
        entry,
        check_dtypes!(opts[:dtypes]),
        opts[:delimiter],
-       opts[:null_character],
+       opts[:nil_values],
        opts[:skip_rows],
        opts[:header],
        opts[:encoding],
@@ -611,7 +611,7 @@ defmodule Explorer.DataFrame do
     imputed from the first 1000 rows. (default: `[]`)
   * `:header` - Does the file have a header of column names as the first row or not? (default: `true`)
   * `:max_rows` - Maximum number of lines to read. (default: `nil`)
-  * `:null_character` - The string that should be interpreted as a nil value. (default: `"NA"`)
+  * `:nil_values` - A list of strings that should be interpreted as nil values. (default: `[]`)
   * `:skip_rows` - The number of lines to skip at the beginning of the file. (default: `0`)
   * `:columns` - A list of column names or indexes to keep.
If present, only these columns are read into the dataframe. (default: `nil`) * `:infer_schema_length` Maximum number of rows read for schema inference. Setting this to nil will do a full table scan and will be slow (default: `1000`). @@ -633,7 +633,7 @@ defmodule Explorer.DataFrame do encoding: "utf8", header: true, max_rows: nil, - null_character: "NA", + nil_values: [], skip_rows: 0, columns: nil, infer_schema_length: @default_infer_schema_length, @@ -647,7 +647,7 @@ defmodule Explorer.DataFrame do contents, check_dtypes!(opts[:dtypes]), opts[:delimiter], - opts[:null_character], + opts[:nil_values], opts[:skip_rows], opts[:header], opts[:encoding], @@ -785,7 +785,7 @@ defmodule Explorer.DataFrame do @doc type: :io @spec to_parquet(df :: DataFrame.t(), filename :: String.t() | fs_entry(), opts :: Keyword.t()) :: :ok | {:error, term()} - def to_parquet(df, filename, opts \\ []) do + def to_parquet(%DataFrame{} = df, filename, opts \\ []) do opts = Keyword.validate!(opts, compression: nil, streaming: true, config: nil) compression = parquet_compression(opts[:compression]) diff --git a/lib/explorer/polars_backend/data_frame.ex b/lib/explorer/polars_backend/data_frame.ex index d94030108..be058ad28 100644 --- a/lib/explorer/polars_backend/data_frame.ex +++ b/lib/explorer/polars_backend/data_frame.ex @@ -56,7 +56,7 @@ defmodule Explorer.PolarsBackend.DataFrame do %Local.Entry{} = entry, dtypes, <>, - null_character, + nil_values, skip_rows, header?, encoding, @@ -91,7 +91,7 @@ defmodule Explorer.PolarsBackend.DataFrame do columns, dtypes, encoding, - null_character, + nil_values, parse_dates, char_byte(eol_delimiter) ) @@ -130,14 +130,10 @@ defmodule Explorer.PolarsBackend.DataFrame do end @impl true - def to_csv(_df, %S3.Entry{}, _header?, _delimiter) do - raise "S3 is not supported yet" - end - - def to_csv_writer_sample(%DataFrame{data: df}, path, header?, delimiter) do + def to_csv(%DataFrame{data: df}, %S3.Entry{} = entry, header?, delimiter) do <> = delimiter - case Native.df_to_csv_writer_sample(df, path, header?, delimiter) do + case Native.df_to_csv_cloud(df, entry, header?, delimiter) do {:ok, _} -> :ok {:error, error} -> {:error, error} end @@ -153,7 +149,7 @@ defmodule Explorer.PolarsBackend.DataFrame do contents, dtypes, <>, - null_character, + nil_values, skip_rows, header?, encoding, @@ -188,7 +184,7 @@ defmodule Explorer.PolarsBackend.DataFrame do columns, dtypes, encoding, - null_character, + nil_values, parse_dates, char_byte(eol_delimiter) ) @@ -221,8 +217,11 @@ defmodule Explorer.PolarsBackend.DataFrame do end end - def to_ndjson(_, %S3.Entry{}) do - raise "S3 is not supported yet" + @impl true + def to_ndjson(%DataFrame{data: df}, %S3.Entry{} = entry) do + with {:ok, _} <- Native.df_to_ndjson_cloud(df, entry) do + :ok + end end @impl true @@ -280,12 +279,19 @@ defmodule Explorer.PolarsBackend.DataFrame do @impl true def to_parquet( - _df, - %S3.Entry{}, - _compression, + %DataFrame{data: df}, + %S3.Entry{} = entry, + {compression, compression_level}, _streaming ) do - raise "S3 is not supported yet" + case Native.df_to_parquet_cloud( + df, + entry, + parquet_compression(compression, compression_level) + ) do + {:ok, _} -> :ok + {:error, error} -> {:error, error} + end end @impl true @@ -326,20 +332,23 @@ defmodule Explorer.PolarsBackend.DataFrame do @impl true def to_ipc(%DataFrame{data: df}, %Local.Entry{} = entry, {compression, _level}, _streaming) do - case Native.df_to_ipc(df, entry.path, Atom.to_string(compression)) do + case Native.df_to_ipc(df, entry.path, 
maybe_atom_to_string(compression)) do {:ok, _} -> :ok {:error, error} -> {:error, error} end end @impl true - def to_ipc(_df, %S3.Entry{}, _, _) do - raise "S3 is not supported yet" + def to_ipc(%DataFrame{data: df}, %S3.Entry{} = entry, {compression, _level}, _streaming) do + case Native.df_to_ipc_cloud(df, entry, maybe_atom_to_string(compression)) do + {:ok, _} -> :ok + {:error, error} -> {:error, error} + end end @impl true def dump_ipc(%DataFrame{data: df}, {compression, _level}) do - Native.df_dump_ipc(df, Atom.to_string(compression)) + Native.df_dump_ipc(df, maybe_atom_to_string(compression)) end @impl true @@ -369,20 +378,23 @@ defmodule Explorer.PolarsBackend.DataFrame do @impl true def to_ipc_stream(%DataFrame{data: df}, %Local.Entry{} = entry, {compression, _level}) do - case Native.df_to_ipc_stream(df, entry.path, Atom.to_string(compression)) do + case Native.df_to_ipc_stream(df, entry.path, maybe_atom_to_string(compression)) do {:ok, _} -> :ok {:error, error} -> {:error, error} end end @impl true - def to_ipc_stream(_df, %S3.Entry{}, _compression) do - raise "S3 is not supported yet" + def to_ipc_stream(%DataFrame{data: df}, %S3.Entry{} = entry, {compression, _level}) do + case Native.df_to_ipc_stream_cloud(df, entry, maybe_atom_to_string(compression)) do + {:ok, _} -> :ok + {:error, error} -> {:error, error} + end end @impl true def dump_ipc_stream(%DataFrame{data: df}, {compression, _level}) do - Native.df_dump_ipc_stream(df, Atom.to_string(compression)) + Native.df_dump_ipc_stream(df, maybe_atom_to_string(compression)) end @impl true @@ -395,6 +407,9 @@ defmodule Explorer.PolarsBackend.DataFrame do end end + defp maybe_atom_to_string(nil), do: nil + defp maybe_atom_to_string(atom) when is_atom(atom), do: Atom.to_string(atom) + # Conversion @impl true diff --git a/lib/explorer/polars_backend/expression.ex b/lib/explorer/polars_backend/expression.ex index 6f551f835..caa15094b 100644 --- a/lib/explorer/polars_backend/expression.ex +++ b/lib/explorer/polars_backend/expression.ex @@ -109,6 +109,7 @@ defmodule Explorer.PolarsBackend.Expression do cumulative_product: 2, window_max: 5, window_mean: 5, + window_median: 5, window_min: 5, window_sum: 5, window_standard_deviation: 5, @@ -124,7 +125,8 @@ defmodule Explorer.PolarsBackend.Expression do trim_leading: 2, trim_trailing: 2, downcase: 1, - upcase: 1 + upcase: 1, + substring: 3 ] @custom_expressions [ diff --git a/lib/explorer/polars_backend/lazy_frame.ex b/lib/explorer/polars_backend/lazy_frame.ex index d52ae1680..19846cfc6 100644 --- a/lib/explorer/polars_backend/lazy_frame.ex +++ b/lib/explorer/polars_backend/lazy_frame.ex @@ -94,7 +94,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do %Local.Entry{} = entry, dtypes, <>, - null_character, + nil_values, skip_rows, header?, encoding, @@ -131,7 +131,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do true, dtypes, encoding, - null_character, + nil_values, parse_dates, char_byte(eol_delimiter) ) @@ -206,7 +206,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do contents, dtypes, delimiter, - null_character, + nil_values, skip_rows, header?, encoding, @@ -220,7 +220,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do contents, dtypes, delimiter, - null_character, + nil_values, skip_rows, header?, encoding, @@ -281,8 +281,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do end @impl true - def to_parquet(_df, %S3.Entry{}, _compression, _streaming) do - raise "S3 is not supported yet" + def to_parquet(_df, %S3.Entry{}, _compression, _streaming = true) do + {:error, 
ArgumentError.exception("streaming is not supported for writes to AWS S3")} + end + + @impl true + def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, compression, _streaming = false) do + eager_df = collect(ldf) + + Eager.to_parquet(eager_df, entry, compression, false) end @impl true @@ -294,8 +301,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do end @impl true - def to_ipc(_df, %S3.Entry{}, _compression, _streaming) do - raise "S3 is not supported yet" + def to_ipc(_df, %S3.Entry{}, _compression, _streaming = true) do + {:error, ArgumentError.exception("streaming is not supported for writes to AWS S3")} + end + + @impl true + def to_ipc(%DF{} = ldf, %S3.Entry{} = entry, compression, _streaming = false) do + eager_df = collect(ldf) + + Eager.to_ipc(eager_df, entry, compression, false) end @impl true diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex index 77591774b..6c1dad3f7 100644 --- a/lib/explorer/polars_backend/native.ex +++ b/lib/explorer/polars_backend/native.ex @@ -59,7 +59,7 @@ defmodule Explorer.PolarsBackend.Native do _columns, _dtypes, _encoding, - _null_char, + _nil_vals, _parse_dates, _eol_delimiter ), @@ -95,7 +95,7 @@ defmodule Explorer.PolarsBackend.Native do _columns, _dtypes, _encoding, - _null_char, + _nil_vals, _parse_dates, _eol_delimiter ), @@ -126,13 +126,17 @@ defmodule Explorer.PolarsBackend.Native do def df_summarise_with_exprs(_df, _groups_exprs, _aggs_pairs), do: err() def df_tail(_df, _length, _groups), do: err() def df_to_csv(_df, _filename, _has_headers, _delimiter), do: err() - def df_to_csv_writer_sample(_df, _path, _has_headers, _delimiter), do: err() + def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter), do: err() def df_to_dummies(_df, _columns), do: err() def df_to_ipc(_df, _filename, _compression), do: err() + def df_to_ipc_cloud(_df, _ex_entry, _compression), do: err() def df_to_ipc_stream(_df, _filename, _compression), do: err() + def df_to_ipc_stream_cloud(_df, _ex_entry, _compression), do: err() def df_to_lazy(_df), do: err() def df_to_ndjson(_df, _filename), do: err() + def df_to_ndjson_cloud(_df, _ex_entry), do: err() def df_to_parquet(_df, _filename, _compression), do: err() + def df_to_parquet_cloud(_df, _ex_entry, _compression), do: err() def df_width(_df), do: err() def df_describe(_df, _percentiles), do: err() def df_nil_count(_df), do: err() @@ -191,7 +195,7 @@ defmodule Explorer.PolarsBackend.Native do _rechunk, _dtypes, _encoding, - _null_char, + _nil_vals, _parse_dates, _eol_delimiter ), @@ -332,6 +336,7 @@ defmodule Explorer.PolarsBackend.Native do def s_unordered_distinct(_s), do: err() def s_frequencies(_s), do: err() def s_cut(_s, _bins, _labels, _break_point_label, _category_label), do: err() + def s_substring(_s, _offset, _length), do: err() def s_qcut(_s, _quantiles, _labels, _break_point_label, _category_label), do: err() @@ -339,6 +344,7 @@ defmodule Explorer.PolarsBackend.Native do def s_variance(_s), do: err() def s_window_max(_s, _window_size, _weight, _ignore_null, _min_periods), do: err() def s_window_mean(_s, _window_size, _weight, _ignore_null, _min_periods), do: err() + def s_window_median(_s, _window_size, _weight, _ignore_null, _min_periods), do: err() def s_window_min(_s, _window_size, _weight, _ignore_null, _min_periods), do: err() def s_window_sum(_s, _window_size, _weight, _ignore_null, _min_periods), do: err() diff --git a/lib/explorer/polars_backend/series.ex b/lib/explorer/polars_backend/series.ex index b0b3e385b..de5b48cda 100644 --- 
a/lib/explorer/polars_backend/series.ex +++ b/lib/explorer/polars_backend/series.ex @@ -520,6 +520,11 @@ defmodule Explorer.PolarsBackend.Series do window_function(:s_window_mean, series, window_size, weights, min_periods, center) end + @impl true + def window_median(series, window_size, weights, min_periods, center) do + window_function(:s_window_median, series, window_size, weights, min_periods, center) + end + @impl true def window_min(series, window_size, weights, min_periods, center) do window_function(:s_window_min, series, window_size, weights, min_periods, center) @@ -626,6 +631,10 @@ defmodule Explorer.PolarsBackend.Series do def trim_trailing(series, str), do: Shared.apply_series(series, :s_trim_trailing, [str]) + @impl true + def substring(series, offset, length), + do: Shared.apply_series(series, :s_substring, [offset, length]) + # Float round @impl true def round(series, decimals), diff --git a/lib/explorer/series.ex b/lib/explorer/series.ex index 3da8d8e5e..77abb063b 100644 --- a/lib/explorer/series.ex +++ b/lib/explorer/series.ex @@ -3850,6 +3850,46 @@ defmodule Explorer.Series do def window_mean(series, window_size, opts \\ []), do: apply_series(series, :window_mean, [window_size | window_args(opts)]) + @doc """ + Calculate the rolling median, given a window size and optional list of weights. + + ## Options + + * `:weights` - An optional list of weights with the same length as the window + that will be multiplied elementwise with the values in the window. Defaults to `nil`. + + * `:min_periods` - The number of values in the window that should be non-nil + before computing a result. If `nil`, it will be set equal to window size. Defaults to `1`. + + * `:center` - Set the labels at the center of the window. Defaults to `false`. + + ## Examples + + iex> s = 1..10 |> Enum.to_list() |> Explorer.Series.from_list() + iex> Explorer.Series.window_median(s, 4) + #Explorer.Series< + Polars[10] + float [1.0, 1.5, 2.0, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5] + > + + iex> s = 1..10 |> Enum.to_list() |> Explorer.Series.from_list() + iex> Explorer.Series.window_median(s, 2, weights: [0.25, 0.75]) + #Explorer.Series< + Polars[10] + float [2.0, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5] + > + + iex> s = 1..10 |> Enum.to_list() |> Explorer.Series.from_list() + iex> Explorer.Series.window_median(s, 2, weights: [0.25, 0.75], min_periods: nil) + #Explorer.Series< + Polars[10] + float [nil, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5] + > + """ + @doc type: :window + def window_median(series, window_size, opts \\ []), + do: apply_series(series, :window_median, [window_size | window_args(opts)]) + @doc """ Calculate the rolling min, given a window size and optional list of weights. 
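Because `window_median` and `substring` are also wired into `Explorer.Backend.LazySeries` and the Polars expressions in this patch, they can be used inside `Explorer.DataFrame.mutate/2` as well. A small sketch with made-up data:

```elixir
df =
  Explorer.DataFrame.new(
    n: [1, 2, 3, 4, 5],
    name: ["earth", "mars", "venus", "pluto", "ceres"]
  )

Explorer.DataFrame.mutate(df,
  # rolling median over a window of 3, nil until the window is full
  rolling_median: window_median(n, 3, min_periods: nil),
  # first three characters of each name
  prefix: substring(name, 0, 3)
)
```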
@@ -4394,6 +4434,69 @@ defmodule Explorer.Series do def trim_trailing(%Series{dtype: dtype}, _string), do: dtype_error("trim_trailing/2", dtype, [:string]) + @doc """ + Returns a string sliced from the offset to the end of the string, supporting + negative indexing + + ## Examples + + iex> s = Explorer.Series.from_list(["earth", "mars", "neptune"]) + iex> Explorer.Series.substring(s, -3) + #Explorer.Series< + Polars[3] + string ["rth", "ars", "une"] + > + + iex> s = Explorer.Series.from_list(["earth", "mars", "neptune"]) + iex> Explorer.Series.substring(s, 1) + #Explorer.Series< + Polars[3] + string ["arth", "ars", "eptune"] + > + """ + @doc type: :string_wise + @spec substring(Series.t(), integer()) :: Series.t() + def substring(%Series{dtype: :string} = series, offset) when is_integer(offset), + do: apply_series(series, :substring, [offset, nil]) + + @doc """ + Returns a string sliced from the offset to the length provided, supporting + negative indexing + + ## Examples + + iex> s = Explorer.Series.from_list(["earth", "mars", "neptune"]) + iex> Explorer.Series.substring(s, -3, 2) + #Explorer.Series< + Polars[3] + string ["rt", "ar", "un"] + > + + iex> s = Explorer.Series.from_list(["earth", "mars", "neptune"]) + iex> Explorer.Series.substring(s, 1, 5) + #Explorer.Series< + Polars[3] + string ["arth", "ars", "eptun"] + > + + iex> d = Explorer.Series.from_list(["こんにちは世界", "مرحبًا", "안녕하세요"]) + iex> Explorer.Series.substring(d, 1, 3) + #Explorer.Series< + Polars[3] + string ["んにち", "رحب", "녕하세"] + > + """ + @doc type: :string_wise + @spec substring(Series.t(), integer(), integer()) :: Series.t() + def substring(%Series{dtype: :string} = series, offset, length) + when is_integer(offset) + when is_integer(length) + when length >= 0, + do: apply_series(series, :substring, [offset, length]) + + def substring(%Series{dtype: dtype}, _offset, _length), + do: dtype_error("substring/3", dtype, [:string]) + # Float @doc """ diff --git a/native/explorer/Cargo.toml b/native/explorer/Cargo.toml index 72bfda619..acb6684e7 100644 --- a/native/explorer/Cargo.toml +++ b/native/explorer/Cargo.toml @@ -19,9 +19,11 @@ rand_pcg = "0.3" rustler = { version = "0.29", default-features = false, features = ["derive"] } thiserror = "1" smartstring = "1" -tokio = { version = "1.29.1", features = ["rt"] } -tokio-util = { version = "0.7.8", features = ["io", "io-util"] } -object_store = "0.6.1" + +# Deps necessary for cloud features. +tokio = { version = "1.29", default-features = false, features = ["rt"], optional = true } +tokio-util = { version = "0.7", default-features = false, features = ["io", "io-util"], optional = true } +object_store = { version = "0.6", default-features = false, optional = true } # MiMalloc won´t compile on Windows with the GCC compiler. # On Linux with Musl it won´t load correctly. @@ -79,7 +81,7 @@ version = "0.31" [features] default = ["ndjson", "cloud", "nif_version_2_15"] -cloud = ["aws"] +cloud = ["object_store", "tokio", "tokio-util", "aws"] ndjson = ["polars/json"] aws = ["polars/async", "polars/aws"] diff --git a/native/explorer/src/cloud_writer.rs b/native/explorer/src/cloud_writer.rs index 2c5894af6..c130fbeb2 100644 --- a/native/explorer/src/cloud_writer.rs +++ b/native/explorer/src/cloud_writer.rs @@ -29,6 +29,8 @@ impl CloudWriter { /// which bridges the sync writing process with the async ObjectStore multipart uploading. 
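From the Elixir side these cloud writers are reached through the usual `to_*` functions by passing an `s3://` path together with an `FSS.S3.Config`, as the integration tests below do. A sketch against a local S3-compatible endpoint (bucket name and credentials are placeholders):

```elixir
config = %FSS.S3.Config{
  access_key_id: "test",
  secret_access_key: "test",
  endpoint: "http://localhost:4566",
  region: "us-east-1"
}

df = Explorer.Datasets.wine()
path = "s3://test-bucket/wine.parquet"

# Write the dataframe to the bucket, then read it back.
:ok = Explorer.DataFrame.to_parquet(df, path, config: config)
wine = Explorer.DataFrame.from_parquet!(path, config: config)
```

For lazy frames the same call requires `streaming: false`, since streaming uploads to S3 are rejected with an `ArgumentError` (see the `Explorer.PolarsBackend.LazyFrame` changes above).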
pub fn new(object_store: Box, path: Path) -> Self { let runtime = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() .build() .unwrap(); let (multipart_id, writer) = diff --git a/native/explorer/src/dataframe/io.rs b/native/explorer/src/dataframe/io.rs index ea4507ff7..389648297 100644 --- a/native/explorer/src/dataframe/io.rs +++ b/native/explorer/src/dataframe/io.rs @@ -1,8 +1,8 @@ // This file contains the IO functions related to a dataframe. // Each format has 8 functions related. They do the following: // -// - dump: dump a dataframe to a string using the given format (like in a CSV string). -// - load: load a dataframe from a given string (let's say, from a CSV string). +// - dump: dump a dataframe to a binary/string using the given format (like in a CSV string). +// - load: load a dataframe from a given binary/string (let's say, from a CSV string). // - from: reads a dataframe from a file that is encoded in a given format. // - to: writes a dataframe to a file in a given format. // @@ -18,10 +18,11 @@ use std::result::Result; use std::sync::Arc; use crate::dataframe::normalize_numeric_dtypes; -use crate::datatypes::ExParquetCompression; +use crate::datatypes::{ExParquetCompression, ExS3Entry}; use crate::{ExDataFrame, ExplorerError}; + // Note that we have two types of "Compression" for IPC: this one and IpcCompresion. -use polars::export::arrow::io::ipc::write::Compression; +use polars::export::arrow::io::ipc::write::Compression as IpcStreamCompression; fn finish_reader(reader: impl SerReader) -> Result where @@ -48,9 +49,9 @@ pub fn df_from_csv( delimiter_as_byte: u8, do_rechunk: bool, column_names: Option>, - dtypes: Option>, + dtypes: Vec<(&str, &str)>, encoding: &str, - null_char: String, + null_vals: Vec, parse_dates: bool, eol_delimiter: Option, ) -> Result { @@ -59,12 +60,6 @@ pub fn df_from_csv( _ => CsvEncoding::Utf8, }; - let schema = match dtypes { - Some(dtypes) => Some(schema_from_dtypes_pairs(dtypes)?), - - None => None, - }; - let reader = CsvReader::from_path(filename)? 
.infer_schema(infer_schema_length) .has_header(has_header) @@ -76,8 +71,8 @@ pub fn df_from_csv( .with_rechunk(do_rechunk) .with_encoding(encoding) .with_columns(column_names) - .with_dtypes(schema) - .with_null_values(Some(NullValues::AllColumns(vec![null_char]))) + .with_dtypes(Some(schema_from_dtypes_pairs(dtypes)?)) + .with_null_values(Some(NullValues::AllColumns(null_vals))) .with_end_of_line_char(eol_delimiter.unwrap_or(b'\n')); finish_reader(reader) @@ -124,6 +119,23 @@ pub fn df_to_csv( Ok(()) } +#[cfg(feature = "aws")] +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_csv_cloud( + data: ExDataFrame, + ex_entry: ExS3Entry, + has_headers: bool, + delimiter: u8, +) -> Result<(), ExplorerError> { + let mut cloud_writer = build_aws_s3_cloud_writer(ex_entry)?; + + CsvWriter::new(&mut cloud_writer) + .has_header(has_headers) + .with_delimiter(delimiter) + .finish(&mut data.clone())?; + Ok(()) +} + #[rustler::nif(schedule = "DirtyCpu")] pub fn df_dump_csv( env: Env, @@ -156,9 +168,9 @@ pub fn df_load_csv( delimiter_as_byte: u8, do_rechunk: bool, column_names: Option>, - dtypes: Option>, + dtypes: Vec<(&str, &str)>, encoding: &str, - null_char: String, + null_vals: Vec, parse_dates: bool, eol_delimiter: Option, ) -> Result { @@ -167,12 +179,6 @@ pub fn df_load_csv( _ => CsvEncoding::Utf8, }; - let schema = match dtypes { - Some(dtypes) => Some(schema_from_dtypes_pairs(dtypes)?), - - None => None, - }; - let cursor = Cursor::new(binary.as_slice()); let reader = CsvReader::new(cursor) @@ -186,8 +192,8 @@ pub fn df_load_csv( .with_rechunk(do_rechunk) .with_encoding(encoding) .with_columns(column_names) - .with_dtypes(schema) - .with_null_values(Some(NullValues::AllColumns(vec![null_char]))) + .with_dtypes(Some(schema_from_dtypes_pairs(dtypes)?)) + .with_null_values(Some(NullValues::AllColumns(null_vals))) .with_end_of_line_char(eol_delimiter.unwrap_or(b'\n')); finish_reader(reader) @@ -230,6 +236,56 @@ pub fn df_to_parquet( Ok(()) } +#[cfg(feature = "aws")] +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_parquet_cloud( + data: ExDataFrame, + ex_entry: ExS3Entry, + ex_compression: ExParquetCompression, +) -> Result<(), ExplorerError> { + let mut cloud_writer = build_aws_s3_cloud_writer(ex_entry)?; + + let compression = ParquetCompression::try_from(ex_compression)?; + + ParquetWriter::new(&mut cloud_writer) + .with_compression(compression) + .finish(&mut data.clone())?; + Ok(()) +} +fn object_store_to_explorer_error(error: impl std::fmt::Debug) -> ExplorerError { + ExplorerError::Other(format!("Internal ObjectStore error: #{error:?}")) +} + +#[cfg(feature = "aws")] +fn build_aws_s3_cloud_writer( + ex_entry: ExS3Entry, +) -> Result { + let config = ex_entry.config; + let mut aws_builder = object_store::aws::AmazonS3Builder::new() + .with_region(config.region) + .with_bucket_name(ex_entry.bucket) + .with_access_key_id(config.access_key_id) + .with_secret_access_key(config.secret_access_key); + + if let Some(endpoint) = config.endpoint { + aws_builder = aws_builder.with_allow_http(true).with_endpoint(endpoint); + } + + if let Some(token) = config.token { + aws_builder = aws_builder.with_token(token); + } + + let aws_s3 = aws_builder + .build() + .map_err(object_store_to_explorer_error)?; + + let object_store: Box = Box::new(aws_s3); + Ok(crate::cloud_writer::CloudWriter::new( + object_store, + ex_entry.key.into(), + )) +} + #[rustler::nif(schedule = "DirtyCpu")] pub fn df_dump_parquet( env: Env, @@ -281,11 +337,9 @@ pub fn df_to_ipc( filename: &str, compression: Option<&str>, ) -> 
Result<(), ExplorerError> { - // Select the compression algorithm. let compression = match compression { - Some("lz4") => Some(IpcCompression::LZ4), - Some("zstd") => Some(IpcCompression::ZSTD), - _ => None, + Some(algo) => Some(decode_ipc_compression(algo)?), + None => None, }; let file = File::create(filename)?; @@ -296,6 +350,26 @@ pub fn df_to_ipc( Ok(()) } +#[cfg(feature = "aws")] +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_ipc_cloud( + data: ExDataFrame, + ex_entry: ExS3Entry, + compression: Option<&str>, +) -> Result<(), ExplorerError> { + let compression = match compression { + Some(algo) => Some(decode_ipc_compression(algo)?), + None => None, + }; + + let mut cloud_writer = build_aws_s3_cloud_writer(ex_entry)?; + + IpcWriter::new(&mut cloud_writer) + .with_compression(compression) + .finish(&mut data.clone())?; + Ok(()) +} + #[rustler::nif(schedule = "DirtyCpu")] pub fn df_dump_ipc<'a>( env: Env<'a>, @@ -304,11 +378,9 @@ pub fn df_dump_ipc<'a>( ) -> Result, ExplorerError> { let mut buf = vec![]; - // Select the compression algorithm. let compression = match compression { - Some("lz4") => Some(IpcCompression::LZ4), - Some("zstd") => Some(IpcCompression::ZSTD), - _ => None, + Some(algo) => Some(decode_ipc_compression(algo)?), + None => None, }; IpcWriter::new(&mut buf) @@ -335,6 +407,16 @@ pub fn df_load_ipc( finish_reader(reader) } +fn decode_ipc_compression(compression: &str) -> Result { + match compression { + "lz4" => Ok(IpcCompression::LZ4), + "zstd" => Ok(IpcCompression::ZSTD), + other => Err(ExplorerError::Other(format!( + "the algorithm {other} is not supported for IPC compression" + ))), + } +} + // ============ IPC Streaming ============ // #[rustler::nif(schedule = "DirtyIo")] @@ -358,20 +440,38 @@ pub fn df_to_ipc_stream( filename: &str, compression: Option<&str>, ) -> Result<(), ExplorerError> { - // Select the compression algorithm. let compression = match compression { - Some("lz4") => Some(Compression::LZ4), - Some("zstd") => Some(Compression::ZSTD), - _ => None, + Some(algo) => Some(decode_ipc_stream_compression(algo)?), + None => None, }; - let mut file = File::create(filename).expect("could not create file"); + let mut file = File::create(filename)?; IpcStreamWriter::new(&mut file) .with_compression(compression) .finish(&mut data.clone())?; Ok(()) } +#[cfg(feature = "aws")] +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_ipc_stream_cloud( + data: ExDataFrame, + ex_entry: ExS3Entry, + compression: Option<&str>, +) -> Result<(), ExplorerError> { + let compression = match compression { + Some(algo) => Some(decode_ipc_stream_compression(algo)?), + None => None, + }; + + let mut cloud_writer = build_aws_s3_cloud_writer(ex_entry)?; + + IpcStreamWriter::new(&mut cloud_writer) + .with_compression(compression) + .finish(&mut data.clone())?; + Ok(()) +} + #[rustler::nif(schedule = "DirtyCpu")] pub fn df_dump_ipc_stream<'a>( env: Env<'a>, @@ -380,11 +480,9 @@ pub fn df_dump_ipc_stream<'a>( ) -> Result, ExplorerError> { let mut buf = vec![]; - // Select the compression algorithm. 
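With `decode_ipc_compression/1`, an unknown compression name now comes back as an `{:error, ...}` instead of silently writing an uncompressed file. The Elixir-side options are unchanged; a short sketch, assuming the existing `:compression` option of `Explorer.DataFrame.to_ipc/3`:

```elixir
df = Explorer.Datasets.wine()

# Accepted values are :lz4 and :zstd.
:ok = Explorer.DataFrame.to_ipc(df, "/tmp/wine.ipc", compression: :zstd)

# compression: nil (the default) is mapped through maybe_atom_to_string/1,
# so the NIF receives a real nil rather than the string "nil".
:ok = Explorer.DataFrame.to_ipc(df, "/tmp/wine.ipc")
```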
let compression = match compression { - Some("lz4") => Some(Compression::LZ4), - Some("zstd") => Some(Compression::ZSTD), - _ => None, + Some(algo) => Some(decode_ipc_stream_compression(algo)?), + None => None, }; IpcStreamWriter::new(&mut buf) @@ -411,6 +509,16 @@ pub fn df_load_ipc_stream( finish_reader(reader) } +fn decode_ipc_stream_compression(compression: &str) -> Result { + match compression { + "lz4" => Ok(IpcStreamCompression::LZ4), + "zstd" => Ok(IpcStreamCompression::ZSTD), + other => Err(ExplorerError::Other(format!( + "the algorithm {other} is not supported for IPC stream compression" + ))), + } +} + // ============ NDJSON ============ // #[cfg(feature = "ndjson")] @@ -442,6 +550,17 @@ pub fn df_to_ndjson(data: ExDataFrame, filename: &str) -> Result<(), ExplorerErr Ok(()) } +#[cfg(all(feature = "ndjson", feature = "aws"))] +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_ndjson_cloud(data: ExDataFrame, ex_entry: ExS3Entry) -> Result<(), ExplorerError> { + let mut cloud_writer = build_aws_s3_cloud_writer(ex_entry)?; + + JsonWriter::new(&mut cloud_writer) + .with_json_format(JsonFormat::JsonLines) + .finish(&mut data.clone())?; + Ok(()) +} + #[cfg(feature = "ndjson")] #[rustler::nif(schedule = "DirtyCpu")] pub fn df_dump_ndjson(env: Env, data: ExDataFrame) -> Result { @@ -483,7 +602,9 @@ pub fn df_from_ndjson( _batch_size: usize, ) -> Result { Err(ExplorerError::Other(format!( - "NDJSON parsing is not enabled for this machine" + "Explorer was compiled without the \"ndjson\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" ))) } @@ -491,7 +612,9 @@ pub fn df_from_ndjson( #[rustler::nif] pub fn df_to_ndjson(_data: ExDataFrame, _filename: &str) -> Result<(), ExplorerError> { Err(ExplorerError::Other(format!( - "NDJSON writing is not enabled for this machine" + "Explorer was compiled without the \"ndjson\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" ))) } @@ -499,7 +622,9 @@ pub fn df_to_ndjson(_data: ExDataFrame, _filename: &str) -> Result<(), ExplorerE #[rustler::nif] pub fn df_dump_ndjson(_data: ExDataFrame) -> Result, ExplorerError> { Err(ExplorerError::Other(format!( - "NDJSON dumping is not enabled for this machine" + "Explorer was compiled without the \"ndjson\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" ))) } @@ -511,26 +636,75 @@ pub fn df_load_ndjson( _batch_size: usize, ) -> Result { Err(ExplorerError::Other(format!( - "NDJSON loading is not enabled for this machine" + "Explorer was compiled without the \"ndjson\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. 
\ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" ))) } -#[rustler::nif(schedule = "DirtyIo")] -pub fn df_to_csv_writer_sample( +#[cfg(not(feature = "aws"))] +#[rustler::nif] +pub fn df_to_parquet_cloud( + _data: ExDataFrame, + _ex_entry: ExS3Entry, + _ex_compression: ExParquetCompression, +) -> Result<(), ExplorerError> { + Err(ExplorerError::Other(format!( + "Explorer was compiled without the \"aws\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" + ))) +} + +#[cfg(not(feature = "aws"))] +#[rustler::nif] +pub fn df_to_csv_cloud( data: ExDataFrame, - path: &str, + ex_entry: ExS3Entry, has_headers: bool, delimiter: u8, ) -> Result<(), ExplorerError> { - use object_store::ObjectStore; + Err(ExplorerError::Other(format!( + "Explorer was compiled without the \"aws\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" + ))) +} - // Hard-coded local file system object store for now: - let object_store: Box = Box::new(object_store::local::LocalFileSystem::new()); - let mut cloud_writer = crate::cloud_writer::CloudWriter::new(object_store, path.into()); +#[cfg(not(feature = "aws"))] +#[rustler::nif] +pub fn df_to_ipc_cloud( + _data: ExDataFrame, + _ex_entry: ExS3Entry, + _compression: Option<&str>, +) -> Result<(), ExplorerError> { + Err(ExplorerError::Other(format!( + "Explorer was compiled without the \"aws\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" + ))) +} - CsvWriter::new(&mut cloud_writer) - .has_header(has_headers) - .with_delimiter(delimiter) - .finish(&mut data.clone())?; - Ok(()) +#[cfg(not(feature = "aws"))] +#[rustler::nif] +pub fn df_to_ipc_stream_cloud( + _data: ExDataFrame, + _ex_entry: ExS3Entry, + _compression: Option<&str>, +) -> Result<(), ExplorerError> { + Err(ExplorerError::Other(format!( + "Explorer was compiled without the \"aws\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" + ))) +} + +#[cfg(not(any(feature = "ndjson", feature = "aws")))] +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_ndjson_cloud(data: ExDataFrame, ex_entry: ExS3Entry) -> Result<(), ExplorerError> { + Err(ExplorerError::Other(format!( + "Explorer was compiled without the \"aws\" and \"ndjson\" features enabled. \ + This is mostly due to these feature being incompatible with your computer's architecture. 
\ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" + ))) } diff --git a/native/explorer/src/expressions.rs b/native/explorer/src/expressions.rs index d2f3f8d2e..a184fd4a9 100644 --- a/native/explorer/src/expressions.rs +++ b/native/explorer/src/expressions.rs @@ -607,6 +607,7 @@ init_window_expr_fun!(expr_window_max, rolling_max); init_window_expr_fun!(expr_window_min, rolling_min); init_window_expr_fun!(expr_window_sum, rolling_sum); init_window_expr_fun!(expr_window_mean, rolling_mean); +init_window_expr_fun!(expr_window_median, rolling_median); #[rustler::nif(schedule = "DirtyCpu")] pub fn expr_window_standard_deviation( @@ -764,6 +765,12 @@ pub fn expr_trim_trailing(expr: ExExpr, string: Option) -> ExExpr { ExExpr::new(expr.str().rstrip(string)) } +#[rustler::nif] +pub fn expr_substring(expr: ExExpr, offset: i64, length: Option) -> ExExpr { + let expr = expr.clone_inner(); + ExExpr::new(expr.str().str_slice(offset, length)) +} + #[rustler::nif] pub fn expr_round(expr: ExExpr, decimals: u32) -> ExExpr { let expr = expr.clone_inner(); diff --git a/native/explorer/src/lazyframe/io.rs b/native/explorer/src/lazyframe/io.rs index 328ef9eba..e10c0647f 100644 --- a/native/explorer/src/lazyframe/io.rs +++ b/native/explorer/src/lazyframe/io.rs @@ -63,7 +63,9 @@ pub fn lf_from_parquet_cloud( _columns: Option>, ) -> Result { Err(ExplorerError::Other(format!( - "AWS reads and writes are not enabled for this machine" + "Explorer was compiled without the \"aws\" feature enabled. \ + This is mostly due to this feature being incompatible with your computer's architecture. \ + Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation" ))) } @@ -104,7 +106,7 @@ pub fn lf_to_parquet( } } -#[rustler::nif] +#[rustler::nif(schedule = "DirtyIo")] pub fn lf_from_ipc(filename: &str) -> Result { let lf = LazyFrame::scan_ipc(filename, Default::default())?; @@ -156,9 +158,9 @@ pub fn lf_from_csv( skip_rows: usize, delimiter_as_byte: u8, do_rechunk: bool, - dtypes: Option>, + dtypes: Vec<(&str, &str)>, encoding: &str, - null_char: String, + null_vals: Vec, parse_dates: bool, eol_delimiter: Option, ) -> Result { @@ -167,12 +169,6 @@ pub fn lf_from_csv( _ => CsvEncoding::Utf8, }; - let schema = match dtypes { - Some(dtypes) => Some(schema_from_dtypes_pairs(dtypes)?), - - None => None, - }; - let df = LazyCsvReader::new(filename) .with_infer_schema_length(infer_schema_length) .has_header(has_header) @@ -182,15 +178,15 @@ pub fn lf_from_csv( .with_skip_rows(skip_rows) .with_rechunk(do_rechunk) .with_encoding(encoding) - .with_dtype_overwrite(schema.as_deref()) - .with_null_values(Some(NullValues::AllColumns(vec![null_char]))) + .with_dtype_overwrite(Some(schema_from_dtypes_pairs(dtypes)?.as_ref())) + .with_null_values(Some(NullValues::AllColumns(null_vals))) .with_end_of_line_char(eol_delimiter.unwrap_or(b'\n')) .finish()?; Ok(ExLazyFrame::new(df)) } -#[cfg(not(any(target_arch = "arm", target_arch = "riscv64")))] +#[cfg(feature = "ndjson")] #[rustler::nif] pub fn lf_from_ndjson( filename: String, @@ -205,7 +201,7 @@ pub fn lf_from_ndjson( Ok(ExLazyFrame::new(lf)) } -#[cfg(any(target_arch = "arm", target_arch = "riscv64"))] +#[cfg(not(feature = "ndjson"))] #[rustler::nif] pub fn lf_from_ndjson( _filename: &str, @@ -213,6 +209,8 @@ pub fn lf_from_ndjson( _batch_size: usize, ) -> Result { Err(ExplorerError::Other(format!( - "NDJSON parsing is not enabled for this machine" + "Explorer 
was compiled without the \"ndjson\" feature enabled. \
+        This is mostly due to this feature being incompatible with your computer's architecture. \
+        Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation"
     )))
 }
diff --git a/native/explorer/src/lib.rs b/native/explorer/src/lib.rs
index 9dd637b01..e261731f4 100644
--- a/native/explorer/src/lib.rs
+++ b/native/explorer/src/lib.rs
@@ -14,7 +14,9 @@ use rustler::{Env, Term};
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+#[cfg(feature = "cloud")]
 mod cloud_writer;
+
 mod dataframe;
 mod datatypes;
 mod encoding;
@@ -119,13 +121,17 @@ rustler::init!(
         df_summarise_with_exprs,
         df_tail,
         df_to_csv,
-        df_to_csv_writer_sample,
+        df_to_csv_cloud,
         df_to_dummies,
         df_to_ipc,
+        df_to_ipc_cloud,
         df_to_ipc_stream,
+        df_to_ipc_stream_cloud,
         df_to_lazy,
         df_to_ndjson,
+        df_to_ndjson_cloud,
         df_to_parquet,
+        df_to_parquet_cloud,
         df_width,
         // expressions
         expr_atom,
@@ -231,6 +237,7 @@ rustler::init!(
         expr_cumulative_product,
         expr_window_max,
         expr_window_mean,
+        expr_window_median,
         expr_window_min,
         expr_window_sum,
         expr_window_standard_deviation,
@@ -244,6 +251,7 @@ rustler::init!(
         expr_trim,
         expr_trim_leading,
         expr_trim_trailing,
+        expr_substring,
         // float round expressions
         expr_round,
         expr_floor,
@@ -396,6 +404,7 @@ rustler::init!(
         s_standard_deviation,
         s_tan,
         s_trim,
+        s_substring,
         s_subtract,
         s_sum,
         s_tail,
@@ -410,6 +419,7 @@ rustler::init!(
         s_variance,
         s_window_max,
         s_window_mean,
+        s_window_median,
         s_window_min,
         s_window_sum,
         s_window_standard_deviation,
diff --git a/native/explorer/src/series.rs b/native/explorer/src/series.rs
index d91d30ac6..0c5697b09 100644
--- a/native/explorer/src/series.rs
+++ b/native/explorer/src/series.rs
@@ -672,6 +672,19 @@ pub fn s_window_mean(
     Ok(ExSeries::new(s1))
 }
 
+#[rustler::nif(schedule = "DirtyCpu")]
+pub fn s_window_median(
+    series: ExSeries,
+    window_size: usize,
+    weights: Option<Vec<f64>>,
+    min_periods: Option<usize>,
+    center: bool,
+) -> Result<ExSeries, ExplorerError> {
+    let opts = rolling_opts(window_size, weights, min_periods, center);
+    let s1 = series.rolling_median(opts.into())?;
+    Ok(ExSeries::new(s1))
+}
+
 #[rustler::nif(schedule = "DirtyCpu")]
 pub fn s_window_max(
     series: ExSeries,
@@ -1379,6 +1392,17 @@ pub fn s_trim_trailing(s1: ExSeries, pattern: Option<&str>) -> Result<ExSeries, ExplorerError> {
 
+#[rustler::nif(schedule = "DirtyCpu")]
+pub fn s_substring(
+    s1: ExSeries,
+    offset: i64,
+    length: Option<u64>,
+) -> Result<ExSeries, ExplorerError> {
+    Ok(ExSeries::new(
+        s1.utf8()?.str_slice(offset, length)?.into_series(),
+    ))
+}
+
 #[rustler::nif(schedule = "DirtyCpu")]
 pub fn s_round(s: ExSeries, decimals: u32) -> Result<ExSeries, ExplorerError> {
     Ok(ExSeries::new(s.round(decimals)?.into_series()))
diff --git a/notebooks/exploring_explorer.livemd b/notebooks/exploring_explorer.livemd
index b2704a3b0..ab85236cb 100644
--- a/notebooks/exploring_explorer.livemd
+++ b/notebooks/exploring_explorer.livemd
@@ -28,7 +28,7 @@ For CSV, your 'usual suspects' of options are available:
 * `dtypes` - A keyword list of `[column_name: dtype]`. If a type is not specified for a column, it is imputed from the first 1000 rows. (default: `[]`)
 * `header` - Does the file have a header of column names as the first row or not? (default: `true`)
 * `max_rows` - Maximum number of lines to read. (default: `nil`)
-* `null_character` - The string that should be interpreted as a nil value. (default: `"NA"`)
+* `nil_values` - A list of strings that should be interpreted as nil values. (default: `[]`)
 * `skip_rows` - The number of lines to skip at the beginning of the file. (default: `0`)
 * `columns` - A list of column names to keep.
If present, only these columns are read into the dataframe. (default: `nil`) diff --git a/test/explorer/data_frame/csv_test.exs b/test/explorer/data_frame/csv_test.exs index 85eb2fc6a..3ba6b06f8 100644 --- a/test/explorer/data_frame/csv_test.exs +++ b/test/explorer/data_frame/csv_test.exs @@ -113,6 +113,7 @@ defmodule Explorer.DataFrame.CSVTest do end test "string" do + assert_csv(:string, nil, nil) assert_csv(:string, "some string", "some string") assert_csv(:string, "éphémère", "éphémère") end @@ -328,7 +329,7 @@ defmodule Explorer.DataFrame.CSVTest do end @tag :tmp_dir - test "null_character", config do + test "nil_values", config do csv = tmp_csv(config.tmp_dir, """ a,b @@ -337,7 +338,7 @@ defmodule Explorer.DataFrame.CSVTest do c,d """) - df = DF.from_csv!(csv, null_character: "n/a") + df = DF.from_csv!(csv, nil_values: ["n/a"]) assert DF.to_columns(df, atom_keys: true) == %{ a: [nil, "nil", "c"], @@ -478,4 +479,46 @@ defmodule Explorer.DataFrame.CSVTest do } end end + + describe "to_csv/3" do + setup do + [df: Explorer.Datasets.wine()] + end + + @tag :tmp_dir + test "can write a CSV to file", %{df: df, tmp_dir: tmp_dir} do + csv_path = Path.join(tmp_dir, "test.csv") + + assert :ok = DF.to_csv(df, csv_path) + assert {:ok, csv_df} = DF.from_csv(csv_path) + + assert DF.names(df) == DF.names(csv_df) + assert DF.dtypes(df) == DF.dtypes(csv_df) + assert DF.to_columns(df) == DF.to_columns(csv_df) + end + end + + describe "to_csv/3 - cloud" do + setup do + [df: Explorer.Datasets.wine()] + end + + @tag :cloud_integration + test "writes a CSV file to S3", %{df: df} do + config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + path = "s3://test-bucket/test-writes/wine-#{System.monotonic_time()}.csv" + + assert :ok = DF.to_csv(df, path, config: config) + + # When we have the reader, we can activate this assertion. 
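The `nil_values` test above is the user-visible side of the rename from `:null_character` to `:nil_values`: the option now takes a list of markers, and its default changed from `"NA"` to `[]`, so `"NA"` cells are no longer read as missing unless listed explicitly. A short sketch with a hypothetical file:

```elixir
# "readings.csv" is a placeholder path; every cell matching one of the
# listed strings is loaded as nil.
df =
  Explorer.DataFrame.from_csv!("readings.csv",
    nil_values: ["NA", "n/a", "-"]
  )
```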
+ # saved_df = DF.from_csv!(path, config: config) + # assert DF.to_columns(saved_df) == DF.to_columns(Explorer.Datasets.wine()) + end + end end diff --git a/test/explorer/data_frame/grouped_test.exs b/test/explorer/data_frame/grouped_test.exs index 98956ce5a..e08657cd7 100644 --- a/test/explorer/data_frame/grouped_test.exs +++ b/test/explorer/data_frame/grouped_test.exs @@ -548,13 +548,15 @@ defmodule Explorer.DataFrame.GroupedTest do [ b: Series.window_max(a, 2, weights: [1.0, 2.0]), c: Series.window_mean(a, 2, weights: [0.25, 0.75]), - d: Series.window_min(a, 2, weights: [1.0, 2.0]), - e: Series.window_sum(a, 2, weights: [1.0, 2.0]), - f: Series.cumulative_max(a), - g: Series.cumulative_min(a), - h: Series.cumulative_sum(a), - i: Series.cumulative_max(a, reverse: true), - j: Series.window_standard_deviation(a, 2) + d: Series.window_median(a, 2, weights: [0.25, 0.75]), + e: Series.window_min(a, 2, weights: [1.0, 2.0]), + f: Series.window_sum(a, 2, weights: [1.0, 2.0]), + g: Series.window_standard_deviation(a, 2), + p: Series.cumulative_max(a), + q: Series.cumulative_min(a), + r: Series.cumulative_sum(a), + s: Series.cumulative_max(a, reverse: true), + t: Series.cumulative_product(a) ] end) @@ -562,13 +564,10 @@ defmodule Explorer.DataFrame.GroupedTest do a: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], b: [1.0, 4.0, 6.0, 8.0, 10.0, 6, 14.0, 16.0, 18.0, 20.0], c: [0.25, 1.75, 2.75, 3.75, 4.75, 1.5, 6.75, 7.75, 8.75, 9.75], - d: [1.0, 1.0, 2.0, 3.0, 4.0, 6.0, 6.0, 7.0, 8.0, 9.0], - e: [1.0, 5.0, 8.0, 11.0, 14.0, 6.0, 20.0, 23.0, 26.0, 29.0], - f: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - g: [1, 1, 1, 1, 1, 6, 6, 6, 6, 6], - h: [1, 3, 6, 10, 15, 6, 13, 21, 30, 40], - i: [5, 5, 5, 5, 5, 10, 10, 10, 10, 10], - j: [ + d: [2.0, 1.5, 2.5, 3.5, 4.5, 12.0, 6.5, 7.5, 8.5, 9.5], + e: [1.0, 1.0, 2.0, 3.0, 4.0, 6.0, 6.0, 7.0, 8.0, 9.0], + f: [1.0, 5.0, 8.0, 11.0, 14.0, 6.0, 20.0, 23.0, 26.0, 29.0], + g: [ 0.0, 0.7071067811865476, 0.7071067811865476, @@ -580,6 +579,11 @@ defmodule Explorer.DataFrame.GroupedTest do 0.7071067811865476, 0.7071067811865476 ], + p: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + q: [1, 1, 1, 1, 1, 6, 6, 6, 6, 6], + r: [1, 3, 6, 10, 15, 6, 13, 21, 30, 40], + s: [5, 5, 5, 5, 5, 10, 10, 10, 10, 10], + t: [1, 2, 6, 24, 120, 6, 42, 336, 3024, 30240], z: [1, 1, 1, 1, 1, 2, 2, 2, 2, 2] } end diff --git a/test/explorer/data_frame/ipc_stream_test.exs b/test/explorer/data_frame/ipc_stream_test.exs index 6d6769ba7..08e21c67b 100644 --- a/test/explorer/data_frame/ipc_stream_test.exs +++ b/test/explorer/data_frame/ipc_stream_test.exs @@ -102,4 +102,46 @@ defmodule Explorer.DataFrame.IPCStreamTest do assert_ipc_stream(:datetime, "1664624050123456", ~N[2022-10-01 11:34:10.123456]) end end + + describe "to_ipc_stream/3" do + setup do + [df: Explorer.Datasets.wine()] + end + + @tag :tmp_dir + test "can write an IPC stream to file", %{df: df, tmp_dir: tmp_dir} do + ipc_path = Path.join(tmp_dir, "test.ipcstream") + + assert :ok = DF.to_ipc_stream(df, ipc_path) + assert {:ok, ipc_df} = DF.from_ipc_stream(ipc_path) + + assert DF.names(df) == DF.names(ipc_df) + assert DF.dtypes(df) == DF.dtypes(ipc_df) + assert DF.to_columns(df) == DF.to_columns(ipc_df) + end + end + + describe "to_ipc_stream/3 - cloud" do + setup do + s3_config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + [df: Explorer.Datasets.wine(), s3_config: s3_config] + end + + @tag :cloud_integration + test "writes an IPC file to S3", %{df: df, s3_config: s3_config} do + path = 
"s3://test-bucket/test-writes/wine-#{System.monotonic_time()}.ipcstream" + + assert :ok = DF.to_ipc_stream(df, path, config: s3_config) + + # When we have the reader, we can activate this assertion. + # saved_df = DF.from_ipc!(path, config: config) + # assert DF.to_columns(saved_df) == DF.to_columns(Explorer.Datasets.wine()) + end + end end diff --git a/test/explorer/data_frame/ipc_test.exs b/test/explorer/data_frame/ipc_test.exs index b67743012..7e7561a84 100644 --- a/test/explorer/data_frame/ipc_test.exs +++ b/test/explorer/data_frame/ipc_test.exs @@ -102,4 +102,46 @@ defmodule Explorer.DataFrame.IPCTest do assert_ipc(:datetime, "1664624050123456", ~N[2022-10-01 11:34:10.123456]) end end + + describe "to_ipc/3" do + setup do + [df: Explorer.Datasets.wine()] + end + + @tag :tmp_dir + test "can write a CSV to file", %{df: df, tmp_dir: tmp_dir} do + ipc_path = Path.join(tmp_dir, "test.ipc") + + assert :ok = DF.to_ipc(df, ipc_path) + assert {:ok, ipc_df} = DF.from_ipc(ipc_path) + + assert DF.names(df) == DF.names(ipc_df) + assert DF.dtypes(df) == DF.dtypes(ipc_df) + assert DF.to_columns(df) == DF.to_columns(ipc_df) + end + end + + describe "to_ipc/3 - cloud" do + setup do + s3_config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + [df: Explorer.Datasets.wine(), s3_config: s3_config] + end + + @tag :cloud_integration + test "writes an IPC file to S3", %{df: df, s3_config: s3_config} do + path = "s3://test-bucket/test-writes/wine-#{System.monotonic_time()}.ipc" + + assert :ok = DF.to_ipc(df, path, config: s3_config) + + # When we have the reader, we can activate this assertion. + # saved_df = DF.from_ipc!(path, config: config) + # assert DF.to_columns(saved_df) == DF.to_columns(Explorer.Datasets.wine()) + end + end end diff --git a/test/explorer/data_frame/lazy_test.exs b/test/explorer/data_frame/lazy_test.exs index ee40d4f1b..21d1ddde2 100644 --- a/test/explorer/data_frame/lazy_test.exs +++ b/test/explorer/data_frame/lazy_test.exs @@ -256,6 +256,37 @@ defmodule Explorer.DataFrame.LazyTest do assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort() end + test "to_ipc/3 - cloud with streaming enabled", %{ldf: ldf} do + config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.ipc" + + ldf = DF.head(ldf, 15) + assert {:error, error} = DF.to_ipc(ldf, path, streaming: true, config: config) + + assert error == ArgumentError.exception("streaming is not supported for writes to AWS S3") + end + + @tag :cloud_integration + test "to_ipc/2 - cloud with streaming disabled", %{ldf: ldf} do + config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.ipc" + + ldf = DF.head(ldf, 15) + assert :ok = DF.to_ipc(ldf, path, streaming: false, config: config) + end + @tag :tmp_dir test "to_parquet/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do path = Path.join([tmp_dir, "fossil_fuels.parquet"]) @@ -282,6 +313,37 @@ defmodule Explorer.DataFrame.LazyTest do assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort() end + test "to_parquet/2 - cloud with streaming enabled", %{ldf: ldf} do + config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: 
"http://localhost:4566", + region: "us-east-1" + } + + path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.parquet" + + ldf = DF.head(ldf, 15) + assert {:error, error} = DF.to_parquet(ldf, path, streaming: true, config: config) + + assert error == ArgumentError.exception("streaming is not supported for writes to AWS S3") + end + + @tag :cloud_integration + test "to_parquet/2 - cloud with streaming disabled", %{ldf: ldf} do + config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.parquet" + + ldf = DF.head(ldf, 15) + assert :ok = DF.to_parquet(ldf, path, streaming: false, config: config) + end + @tag :tmp_dir test "from_ndjson/2 - with defaults", %{df: df, tmp_dir: tmp_dir} do path = Path.join([tmp_dir, "fossil_fuels.ndjson"]) diff --git a/test/explorer/data_frame/ndjson_test.exs b/test/explorer/data_frame/ndjson_test.exs index 94aa1936c..0bf6c8e0c 100644 --- a/test/explorer/data_frame/ndjson_test.exs +++ b/test/explorer/data_frame/ndjson_test.exs @@ -209,4 +209,28 @@ defmodule Explorer.DataFrame.NDJSONTest do """ end end + + describe "to_ndjson/3 - cloud" do + setup do + s3_config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + [df: Explorer.Datasets.wine(), s3_config: s3_config] + end + + @tag :cloud_integration + test "writes a NDJSON file to S3", %{df: df, s3_config: s3_config} do + path = "s3://test-bucket/test-writes/wine-#{System.monotonic_time()}.ndjson" + + assert :ok = DF.to_ndjson(df, path, config: s3_config) + + # When we have the reader, we can activate this assertion. 
+ # saved_df = DF.from_ipc!(path, config: config) + # assert DF.to_columns(saved_df) == DF.to_columns(Explorer.Datasets.wine()) + end + end end diff --git a/test/explorer/data_frame/parquet_test.exs b/test/explorer/data_frame/parquet_test.exs index 2785c02ff..edc56c4cc 100644 --- a/test/explorer/data_frame/parquet_test.exs +++ b/test/explorer/data_frame/parquet_test.exs @@ -227,4 +227,28 @@ defmodule Explorer.DataFrame.ParquetTest do end end end + + describe "to_parquet/3 - cloud" do + setup do + [df: Explorer.Datasets.wine()] + end + + @tag :cloud_integration + test "writes a parquet file to S3", %{df: df} do + config = %FSS.S3.Config{ + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566", + region: "us-east-1" + } + + path = "s3://test-bucket/test-writes/wine-#{System.monotonic_time()}.parquet" + + assert :ok = DF.to_parquet(df, path, config: config) + + saved_df = DF.from_parquet!(path, config: config) + + assert DF.to_columns(saved_df) == DF.to_columns(Explorer.Datasets.wine()) + end + end end diff --git a/test/explorer/data_frame_test.exs b/test/explorer/data_frame_test.exs index 8b3e9b69f..d512da581 100644 --- a/test/explorer/data_frame_test.exs +++ b/test/explorer/data_frame_test.exs @@ -1151,16 +1151,17 @@ defmodule Explorer.DataFrameTest do DF.mutate(df, b: window_max(a, 2, weights: [1.0, 2.0]), c: window_mean(a, 2, weights: [0.25, 0.75]), - d: window_min(a, 2, weights: [1.0, 2.0]), - e: window_sum(a, 2, weights: [1.0, 2.0]), - f: cumulative_max(a), - g: cumulative_min(a), - h: cumulative_sum(a), - i: cumulative_max(a, reverse: true), - j: ewm_mean(a), - k: cumulative_product(a), - l: abs(a), - m: window_standard_deviation(a, 2) + d: window_median(a, 2, weights: [0.25, 0.75]), + e: window_min(a, 2, weights: [1.0, 2.0]), + f: window_sum(a, 2, weights: [1.0, 2.0]), + g: window_standard_deviation(a, 2), + h: ewm_mean(a), + p: cumulative_max(a), + q: cumulative_min(a), + r: cumulative_sum(a), + s: cumulative_max(a, reverse: true), + t: cumulative_product(a), + z: abs(a) ) assert df1.dtypes == %{ @@ -1169,27 +1170,37 @@ defmodule Explorer.DataFrameTest do "c" => :float, "d" => :float, "e" => :float, - "f" => :integer, - "g" => :integer, - "h" => :integer, - "i" => :integer, - "j" => :float, - "k" => :integer, - "l" => :float, - "m" => :float + "f" => :float, + "g" => :float, + "h" => :float, + "p" => :integer, + "q" => :integer, + "r" => :integer, + "s" => :integer, + "t" => :integer, + "z" => :float } assert DF.to_columns(df1, atom_keys: true) == %{ a: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], b: [1.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 18.0, 20.0], c: [0.25, 1.75, 2.75, 3.75, 4.75, 5.75, 6.75, 7.75, 8.75, 9.75], - d: [1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], - e: [1.0, 5.0, 8.0, 11.0, 14.0, 17.0, 20.0, 23.0, 26.0, 29.0], - f: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - g: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], - h: [1, 3, 6, 10, 15, 21, 28, 36, 45, 55], - i: [10, 10, 10, 10, 10, 10, 10, 10, 10, 10], - j: [ + d: [2.0, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5], + e: [1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], + f: [1.0, 5.0, 8.0, 11.0, 14.0, 17.0, 20.0, 23.0, 26.0, 29.0], + g: [ + 0.0, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476, + 0.7071067811865476 + ], + h: [ 1.0, 1.6666666666666667, 2.4285714285714284, @@ -1201,20 +1212,12 @@ defmodule Explorer.DataFrameTest do 8.017612524461839, 9.009775171065494 ], - k: [1, 2, 6, 
24, 120, 720, 5040, 40320, 362_880, 3_628_800], - l: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0], - m: [ - 0.0, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476, - 0.7071067811865476 - ] + p: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + q: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + r: [1, 3, 6, 10, 15, 21, 28, 36, 45, 55], + s: [10, 10, 10, 10, 10, 10, 10, 10, 10, 10], + t: [1, 2, 6, 24, 120, 720, 5040, 40320, 362_880, 3_628_800], + z: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] } end @@ -1514,6 +1517,39 @@ defmodule Explorer.DataFrameTest do end end + test "slice strings" do + df = + DF.new( + a: ["_hello", "_world", "_foo", "_bar"], + b: ["venus", "earth", "mars", "jupiter"], + c: ["_foo", "_bar", "_baz", "_quox"], + d: ["_foo", "_bar", "_baz", "_quox"], + e: ["_foo", "_bar", "_baz", "_quox"] + ) + + df1 = + DF.mutate(df, + f: substring(a, 1), + g: substring(b, 2, 5), + h: substring(c, -3), + i: substring(d, 6, 10), + j: substring(e, -15, 2) + ) + + assert DF.to_columns(df1, atom_keys: true) == %{ + a: ["_hello", "_world", "_foo", "_bar"], + b: ["venus", "earth", "mars", "jupiter"], + c: ["_foo", "_bar", "_baz", "_quox"], + d: ["_foo", "_bar", "_baz", "_quox"], + e: ["_foo", "_bar", "_baz", "_quox"], + f: ["hello", "world", "foo", "bar"], + g: ["nus", "rth", "rs", "piter"], + h: ["foo", "bar", "baz", "uox"], + i: ["", "", "", ""], + j: ["_f", "_b", "_b", "_q"] + } + end + test "trim characters from string" do df = DF.new( diff --git a/test/explorer/series_test.exs b/test/explorer/series_test.exs index 6b4d5c663..dbb7f06e2 100644 --- a/test/explorer/series_test.exs +++ b/test/explorer/series_test.exs @@ -4004,6 +4004,36 @@ defmodule Explorer.SeriesTest do end end + describe "string_slicing" do + test "string_slice/2 positive offset" do + series = Series.from_list(["earth", "mars", "neptune"]) + + assert Series.substring(series, 2) |> Series.to_list() == ["rth", "rs", "ptune"] + assert Series.substring(series, 20) |> Series.to_list() == ["", "", ""] + end + + test "string_slice/2 negative offset" do + series = Series.from_list(["earth", "mars", "neptune"]) + + assert Series.substring(series, -3) |> Series.to_list() == ["rth", "ars", "une"] + assert Series.substring(series, -9) |> Series.to_list() == ["earth", "mars", "neptune"] + end + + test "string_slice/3 positive offset" do + series = Series.from_list(["earth", "mars", "neptune"]) + + assert Series.substring(series, 2, 3) |> Series.to_list() == ["rth", "rs", "ptu"] + assert Series.substring(series, 12, 13) |> Series.to_list() == ["", "", ""] + end + + test "string_slice/3 negative offset" do + series = Series.from_list(["earth", "mars", "neptune"]) + + assert Series.substring(series, -4, 4) |> Series.to_list() == ["arth", "mars", "tune"] + assert Series.substring(series, -20, 4) |> Series.to_list() == ["eart", "mars", "nept"] + end + end + describe "strptime/2 and strftime/2" do test "parse datetime from string" do series = Series.from_list(["2023-01-05 12:34:56", "XYZ", nil])