Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): better async_collect #10912

Merged
merged 8 commits into from
Sep 15, 2023
Merged
5 changes: 5 additions & 0 deletions py-polars/polars/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
_DATAFRAME_API_COMPAT_AVAILABLE = True
_DELTALAKE_AVAILABLE = True
_FSSPEC_AVAILABLE = True
_GEVENT_AVAILABLE = True
_HYPOTHESIS_AVAILABLE = True
_NUMPY_AVAILABLE = True
_PANDAS_AVAILABLE = True
Expand Down Expand Up @@ -155,6 +156,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
import dataframe_api_compat
import deltalake
import fsspec
import gevent
import hypothesis
import numpy
import pandas
Expand Down Expand Up @@ -189,6 +191,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
if sys.version_info >= (3, 9)
else _lazy_import("backports.zoneinfo")
)
gevent, _GEVENT_AVAILABLE = _lazy_import("gevent")


@lru_cache(maxsize=None)
Expand Down Expand Up @@ -228,6 +231,7 @@ def _check_for_pydantic(obj: Any) -> bool:
"dataframe_api_compat",
"deltalake",
"fsspec",
"gevent",
"numpy",
"pandas",
"pydantic",
Expand All @@ -242,6 +246,7 @@ def _check_for_pydantic(obj: Any) -> bool:
# exported flags/guards
"_DELTALAKE_AVAILABLE",
"_FSSPEC_AVAILABLE",
"_GEVENT_AVAILABLE",
"_HYPOTHESIS_AVAILABLE",
"_NUMPY_AVAILABLE",
"_PANDAS_AVAILABLE",
Expand Down
102 changes: 76 additions & 26 deletions py-polars/polars/functions/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import polars._reexport as pl
import polars.functions as F
from polars.datatypes import DTYPE_TEMPORAL_UNITS, Date, Datetime, Int64
from polars.utils._async import _AsyncDataFrameResult
from polars.utils._async import _AioDataFrameResult, _GeventDataFrameResult
from polars.utils._parse_expr_input import (
parse_as_expression,
parse_as_list_of_expressions,
Expand All @@ -23,8 +23,7 @@


if TYPE_CHECKING:
from queue import Queue
from typing import Collection, Literal
from typing import Awaitable, Collection, Literal

from polars import DataFrame, Expr, LazyFrame, Series
from polars.type_aliases import (
Expand Down Expand Up @@ -1652,10 +1651,46 @@ def collect_all(
return result


# Typing-only overload: selected when the caller passes `gevent=True`, in which
# case the declared return type is the gevent result wrapper rather than an
# awaitable. The body is never executed.
#
# Defaults in an overload are informational, but they are what IDEs and
# generated docs display, so they are kept identical to the sibling
# `gevent: Literal[False]` overload and to the implementation signature
# (`no_optimization=False`, `streaming=False`). Advertising `True` here would
# suggest that the gevent path disables optimizations and enables streaming by
# default, which it does not.
@overload
def collect_all_async(
    lazy_frames: Sequence[LazyFrame],
    *,
    gevent: Literal[True],
    type_coercion: bool = True,
    predicate_pushdown: bool = True,
    projection_pushdown: bool = True,
    simplify_expression: bool = True,
    no_optimization: bool = False,
    slice_pushdown: bool = True,
    comm_subplan_elim: bool = True,
    comm_subexpr_elim: bool = True,
    streaming: bool = False,
) -> _GeventDataFrameResult[list[DataFrame]]:
    ...


# Typing-only overload: selected when `gevent` is omitted or passed as `False`,
# in which case the declared return type is an awaitable that yields the list
# of collected DataFrames. The body is never executed.
@overload
def collect_all_async(
lazy_frames: Sequence[LazyFrame],
*,
gevent: Literal[False] = False,
type_coercion: bool = True,
predicate_pushdown: bool = True,
projection_pushdown: bool = True,
simplify_expression: bool = True,
no_optimization: bool = False,
slice_pushdown: bool = True,
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
streaming: bool = False,
) -> Awaitable[list[DataFrame]]:
...


def collect_all_async(
lazy_frames: Sequence[LazyFrame],
queue: Queue[list[DataFrame] | Exception],
*,
gevent: bool = False,
type_coercion: bool = True,
predicate_pushdown: bool = True,
projection_pushdown: bool = True,
Expand All @@ -1665,33 +1700,46 @@ def collect_all_async(
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
streaming: bool = False,
) -> _AsyncDataFrameResult[list[DataFrame]]:
) -> Awaitable[list[DataFrame]] | _GeventDataFrameResult[list[DataFrame]]:
"""
Collect multiple LazyFrames at the same time asynchronously in thread pool.

Collects into a list of DataFrame, like :func:`polars.collect_all`
but instead of returning them directly its collected inside thread pool
and gets put into `queue` with `put_nowait` method,
while this method returns almost instantly.
Collects into a list of DataFrames (like :func:`polars.collect_all`),
but instead of returning them directly, they are scheduled to be collected
inside a thread pool, while this method returns almost instantly.

May be useful if you use gevent or asyncio and want to release control to other
greenlets/tasks while LazyFrames are being collected.
You must use correct queue in that case.
Given `queue` must be thread safe!

For gevent use
[`gevent.queue.Queue`](https://www.gevent.org/api/gevent.queue.html#gevent.queue.Queue).

For asyncio
[`asyncio.queues.Queue`](https://docs.python.org/3/library/asyncio-queue.html#queue)
can not be used, since it's not thread safe!
For that purpose use [janus](https://github.com/aio-libs/janus) library.
Parameters
----------
lazy_frames
A list of LazyFrames to collect.
gevent
Return a wrapper around `gevent.event.AsyncResult` instead of an Awaitable.
type_coercion
Do type coercion optimization.
predicate_pushdown
Do predicate pushdown optimization.
projection_pushdown
Do projection pushdown optimization.
simplify_expression
Run simplify expressions optimization.
no_optimization
Turn off (certain) optimizations.
slice_pushdown
Slice pushdown optimization.
comm_subplan_elim
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)

Notes
-----
Results are put in queue exactly once using `put_nowait`.
If error occurred then Exception will be put in the queue instead of result
which is then raised by returned wrapper `get` method.
If an error occurs, `set_exception` is called on the
`asyncio.Future`/`gevent.event.AsyncResult`, which then re-raises the error.

Warnings
--------
Expand All @@ -1705,8 +1753,10 @@ def collect_all_async(

Returns
-------
Wrapper that has `get` method and `queue` attribute with given queue.
`get` accepts kwargs that are passed down to `queue.get`.
If `gevent=False` (default), returns an awaitable.

If `gevent=True`, returns a wrapper that has a
`.get(block=True, timeout=None)` method.
"""
if no_optimization:
predicate_pushdown = False
Expand All @@ -1731,9 +1781,9 @@ def collect_all_async(
)
prepared.append(ldf)

result = _AsyncDataFrameResult(queue)
plr.collect_all_with_callback(prepared, result._callback_all)
return result
result = _GeventDataFrameResult() if gevent else _AioDataFrameResult()
plr.collect_all_with_callback(prepared, result._callback_all) # type: ignore[attr-defined]
return result # type: ignore[return-value]


def select(*exprs: IntoExpr | Iterable[IntoExpr], **named_exprs: IntoExpr) -> DataFrame:
Expand Down
113 changes: 80 additions & 33 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from polars.lazyframe.group_by import LazyGroupBy
from polars.selectors import _expand_selectors, expand_selector
from polars.slice import LazyPolarsSlice
from polars.utils._async import _AsyncDataFrameResult
from polars.utils._async import _AioDataFrameResult, _GeventDataFrameResult
from polars.utils._parse_expr_input import (
parse_as_expression,
parse_as_list_of_expressions,
Expand All @@ -75,8 +75,7 @@
if TYPE_CHECKING:
import sys
from io import IOBase
from queue import Queue
from typing import Literal
from typing import Awaitable, Literal

import pyarrow as pa

Expand Down Expand Up @@ -1703,10 +1702,44 @@ def collect(
)
return wrap_df(ldf.collect())

# Typing-only overload: selected when the caller passes `gevent=True`, in which
# case the declared return type is the gevent result wrapper rather than an
# awaitable. The body is never executed.
#
# Defaults in an overload are informational, but they are what IDEs and
# generated docs display, so they should match the runtime signature. The
# implementation declares `streaming: bool = False`, hence `False` here.
# NOTE(review): the implementation's `no_optimization` default is not visible
# in this view; `False` is assumed because `True` would switch the pushdown
# optimizations off by default - confirm against the implementation.
@overload
def collect_async(
    self,
    *,
    gevent: Literal[True],
    type_coercion: bool = True,
    predicate_pushdown: bool = True,
    projection_pushdown: bool = True,
    simplify_expression: bool = True,
    no_optimization: bool = False,
    slice_pushdown: bool = True,
    comm_subplan_elim: bool = True,
    comm_subexpr_elim: bool = True,
    streaming: bool = False,
) -> _GeventDataFrameResult[DataFrame]:
    ...

# Typing-only overload: selected when `gevent` is omitted or passed as `False`,
# in which case the declared return type is an awaitable that yields the
# collected DataFrame. The body is never executed.
#
# Defaults in an overload are informational, but they are what IDEs and
# generated docs display, so they should match the runtime signature. The
# implementation declares `streaming: bool = False`, hence `False` here.
# NOTE(review): the implementation's `no_optimization` default is not visible
# in this view; `False` is assumed because `True` would switch the pushdown
# optimizations off by default - confirm against the implementation.
@overload
def collect_async(
    self,
    *,
    gevent: Literal[False] = False,
    type_coercion: bool = True,
    predicate_pushdown: bool = True,
    projection_pushdown: bool = True,
    simplify_expression: bool = True,
    no_optimization: bool = False,
    slice_pushdown: bool = True,
    comm_subplan_elim: bool = True,
    comm_subexpr_elim: bool = True,
    streaming: bool = False,
) -> Awaitable[DataFrame]:
    ...

def collect_async(
self,
queue: Queue[DataFrame | Exception],
*,
gevent: bool = False,
type_coercion: bool = True,
predicate_pushdown: bool = True,
projection_pushdown: bool = True,
Expand All @@ -1716,33 +1749,44 @@ def collect_async(
comm_subplan_elim: bool = True,
comm_subexpr_elim: bool = True,
streaming: bool = False,
) -> _AsyncDataFrameResult[DataFrame]:
) -> Awaitable[DataFrame] | _GeventDataFrameResult[DataFrame]:
"""
Collect DataFrame asynchronously in thread pool.

Collects into a DataFrame, like :func:`collect`
but instead of returning DataFrame directly its collected inside thread pool
and gets put into `queue` with `put_nowait` method,
Collects into a DataFrame (like :func:`collect`), but instead of returning
the DataFrame directly, it is scheduled to be collected inside a thread pool,
while this method returns almost instantly.

May be useful if you use gevent or asyncio and want to release control to other
greenlets/tasks while LazyFrames are being collected.
You must use correct queue in that case.
Given `queue` must be thread safe!

For gevent use
[`gevent.queue.Queue`](https://www.gevent.org/api/gevent.queue.html#gevent.queue.Queue).

For asyncio
[`asyncio.queues.Queue`](https://docs.python.org/3/library/asyncio-queue.html#queue)
can not be used, since it's not thread safe!
For that purpose use [janus](https://github.com/aio-libs/janus) library.
Parameters
----------
gevent
Return a wrapper around `gevent.event.AsyncResult` instead of an Awaitable.
type_coercion
Do type coercion optimization.
predicate_pushdown
Do predicate pushdown optimization.
projection_pushdown
Do projection pushdown optimization.
simplify_expression
Run simplify expressions optimization.
no_optimization
Turn off (certain) optimizations.
slice_pushdown
Slice pushdown optimization.
comm_subplan_elim
Will try to cache branching subplans that occur on self-joins or unions.
comm_subexpr_elim
Common subexpressions will be cached and reused.
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)

Notes
-----
Results are put in queue exactly once using `put_nowait`.
If error occurred then Exception will be put in the queue instead of result
which is then raised by returned wrapper `get` method.
If an error occurs, `set_exception` is called on the
`asyncio.Future`/`gevent.event.AsyncResult`, which then re-raises the error.

Warnings
--------
Expand All @@ -1756,25 +1800,29 @@ def collect_async(

Returns
-------
Wrapper that has `get` method and `queue` attribute with given queue.
`get` accepts kwargs that are passed down to `queue.get`.
If `gevent=False` (default), returns an awaitable.

If `gevent=True`, returns a wrapper that has a
`.get(block=True, timeout=None)` method.

Examples
--------
>>> import queue
>>> import asyncio
>>> lf = pl.LazyFrame(
... {
... "a": ["a", "b", "a", "b", "b", "c"],
... "b": [1, 2, 3, 4, 5, 6],
... "c": [6, 5, 4, 3, 2, 1],
... }
... )
>>> a = (
... lf.group_by("a", maintain_order=True)
... .agg(pl.all().sum())
... .collect_async(queue.Queue())
... )
>>> a.get()
>>> async def main():
... return await (
... lf.group_by("a", maintain_order=True)
... .agg(pl.all().sum())
... .collect_async()
... )
...
>>> asyncio.run(main())
shape: (3, 3)
┌─────┬─────┬─────┐
│ a ┆ b ┆ c │
Expand All @@ -1785,7 +1833,6 @@ def collect_async(
│ b ┆ 11 ┆ 10 │
│ c ┆ 6 ┆ 1 │
└─────┴─────┴─────┘

"""
if no_optimization:
predicate_pushdown = False
Expand All @@ -1809,9 +1856,9 @@ def collect_async(
eager=False,
)

result = _AsyncDataFrameResult(queue)
ldf.collect_with_callback(result._callback)
return result
result = _GeventDataFrameResult() if gevent else _AioDataFrameResult()
ldf.collect_with_callback(result._callback) # type: ignore[attr-defined]
return result # type: ignore[return-value]

def sink_parquet(
self,
Expand Down
Loading