Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Fix: Allow Null Values for end_date Field in Dashboard Endpint in FastAPI #44043

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,16 @@ def depends(self, tag_name_pattern: str | None = None) -> _DagTagNamePatternSear
return self.set_value(tag_name_pattern)


def _safe_parse_datetime(date_to_check: str) -> datetime:
def _safe_parse_datetime(date_to_check: str) -> datetime | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argument type need to be adjusted

"""
Parse datetime and raise error for invalid dates.
:param date_to_check: the string value to be parsed
:param date_to_check: The string value to be parsed
:return: The parsed datetime object with timezone information. Returns None if the input is None.
"""
if not date_to_check:
raise ValueError(f"{date_to_check} cannot be None.")
return None
try:
return timezone.parse(date_to_check, strict=True)
except (TypeError, ParserError):
Expand Down
14 changes: 10 additions & 4 deletions airflow/api_fastapi/core_api/routes/ui/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from typing import TYPE_CHECKING, Annotated

from fastapi import Depends, status
from fastapi import Depends, HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -49,12 +49,18 @@ def historical_metrics(
session: Annotated[Session, Depends(get_session)],
) -> HistoricalMetricDataResponse:
"""Return cluster activity historical metrics."""
if start_date is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DateTimeQuery need to be adjusted, because by definition, it cannot return str | None, only str:

DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]

Maybe create a OptionalDateTimeQuery = Annotated[str | None, AfterValidator(_safe_parse_datetime)], or maybe DateTimeQuery | None already works

raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="start_date parameter is required in the request",
)

Comment on lines +53 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be handled automatically by FastAPI. If the Query parameter is properly defined, we shouldn't allow None. No need to handle that manually here. (Also that would be a 422 and not a 400, because this is how fastapi handles validation errors)

# DagRuns
dag_run_types = session.execute(
select(DagRun.run_type, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= func.coalesce(end_date, timezone.utcnow()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timezone.utcnow() can be just defined once at the top of the function to avoid different values due to multiple calls.

)
.group_by(DagRun.run_type)
).all()
Expand All @@ -63,7 +69,7 @@ def historical_metrics(
select(DagRun.state, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= func.coalesce(end_date, timezone.utcnow()),
)
.group_by(DagRun.state)
).all()
Expand All @@ -74,7 +80,7 @@ def historical_metrics(
.join(TaskInstance.dag_run)
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= func.coalesce(end_date, timezone.utcnow()),
)
.group_by(TaskInstance.state)
).all()
Expand Down
126 changes: 82 additions & 44 deletions tests/api_fastapi/core_api/routes/ui/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,53 +100,91 @@ def make_dag_runs(dag_maker, session, time_machine):


class TestHistoricalMetricsDataEndpoint:
@pytest.mark.parametrize(
"params, expected",
[
(
{"start_date": "2023-01-01T00:00", "end_date": "2023-08-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 1},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 2,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
},
),
(
{"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 0},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 0,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
},
),
(
{"start_date": "2023-02-02T00:00", "end_date": None},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 0},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 1},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
},
),
],
)
@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data(self, test_client, time_machine):
params = {"start_date": "2023-01-01T00:00", "end_date": "2023-08-02T00:00"}
def test_historical_metrics_data(self, test_client, params, expected):
response = test_client.get("/ui/dashboard/historical_metrics_data", params=params)

assert response.status_code == 200
assert response.json() == {
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 1},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 2,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
}
assert response.json() == expected

@pytest.mark.parametrize(
"params",
[
{"start_date": None, "end_date": "2023-08-02T00:00"},
],
)
@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data_date_filters(self, test_client):
params = {"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"}
def test_historical_metrics_data_start_date_none(self, test_client, params):
response = test_client.get("/ui/dashboard/historical_metrics_data", params=params)
assert response.status_code == 200
assert response.json() == {
"dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 0},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 0,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
}
assert response.status_code == 400
assert response.json() == {"detail": "start_date parameter is required in the request"}