From b5712e7c1710fadb2ab7425c20e7982ce03e5d2e Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 3 Oct 2024 14:08:01 +0530 Subject: [PATCH] AIP-84 Migrate the public endpoint DAG Details to FastAPI (#42631) * Migrate the public endpoint DAG Details to FastAPI * Add comment for computed_field, remove unused import * Update pendulum import path * Re-run breeze static checks * Add openapi-gen for DAG Details API * Resolve review comments, test has_task_concurrency_limits * Remove unused import * Use lambda for aliases, re-run breeze static checks * Remove duplicate entry * Use simpler approach for alias --- .../api_connexion/endpoints/dag_endpoint.py | 1 + airflow/api_fastapi/openapi/v1-generated.yaml | 288 +++++++++++++ airflow/api_fastapi/serializers/dags.py | 61 ++- airflow/api_fastapi/views/public/dags.py | 33 +- airflow/ui/openapi-gen/queries/common.ts | 16 + airflow/ui/openapi-gen/queries/prefetch.ts | 20 + airflow/ui/openapi-gen/queries/queries.ts | 26 ++ airflow/ui/openapi-gen/queries/suspense.ts | 26 ++ .../ui/openapi-gen/requests/schemas.gen.ts | 404 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 29 ++ airflow/ui/openapi-gen/requests/types.gen.ts | 93 ++++ tests/api_fastapi/views/public/test_dags.py | 89 +++- 12 files changed, 1075 insertions(+), 11 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 5d10a97dedce..3d0d3dd8bfab 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -70,6 +70,7 @@ def get_dag( ) +@mark_fastapi_migration_done @security.requires_access_dag("GET") @provide_session def get_dag_details( diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index a54e0e4ca57d..ce488a996af4 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -252,6 +252,57 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/details: + get: + tags: + - DAG + summary: Get Dag Details + description: Get details of DAG. + operationId: get_dag_details + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGDetailsResponse' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unprocessable Entity /public/dags/{dag_id}: patch: tags: @@ -378,6 +429,243 @@ components: - total_entries title: DAGCollectionResponse description: DAG Collection serializer for responses. + DAGDetailsResponse: + properties: + dag_id: + type: string + title: Dag Id + dag_display_name: + type: string + title: Dag Display Name + is_paused: + type: boolean + title: Is Paused + is_active: + type: boolean + title: Is Active + last_parsed_time: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed Time + last_pickled: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Pickled + last_expired: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Expired + scheduler_lock: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Scheduler Lock + pickle_id: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Pickle Id + default_view: + anyOf: + - type: string + - type: 'null' + title: Default View + fileloc: + type: string + title: Fileloc + description: + anyOf: + - type: string + - type: 'null' + title: Description + timetable_summary: + anyOf: + - type: string + - type: 'null' + title: Timetable Summary + timetable_description: + anyOf: + - type: string + - type: 'null' + title: Timetable Description + tags: + items: + $ref: '#/components/schemas/DagTagPydantic' + type: array + title: Tags + max_active_tasks: + type: integer + title: Max Active Tasks + max_active_runs: + anyOf: + - type: integer + - type: 'null' + title: Max Active Runs + max_consecutive_failed_dag_runs: + type: integer + title: Max Consecutive Failed Dag Runs + has_task_concurrency_limits: + type: boolean + title: Has Task Concurrency Limits + has_import_errors: + type: boolean + title: Has Import Errors + next_dagrun: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun + next_dagrun_data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval Start + next_dagrun_data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval End + next_dagrun_create_after: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Create After + owners: + items: + type: string + type: array + title: Owners + catchup: + type: boolean + title: Catchup + dag_run_timeout: + anyOf: + - type: string + format: duration + - type: 'null' + title: Dag Run Timeout + dataset_expression: + anyOf: + - type: object + - type: 'null' + title: Dataset Expression + doc_md: + anyOf: + - type: string + - type: 'null' + title: Doc Md + start_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: End Date + is_paused_upon_creation: + anyOf: + - type: boolean + - type: 'null' + title: Is Paused Upon Creation + orientation: + type: string + title: Orientation + params: + anyOf: + - type: object + - type: 'null' + title: Params + render_template_as_native_obj: + type: boolean + title: Render Template As Native Obj + template_search_path: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Template Search Path + timezone: + anyOf: + - type: string + - type: 'null' + title: Timezone + last_parsed: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed + file_token: + type: string + title: File Token + description: Return file token. + readOnly: true + concurrency: + type: integer + title: Concurrency + description: Return max_active_tasks as concurrency. + readOnly: true + type: object + required: + - dag_id + - dag_display_name + - is_paused + - is_active + - last_parsed_time + - last_pickled + - last_expired + - scheduler_lock + - pickle_id + - default_view + - fileloc + - description + - timetable_summary + - timetable_description + - tags + - max_active_tasks + - max_active_runs + - max_consecutive_failed_dag_runs + - has_task_concurrency_limits + - has_import_errors + - next_dagrun + - next_dagrun_data_interval_start + - next_dagrun_data_interval_end + - next_dagrun_create_after + - owners + - catchup + - dag_run_timeout + - dataset_expression + - doc_md + - start_date + - end_date + - is_paused_upon_creation + - orientation + - params + - render_template_as_native_obj + - template_search_path + - timezone + - last_parsed + - file_token + - concurrency + title: DAGDetailsResponse + description: Specific serializer for DAG Details responses. DAGPatchBody: properties: is_paused: diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 59b47bdef9e9..17677054c4c1 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -17,12 +17,16 @@ from __future__ import annotations -from datetime import datetime -from typing import Any +from collections import abc +from datetime import datetime, timedelta +from typing import Any, Iterable from itsdangerous import URLSafeSerializer +from pendulum.tz.timezone import FixedTimezone, Timezone from pydantic import ( + AliasChoices, BaseModel, + Field, computed_field, field_validator, ) @@ -93,3 +97,56 @@ class DAGCollectionResponse(BaseModel): dags: list[DAGResponse] total_entries: int + + +class DAGDetailsResponse(DAGResponse): + """Specific serializer for DAG Details responses.""" + + catchup: bool + dag_run_timeout: timedelta | None = Field( + validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout") + ) + dataset_expression: dict | None + doc_md: str | None + start_date: datetime | None + end_date: datetime | None + is_paused_upon_creation: bool | None + orientation: str + params: abc.MutableMapping | None + render_template_as_native_obj: bool + template_search_path: Iterable[str] | None = Field( + validation_alias=AliasChoices("template_search_path", "template_searchpath") + ) + timezone: str | None + last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded")) + + @field_validator("timezone", mode="before") + @classmethod + def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None: + """Convert timezone attribute to string representation.""" + if tz is None: + return None + return str(tz) + + @field_validator("timetable_summary", mode="before") + @classmethod + def get_timetable_summary(cls, tts: str | None) -> str | None: + """Validate the string representation of timetable_summary.""" + if tts is None or tts == "None": + return None + return str(tts) + + @field_validator("params", mode="before") + @classmethod + def get_params(cls, params: abc.MutableMapping | None) -> dict | None: + """Convert params attribute to dict representation.""" + if params is None: + return None + return {param_name: param_val.dump() for param_name, param_val in params.items()} + + # Mypy issue https://github.com/python/mypy/issues/1362 + @computed_field # type: ignore[misc] + @property + def concurrency(self) -> int: + """Return max_active_tasks as concurrency.""" + return self.max_active_tasks diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py index 3761d593d2fd..ef76e184505c 100644 --- a/airflow/api_fastapi/views/public/dags.py +++ b/airflow/api_fastapi/views/public/dags.py @@ -17,7 +17,7 @@ from __future__ import annotations -from fastapi import Depends, HTTPException, Query +from fastapi import Depends, HTTPException, Query, Request from sqlalchemy import update from sqlalchemy.orm import Session from typing_extensions import Annotated @@ -41,9 +41,14 @@ QueryTagsFilter, SortParam, ) -from airflow.api_fastapi.serializers.dags import DAGCollectionResponse, DAGPatchBody, DAGResponse +from airflow.api_fastapi.serializers.dags import ( + DAGCollectionResponse, + DAGDetailsResponse, + DAGPatchBody, + DAGResponse, +) from airflow.api_fastapi.views.router import AirflowRouter -from airflow.models import DagModel +from airflow.models import DAG, DagModel dags_router = AirflowRouter(tags=["DAG"]) @@ -87,6 +92,28 @@ async def get_dags( ) +@dags_router.get( + "/dags/{dag_id}/details", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]) +) +async def get_dag_details( + dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request +) -> DAGDetailsResponse: + """Get details of DAG.""" + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(404, f"Dag with id {dag_id} was not found") + + dag_model: DagModel = session.get(DagModel, dag_id) + if not dag_model: + raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from session") + + for key, value in dag.__dict__.items(): + if not key.startswith("_") and not hasattr(dag_model, key): + setattr(dag_model, key, value) + + return DAGDetailsResponse.model_validate(dag_model, from_attributes=True) + + @dags_router.patch("/dags/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) async def patch_dag( dag_id: str, diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index fcddded7dc12..f5bc875a56b6 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -74,6 +74,22 @@ export const UseDagServiceGetDagsKeyFn = ( }, ]), ]; +export type DagServiceGetDagDetailsDefaultResponse = Awaited< + ReturnType +>; +export type DagServiceGetDagDetailsQueryResult< + TData = DagServiceGetDagDetailsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagServiceGetDagDetailsKey = "DagServiceGetDagDetails"; +export const UseDagServiceGetDagDetailsKeyFn = ( + { + dagId, + }: { + dagId: string; + }, + queryKey?: Array, +) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 95c2c7b73734..210c4f7f77dd 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -94,3 +94,23 @@ export const prefetchUseDagServiceGetDags = ( tags, }), }); +/** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagServiceGetDagDetails = ( + queryClient: QueryClient, + { + dagId, + }: { + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }), + queryFn: () => DagService.getDagDetails({ dagId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index f83c151b91e2..f12a6504c848 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -118,6 +118,32 @@ export const useDagServiceGetDags = < }) as TData, ...options, }); +/** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagDetails = < + TData = Common.DagServiceGetDagDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }, queryKey), + queryFn: () => DagService.getDagDetails({ dagId }) as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index dc8b99dfb218..d9d62d35e3ca 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -109,3 +109,29 @@ export const useDagServiceGetDagsSuspense = < }) as TData, ...options, }); +/** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagDetailsSuspense = < + TData = Common.DagServiceGetDagDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }, queryKey), + queryFn: () => DagService.getDagDetails({ dagId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e8c9b5d70cdf..e8aae616be06 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -20,6 +20,410 @@ export const $DAGCollectionResponse = { description: "DAG Collection serializer for responses.", } as const; +export const $DAGDetailsResponse = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + dag_display_name: { + type: "string", + title: "Dag Display Name", + }, + is_paused: { + type: "boolean", + title: "Is Paused", + }, + is_active: { + type: "boolean", + title: "Is Active", + }, + last_parsed_time: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Parsed Time", + }, + last_pickled: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Pickled", + }, + last_expired: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Expired", + }, + scheduler_lock: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Scheduler Lock", + }, + pickle_id: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Pickle Id", + }, + default_view: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Default View", + }, + fileloc: { + type: "string", + title: "Fileloc", + }, + description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Description", + }, + timetable_summary: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Summary", + }, + timetable_description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Description", + }, + tags: { + items: { + $ref: "#/components/schemas/DagTagPydantic", + }, + type: "array", + title: "Tags", + }, + max_active_tasks: { + type: "integer", + title: "Max Active Tasks", + }, + max_active_runs: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Max Active Runs", + }, + max_consecutive_failed_dag_runs: { + type: "integer", + title: "Max Consecutive Failed Dag Runs", + }, + has_task_concurrency_limits: { + type: "boolean", + title: "Has Task Concurrency Limits", + }, + has_import_errors: { + type: "boolean", + title: "Has Import Errors", + }, + next_dagrun: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun", + }, + next_dagrun_data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval Start", + }, + next_dagrun_data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval End", + }, + next_dagrun_create_after: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Create After", + }, + owners: { + items: { + type: "string", + }, + type: "array", + title: "Owners", + }, + catchup: { + type: "boolean", + title: "Catchup", + }, + dag_run_timeout: { + anyOf: [ + { + type: "string", + format: "duration", + }, + { + type: "null", + }, + ], + title: "Dag Run Timeout", + }, + dataset_expression: { + anyOf: [ + { + type: "object", + }, + { + type: "null", + }, + ], + title: "Dataset Expression", + }, + doc_md: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Doc Md", + }, + start_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Start Date", + }, + end_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "End Date", + }, + is_paused_upon_creation: { + anyOf: [ + { + type: "boolean", + }, + { + type: "null", + }, + ], + title: "Is Paused Upon Creation", + }, + orientation: { + type: "string", + title: "Orientation", + }, + params: { + anyOf: [ + { + type: "object", + }, + { + type: "null", + }, + ], + title: "Params", + }, + render_template_as_native_obj: { + type: "boolean", + title: "Render Template As Native Obj", + }, + template_search_path: { + anyOf: [ + { + items: { + type: "string", + }, + type: "array", + }, + { + type: "null", + }, + ], + title: "Template Search Path", + }, + timezone: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timezone", + }, + last_parsed: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Parsed", + }, + file_token: { + type: "string", + title: "File Token", + description: "Return file token.", + readOnly: true, + }, + concurrency: { + type: "integer", + title: "Concurrency", + description: "Return max_active_tasks as concurrency.", + readOnly: true, + }, + }, + type: "object", + required: [ + "dag_id", + "dag_display_name", + "is_paused", + "is_active", + "last_parsed_time", + "last_pickled", + "last_expired", + "scheduler_lock", + "pickle_id", + "default_view", + "fileloc", + "description", + "timetable_summary", + "timetable_description", + "tags", + "max_active_tasks", + "max_active_runs", + "max_consecutive_failed_dag_runs", + "has_task_concurrency_limits", + "has_import_errors", + "next_dagrun", + "next_dagrun_data_interval_start", + "next_dagrun_data_interval_end", + "next_dagrun_create_after", + "owners", + "catchup", + "dag_run_timeout", + "dataset_expression", + "doc_md", + "start_date", + "end_date", + "is_paused_upon_creation", + "orientation", + "params", + "render_template_as_native_obj", + "template_search_path", + "timezone", + "last_parsed", + "file_token", + "concurrency", + ], + title: "DAGDetailsResponse", + description: "Specific serializer for DAG Details responses.", +} as const; + export const $DAGPatchBody = { properties: { is_paused: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 24c960d2b7d5..0aefb56d06e6 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -9,6 +9,8 @@ import type { GetDagsResponse, PatchDagsData, PatchDagsResponse, + GetDagDetailsData, + GetDagDetailsResponse, PatchDagData, PatchDagResponse, DeleteConnectionData, @@ -127,6 +129,33 @@ export class DagService { }); } + /** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ + public static getDagDetails( + data: GetDagDetailsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/details", + path: { + dag_id: data.dagId, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Unprocessable Entity", + }, + }); + } + /** * Patch Dag * Patch the specific DAG. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b38d5c00a69f..c37106abc8fc 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -8,6 +8,62 @@ export type DAGCollectionResponse = { total_entries: number; }; +/** + * Specific serializer for DAG Details responses. + */ +export type DAGDetailsResponse = { + dag_id: string; + dag_display_name: string; + is_paused: boolean; + is_active: boolean; + last_parsed_time: string | null; + last_pickled: string | null; + last_expired: string | null; + scheduler_lock: string | null; + pickle_id: string | null; + default_view: string | null; + fileloc: string; + description: string | null; + timetable_summary: string | null; + timetable_description: string | null; + tags: Array; + max_active_tasks: number; + max_active_runs: number | null; + max_consecutive_failed_dag_runs: number; + has_task_concurrency_limits: boolean; + has_import_errors: boolean; + next_dagrun: string | null; + next_dagrun_data_interval_start: string | null; + next_dagrun_data_interval_end: string | null; + next_dagrun_create_after: string | null; + owners: Array; + catchup: boolean; + dag_run_timeout: string | null; + dataset_expression: { + [key: string]: unknown; + } | null; + doc_md: string | null; + start_date: string | null; + end_date: string | null; + is_paused_upon_creation: boolean | null; + orientation: string; + params: { + [key: string]: unknown; + } | null; + render_template_as_native_obj: boolean; + template_search_path: Array | null; + timezone: string | null; + last_parsed: string | null; + /** + * Return file token. + */ + readonly file_token: string; + /** + * Return max_active_tasks as concurrency. + */ + readonly concurrency: number; +}; + /** * Dag Serializer for updatable body. */ @@ -126,6 +182,12 @@ export type PatchDagsData = { export type PatchDagsResponse = DAGCollectionResponse; +export type GetDagDetailsData = { + dagId: string; +}; + +export type GetDagDetailsResponse = DAGDetailsResponse; + export type PatchDagData = { dagId: string; requestBody: DAGPatchBody; @@ -202,6 +264,37 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/details": { + get: { + req: GetDagDetailsData; + res: { + /** + * Successful Response + */ + 200: DAGDetailsResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Unprocessable Entity + */ + 422: HTTPExceptionResponse; + }; + }; + }; "/public/dags/{dag_id}": { patch: { req: PatchDagData; diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index 7b68ebe512a2..58b3daf35c64 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -18,6 +18,7 @@ from datetime import datetime, timezone +import pendulum import pytest from airflow.models.dag import DagModel @@ -34,8 +35,10 @@ DAG1_DISPLAY_NAME = "display1" DAG2_ID = "test_dag2" DAG2_DISPLAY_NAME = "display2" +DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc) DAG3_ID = "test_dag3" TASK_ID = "op1" +UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')" @provide_session @@ -74,7 +77,8 @@ def _create_deactivated_paused_dag(session=None): @pytest.fixture(autouse=True) -def setup(dag_maker) -> None: +@provide_session +def setup(dag_maker, session=None) -> None: clear_db_runs() clear_db_dags() clear_db_serialized_dags() @@ -96,15 +100,18 @@ def setup(dag_maker) -> None: DAG2_ID, dag_display_name=DAG2_DISPLAY_NAME, schedule=None, - start_date=datetime( - 2021, - 6, - 15, - ), + start_date=DAG2_START_DATE, + doc_md="details", + params={"foo": 1}, + max_active_tasks=16, + max_active_runs=16, ): EmptyOperator(task_id=TASK_ID) dag_maker.dagbag.sync_to_db() + dag_maker.dag_model.has_task_concurrency_limits = True + session.merge(dag_maker.dag_model) + session.commit() _create_deactivated_paused_dag() @@ -225,3 +232,73 @@ def test_patch_dags(test_client, query_params, body, expected_status_code, expec assert [dag["dag_id"] for dag in body["dags"]] == expected_ids paused_dag_ids = [dag["dag_id"] for dag in body["dags"] if dag["is_paused"]] assert paused_dag_ids == expected_paused_ids + + +@pytest.mark.parametrize( + "query_params, dag_id, expected_status_code, dag_display_name, start_date", + [ + ({}, "fake_dag_id", 404, "fake_dag", datetime(2023, 12, 31, tzinfo=timezone.utc)), + ({}, DAG2_ID, 200, DAG2_DISPLAY_NAME, DAG2_START_DATE), + ], +) +def test_dag_details(test_client, query_params, dag_id, expected_status_code, dag_display_name, start_date): + response = test_client.get(f"/public/dags/{dag_id}/details", params=query_params) + assert response.status_code == expected_status_code + if expected_status_code != 200: + return + + # Match expected and actual responses below. + res_json = response.json() + last_parsed = res_json["last_parsed"] + last_parsed_time = res_json["last_parsed_time"] + file_token = res_json["file_token"] + expected = { + "catchup": True, + "concurrency": 16, + "dag_id": dag_id, + "dag_display_name": dag_display_name, + "dag_run_timeout": None, + "dataset_expression": None, + "default_view": "grid", + "description": None, + "doc_md": "details", + "end_date": None, + "fileloc": "/opt/airflow/tests/api_fastapi/views/public/test_dags.py", + "file_token": file_token, + "has_import_errors": False, + "has_task_concurrency_limits": True, + "is_active": True, + "is_paused": False, + "is_paused_upon_creation": None, + "last_expired": None, + "last_parsed": last_parsed, + "last_parsed_time": last_parsed_time, + "last_pickled": None, + "max_active_runs": 16, + "max_active_tasks": 16, + "max_consecutive_failed_dag_runs": 0, + "next_dagrun": None, + "next_dagrun_create_after": None, + "next_dagrun_data_interval_end": None, + "next_dagrun_data_interval_start": None, + "orientation": "LR", + "owners": ["airflow"], + "params": { + "foo": { + "__class": "airflow.models.param.Param", + "description": None, + "schema": {}, + "value": 1, + } + }, + "pickle_id": None, + "render_template_as_native_obj": False, + "timetable_summary": None, + "scheduler_lock": None, + "start_date": start_date.replace(tzinfo=None).isoformat() + "Z", # pydantic datetime format + "tags": [], + "template_search_path": None, + "timetable_description": "Never, external triggers only", + "timezone": UTC_JSON_REPR, + } + assert res_json == expected