From 774a5d6b6e6b7f3ad59c0dde613136252854f7a3 Mon Sep 17 00:00:00 2001
From: Cory Grinstead
Date: Mon, 19 Aug 2024 13:20:15 -0500
Subject: [PATCH] [FEAT]: add to_arrow_iter (#2681)

closes https://github.com/Eventual-Inc/Daft/issues/2679
---
 daft/dataframe/dataframe.py | 21 +++++++++++++++++++++
 tests/table/test_from_py.py |  6 ++++++
 2 files changed, 27 insertions(+)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index bc818e2ec3..2ec299ddcb 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -245,6 +245,27 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
             row = {key: value[i] for (key, value) in pydict.items()}
             yield row
 
+    @DataframePublicAPI
+    def to_arrow_iter(self, results_buffer_size: Optional[int] = 1) -> Iterator["pyarrow.Table"]:
+        """
+        Return an iterator of pyarrow tables for this dataframe.
+        """
+        if results_buffer_size is not None and not results_buffer_size > 0:
+            raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")
+        if self._result is not None:
+            # If the dataframe has already finished executing,
+            # use the precomputed results.
+            yield self.to_arrow()
+
+        else:
+            # Execute the dataframe in a streaming fashion.
+            context = get_context()
+            partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size)
+
+            # Iterate through partitions.
+            for partition in partitions_iter:
+                yield partition.to_arrow()
+
     @DataframePublicAPI
     def iter_partitions(
         self, results_buffer_size: Optional[int] = 1
diff --git a/tests/table/test_from_py.py b/tests/table/test_from_py.py
index 216b66dae2..b8939d630d 100644
--- a/tests/table/test_from_py.py
+++ b/tests/table/test_from_py.py
@@ -664,3 +664,9 @@ def __iter__(self):
     table = daft.from_arrow(my_iter)
     tbl = table.to_pydict()
     assert tbl == {"text": ["foo1", "bar2", "foo2", "bar2", "foo3", "bar3"]}
+
+
+def test_to_arrow_iterator() -> None:
+    df = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
+    it = df.to_arrow_iter()
+    assert isinstance(next(it), pa.Table)