From 10f4a6f4dbc533c38e2d8b7c13e78e5238e3bc84 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Wed, 2 Oct 2024 01:44:02 +0530 Subject: [PATCH 01/10] Migrate the public endpoint DAG Details to FastAPI --- .../api_connexion/endpoints/dag_endpoint.py | 1 + airflow/api_fastapi/openapi/v1-generated.yaml | 44 ++++++++++ airflow/api_fastapi/serializers/dags.py | 61 +++++++++++++- airflow/api_fastapi/views/public/dags.py | 40 ++++++++- tests/api_fastapi/views/public/test_dags.py | 83 +++++++++++++++++-- 5 files changed, 219 insertions(+), 10 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 5d10a97dedce..3d0d3dd8bfab 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -70,6 +70,7 @@ def get_dag( ) +@mark_fastapi_migration_done @security.requires_access_dag("GET") @provide_session def get_dag_details( diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index a54e0e4ca57d..ab5dcdb10965 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -319,6 +319,50 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/details: + get: + tags: + - DAG + summary: Get DAG details. + description: Get details of a specific DAG. + operationId: get_dag_details_public_dags_get_details + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGDetailsResponse' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/connections/{connection_id}: delete: tags: diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 59b47bdef9e9..ddad914bb8ac 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -17,10 +17,12 @@ from __future__ import annotations -from datetime import datetime -from typing import Any +from collections import abc +from datetime import datetime, timedelta +from typing import Any, Iterable from itsdangerous import URLSafeSerializer +from pendulum import FixedTimezone, Timezone from pydantic import ( BaseModel, computed_field, @@ -28,6 +30,7 @@ ) from airflow.configuration import conf +from airflow.models.param import Param from airflow.serialization.pydantic.dag import DagTagPydantic @@ -93,3 +96,57 @@ class DAGCollectionResponse(BaseModel): dags: list[DAGResponse] total_entries: int + + +class DAGDetailsResponse(DAGResponse): + """Specific serializer for DAG Details responses.""" + + catchup: bool + dagrun_timeout: timedelta | None + dataset_expression: dict | None + doc_md: str | None + start_date: datetime | None + end_date: datetime | None + is_paused_upon_creation: bool | None + orientation: str + params: Any | None + render_template_as_native_obj: bool + template_searchpath: str | Iterable[str] | None + timezone: str | None + last_loaded: datetime | None + + @field_validator("timezone", mode="before") + @classmethod + def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None: + """Convert timezone attribute to string representation.""" + if tz is None: + return None + return str(tz) + + @field_validator("timetable_summary", mode="before") + @classmethod + def get_timetable_summary(cls, tts: str | None) -> str | None: + """Validate the string representation of timetable_summary.""" + if tts is None or tts == "None": + return None + return str(tts) + + @field_validator("params", mode="before") + @classmethod + def get_params(cls, params: abc.MutableMapping | None) -> dict | None: + """Convert params attribute to dict representation.""" + if params is None: + return None + return {param_name: param_val.serialize() for param_name, param_val in params.items()} + + @computed_field + @property + def concurrency(self) -> int: + """Return max_active_tasks as concurrency.""" + return self.max_active_tasks + + @computed_field + @property + def last_parsed(self) -> datetime | None: + """Return last_loaded as last_parsed.""" + return self.last_loaded diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py index 3761d593d2fd..d01378ad515b 100644 --- a/airflow/api_fastapi/views/public/dags.py +++ b/airflow/api_fastapi/views/public/dags.py @@ -17,7 +17,8 @@ from __future__ import annotations -from fastapi import Depends, HTTPException, Query +from fastapi import Depends, HTTPException, Query, Request +from pydantic import ValidationError from sqlalchemy import update from sqlalchemy.orm import Session from typing_extensions import Annotated @@ -41,9 +42,14 @@ QueryTagsFilter, SortParam, ) -from airflow.api_fastapi.serializers.dags import DAGCollectionResponse, DAGPatchBody, DAGResponse +from airflow.api_fastapi.serializers.dags import ( + DAGCollectionResponse, + DAGDetailsResponse, + DAGPatchBody, + DAGResponse, +) from airflow.api_fastapi.views.router import AirflowRouter -from airflow.models import DagModel +from airflow.models import DAG, DagModel dags_router = AirflowRouter(tags=["DAG"]) @@ -87,6 +93,34 @@ async def get_dags( ) +@dags_router.get( + "/dags/{dag_id}/details", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]) +) +async def get_dag_details( + dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request +) -> DAGDetailsResponse: + """Get details of DAG.""" + if not session: + raise HTTPException(401, "Invalid session") + + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(404, f"Dag with id {dag_id} was not found") + + dag_model: DagModel = session.get(DagModel, dag_id) + if not dag_model: + raise HTTPException(400, f"Unable to obtain dag with id {dag_id} from session") + + for key, value in dag.__dict__.items(): + if not key.startswith("_") and not hasattr(dag_model, key): + setattr(dag_model, key, value) + + try: + return DAGDetailsResponse.model_validate(dag_model, from_attributes=True) + except ValidationError as ve: + raise HTTPException(422, f"Error while validating dag model with id {dag_id}, details: {str(ve)}") + + @dags_router.patch("/dags/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) async def patch_dag( dag_id: str, diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index 7b68ebe512a2..529ffc427552 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -18,6 +18,7 @@ from datetime import datetime, timezone +import pendulum import pytest from airflow.models.dag import DagModel @@ -34,8 +35,10 @@ DAG1_DISPLAY_NAME = "display1" DAG2_ID = "test_dag2" DAG2_DISPLAY_NAME = "display2" +DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc) DAG3_ID = "test_dag3" TASK_ID = "op1" +UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')" @provide_session @@ -96,11 +99,11 @@ def setup(dag_maker) -> None: DAG2_ID, dag_display_name=DAG2_DISPLAY_NAME, schedule=None, - start_date=datetime( - 2021, - 6, - 15, - ), + start_date=DAG2_START_DATE, + doc_md="details", + params={"foo": 1}, + max_active_tasks=16, + max_active_runs=16, ): EmptyOperator(task_id=TASK_ID) @@ -225,3 +228,73 @@ def test_patch_dags(test_client, query_params, body, expected_status_code, expec assert [dag["dag_id"] for dag in body["dags"]] == expected_ids paused_dag_ids = [dag["dag_id"] for dag in body["dags"] if dag["is_paused"]] assert paused_dag_ids == expected_paused_ids + + +@pytest.mark.parametrize( + "query_params, dag_id, expected_status_code, dag_display_name, start_date", + [ + ({}, "fake_dag_id", 404, "fake_dag", datetime(2023, 12, 31, tzinfo=timezone.utc)), + ({}, DAG2_ID, 200, DAG2_DISPLAY_NAME, DAG2_START_DATE), + ], +) +def test_dag_details(test_client, query_params, dag_id, expected_status_code, dag_display_name, start_date): + response = test_client.get(f"/public/dags/{dag_id}/details", params=query_params) + assert response.status_code == expected_status_code + if expected_status_code != 200: + return + + # Match expected and actual responses below. + res_json = response.json() + last_loaded = res_json["last_loaded"] + last_parsed_time = res_json["last_parsed_time"] + file_token = res_json["file_token"] + expected = { + "catchup": True, + "concurrency": 16, + "dag_id": dag_id, + "dag_display_name": dag_display_name, + "dagrun_timeout": None, + "dataset_expression": None, + "default_view": "grid", + "description": None, + "doc_md": "details", + "end_date": None, + "fileloc": "/opt/airflow/tests/api_fastapi/views/public/test_dags.py", + "file_token": file_token, + "has_import_errors": False, + "has_task_concurrency_limits": False, + "is_active": True, + "is_paused": False, + "is_paused_upon_creation": None, + "last_expired": None, + "last_parsed": last_loaded, + "last_loaded": last_loaded, # TODO: remove last_loaded as it is the same as last_parsed + "last_parsed_time": last_parsed_time, + "last_pickled": None, + "max_active_runs": 16, + "max_active_tasks": 16, + "max_consecutive_failed_dag_runs": 0, + "next_dagrun": None, + "next_dagrun_create_after": None, + "next_dagrun_data_interval_end": None, + "next_dagrun_data_interval_start": None, + "orientation": "LR", + "owners": ["airflow"], + "params": { + "foo": { + "description": None, + "schema": {}, + "value": 1, + } + }, + "pickle_id": None, + "render_template_as_native_obj": False, + "timetable_summary": None, + "scheduler_lock": None, + "start_date": start_date.replace(tzinfo=None).isoformat() + "Z", # pydantic datetime format + "tags": [], + "template_searchpath": None, + "timetable_description": "Never, external triggers only", + "timezone": UTC_JSON_REPR, + } + assert res_json == expected From dba8b100633c3b704163f457e0c12a726d257c68 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Wed, 2 Oct 2024 01:57:21 +0530 Subject: [PATCH 02/10] Add comment for computed_field, remove unused import --- airflow/api_fastapi/serializers/dags.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index ddad914bb8ac..363f52581790 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -30,7 +30,6 @@ ) from airflow.configuration import conf -from airflow.models.param import Param from airflow.serialization.pydantic.dag import DagTagPydantic @@ -139,13 +138,15 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None: return None return {param_name: param_val.serialize() for param_name, param_val in params.items()} - @computed_field + # Mypy issue https://github.com/python/mypy/issues/1362 + @computed_field # type: ignore[misc] @property def concurrency(self) -> int: """Return max_active_tasks as concurrency.""" return self.max_active_tasks - @computed_field + # Mypy issue https://github.com/python/mypy/issues/1362 + @computed_field # type: ignore[misc] @property def last_parsed(self) -> datetime | None: """Return last_loaded as last_parsed.""" From af320e5a8032e074dfb97064ce14fe18dfe2be7c Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Wed, 2 Oct 2024 02:19:23 +0530 Subject: [PATCH 03/10] Update pendulum import path --- airflow/api_fastapi/serializers/dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 363f52581790..9b17fd53d52b 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -22,7 +22,7 @@ from typing import Any, Iterable from itsdangerous import URLSafeSerializer -from pendulum import FixedTimezone, Timezone +from pendulum.tz.timezone import FixedTimezone, Timezone from pydantic import ( BaseModel, computed_field, From 9113b353331624951c3181bf682110cf337fd332 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Wed, 2 Oct 2024 02:25:40 +0530 Subject: [PATCH 04/10] Re-run breeze static checks --- airflow/api_fastapi/openapi/v1-generated.yaml | 298 ++++++++++++++++++ tests/api_fastapi/views/public/test_dags.py | 2 +- 2 files changed, 299 insertions(+), 1 deletion(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index ab5dcdb10965..2777a55f6844 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -252,6 +252,57 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/details: + get: + tags: + - DAG + summary: Get Dag Details + description: Get details of DAG. + operationId: get_dag_details + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGDetailsResponse' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unprocessable Entity /public/dags/{dag_id}: patch: tags: @@ -422,6 +473,253 @@ components: - total_entries title: DAGCollectionResponse description: DAG Collection serializer for responses. + DAGDetailsResponse: + properties: + dag_id: + type: string + title: Dag Id + dag_display_name: + type: string + title: Dag Display Name + is_paused: + type: boolean + title: Is Paused + is_active: + type: boolean + title: Is Active + last_parsed_time: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed Time + last_pickled: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Pickled + last_expired: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Expired + scheduler_lock: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Scheduler Lock + pickle_id: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Pickle Id + default_view: + anyOf: + - type: string + - type: 'null' + title: Default View + fileloc: + type: string + title: Fileloc + description: + anyOf: + - type: string + - type: 'null' + title: Description + timetable_summary: + anyOf: + - type: string + - type: 'null' + title: Timetable Summary + timetable_description: + anyOf: + - type: string + - type: 'null' + title: Timetable Description + tags: + items: + $ref: '#/components/schemas/DagTagPydantic' + type: array + title: Tags + max_active_tasks: + type: integer + title: Max Active Tasks + max_active_runs: + anyOf: + - type: integer + - type: 'null' + title: Max Active Runs + max_consecutive_failed_dag_runs: + type: integer + title: Max Consecutive Failed Dag Runs + has_task_concurrency_limits: + type: boolean + title: Has Task Concurrency Limits + has_import_errors: + type: boolean + title: Has Import Errors + next_dagrun: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun + next_dagrun_data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval Start + next_dagrun_data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval End + next_dagrun_create_after: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Create After + owners: + items: + type: string + type: array + title: Owners + catchup: + type: boolean + title: Catchup + dagrun_timeout: + anyOf: + - type: string + format: duration + - type: 'null' + title: Dagrun Timeout + dataset_expression: + anyOf: + - type: object + - type: 'null' + title: Dataset Expression + doc_md: + anyOf: + - type: string + - type: 'null' + title: Doc Md + start_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: End Date + is_paused_upon_creation: + anyOf: + - type: boolean + - type: 'null' + title: Is Paused Upon Creation + orientation: + type: string + title: Orientation + params: + anyOf: + - {} + - type: 'null' + title: Params + render_template_as_native_obj: + type: boolean + title: Render Template As Native Obj + template_searchpath: + anyOf: + - type: string + - items: + type: string + type: array + - type: 'null' + title: Template Searchpath + timezone: + anyOf: + - type: string + - type: 'null' + title: Timezone + last_loaded: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Loaded + file_token: + type: string + title: File Token + description: Return file token. + readOnly: true + concurrency: + type: integer + title: Concurrency + description: Return max_active_tasks as concurrency. + readOnly: true + last_parsed: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed + description: Return last_loaded as last_parsed. + readOnly: true + type: object + required: + - dag_id + - dag_display_name + - is_paused + - is_active + - last_parsed_time + - last_pickled + - last_expired + - scheduler_lock + - pickle_id + - default_view + - fileloc + - description + - timetable_summary + - timetable_description + - tags + - max_active_tasks + - max_active_runs + - max_consecutive_failed_dag_runs + - has_task_concurrency_limits + - has_import_errors + - next_dagrun + - next_dagrun_data_interval_start + - next_dagrun_data_interval_end + - next_dagrun_create_after + - owners + - catchup + - dagrun_timeout + - dataset_expression + - doc_md + - start_date + - end_date + - is_paused_upon_creation + - orientation + - params + - render_template_as_native_obj + - template_searchpath + - timezone + - last_loaded + - file_token + - concurrency + - last_parsed + title: DAGDetailsResponse + description: Specific serializer for DAG Details responses. DAGPatchBody: properties: is_paused: diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index 529ffc427552..90adb7762bf3 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -268,7 +268,7 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da "is_paused_upon_creation": None, "last_expired": None, "last_parsed": last_loaded, - "last_loaded": last_loaded, # TODO: remove last_loaded as it is the same as last_parsed + "last_loaded": last_loaded, # TODO: remove last_loaded as it is the same as last_parsed "last_parsed_time": last_parsed_time, "last_pickled": None, "max_active_runs": 16, From 073d5a5f5e613e226a1ff1a7c2a38069700634aa Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Wed, 2 Oct 2024 02:43:19 +0530 Subject: [PATCH 05/10] Add openapi-gen for DAG Details API --- airflow/ui/openapi-gen/queries/common.ts | 16 + airflow/ui/openapi-gen/queries/prefetch.ts | 20 + airflow/ui/openapi-gen/queries/queries.ts | 26 ++ airflow/ui/openapi-gen/queries/suspense.ts | 26 ++ .../ui/openapi-gen/requests/schemas.gen.ts | 420 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 29 ++ airflow/ui/openapi-gen/requests/types.gen.ts | 95 ++++ 7 files changed, 632 insertions(+) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index fcddded7dc12..f5bc875a56b6 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -74,6 +74,22 @@ export const UseDagServiceGetDagsKeyFn = ( }, ]), ]; +export type DagServiceGetDagDetailsDefaultResponse = Awaited< + ReturnType +>; +export type DagServiceGetDagDetailsQueryResult< + TData = DagServiceGetDagDetailsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagServiceGetDagDetailsKey = "DagServiceGetDagDetails"; +export const UseDagServiceGetDagDetailsKeyFn = ( + { + dagId, + }: { + dagId: string; + }, + queryKey?: Array, +) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 95c2c7b73734..210c4f7f77dd 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -94,3 +94,23 @@ export const prefetchUseDagServiceGetDags = ( tags, }), }); +/** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagServiceGetDagDetails = ( + queryClient: QueryClient, + { + dagId, + }: { + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }), + queryFn: () => DagService.getDagDetails({ dagId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index f83c151b91e2..f12a6504c848 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -118,6 +118,32 @@ export const useDagServiceGetDags = < }) as TData, ...options, }); +/** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagDetails = < + TData = Common.DagServiceGetDagDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }, queryKey), + queryFn: () => DagService.getDagDetails({ dagId }) as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index dc8b99dfb218..d9d62d35e3ca 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -109,3 +109,29 @@ export const useDagServiceGetDagsSuspense = < }) as TData, ...options, }); +/** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagDetailsSuspense = < + TData = Common.DagServiceGetDagDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }, queryKey), + queryFn: () => DagService.getDagDetails({ dagId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e8c9b5d70cdf..56f2412e879c 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -20,6 +20,426 @@ export const $DAGCollectionResponse = { description: "DAG Collection serializer for responses.", } as const; +export const $DAGDetailsResponse = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + dag_display_name: { + type: "string", + title: "Dag Display Name", + }, + is_paused: { + type: "boolean", + title: "Is Paused", + }, + is_active: { + type: "boolean", + title: "Is Active", + }, + last_parsed_time: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Parsed Time", + }, + last_pickled: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Pickled", + }, + last_expired: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Expired", + }, + scheduler_lock: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Scheduler Lock", + }, + pickle_id: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Pickle Id", + }, + default_view: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Default View", + }, + fileloc: { + type: "string", + title: "Fileloc", + }, + description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Description", + }, + timetable_summary: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Summary", + }, + timetable_description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Description", + }, + tags: { + items: { + $ref: "#/components/schemas/DagTagPydantic", + }, + type: "array", + title: "Tags", + }, + max_active_tasks: { + type: "integer", + title: "Max Active Tasks", + }, + max_active_runs: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Max Active Runs", + }, + max_consecutive_failed_dag_runs: { + type: "integer", + title: "Max Consecutive Failed Dag Runs", + }, + has_task_concurrency_limits: { + type: "boolean", + title: "Has Task Concurrency Limits", + }, + has_import_errors: { + type: "boolean", + title: "Has Import Errors", + }, + next_dagrun: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun", + }, + next_dagrun_data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval Start", + }, + next_dagrun_data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval End", + }, + next_dagrun_create_after: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Create After", + }, + owners: { + items: { + type: "string", + }, + type: "array", + title: "Owners", + }, + catchup: { + type: "boolean", + title: "Catchup", + }, + dagrun_timeout: { + anyOf: [ + { + type: "string", + format: "duration", + }, + { + type: "null", + }, + ], + title: "Dagrun Timeout", + }, + dataset_expression: { + anyOf: [ + { + type: "object", + }, + { + type: "null", + }, + ], + title: "Dataset Expression", + }, + doc_md: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Doc Md", + }, + start_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Start Date", + }, + end_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "End Date", + }, + is_paused_upon_creation: { + anyOf: [ + { + type: "boolean", + }, + { + type: "null", + }, + ], + title: "Is Paused Upon Creation", + }, + orientation: { + type: "string", + title: "Orientation", + }, + params: { + anyOf: [ + {}, + { + type: "null", + }, + ], + title: "Params", + }, + render_template_as_native_obj: { + type: "boolean", + title: "Render Template As Native Obj", + }, + template_searchpath: { + anyOf: [ + { + type: "string", + }, + { + items: { + type: "string", + }, + type: "array", + }, + { + type: "null", + }, + ], + title: "Template Searchpath", + }, + timezone: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timezone", + }, + last_loaded: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Loaded", + }, + file_token: { + type: "string", + title: "File Token", + description: "Return file token.", + readOnly: true, + }, + concurrency: { + type: "integer", + title: "Concurrency", + description: "Return max_active_tasks as concurrency.", + readOnly: true, + }, + last_parsed: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Parsed", + description: "Return last_loaded as last_parsed.", + readOnly: true, + }, + }, + type: "object", + required: [ + "dag_id", + "dag_display_name", + "is_paused", + "is_active", + "last_parsed_time", + "last_pickled", + "last_expired", + "scheduler_lock", + "pickle_id", + "default_view", + "fileloc", + "description", + "timetable_summary", + "timetable_description", + "tags", + "max_active_tasks", + "max_active_runs", + "max_consecutive_failed_dag_runs", + "has_task_concurrency_limits", + "has_import_errors", + "next_dagrun", + "next_dagrun_data_interval_start", + "next_dagrun_data_interval_end", + "next_dagrun_create_after", + "owners", + "catchup", + "dagrun_timeout", + "dataset_expression", + "doc_md", + "start_date", + "end_date", + "is_paused_upon_creation", + "orientation", + "params", + "render_template_as_native_obj", + "template_searchpath", + "timezone", + "last_loaded", + "file_token", + "concurrency", + "last_parsed", + ], + title: "DAGDetailsResponse", + description: "Specific serializer for DAG Details responses.", +} as const; + export const $DAGPatchBody = { properties: { is_paused: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 24c960d2b7d5..0aefb56d06e6 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -9,6 +9,8 @@ import type { GetDagsResponse, PatchDagsData, PatchDagsResponse, + GetDagDetailsData, + GetDagDetailsResponse, PatchDagData, PatchDagResponse, DeleteConnectionData, @@ -127,6 +129,33 @@ export class DagService { }); } + /** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ + public static getDagDetails( + data: GetDagDetailsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/details", + path: { + dag_id: data.dagId, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Unprocessable Entity", + }, + }); + } + /** * Patch Dag * Patch the specific DAG. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b38d5c00a69f..1a5637bd067c 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -8,6 +8,64 @@ export type DAGCollectionResponse = { total_entries: number; }; +/** + * Specific serializer for DAG Details responses. + */ +export type DAGDetailsResponse = { + dag_id: string; + dag_display_name: string; + is_paused: boolean; + is_active: boolean; + last_parsed_time: string | null; + last_pickled: string | null; + last_expired: string | null; + scheduler_lock: string | null; + pickle_id: string | null; + default_view: string | null; + fileloc: string; + description: string | null; + timetable_summary: string | null; + timetable_description: string | null; + tags: Array; + max_active_tasks: number; + max_active_runs: number | null; + max_consecutive_failed_dag_runs: number; + has_task_concurrency_limits: boolean; + has_import_errors: boolean; + next_dagrun: string | null; + next_dagrun_data_interval_start: string | null; + next_dagrun_data_interval_end: string | null; + next_dagrun_create_after: string | null; + owners: Array; + catchup: boolean; + dagrun_timeout: string | null; + dataset_expression: { + [key: string]: unknown; + } | null; + doc_md: string | null; + start_date: string | null; + end_date: string | null; + is_paused_upon_creation: boolean | null; + orientation: string; + params: unknown | null; + render_template_as_native_obj: boolean; + template_searchpath: string | Array | null; + timezone: string | null; + last_loaded: string | null; + /** + * Return file token. + */ + readonly file_token: string; + /** + * Return max_active_tasks as concurrency. + */ + readonly concurrency: number; + /** + * Return last_loaded as last_parsed. + */ + readonly last_parsed: string | null; +}; + /** * Dag Serializer for updatable body. */ @@ -126,6 +184,12 @@ export type PatchDagsData = { export type PatchDagsResponse = DAGCollectionResponse; +export type GetDagDetailsData = { + dagId: string; +}; + +export type GetDagDetailsResponse = DAGDetailsResponse; + export type PatchDagData = { dagId: string; requestBody: DAGPatchBody; @@ -202,6 +266,37 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/details": { + get: { + req: GetDagDetailsData; + res: { + /** + * Successful Response + */ + 200: DAGDetailsResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Unprocessable Entity + */ + 422: HTTPExceptionResponse; + }; + }; + }; "/public/dags/{dag_id}": { patch: { req: PatchDagData; From d8f099d13bf4341f50f69778501990b60669a7bd Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 3 Oct 2024 01:39:09 +0530 Subject: [PATCH 06/10] Resolve review comments, test has_task_concurrency_limits --- airflow/api_fastapi/serializers/dags.py | 34 +++++++++++++-------- airflow/api_fastapi/views/public/dags.py | 11 ++----- tests/api_fastapi/views/public/test_dags.py | 18 ++++++----- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 9b17fd53d52b..416cf34ba967 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -24,12 +24,15 @@ from itsdangerous import URLSafeSerializer from pendulum.tz.timezone import FixedTimezone, Timezone from pydantic import ( + AliasGenerator, BaseModel, + ConfigDict, computed_field, field_validator, ) from airflow.configuration import conf +from airflow.models.param import Param from airflow.serialization.pydantic.dag import DagTagPydantic @@ -101,18 +104,32 @@ class DAGDetailsResponse(DAGResponse): """Specific serializer for DAG Details responses.""" catchup: bool - dagrun_timeout: timedelta | None + dag_run_timeout: timedelta | None dataset_expression: dict | None doc_md: str | None start_date: datetime | None end_date: datetime | None is_paused_upon_creation: bool | None orientation: str - params: Any | None + params: abc.MutableMapping | None render_template_as_native_obj: bool - template_searchpath: str | Iterable[str] | None + template_search_path: Iterable[str] | None timezone: str | None - last_loaded: datetime | None + last_parsed: datetime | None + + def _validation_alias_fn(field_name: str): + val_dict = { + "dag_run_timeout": "dagrun_timeout", + "last_parsed": "last_loaded", + "template_search_path": "template_searchpath", + } + return val_dict.get(field_name, field_name) + + model_config = ConfigDict( + alias_generator=AliasGenerator( + validation_alias=_validation_alias_fn, + ) + ) @field_validator("timezone", mode="before") @classmethod @@ -136,7 +153,7 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None: """Convert params attribute to dict representation.""" if params is None: return None - return {param_name: param_val.serialize() for param_name, param_val in params.items()} + return {param_name: param_val.dump() for param_name, param_val in params.items()} # Mypy issue https://github.com/python/mypy/issues/1362 @computed_field # type: ignore[misc] @@ -144,10 +161,3 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None: def concurrency(self) -> int: """Return max_active_tasks as concurrency.""" return self.max_active_tasks - - # Mypy issue https://github.com/python/mypy/issues/1362 - @computed_field # type: ignore[misc] - @property - def last_parsed(self) -> datetime | None: - """Return last_loaded as last_parsed.""" - return self.last_loaded diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py index d01378ad515b..ef76e184505c 100644 --- a/airflow/api_fastapi/views/public/dags.py +++ b/airflow/api_fastapi/views/public/dags.py @@ -18,7 +18,6 @@ from __future__ import annotations from fastapi import Depends, HTTPException, Query, Request -from pydantic import ValidationError from sqlalchemy import update from sqlalchemy.orm import Session from typing_extensions import Annotated @@ -100,25 +99,19 @@ async def get_dag_details( dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request ) -> DAGDetailsResponse: """Get details of DAG.""" - if not session: - raise HTTPException(401, "Invalid session") - dag: DAG = request.app.state.dag_bag.get_dag(dag_id) if not dag: raise HTTPException(404, f"Dag with id {dag_id} was not found") dag_model: DagModel = session.get(DagModel, dag_id) if not dag_model: - raise HTTPException(400, f"Unable to obtain dag with id {dag_id} from session") + raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from session") for key, value in dag.__dict__.items(): if not key.startswith("_") and not hasattr(dag_model, key): setattr(dag_model, key, value) - try: - return DAGDetailsResponse.model_validate(dag_model, from_attributes=True) - except ValidationError as ve: - raise HTTPException(422, f"Error while validating dag model with id {dag_id}, details: {str(ve)}") + return DAGDetailsResponse.model_validate(dag_model, from_attributes=True) @dags_router.patch("/dags/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index 90adb7762bf3..58b3daf35c64 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -77,7 +77,8 @@ def _create_deactivated_paused_dag(session=None): @pytest.fixture(autouse=True) -def setup(dag_maker) -> None: +@provide_session +def setup(dag_maker, session=None) -> None: clear_db_runs() clear_db_dags() clear_db_serialized_dags() @@ -108,6 +109,9 @@ def setup(dag_maker) -> None: EmptyOperator(task_id=TASK_ID) dag_maker.dagbag.sync_to_db() + dag_maker.dag_model.has_task_concurrency_limits = True + session.merge(dag_maker.dag_model) + session.commit() _create_deactivated_paused_dag() @@ -245,7 +249,7 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da # Match expected and actual responses below. res_json = response.json() - last_loaded = res_json["last_loaded"] + last_parsed = res_json["last_parsed"] last_parsed_time = res_json["last_parsed_time"] file_token = res_json["file_token"] expected = { @@ -253,7 +257,7 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da "concurrency": 16, "dag_id": dag_id, "dag_display_name": dag_display_name, - "dagrun_timeout": None, + "dag_run_timeout": None, "dataset_expression": None, "default_view": "grid", "description": None, @@ -262,13 +266,12 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da "fileloc": "/opt/airflow/tests/api_fastapi/views/public/test_dags.py", "file_token": file_token, "has_import_errors": False, - "has_task_concurrency_limits": False, + "has_task_concurrency_limits": True, "is_active": True, "is_paused": False, "is_paused_upon_creation": None, "last_expired": None, - "last_parsed": last_loaded, - "last_loaded": last_loaded, # TODO: remove last_loaded as it is the same as last_parsed + "last_parsed": last_parsed, "last_parsed_time": last_parsed_time, "last_pickled": None, "max_active_runs": 16, @@ -282,6 +285,7 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da "owners": ["airflow"], "params": { "foo": { + "__class": "airflow.models.param.Param", "description": None, "schema": {}, "value": 1, @@ -293,7 +297,7 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da "scheduler_lock": None, "start_date": start_date.replace(tzinfo=None).isoformat() + "Z", # pydantic datetime format "tags": [], - "template_searchpath": None, + "template_search_path": None, "timetable_description": "Never, external triggers only", "timezone": UTC_JSON_REPR, } From 7d0bb9db1eae13008d778b17ce26f6a03f7a491c Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 3 Oct 2024 01:40:17 +0530 Subject: [PATCH 07/10] Remove unused import --- airflow/api_fastapi/serializers/dags.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 416cf34ba967..7275455022f1 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -32,7 +32,6 @@ ) from airflow.configuration import conf -from airflow.models.param import Param from airflow.serialization.pydantic.dag import DagTagPydantic From 79ea68da81e1605a009f9241c43657bcc39fcc18 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 3 Oct 2024 02:15:11 +0530 Subject: [PATCH 08/10] Use lambda for aliases, re-run breeze static checks --- airflow/api_fastapi/openapi/v1-generated.yaml | 30 +++++--------- airflow/api_fastapi/serializers/dags.py | 14 +++---- .../ui/openapi-gen/requests/schemas.gen.ts | 40 ++++++------------- airflow/ui/openapi-gen/requests/types.gen.ts | 14 +++---- 4 files changed, 33 insertions(+), 65 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 2777a55f6844..59471149edc3 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -594,12 +594,12 @@ components: catchup: type: boolean title: Catchup - dagrun_timeout: + dag_run_timeout: anyOf: - type: string format: duration - type: 'null' - title: Dagrun Timeout + title: Dag Run Timeout dataset_expression: anyOf: - type: object @@ -632,31 +632,30 @@ components: title: Orientation params: anyOf: - - {} + - type: object - type: 'null' title: Params render_template_as_native_obj: type: boolean title: Render Template As Native Obj - template_searchpath: + template_search_path: anyOf: - - type: string - items: type: string type: array - type: 'null' - title: Template Searchpath + title: Template Search Path timezone: anyOf: - type: string - type: 'null' title: Timezone - last_loaded: + last_parsed: anyOf: - type: string format: date-time - type: 'null' - title: Last Loaded + title: Last Parsed file_token: type: string title: File Token @@ -667,14 +666,6 @@ components: title: Concurrency description: Return max_active_tasks as concurrency. readOnly: true - last_parsed: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Parsed - description: Return last_loaded as last_parsed. - readOnly: true type: object required: - dag_id @@ -703,7 +694,7 @@ components: - next_dagrun_create_after - owners - catchup - - dagrun_timeout + - dag_run_timeout - dataset_expression - doc_md - start_date @@ -712,12 +703,11 @@ components: - orientation - params - render_template_as_native_obj - - template_searchpath + - template_search_path - timezone - - last_loaded + - last_parsed - file_token - concurrency - - last_parsed title: DAGDetailsResponse description: Specific serializer for DAG Details responses. DAGPatchBody: diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 7275455022f1..964e526d31d2 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -116,17 +116,13 @@ class DAGDetailsResponse(DAGResponse): timezone: str | None last_parsed: datetime | None - def _validation_alias_fn(field_name: str): - val_dict = { - "dag_run_timeout": "dagrun_timeout", - "last_parsed": "last_loaded", - "template_search_path": "template_searchpath", - } - return val_dict.get(field_name, field_name) - model_config = ConfigDict( alias_generator=AliasGenerator( - validation_alias=_validation_alias_fn, + validation_alias=lambda field_name: { + "dag_run_timeout": "dagrun_timeout", + "last_parsed": "last_loaded", + "template_search_path": "template_searchpath", + }.get(field_name, field_name), ) ) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 56f2412e879c..e8aae616be06 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -239,7 +239,7 @@ export const $DAGDetailsResponse = { type: "boolean", title: "Catchup", }, - dagrun_timeout: { + dag_run_timeout: { anyOf: [ { type: "string", @@ -249,7 +249,7 @@ export const $DAGDetailsResponse = { type: "null", }, ], - title: "Dagrun Timeout", + title: "Dag Run Timeout", }, dataset_expression: { anyOf: [ @@ -314,7 +314,9 @@ export const $DAGDetailsResponse = { }, params: { anyOf: [ - {}, + { + type: "object", + }, { type: "null", }, @@ -325,11 +327,8 @@ export const $DAGDetailsResponse = { type: "boolean", title: "Render Template As Native Obj", }, - template_searchpath: { + template_search_path: { anyOf: [ - { - type: "string", - }, { items: { type: "string", @@ -340,7 +339,7 @@ export const $DAGDetailsResponse = { type: "null", }, ], - title: "Template Searchpath", + title: "Template Search Path", }, timezone: { anyOf: [ @@ -353,7 +352,7 @@ export const $DAGDetailsResponse = { ], title: "Timezone", }, - last_loaded: { + last_parsed: { anyOf: [ { type: "string", @@ -363,7 +362,7 @@ export const $DAGDetailsResponse = { type: "null", }, ], - title: "Last Loaded", + title: "Last Parsed", }, file_token: { type: "string", @@ -377,20 +376,6 @@ export const $DAGDetailsResponse = { description: "Return max_active_tasks as concurrency.", readOnly: true, }, - last_parsed: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Parsed", - description: "Return last_loaded as last_parsed.", - readOnly: true, - }, }, type: "object", required: [ @@ -420,7 +405,7 @@ export const $DAGDetailsResponse = { "next_dagrun_create_after", "owners", "catchup", - "dagrun_timeout", + "dag_run_timeout", "dataset_expression", "doc_md", "start_date", @@ -429,12 +414,11 @@ export const $DAGDetailsResponse = { "orientation", "params", "render_template_as_native_obj", - "template_searchpath", + "template_search_path", "timezone", - "last_loaded", + "last_parsed", "file_token", "concurrency", - "last_parsed", ], title: "DAGDetailsResponse", description: "Specific serializer for DAG Details responses.", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1a5637bd067c..c37106abc8fc 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -38,7 +38,7 @@ export type DAGDetailsResponse = { next_dagrun_create_after: string | null; owners: Array; catchup: boolean; - dagrun_timeout: string | null; + dag_run_timeout: string | null; dataset_expression: { [key: string]: unknown; } | null; @@ -47,11 +47,13 @@ export type DAGDetailsResponse = { end_date: string | null; is_paused_upon_creation: boolean | null; orientation: string; - params: unknown | null; + params: { + [key: string]: unknown; + } | null; render_template_as_native_obj: boolean; - template_searchpath: string | Array | null; + template_search_path: Array | null; timezone: string | null; - last_loaded: string | null; + last_parsed: string | null; /** * Return file token. */ @@ -60,10 +62,6 @@ export type DAGDetailsResponse = { * Return max_active_tasks as concurrency. */ readonly concurrency: number; - /** - * Return last_loaded as last_parsed. - */ - readonly last_parsed: string | null; }; /** From 14a42e4df58c1de9744b2ecbd5d5f02e8b9c4c30 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 3 Oct 2024 02:40:03 +0530 Subject: [PATCH 09/10] Remove duplicate entry --- airflow/api_fastapi/openapi/v1-generated.yaml | 44 ------------------- 1 file changed, 44 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 59471149edc3..ce488a996af4 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -370,50 +370,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/details: - get: - tags: - - DAG - summary: Get DAG details. - description: Get details of a specific DAG. - operationId: get_dag_details_public_dags_get_details - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/DAGDetailsResponse' - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Bad Request - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /public/connections/{connection_id}: delete: tags: From 957c4d55a2c9532eb79d7354b7e91dfafd3c734d Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 3 Oct 2024 03:47:32 +0530 Subject: [PATCH 10/10] Use simpler approach for alias --- airflow/api_fastapi/serializers/dags.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 964e526d31d2..17677054c4c1 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -24,9 +24,9 @@ from itsdangerous import URLSafeSerializer from pendulum.tz.timezone import FixedTimezone, Timezone from pydantic import ( - AliasGenerator, + AliasChoices, BaseModel, - ConfigDict, + Field, computed_field, field_validator, ) @@ -103,7 +103,9 @@ class DAGDetailsResponse(DAGResponse): """Specific serializer for DAG Details responses.""" catchup: bool - dag_run_timeout: timedelta | None + dag_run_timeout: timedelta | None = Field( + validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout") + ) dataset_expression: dict | None doc_md: str | None start_date: datetime | None @@ -112,19 +114,11 @@ class DAGDetailsResponse(DAGResponse): orientation: str params: abc.MutableMapping | None render_template_as_native_obj: bool - template_search_path: Iterable[str] | None - timezone: str | None - last_parsed: datetime | None - - model_config = ConfigDict( - alias_generator=AliasGenerator( - validation_alias=lambda field_name: { - "dag_run_timeout": "dagrun_timeout", - "last_parsed": "last_loaded", - "template_search_path": "template_searchpath", - }.get(field_name, field_name), - ) + template_search_path: Iterable[str] | None = Field( + validation_alias=AliasChoices("template_search_path", "template_searchpath") ) + timezone: str | None + last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded")) @field_validator("timezone", mode="before") @classmethod