Skip to content

Commit

Permalink
Use pydantic model constructors, minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss committed Oct 29, 2024
1 parent 833aee7 commit 56faf48
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
2 changes: 1 addition & 1 deletion airflow/api_fastapi/common/db/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def paginated_select(
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: Session = NEW_SESSION,
return_total_entries: bool = True
return_total_entries: bool = True,
) -> Select:
base_select = apply_filters_to_select(
base_select,
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:

# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]

# DAG
QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)]
QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)]
Expand All @@ -320,8 +321,10 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
]
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter().depends)]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]

# DagRun
QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)]
QueryDagIdsFilter = Annotated[_DagIdsFilter, Depends(_DagIdsFilter().depends)]

# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)]
QueryDagIdsFilter = Annotated[_DagIdsFilter, Depends(_DagIdsFilter().depends)]
20 changes: 11 additions & 9 deletions airflow/api_fastapi/core_api/routes/public/dag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_stats import (
DagStatsCollectionResponse,
DagStatsResponse,
DagStatsStateResponse,
)
from airflow.utils.state import DagRunState

Expand All @@ -50,7 +52,7 @@ async def get_dag_stats(
base_select=dagruns_select_with_state_count,
filters=[dag_ids],
session=session,
return_total_entries=False
return_total_entries=False,
)
query_result = session.execute(dagruns_select)

Expand All @@ -62,16 +64,16 @@ async def get_dag_stats(
result_dag_ids.append(dag_id)

dags = [
{
"dag_id": dag_id,
"stats": [
{
"state": state,
"count": dag_state_data.get((dag_id, state), 0),
}
DagStatsResponse(
dag_id=dag_id,
stats=[
DagStatsStateResponse(
state=state,
count=dag_state_data.get((dag_id, state), 0),
)
for state in DagRunState
],
}
)
for dag_id in result_dag_ids
]
return DagStatsCollectionResponse(dags=dags, total_entries=len(dags))

0 comments on commit 56faf48

Please sign in to comment.