-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GTC-3081: Add political/id-lookup endpoint #616
Changes from 44 commits
b94f3fa
505cc16
0514d42
9d2ae13
44a253f
9c2e0ca
899e772
d633596
7f5e9f3
2f2facd
15de81a
40f7772
2cee550
e07c4f4
bb69f18
1b95ca2
79ae7c3
09e628e
2934fd6
e09cf01
2d979a2
0c6d541
ed5f2cd
34c41f8
1952fc0
c6384fd
4aef63c
0f80b9e
b23bf7f
275ff6e
53ddba5
3bc92f5
fbacd70
9c5fc87
849d68a
4668790
68f6590
d35bb02
0bdb3e5
a9cf2df
83aa0a0
0effc5b
84bf869
af3c244
498ecc6
d9e0cda
55e1a2f
ffe9b6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from typing import List, Optional | ||
|
||
from fastapi.params import Query | ||
from pydantic import Field, root_validator | ||
|
||
from app.models.pydantic.base import StrictBaseModel | ||
from app.models.pydantic.responses import Response | ||
from app.settings.globals import ENV, per_env_admin_boundary_versions | ||
|
||
|
||
class GeoencoderQueryParams(StrictBaseModel):
    """Query parameters accepted by the administrative-boundary ID lookup.

    ``admin_source`` defaults to 'GADM'; ``admin_version`` and ``country``
    are required; ``region`` and ``subregion`` are optional.  The
    (source, version) pair is checked against the boundary versions
    configured for the current environment before field validation runs.
    """

    admin_source: str = Field(
        "GADM",
        description=(
            "The source of administrative boundaries to use "
            "(currently the only valid choice is 'GADM')."
        ),
    )
    admin_version: str = Query(
        ...,
        description=(
            "The version of the administrative boundaries to use "
            "(note that this represents the release of the source dataset, "
            "not the GFW Data API's idea of the version in the database)."
        ),
    )
    country: str = Query(
        ...,
        description="Name of the country to match.",
    )
    region: Optional[str] = Query(
        None,
        description="Name of the region to match.",
    )
    subregion: Optional[str] = Query(
        None,
        description="Name of the subregion to match.",
    )
    normalize_search: bool = Query(
        True,
        description="Whether or not to perform a case- and accent-insensitive search.",
    )

    @root_validator(pre=True)
    def validate_params(cls, values):
        """Check the source/version pair against this environment's config.

        Because this validator is registered with ``pre=True`` it receives
        the raw input, before field defaults have been filled in.

        Returns:
            The unmodified ``values`` mapping.

        Raises:
            ValueError: if ``admin_source`` is explicitly null, if
                ``admin_version`` is missing, or if the source or the
                version is not configured for the current environment.
        """
        # An omitted admin_source means "use the default".  A pre-validator
        # has to supply that default itself, otherwise leaving the field
        # unset would be rejected; only an explicit null is an error.
        source = values.get("admin_source", "GADM")
        if source is None:
            raise ValueError(
                "You must provide admin_source or leave unset for the "
                "default value of 'GADM'."
            )

        version = values.get("admin_version")
        if version is None:
            raise ValueError("You must provide an admin_version")

        sources_in_this_env = per_env_admin_boundary_versions[ENV]

        versions_of_source_in_this_env = sources_in_this_env.get(source)
        if versions_of_source_in_this_env is None:
            raise ValueError(
                f"Invalid administrative boundary source {source}. Valid "
                f"sources in this environment are {list(sources_in_this_env)}"
            )

        # Only the presence of a deployed version matters here; the value
        # itself is not used by this validator.
        if versions_of_source_in_this_env.get(version) is None:
            raise ValueError(
                f"Invalid version {version} for administrative boundary source "
                f"{source}. Valid versions for this source in this environment are "
                f"{list(versions_of_source_in_this_env)}"
            )

        return values
|
||
|
||
class GeoencoderMatchElement(StrictBaseModel):
    """ID and name of one administrative level within a match.

    Both fields accept ``None``.
    """

    id: Optional[str]
    name: Optional[str]
|
||
|
||
class GeoencoderMatch(StrictBaseModel):
    """A single lookup match, with one element per administrative level."""

    country: GeoencoderMatchElement
    region: GeoencoderMatchElement
    subregion: GeoencoderMatchElement
|
||
|
||
class GeoencoderResponseData(StrictBaseModel):
    """Payload of a lookup response: the source, its version, and all matches."""

    adminSource: str
    adminVersion: str
    matches: List[GeoencoderMatch]
|
||
|
||
class GeoencoderResponse(Response):
    """Response envelope whose ``data`` field is a GeoencoderResponseData."""

    data: GeoencoderResponseData
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
from typing import Annotated, Any, Dict, List | ||
|
||
from fastapi import APIRouter, HTTPException, Query | ||
from unidecode import unidecode | ||
|
||
from app.models.pydantic.geoencoder import GeoencoderQueryParams, GeoencoderResponse | ||
from app.routes.datasets.queries import _query_dataset_json | ||
from app.settings.globals import ENV, per_env_admin_boundary_versions | ||
|
||
router = APIRouter() | ||
|
||
|
||
@router.get("/id-lookup", status_code=200, include_in_schema=False)
async def id_lookup(params: Annotated[GeoencoderQueryParams, Query()]):
    """Return administrative boundary IDs for the requested place names.

    The country name is always matched; region and subregion names are
    matched as well when supplied.  The steps are: map the admin source to
    its dataset, resolve the deployed dataset version, clean the names,
    infer the admin level, build the lookup SQL, run it, and shape the rows
    into a response.

    Raises:
        HTTPException: 400 when the admin source has no known dataset.
    """
    datasets_by_source: Dict[str, str] = {"GADM": "gadm_administrative_boundaries"}

    try:
        dataset: str = datasets_by_source[params.admin_source]
    except KeyError:
        known_sources = list(datasets_by_source)
        raise HTTPException(
            status_code=400,
            detail=f"Invalid admin boundary source. Valid sources: {known_sources}",
        )

    deployed_version: str = lookup_admin_source_version(
        params.admin_source, params.admin_version
    )

    country, region, subregion = sanitize_names(
        params.normalize_search, params.country, params.region, params.subregion
    )

    adm_level: int = determine_admin_level(country, region, subregion)

    lookup_sql: str = _admin_boundary_lookup_sql(
        adm_level, params.normalize_search, dataset, country, region, subregion
    )

    rows: List[Dict[str, Any]] = await _query_dataset_json(
        dataset, deployed_version, lookup_sql, None
    )

    return form_geoencoder_response(
        params.admin_source, params.admin_version, adm_level, rows
    )
|
||
|
||
def sanitize_names( | ||
normalize_search: bool, | ||
country: str | None, | ||
region: str | None, | ||
subregion: str | None, | ||
) -> List[str | None]: | ||
"""Turn any empty strings into Nones, enforces the admin level hierarchy, | ||
and optionally unaccents and decapitalizes names.""" | ||
names: List[str | None] = [] | ||
|
||
if subregion and not region: | ||
raise HTTPException( | ||
status_code=400, | ||
detail="If subregion is specified, region must be specified as well.", | ||
) | ||
|
||
for name in (country, region, subregion): | ||
if name and normalize_search: | ||
names.append(unidecode(name).lower()) | ||
elif name: | ||
names.append(name) | ||
else: | ||
names.append(None) | ||
return names | ||
|
||
|
||
def determine_admin_level( | ||
country: str | None, region: str | None, subregion: str | None | ||
) -> int: | ||
"""Infer the native admin level of a request based on the presence of non- | ||
empty fields.""" | ||
if subregion: | ||
return 2 | ||
elif region: | ||
return 1 | ||
elif country: | ||
return 0 | ||
else: # Shouldn't get here if FastAPI route definition worked | ||
raise HTTPException(status_code=400, detail="Country MUST be specified.") | ||
|
||
|
||
def _admin_boundary_lookup_sql( | ||
adm_level: int, | ||
normalize_search: bool, | ||
dataset: str, | ||
country_name: str, | ||
region_name: str | None, | ||
subregion_name: str | None, | ||
) -> str: | ||
"""Generate the SQL required to look up administrative boundary IDs by | ||
name.""" | ||
name_fields: List[str] = ["country", "name_1", "name_2"] | ||
if normalize_search: | ||
match_name_fields = [name_field + "_normalized" for name_field in name_fields] | ||
else: | ||
match_name_fields = name_fields | ||
|
||
sql = ( | ||
f"SELECT gid_0, gid_1, gid_2, {name_fields[0]}, {name_fields[1]}, {name_fields[2]}" | ||
f" FROM {dataset} WHERE {match_name_fields[0]}=$country${country_name}$country$" | ||
) | ||
if region_name is not None: | ||
sql += f" AND {match_name_fields[1]}=$region${region_name}$region$" | ||
if subregion_name is not None: | ||
sql += f" AND {match_name_fields[2]}=$subregion${subregion_name}$subregion$" | ||
|
||
sql += f" AND adm_level='{adm_level}'" | ||
|
||
return sql | ||
|
||
|
||
def lookup_admin_source_version(source, version) -> str:
    """Return the version deployed in the Data API for a source release.

    Looks up ``per_env_admin_boundary_versions[ENV][source][version]``.
    No missing-key handling is done here: the GeoencoderQueryParams
    validator is expected to have already checked both keys.
    """
    versions_by_source = per_env_admin_boundary_versions[ENV]
    return versions_by_source[source][version]
|
||
|
||
def form_geoencoder_response(
    admin_source, admin_version, adm_level, match_list
) -> GeoencoderResponse:
    """Shape raw lookup rows into a GeoencoderResponse.

    Each row in ``match_list`` becomes one match with ``country``,
    ``region`` and ``subregion`` elements.  The country element is always
    filled in; the region element only when ``adm_level`` is at least 1 and
    the subregion element only when it is at least 2.  Levels that are not
    filled in carry ``None`` for both id and name.
    """

    def _element(level: int, name_key: str, row) -> dict:
        # Levels deeper than the requested one are reported as empty.
        if adm_level < level:
            return {"id": None, "name": None}
        return {"id": extract_level_gid(level, row), "name": row[name_key]}

    matches = [
        {
            "country": {"id": extract_level_gid(0, row), "name": row["country"]},
            "region": _element(1, "name_1", row),
            "subregion": _element(2, "name_2", row),
        }
        for row in match_list
    ]

    # NOTE(review): a reviewer suggested (optional nit) building this payload
    # as a typed response-data model instead of a plain dict so the types are
    # clearer; the author agreed to implement that.
    payload = {
        "adminSource": admin_source,
        "adminVersion": admin_version,
        "matches": matches,
    }

    return GeoencoderResponse(data=payload)
|
||
|
||
def extract_level_gid(gid_level, match):
    """Return the component of a row's ID that belongs to one admin level.

    Reads ``match["gid_<gid_level>"]``, discards everything from the first
    underscore onward, splits the remainder on ``.`` and returns the piece
    at index ``gid_level``.
    """
    # presumably IDs look like "BRA.1.2_1" (dotted levels, then a suffix
    # after the underscore) -- TODO confirm against the dataset
    full_gid = match[f"gid_{gid_level}"]
    dotted_part, _, _ = full_gid.partition("_")
    return dotted_part.split(".")[gid_level]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about `normalize_names`? "Sanitize" comes with a different connotation about what this method does (for me, anyway).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roger, will change, thanks!