feat: import_stream internal method for Series to support Arrow C stream interface #1078

Merged 27 commits on May 8, 2024.
Changes shown below are from 11 of the 27 commits.
Commits (27)
- `98a81ec` refactor: import_stream method for Series (eitsupi, May 5, 2024)
- `16ec40f` test: add test (eitsupi, May 5, 2024)
- `9f095c1` test: more tests for nanoarrow (eitsupi, May 6, 2024)
- `2b976c1` refactor: handle name in rust side (eitsupi, May 6, 2024)
- `c6b4ee7` refactor!: use the Arrow C Stream interface inside `as_polars_df(<arr… (eitsupi, May 6, 2024)
- `8de1e21` refactor: simplify type check for struct nanoarrow_array_stream (eitsupi, May 6, 2024)
- `bbba938` feat: as_polars_df for arrow::RecordBatchReader (eitsupi, May 6, 2024)
- `350689c` refactor!: rewrite `as_polars_df` to use Arrow C Stream interface (eitsupi, May 6, 2024)
- `e4fdaad` docs(news): add bluets (eitsupi, May 6, 2024)
- `f7d917a` Merge remote-tracking branch 'upstream/main' into import_stream (eitsupi, May 6, 2024)
- `a70fbf6` test: ensure auto rechunk (eitsupi, May 6, 2024)
- `a543e95` test: add comment about the test case (eitsupi, May 6, 2024)
- `4cab33b` docs: update examples to remove removed options (eitsupi, May 6, 2024)
- `fa5e600` Revert "refactor!: rewrite `as_polars_df` to use Arrow C Stream inter… (eitsupi, May 6, 2024)
- `e04762c` feat: add the experimental argument to use the import_stream method i… (eitsupi, May 6, 2024)
- `fe06f29` feat: re add `$from_arrow_record_batches` (eitsupi, May 7, 2024)
- `2fde92a` feat: add `experimental` option to use C stream interface (eitsupi, May 7, 2024)
- `303ea0b` test: re add tests for as_polars_df(<ArrowTabular>) (eitsupi, May 7, 2024)
- `89fc55c` test: update snapshot (eitsupi, May 7, 2024)
- `e3b9567` chore: auto formatting (eitsupi, May 7, 2024)
- `5a97690` docs(news): update NEWS about C stream interface (eitsupi, May 7, 2024)
- `2b5e8a6` refactor: simplify (eitsupi, May 7, 2024)
- `038afb6` fix: `experimental` argument for as_polars_df(<nanoarrow_array_stream… (eitsupi, May 8, 2024)
- `b2277fb` docs(news): more notes about the Arrow C stream interface (eitsupi, May 8, 2024)
- `db284af` test: skip judges should be done inside `.cases` (eitsupi, May 8, 2024)
- `ccb51e7` refactor: simplify (eitsupi, May 8, 2024)
- `b6cc521` test: add more tests for the experimental argument (eitsupi, May 8, 2024)
2 changes: 2 additions & 0 deletions NAMESPACE
@@ -157,6 +157,7 @@ S3method(as_polars_df,RPolarsLazyFrame)
 S3method(as_polars_df,RPolarsLazyGroupBy)
 S3method(as_polars_df,RPolarsRollingGroupBy)
 S3method(as_polars_df,RPolarsSeries)
+S3method(as_polars_df,RecordBatchReader)
 S3method(as_polars_df,data.frame)
 S3method(as_polars_df,default)
 S3method(as_polars_df,nanoarrow_array)
@@ -171,6 +172,7 @@ S3method(as_polars_series,RPolarsChainedThen)
 S3method(as_polars_series,RPolarsExpr)
 S3method(as_polars_series,RPolarsSeries)
 S3method(as_polars_series,RPolarsThen)
+S3method(as_polars_series,RecordBatchReader)
 S3method(as_polars_series,clock_sys_time)
 S3method(as_polars_series,clock_time_point)
 S3method(as_polars_series,clock_zoned_time)
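With these registrations, the new conversions become reachable through ordinary S3 dispatch once polars is loaded. A minimal sketch (assuming a build of this branch):

```r
library(polars)

# The RecordBatchReader methods should now appear among the registered methods:
methods(as_polars_df)
methods(as_polars_series)
```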
9 changes: 9 additions & 0 deletions NEWS.md
@@ -2,6 +2,13 @@

 ## Polars R Package (development version)

+### Breaking changes
+
+- The following arguments are removed from `as_polars_df(<ArrowTabular>)` (#1078).
+  - `schema` and `schema_overrides`. Use the `<DataFrame>$select()` method and
+    the `<Expr>$cast()` method after conversion to `DataFrame` instead.
+  - `rechunk`. All chunks are automatically rechunked now.
+
 ### New features

 - `pl$read_ipc()` can read a raw vector of Apache Arrow IPC file (#1072).
@@ -15,6 +22,8 @@
 - New S3 methods `nanoarrow::as_nanoarrow_array_stream()` and `nanoarrow::infer_nanoarrow_schema()`
   for `RPolarsSeries` (#1076).
 - New method `$dt$is_leap_year()` (#1077).
+- `as_polars_series()` and `as_polars_df()` can create polars objects from `arrow::RecordBatchReader`
+  via the Apache Arrow C stream interface (#1078).

 ## Polars R Package 0.16.3
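To make the breaking change concrete, here is a minimal migration sketch (not part of the PR; `at` is a hypothetical `arrow::Table` built from `mtcars`):

```r
at = arrow::arrow_table(mtcars)

# Before (polars <= 0.16.3):
# df = as_polars_df(at, schema_overrides = list(mpg = pl$Float32), rechunk = TRUE)

# Now: convert first, then cast via $select(); rechunking happens automatically.
df = as_polars_df(at)$select(
  pl$col("mpg")$cast(pl$Float32),
  pl$col("cyl") # keep any other needed columns by selecting them explicitly
)
```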
88 changes: 38 additions & 50 deletions R/as_polars.R
@@ -65,6 +65,11 @@ as_polars_df.default = function(x, ...) {
 #' @param make_names_unique A logical flag to replace duplicated column names
 #'   with unique names. If `FALSE` and there are duplicated column names, an
 #'   error is thrown.
+#' @param schema named list of DataTypes, or character vector of column names.
+#'   Should match the number of columns in `x` and correspond to each column in `x` by position.
+#'   If a column in `x` does not match the name or type at the same position, it will be renamed/recast.
+#'   If `NULL` (default), convert columns as is.
+#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
 #' @inheritParams as_polars_df.ArrowTabular
 #' @export
 as_polars_df.data.frame = function(
@@ -199,26 +204,16 @@ as_polars_df.RPolarsLazyGroupBy = function(x, ...) {

 # TODO: link to DataTypes documents
 #' @rdname as_polars_df
-#' @param rechunk A logical flag (default `TRUE`).
-#'   Make sure that all data of each column is in contiguous memory.
-#' @param schema named list of DataTypes, or character vector of column names.
-#'   Should match the number of columns in `x` and correspond to each column in `x` by position.
-#'   If a column in `x` does not match the name or type at the same position, it will be renamed/recast.
-#'   If `NULL` (default), convert columns as is.
-#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
 #' @export
-as_polars_df.ArrowTabular = function(
-    x,
-    ...,
-    rechunk = TRUE,
-    schema = NULL,
-    schema_overrides = NULL) {
-  arrow_to_rpldf(
-    x,
-    rechunk = rechunk,
-    schema = schema,
-    schema_overrides = schema_overrides
-  )
+as_polars_df.ArrowTabular = function(x, ...) {
+  as_polars_df.RecordBatchReader(arrow::as_record_batch_reader(x))
 }
+
+
+#' @rdname as_polars_df
+#' @export
+as_polars_df.RecordBatchReader = function(x, ...) {
+  as_polars_series.RecordBatchReader(x, name = "")$to_frame()$unnest("")
+}


@@ -234,33 +229,19 @@ as_polars_df.nanoarrow_array = function(x, ...) {
       unwrap("in as_polars_df(<nanoarrow_array>):")
   }

-  series = as_polars_series.nanoarrow_array(x, name = NULL)
-
-  if (length(series)) {
-    series$to_frame()$unnest("")
-  } else {
-    # TODO: support 0-length array
-    pl$DataFrame()
-  }
+  as_polars_series.nanoarrow_array(x, name = "")$to_frame()$unnest("")
 }


 #' @rdname as_polars_df
 #' @export
 as_polars_df.nanoarrow_array_stream = function(x, ...) {
-  if (!inherits(nanoarrow::infer_nanoarrow_ptype(x$get_schema()), "data.frame")) {
+  if (!identical(nanoarrow::nanoarrow_schema_parse(x$get_schema())$type, "struct")) {
     Err_plain("Can't convert non-struct array stream to RPolarsDataFrame") |>
       unwrap("in as_polars_df(<nanoarrow_array_stream>):")
   }

-  series = as_polars_series.nanoarrow_array_stream(x, name = NULL)
-
-  if (length(series)) {
-    series$to_frame()$unnest("")
-  } else {
-    # TODO: support 0-length array stream
-    pl$DataFrame()
-  }
+  as_polars_series.nanoarrow_array_stream(x, name = "")$to_frame()$unnest("")
 }


@@ -397,6 +378,20 @@ as_polars_series.Array = function(x, name = NULL, ..., rechunk = TRUE) {
 as_polars_series.ChunkedArray = as_polars_series.Array


+#' @rdname as_polars_series
+#' @export
+as_polars_series.RecordBatchReader = function(x, name = NULL, ...) {
+  stream_out = polars_allocate_array_stream()
+  x$export_to_c(stream_out)
+
+  .pr$Series$import_stream(
+    name %||% "",
+    stream_out
+  ) |>
+    unwrap("in as_polars_series(<RecordBatchReader>):")
+}
+
+
 #' @rdname as_polars_series
 #' @export
 as_polars_series.nanoarrow_array = function(x, name = NULL, ...) {
@@ -411,21 +406,14 @@ as_polars_series.nanoarrow_array = function(x, name = NULL, ...) {
 as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ...) {
   on.exit(x$release())

-  list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE)
-
-  if (length(list_of_arrays) < 1L) {
-    # TODO: support 0-length array stream
-    out = pl$Series(name = name)
-  } else {
-    out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name)
-    lapply(
-      list_of_arrays[-1L],
-      \(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array))
-    ) |>
-      invisible()
-  }
-
-  out
+  stream_out = polars_allocate_array_stream()
+  nanoarrow::nanoarrow_pointer_export(x, stream_out)
+
+  .pr$Series$import_stream(
+    name %||% "",
+    stream_out
+  ) |>
+    unwrap("in as_polars_series(<nanoarrow_array_stream>):")
 }


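Taken together, the new S3 methods let both arrow and nanoarrow objects flow through the same C-stream import. A usage sketch (assuming the arrow and nanoarrow packages are installed; `mtcars` is just example data):

```r
library(polars)

reader = arrow::as_record_batch_reader(arrow::arrow_table(mtcars))
df = as_polars_df(reader) # consumes the reader via the Arrow C stream interface

# A reader can only be consumed once, so create a fresh one for the Series
# path; the result is a struct-typed Series whose name defaults to "".
reader2 = arrow::as_record_batch_reader(arrow::arrow_table(mtcars))
s = as_polars_series(reader2)

# nanoarrow array streams take the same route and are released afterwards:
stream = nanoarrow::as_nanoarrow_array_stream(data.frame(x = 1:3))
df2 = as_polars_df(stream)
```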
101 changes: 1 addition & 100 deletions R/construction.R
@@ -1,102 +1,3 @@
-#' Internal function of `as_polars_df()` for `arrow::Table` class objects.
-#'
-#' This is a copy of Python Polars' `arrow_to_pydf` function.
-#' @param at arrow::ArrowTabular (arrow::Table and arrow::RecordBatch)
-#' @param rechunk A logical flag (default `TRUE`).
-#'   Make sure that all data of each column is in contiguous memory.
-#' @param schema named list of DataTypes, or character vector of column names.
-#'   Should be the same length as the number of columns of `x`.
-#'   If schema names or types do not match `x`, the columns will be renamed/recast.
-#'   If `NULL` (default), convert columns as is.
-#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
-#' @noRd
-#' @return RPolarsDataFrame
-arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk = TRUE) {
-  # new column names by schema, #todo get names if schema not NULL
-  n_cols = at$num_columns
-
-  new_schema = unpack_schema(
-    schema = schema %||% names(at),
-    schema_overrides = schema_overrides
-  )
-  col_names = names(new_schema)
-
-  if (length(col_names) != n_cols) {
-    Err_plain("schema length does not match column length") |>
-      unwrap()
-  }
-
-  data_cols = list()
-  # dictionaries cannot be built in different batches (categorical does not allow
-  # that) so we rechunk them and create them separately.
-  # struct columns don't work properly if they contain multiple chunks.
-  special_cols = list()
-
-  ## iter over columns, possibly do special conversion
-  for (i in seq_len(n_cols)) {
-    column = at$column(i - 1L)
-    col_name = col_names[i]
-
-    if (is_arrow_dictionary(column)) {
-      column = coerce_arrow(column)
-      special_cols[[col_name]] = as_polars_series.ChunkedArray(column, col_name, rechunk = rechunk)
-    } else if (is_arrow_struct(column) && column$num_chunks > 1L) {
-      special_cols[[col_name]] = as_polars_series.ChunkedArray(column, col_name, rechunk = rechunk)
-    } else {
-      data_cols[[col_name]] = column
-    }
-  }
-
-  if (length(data_cols)) {
-    tbl = do.call(arrow::arrow_table, data_cols)
-
-    if (tbl$num_rows == 0L) {
-      rdf = pl$DataFrame() # TODO: support creating 0-row DataFrame
-    } else {
-      rdf = unwrap(
-        .pr$DataFrame$from_arrow_record_batches(arrow::as_record_batch_reader(tbl)$batches())
-      )
-    }
-  } else {
-    rdf = pl$DataFrame()
-  }
-
-  if (rechunk) {
-    rdf = rdf$select(pl$all()$rechunk())
-  }
-
-  if (length(special_cols)) {
-    rdf = rdf$with_columns(
-      unname(lapply(special_cols, \(s) pl$lit(s)$alias(s$name)))
-    )$select(
-      pl$col(col_names)
-    )
-  }
-
-  # cast any imported arrow fields not matching schema
-  cast_these_fields = mapply(
-    new_schema,
-    rdf$schema,
-    FUN = \(new_field, df_field) {
-      if (is.null(new_field) || new_field == df_field) NULL else new_field
-    },
-    SIMPLIFY = FALSE
-  ) |> (\(l) l[!sapply(l, is.null)])()
-
-  if (length(cast_these_fields)) {
-    rdf = rdf$with_columns(
-      mapply(
-        cast_these_fields,
-        names(cast_these_fields),
-        FUN = \(dtype, name) pl$col(name)$cast(dtype),
-        SIMPLIFY = FALSE
-      ) |> unname()
-    )
-  }
-
-  rdf
-}
-
 unpack_schema = function(
     schema = NULL, # char vector of names or 'schema' a named list of DataTypes
     schema_overrides = NULL # named list of DataTypes
@@ -206,7 +107,7 @@ arrow_to_rseries_result = function(name, values, rechunk = TRUE) {

 #' Internal function of `as_polars_df()` for `data.frame` class objects.
 #'
-#' This is a copy of `arrow_to_rpldf`
+#' This is a copy of Python Polars' `arrow_to_pydf` function.
 #' @noRd
 #' @return RPolarsDataFrame
 df_to_rpldf = function(x, ..., schema = NULL, schema_overrides = NULL) {
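The deleted `arrow_to_rpldf()` special-cased dictionary columns and multi-chunk struct columns on the R side; with the C stream import those cases are handled while the Rust side consumes the stream. A hedged sketch of the dictionary case that motivated `special_cols` (the expected mapping is Arrow dictionary to polars Categorical):

```r
library(polars)

# factors become Arrow dictionary columns:
tbl = arrow::arrow_table(f = factor(c("a", "b", "a")))

# importing through the stream should yield a Categorical column without the
# per-column R-side handling that arrow_to_rpldf() used to do:
as_polars_df(tbl)$schema
```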
12 changes: 3 additions & 9 deletions R/extendr-wrappers.R
@@ -10,6 +10,8 @@
 #' @useDynLib polars, .registration = TRUE
 NULL

+polars_allocate_array_stream <- function() .Call(wrap__polars_allocate_array_stream)
+
 all_horizontal <- function(dotdotdot) .Call(wrap__all_horizontal, dotdotdot)

 any_horizontal <- function(dotdotdot) .Call(wrap__any_horizontal, dotdotdot)
@@ -58,12 +60,6 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch

 dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype)

-new_arrow_stream <- function() .Call(wrap__new_arrow_stream)
-
-arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str)
-
-arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str)
-
 mem_address <- function(robj) .Call(wrap__mem_address, robj)

 clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
@@ -204,8 +200,6 @@ RPolarsDataFrame$partition_by <- function(by, maintain_order, include_key) .Call

 RPolarsDataFrame$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr, pl_flavor))

-RPolarsDataFrame$from_arrow_record_batches <- function(rbr) .Call(wrap__RPolarsDataFrame__from_arrow_record_batches, rbr)
-
 RPolarsDataFrame$estimated_size <- function() .Call(wrap__RPolarsDataFrame__estimated_size, self)

 RPolarsDataFrame$null_count <- function() .Call(wrap__RPolarsDataFrame__null_count, self)
@@ -1376,7 +1370,7 @@ RPolarsSeries$struct_fields <- function() .Call(wrap__RPolarsSeries__struct_fiel

 RPolarsSeries$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsSeries__export_stream, self, stream_ptr, pl_flavor))

-RPolarsSeries$from_arrow_array_stream_str <- function(name, robj_str) .Call(wrap__RPolarsSeries__from_arrow_array_stream_str, name, robj_str)
+RPolarsSeries$import_stream <- function(name, stream_ptr) .Call(wrap__RPolarsSeries__import_stream, name, stream_ptr)

 RPolarsSeries$from_arrow_array_robj <- function(name, array) .Call(wrap__RPolarsSeries__from_arrow_array_robj, name, array)
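For orientation, these wrappers chain together exactly as in `as_polars_series.RecordBatchReader()` above: allocate an empty stream on the Rust side, let the producer export into it, then import it as a Series. A sketch using the internal API (`polars:::` objects are not exported; call shapes taken from the diff above):

```r
reader = arrow::as_record_batch_reader(arrow::arrow_table(x = 1:3))

# address of an empty ArrowArrayStream, passed around as a string:
stream_ptr = polars:::polars_allocate_array_stream()
reader$export_to_c(stream_ptr) # arrow fills the stream at that address

s = polars:::.pr$Series$import_stream("x", stream_ptr) |>
  polars:::unwrap()
```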
8 changes: 4 additions & 4 deletions man/as_polars_df.Rd

(Generated file; diff not rendered by default.)

3 changes: 3 additions & 0 deletions man/as_polars_series.Rd

(Generated file; diff not rendered by default.)

15 changes: 15 additions & 0 deletions src/rust/src/arrow_interop/mod.rs
@@ -1,5 +1,7 @@
 pub mod to_rust;

+use polars_core::utils::arrow;
+
 use extendr_api::prelude::*;
 use std::result::Result;
@@ -61,3 +63,16 @@ impl RPackage for NanoArrowRPackage {
         "#)
     }
 }
+
+#[extendr]
+pub fn polars_allocate_array_stream() -> Robj {
+    let aas = Box::new(arrow::ffi::ArrowArrayStream::empty());
+    let x = Box::leak(aas); // leak box to make lifetime static
+    let x = x as *mut arrow::ffi::ArrowArrayStream;
+    format!("{:?}", x as usize).into()
+}
+
+extendr_module! {
+    mod arrow_interop;
+    fn polars_allocate_array_stream;
+}
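`polars_allocate_array_stream()` leaks a boxed, empty `ArrowArrayStream` so the allocation outlives the `.Call()`, and hands its address to R as a decimal string; `$import_stream()` then receives that string and consumes the stream at that address. A hypothetical session showing the handoff value (the printed address varies per call):

```r
ptr = polars:::polars_allocate_array_stream()
ptr
#> [1] "105553131765888" # decimal address of the leaked ArrowArrayStream
```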