From 02f7d566a61abe55a0dd7259238cc907dacb8864 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 28 May 2024 16:21:47 +0200 Subject: [PATCH] Add RecordBatchReader --- arro3-core/src/ffi/from_python/mod.rs | 1 + .../ffi/from_python/record_batch_reader.rs | 15 ++++++ arro3-core/src/lib.rs | 2 + arro3-core/src/record_batch_reader.rs | 51 +++++++++++++++++++ 4 files changed, 69 insertions(+) create mode 100644 arro3-core/src/ffi/from_python/record_batch_reader.rs create mode 100644 arro3-core/src/record_batch_reader.rs diff --git a/arro3-core/src/ffi/from_python/mod.rs b/arro3-core/src/ffi/from_python/mod.rs index a57c173..5f4650f 100644 --- a/arro3-core/src/ffi/from_python/mod.rs +++ b/arro3-core/src/ffi/from_python/mod.rs @@ -3,6 +3,7 @@ pub mod chunked; pub mod ffi_stream; pub mod field; pub mod record_batch; +pub mod record_batch_reader; pub mod schema; pub mod table; pub mod utils; diff --git a/arro3-core/src/ffi/from_python/record_batch_reader.rs b/arro3-core/src/ffi/from_python/record_batch_reader.rs new file mode 100644 index 0000000..e0f1617 --- /dev/null +++ b/arro3-core/src/ffi/from_python/record_batch_reader.rs @@ -0,0 +1,15 @@ +use crate::ffi::from_python::utils::import_arrow_c_stream; +use crate::record_batch_reader::PyRecordBatchReader; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; +use pyo3::{PyAny, PyResult}; + +impl<'a> FromPyObject<'a> for PyRecordBatchReader { + fn extract(ob: &'a PyAny) -> PyResult<Self> { + let stream = import_arrow_c_stream(ob)?; + let stream_reader = arrow::ffi_stream::ArrowArrayStreamReader::try_new(stream) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + + Ok(Self(Some(Box::new(stream_reader)))) + } +} diff --git a/arro3-core/src/lib.rs b/arro3-core/src/lib.rs index 8f47602..f35e46f 100644 --- a/arro3-core/src/lib.rs +++ b/arro3-core/src/lib.rs @@ -7,6 +7,7 @@ pub mod ffi; pub mod field; pub mod interop; pub mod record_batch; +pub mod record_batch_reader; pub mod schema; pub mod table; @@ -26,6 +27,7 @@ 
fn _rust(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/arro3-core/src/record_batch_reader.rs b/arro3-core/src/record_batch_reader.rs new file mode 100644 index 0000000..336a277 --- /dev/null +++ b/arro3-core/src/record_batch_reader.rs @@ -0,0 +1,51 @@ +use std::ffi::CString; + +use arrow::ffi_stream::FFI_ArrowArrayStream; +use arrow_array::RecordBatchReader; +use pyo3::exceptions::PyIOError; +use pyo3::prelude::*; +use pyo3::types::PyCapsule; + +use crate::error::PyArrowResult; + +/// A wrapper around an [arrow_array::RecordBatchReader] +#[pyclass(module = "arro3.core._rust", name = "RecordBatchReader", subclass)] +pub struct PyRecordBatchReader(pub(crate) Option<Box<dyn RecordBatchReader + Send>>); + +impl PyRecordBatchReader { + pub fn into_reader(mut self) -> PyArrowResult<Box<dyn RecordBatchReader + Send>> { + let stream = self + .0 + .take() + .ok_or(PyIOError::new_err("Cannot write from closed stream."))?; + Ok(stream) + } +} + +#[pymethods] +impl PyRecordBatchReader { + /// An implementation of the [Arrow PyCapsule + /// Interface](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html). + /// This dunder method should not be called directly, but enables zero-copy + /// data transfer to other Python libraries that understand Arrow memory. + /// + /// For example, you can call [`pyarrow.table()`][pyarrow.table] to convert this stream + /// into a pyarrow table, without copying memory. + fn __arrow_c_stream__( + &mut self, + _requested_schema: Option<PyObject>, + ) -> PyArrowResult<PyObject> { + let reader = self + .0 + .take() + .ok_or(PyIOError::new_err("Cannot read from closed stream"))?; + + let ffi_stream = FFI_ArrowArrayStream::new(reader); + let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); + + Python::with_gil(|py| { + let stream_capsule = PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?; + Ok(stream_capsule.to_object(py)) + }) + } +}