diff --git a/api/Dockerfile b/api/Dockerfile index 40de313c..15e215f5 100755 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -48,5 +48,7 @@ RUN pip install stac_validator RUN pip install geopandas RUN pip install pyarrow RUN pip install starlette_exporter +RUN pip install geomet +RUN pip install rasterio COPY ./api /api diff --git a/api/api/routers/datasets/ingest_dataset.py b/api/api/routers/datasets/ingest_dataset.py index 61c10b0c..b9d77d1a 100755 --- a/api/api/routers/datasets/ingest_dataset.py +++ b/api/api/routers/datasets/ingest_dataset.py @@ -133,64 +133,4 @@ def ingest_stac_catalog( return ingest_stac(body.stac, dataset_id, user) except Exception as e: logger.exception("datasets:ingest_url") - raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) - - -# @router.post("/{dataset_id}") -# async def ingest( -# dataset_id: str, -# file: Optional[UploadFile] = File(None), -# version: int = Form(), # debería quitarlo (un file solo se puede subir a la última versión si no está ya) -# parent: str = Form(None), # debería quitarlo (sacarlo del nombre?) -# checksum: str = Form(None), # optional bc browser -# filename: str = Form(None), -# fileversion: int = Form(None), -# user: User = Depends(get_current_user), -# ): -# try: -# if filename: -# assert not file, "File provided as both file and filename" -# assert not parent, "Parent provided as both parent and filename" -# assert fileversion, "Fileversion not provided" -# dataset_id, dataset_name, file_name = await ingest_existing_dataset_file( -# filename, dataset_id, fileversion, version, checksum, user -# ) -# else: -# if file.size > 1000000000: # 1GB -# raise Exception( -# "File too large, please use the CLI to upload large files." -# ) -# dataset_id, dataset_name, file_name = await ingest_dataset_file( -# file, dataset_id, version, parent, checksum, user -# ) -# return { -# "dataset_id": dataset_id, -# "dataset_name": dataset_name, -# "file_name": file_name, -# } -# except Exception as e: -# logger.exception("datasets:ingest") -# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) - - -# class IngestURLBody(BaseModel): -# url: str - -# @router.post("/{dataset_id}/url") -# async def ingest_url( -# dataset_id: str, -# body: IngestURLBody, -# user: User = Depends(get_current_user), -# ): -# # try: -# dataset_id, dataset_name, file_name = await ingest_dataset_file_url( -# body.url, dataset_id, user -# ) -# return { -# "dataset_id": dataset_id, -# "dataset_name": dataset_name, -# "file_name": file_name, -# } -# # except Exception as e: -# # logger.exception("datasets:ingest") -# # raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) \ No newline at end of file diff --git a/api/api/src/repos/geodb/GeoDBRepo.py b/api/api/src/repos/geodb/GeoDBRepo.py index e37e579b..42e79e93 100755 --- a/api/api/src/repos/geodb/GeoDBRepo.py +++ b/api/api/src/repos/geodb/GeoDBRepo.py @@ -1,7 +1,5 @@ -import geopandas as gpd from shapely.geometry import Polygon from .client import get_client -import json class GeoDBRepo: @@ -19,10 +17,6 @@ def create(self, collections): return self.client.create_collections(collections, database=self.database) def insert(self, collection, values): - values = gpd.GeoDataFrame.from_features(values["features"], crs="4326") # ??? 
- catalog = values[values["type"] == "Catalog"] - assert len(catalog) == 1, "STAC catalog must have exactly one root catalog" - catalog = json.loads(catalog.to_json())["features"][0]["properties"] values.rename(columns={"id": "stac_id"}, inplace=True) # convert None geometry to empty geometry wkt values.geometry = values.geometry.apply(lambda x: Polygon() if x is None else x) @@ -33,7 +27,6 @@ def insert(self, collection, values): self.client.insert_into_collection( collection, database=self.database, values=values ) - return catalog def retrieve(self, collection): return self.client.get_collection(collection, database=self.database) diff --git a/api/api/src/usecases/datasets/ingest_file.py b/api/api/src/usecases/datasets/ingest_file.py index 22712ff5..1fe90279 100755 --- a/api/api/src/usecases/datasets/ingest_file.py +++ b/api/api/src/usecases/datasets/ingest_file.py @@ -2,12 +2,17 @@ import zipfile import io import os +import geopandas as gpd +import json +import shutil from .retrieve_dataset import retrieve_owned_dataset from ...errors import DatasetVersionDoesNotExistError from ...repos import DatasetsDBRepo, GeoDBRepo from ..files import ingest_file, ingest_existing_file from ..user import retrieve_user_credentials +from .stac import MLDatasetQualityMetrics +from ..utils.stac import STACDataFrame async def ingest_dataset_file(file, dataset_id, checksum, user, version): @@ -90,18 +95,41 @@ def ingest_stac(stac, dataset_id, user): # check if dataset exists dataset = retrieve_owned_dataset(dataset_id, user.uid) # TODO: check all assets exist in os + # generate catalog + values = gpd.GeoDataFrame.from_features(stac["features"], crs="4326") # ??? + catalog = values[values["type"] == "Catalog"] + assert len(catalog) == 1, "STAC catalog must have exactly one root catalog" + catalog = json.loads(catalog.to_json())["features"][0]["properties"] + keys = list(catalog.keys()) + dataset_quality = 1 + # TODO: validate Q1 dataset with required fields/extensions (author, license) + # TODO: validate Q2 dataset, not only check name + if "ml-dataset:name" in keys: + dataset_quality = 2 + # compute and report automatic qa metrics + # save stac locally + tmp_path = f"/tmp/{dataset_id}" + df = STACDataFrame(values) + df.to_stac(tmp_path) + # compute metrics + catalog_path = f"{tmp_path}/{dataset.name}/catalog.json" + MLDatasetQualityMetrics.calculate(catalog_path) + # overwrite catalog with computed metrics + df = STACDataFrame.from_stac_file(catalog_path) + catalog = df[df["type"] == "Catalog"] + # print("1", catalog) + catalog = json.loads(catalog.to_json())["features"][0]["properties"] + # print("2", catalog) + # delete tmp files + shutil.rmtree(tmp_path) + print("quality", dataset_quality) # ingest to geodb credentials = retrieve_user_credentials(user) geodb_repo = GeoDBRepo(credentials) - catalog = geodb_repo.insert(dataset.id, stac) + geodb_repo.insert(dataset.id, values) # the catalog should contain all the info we want to show in the UI dataset.catalog = catalog - keys = list(catalog.keys()) - if "ml-dataset:name" in keys: - dataset.quality = 2 - # TODO: compute and report automatic qa metrics - # TODO: validate Q2 dataset, not only check name - # TODO: validate Q1 dataset with required fields/extensions (author, license) + dataset.quality = dataset_quality repo = DatasetsDBRepo() dataset.updatedAt = datetime.now() repo.update_dataset(dataset.id, dataset.model_dump()) diff --git a/api/api/src/usecases/datasets/stac.py b/api/api/src/usecases/datasets/stac.py new file mode 100644 index 
00000000..2278366e --- /dev/null +++ b/api/api/src/usecases/datasets/stac.py @@ -0,0 +1,500 @@ +from typing import Union, Generic, TypeVar, Dict, Any, List +import pystac +from pystac.extensions.base import ExtensionManagementMixin, PropertiesExtension +from pystac.cache import ResolvedObjectCache +from tqdm import tqdm +from shutil import rmtree +from os.path import dirname +from pystac import STACValidationError +import traceback +from pystac.extensions.label import LabelExtension +import json + +T = TypeVar("T", pystac.Item, pystac.Collection, pystac.Catalog) +PREFIX: str = "ml-dataset:" +SCHEMA_URI: str = ( + "https://raw.githubusercontent.com/earthpulse/ml-dataset/main/json-schema/schema.json" +) + + +class MLDatasetExtension( + pystac.Catalog, + Generic[T], + PropertiesExtension, + ExtensionManagementMixin[ + Union[pystac.item.Item, pystac.collection.Collection, pystac.catalog.Catalog] + ], +): + """An abstract class that can be used to extend the properties of a + :class:`~pystac.Collection`, :class:`~pystac.Item`, or :class:`~pystac.Catalog` with + properties from the :stac-ext:`Machine Learning Dataset Extension `. This class is + generic over the type of STAC Object to be extended (e.g. :class:`~pystac.Item`, + :class:`~pystac.Asset`). + + To create a concrete instance of :class:`MLDatasetExtension`, use the + :meth:`MLDatasetExtension.ext` method. For example: + + .. code-block:: python + + >>> item: pystac.Item = ... + >>> ml_ext = MLDatasetExtension.ext(item) + """ + + catalog: pystac.Catalog + """The :class:`~pystac.Catalog` being extended.""" + + properties: Dict[str, Any] + """The :class:`~pystac.Catalog` extra fields, including extension properties.""" + + links: List[pystac.Link] + """The list of :class:`~pystac.Link` objects associated with the + :class:`~pystac.Catalog` being extended, including links added by this extension. + """ + + def __init__(self, catalog: pystac.Catalog): + super().__init__(id=catalog.id, description=catalog.description) + self._catalog = catalog + self.id = catalog.id + self.description = catalog.description + self.title = catalog.title if catalog.title else None + self.stac_extensions = ( + catalog.stac_extensions if catalog.stac_extensions else [] + ) + self.extra_fields = self.properties = ( + catalog.extra_fields if catalog.extra_fields else {} + ) + self.links = catalog.links + self._resolved_objects = ResolvedObjectCache() + + def apply(self, name: str = None) -> None: + """ + Applies the :stac-ext:`Machine Learning Dataset Extension ` to the extended + :class:`~pystac.Catalog`. + """ + self.name = name + + @property + def name(self) -> str: + """ + Name of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}name"] + + @name.setter + def name(self, v: str) -> None: + """ + Set the name of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}name"] = v + + @property + def tasks(self) -> List: + """ + Tasks of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}tasks"] + + @tasks.setter + def tasks(self, v: Union[list, tuple]) -> None: + """ + Set the tasks of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}tasks"] = v + + @property + def type(self) -> str: + """ + Type of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}type"] + + @type.setter + def type(self, v: str) -> None: + """ + Set the type of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}type"] = v + + @property + def inputs_type(self) -> str: + """ + Inputs type of the ML Dataset. 
+ """ + return self.extra_fields[f"{PREFIX}inputs-type"] + + @inputs_type.setter + def inputs_type(self, v: str) -> None: + """ + Set the inputs type of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}inputs-type"] = v + + @property + def annotations_type(self) -> str: + """ + Annotations type of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}annotations-type"] + + @annotations_type.setter + def annotations_type(self, v: str) -> None: + """ + Set the annotations type of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}annotations-type"] = v + + @property + def splits(self) -> List[str]: + """ + Splits of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}splits"] + + @splits.setter + def splits(self, v: dict) -> None: + """ + Set the splits of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}splits"] = v + + @property + def quality_metrics(self) -> List[dict]: + """ + Quality metrics of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}quality-metrics"] + + @quality_metrics.setter + def quality_metrics(self, v: dict) -> None: + """ + Set the quality metrics of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}quality-metrics"] = v + + @property + def version(self) -> str: + """ + Version of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}version"] + + @version.setter + def version(self, v: str) -> None: + """ + Set the version of the ML Dataset. + """ + self.extra_fields[f"{PREFIX}version"] = v + + @classmethod + def get_schema_uri(cls) -> str: + """ + Get the JSON Schema URI that validates the extended object. + """ + return SCHEMA_URI + + def add_metric(self, metric: dict) -> None: + """Add a metric to this object's set of metrics. + + Args: + metric : The metric to add. + """ + if not self.extra_fields.get(f"{PREFIX}quality-metrics"): + self.extra_fields[f"{PREFIX}quality-metrics"] = [] + + if metric not in self.extra_fields[f"{PREFIX}quality-metrics"]: + self.extra_fields[f"{PREFIX}quality-metrics"].append(metric) + + def add_metrics(self, metrics: List[dict]) -> None: + """Add a list of metrics to this object's set of metrics. + + Args: + metrics : The metrics to add. + """ + for metric in metrics: + self.add_metric(metric) + + @classmethod + def ext(cls, obj: T, add_if_missing: bool = False): + """Extends the given STAC Object with properties from the + :stac-ext:`Machine Learning Dataset Extension `. + + This extension can be applied to instances of :class:`~pystac.Catalog`, + :class:`~pystac.Collection` or :class:`~pystac.Item`. + + Raises: + pystac.ExtensionTypeError : If an invalid object type is passed. + """ + if isinstance(obj, pystac.Collection): + cls.validate_has_extension(obj, add_if_missing) + return CollectionMLDatasetExtension(obj) + elif isinstance(obj, pystac.Catalog): + cls.validate_has_extension(obj, add_if_missing) + return MLDatasetExtension(obj) + elif isinstance(obj, pystac.Item): + cls.validate_has_extension(obj, add_if_missing) + return ItemMLDatasetExtension(obj) + else: + raise pystac.ExtensionTypeError(cls._ext_error_message(obj)) + + +class CollectionMLDatasetExtension(MLDatasetExtension[pystac.Collection]): + """A concrete implementation of :class:`MLDatasetExtension` on an + :class:`~pystac.Collection` that extends the properties of the Collection to include + properties defined in the :stac-ext:`Machine Learning Dataset Extension `. 
+ """ + + collection: pystac.Collection + properties: Dict[str, Any] + + def __init__(self, collection: pystac.Collection): + self.collection = collection + self.properties = collection.extra_fields + self.properties[f"{PREFIX}split-items"] = [] + + def __repr__(self) -> str: + return f"" + + @property + def splits(self) -> List[dict]: + """ + Splits of the ML Dataset. + """ + return self.extra_fields[f"{PREFIX}splits"] + + @splits.setter + def splits(self, v: dict) -> None: + """ + Set the splits of the ML Dataset. + """ + self.properties[f"{PREFIX}split-items"] = v + + def add_split(self, v: dict) -> None: + """ + Add a split to the ML Dataset. + """ + self.properties[f"{PREFIX}split-items"].append(v) + + def create_and_add_split( + self, split_data: List[pystac.Item], split_type: str + ) -> None: + """ + Create and add a split to the ML Dataset. + """ + items_ids = [item.id for item in split_data] + items_ids.sort() + + split = {"name": split_type, "items": items_ids} + + if not self.properties.get(f"{PREFIX}split-items"): + self.properties[f"{PREFIX}split-items"] = [] + + self.add_split(split) + print(f"Generating {split_type} split...") + for _item in tqdm(split_data): + item = self.collection.get_item(_item.id) + if item: + item_ml = MLDatasetExtension.ext(item, add_if_missing=True) + item_ml.split = split_type + + +class ItemMLDatasetExtension(MLDatasetExtension[pystac.Item]): + """A concrete implementation of :class:`MLDatasetExtension` on an + :class:`~pystac.Item` that extends the properties of the Item to include properties + defined in the :stac-ext:`Machine Learning Dataset Extension `. + + This class should generally not be instantiated directly. Instead, call + :meth:`MLDatasetExtension.ext` on an :class:`~pystac.Item` to extend it. + """ + + item: pystac.Item + properties: Dict[str, Any] + + def __init__(self, item: pystac.Item): + self.item = item + self.properties = item.properties + + @property + def split(self) -> str: + """ + Split of the ML Dataset. + """ + return self.properties[f"{PREFIX}split"] + + @split.setter + def split(self, v: str) -> None: + """ + Set the split of the ML Dataset. 
+ """ + self.properties[f"{PREFIX}split"] = v + + def __repr__(self) -> str: + return f"" + + +class MLDatasetQualityMetrics: + """ + ML Dataset Quality Metrics + """ + + @classmethod + def calculate(cls, catalog: Union[pystac.Catalog, str]) -> None: + """ + Calculate the quality metrics of the catalog + """ + if isinstance(catalog, str): + catalog = MLDatasetExtension(pystac.read_file(catalog)) + elif isinstance(catalog, pystac.Catalog): + catalog = MLDatasetExtension(catalog) + else: + raise TypeError( + f"Catalog must be a pystac.Catalog object or a path to a STAC catalog file, not {type(catalog).__name__}" + ) + + # # Check the catalog has the extension + # if not MLDatasetExtension.has_extension(catalog): + # raise pystac.ExtensionNotImplemented( + # f"MLDatasetExtension does not apply to type '{type(catalog).__name__}'" + # ) + + try: + catalog.add_metric(cls._search_spatial_duplicates(catalog)) + # catalog.add_metric(cls._get_classes_balance(catalog)) + except AttributeError as exc: + raise pystac.ExtensionNotImplemented( + f"The catalog does not have the required properties or the ML-Dataset extension to calculate the metrics: {exc}" + ) + finally: + catalog.make_all_asset_hrefs_relative() + + try: + print("Validating and saving...") + catalog.validate() + destination = dirname(catalog.get_self_href()) + rmtree( + destination + ) # Remove the old catalog and replace it with the new one + catalog.set_root(catalog) + catalog.normalize_and_save(root_href=destination) + except STACValidationError: + # Return full callback + traceback.print_exc() + + @staticmethod + def _search_spatial_duplicates(catalog: pystac.Catalog): + """ + Search for spatial duplicates in the catalog + """ + items = list( + set( + [ + item + for item in tqdm( + catalog.get_items(recursive=True), + desc="Looking for spatial duplicates...", + ) + if not LabelExtension.has_extension(item) + ] + ) + ) + + # Initialize the spatial duplicates dict + spatial_duplicates = {"name": "spatial-duplicates", "values": [], "total": 0} + + items_bboxes = {} + for item in items: + # Get the item bounding box + bbox = str(item.bbox) + # If the bounding box is not in the items dict, add it + if bbox not in items_bboxes.keys(): + items_bboxes[bbox] = item.id + # If the bounding box is already in the items dict, add it to the duplicates dict + else: + spatial_duplicates["values"].append( + {"item": item.id, "duplicate": items_bboxes[bbox]} + ) + spatial_duplicates["total"] += 1 + + return spatial_duplicates + + @staticmethod + def _get_classes_balance(catalog: pystac.Catalog) -> dict: + """ + Get the classes balance of the catalog + """ + + def get_label_properties(items: List[pystac.Item]) -> List: + """ + Get the label properties of the catalog + """ + label_properties = [] + for label in items: + label_ext = LabelExtension.ext(label) + for prop in label_ext.label_properties: + if prop not in label_properties: + label_properties.append(prop) + + return label_properties + + catalog.make_all_asset_hrefs_absolute() + + labels = list( + set( + [ + item + for item in tqdm( + catalog.get_items(recursive=True), + desc="Calculating classes balance...", + ) + if LabelExtension.has_extension(item) + ] + ) + ) + + # Initialize the classes balance dict + classes_balance = {"name": "classes-balance", "values": []} + label_properties = get_label_properties(labels) + + for prop in label_properties: + property_balance = {"name": prop, "values": []} + properties = {} + for label in labels: + if "labels" not in label.assets: + continue + asset_path 
= label.assets["labels"].href + # Open the linked geoJSON to obtain the label properties + try: + with open(asset_path, mode="r", encoding="utf-8") as f: + label_data = json.load(f) + except FileNotFoundError: + raise FileNotFoundError( + f"The file {asset_path} does not exist. Make sure the assets hrefs are correct" + ) + # Get the property + for feature in label_data["features"]: + if prop in feature["properties"]: + property_value = feature["properties"][prop] + else: + if feature["properties"]["labels"]: + property_value = feature["properties"]["labels"][0] + else: + continue + if property_value not in properties: + properties[property_value] = 0 + properties[property_value] += 1 + + # Create the property balance dict + total_labels = sum(properties.values()) + for key, value in properties.items(): + property_balance["values"].append( + { + "class": key, + "total": value, + "percentage": int(value / total_labels * 100), + } + ) + + classes_balance["values"].append(property_balance) + + catalog.make_all_asset_hrefs_relative() + + return classes_balance diff --git a/api/api/src/usecases/utils/__init__.py b/api/api/src/usecases/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/api/src/usecases/utils/stac/__init__.py b/api/api/src/usecases/utils/stac/__init__.py new file mode 100644 index 00000000..dbb2e58d --- /dev/null +++ b/api/api/src/usecases/utils/stac/__init__.py @@ -0,0 +1 @@ +from .dataframe import STACDataFrame diff --git a/api/api/src/usecases/utils/stac/dataframe.py b/api/api/src/usecases/utils/stac/dataframe.py new file mode 100755 index 00000000..1ba316b8 --- /dev/null +++ b/api/api/src/usecases/utils/stac/dataframe.py @@ -0,0 +1,172 @@ +""" +Module for the STAC dataframe +""" + +import json + +from os.path import join +from os import makedirs +from typing import Union, Optional +from math import isnan +from pathlib import Path + +import pandas as pd +import geopandas as gpd +import pystac +from geomet import wkt + +from ..tools import convert_df_geom_to_shape, get_all_children + + +class STACDataFrame(gpd.GeoDataFrame): + """ + STACDataFrame class + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @classmethod + def from_stac_file(cls, stac_file: pystac.STACObject): + """ + Create a STACDataFrame from a STAC file + + :param stac_file: STAC file + """ + return read_stac(stac_file) + + def to_stac(self, path): + """ + Create a STAC catalog and children from a STACDataFrame + """ + df = self.copy() + + if "id" in df.columns and "stac_id" in df.columns: + id_column = "stac_id" + stac_id_exists = True + else: + id_column = "id" + stac_id_exists = False + + # First, create the catalog and its folder, if exists + catalog_df = df[df["type"] == "Catalog"] + + if catalog_df.empty: + makedirs(path, exist_ok=True) + else: + for _, row in catalog_df.iterrows(): + root_output_folder = path + "/" + row[id_column] + makedirs(root_output_folder, exist_ok=True) + row_json = row.to_dict() + + # Curate the json row + row_json = self.curate_json_row(row_json, stac_id_exists) + + with open( + join(root_output_folder, "catalog.json"), "w", encoding="utf-8" + ) as f: + json.dump(row_json, f) + + # Second, create the collections and their folders, if exist + collections = {} + collections_df = df[df["type"] == "Collection"] + for _, row in collections_df.iterrows(): + stac_output_folder = join(root_output_folder, row[id_column]) + collections[row[id_column]] = stac_output_folder + makedirs(stac_output_folder, exist_ok=True) + 
row_json = row.to_dict() + + # Curate the json row + row_json = self.curate_json_row(row_json, stac_id_exists) + + with open( + join(stac_output_folder, "collection.json"), "w", encoding="utf-8" + ) as f: + json.dump(row_json, f) + + # Then, create the items and their folders, if exist + features_df = df[df["type"] == "Feature"] + for _, row in features_df.iterrows(): + collection = row["collection"] + stac_output_folder = join(collections[collection], row[id_column]) + + # Convert the geometry from WKT back to geojson + row["geometry"] = row["geometry"].wkt + row["geometry"] = wkt.loads(row["geometry"]) + makedirs(stac_output_folder, exist_ok=True) + row_json = row.to_dict() + + # Curate the json row + row_json = self.curate_json_row(row_json, stac_id_exists) + + with open( + join(stac_output_folder, f'{row_json["id"]}.json'), + "w", + encoding="utf-8", + ) as f: + json.dump(row_json, f) + + def curate_json_row(self, row: dict, stac_id_exists: bool) -> dict: + """ + Curate the json row of a STACDataFrame, in order to generate a valid STAC file + + :param row: row of a STACDataFrame + :param stac_id_exists: if the stac_id column exists + """ + keys_to_remove = [] + + # Remove the created_at and modified_at columns, if the STACDataFrame comes from GeoDB + for i in "created_at", "modified_at": + if i in row.keys(): + keys_to_remove.append(i) + + # Rename the stac_id column to id, to avoid conflicts with the id column + if stac_id_exists: + row["id"] = row["stac_id"] + del row["stac_id"] + + # Remove the NaN values and empty strings + for k, v in row.items(): + if (isinstance(v, float) and isnan(v)) or v == "" or not v: + keys_to_remove.append(k) + + for key in keys_to_remove: + if key in row.keys(): + del row[key] + + # Convert the value to dict if it is a string and is possible + for k, v in row.items(): + if isinstance(v, str): + try: + row[k] = json.loads(v) + except json.decoder.JSONDecodeError: + pass + + return row + + +def read_stac( + stac_file: Union[pystac.Catalog, pystac.Collection, str], + geometry_column: Optional[str] = "geometry", +) -> STACDataFrame: + """ + Read a STAC file and return a STACDataFrame + + :param stac_file: STAC file to read + :param geometry_column: name of the geometry column + """ + if isinstance(stac_file, (str, Path)): + stac_file = pystac.read_file(stac_file) # we assume this is always a catalog + stac_file.make_all_asset_hrefs_absolute() + children = get_all_children(stac_file) + + # Convert Dataframe to STACDataFrame + dataframe = pd.DataFrame(children) + dataframe[geometry_column] = dataframe.apply(convert_df_geom_to_shape, axis=1) + stac_dataframe = STACDataFrame( + dataframe, + crs="EPSG:4326", + geometry=gpd.GeoSeries.from_wkt(dataframe[geometry_column]), + ) + + return stac_dataframe diff --git a/api/api/src/usecases/utils/tools/__init__.py b/api/api/src/usecases/utils/tools/__init__.py new file mode 100755 index 00000000..f9a2f7c8 --- /dev/null +++ b/api/api/src/usecases/utils/tools/__init__.py @@ -0,0 +1,6 @@ +""" +Tools module for eotdl package. 
+""" + +from .stac import * +from .geo_utils import * diff --git a/api/api/src/usecases/utils/tools/geo_utils.py b/api/api/src/usecases/utils/tools/geo_utils.py new file mode 100755 index 00000000..4197a4f1 --- /dev/null +++ b/api/api/src/usecases/utils/tools/geo_utils.py @@ -0,0 +1,246 @@ +""" +Geo Utils +""" + +import tarfile +from typing import Union +from statistics import mean + +import geopandas as gpd +import rasterio +import rasterio.warp + +from shapely import geometry +from shapely.geometry import box, Polygon, shape +from pyproj import Transformer + +# from sentinelhub import BBox, CRS, bbox_to_dimensions +from pandas import isna + + +def is_bounding_box(bbox: list) -> bool: + """ + Check if the given bounding box is a bounding box and is valid + """ + if not isinstance(bbox, (list, tuple)) or len(bbox) != 4: + return False + + for value in bbox: + if not isinstance(value, (int, float)): + return False + + minx, miny, maxx, maxy = bbox + if minx >= maxx or miny >= maxy: + return False + + return True + + +# def compute_image_size(bounding_box, parameters): +# """ +# Compute the image size from the bounding box and the resolution +# """ +# bbox = BBox(bbox=bounding_box, crs=CRS.WGS84) +# bbox_size = bbox_to_dimensions(bbox, resolution=parameters.RESOLUTION) + +# return bbox, bbox_size + + +def get_image_bbox(raster: Union[tarfile.ExFileObject, str]): + """ + Get the bounding box of a raster + """ + with rasterio.open(raster) as src: + bounds = src.bounds + dst_crs = "EPSG:4326" + left, bottom, right, top = rasterio.warp.transform_bounds( + src.crs, dst_crs, *bounds + ) + bbox = [left, bottom, right, top] + return bbox + + +def get_image_resolution(raster: Union[tarfile.ExFileObject, str]): + """ + Get the resolution of a raster + """ + with rasterio.open(raster) as src: + resolution = src.res + return resolution + + +def bbox_to_coordinates(bounding_box: list) -> list: + """ + Convert a bounding box to a list of polygon coordinates + """ + polygon_coordinates = [ + (bounding_box[0], bounding_box[1]), # bottom left + (bounding_box[0], bounding_box[3]), # top left + (bounding_box[2], bounding_box[3]), # top right + (bounding_box[2], bounding_box[1]), # bottom right + (bounding_box[0], bounding_box[1]), # back to bottom left + ] + + return polygon_coordinates + + +def bbox_to_polygon(bounding_box: list) -> Polygon: + """ + Convert a bounding box to a shapely polygon + """ + polygon = box(bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3]) + + return polygon + + +from_4326_transformer = Transformer.from_crs("EPSG:4326", "EPSG:3857") +from_3857_transformer = Transformer.from_crs("EPSG:3857", "EPSG:4326") + + +def bbox_from_centroid( + x: Union[int, float], + y: Union[int, float], + pixel_size: Union[int, float], + width: Union[int, float], + height: Union[int, float], +) -> list: + """ + Generate a bounding box from a centroid, pixel size and image dimensions. 
+ + Params + ------ + x: int or float + x coordinate of the centroid + y: int or float + y coordinate of the centroid + pixel_size: int or float + pixel size in meters + width: int or float + width of the image in pixels + height: int or float + height of the image in pixels + + Returns + ------- + bounding_box: list + list with the bounding box coordinates + """ + width_m = width * pixel_size + height_m = height * pixel_size + + # Transform the centroid coordinates to meters + centroid_m = from_4326_transformer.transform(x, y) + + # Calculate the bounding box coordinates + min_x = centroid_m[0] - width_m / 2 + min_y = centroid_m[1] - height_m / 2 + max_x = centroid_m[0] + width_m / 2 + max_y = centroid_m[1] + height_m / 2 + + # Convert the bounding box coordinates back to degrees + min_x, min_y = from_3857_transformer.transform(min_x, min_y) + max_x, max_y = from_3857_transformer.transform(max_x, max_y) + + return [min_y, min_x, max_y, max_x] + + +def generate_bounding_box(geom: geometry.point.Point, differences: list) -> list: + """ + Generate the bounding box of a given point using the difference + between the maximum and minimum coordinates of the bounding box + + :param geom: shapely geometry object of the point for which we want to + generate the bounding box. + :param differences: list with the difference between the maximum + and minimum longitude and latitude coordinates. + :return: list with the resulting bounding box. + """ + long_diff, lat_diff = differences[0], differences[1] + lon, lat = geom.x, geom.y + + bbox = ( + lon - (long_diff / 2), + lat - (lat_diff / 2), + lon + (long_diff / 2), + lat + (lat_diff / 2), + ) + + # Round the coordinates to 6 decimals + bounding_box = [round(i, 6) for i in bbox] + + return bounding_box + + +def calculate_average_coordinates_distance(bounding_box_by_location: dict) -> list: + """ + Calculate the mean difference between the maximum and minimum longitude + and latitude of the bounding boxes from the existing locations. + These mean differences are then used to generate the bounding + boxes of new locations given a centroid. + + :param bounding_box_by_location: dictionary with format + location_id : bounding_box for the existing locations in + the sen12floods dataset. + :return mean_long_diff, mean_lat_diff: mean longitude + and latitude difference in the bounding boxes + """ + long_diff_list, lat_diff_list = [], [] + + for bbox in bounding_box_by_location.values(): + long_diff = bbox[2] - bbox[0] + long_diff_list.append(long_diff) + lat_diff = bbox[3] - bbox[1] + lat_diff_list.append(lat_diff) + + mean_long_diff = mean(long_diff_list) + mean_lat_diff = mean(lat_diff_list) + + return mean_long_diff, mean_lat_diff + + +def generate_new_locations_bounding_boxes( + gdf: gpd.GeoDataFrame, mean_differences: list, latest_id: int +) -> dict: + """ + Generate the bounding box of every new location, using + the mean difference between the maximum and minimum calculated + longitude and latitude. This function also returns the time + interval which we want to request from Sentinel Hub Services. + + :param gdf: GeoDataFrame with the new locations that + are going to be added to the dataset + :param mean_differences: list with the longitude + and latitude mean differences, which are going to be used to generate + the bounding boxes. + :return: bbox_by_new_location: dict with format {new_location_id: + {'bounding_box': list(), 'time_interval': list()}, ...
} + that contains the bounding box and time interval of the imagery for each location + """ + bbox_by_new_location = {} + + for _, row in gdf.iterrows(): + new_location_id = str(latest_id + 1) + time_interval = row["Began"].strftime("%Y-%m-%d"), row["Ended"].strftime( + "%Y-%m-%d" + ) + bbox = generate_bounding_box(row["geometry"], mean_differences) + bbox_by_new_location[new_location_id] = { + "bounding_box": bbox, + "time_interval": time_interval, + } + latest_id += 1 + + return bbox_by_new_location + + +def convert_df_geom_to_shape(row): + """ + Convert the geometry of a dataframe row to a shapely shape + """ + if not isna(row["geometry"]): + geo = shape(row["geometry"]) + wkt = geo.wkt + else: + wkt = "POLYGON EMPTY" + + return wkt diff --git a/api/api/src/usecases/utils/tools/stac.py b/api/api/src/usecases/utils/tools/stac.py new file mode 100755 index 00000000..bfb087d7 --- /dev/null +++ b/api/api/src/usecases/utils/tools/stac.py @@ -0,0 +1,172 @@ +""" +Module for data engineering with STAC elements +""" + +from os.path import dirname, join, abspath +from os import makedirs +from json import dumps +from typing import Union, Optional +from shutil import rmtree +from traceback import print_exc + +import geopandas as gpd +import pystac + +from tqdm import tqdm + + +def stac_items_to_gdf(items: pystac.ItemCollection) -> gpd.GeoDataFrame: + """ + Get a GeoDataFrame from a given pystac.ItemCollection. + + :param: items: A pystac.ItemCollection + :return: GeoDataframe from the given ItemCollection + """ + _features = [i.to_dict() for i in items] + + # Get a new ItemCollection by removing duplicate items, if they exist + features = [] + for f in _features: + if f not in features: + # Add all the keys in the properties dict as columns in the GeoDataFrame + for k, v in f.items(): + if k not in f["properties"] and k != "geometry": + f["properties"][k] = v + if "scene_id" in f["properties"]: + f["properties"]["scene_id"] = f["id"].split("_")[3] + features.append(f) + + return gpd.GeoDataFrame.from_features(features) + + +def get_all_children(obj: pystac.STACObject) -> list: + """ + Get all the children of a STAC object + """ + children = [] + # Append the current object to the list + children.append(obj.to_dict()) + + # Collections + collections = list(obj.get_collections()) + for collection in collections: + children.append(collection.to_dict()) + + # Items + items = obj.get_items() + for item in items: + children.append(item.to_dict()) + + # Items from collections + for collection in collections: + items = collection.get_items() + for item in items: + children.append(item.to_dict()) + + return children + + +def make_links_relative_to_path( + path: str, + catalog: Union[pystac.Catalog, str], +) -> pystac.Catalog: + """ + Makes all asset HREFs and links in the STAC catalog relative to a given path + """ + if isinstance(catalog, str): + catalog = pystac.read_file(catalog) + path = abspath(path) + + # Create a temporary catalog in the destination path to set as root + future_path = join(path, "catalog.json") + makedirs(path, exist_ok=True) + with open(future_path, "w", encoding="utf-8") as f: + f.write(dumps(catalog.to_dict(), indent=4)) + temp_catalog = pystac.Catalog.from_file(future_path) + + catalog.set_root(temp_catalog) + catalog.make_all_asset_hrefs_absolute() + + for collection in catalog.get_children(): + # Create new collection + new_collection = collection.clone() + new_collection.set_self_href(join(path, collection.id, "collection.json")) + new_collection.set_root(catalog) + 
new_collection.set_parent(catalog) + # Remove old collection and add new one to catalog + catalog.remove_child(collection.id) + catalog.add_child(new_collection) + for item in collection.get_all_items(): + # Create new item from old collection and add it to the new collection + new_item = item.clone() + new_item.set_self_href( + join(path, collection.id, item.id, f"{item.id}.json") + ) + new_item.set_parent(collection) + new_item.set_root(catalog) + new_item.make_asset_hrefs_relative() + new_collection.add_item(new_item) + + catalog.make_all_asset_hrefs_relative() + + return catalog + + +def merge_stac_catalogs( + catalog_1: Union[pystac.Catalog, str], + catalog_2: Union[pystac.Catalog, str], + destination: Optional[str] = None, + keep_extensions: Optional[bool] = False, + catalog_type: Optional[pystac.CatalogType] = pystac.CatalogType.SELF_CONTAINED, +) -> None: + """ + Merge two STAC catalogs, keeping the properties, collection and items of both catalogs + """ + if isinstance(catalog_1, str): + catalog_1 = pystac.Catalog.from_file(catalog_1) + if isinstance(catalog_2, str): + catalog_2 = pystac.Catalog.from_file(catalog_2) + + for col1 in tqdm(catalog_1.get_children(), desc="Merging catalogs..."): + # Check if the collection exists in catalog_2 + col2 = catalog_2.get_child(col1.id) + if not col2: + # If it does not exist, add it + col1_ = col1.clone() + catalog_2.add_child(col1) + col2 = catalog_2.get_child(col1.id) + col2.clear_items() + for i in col1_.get_stac_objects(pystac.RelType.ITEM): + col2.add_item(i) + else: + # If it exists, merge the items + for item1 in col1.get_items(): + if col2.get_item(item1.id) is None: + col2.add_item(item1) + + if keep_extensions: + for ext in catalog_1.stac_extensions: + if ext not in catalog_2.stac_extensions: + catalog_2.stac_extensions.append(ext) + + for extra_field_name, extra_field_value in catalog_1.extra_fields.items(): + if extra_field_name not in catalog_2.extra_fields: + catalog_2.extra_fields[extra_field_name] = extra_field_value + + if destination: + make_links_relative_to_path(destination, catalog_2) + else: + destination = dirname(catalog_2.get_self_href()) + + # Save the merged catalog + try: + print("Validating and saving...") + catalog_2.validate() + rmtree( + destination + ) if not destination else None # Remove the old catalog and replace it with the new one + catalog_2.normalize_and_save(root_href=destination, catalog_type=catalog_type) + print("Success!") + except pystac.STACValidationError: + # Return full callback + print_exc() diff --git a/docker-compose.yml b/docker-compose.yml index 7766a273..a6e16a13 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,6 +41,7 @@ services: volumes: - ./api/api:/api - ./eotdl/eotdl:/api/eotdl + - ./kk:/tmp ports: - 8000:8000 command: uvicorn api.main:app --host 0.0.0.0 --reload \ No newline at end of file diff --git a/eotdl/eotdl/curation/stac/dataframe.py b/eotdl/eotdl/curation/stac/dataframe.py index c99fe83d..a31efaa2 100755 --- a/eotdl/eotdl/curation/stac/dataframe.py +++ b/eotdl/eotdl/curation/stac/dataframe.py @@ -157,9 +157,7 @@ def read_stac( """ if isinstance(stac_file, (str, Path)): stac_file = pystac.read_file(stac_file) # we assume this is always a catalog - print(stac_file) stac_file.make_all_asset_hrefs_absolute() - print("ie") children = get_all_children(stac_file) # Convert Dataframe to STACDataFrame diff --git a/eotdl/eotdl/datasets/download.py b/eotdl/eotdl/datasets/download.py index 77ff9886..76a5c067 100755 --- a/eotdl/eotdl/datasets/download.py +++ 
b/eotdl/eotdl/datasets/download.py @@ -60,11 +60,10 @@ def download_dataset( file_version, progress=True, ) - # if calculate_checksum(dst_path) != checksum: - # logger(f"Checksum for {file} does not match") - + if verbose: + logger("Generating README.md ...") + generate_metadata(download_path, dataset) else: - # raise NotImplementedError("Downloading a STAC dataset is not implemented") if verbose: logger("Downloading STAC metadata...") repo = DatasetsAPIRepo() @@ -92,11 +91,7 @@ def download_dataset( href, filename, f"{download_path}/assets", user ) else: - if verbose: - logger("To download assets, set assets=True or -a in the CLI.") - if verbose: - logger("Generating README.md ...") - generate_metadata(download_path, dataset) + logger("To download assets, set assets=True or -a in the CLI.") if verbose: logger("Done") return download_path diff --git a/tutorials/notebooks/03_q1_datasets.ipynb b/tutorials/notebooks/03_q1_datasets.ipynb index e1b447d1..58fcfd6c 100755 --- a/tutorials/notebooks/03_q1_datasets.ipynb +++ b/tutorials/notebooks/03_q1_datasets.ipynb @@ -29,15 +29,15 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "100%|██████████| 90.3M/90.3M [00:04<00:00, 22.8MiB/s]\n", - "100%|██████████| 2/2 [00:04<00:00, 2.30s/file]\n" + "100%|██████████| 90.3M/90.3M [00:04<00:00, 22.5MiB/s]\n", + "100%|██████████| 1/1 [00:04<00:00, 4.47s/file]\n" ] }, { @@ -46,7 +46,7 @@ "'data/EuroSAT-RGB/v1'" ] }, - "execution_count": 5, + "execution_count": 1, "metadata": {}, "output_type": "execute_result" } @@ -59,14 +59,14 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "EuroSAT-RGB.zip metadata.yml\n" + "EuroSAT-RGB.zip README.md\n" ] } ], @@ -76,7 +76,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 6, "metadata": {}, "outputs": [], "source": [ @@ -92,7 +92,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 1, "metadata": {}, "outputs": [ { @@ -110,7 +110,7 @@ " 'Pasture']" ] }, - "execution_count": 20, + "execution_count": 1, "metadata": {}, "output_type": "execute_result" } @@ -133,7 +133,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ @@ -156,7 +156,7 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": 9, "metadata": {}, "outputs": [ { @@ -254,7 +254,7 @@ "4 data/EuroSAT-RGB-small/source None None " ] }, - "execution_count": 22, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -279,7 +279,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": 10, "metadata": {}, "outputs": [ { @@ -310,7 +310,7 @@ "text": [ "/home/juan/miniconda3/envs/eotdl/lib/python3.8/site-packages/rasterio/__init__.py:304: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. 
The identity matrix will be returned.\n", " dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)\n", - "100%|██████████| 100/100 [00:00<00:00, 915.72it/s]" + "100%|██████████| 100/100 [00:00<00:00, 902.26it/s]" ] }, { @@ -343,7 +343,7 @@ }, { "cell_type": "code", - "execution_count": 24, + "execution_count": 11, "metadata": {}, "outputs": [ { @@ -357,7 +357,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "100it [00:00, 2342.89it/s]\n" + "100it [00:00, 2434.50it/s]\n" ] }, { @@ -389,7 +389,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 2, "metadata": {}, "outputs": [ { @@ -397,14 +397,14 @@ "output_type": "stream", "text": [ "Loading STAC catalog...\n", - "New version created, version: 2\n" + "New version created, version: 4\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ - "100%|██████████| 200/200 [00:26<00:00, 7.42it/s]\n" + "100%|██████████| 200/200 [00:27<00:00, 7.22it/s]\n" ] }, { @@ -431,21 +431,22 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['EuroSAT-RGB',\n", - " 'EuroSAT',\n", + " 'EuroSAT-RGB-Q1',\n", + " 'EuroSAT-small',\n", + " 'EuroSAT-RGB-Q2',\n", " 'EuroSAT-RGB-STAC',\n", " 'EuroSAT-STAC',\n", - " 'EuroSAT-small',\n", - " 'EuroSAT-RGB-Q1']" + " 'EuroSAT']" ] }, - "execution_count": 20, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } @@ -635,4 +636,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} \ No newline at end of file +} diff --git a/tutorials/notebooks/04_q2_datasets.ipynb b/tutorials/notebooks/04_q2_datasets.ipynb index 351e7b0b..0eb59023 100755 --- a/tutorials/notebooks/04_q2_datasets.ipynb +++ b/tutorials/notebooks/04_q2_datasets.ipynb @@ -43,6 +43,7 @@ "['jaca_dataset_q2',\n", " 'sample_stacdataframe.csv',\n", " 'jaca_dataset',\n", + " 'jaca_dataset_structured',\n", " 'labels_scaneo',\n", " 'eurosat_rgb_stac',\n", " 'jaca_dataset_stac',\n", @@ -67,7 +68,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, "outputs": [ { @@ -86,7 +87,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "100%|██████████| 80/80 [00:00<00:00, 4967.64it/s]\n" + "100%|██████████| 80/80 [00:00<00:00, 4701.54it/s]\n" ] }, { @@ -100,7 +101,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "100%|██████████| 10/10 [00:00<00:00, 3880.02it/s]\n" + "100%|██████████| 10/10 [00:00<00:00, 4386.89it/s]\n" ] }, { @@ -114,7 +115,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "100%|██████████| 10/10 [00:00<00:00, 5876.84it/s]" + "100%|██████████| 10/10 [00:00<00:00, 5307.90it/s]" ] }, { @@ -143,11 +144,11 @@ "source": [ "from eotdl.curation.stac.extensions import add_ml_extension\n", "\n", - "catalog = 'example_data/EuroSAT-RGB-small-STAC/catalog.json'\n", + "catalog = 'data/EuroSAT-RGB-small-STAC/catalog.json'\n", "\n", "add_ml_extension(\n", "\tcatalog,\n", - "\tdestination='data/EuroSAT-RGB-Q2',\n", + "\tdestination='data/EuroSAT-RGB-small-STAC-ML',\n", "\tsplits=True,\n", "\tsplits_collection_id=\"labels\",\n", "\tname='EuroSAT Q2 Dataset',\n", @@ -167,15 +168,15 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "Looking for spatial duplicates...: 400it [00:00, 6256.56it/s]\n", - "Calculating classes balance...: 400it [00:00, 196110.06it/s]" + "Looking for spatial duplicates...: 400it [00:00, 2802.51it/s]\n", + "Calculating 
classes balance...: 400it [00:00, 188847.55it/s]\n" ] }, { @@ -185,13 +186,6 @@ "Validating and saving...\n", "Success!\n" ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "\n" - ] } ], "source": [ diff --git a/ui/src/routes/datasets/[name]/+page.svelte b/ui/src/routes/datasets/[name]/+page.svelte index b53e8da7..ae0c6922 100755 --- a/ui/src/routes/datasets/[name]/+page.svelte +++ b/ui/src/routes/datasets/[name]/+page.svelte @@ -107,14 +107,14 @@

No description.

{/if} - + {/if}

Download the dataset with the CLI:
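For reference, below is a minimal sketch of how the new modules introduced in this diff fit together during STAC ingestion. It is a condensed reading of ingest_stac in api/api/src/usecases/datasets/ingest_file.py, not a drop-in replacement: the relative imports assume the snippet sits next to that module, and build_catalog_with_metrics and the dataset_name argument are illustrative names (the real code uses dataset.name and also derives dataset_quality from the catalog keys).

import json
import shutil

import geopandas as gpd

from .stac import MLDatasetQualityMetrics   # new module added in this diff
from ..utils.stac import STACDataFrame      # new module added in this diff


def build_catalog_with_metrics(stac: dict, dataset_id: str, dataset_name: str) -> dict:
    # Turn the posted STAC payload into a GeoDataFrame and locate the root catalog
    values = gpd.GeoDataFrame.from_features(stac["features"], crs="4326")
    catalog = values[values["type"] == "Catalog"]
    assert len(catalog) == 1, "STAC catalog must have exactly one root catalog"

    # Materialize the catalog on disk so pystac can traverse it
    tmp_path = f"/tmp/{dataset_id}"
    STACDataFrame(values).to_stac(tmp_path)

    # Compute the quality metrics in place, then read the updated catalog back
    catalog_path = f"{tmp_path}/{dataset_name}/catalog.json"
    MLDatasetQualityMetrics.calculate(catalog_path)
    df = STACDataFrame.from_stac_file(catalog_path)
    catalog = df[df["type"] == "Catalog"]
    properties = json.loads(catalog.to_json())["features"][0]["properties"]

    shutil.rmtree(tmp_path)  # drop the temporary copy
    return properties  # stored on dataset.catalog and shown in the UI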