diff --git a/datacosmos/stac/__init__.py b/datacosmos/stac/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/datacosmos/stac/models/search_parameters.py b/datacosmos/stac/models/search_parameters.py
new file mode 100644
index 0000000..189cda9
--- /dev/null
+++ b/datacosmos/stac/models/search_parameters.py
@@ -0,0 +1,72 @@
+from typing import Optional, Union
+
+from pydantic import BaseModel, ConfigDict, Field, field_validator
+
+
+class SearchParameters(BaseModel):
+    """
+    Encapsulates the parameters for the STAC search API with validation.
+
+    ``datetime_range`` is exposed under the alias ``datetime``: that is the key
+    used when the model is dumped with ``by_alias=True``, and either name may
+    be used when constructing the model.
+    """
+
+    # Accept ``datetime_range=...`` as well as the ``datetime`` alias; by
+    # default pydantic only populates an aliased field through its alias.
+    model_config = ConfigDict(populate_by_name=True)
+
+    bbox: Optional[list[float]] = Field(
+        None,
+        description="Bounding box filter [minX, minY, maxX, maxY]. Optional six values for 3D bounding box.",
+        examples=[[-180.0, -90.0, 180.0, 90.0]],
+    )
+    datetime_range: Optional[str] = Field(
+        None,
+        alias="datetime",
+        description=(
+            "Temporal filter, either a single RFC 3339 datetime or an interval. "
+            'Example: "2025-01-01T00:00:00Z/.."'
+        ),
+    )
+    intersects: Optional[dict] = Field(
+        None, description="GeoJSON geometry filter, e.g., a Polygon or Point."
+    )
+    ids: Optional[list[str]] = Field(
+        None,
+        description="Array of item IDs to filter by.",
+        examples=[["item1", "item2"]],
+    )
+    collections: Optional[list[str]] = Field(
+        None,
+        description="Array of collection IDs to filter by.",
+        examples=[["collection1", "collection2"]],
+    )
+    limit: Optional[int] = Field(
+        None,
+        ge=1,
+        le=10000,
+        description="Maximum number of items per page. Default: 10, Max: 10000.",
+        examples=[10],
+    )
+    query: Optional[dict[str, dict[str, Union[str, int, float]]]] = Field(
+        None,
+        description="Additional property filters, e.g., { 'cloud_coverage': { 'lt': 10 } }.",
+    )
+
+    @field_validator("bbox")
+    @classmethod
+    def validate_bbox(cls, bbox: Optional[list[float]]) -> Optional[list[float]]:
+        """
+        Ensure a supplied ``bbox`` has exactly 4 (2D) or 6 (3D) values.
+
+        Runs after pydantic has validated the field's type, so ``bbox`` is either
+        ``None`` (no bounding-box filter) or a list of floats here.
+
+        Raises:
+            ValueError: If ``bbox`` has any other number of values, including
+                an empty list.
+        """
+        if bbox is not None and len(bbox) not in {4, 6}:
+            raise ValueError("bbox must contain 4 or 6 values.")
+        return bbox
diff --git a/datacosmos/stac/stac_client.py b/datacosmos/stac/stac_client.py
new file mode 100644
index 0000000..ef0d343
--- /dev/null
+++ b/datacosmos/stac/stac_client.py
@@ -0,0 +1,151 @@
+from typing import Generator, Optional
+from urllib.parse import parse_qsl, urlparse
+
+import pystac
+
+from datacosmos.client import DatacosmosClient
+from datacosmos.stac.models.search_parameters import SearchParameters
+
+# Endpoint used when no ``base_url`` is passed to ``STACClient``.
+DEFAULT_STAC_BASE_URL = "https://test.app.open-cosmos.com/api/data/v0/stac"
+
+
+class STACClient:
+    """
+    A client for interacting with the STAC API.
+    """
+
+    def __init__(self, client: DatacosmosClient, base_url: str = DEFAULT_STAC_BASE_URL):
+        """
+        Initialize the STACClient with a DatacosmosClient.
+
+        Args:
+            client (DatacosmosClient): The authenticated Datacosmos client instance.
+            base_url (str): Root URL of the STAC API, without the endpoint path.
+        """
+        self.client = client
+        # Drop any trailing slash so joined paths never contain "//".
+        self.base_url = base_url.rstrip("/")
+
+    def search_items(
+        self, parameters: SearchParameters
+    ) -> Generator[pystac.Item, None, None]:
+        """
+        Query the STAC catalog using the POST endpoint with flexible filters and pagination.
+
+        Args:
+            parameters (SearchParameters): The search parameters.
+
+        Yields:
+            pystac.Item: Parsed STAC item.
+        """
+        url = f"{self.base_url}/search"
+        # Unset filters are omitted; fields are sent under their API aliases.
+        body = parameters.model_dump(by_alias=True, exclude_none=True)
+        return self._paginate_items(url, body)
+
+    def fetch_item(self, item_id: str, collection_id: str) -> pystac.Item:
+        """
+        Fetch a single STAC item by ID.
+
+        Args:
+            item_id (str): The ID of the item to fetch.
+            collection_id (str): The ID of the collection containing the item.
+
+        Returns:
+            pystac.Item: The fetched STAC item.
+        """
+        # Use the same STAC root as search_items so both hit the same API.
+        url = f"{self.base_url}/collections/{collection_id}/items/{item_id}"
+        response = self.client.get(url)
+        response.raise_for_status()
+        return pystac.Item.from_dict(response.json())
+
+    def fetch_collection_items(
+        self, collection_id: str, parameters: Optional[SearchParameters] = None
+    ) -> Generator[pystac.Item, None, None]:
+        """
+        Fetch all items in a collection with pagination.
+
+        Args:
+            collection_id (str): The ID of the collection.
+            parameters (SearchParameters): Optional additional parameters for the query.
+                Any ``collections`` it carries is replaced by ``[collection_id]``;
+                the object passed in is left unmodified.
+
+        Yields:
+            pystac.Item: Parsed STAC item.
+        """
+        if parameters is None:
+            parameters = SearchParameters(collections=[collection_id])
+        else:
+            # Copy rather than assign so the caller's object is not mutated.
+            parameters = parameters.model_copy(
+                update={"collections": [collection_id]}
+            )
+
+        return self.search_items(parameters)
+
+    def _paginate_items(self, url: str, body: dict) -> Generator[pystac.Item, None, None]:
+        """
+        Handles pagination for the STAC search POST endpoint.
+        Fetches items one page at a time using the 'next' link.
+
+        Args:
+            url (str): The base URL for the search endpoint.
+            body (dict): The request body containing search parameters.
+
+        Yields:
+            pystac.Item: Parsed STAC item.
+        """
+        params = {"limit": body.get("limit", 10)}  # Default limit to 10 if not provided
+
+        while True:
+            # Make the POST request
+            response = self.client.post(url, json=body, params=params)
+            response.raise_for_status()
+            data = response.json()
+
+            # Process features (STAC items)
+            for feature in data.get("features", []):
+                yield pystac.Item.from_dict(feature)
+
+            # Handle pagination via the 'next' link
+            next_link = next(
+                (link for link in data.get("links", []) if link.get("rel") == "next"),
+                None,
+            )
+            if next_link is None:
+                break
+
+            next_href = next_link.get("href", "")
+            if not next_href:
+                self.client.logger.warning("Next link href is empty. Stopping pagination.")
+                break
+
+            token = self._extract_pagination_token(next_href)
+            if token is None:
+                self.client.logger.error(f"Failed to parse pagination token from {next_href}")
+                break
+            params["cursor"] = token
+
+    @staticmethod
+    def _extract_pagination_token(next_href: str) -> Optional[str]:
+        """
+        Extract the pagination token from a 'next' link URL.
+
+        The ``cursor`` query parameter is used when present; otherwise the value
+        of the last query parameter is used.
+
+        Args:
+            next_href (str): The href of the 'next' link.
+
+        Returns:
+            Optional[str]: The URL-decoded token, or None if the URL carries no
+            non-empty query parameter.
+        """
+        # parse_qsl decodes percent-escapes and skips parameters with blank values.
+        pairs = parse_qsl(urlparse(next_href).query)
+        if not pairs:
+            return None
+        return dict(pairs).get("cursor", pairs[-1][1])