Skip to content

Commit

Permalink
ENH: minimal support for dask.dataframe query planning (dask-expr) (#285
Browse files Browse the repository at this point in the history
)

Co-authored-by: Tom Augspurger <[email protected]>
  • Loading branch information
jorisvandenbossche and Tom Augspurger authored May 6, 2024
1 parent 3489a1c commit cc9076f
Show file tree
Hide file tree
Showing 13 changed files with 1,191 additions and 29 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
env:
- ci/envs/310-no-optional-deps.yaml
- ci/envs/39-minimal.yaml
- ci/envs/311-no-expr.yaml
- ci/envs/311-latest.yaml
- ci/envs/312-latest.yaml

Expand All @@ -42,9 +43,6 @@ jobs:
- env: ci/envs/312-dev.yaml
os: ubuntu-latest

env:
DASK_DATAFRAME__QUERY_PLANNING: False

steps:
- uses: actions/checkout@v4

Expand Down
26 changes: 26 additions & 0 deletions ci/envs/311-no-expr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: test
channels:
- conda-forge
dependencies:
# required dependencies
- python=3.11
# pin to last release before dask-expr was turned on by default
- dask=2024.2
- distributed=2024.2
- geopandas
- pyproj=3.4
- packaging
# test dependencies
- pytest
- pytest-cov
- hilbertcurve
- s3fs
- moto<5 # <5 pin because of https://github.com/dask/dask/issues/10869
- flask # needed for moto server
# optional dependencies
- pyarrow
- pyogrio>=0.4
- pygeohash
- pip
- pip:
- pymorton
1 change: 1 addition & 0 deletions ci/envs/312-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ dependencies:
- git+https://github.com/shapely/shapely.git@main
- git+https://github.com/geopandas/geopandas.git@main
- git+https://github.com/dask/dask.git@main
- git+https://github.com/dask-contrib/dask-expr.git@main
26 changes: 18 additions & 8 deletions dask_geopandas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
from ._version import get_versions

from . import backends
from .clip import clip
from .core import (
points_from_xy,
GeoDataFrame,
GeoSeries,
from_geopandas,
from_dask_dataframe,
)

if backends.QUERY_PLANNING_ON:
from .expr import (
points_from_xy,
GeoDataFrame,
GeoSeries,
from_geopandas,
from_dask_dataframe,
)
else:
from .core import (
points_from_xy,
GeoDataFrame,
GeoSeries,
from_geopandas,
from_dask_dataframe,
)
from .io.file import read_file
from .io.parquet import read_parquet, to_parquet
from .io.arrow import read_feather, to_feather
from .clip import clip
from .sjoin import sjoin


Expand Down
15 changes: 15 additions & 0 deletions dask_geopandas/backends.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
import uuid
from packaging.version import Version

from dask import config


# Check if dask-dataframe is using dask-expr (default of None means True as well)
QUERY_PLANNING_ON = config.get("dataframe.query-planning", False)
if QUERY_PLANNING_ON is None:
import pandas as pd

if Version(pd.__version__).major < 2:
QUERY_PLANNING_ON = False
else:
QUERY_PLANNING_ON = True


from dask.dataframe.core import get_parallel_type
from dask.dataframe.utils import meta_nonempty
Expand Down
25 changes: 19 additions & 6 deletions dask_geopandas/clip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
from dask.utils import derived_from
from dask.base import tokenize

from .core import GeoDataFrame, GeoSeries
from . import backends


@derived_from(geopandas.tools)
def clip(gdf, mask, keep_geom_type=False):

if backends.QUERY_PLANNING_ON:
from .expr import GeoDataFrame, GeoSeries
else:
from .core import GeoDataFrame, GeoSeries

if isinstance(mask, (GeoDataFrame, GeoSeries)):
raise NotImplementedError("Mask cannot be a Dask GeoDataFrame or GeoSeries.")

Expand Down Expand Up @@ -38,10 +44,17 @@ def clip(gdf, mask, keep_geom_type=False):
}
divisions = [None] * (len(dsk) + 1)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[gdf])
if isinstance(gdf, GeoDataFrame):
result = GeoDataFrame(graph, name, gdf._meta, tuple(divisions))
elif isinstance(gdf, GeoSeries):
result = GeoSeries(graph, name, gdf._meta, tuple(divisions))
result.spatial_partitions = new_spatial_partitions
if backends.QUERY_PLANNING_ON:
from dask_expr import from_graph

result = from_graph(graph, gdf._meta, tuple(divisions), dsk.keys(), "clip")
else:
from .core import GeoDataFrame, GeoSeries

if isinstance(gdf, GeoDataFrame):
result = GeoDataFrame(graph, name, gdf._meta, tuple(divisions))
elif isinstance(gdf, GeoSeries):
result = GeoSeries(graph, name, gdf._meta, tuple(divisions))

result.spatial_partitions = new_spatial_partitions
return result
Loading

0 comments on commit cc9076f

Please sign in to comment.