Skip to content

Commit

Permalink
STAC models ingestion and download
Browse files Browse the repository at this point in the history
  • Loading branch information
juansensio committed Jun 13, 2024
1 parent 2c783a5 commit 8081e90
Show file tree
Hide file tree
Showing 18 changed files with 877 additions and 297 deletions.
25 changes: 4 additions & 21 deletions api/api/routers/datasets/download_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

from ..auth import get_current_user
from ...src.models import User
from ...src.usecases.datasets import download_dataset_file, download_stac_catalog, generate_presigned_url
from ...src.usecases.datasets import (
download_dataset_file,
download_stac_catalog,
)
from .responses import download_dataset_responses as responses

router = APIRouter()
Expand Down Expand Up @@ -57,23 +60,3 @@ async def download_stac_Catalog(
except Exception as e:
logger.exception("datasets:download")
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


# @router.get(
# "/{dataset_id}/url/{filename:path}",
# summary="Retrieve a presigned get url",
# responses=responses,
# )
# async def generate_a_presigned_url(
# dataset_id: str = Path(..., description="ID of the dataset to download"),
# filename: str = Path(
# ..., description="Filename or path to the file to download from the dataset"
# ), # podría ser un path... a/b/c/file.txt
# version: int = Query(None, description="Version of the dataset to download"),
# user: User = Depends(get_current_user),
# ):
# try:
# return generate_presigned_url(dataset_id, filename, version)
# except Exception as e:
# logger.exception("datasets:download")
# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
31 changes: 27 additions & 4 deletions api/api/routers/models/create_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from ..auth import get_current_user
from ...src.models import User
from ...src.usecases.models import create_model, create_model_version
from ...src.usecases.models import create_model, create_model_version, create_stac_model
from .responses import create_model_responses, version_model_responses

router = APIRouter()
Expand Down Expand Up @@ -42,9 +42,15 @@ def create(
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


@router.post("/version/{model_id}", summary="Get the version of a model", responses=version_model_responses)
def version_model(model_id: str = Path(..., description="The ID of the model"),
user: User = Depends(get_current_user)):
@router.post(
"/version/{model_id}",
summary="Get the version of a model",
responses=version_model_responses,
)
def version_model(
model_id: str = Path(..., description="The ID of the model"),
user: User = Depends(get_current_user),
):
"""
Get the version of a model.
"""
Expand All @@ -54,3 +60,20 @@ def version_model(model_id: str = Path(..., description="The ID of the model"),
except Exception as e:
logger.exception("models:version")
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


# Request body for the POST "/stac" endpoint: carries only the model name.
class CreateSTACModelBody(BaseModel):
    name: str  # passed to create_stac_model as the model name


@router.post("/stac", summary="Create a new stac model")
def create_stac(
    body: CreateSTACModelBody,
    user: User = Depends(get_current_user),
):
    """
    Create a new STAC model on behalf of the authenticated user.

    The request body carries the model name. On success the response is a
    JSON object with the `model_id` returned by `create_stac_model`.
    Any failure is logged and reported as a 409 Conflict whose detail is the
    error message.
    """
    try:
        model_id = create_stac_model(user, body.name)
        return {"model_id": model_id}
    except Exception as e:
        # Log under the models namespace (the other handlers in the models
        # routers use "models:*" tags) so failures are not attributed to the
        # datasets routers.
        logger.exception("models:create_stac")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
62 changes: 34 additions & 28 deletions api/api/routers/models/download_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,55 @@

from ..auth import get_current_user
from ...src.models import User
from ...src.usecases.models import download_model_file
from ...src.usecases.models import download_model_file, download_stac_catalog
from .responses import download_model_responses

router = APIRouter()
logger = logging.getLogger(__name__)


@router.get("/{model_id}/download/{filename:path}", summary="Download a model", responses=download_model_responses)
@router.get(
"/{model_id}/download/{filename:path}",
summary="Download a model",
responses=download_model_responses,
)
async def download_model(
model_id: str = Path(..., description="ID of the model to download"),
filename: str = Path(..., description="Filename or path to the file to download from the model"), # podría ser un path... a/b/c/file.txt
filename: str = Path(
..., description="Filename or path to the file to download from the model"
), # podría ser un path... a/b/c/file.txt
version: int = Query(None, description="Version of the model to download"),
user: User = Depends(get_current_user),
):
"""
Download an entire model or a specific model file from the EOTDL.
"""
# try:
data_stream, object_info, _filename = download_model_file(
model_id, filename, user, version
)
response_headers = {
"Content-Disposition": f'attachment; filename="{filename}"',
"Content-Type": object_info.content_type,
"Content-Length": str(object_info.size),
}
return StreamingResponse(
data_stream(model_id, _filename),
headers=response_headers,
media_type=object_info.content_type,
)
# except Exception as e:
# logger.exception("models:download")
# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


@router.get("/{model_id}/download")
async def download_stac_Catalog(
model_id: str,
user: User = Depends(get_current_user),
):
try:
data_stream, object_info, _filename = download_model_file(
model_id, filename, user, version
)
response_headers = {
"Content-Disposition": f'attachment; filename="{filename}"',
"Content-Type": object_info.content_type,
"Content-Length": str(object_info.size),
}
return StreamingResponse(
data_stream(model_id, _filename),
headers=response_headers,
media_type=object_info.content_type,
)
return download_stac_catalog(model_id, user)
except Exception as e:
logger.exception("models:download")
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


# @router.get("/{model_id}/download")
# async def download_stac_Catalog(
# model_id: str,
# user: User = Depends(get_current_user),
# ):
# try:
# return download_stac_catalog(model_id, user)
# except Exception as e:
# logger.exception("models:download")
# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
114 changes: 61 additions & 53 deletions api/api/routers/models/ingest_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,55 @@
from fastapi import APIRouter, status, Depends, File, Form, UploadFile, Path, Query
import logging
from typing import List
from pydantic import BaseModel

from ..auth import get_current_user
from ...src.models import User
from ...src.usecases.models import (
ingest_model_files_batch,
add_files_batch_to_model_version,
ingest_stac,
ingest_model_file,
)
from .responses import ingest_files_responses

router = APIRouter()
logger = logging.getLogger(__name__)


@router.post(
    "/{model_id}",
    summary="Ingest file to a model",
    responses=ingest_files_responses,
)
async def ingest_files(
    model_id: str = Path(..., description="ID of the model"),
    version: int = Query(None, description="Version of the model"),
    file: UploadFile = File(..., description="file to ingest"),
    checksum: str = Form(
        ...,
        description="checksum of the file to ingest, calculated with SHA-1",
    ),
    user: User = Depends(get_current_user),
):
    """
    Ingest a single file into an existing model.
    The checksum is calculated using the SHA-1 checksum algorithm.
    """
    try:
        model_id, model_name, filename = await ingest_model_file(
            file, model_id, checksum, user, version
        )
        return {
            "model_id": model_id,
            "model_name": model_name,
            "filename": filename,
        }
    except Exception as e:
        # Same tag as the batch ingest endpoint of this router, so model
        # ingestion failures are grouped together in the logs.
        logger.exception("models:ingest")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


@router.post(
"/{model_id}/batch",
summary="Batch ingest files to a model",
Expand All @@ -36,18 +72,18 @@ async def ingest_files_batch(
Batch ingest of files to an existing model. The batch file must be a compressed file (.zip).
The checksums are calculated using the SHA-1 checksums algorithm.
"""
# try:
model_id, model_name, filenames = await ingest_model_files_batch(
batch, model_id, checksums, user, version
)
return {
"model_id": model_id,
"model_name": model_name,
"filenames": filenames,
}
# except Exception as e:
# logger.exception("models:ingest")
# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
try:
model_id, model_name, filenames = await ingest_model_files_batch(
batch, model_id, checksums, user, version
)
return {
"model_id": model_id,
"model_name": model_name,
"filenames": filenames,
}
except Exception as e:
logger.exception("models:ingest")
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


@router.post(
Expand Down Expand Up @@ -81,46 +117,18 @@ def ingest_existing_file(
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


# except Exception as e:
# logger.exception("models:ingest")
# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
# Request body for the PUT "/stac/{model_id}" endpoint.
class IngestSTACBody(BaseModel):
    stac: dict  # STAC payload as a parsed JSON object (declared as dict, not a raw string)


# class IngestSTACBody(BaseModel):
# stac: dict # json as string


# @router.put("/stac/{model_id}")
# async def ingest_stac_catalog(
# model_id: str,
# body: IngestSTACBody,
# user: User = Depends(get_current_user),
# ):
# try:
# return ingest_stac(body.stac, model_id, user)
# except Exception as e:
# logger.exception("models:ingest_url")
# raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))

# class IngestURLBody(BaseModel):
# url: str


# @router.post("/{model_id}/url")
# async def ingest_url(
# model_id: str,
# body: IngestURLBody,
# user: User = Depends(get_current_user),
# ):
# # try:
# model_id, model_name, file_name = await ingest_file_url(
# body.url, model_id, user
# )
# return {
# "model_id": model_id,
# "model_name": model_name,
# "file_name": file_name,
# }
# # except Exception as e:
# # logger.exception("models:ingest")
# # raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
@router.put("/stac/{model_id}")
def ingest_stac_catalog(
    model_id: str,
    body: IngestSTACBody,
    user: User = Depends(get_current_user),
):
    """
    Ingest a STAC catalog into an existing model.

    The `stac` object from the request body is forwarded to `ingest_stac`
    together with the model ID and the authenticated user, and the result of
    that call is returned as-is. Any failure is logged and reported as a
    409 Conflict whose detail is the error message.
    """
    try:
        return ingest_stac(body.stac, model_id, user)
    except Exception as e:
        # This is the models router: tag the failure accordingly instead of
        # reusing the datasets tag.
        logger.exception("models:ingest_stac")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
2 changes: 1 addition & 1 deletion api/api/src/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
from .files import File, Files, UploadingFile, Folder, UploadingFile
from .tag import Tag
from .usage import Usage, Limits
from .model import Model
from .model import Model, STACModel
from .verison import Version
18 changes: 18 additions & 0 deletions api/api/src/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,21 @@ def check_source_is_url(cls, source):
if not source.startswith("http") and not source.startswith("https"):
raise ValueError("source must be a valid url")
return source


# Pydantic schema for a model ingested as a STAC catalog.
# Only `uid`, `id`, `name` and `files` are required; every other field has a default.
class STACModel(BaseModel):
    uid: str  # presumably the owning user's ID; verify against callers
    id: str
    name: str
    description: str = ""
    tags: List[str] = []
    # Both timestamps default to the (naive, local) time at instantiation.
    createdAt: datetime = Field(default_factory=datetime.now)
    updatedAt: datetime = Field(default_factory=datetime.now)
    likes: int = 0
    downloads: int = 0
    quality: int = 1
    size: int = 0
    catalog: dict = {}
    items: dict = {}
    versions: List[Version] = []
    files: str  # NOTE(review): looks like a reference/ID to a files record, not file contents; confirm
6 changes: 5 additions & 1 deletion api/api/src/usecases/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
add_files_batch_to_dataset_version,
ingest_stac,
) # ingest_file_url
from .download_dataset import download_dataset_file, download_stac_catalog, generate_presigned_url
from .download_dataset import (
download_dataset_file,
download_stac_catalog,
generate_presigned_url,
)
from .update_dataset import toggle_like_dataset, update_dataset

from .delete_dataset import delete_dataset
Expand Down
28 changes: 1 addition & 27 deletions api/api/src/usecases/datasets/ingest_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def ingest_stac(stac, dataset_id, user):
values = gpd.GeoDataFrame.from_features(stac["features"], crs="4326") # ???
# values.to_csv("/tmp/iepa.csv")
catalog = values[values["type"] == "Catalog"]
items = values.drop_duplicates(subset='geometry')
items = values.drop_duplicates(subset="geometry")
items = items[items["type"] == "Feature"]
# convert to geojson
items = json.loads(items.to_json())
Expand Down Expand Up @@ -140,29 +140,3 @@ def ingest_stac(stac, dataset_id, user):
dataset.updatedAt = datetime.now()
repo.update_dataset(dataset.id, dataset.model_dump())
return dataset


# async def ingest_dataset_file(file, dataset_id, version, parent, checksum, user):
# dataset = retrieve_owned_dataset(dataset_id, user.uid)
# versions = [v.version_id for v in dataset.versions]
# if not version in versions:
# raise DatasetVersionDoesNotExistError()
# filename, file_size = await ingest_file(
# file, version, parent, dataset_id, checksum, dataset.quality, dataset.files
# )
# version = [v for v in dataset.versions if v.version_id == version][0]
# version.size += file_size # for Q0+ will add, so put to 0 before if necessary
# dataset.updatedAt = datetime.now()
# dataset_db_repo = DatasetsDBRepo()
# dataset_db_repo.update_dataset(dataset.id, dataset.dict())
# return dataset.id, dataset.name, filename


def ingest_file_url():
# TODO
return
# def get_file_name(self, file):
# return file.split("/")[-1]

# def persist_file(self, file, dataset_id, filename):
# return os_repo.persist_file_url(file, dataset_id, filename)
Loading

0 comments on commit 8081e90

Please sign in to comment.