Skip to content

Commit

Permalink
feat: import_stream internal method for Series to support Arrow C str…
Browse files Browse the repository at this point in the history
…eam interface (#1078)
  • Loading branch information
eitsupi authored May 8, 2024
1 parent 595f022 commit b42ee0a
Show file tree
Hide file tree
Showing 17 changed files with 350 additions and 334 deletions.
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ S3method(as_polars_df,RPolarsLazyFrame)
S3method(as_polars_df,RPolarsLazyGroupBy)
S3method(as_polars_df,RPolarsRollingGroupBy)
S3method(as_polars_df,RPolarsSeries)
S3method(as_polars_df,RecordBatchReader)
S3method(as_polars_df,data.frame)
S3method(as_polars_df,default)
S3method(as_polars_df,nanoarrow_array)
Expand All @@ -171,6 +172,7 @@ S3method(as_polars_series,RPolarsChainedThen)
S3method(as_polars_series,RPolarsExpr)
S3method(as_polars_series,RPolarsSeries)
S3method(as_polars_series,RPolarsThen)
S3method(as_polars_series,RecordBatchReader)
S3method(as_polars_series,clock_sys_time)
S3method(as_polars_series,clock_time_point)
S3method(as_polars_series,clock_zoned_time)
Expand Down
7 changes: 7 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
- New S3 methods `nanoarrow::as_nanoarrow_array_stream()` and `nanoarrow::infer_nanoarrow_schema()`
for `RPolarsSeries` (#1076).
- New method `$dt$is_leap_year()` (#1077).
- `as_polars_df()` and `as_polars_series()` supports `arrow::RecordBatchReader` (#1078).
- The new `experimental` argument for `as_polars_df(<ArrowTabular>)`, `as_polars_df(<RecordBatchReader>)`,
`as_polars_series(<nanoarrow_array_stream>)`, and `as_polars_df(<nanoarrow_array_stream>)` (#1078).
If `experimental = TRUE`, these functions switch to use
[the Arrow C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html) internally.
At this point, the performance is degraded under the expected use cases,
so the default is set to `experimental = FALSE`.

## Polars R Package 0.16.3

Expand Down
116 changes: 80 additions & 36 deletions R/as_polars.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,36 @@
#' [$collect()][LazyFrame_collect] or [$fetch()][LazyFrame_fetch], depending on
#' whether the number of rows to fetch is infinite or not.
#' @rdname as_polars_df
#' @inheritParams as_polars_series
#' @param x Object to convert to a polars DataFrame.
#' @param ... Additional arguments passed to methods.
#' @return a [DataFrame][DataFrame_class]
#' @examplesIf requireNamespace("arrow", quietly = TRUE)
#' # Convert the row names of a data frame to a column
#' as_polars_df(mtcars, rownames = "car")
#'
#' # Convert an arrow Table to a polars DataFrame
#' at = arrow::arrow_table(x = 1:5, y = 6:10)
#' as_polars_df(at)
#'
#' # Convert an arrow Table, with renaming all columns
#' # Convert a data frame, with renaming all columns
#' as_polars_df(
#' at,
#' data.frame(x = 1, y = 2),
#' schema = c("a", "b")
#' )
#'
#' # Convert an arrow Table, with renaming and casting all columns
#' # Convert a data frame, with renaming and casting all columns
#' as_polars_df(
#' at,
#' data.frame(x = 1, y = 2),
#' schema = list(b = pl$Int64, a = pl$String)
#' )
#'
#' # Convert an arrow Table, with casting some columns
#' # Convert a data frame, with casting some columns
#' as_polars_df(
#' at,
#' data.frame(x = 1, y = 2),
#' schema_overrides = list(y = pl$String) # cast some columns
#' )
#'
#' # Convert an arrow Table to a polars DataFrame
#' at = arrow::arrow_table(x = 1:5, y = 6:10)
#' as_polars_df(at)
#'
#' # Create a polars DataFrame from a data.frame
#' lf = as_polars_df(mtcars)$lazy()
#'
Expand Down Expand Up @@ -212,13 +213,33 @@ as_polars_df.ArrowTabular = function(
...,
rechunk = TRUE,
schema = NULL,
schema_overrides = NULL) {
schema_overrides = NULL,
experimental = FALSE) {
arrow_to_rpldf(
x,
rechunk = rechunk,
schema = schema,
schema_overrides = schema_overrides
)
schema_overrides = schema_overrides,
experimental = experimental
) |>
result() |>
unwrap("in as_polars_df():")
}


#' @rdname as_polars_df
#' @export
as_polars_df.RecordBatchReader = function(x, ..., experimental = FALSE) {
uw = \(res) unwrap(res, "in as_polars_df(<RecordBatchReader>):")

if (isTRUE(experimental)) {
as_polars_series(x, name = "")$to_frame()$unnest("") |>
result() |>
uw()
} else {
.pr$DataFrame$from_arrow_record_batches(x$batches()) |>
uw()
}
}


Expand Down Expand Up @@ -247,20 +268,16 @@ as_polars_df.nanoarrow_array = function(x, ...) {

#' @rdname as_polars_df
#' @export
as_polars_df.nanoarrow_array_stream = function(x, ...) {
if (!inherits(nanoarrow::infer_nanoarrow_ptype(x$get_schema()), "data.frame")) {
as_polars_df.nanoarrow_array_stream = function(x, ..., experimental = FALSE) {
if (!identical(nanoarrow::nanoarrow_schema_parse(x$get_schema())$type, "struct")) {
Err_plain("Can't convert non-struct array stream to RPolarsDataFrame") |>
unwrap("in as_polars_df(<nanoarrow_array_stream>):")
}

series = as_polars_series.nanoarrow_array_stream(x, name = NULL)

if (length(series)) {
series$to_frame()$unnest("")
} else {
# TODO: support 0-length array stream
pl$DataFrame()
}
as_polars_series.nanoarrow_array_stream(
x,
name = "", experimental = experimental
)$to_frame()$unnest("")
}


Expand Down Expand Up @@ -397,6 +414,20 @@ as_polars_series.Array = function(x, name = NULL, ..., rechunk = TRUE) {
as_polars_series.ChunkedArray = as_polars_series.Array


#' @rdname as_polars_series
#' @export
as_polars_series.RecordBatchReader = function(x, name = NULL, ...) {
stream_out = polars_allocate_array_stream()
x$export_to_c(stream_out)

.pr$Series$import_stream(
name %||% "",
stream_out
) |>
unwrap("in as_polars_series(<RecordBatchReader>):")
}


#' @rdname as_polars_series
#' @export
as_polars_series.nanoarrow_array = function(x, name = NULL, ...) {
Expand All @@ -406,26 +437,39 @@ as_polars_series.nanoarrow_array = function(x, name = NULL, ...) {
}


#' @param experimental If `TRUE`, use experimental Arrow C stream interface inside the function.
#' This argument is experimental and may be removed in the future.
#' @rdname as_polars_series
#' @export
as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ...) {
as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ..., experimental = FALSE) {
on.exit(x$release())

list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE)
if (isTRUE(experimental)) {
stream_out = polars_allocate_array_stream()
nanoarrow::nanoarrow_pointer_export(x, stream_out)

if (length(list_of_arrays) < 1L) {
# TODO: support 0-length array stream
out = pl$Series(name = name)
} else {
out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name)
lapply(
list_of_arrays[-1L],
\(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array))
.pr$Series$import_stream(
name %||% "",
stream_out
) |>
invisible()
}
unwrap("in as_polars_series(<nanoarrow_array_stream>):")
} else {
list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE)

out
if (length(list_of_arrays) < 1L) {
# TODO: support 0-length array stream
out = pl$Series(name = name)
} else {
out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name)
lapply(
list_of_arrays[-1L],
\(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array))
) |>
invisible()
}

out
}
}


Expand Down
8 changes: 4 additions & 4 deletions R/construction.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
#' If schema names or types do not match `x`, the columns will be renamed/recast.
#' If `NULL` (default), convert columns as is.
#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
#' @param experimental If `TRUE`, use the Arrow C stream interface.
#' @noRd
#' @return RPolarsDataFrame
arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk = TRUE) {
arrow_to_rpldf = function(
at, schema = NULL, schema_overrides = NULL, rechunk = TRUE, ..., experimental = FALSE) {
# new column names by schema, #todo get names if schema not NULL
n_cols = at$num_columns

Expand Down Expand Up @@ -53,9 +55,7 @@ arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk =
if (tbl$num_rows == 0L) {
rdf = pl$DataFrame() # TODO: support creating 0-row DataFrame
} else {
rdf = unwrap(
.pr$DataFrame$from_arrow_record_batches(arrow::as_record_batch_reader(tbl)$batches())
)
rdf = as_polars_df(arrow::as_record_batch_reader(tbl), experimental = experimental)
}
} else {
rdf = pl$DataFrame()
Expand Down
10 changes: 3 additions & 7 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#' @useDynLib polars, .registration = TRUE
NULL

polars_allocate_array_stream <- function() .Call(wrap__polars_allocate_array_stream)

all_horizontal <- function(dotdotdot) .Call(wrap__all_horizontal, dotdotdot)

any_horizontal <- function(dotdotdot) .Call(wrap__any_horizontal, dotdotdot)
Expand Down Expand Up @@ -58,12 +60,6 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch

dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype)

new_arrow_stream <- function() .Call(wrap__new_arrow_stream)

arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str)

arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str)

mem_address <- function(robj) .Call(wrap__mem_address, robj)

clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
Expand Down Expand Up @@ -1376,7 +1372,7 @@ RPolarsSeries$struct_fields <- function() .Call(wrap__RPolarsSeries__struct_fiel

RPolarsSeries$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsSeries__export_stream, self, stream_ptr, pl_flavor))

RPolarsSeries$from_arrow_array_stream_str <- function(name, robj_str) .Call(wrap__RPolarsSeries__from_arrow_array_stream_str, name, robj_str)
RPolarsSeries$import_stream <- function(name, stream_ptr) .Call(wrap__RPolarsSeries__import_stream, name, stream_ptr)

RPolarsSeries$from_arrow_array_robj <- function(name, array) .Call(wrap__RPolarsSeries__from_arrow_array_robj, name, array)

Expand Down
37 changes: 25 additions & 12 deletions man/as_polars_df.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion man/as_polars_series.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b42ee0a

Please sign in to comment.