Add load_stac to localprocessing #445

Status: Closed · wants to merge 16 commits
4 changes: 4 additions & 0 deletions CHANGELOG.md

### Added

- Added `load_stac` for Client Side Processing, based on the [openeo-processes-dask implementation](https://github.com/Open-EO/openeo-processes-dask/pull/127)

### Changed

- Updated docs for Client Side Processing with `load_stac` examples, available at https://open-eo.github.io/openeo-python-client/cookbook/localprocessing.html

### Removed

### Fixed
98 changes: 87 additions & 11 deletions docs/cookbook/localprocessing.rst

Background
----------

The client-side processing functionality allows you to test and use openEO with its processes locally, i.e. without any connection to an openEO back-end.
It relies on the projects `openeo-pg-parser-networkx <https://github.com/Open-EO/openeo-pg-parser-networkx>`_, which provides an openEO process graph parsing tool, and `openeo-processes-dask <https://github.com/Open-EO/openeo-processes-dask>`_, which provides an Xarray and Dask implementation of most openEO processes.

Installation
------------

.. note::
    This feature requires ``Python>=3.9``.
    Tested with ``openeo-pg-parser-networkx==2023.5.1`` and
    ``openeo-processes-dask==2023.7.1``.

.. code:: bash

    pip install openeo[localprocessing]

Usage
-----

Every openEO process graph relies on data, which until now was always provided by a cloud infrastructure (the openEO back-end).
The client-side processing adds the possibility to read and use local netCDF, GeoTIFF and ZARR files, as well as remote STAC Collections or Items, for your experiments.

STAC Collections and Items
~~~~~~~~~~~~~~~~~~~~~~~~~~
.. warning::
    The provided examples using STAC rely on third-party STAC Catalogs; we cannot guarantee that the URLs will remain valid.

With the ``load_stac`` process it is possible to load and use data provided by remote or local STAC Collections or Items.
The following code snippet loads Sentinel-2 L2A data from a public STAC Catalog, using a specific spatial and temporal extent, a band name, and a property filter for cloud cover.

.. code-block:: pycon

>>> from openeo.local import LocalConnection
>>> local_conn = LocalConnection("./")

>>> url = "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a"
>>> spatial_extent = {"west": 11, "east": 12, "south": 46, "north": 47}
>>> temporal_extent = ["2019-01-01","2019-06-15"]
>>> bands = ["red"]
>>> properties = {"eo:cloud_cover": dict(lt=50)}
>>> s2_cube = local_conn.load_stac(
...     url=url,
...     spatial_extent=spatial_extent,
...     temporal_extent=temporal_extent,
...     bands=bands,
...     properties=properties,
... )
>>> s2_cube.execute()
<xarray.DataArray 'stackstac-08730b1b5458a4ed34edeee60ac79254' (time: 177,
band: 1,
y: 11354,
x: 8025)>
dask.array<getitem, shape=(177, 1, 11354, 8025), dtype=float64, chunksize=(1, 1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/53)
* time (time) datetime64[ns] 2019-01-02...
id (time) <U24 'S2B_32TPR_20190102_...
* band (band) <U3 'red'
* x (x) float64 6.52e+05 ... 7.323e+05
* y (y) float64 5.21e+06 ... 5.096e+06
s2:product_uri (time) <U65 'S2B_MSIL2A_20190102...
... ...
raster:bands object {'nodata': 0, 'data_type'...
gsd int32 10
common_name <U3 'red'
center_wavelength float64 0.665
full_width_half_max float64 0.038
epsg int32 32632
Attributes:
spec: RasterSpec(epsg=32632, bounds=(600000.0, 4990200.0, 809760.0...
crs: epsg:32632
transform: | 10.00, 0.00, 600000.00|\n| 0.00,-10.00, 5300040.00|\n| 0.0...
resolution: 10.0
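Note that ``execute()`` here returns a lazy, Dask-backed ``xarray.DataArray``: no pixel data is downloaded until a computation is actually triggered. The same laziness can be illustrated with a plain Dask array (an illustrative sketch, not tied to the STAC example above):

```python
import dask.array as da

# Building the array records only metadata and a task graph; nothing is materialized yet.
lazy = da.ones((4, 1, 16, 16), chunks=(1, 1, 8, 8))
scaled = lazy * 2  # still lazy: this only extends the task graph

# Only .compute() executes the graph and materializes a concrete result.
total = scaled.sum().compute()
print(total)  # 2048.0 (= 4 * 1 * 16 * 16 * 2)
```

The same applies to the STAC cube above: slicing or filtering the result before calling ``.compute()`` keeps the amount of downloaded data small.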

Local Collections
~~~~~~~~~~~~~~~~~

If you want to use our sample data, please clone this repository:

.. code:: bash

git clone https://github.com/Open-EO/openeo-localprocessing-data.git

With some sample data we can now check the STAC metadata for the local files by doing:

.. code:: python

    local_collections = local_conn.list_collections()

Let's start with the provided sample netCDF of Sentinel-2 data:
Attributes:
Conventions: CF-1.9
institution: openEO platform - Geotrellis backend: 0.9.5a1
description:
title:

As you can see in the previous example, we use a call to ``execute()``, which executes the generated openEO process graph locally.
In this case, the process graph consists of only a single ``load_collection``, which performs lazy loading of the data. With this first step you can check whether the data is being read correctly by openEO.
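For reference, the flat process-graph representation behind such a single-step cube looks roughly like this (a hypothetical sketch; the node name and arguments depend on the actual collection):

```python
# Hypothetical flat openEO process graph with a single load_collection node
process_graph = {
    "loadcollection1": {
        "process_id": "load_collection",
        "arguments": {
            "id": "sample_netcdf",   # collection id (placeholder)
            "spatial_extent": None,  # None = no spatial filtering
            "temporal_extent": None, # None = no temporal filtering
        },
        "result": True,  # marks the node whose output is returned
    }
}
print(process_graph["loadcollection1"]["process_id"])  # load_collection
```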
We can now do a simple processing for demo purposes; let's compute the median NDVI:
result_ndvi.plot.imshow(cmap="Greens")

.. image:: ../_static/images/local/local_ndvi.jpg

We can perform the same example using data provided by a STAC Collection:

.. code:: python

from openeo.local import LocalConnection
local_conn = LocalConnection("./")

url = "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a"
    spatial_extent = {"east": 11.406212,
                      "north": 46.522237,
                      "south": 46.461019,
                      "west": 11.259613}
temporal_extent = ["2022-06-01","2022-06-30"]
bands = ["red","nir"]
properties = {"eo:cloud_cover": dict(lt=80)}
s2_datacube = local_conn.load_stac(url=url,
spatial_extent=spatial_extent,
temporal_extent=temporal_extent,
bands=bands,
properties=properties)

b04 = s2_datacube.band("red")
b08 = s2_datacube.band("nir")
ndvi = (b08-b04)/(b08+b04)
ndvi_median = ndvi.reduce_dimension(dimension="time",reducer="median")
result_ndvi = ndvi_median.execute()
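The band arithmetic above follows the usual NDVI definition, NDVI = (NIR − red) / (NIR + red), reduced over the time dimension with the median. The numeric core can be sketched with plain NumPy on synthetic values (hypothetical data, independent of any STAC access):

```python
import numpy as np

# Synthetic "red" and "nir" stacks: 3 timesteps on a 2x2 pixel grid
red = np.full((3, 2, 2), 0.25)
nir = np.full((3, 2, 2), 0.75)

ndvi = (nir - red) / (nir + red)       # per-pixel, per-timestep NDVI
ndvi_median = np.median(ndvi, axis=0)  # reduce the time axis with the median

print(ndvi_median[0, 0])  # 0.5
```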
161 changes: 159 additions & 2 deletions openeo/local/connection.py
from typing import Callable, Dict, List, Optional, Union

import numpy as np
import xarray as xr
from openeo_pg_parser_networkx.graph import OpenEOProcessGraph
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval
from openeo_processes_dask.process_implementations.cubes import load_stac

from openeo.internal.graph_building import PGNode, as_flat_graph
from openeo.internal.jupyter import VisualDict, VisualList
from openeo.local.collections import _get_geotiff_metadata, _get_local_collections, _get_netcdf_zarr_metadata
from openeo.local.processing import PROCESS_REGISTRY
from openeo.metadata import Band, BandDimension, CollectionMetadata, SpatialDimension, TemporalDimension
from openeo.rest.datacube import DataCube
from openeo.rest.datacube import DataCube

_log = logging.getLogger(__name__)
fetch_metadata=fetch_metadata,
)

def datacube_from_process(self, process_id: str, namespace: Optional[str] = None, **kwargs) -> DataCube:
"""
Load a data cube from a (custom) process.

:param process_id: The process id.
:param namespace: optional: process namespace
:param kwargs: The arguments of the custom process
:return: A :py:class:`DataCube`, without valid metadata, as the client is not aware of this custom process.
"""
graph = PGNode(process_id, namespace=namespace, arguments=kwargs)
return DataCube(graph=graph, connection=self)

def load_stac(
self,
url: str,
spatial_extent: Optional[Dict[str, float]] = None,
temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None,
bands: Optional[List[str]] = None,
properties: Optional[dict] = None,
) -> DataCube:
"""
Loads data from a static STAC catalog or a STAC API Collection and returns the data as a processable :py:class:`DataCube`.
A batch job result can be loaded by providing a reference to it.

If supported by the underlying metadata and file format, the data that is added to the data cube can be
restricted with the parameters ``spatial_extent``, ``temporal_extent`` and ``bands``.
If no data is available for the given extents, a ``NoDataAvailable`` error is thrown.

Remarks:

* The bands (and all dimensions that specify nominal dimension labels) are expected to be ordered as
specified in the metadata if the ``bands`` parameter is set to ``null``.
* If no additional parameter is specified this would imply that the whole data set is expected to be loaded.
Due to the large size of many data sets, this is not recommended and may be optimized by back-ends to only
load the data that is actually required after evaluating subsequent processes such as filters.
This means that the values should be processed only after the data has been limited to the required extent
and as a consequence also to a manageable size.


:param url: The URL to a static STAC catalog (STAC Item, STAC Collection, or STAC Catalog)
or a specific STAC API Collection that allows to filter items and to download assets.
This includes batch job results, which itself are compliant to STAC.
For external URLs, authentication details such as API keys or tokens may need to be included in the URL.

Batch job results can be specified in two ways:

- For Batch job results at the same back-end, a URL pointing to the corresponding batch job results
endpoint should be provided. The URL usually ends with ``/jobs/{id}/results`` and ``{id}``
is the corresponding batch job ID.
- For external results, a signed URL must be provided. Not all back-ends support signed URLs,
which are provided as a link with the link relation `canonical` in the batch job result metadata.
:param spatial_extent:
Limits the data to load to the specified bounding box or polygons.

For raster data, the process loads the pixel into the data cube if the point at the pixel center intersects
with the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).

For vector data, the process loads the geometry into the data cube if the geometry is fully within the
bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
Empty geometries may only be in the data cube if no spatial extent has been provided.

The GeoJSON can be one of the following feature types:

* A ``Polygon`` or ``MultiPolygon`` geometry,
* a ``Feature`` with a ``Polygon`` or ``MultiPolygon`` geometry, or
* a ``FeatureCollection`` containing at least one ``Feature`` with ``Polygon`` or ``MultiPolygon`` geometries.

Set this parameter to ``None`` to set no limit for the spatial extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_bbox()`` or ``filter_spatial()`` directly after loading unbounded data.

:param temporal_extent:
Limits the data to load to the specified left-closed temporal interval.
Applies to all temporal dimensions.
The interval has to be specified as an array with exactly two elements:

1. The first element is the start of the temporal interval.
The specified instance in time is **included** in the interval.
2. The second element is the end of the temporal interval.
The specified instance in time is **excluded** from the interval.

The second element must always be greater/later than the first element.
Otherwise, a `TemporalExtentEmpty` exception is thrown.

Also supports open intervals by setting one of the boundaries to ``None``, but never both.

Set this parameter to ``None`` to set no limit for the temporal extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_temporal()`` directly after loading unbounded data.

:param bands:
Only adds the specified bands into the data cube so that bands that don't match the list
of band names are not available. Applies to all dimensions of type `bands`.

Either the unique band name (metadata field ``name`` in bands) or one of the common band names
(metadata field ``common_name`` in bands) can be specified.
If the unique band name and the common name conflict, the unique band name has a higher priority.

The order of the specified array defines the order of the bands in the data cube.
If multiple bands match a common name, all matched bands are included in the original order.

It is recommended to use this parameter instead of using ``filter_bands()`` directly after loading unbounded data.

:param properties:
Limits the data by metadata properties to include only data in the data cube which
all given conditions return ``True`` for (AND operation).

Specify key-value-pairs with the key being the name of the metadata property,
which can be retrieved with the openEO Data Discovery for Collections.
The value must be a condition (user-defined process) to be evaluated against a STAC API.
This parameter is not supported for static STAC.

.. versionadded:: 0.21.0
"""
arguments = {"url": url}
# TODO: more normalization/validation of extent/band parameters and `properties`
if spatial_extent:
arguments["spatial_extent"] = spatial_extent
if temporal_extent:
arguments["temporal_extent"] = DataCube._get_temporal_extent(temporal_extent)
if bands:
arguments["bands"] = bands
if properties:
arguments["properties"] = properties
cube = self.datacube_from_process(process_id="load_stac", **arguments)
# Detect actual metadata by running load_stac locally on the (parsed) extents.
if spatial_extent is not None:
    arguments["spatial_extent"] = BoundingBox.parse_obj(spatial_extent)
if temporal_extent is not None:
    arguments["temporal_extent"] = TemporalInterval.parse_obj(temporal_extent)
xarray_cube = load_stac(**arguments)
attrs = xarray_cube.attrs
for at in attrs:
    # allowed types: str, Number, ndarray, number, list, tuple
    if not isinstance(attrs[at], (int, float, str, np.ndarray, list, tuple)):
        attrs[at] = str(attrs[at])
metadata = CollectionMetadata(
attrs,
dimensions=[
SpatialDimension(name=xarray_cube.openeo.x_dim, extent=[]),
SpatialDimension(name=xarray_cube.openeo.y_dim, extent=[]),
TemporalDimension(name=xarray_cube.openeo.temporal_dims[0], extent=[]),
BandDimension(name=xarray_cube.openeo.band_dims[0],
bands=[Band(x) for x in xarray_cube[xarray_cube.openeo.band_dims[0]].values])
],
)
cube.metadata = metadata
return cube
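The attribute-sanitization loop in ``load_stac`` above (coercing attribute values that are not serialization-friendly to strings before building ``CollectionMetadata``) can be exercised in isolation. A minimal sketch, with a hypothetical helper name:

```python
import numpy as np

def sanitize_attrs(attrs: dict) -> dict:
    """Coerce attribute values outside the allowed types to strings (hypothetical helper)."""
    allowed = (int, float, str, np.ndarray, list, tuple)
    return {key: (val if isinstance(val, allowed) else str(val)) for key, val in attrs.items()}

attrs = {"resolution": 10.0, "crs": {"epsg": 32632}}  # a dict value is not an allowed type
clean = sanitize_attrs(attrs)
print(clean["resolution"], clean["crs"])  # 10.0 {'epsg': 32632}  (crs is now a str)
```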

def execute(
    self,
    process_graph: Union[dict, str],
) -> xr.DataArray:
"""
Execute the process graph locally and return the result as an xarray.DataArray.

Expand Down
4 changes: 2 additions & 2 deletions requirements-localprocessing.txt
# Those requirements are for the experimental and optional client side processing feature
# that can be installed via pip install openeo[localprocessing].
# Please be aware that this functionality is not fully stable and may change quickly.
openeo-pg-parser-networkx==2023.5.1
openeo-processes-dask==2023.7.1