Skip to content

Commit

Permalink
ARROW-10416: [R] Support Tables in Flight
Browse files Browse the repository at this point in the history
In addition to the feature described in the title, this PR

* renames `push_data()` to `flight_put()` and adds an `overwrite` argument to optionally check for the existence of a flight with the the same name
* adds `list_flights()` and `flight_path_exists()` to see available resources
* adds tests for the flight bindings, run if the demo flight server is found to be running (managing that process and/or running this in CI is deferred)
* adds `r_to_py` and `py_to_r` methods for Schema objects; this fixes an issue where schema metadata was lost when converting Python/R due to the indirect way that Tables are passed

Closes apache#9039 from nealrichardson/flight-tables

Authored-by: Neal Richardson <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
  • Loading branch information
nealrichardson committed Dec 30, 2020
1 parent 36d80e3 commit 635f12b
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 42 deletions.
4 changes: 3 additions & 1 deletion r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ export(fixed_size_binary)
export(fixed_size_list_of)
export(flight_connect)
export(flight_get)
export(flight_path_exists)
export(flight_put)
export(float)
export(float16)
export(float32)
Expand All @@ -233,6 +235,7 @@ export(large_binary)
export(large_list_of)
export(large_utf8)
export(last_col)
export(list_flights)
export(list_of)
export(load_flight_server)
export(map_batches)
Expand All @@ -244,7 +247,6 @@ export(null)
export(num_range)
export(one_of)
export(open_dataset)
export(push_data)
export(read_arrow)
export(read_csv_arrow)
export(read_delim_arrow)
Expand Down
16 changes: 6 additions & 10 deletions r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,13 @@
s3_register(m, cl)
}
}

s3_register("dplyr::tbl_vars", "arrow_dplyr_query")
s3_register("reticulate::py_to_r", "pyarrow.lib.Array")
s3_register("reticulate::py_to_r", "pyarrow.lib.RecordBatch")
s3_register("reticulate::py_to_r", "pyarrow.lib.ChunkedArray")
s3_register("reticulate::py_to_r", "pyarrow.lib.Table")
s3_register("reticulate::r_to_py", "Array")
s3_register("reticulate::r_to_py", "RecordBatch")
s3_register("reticulate::r_to_py", "ChunkedArray")
s3_register("reticulate::r_to_py", "Table")

for (cl in c("Array", "RecordBatch", "ChunkedArray", "Table", "Schema")) {
s3_register("reticulate::py_to_r", paste0("pyarrow.lib.", cl))
s3_register("reticulate::r_to_py", cl)
}

invisible()
}

Expand Down Expand Up @@ -128,4 +125,3 @@ ArrowObject <- R6Class("ArrowObject",
all.equal.ArrowObject <- function(target, current, ..., check.attributes = TRUE) {
target$Equals(current, check_metadata = check.attributes)
}

4 changes: 4 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 54 additions & 14 deletions r/R/flight.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,81 @@ flight_connect <- function(host = "localhost", port, scheme = "grpc+tcp") {
#' Send data to a Flight server
#'
#' @param client `pyarrow.flight.FlightClient`, as returned by [flight_connect()]
#' @param data `data.frame` or [RecordBatch] to upload
#' @param data `data.frame`, [RecordBatch], or [Table] to upload
#' @param path string identifier to store the data under
#' @param overwrite logical: if `path` exists on `client` already, should we
#' replace it with the contents of `data`? Default is `TRUE`; if `FALSE` and
#' `path` exists, the function will error.
#' @return `client`, invisibly.
#' @export
push_data <- function(client, data, path) {
if (inherits(data, "data.frame")) {
data <- record_batch(data)
flight_put <- function(client, data, path, overwrite = TRUE) {
if (!overwrite && flight_path_exists(client, path)) {
stop(path, " exists.", call. = FALSE)
}
if (is.data.frame(data)) {
data <- Table$create(data)
}
# TODO: this is only RecordBatch; handle Table
py_data <- reticulate::r_to_py(data)
writer <- client$do_put(descriptor_for_path(path), py_data$schema)[[1]]
writer$write_batch(py_data)
if (inherits(data, "RecordBatch")) {
writer$write_batch(py_data)
} else {
writer$write_table(py_data)
}
writer$close()
invisible(client)
}

#' Get data from a Flight server
#'
#' @param client `pyarrow.flight.FlightClient`, as returned by [flight_connect()]
#' @param path string identifier under which the data is stored
#' @return A [RecordBatch]
#' @param path string identifier under which data is stored
#' @return A [Table]
#' @export
flight_get <- function(client, path) {
reader <- flight_reader(client, path)
reader$read_all()
}

# TODO: could use this as a RecordBatch iterator, call $read_chunk() on this
flight_reader <- function(client, path) {
info <- client$get_flight_info(descriptor_for_path(path))
# Hack: assume a single ticket, on the same server as client is already connected
ticket <- info$endpoints[[1]]$ticket
reader <- client$do_get(ticket)
# Next hack: assume a single record batch
# TODO: read_all() instead? Or read all chunks and build Table in R?
chunk <- reader$read_chunk()
# Drop $app_metadata and just return the data
chunk$data
client$do_get(ticket)
}

descriptor_for_path <- function(path) {
pa <- reticulate::import("pyarrow")
pa$flight$FlightDescriptor$for_path(path)
}

#' See available resources on a Flight server
#'
#' @inheritParams flight_get
#' @return `list_flights()` returns a character vector of paths.
#' `flight_path_exists()` returns a logical value, the equivalent of `path %in% list_flights()`
#' @export
list_flights <- function(client) {
generator <- client$list_flights()
out <- reticulate::iterate(generator, function(x) as.character(x$descriptor$path[[1]]))
out
}

#' @rdname list_flights
#' @export
flight_path_exists <- function(client, path) {
it_exists <- tryCatch({
client$get_flight_info(descriptor_for_path(path))
TRUE
},
error = function(e) {
msg <- conditionMessage(e)
if (!any(grepl("ArrowKeyError", msg))) {
# Raise an error if this fails for any reason other than not found
stop(e)
}
FALSE
}
)
}
25 changes: 23 additions & 2 deletions r/R/python.R
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ py_to_r.pyarrow.lib.ChunkedArray <- function(x, ...) {
r_to_py.Table <- function(x, convert = FALSE) {
# Import with convert = FALSE so that `_import_from_c` returns a Python object
pa <- reticulate::import("pyarrow", convert = FALSE)
out <- pa$Table$from_arrays(x$columns, names = names(x))
out <- pa$Table$from_arrays(x$columns, schema = x$schema)
# But set the convert attribute on the return object to the requested value
assign("convert", convert, out)
out
Expand All @@ -100,7 +100,28 @@ py_to_r.pyarrow.lib.Table <- function(x, ...) {
colnames <- maybe_py_to_r(x$column_names)
r_cols <- maybe_py_to_r(x$columns)
names(r_cols) <- colnames
Table$create(!!!r_cols)
Table$create(!!!r_cols, schema = maybe_py_to_r(x$schema))
}

py_to_r.pyarrow.lib.Schema <- function(x, ...) {
schema_ptr <- allocate_arrow_schema()
on.exit(delete_arrow_schema(schema_ptr))

x$`_export_to_c`(schema_ptr)
ImportSchema(schema_ptr)
}

r_to_py.Schema <- function(x, convert = FALSE) {
schema_ptr <- allocate_arrow_schema()
on.exit(delete_arrow_schema(schema_ptr))

# Import with convert = FALSE so that `_import_from_c` returns a Python object
pa <- reticulate::import("pyarrow", convert = FALSE)
ExportSchema(x, schema_ptr)
out <- pa$Schema$`_import_from_c`(schema_ptr)
# But set the convert attribute on the return object to the requested value
assign("convert", convert, out)
out
}

maybe_py_to_r <- function(x) {
Expand Down
3 changes: 2 additions & 1 deletion r/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ reference:
contents:
- load_flight_server
- flight_connect
- push_data
- flight_get
- flight_put
- list_flights
- title: File systems
contents:
- s3_bucket
Expand Down
4 changes: 2 additions & 2 deletions r/man/flight_get.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions r/man/push_data.Rd → r/man/flight_put.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions r/man/list_flights.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions r/src/py-to-r.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ std::shared_ptr<arrow::RecordBatch> ImportRecordBatch(
return ValueOrStop(arrow::ImportRecordBatch(array, schema));
}

// [[arrow::export]]
std::shared_ptr<arrow::Schema> ImportSchema(
arrow::r::Pointer<struct ArrowSchema> schema) {
return ValueOrStop(arrow::ImportSchema(schema));
}

// [[arrow::export]]
arrow::r::Pointer<struct ArrowSchema> allocate_arrow_schema() { return {}; }

Expand Down
63 changes: 63 additions & 0 deletions r/tests/testthat/test-python-flight.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Assumes:
# * We've already done arrow::install_pyarrow()
# * R -e 'arrow::load_flight_server("demo_flight_server")$DemoFlightServer(port = 8089)$serve()'
# TODO: set up CI job to test this, or some way of running a background process
if (process_is_running("demo_flight_server")) {
client <- flight_connect(port = 8089)
flight_obj <- tempfile()

test_that("flight_path_exists", {
expect_false(flight_path_exists(client, flight_obj))
expect_false(flight_obj %in% list_flights(client))
})

test_that("flight_put", {
flight_put(client, example_data, path = flight_obj)
expect_true(flight_path_exists(client, flight_obj))
expect_true(flight_obj %in% list_flights(client))
})

test_that("flight_get", {
expect_identical(as.data.frame(flight_get(client, flight_obj)), example_data)
})

test_that("flight_put with RecordBatch", {
flight_obj2 <- tempfile()
flight_put(client, RecordBatch$create(example_data), path = flight_obj2)
expect_identical(as.data.frame(flight_get(client, flight_obj2)), example_data)
})

test_that("flight_put with overwrite = FALSE", {
expect_error(
flight_put(client, example_with_times, path = flight_obj, overwrite = FALSE),
"exists"
)
# Default is TRUE so this will overwrite
flight_put(client, example_with_times, path = flight_obj)
expect_identical(as.data.frame(flight_get(client, flight_obj)), example_with_times)
})

} else {
# Kinda hacky, let's put a skipped test here, just so we note that the tests
# didn't run
test_that("Flight tests", {
skip("Flight server is not running")
})
}
Loading

0 comments on commit 635f12b

Please sign in to comment.