Skip to content

Commit

Permalink
Add first version of stac catalog query functionalities to datacosmos…
Browse files Browse the repository at this point in the history
… sdk
  • Loading branch information
TiagoOpenCosmos committed Jan 29, 2025
1 parent a7e17b5 commit 929c0b5
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 0 deletions.
Empty file added datacosmos/stac/__init__.py
Empty file.
53 changes: 53 additions & 0 deletions datacosmos/stac/models/search_parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from pydantic import BaseModel, Field, model_validator
from typing import Optional, Union


class SearchParameters(BaseModel):
"""
Encapsulates the parameters for the STAC search API with validation.
"""

bbox: Optional[list[float]] = Field(
None,
description="Bounding box filter [minX, minY, maxX, maxY]. Optional six values for 3D bounding box.",
example=[-180.0, -90.0, 180.0, 90.0],
)
datetime_range: Optional[str] = Field(
None,
alias="datetime",
description=(
"Temporal filter, either a single RFC 3339 datetime or an interval. "
'Example: "2025-01-01T00:00:00Z/.."'
),
)
intersects: Optional[dict] = Field(
None, description="GeoJSON geometry filter, e.g., a Polygon or Point."
)
ids: Optional[list[str]] = Field(
None,
description="Array of item IDs to filter by.",
example=["item1", "item2"],
)
collections: Optional[list[str]] = Field(
None,
description="Array of collection IDs to filter by.",
example=["collection1", "collection2"],
)
limit: Optional[int] = Field(
None,
ge=1,
le=10000,
description="Maximum number of items per page. Default: 10, Max: 10000.",
example=10,
)
query: Optional[dict[str, dict[str, Union[str, int, float]]]] = Field(
None,
description="Additional property filters, e.g., { 'cloud_coverage': { 'lt': 10 } }.",
)

@model_validator(mode="before")
def validate_bbox(cls, values):
bbox = values.get("bbox")
if bbox and len(bbox) not in {4, 6}:
raise ValueError("bbox must contain 4 or 6 values.")
return values
120 changes: 120 additions & 0 deletions datacosmos/stac/stac_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import pystac
from typing import Generator
from datacosmos.client import DatacosmosClient
from datacosmos.stac.models.search_parameters import SearchParameters


class STACClient:
"""
A client for interacting with the STAC API.
"""

def __init__(self, client: DatacosmosClient):
"""
Initialize the STACClient with a DatacosmosClient.
Args:
client (DatacosmosClient): The authenticated Datacosmos client instance.
"""
self.client = client
self.base_url = "https://test.app.open-cosmos.com/api/data/v0/stac"

def search_items(
self, parameters: SearchParameters
) -> Generator[pystac.Item, None, None]:
"""
Query the STAC catalog using the POST endpoint with flexible filters and pagination.
Args:
parameters (SearchParameters): The search parameters.
Yields:
pystac.Item: Parsed STAC item.
"""
url = f"{self.base_url}/search"
body = parameters.model_dump(by_alias=True, exclude_none=True)
return self._paginate_items(url, body)

def fetch_item(self, item_id: str, collection_id: str) -> pystac.Item:
"""
Fetch a single STAC item by ID.
Args:
item_id (str): The ID of the item to fetch.
collection_id (str): The ID of the collection containing the item.
Returns:
pystac.Item: The fetched STAC item.
"""
url = f"{self.client.config.base_url}/collections/{collection_id}/items/{item_id}"
response = self.client.get(url)
response.raise_for_status()
return pystac.Item.from_dict(response.json())

def fetch_collection_items(
self, collection_id: str, parameters: SearchParameters = None
) -> Generator[pystac.Item, None, None]:
"""
Fetch all items in a collection with pagination.
Args:
collection_id (str): The ID of the collection.
parameters (SearchParameters): Optional additional parameters for the query.
Yields:
pystac.Item: Parsed STAC item.
"""
if not parameters:
parameters = SearchParameters(collections=[collection_id])
else:
parameters.collections = [collection_id]

return self.search_items(parameters)

def _paginate_items(self, url: str, body: dict) -> Generator[pystac.Item, None, None]:
"""
Handles pagination for the STAC search POST endpoint.
Fetches items one page at a time using the 'next' link.

Check failure on line 78 in datacosmos/stac/stac_client.py

View workflow job for this annotation

GitHub Actions / Flake8

datacosmos/stac/stac_client.py#L78

Cognitive complexity is too high (12 > 7) (CCR001)
Args:
url (str): The base URL for the search endpoint.
body (dict): The request body containing search parameters.
Yields:
pystac.Item: Parsed STAC item.
"""
params = {"limit": body.get("limit", 10)} # Default limit to 10 if not provided

while True:
# Make the POST request
response = self.client.post(url, json=body, params=params)
response.raise_for_status()
data = response.json()

# Process features (STAC items)
for feature in data.get("features", []):
yield pystac.Item.from_dict(feature)

# Handle pagination via the 'next' link
next_link = next(
(link for link in data.get("links", []) if link.get("rel") == "next"),
None,
)
if next_link:
next_href = next_link.get("href", "")

# Validate the href
if not next_href:
self.client.logger.warning("Next link href is empty. Stopping pagination.")
break

# Extract token from the href
try:
token = next_href.split("?")[1].split("=")[-1]
params["cursor"] = token
except IndexError:
self.client.logger.error(f"Failed to parse pagination token from {next_href}")
break
else:
break

0 comments on commit 929c0b5

Please sign in to comment.