From b42ee0a0f39e48ae173d861da048b9bdf1dc8cad Mon Sep 17 00:00:00 2001
From: eitsupi <50911393+eitsupi@users.noreply.github.com>
Date: Wed, 8 May 2024 23:28:22 +0900
Subject: [PATCH] feat: import_stream internal method for Series to support
 Arrow C stream interface (#1078)

---
 NAMESPACE | 2 +
 NEWS.md | 7 +
 R/as_polars.R | 116 +++++++----
 R/construction.R | 8 +-
 R/extendr-wrappers.R | 10 +-
 man/as_polars_df.Rd | 37 ++--
 man/as_polars_series.Rd | 8 +-
 src/rust/src/arrow_interop/mod.rs | 14 ++
 src/rust/src/arrow_interop/to_rust.rs | 53 +----
 src/rust/src/lib.rs | 2 +
 src/rust/src/rlib.rs | 28 ---
 src/rust/src/series.rs | 19 +-
 src/rust/src/utils/mod.rs | 16 --
 tests/testthat/_snaps/after-wrappers.md | 43 ++---
 tests/testthat/test-arrow-c-interface.R | 38 ++++
 tests/testthat/test-arrow_extendr_polars.R | 70 -------
 tests/testthat/test-as_polars.R | 213 +++++++++++++--------
 17 files changed, 350 insertions(+), 334 deletions(-)
 create mode 100644 tests/testthat/test-arrow-c-interface.R
 delete mode 100644 tests/testthat/test-arrow_extendr_polars.R

diff --git a/NAMESPACE b/NAMESPACE
index 97ac52165..72197e570 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -157,6 +157,7 @@ S3method(as_polars_df,RPolarsLazyFrame)
 S3method(as_polars_df,RPolarsLazyGroupBy)
 S3method(as_polars_df,RPolarsRollingGroupBy)
 S3method(as_polars_df,RPolarsSeries)
+S3method(as_polars_df,RecordBatchReader)
 S3method(as_polars_df,data.frame)
 S3method(as_polars_df,default)
 S3method(as_polars_df,nanoarrow_array)
@@ -171,6 +172,7 @@ S3method(as_polars_series,RPolarsChainedThen)
 S3method(as_polars_series,RPolarsExpr)
 S3method(as_polars_series,RPolarsSeries)
 S3method(as_polars_series,RPolarsThen)
+S3method(as_polars_series,RecordBatchReader)
 S3method(as_polars_series,clock_sys_time)
 S3method(as_polars_series,clock_time_point)
 S3method(as_polars_series,clock_zoned_time)
diff --git a/NEWS.md b/NEWS.md
index 1db1b2c34..3177d4d22 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -15,6 +15,13 @@
 - New S3 methods `nanoarrow::as_nanoarrow_array_stream()` and
   `nanoarrow::infer_nanoarrow_schema()` for `RPolarsSeries` (#1076).
 - New method `$dt$is_leap_year()` (#1077).
+- `as_polars_df()` and `as_polars_series()` support `arrow::RecordBatchReader` (#1078).
+- The new `experimental` argument for `as_polars_df(<ArrowTabular>)`, `as_polars_df(<RecordBatchReader>)`,
+  `as_polars_df(<nanoarrow_array_stream>)`, and `as_polars_series(<nanoarrow_array_stream>)` (#1078).
+  If `experimental = TRUE`, these functions switch to using
+  [the Arrow C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html) internally.
+  At this point, performance is degraded for the expected use cases,
+  so the default is `experimental = FALSE`.
 
 ## Polars R Package 0.16.3
 
diff --git a/R/as_polars.R b/R/as_polars.R
index 45a9e7a09..774fb25d2 100644
--- a/R/as_polars.R
+++ b/R/as_polars.R
@@ -7,6 +7,7 @@
 #' [$collect()][LazyFrame_collect] or [$fetch()][LazyFrame_fetch], depending on
 #' whether the number of rows to fetch is infinite or not.
 #' @rdname as_polars_df
+#' @inheritParams as_polars_series
 #' @param x Object to convert to a polars DataFrame.
 #' @param ... Additional arguments passed to methods.
#' @return a [DataFrame][DataFrame_class] @@ -14,28 +15,28 @@ #' # Convert the row names of a data frame to a column #' as_polars_df(mtcars, rownames = "car") #' -#' # Convert an arrow Table to a polars DataFrame -#' at = arrow::arrow_table(x = 1:5, y = 6:10) -#' as_polars_df(at) -#' -#' # Convert an arrow Table, with renaming all columns +#' # Convert a data frame, with renaming all columns #' as_polars_df( -#' at, +#' data.frame(x = 1, y = 2), #' schema = c("a", "b") #' ) #' -#' # Convert an arrow Table, with renaming and casting all columns +#' # Convert a data frame, with renaming and casting all columns #' as_polars_df( -#' at, +#' data.frame(x = 1, y = 2), #' schema = list(b = pl$Int64, a = pl$String) #' ) #' -#' # Convert an arrow Table, with casting some columns +#' # Convert a data frame, with casting some columns #' as_polars_df( -#' at, +#' data.frame(x = 1, y = 2), #' schema_overrides = list(y = pl$String) # cast some columns #' ) #' +#' # Convert an arrow Table to a polars DataFrame +#' at = arrow::arrow_table(x = 1:5, y = 6:10) +#' as_polars_df(at) +#' #' # Create a polars DataFrame from a data.frame #' lf = as_polars_df(mtcars)$lazy() #' @@ -212,13 +213,33 @@ as_polars_df.ArrowTabular = function( ..., rechunk = TRUE, schema = NULL, - schema_overrides = NULL) { + schema_overrides = NULL, + experimental = FALSE) { arrow_to_rpldf( x, rechunk = rechunk, schema = schema, - schema_overrides = schema_overrides - ) + schema_overrides = schema_overrides, + experimental = experimental + ) |> + result() |> + unwrap("in as_polars_df():") +} + + +#' @rdname as_polars_df +#' @export +as_polars_df.RecordBatchReader = function(x, ..., experimental = FALSE) { + uw = \(res) unwrap(res, "in as_polars_df():") + + if (isTRUE(experimental)) { + as_polars_series(x, name = "")$to_frame()$unnest("") |> + result() |> + uw() + } else { + .pr$DataFrame$from_arrow_record_batches(x$batches()) |> + uw() + } } @@ -247,20 +268,16 @@ as_polars_df.nanoarrow_array = function(x, ...) { #' @rdname as_polars_df #' @export -as_polars_df.nanoarrow_array_stream = function(x, ...) { - if (!inherits(nanoarrow::infer_nanoarrow_ptype(x$get_schema()), "data.frame")) { +as_polars_df.nanoarrow_array_stream = function(x, ..., experimental = FALSE) { + if (!identical(nanoarrow::nanoarrow_schema_parse(x$get_schema())$type, "struct")) { Err_plain("Can't convert non-struct array stream to RPolarsDataFrame") |> unwrap("in as_polars_df():") } - series = as_polars_series.nanoarrow_array_stream(x, name = NULL) - - if (length(series)) { - series$to_frame()$unnest("") - } else { - # TODO: support 0-length array stream - pl$DataFrame() - } + as_polars_series.nanoarrow_array_stream( + x, + name = "", experimental = experimental + )$to_frame()$unnest("") } @@ -397,6 +414,20 @@ as_polars_series.Array = function(x, name = NULL, ..., rechunk = TRUE) { as_polars_series.ChunkedArray = as_polars_series.Array +#' @rdname as_polars_series +#' @export +as_polars_series.RecordBatchReader = function(x, name = NULL, ...) { + stream_out = polars_allocate_array_stream() + x$export_to_c(stream_out) + + .pr$Series$import_stream( + name %||% "", + stream_out + ) |> + unwrap("in as_polars_series():") +} + + #' @rdname as_polars_series #' @export as_polars_series.nanoarrow_array = function(x, name = NULL, ...) { @@ -406,26 +437,39 @@ as_polars_series.nanoarrow_array = function(x, name = NULL, ...) { } +#' @param experimental If `TRUE`, use experimental Arrow C stream interface inside the function. 
+#' This argument is experimental and may be removed in the future. #' @rdname as_polars_series #' @export -as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ...) { +as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ..., experimental = FALSE) { on.exit(x$release()) - list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE) + if (isTRUE(experimental)) { + stream_out = polars_allocate_array_stream() + nanoarrow::nanoarrow_pointer_export(x, stream_out) - if (length(list_of_arrays) < 1L) { - # TODO: support 0-length array stream - out = pl$Series(name = name) - } else { - out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name) - lapply( - list_of_arrays[-1L], - \(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array)) + .pr$Series$import_stream( + name %||% "", + stream_out ) |> - invisible() - } + unwrap("in as_polars_series():") + } else { + list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE) - out + if (length(list_of_arrays) < 1L) { + # TODO: support 0-length array stream + out = pl$Series(name = name) + } else { + out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name) + lapply( + list_of_arrays[-1L], + \(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array)) + ) |> + invisible() + } + + out + } } diff --git a/R/construction.R b/R/construction.R index 8c1dddac2..c8f764f41 100644 --- a/R/construction.R +++ b/R/construction.R @@ -9,9 +9,11 @@ #' If schema names or types do not match `x`, the columns will be renamed/recast. #' If `NULL` (default), convert columns as is. #' @param schema_overrides named list of DataTypes. Cast some columns to the DataType. +#' @param experimental If `TRUE`, use the Arrow C stream interface. 
#' @noRd #' @return RPolarsDataFrame -arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk = TRUE) { +arrow_to_rpldf = function( + at, schema = NULL, schema_overrides = NULL, rechunk = TRUE, ..., experimental = FALSE) { # new column names by schema, #todo get names if schema not NULL n_cols = at$num_columns @@ -53,9 +55,7 @@ arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk = if (tbl$num_rows == 0L) { rdf = pl$DataFrame() # TODO: support creating 0-row DataFrame } else { - rdf = unwrap( - .pr$DataFrame$from_arrow_record_batches(arrow::as_record_batch_reader(tbl)$batches()) - ) + rdf = as_polars_df(arrow::as_record_batch_reader(tbl), experimental = experimental) } } else { rdf = pl$DataFrame() diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index 80ae2629d..ff5d5d890 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -10,6 +10,8 @@ #' @useDynLib polars, .registration = TRUE NULL +polars_allocate_array_stream <- function() .Call(wrap__polars_allocate_array_stream) + all_horizontal <- function(dotdotdot) .Call(wrap__all_horizontal, dotdotdot) any_horizontal <- function(dotdotdot) .Call(wrap__any_horizontal, dotdotdot) @@ -58,12 +60,6 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype) -new_arrow_stream <- function() .Call(wrap__new_arrow_stream) - -arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str) - -arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str) - mem_address <- function(robj) .Call(wrap__mem_address, robj) clone_robj <- function(robj) .Call(wrap__clone_robj, robj) @@ -1376,7 +1372,7 @@ RPolarsSeries$struct_fields <- function() .Call(wrap__RPolarsSeries__struct_fiel RPolarsSeries$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsSeries__export_stream, self, stream_ptr, pl_flavor)) -RPolarsSeries$from_arrow_array_stream_str <- function(name, robj_str) .Call(wrap__RPolarsSeries__from_arrow_array_stream_str, name, robj_str) +RPolarsSeries$import_stream <- function(name, stream_ptr) .Call(wrap__RPolarsSeries__import_stream, name, stream_ptr) RPolarsSeries$from_arrow_array_robj <- function(name, array) .Call(wrap__RPolarsSeries__from_arrow_array_robj, name, array) diff --git a/man/as_polars_df.Rd b/man/as_polars_df.Rd index e7ab6ada5..f6bdd1ba9 100644 --- a/man/as_polars_df.Rd +++ b/man/as_polars_df.Rd @@ -12,6 +12,7 @@ \alias{as_polars_df.RPolarsLazyFrame} \alias{as_polars_df.RPolarsLazyGroupBy} \alias{as_polars_df.ArrowTabular} +\alias{as_polars_df.RecordBatchReader} \alias{as_polars_df.nanoarrow_array} \alias{as_polars_df.nanoarrow_array_stream} \title{To polars DataFrame} @@ -58,11 +59,20 @@ as_polars_df(x, ...) \method{as_polars_df}{RPolarsLazyGroupBy}(x, ...) -\method{as_polars_df}{ArrowTabular}(x, ..., rechunk = TRUE, schema = NULL, schema_overrides = NULL) +\method{as_polars_df}{ArrowTabular}( + x, + ..., + rechunk = TRUE, + schema = NULL, + schema_overrides = NULL, + experimental = FALSE +) + +\method{as_polars_df}{RecordBatchReader}(x, ..., experimental = FALSE) \method{as_polars_df}{nanoarrow_array}(x, ...) -\method{as_polars_df}{nanoarrow_array_stream}(x, ...) +\method{as_polars_df}{nanoarrow_array_stream}(x, ..., experimental = FALSE) } \arguments{ \item{x}{Object to convert to a polars DataFrame.} @@ -126,6 +136,9 @@ into the resulting DataFrame. 
Useful in interactive mode to not lock R session.} \item{rechunk}{A logical flag (default \code{TRUE}). Make sure that all data of each column is in contiguous memory.} + +\item{experimental}{If \code{TRUE}, use experimental Arrow C stream interface inside the function. +This argument is experimental and may be removed in the future.} } \value{ a \link[=DataFrame_class]{DataFrame} @@ -144,28 +157,28 @@ whether the number of rows to fetch is infinite or not. # Convert the row names of a data frame to a column as_polars_df(mtcars, rownames = "car") -# Convert an arrow Table to a polars DataFrame -at = arrow::arrow_table(x = 1:5, y = 6:10) -as_polars_df(at) - -# Convert an arrow Table, with renaming all columns +# Convert a data frame, with renaming all columns as_polars_df( - at, + data.frame(x = 1, y = 2), schema = c("a", "b") ) -# Convert an arrow Table, with renaming and casting all columns +# Convert a data frame, with renaming and casting all columns as_polars_df( - at, + data.frame(x = 1, y = 2), schema = list(b = pl$Int64, a = pl$String) ) -# Convert an arrow Table, with casting some columns +# Convert a data frame, with casting some columns as_polars_df( - at, + data.frame(x = 1, y = 2), schema_overrides = list(y = pl$String) # cast some columns ) +# Convert an arrow Table to a polars DataFrame +at = arrow::arrow_table(x = 1:5, y = 6:10) +as_polars_df(at) + # Create a polars DataFrame from a data.frame lf = as_polars_df(mtcars)$lazy() diff --git a/man/as_polars_series.Rd b/man/as_polars_series.Rd index e34b23888..6cffd5ccd 100644 --- a/man/as_polars_series.Rd +++ b/man/as_polars_series.Rd @@ -12,6 +12,7 @@ \alias{as_polars_series.vctrs_rcrd} \alias{as_polars_series.Array} \alias{as_polars_series.ChunkedArray} +\alias{as_polars_series.RecordBatchReader} \alias{as_polars_series.nanoarrow_array} \alias{as_polars_series.nanoarrow_array_stream} \alias{as_polars_series.clock_time_point} @@ -42,9 +43,11 @@ as_polars_series(x, name = NULL, ...) \method{as_polars_series}{ChunkedArray}(x, name = NULL, ..., rechunk = TRUE) +\method{as_polars_series}{RecordBatchReader}(x, name = NULL, ...) + \method{as_polars_series}{nanoarrow_array}(x, name = NULL, ...) -\method{as_polars_series}{nanoarrow_array_stream}(x, name = NULL, ...) +\method{as_polars_series}{nanoarrow_array_stream}(x, name = NULL, ..., experimental = FALSE) \method{as_polars_series}{clock_time_point}(x, name = NULL, ...) @@ -64,6 +67,9 @@ will be used if \code{x} has no name.} \item{...}{Additional arguments passed to methods.} \item{rechunk}{A logical flag (default \code{TRUE}). Make sure that all data is in contiguous memory.} + +\item{experimental}{If \code{TRUE}, use experimental Arrow C stream interface inside the function. +This argument is experimental and may be removed in the future.} } \value{ a \link[=Series_class]{Series} diff --git a/src/rust/src/arrow_interop/mod.rs b/src/rust/src/arrow_interop/mod.rs index 256ef94e0..4aae05124 100644 --- a/src/rust/src/arrow_interop/mod.rs +++ b/src/rust/src/arrow_interop/mod.rs @@ -1,5 +1,7 @@ pub mod to_rust; +use polars_core::utils::arrow; + use extendr_api::prelude::*; use std::result::Result; @@ -61,3 +63,15 @@ impl RPackage for NanoArrowRPackage { "#) } } + +#[extendr] +pub fn polars_allocate_array_stream() -> Robj { + let aas = Box::new(arrow::ffi::ArrowArrayStream::empty()); + let x = Box::into_raw(aas); + format!("{:?}", x as usize).into() +} + +extendr_module! 
{ + mod arrow_interop; + fn polars_allocate_array_stream; +} diff --git a/src/rust/src/arrow_interop/to_rust.rs b/src/rust/src/arrow_interop/to_rust.rs index a38126256..2dfe6ca1d 100644 --- a/src/rust/src/arrow_interop/to_rust.rs +++ b/src/rust/src/arrow_interop/to_rust.rs @@ -1,4 +1,4 @@ -use crate::rpolarserr::*; +use super::RArrowArrayClass; use extendr_api::prelude::*; use polars::prelude as pl; use polars_core::export::rayon::prelude::*; @@ -8,8 +8,6 @@ use polars_core::utils::arrow::ffi; use polars_core::POOL; use std::result::Result; -use super::RArrowArrayClass; - pub fn arrow_array_to_rust(arrow_array: Robj) -> Result { let mut array = Box::new(ffi::ArrowArray::empty()); let mut schema = Box::new(ffi::ArrowSchema::empty()); @@ -105,52 +103,3 @@ pub unsafe fn to_rust_df(rb: Robj) -> Result { let dfs = crate::utils::collect_hinted_result(rb_len, dfs_iter)?; Ok(accumulate_dataframes_vertical_unchecked(dfs)) } - -// r-polars as consumer 1: create a new stream and wrap pointer in Robj as str. -pub fn new_arrow_stream_internal() -> Robj { - let aas = Box::new(ffi::ArrowArrayStream::empty()); - let x = Box::leak(aas); // leak box to make lifetime static - let x = x as *mut ffi::ArrowArrayStream; - crate::utils::usize_to_robj_str(x as usize) -} - -// r-polars as consumer 2: recieve to pointer to own stream, which producer has exported to. Consume it. Return Series. -pub fn arrow_stream_to_series_internal(robj_str: Robj) -> RResult { - // reclaim ownership of leaked box, and then drop/release it when consumed. - let us = crate::utils::robj_str_ptr_to_usize(&robj_str)?; - let boxed_stream = unsafe { Box::from_raw(us as *mut ffi::ArrowArrayStream) }; - - //consume stream and produce a r-polars Series return as Robj - let s = consume_arrow_stream_to_series(boxed_stream)?; - Ok(s) -} - -// implementation of consuming stream to Series. Stream is drop/released hereafter. -fn consume_arrow_stream_to_series(boxed_stream: Box) -> RResult { - let mut iter = unsafe { ffi::ArrowArrayStreamReader::try_new(boxed_stream) }?; - - //import first array into pl::Series - let mut s = if let Some(array_res) = unsafe { iter.next() } { - let array = array_res?; - let series_res: pl::PolarsResult = - std::convert::TryFrom::try_from(("df", array)); - - series_res.map_err(polars_to_rpolars_err)? - } else { - rerr() - .plain("Arrow array stream was empty") - .hint("producer did not export to stream") - .when("consuming arrow array stream")?; - unreachable!(); - }; - - // append any other arrays to Series - while let Some(array_res) = unsafe { iter.next() } { - let array = array_res?; - let series_res: pl::PolarsResult = - std::convert::TryFrom::try_from(("df", array)); - let series = series_res.map_err(polars_to_rpolars_err)?; - s.append(&series).map_err(polars_to_rpolars_err)?; - } - Ok(s) -} diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 15f6b6396..f9208d302 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -44,6 +44,7 @@ pub use crate::rbackground::RBGPOOL; #[cfg(not(feature = "sql"))] extendr_module! { mod polars; + use arrow_interop; use rlib; use concat; use rdataframe; @@ -58,6 +59,7 @@ extendr_module! { #[cfg(feature = "sql")] extendr_module! 
{ mod polars; + use arrow_interop; use rlib; use concat; use rdataframe; diff --git a/src/rust/src/rlib.rs b/src/rust/src/rlib.rs index 377f666cb..c6500ab5a 100644 --- a/src/rust/src/rlib.rs +++ b/src/rust/src/rlib.rs @@ -1,7 +1,5 @@ use crate::lazy::dsl::RPolarsExpr; -use crate::rdataframe::RPolarsDataFrame; use crate::robj_to; -use crate::series::RPolarsSeries; use crate::utils::extendr_concurrent::{ParRObj, ThreadCom}; use crate::utils::robj_to_rchoice; use crate::RFnSignature; @@ -195,28 +193,7 @@ fn struct_(exprs: Robj, eager: Robj, schema: Robj) -> Result { } } -#[extendr] -fn new_arrow_stream() -> Robj { - crate::arrow_interop::to_rust::new_arrow_stream_internal() -} use crate::rpolarserr::*; -#[extendr] -fn arrow_stream_to_df(robj_str: Robj) -> RResult { - let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?; - let ca = s - .struct_() - .map_err(polars_to_rpolars_err) - .when("unpack struct from producer") - .hint("producer exported a plain Series not a Struct series")?; - let df: pl::DataFrame = ca.clone().into(); - Ok(RPolarsDataFrame(df).into_robj()) -} - -#[extendr] -fn arrow_stream_to_series(robj_str: Robj) -> RResult { - let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?; - Ok(RPolarsSeries(s).into_robj()) -} #[extendr] pub fn dtype_str_repr(dtype: Robj) -> RResult { @@ -459,11 +436,6 @@ extendr_module! { fn dtype_str_repr; - // arrow conversions - fn new_arrow_stream; - fn arrow_stream_to_df; - fn arrow_stream_to_series; - //robj meta fn mem_address; fn clone_robj; diff --git a/src/rust/src/series.rs b/src/rust/src/series.rs index f56e1519f..2efbe8306 100644 --- a/src/rust/src/series.rs +++ b/src/rust/src/series.rs @@ -575,11 +575,22 @@ impl RPolarsSeries { } } - pub fn from_arrow_array_stream_str(name: Robj, robj_str: Robj) -> RResult { + pub fn import_stream(name: Robj, stream_ptr: Robj) -> RResult { let name = robj_to!(str, name)?; - let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)? - .with_name(name); - Ok(RPolarsSeries(s).into_robj()) + let stream_in_ptr_addr = robj_to!(usize, stream_ptr)?; + let stream_in_ptr = + unsafe { Box::from_raw(stream_in_ptr_addr as *mut arrow::ffi::ArrowArrayStream) }; + + let mut stream = unsafe { arrow::ffi::ArrowArrayStreamReader::try_new(stream_in_ptr)? 
}; + let mut arrays: Vec> = Vec::new(); + while let Some(array_res) = unsafe { stream.next() } { + arrays.push(array_res?); + } + + let chunks = arrays.into_iter().collect::>(); + let s = pl::Series::try_from((name, chunks)).map_err(polars_to_rpolars_err)?; + + Ok(s.into()) } pub fn from_arrow_array_robj(name: Robj, array: Robj) -> Result { diff --git a/src/rust/src/utils/mod.rs b/src/rust/src/utils/mod.rs index f3069318c..cbf9d5845 100644 --- a/src/rust/src/utils/mod.rs +++ b/src/rust/src/utils/mod.rs @@ -1208,19 +1208,3 @@ pub fn collect_hinted_result_rerr( } Ok(new_vec) } - -//keep error simple to interface with other libs -pub fn robj_str_ptr_to_usize(robj: &Robj) -> RResult { - || -> RResult { - let str: &str = robj - .as_str() - .ok_or(RPolarsErr::new().plain("robj str ptr not a str".into()))?; - let us: usize = str.parse()?; - Ok(us) - }() - .when("converting robj str pointer to usize") -} - -pub fn usize_to_robj_str(us: usize) -> Robj { - format!("{us}").into() -} diff --git a/tests/testthat/_snaps/after-wrappers.md b/tests/testthat/_snaps/after-wrappers.md index adc76eee3..5c4691bd2 100644 --- a/tests/testthat/_snaps/after-wrappers.md +++ b/tests/testthat/_snaps/after-wrappers.md @@ -705,32 +705,23 @@ Code ls(.pr[[private_key]]) Output - [1] "add" "alias" - [3] "all" "any" - [5] "append_mut" "arg_max" - [7] "arg_min" "chunk_lengths" - [9] "clear" "clone" - [11] "compare" "div" - [13] "dtype" "equals" - [15] "export_stream" "fast_explode_flag" - [17] "from_arrow_array_robj" "from_arrow_array_stream_str" - [19] "get_fmt" "is_sorted" - [21] "is_sorted_flag" "is_sorted_reverse_flag" - [23] "len" "map_elements" - [25] "max" "mean" - [27] "median" "min" - [29] "mul" "n_chunks" - [31] "n_unique" "name" - [33] "new" "panic" - [35] "print" "rem" - [37] "rename_mut" "rep" - [39] "set_sorted_mut" "shape" - [41] "sleep" "sort" - [43] "std" "struct_fields" - [45] "sub" "sum" - [47] "to_fmt_char" "to_frame" - [49] "to_r" "value_counts" - [51] "var" + [1] "add" "alias" "all" + [4] "any" "append_mut" "arg_max" + [7] "arg_min" "chunk_lengths" "clear" + [10] "clone" "compare" "div" + [13] "dtype" "equals" "export_stream" + [16] "fast_explode_flag" "from_arrow_array_robj" "get_fmt" + [19] "import_stream" "is_sorted" "is_sorted_flag" + [22] "is_sorted_reverse_flag" "len" "map_elements" + [25] "max" "mean" "median" + [28] "min" "mul" "n_chunks" + [31] "n_unique" "name" "new" + [34] "panic" "print" "rem" + [37] "rename_mut" "rep" "set_sorted_mut" + [40] "shape" "sleep" "sort" + [43] "std" "struct_fields" "sub" + [46] "sum" "to_fmt_char" "to_frame" + [49] "to_r" "value_counts" "var" # public and private methods of each class RThreadHandle diff --git a/tests/testthat/test-arrow-c-interface.R b/tests/testthat/test-arrow-c-interface.R new file mode 100644 index 000000000..5d56bbefd --- /dev/null +++ b/tests/testthat/test-arrow-c-interface.R @@ -0,0 +1,38 @@ +patrick::with_parameters_test_that("round trip arrow array stream", + { + s_in = as_polars_series(.vec) + + ptr_stream = polars_allocate_array_stream() + .pr$Series$export_stream(s_in, ptr_stream, TRUE) + s_out = .pr$Series$import_stream("", ptr_stream) |> + unwrap() + + expect_true( + s_in$equals(s_out) + ) + + skip_if_not_installed("nanoarrow") + expect_true( + s_in$equals( + s_in |> + nanoarrow::as_nanoarrow_array_stream(future = TRUE) |> + as_polars_series() + ) + ) + + expect_true( + as_polars_df(.vec)$equals( + as_polars_df(.vec) |> + nanoarrow::as_nanoarrow_array_stream(future = TRUE) |> + as_polars_df() + ) + ) + }, + .vec = list( + 1:5, + 
letters[1:5], + rep(TRUE, 5), + as.factor(letters[1:5]), + mtcars[1:5, ] + ) +) diff --git a/tests/testthat/test-arrow_extendr_polars.R b/tests/testthat/test-arrow_extendr_polars.R deleted file mode 100644 index 956d16328..000000000 --- a/tests/testthat/test-arrow_extendr_polars.R +++ /dev/null @@ -1,70 +0,0 @@ -test_that("rust-polars DataFrame import/export via arrow stream", { - # this round trip conversion is only a unit test, not an integration test. - # Arrow export/import of DataFrame is mainly useful to interface with other R packages using - # rust-polars - - # see https://github.com/rpolars/extendrpolarsexamples/blob/main/src/rust/src/lib.rs - # for simple example of use to import/export polars DataFrames to another rust-polars - # compilation unit in another R package. Version of rust-polars does not have to match. - - # These function are not a part of the public user API. But package developer can use them to - # import/export df's. - - # ARROW STREAM HAS AN CONTRACT TO UPHOLD BY PRODUCER AND CONSUMER. WRONG BEHAVOIR CAUSES SEGFAULT. - # SEE OUTCOMMENTED EXAMPLES OF ILLEGAL BEHAVIOR LEADING TO SEGFAULT BELOW. - - # PRODUCER has some df which could be chunked as here. Categoricals with global string cache - # are also ok. - # pl$with_string_cache({ - # df_export = pl$concat(lapply(1:3, \(i) pl$DataFrame(iris))) - # }) - - # CONSUMER creates a new arrow stream and return ptr which is passed to PRODUCER - # str_ptr = new_arrow_stream() - - # PRODUCER exports the df into CONSUMERs stream - # export_df_to_arrow_stream(df_export, str_ptr) |> unwrap() - - # CONSUMER can now import the df from stream - # pl$with_string_cache({ - # df_import = arrow_stream_to_df(str_ptr) |> unwrap() - # }) - - # check imported/exported df's are identical - # expect_identical(df_import$to_list(), df_export$to_list()) - - ## UNSAFE / Undefined behavior / will blow up eventually / STUFF NOT TO DO - # examples below of did segfault ~every 5-10th time, during development - - # 1: DO NOT EXPORT TO STREAM MORE THAN ONCE - # new DataFrame can be exported to stream, but only the latest # BUT THIS SEGFAULTs sometimes - # export_df_to_arrow_stream(df_export, str_ptr) |> unwrap() - # export_df_to_arrow_stream(pl$DataFrame(mtcars), str_ptr) |> unwrap() - # mtcars_import = arrow_stream_to_df(str_ptr) |> unwrap() - - # 2: DO NOT IMPORT FROM STREAM MORE THAN ONCE - # reading from released(exhuasted) stream results in error most times - # BUT THIS SEGFAULTs sometimes - # ctx = arrow_stream_to_df(str_ptr)$err$contexts() - # expect_equal( - # ctx$PlainErrorMessage, - # r"{InvalidArgumentError("The C stream was already released")}" - # ) - - # 3: DO NOT IMPORT/EXPORT ARROW STREAM ACROSS PROCESSES (use IPC for that, see $map() docs) - # background process willSEGFAULT HERE - # str_ptr = new_arrow_stream() - # rsess = callr::r_bg(func = \(str_ptr) { - # library(polars) - # pl$with_string_cache({ - # df_export = pl$concat(lapply(1:3, \(i) pl$DataFrame(iris))) - # }) - # polars:::export_df_to_arrow_stream(df_export, str_ptr) - # },args = list(str_ptr=str_ptr)) - # - # Sys.sleep(3) - # df_import = arrow_stream_to_df(str_ptr) - # print(df_import) - # str_ptr = new_arrow_stream() - # rsess$get_result() -}) diff --git a/tests/testthat/test-as_polars.R b/tests/testthat/test-as_polars.R index 8e727d9b8..7f7f4350e 100644 --- a/tests/testthat/test-as_polars.R +++ b/tests/testthat/test-as_polars.R @@ -5,45 +5,45 @@ test_df = data.frame( "col_lgl" = rep_len(c(TRUE, FALSE, NA), 10) ) -if (requireNamespace("arrow", quietly = TRUE) && 
requireNamespace("nanoarrow", quietly = TRUE)) { - make_as_polars_df_cases = function() { - tibble::tribble( - ~.test_name, ~x, - "data.frame", test_df, - "polars_lf", pl$LazyFrame(test_df), - "polars_group_by", pl$DataFrame(test_df)$group_by("col_int"), - "polars_lazy_group_by", pl$LazyFrame(test_df)$group_by("col_int"), - "polars_rolling_group_by", pl$DataFrame(test_df)$rolling("col_int", period = "1i"), - "polars_lazy_rolling_group_by", pl$LazyFrame(test_df)$rolling("col_int", period = "1i"), - "polars_group_by_dynamic", pl$DataFrame(test_df)$group_by_dynamic("col_int", every = "1i"), - "polars_lazy_group_by_dynamic", pl$LazyFrame(test_df)$group_by_dynamic("col_int", every = "1i"), - "arrow Table", arrow::as_arrow_table(test_df), - "arrow RecordBatch", arrow::as_record_batch(test_df), - "nanoarrow_array", nanoarrow::as_nanoarrow_array(test_df), - "nanoarrow_array_stream", nanoarrow::as_nanoarrow_array_stream(test_df), - ) - } - - patrick::with_parameters_test_that( - "as_polars_df S3 methods", - { - pl_df = as_polars_df(x) - expect_s3_class(pl_df, "RPolarsDataFrame") - - if (inherits(x, "nanoarrow_array_stream")) { - # The stream should be released after conversion - expect_grepl_error(x$get_next(), "already been released") - } - actual = as.data.frame(pl_df) - expected = as.data.frame(pl$DataFrame(test_df)) +make_as_polars_df_cases = function() { + skip_if_not_installed("arrow") + skip_if_not_installed("nanoarrow") - expect_equal(actual, expected) - }, - .cases = make_as_polars_df_cases() + tibble::tribble( + ~.test_name, ~x, + "data.frame", test_df, + "polars_lf", pl$LazyFrame(test_df), + "polars_group_by", pl$DataFrame(test_df)$group_by("col_int"), + "polars_lazy_group_by", pl$LazyFrame(test_df)$group_by("col_int"), + "polars_rolling_group_by", pl$DataFrame(test_df)$rolling("col_int", period = "1i"), + "polars_lazy_rolling_group_by", pl$LazyFrame(test_df)$rolling("col_int", period = "1i"), + "polars_group_by_dynamic", pl$DataFrame(test_df)$group_by_dynamic("col_int", every = "1i"), + "polars_lazy_group_by_dynamic", pl$LazyFrame(test_df)$group_by_dynamic("col_int", every = "1i"), + "arrow Table", arrow::as_arrow_table(test_df), + "arrow RecordBatch", arrow::as_record_batch(test_df), + "arrow RecordBatchReader", arrow::as_record_batch_reader(test_df), + "nanoarrow_array", nanoarrow::as_nanoarrow_array(test_df), + "nanoarrow_array_stream", nanoarrow::as_nanoarrow_array_stream(test_df), ) } +patrick::with_parameters_test_that( + "as_polars_df S3 methods", + { + pl_df = as_polars_df(x) + expect_s3_class(pl_df, "RPolarsDataFrame") + + if (inherits(x, "nanoarrow_array_stream")) { + # The stream should be released after conversion + expect_grepl_error(x$get_next(), "already been released") + } + + expect_equal(as.data.frame(pl_df), as.data.frame(as_polars_df(test_df))) + }, + .cases = make_as_polars_df_cases() +) + test_that("as_polars_lf S3 method", { skip_if_not_installed("arrow") @@ -123,48 +123,49 @@ test_that("schema option and schema_overrides for as_polars_df.data.frame", { }) -if (requireNamespace("arrow", quietly = TRUE) && requireNamespace("nanoarrow", quietly = TRUE)) { - make_as_polars_series_cases = function() { - tibble::tribble( - ~.test_name, ~x, ~expected_name, - "vector", 1, "", - "Series", as_polars_series(1, "foo"), "foo", - "Expr", pl$lit(1)$alias("foo"), "foo", - "Then", pl$when(TRUE)$then(1), "literal", - "ChainedThen", pl$when(FALSE)$then(0)$when(TRUE)$then(1), "literal", - "list", list(1:4), "", - "data.frame", data.frame(x = 1, y = letters[1]), "", - "POSIXlt", 
as.POSIXlt("1900-01-01"), "", - "arrow Array", arrow::arrow_array(1), "", - "arrow ChunkedArray", arrow::chunked_array(1), "", - "nanoarrow_array", nanoarrow::as_nanoarrow_array(1), "", - "nanoarrow_array_stream", nanoarrow::as_nanoarrow_array_stream(data.frame(x = 1)), "", - ) - } +make_as_polars_series_cases = function() { + skip_if_not_installed("arrow") + skip_if_not_installed("nanoarrow") - patrick::with_parameters_test_that( - "as_polars_series S3 methods", - { - pl_series = as_polars_series(x) - expect_s3_class(pl_series, "RPolarsSeries") + tibble::tribble( + ~.test_name, ~x, ~expected_name, + "vector", 1, "", + "Series", as_polars_series(1, "foo"), "foo", + "Expr", pl$lit(1)$alias("foo"), "foo", + "Then", pl$when(TRUE)$then(1), "literal", + "ChainedThen", pl$when(FALSE)$then(0)$when(TRUE)$then(1), "literal", + "list", list(1:4), "", + "data.frame", data.frame(x = 1, y = letters[1]), "", + "POSIXlt", as.POSIXlt("1900-01-01"), "", + "arrow Array", arrow::arrow_array(1), "", + "arrow ChunkedArray", arrow::chunked_array(1), "", + "nanoarrow_array", nanoarrow::as_nanoarrow_array(1), "", + "nanoarrow_array_stream", nanoarrow::as_nanoarrow_array_stream(data.frame(x = 1)), "", + ) +} - expect_identical(length(pl_series), 1L) - expect_equal(pl_series$name, expected_name) +patrick::with_parameters_test_that( + "as_polars_series S3 methods", + { + pl_series = as_polars_series(x) + expect_s3_class(pl_series, "RPolarsSeries") - if (inherits(x, "nanoarrow_array_stream")) { - # The stream should be released after conversion - expect_grepl_error(x$get_next(), "already been released") + expect_identical(length(pl_series), 1L) + expect_equal(pl_series$name, expected_name) - # Re-create the stream for the next test - x = nanoarrow::as_nanoarrow_array_stream(data.frame(x = 1)) - } + if (inherits(x, "nanoarrow_array_stream")) { + # The stream should be released after conversion + expect_grepl_error(x$get_next(), "already been released") - pl_series = as_polars_series(x, name = "bar") - expect_equal(pl_series$name, "bar") - }, - .cases = make_as_polars_series_cases() - ) -} + # Re-create the stream for the next test + x = nanoarrow::as_nanoarrow_array_stream(data.frame(x = 1)) + } + + pl_series = as_polars_series(x, name = "bar") + expect_equal(pl_series$name, "bar") + }, + .cases = make_as_polars_series_cases() +) test_that("tests for vctrs_rcrd", { @@ -234,7 +235,6 @@ test_that("from arrow Table and ChunkedArray", { unname(as.list(at)) ) - # no rechunk expect_identical( lapply(at$columns, \(x) length(as_polars_series.ChunkedArray(x, rechunk = FALSE)$chunk_lengths())), lapply(at$columns, \(x) x$num_chunks) @@ -261,8 +261,7 @@ test_that("from arrow Table and ChunkedArray", { lapply(at$columns, \(x) x$num_chunks) ) - - # #not supported yet + # not supported yet # #chunked data with factors l = list( df1 = data.frame(factor = factor(c("apple", "apple", "banana"))), @@ -337,6 +336,8 @@ test_that("can convert an arrow Table contains dictionary }) make_nanoarrow_array_stream_cases = function() { + skip_if_not_installed("nanoarrow") + tibble::tribble( ~.test_name, ~x, "two chunks", nanoarrow::basic_array_stream(list(data.frame(a = 1, b = 2), data.frame(a = NA, b = 1))), @@ -346,8 +347,6 @@ make_nanoarrow_array_stream_cases = function() { patrick::with_parameters_test_that("as_polars_df for nanoarrow_array_stream", { - skip_if_not_installed("nanoarrow") - pl_df = as_polars_df(x) expect_s3_class(pl_df, "RPolarsDataFrame") expect_grepl_error(x$get_next(), "already been released") @@ -359,8 +358,6 @@ 
patrick::with_parameters_test_that("as_polars_df for nanoarrow_array_stream", patrick::with_parameters_test_that("as_polars_series for nanoarrow_array_stream", { - skip_if_not_installed("nanoarrow") - pl_series = as_polars_series(x) expect_s3_class(pl_series, "RPolarsSeries") expect_grepl_error(x$get_next(), "already been released") @@ -465,3 +462,63 @@ test_that("as_polars_df and pl$DataFrame for data.frame has list column", { as_polars_df(data)$dtypes[[1]] == pl$List(pl$Struct(b = pl$Int32)) ) }) + + +# TODO: This behavior is bug or intended? (upstream) +# If this is a bug, this behavior may be changed in the future. +test_that("automatically rechunked for struct array stream from C stream interface", { + skip_if_not_installed("nanoarrow") + + s_int_exp = nanoarrow::basic_array_stream( + list( + nanoarrow::as_nanoarrow_array(1:5), + nanoarrow::as_nanoarrow_array(6:10) + ) + ) |> + as_polars_series(experimental = TRUE) + + s_struct_exp = nanoarrow::basic_array_stream( + list( + nanoarrow::as_nanoarrow_array(mtcars[1:5, ]), + nanoarrow::as_nanoarrow_array(mtcars[6:10, ]) + ) + ) |> + as_polars_series(experimental = TRUE) + + s_struct_stable = nanoarrow::basic_array_stream( + list( + nanoarrow::as_nanoarrow_array(mtcars[1:5, ]), + nanoarrow::as_nanoarrow_array(mtcars[6:10, ]) + ) + ) |> + as_polars_series() + + expect_identical(s_int_exp$n_chunks(), 2) + expect_identical(s_struct_exp$n_chunks(), 1) + expect_identical(s_struct_stable$n_chunks(), 2) +}) + + +make_as_polars_df_experimental_cases = function() { + skip_if_not_installed("arrow") + skip_if_not_installed("nanoarrow") + + tibble::tribble( + ~.test_name, ~x, + "arrow Table", arrow::as_arrow_table(test_df), + "arrow RecordBatch", arrow::as_record_batch(test_df), + "arrow RecordBatchReader", arrow::as_record_batch_reader(test_df), + "nanoarrow_array_stream", nanoarrow::as_nanoarrow_array_stream(test_df), + ) +} + +patrick::with_parameters_test_that( + "as_polars_df S3 methods with experimental option", + { + pl_df = as_polars_df(x, experimental = TRUE) + expect_s3_class(pl_df, "RPolarsDataFrame") + + expect_equal(as.data.frame(pl_df), as.data.frame(as_polars_df(test_df))) + }, + .cases = make_as_polars_df_experimental_cases() +)
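
A minimal usage sketch of the conversions this patch adds, assuming the arrow and nanoarrow packages are installed; the data frame and object names below are illustrative and not taken from the patch:

library(polars)

df = data.frame(x = 1:3, y = letters[1:3])

# An arrow::RecordBatchReader can now be converted directly
reader = arrow::as_record_batch_reader(df)
as_polars_df(reader)

# Opt in to the Arrow C stream interface via the new `experimental` argument
as_polars_df(arrow::as_arrow_table(df), experimental = TRUE)

# The same option is available for a nanoarrow array stream;
# the stream is released (consumed) by the conversion
stream = nanoarrow::as_nanoarrow_array_stream(df)
as_polars_df(stream, experimental = TRUE)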