ENH: minimal support for dask.dataframe query planning (dask-expr) #285

Merged: 12 commits, May 6, 2024
.github/workflows/tests.yaml (1 addition, 3 deletions)
@@ -31,6 +31,7 @@ jobs:
env:
  - ci/envs/310-no-optional-deps.yaml
  - ci/envs/39-minimal.yaml
  - ci/envs/311-no-expr.yaml
  - ci/envs/311-latest.yaml
  - ci/envs/312-latest.yaml

@@ -42,9 +43,6 @@ jobs:
- env: ci/envs/312-dev.yaml
  os: ubuntu-latest

env:
  DASK_DATAFRAME__QUERY_PLANNING: False

steps:
  - uses: actions/checkout@v4

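For reference, a minimal sketch (not part of the diff) of what the removed job-level variable controlled: the same setting can still be applied per run through dask's configuration, as long as it happens before dask.dataframe is first imported.

import dask

# Assumed usage, mirroring DASK_DATAFRAME__QUERY_PLANNING=False:
# the value must be set before dask.dataframe is imported for the
# legacy (non-expr) implementation to be picked up.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # imported after the config call on purpose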
ci/envs/311-no-expr.yaml (new file, 26 additions)
@@ -0,0 +1,26 @@
name: test
channels:
  - conda-forge
dependencies:
  # required dependencies
  - python=3.11
  # pin to last release before dask-expr was turned on by default
  - dask=2024.2
  - distributed=2024.2
  - geopandas
  - pyproj=3.4
  - packaging
  # test dependencies
  - pytest
  - pytest-cov
  - hilbertcurve
  - s3fs
  - moto<5 # <5 pin because of https://github.com/dask/dask/issues/10869
  - flask # needed for moto server
  # optional dependencies
  - pyarrow
  - pyogrio>=0.4
  - pygeohash
  - pip
  - pip:
      - pymorton
ci/envs/312-dev.yaml (1 addition)
@@ -28,3 +28,4 @@ dependencies:
      - git+https://github.com/shapely/shapely.git@main
      - git+https://github.com/geopandas/geopandas.git@main
      - git+https://github.com/dask/dask.git@main
      - git+https://github.com/dask-contrib/dask-expr.git@main
dask_geopandas/__init__.py (18 additions, 8 deletions)
@@ -1,17 +1,27 @@
from ._version import get_versions

from . import backends
from .clip import clip
from .core import (
    points_from_xy,
    GeoDataFrame,
    GeoSeries,
    from_geopandas,
    from_dask_dataframe,
)

if backends.QUERY_PLANNING_ON:
    from .expr import (
        points_from_xy,
        GeoDataFrame,
        GeoSeries,
        from_geopandas,
        from_dask_dataframe,
    )
else:
    from .core import (
        points_from_xy,
        GeoDataFrame,
        GeoSeries,
        from_geopandas,
        from_dask_dataframe,
    )
from .io.file import read_file
from .io.parquet import read_parquet, to_parquet
from .io.arrow import read_feather, to_feather
from .clip import clip
from .sjoin import sjoin


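Whichever branch is taken, the names exported from dask_geopandas are the same. A short usage sketch (assumed, not part of the PR) that runs unchanged on either backend:

import geopandas
import dask_geopandas
from shapely.geometry import Point

# Build a small GeoDataFrame and partition it; the code is identical whether
# the expr-based or the legacy core-based collection classes are in use.
gdf = geopandas.GeoDataFrame(
    {"value": [1, 2, 3, 4]},
    geometry=[Point(x, x) for x in range(4)],
    crs="EPSG:4326",
)
dgdf = dask_geopandas.from_geopandas(gdf, npartitions=2)
print(type(dgdf))                 # expr- or core-backed GeoDataFrame
print(dgdf.geometry.x.compute())  # lazily computed, as before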
dask_geopandas/backends.py (15 additions)
@@ -1,4 +1,19 @@
import uuid
from packaging.version import Version

from dask import config


# Check if dask-dataframe is using dask-expr (default of None means True as well)
QUERY_PLANNING_ON = config.get("dataframe.query-planning", False)
if QUERY_PLANNING_ON is None:
    import pandas as pd

    if Version(pd.__version__).major < 2:
        QUERY_PLANNING_ON = False
    else:
        QUERY_PLANNING_ON = True


from dask.dataframe.core import get_parallel_type
from dask.dataframe.utils import meta_nonempty
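A quick interactive check (assumed usage, not part of the diff) of what the detection above evaluates to: an explicit dataframe.query-planning setting wins, and an unset value (None) enables query planning when pandas >= 2 is installed.

import dask
import dask_geopandas

# None here means "not explicitly configured"; dask_geopandas then falls back
# to the pandas-version check implemented above.
print(dask.config.get("dataframe.query-planning", None))
print(dask_geopandas.backends.QUERY_PLANNING_ON)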
dask_geopandas/clip.py (19 additions, 6 deletions)
@@ -5,11 +5,17 @@
from dask.utils import derived_from
from dask.base import tokenize

from .core import GeoDataFrame, GeoSeries
from . import backends


@derived_from(geopandas.tools)
def clip(gdf, mask, keep_geom_type=False):

    if backends.QUERY_PLANNING_ON:
        from .expr import GeoDataFrame, GeoSeries
    else:
        from .core import GeoDataFrame, GeoSeries

    if isinstance(mask, (GeoDataFrame, GeoSeries)):
        raise NotImplementedError("Mask cannot be a Dask GeoDataFrame or GeoSeries.")

@@ -38,10 +44,17 @@ def clip(gdf, mask, keep_geom_type=False):
    }
    divisions = [None] * (len(dsk) + 1)
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[gdf])
    if isinstance(gdf, GeoDataFrame):
        result = GeoDataFrame(graph, name, gdf._meta, tuple(divisions))
    elif isinstance(gdf, GeoSeries):
        result = GeoSeries(graph, name, gdf._meta, tuple(divisions))
    result.spatial_partitions = new_spatial_partitions
    if backends.QUERY_PLANNING_ON:
        from dask_expr import from_graph

        result = from_graph(graph, gdf._meta, tuple(divisions), dsk.keys(), "clip")
    else:
        from .core import GeoDataFrame, GeoSeries

        if isinstance(gdf, GeoDataFrame):
            result = GeoDataFrame(graph, name, gdf._meta, tuple(divisions))
        elif isinstance(gdf, GeoSeries):
            result = GeoSeries(graph, name, gdf._meta, tuple(divisions))

    result.spatial_partitions = new_spatial_partitions
    return result
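Both code paths return a collection carrying spatial_partitions; a small usage sketch (assumed, not part of the diff):

import geopandas
import dask_geopandas
from shapely.geometry import Point, box

gdf = geopandas.GeoDataFrame(
    {"value": range(6)},
    geometry=[Point(x, x) for x in range(6)],
    crs="EPSG:4326",
)
dgdf = dask_geopandas.from_geopandas(gdf, npartitions=3)
dgdf.calculate_spatial_partitions()  # optional; gives clip per-partition spatial info

clipped = dask_geopandas.clip(dgdf, mask=box(0.5, 0.5, 3.5, 3.5))
print(clipped.compute())  # only the rows whose points fall inside the mask remain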