automatic metric computation for Q2
earthpulse committed Mar 28, 2024
1 parent c03c56c commit 388dc5a
Showing 17 changed files with 1,179 additions and 130 deletions.
2 changes: 2 additions & 0 deletions api/Dockerfile
@@ -48,5 +48,7 @@ RUN pip install stac_validator
 RUN pip install geopandas
 RUN pip install pyarrow
 RUN pip install starlette_exporter
+RUN pip install geomet
+RUN pip install rasterio
 
 COPY ./api /api
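The two new dependencies plausibly back the metric computation introduced in this commit: geomet converts between GeoJSON and WKT/WKB, and rasterio opens raster assets. A quick sketch of both libraries' public APIs; the GeoTIFF path is a hypothetical placeholder, and whether the metrics code actually calls them this way is an assumption:

```python
import rasterio
from geomet import wkt

# GeoJSON -> WKT with geomet
geom = {"type": "Point", "coordinates": [2.17, 41.38]}
print(wkt.dumps(geom, decimals=2))  # POINT (2.17 41.38)

# Inspect a raster asset with rasterio; "asset.tif" is a placeholder path.
with rasterio.open("asset.tif") as src:
    print(src.width, src.height, src.count, src.crs)
```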
62 changes: 1 addition & 61 deletions api/api/routers/datasets/ingest_dataset.py
@@ -133,64 +133,4 @@ def ingest_stac_catalog(
         return ingest_stac(body.stac, dataset_id, user)
     except Exception as e:
         logger.exception("datasets:ingest_url")
-        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
-
-
-# @router.post("/{dataset_id}")
-# async def ingest(
-#     dataset_id: str,
-#     file: Optional[UploadFile] = File(None),
-#     version: int = Form(),  # should remove this (a file can only be uploaded to the latest version if it is not already there)
-#     parent: str = Form(None),  # should remove this (take it from the name?)
-#     checksum: str = Form(None),  # optional bc browser
-#     filename: str = Form(None),
-#     fileversion: int = Form(None),
-#     user: User = Depends(get_current_user),
-# ):
-#     try:
-#         if filename:
-#             assert not file, "File provided as both file and filename"
-#             assert not parent, "Parent provided as both parent and filename"
-#             assert fileversion, "Fileversion not provided"
-#             dataset_id, dataset_name, file_name = await ingest_existing_dataset_file(
-#                 filename, dataset_id, fileversion, version, checksum, user
-#             )
-#         else:
-#             if file.size > 1000000000:  # 1GB
-#                 raise Exception(
-#                     "File too large, please use the CLI to upload large files."
-#                 )
-#             dataset_id, dataset_name, file_name = await ingest_dataset_file(
-#                 file, dataset_id, version, parent, checksum, user
-#             )
-#         return {
-#             "dataset_id": dataset_id,
-#             "dataset_name": dataset_name,
-#             "file_name": file_name,
-#         }
-#     except Exception as e:
-#         logger.exception("datasets:ingest")
-#         raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
-
-
-# class IngestURLBody(BaseModel):
-#     url: str
-
-# @router.post("/{dataset_id}/url")
-# async def ingest_url(
-#     dataset_id: str,
-#     body: IngestURLBody,
-#     user: User = Depends(get_current_user),
-# ):
-#     # try:
-#     dataset_id, dataset_name, file_name = await ingest_dataset_file_url(
-#         body.url, dataset_id, user
-#     )
-#     return {
-#         "dataset_id": dataset_id,
-#         "dataset_name": dataset_name,
-#         "file_name": file_name,
-#     }
-#     # except Exception as e:
-#     #     logger.exception("datasets:ingest")
-#     #     raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
+        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
7 changes: 0 additions & 7 deletions api/api/src/repos/geodb/GeoDBRepo.py
@@ -1,7 +1,5 @@
-import geopandas as gpd
 from shapely.geometry import Polygon
 from .client import get_client
-import json
 
 
 class GeoDBRepo:
@@ -19,10 +17,6 @@ def create(self, collections):
         return self.client.create_collections(collections, database=self.database)
 
     def insert(self, collection, values):
-        values = gpd.GeoDataFrame.from_features(values["features"], crs="4326")  # ???
-        catalog = values[values["type"] == "Catalog"]
-        assert len(catalog) == 1, "STAC catalog must have exactly one root catalog"
-        catalog = json.loads(catalog.to_json())["features"][0]["properties"]
         values.rename(columns={"id": "stac_id"}, inplace=True)
         # convert None geometry to empty geometry wkt
         values.geometry = values.geometry.apply(lambda x: Polygon() if x is None else x)
@@ -33,7 +27,6 @@ def insert(self, collection, values):
         self.client.insert_into_collection(
             collection, database=self.database, values=values
         )
-        return catalog
 
     def retrieve(self, collection):
         return self.client.get_collection(collection, database=self.database)
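With the catalog extraction moved out to the caller, insert() now only normalizes the incoming GeoDataFrame for GeoDB. A minimal sketch of the null-geometry handling it keeps, assuming rows such as the root catalog arrive without a geometry:

```python
import geopandas as gpd
from shapely.geometry import Point, Polygon

gdf = gpd.GeoDataFrame(
    {"stac_id": ["item-1", "catalog"]},
    geometry=[Point(2.17, 41.38), None],
    crs="EPSG:4326",
)
# Same normalization as GeoDBRepo.insert: a missing geometry becomes an
# empty polygon, which still serializes to valid WKT.
gdf.geometry = gdf.geometry.apply(lambda x: Polygon() if x is None else x)
print(gdf.geometry.iloc[1].wkt)  # POLYGON EMPTY
```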
42 changes: 35 additions & 7 deletions api/api/src/usecases/datasets/ingest_file.py
@@ -2,12 +2,17 @@
 import zipfile
 import io
 import os
+import geopandas as gpd
+import json
+import shutil
 
 from .retrieve_dataset import retrieve_owned_dataset
 from ...errors import DatasetVersionDoesNotExistError
 from ...repos import DatasetsDBRepo, GeoDBRepo
 from ..files import ingest_file, ingest_existing_file
 from ..user import retrieve_user_credentials
+from .stac import MLDatasetQualityMetrics
+from ..utils.stac import STACDataFrame
 
 
 async def ingest_dataset_file(file, dataset_id, checksum, user, version):
@@ -90,18 +95,41 @@ def ingest_stac(stac, dataset_id, user):
     # check if dataset exists
     dataset = retrieve_owned_dataset(dataset_id, user.uid)
     # TODO: check all assets exist in os
+    # generate catalog
+    values = gpd.GeoDataFrame.from_features(stac["features"], crs="4326")  # ???
+    catalog = values[values["type"] == "Catalog"]
+    assert len(catalog) == 1, "STAC catalog must have exactly one root catalog"
+    catalog = json.loads(catalog.to_json())["features"][0]["properties"]
+    keys = list(catalog.keys())
+    dataset_quality = 1
+    # TODO: validate Q1 dataset with required fields/extensions (author, license)
+    # TODO: validate Q2 dataset, not only check name
+    if "ml-dataset:name" in keys:
+        dataset_quality = 2
+        # compute and report automatic qa metrics
+        # save stac locally
+        tmp_path = f"/tmp/{dataset_id}"
+        df = STACDataFrame(values)
+        df.to_stac(tmp_path)
+        # compute metrics
+        catalog_path = f"{tmp_path}/{dataset.name}/catalog.json"
+        MLDatasetQualityMetrics.calculate(catalog_path)
+        # overwrite catalog with computed metrics
+        df = STACDataFrame.from_stac_file(catalog_path)
+        catalog = df[df["type"] == "Catalog"]
+        # print("1", catalog)
+        catalog = json.loads(catalog.to_json())["features"][0]["properties"]
+        # print("2", catalog)
+        # delete tmp files
+        shutil.rmtree(tmp_path)
+    print("quality", dataset_quality)
     # ingest to geodb
     credentials = retrieve_user_credentials(user)
     geodb_repo = GeoDBRepo(credentials)
-    catalog = geodb_repo.insert(dataset.id, stac)
+    geodb_repo.insert(dataset.id, values)
     # the catalog should contain all the info we want to show in the UI
     dataset.catalog = catalog
-    keys = list(catalog.keys())
-    if "ml-dataset:name" in keys:
-        dataset.quality = 2
-    # TODO: compute and report automatic qa metrics
-    # TODO: validate Q2 dataset, not only check name
-    # TODO: validate Q1 dataset with required fields/extensions (author, license)
+    dataset.quality = dataset_quality
     repo = DatasetsDBRepo()
     dataset.updatedAt = datetime.now()
     repo.update_dataset(dataset.id, dataset.model_dump())
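In short, ingest_stac now derives the root catalog itself, defaults the dataset to quality 1, and promotes it to Q2 only when the root catalog exposes the ml-dataset extension's ml-dataset:name property; metrics are then computed through a save/annotate/reload round trip on disk. A condensed sketch of that round trip, assuming STACDataFrame and MLDatasetQualityMetrics behave as used above (i.e. calculate annotates catalog.json in place):

```python
import json
import shutil

import geopandas as gpd

# Repo-relative imports as in the diff:
# from .stac import MLDatasetQualityMetrics
# from ..utils.stac import STACDataFrame


def annotate_q2_metrics(stac: dict, dataset_id: str, dataset_name: str) -> dict:
    """Write the STAC catalog to a temp dir, compute ml-dataset quality
    metrics in place, and return the annotated root catalog properties."""
    values = gpd.GeoDataFrame.from_features(stac["features"], crs="4326")
    tmp_path = f"/tmp/{dataset_id}"
    STACDataFrame(values).to_stac(tmp_path)
    catalog_path = f"{tmp_path}/{dataset_name}/catalog.json"
    MLDatasetQualityMetrics.calculate(catalog_path)  # writes metrics into catalog.json
    df = STACDataFrame.from_stac_file(catalog_path)
    catalog = df[df["type"] == "Catalog"]
    properties = json.loads(catalog.to_json())["features"][0]["properties"]
    shutil.rmtree(tmp_path)  # drop the temporary copy
    return properties
```

One design note: the reloaded catalog properties, not the raw input, end up on dataset.catalog, so the UI reflects the freshly computed metrics.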
