From fe86dfa07122167ccad54ea849d6e62020f0cab6 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:17:49 +0530 Subject: [PATCH] AIP-84 Migrate the public endpoint Get DAG to FastAPI (#42848) * Migrate the public endpoint Get DAG to FastAPI * Use proper name for test function --- airflow/api_fastapi/openapi/v1-generated.yaml | 62 +++++++++++++++++-- airflow/api_fastapi/serializers/dags.py | 40 +++++++----- airflow/api_fastapi/views/public/dags.py | 20 ++++++ airflow/ui/openapi-gen/queries/common.ts | 16 +++++ airflow/ui/openapi-gen/queries/prefetch.ts | 20 ++++++ airflow/ui/openapi-gen/queries/queries.ts | 26 ++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 26 ++++++++ .../ui/openapi-gen/requests/services.gen.ts | 45 +++++++++++--- airflow/ui/openapi-gen/requests/types.gen.ts | 49 ++++++++++++--- tests/api_fastapi/views/public/test_dags.py | 48 ++++++++++++++ 10 files changed, 313 insertions(+), 39 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 463cc1e92f4d..fb19c1abd1c1 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -291,13 +291,13 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/details: + /public/dags/{dag_id}: get: tags: - DAG - summary: Get Dag Details - description: Get details of DAG. - operationId: get_dag_details + summary: Get Dag + description: Get basic information about a DAG. + operationId: get_dag parameters: - name: dag_id in: path @@ -311,7 +311,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/DAGDetailsResponse' + $ref: '#/components/schemas/DAGResponse' '400': content: application/json: @@ -342,7 +342,6 @@ paths: schema: $ref: '#/components/schemas/HTTPExceptionResponse' description: Unprocessable Entity - /public/dags/{dag_id}: patch: tags: - DAG @@ -409,6 +408,57 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/details: + get: + tags: + - DAG + summary: Get Dag Details + description: Get details of DAG. + operationId: get_dag_details + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGDetailsResponse' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unprocessable Entity /public/connections/{connection_id}: delete: tags: diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py index 17677054c4c1..9879badf2504 100644 --- a/airflow/api_fastapi/serializers/dags.py +++ b/airflow/api_fastapi/serializers/dags.py @@ -24,9 +24,9 @@ from itsdangerous import URLSafeSerializer from pendulum.tz.timezone import FixedTimezone, Timezone from pydantic import ( - AliasChoices, + AliasGenerator, BaseModel, - Field, + ConfigDict, computed_field, field_validator, ) @@ -77,6 +77,14 @@ def get_owners(cls, v: Any) -> list[str] | None: return v.split(",") return v + @field_validator("timetable_summary", mode="before") + @classmethod + def get_timetable_summary(cls, tts: str | None) -> str | None: + """Validate the string representation of timetable_summary.""" + if tts is None or tts == "None": + return None + return str(tts) + # Mypy issue https://github.com/python/mypy/issues/1362 @computed_field # type: ignore[misc] @property @@ -103,9 +111,7 @@ class DAGDetailsResponse(DAGResponse): """Specific serializer for DAG Details responses.""" catchup: bool - dag_run_timeout: timedelta | None = Field( - validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout") - ) + dag_run_timeout: timedelta | None dataset_expression: dict | None doc_md: str | None start_date: datetime | None @@ -114,11 +120,19 @@ class DAGDetailsResponse(DAGResponse): orientation: str params: abc.MutableMapping | None render_template_as_native_obj: bool - template_search_path: Iterable[str] | None = Field( - validation_alias=AliasChoices("template_search_path", "template_searchpath") - ) + template_search_path: Iterable[str] | None timezone: str | None - last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded")) + last_parsed: datetime | None + + model_config = ConfigDict( + alias_generator=AliasGenerator( + validation_alias=lambda field_name: { + "dag_run_timeout": "dagrun_timeout", + "last_parsed": "last_loaded", + "template_search_path": "template_searchpath", + }.get(field_name, field_name), + ) + ) @field_validator("timezone", mode="before") @classmethod @@ -128,14 +142,6 @@ def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None: return None return str(tz) - @field_validator("timetable_summary", mode="before") - @classmethod - def get_timetable_summary(cls, tts: str | None) -> str | None: - """Validate the string representation of timetable_summary.""" - if tts is None or tts == "None": - return None - return str(tts) - @field_validator("params", mode="before") @classmethod def get_params(cls, params: abc.MutableMapping | None) -> dict | None: diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py index f0df86b787a2..ca0f44162eb2 100644 --- a/airflow/api_fastapi/views/public/dags.py +++ b/airflow/api_fastapi/views/public/dags.py @@ -92,6 +92,26 @@ async def get_dags( ) +@dags_router.get("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422])) +async def get_dag( + dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request +) -> DAGResponse: + """Get basic information about a DAG.""" + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(404, f"Dag with id {dag_id} was not found") + + dag_model: DagModel = session.get(DagModel, dag_id) + if not dag_model: + raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from session") + + for key, value in dag.__dict__.items(): + if not key.startswith("_") and not hasattr(dag_model, key): + setattr(dag_model, key, value) + + return DAGResponse.model_validate(dag_model, from_attributes=True) + + @dags_router.get("/{dag_id}/details", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422])) async def get_dag_details( dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 72fd0ef9ccde..a4d65c69003f 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -98,6 +98,22 @@ export const UseDagServiceGetDagsKeyFn = ( }, ]), ]; +export type DagServiceGetDagDefaultResponse = Awaited< + ReturnType +>; +export type DagServiceGetDagQueryResult< + TData = DagServiceGetDagDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagServiceGetDagKey = "DagServiceGetDag"; +export const UseDagServiceGetDagKeyFn = ( + { + dagId, + }: { + dagId: string; + }, + queryKey?: Array, +) => [useDagServiceGetDagKey, ...(queryKey ?? [{ dagId }])]; export type DagServiceGetDagDetailsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index a114b9dc92c6..8bd691ca33be 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -126,6 +126,26 @@ export const prefetchUseDagServiceGetDags = ( tags, }), }); +/** + * Get Dag + * Get basic information about a DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagServiceGetDag = ( + queryClient: QueryClient, + { + dagId, + }: { + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }), + queryFn: () => DagService.getDag({ dagId }), + }); /** * Get Dag Details * Get details of DAG. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a3ce02257160..51b8f4fb051d 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -153,6 +153,32 @@ export const useDagServiceGetDags = < }) as TData, ...options, }); +/** + * Get Dag + * Get basic information about a DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDag = < + TData = Common.DagServiceGetDagDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }, queryKey), + queryFn: () => DagService.getDag({ dagId }) as TData, + ...options, + }); /** * Get Dag Details * Get details of DAG. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index fbef843c6e0a..b437007468f2 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -148,6 +148,32 @@ export const useDagServiceGetDagsSuspense = < }) as TData, ...options, }); +/** + * Get Dag + * Get basic information about a DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagSuspense = < + TData = Common.DagServiceGetDagDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }, queryKey), + queryFn: () => DagService.getDag({ dagId }) as TData, + ...options, + }); /** * Get Dag Details * Get details of DAG. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 7f61fd32f349..24fbb9c29c46 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -11,10 +11,12 @@ import type { GetDagsResponse, PatchDagsData, PatchDagsResponse, - GetDagDetailsData, - GetDagDetailsResponse, + GetDagData, + GetDagResponse, PatchDagData, PatchDagResponse, + GetDagDetailsData, + GetDagDetailsResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, @@ -166,19 +168,17 @@ export class DagService { } /** - * Get Dag Details - * Get details of DAG. + * Get Dag + * Get basic information about a DAG. * @param data The data for the request. * @param data.dagId - * @returns DAGDetailsResponse Successful Response + * @returns DAGResponse Successful Response * @throws ApiError */ - public static getDagDetails( - data: GetDagDetailsData, - ): CancelablePromise { + public static getDag(data: GetDagData): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/dags/{dag_id}/details", + url: "/public/dags/{dag_id}", path: { dag_id: data.dagId, }, @@ -225,6 +225,33 @@ export class DagService { }, }); } + + /** + * Get Dag Details + * Get details of DAG. + * @param data The data for the request. + * @param data.dagId + * @returns DAGDetailsResponse Successful Response + * @throws ApiError + */ + public static getDagDetails( + data: GetDagDetailsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/details", + path: { + dag_id: data.dagId, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Unprocessable Entity", + }, + }); + } } export class ConnectionService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 7b5fc54065a0..368c981b9da1 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -260,11 +260,11 @@ export type PatchDagsData = { export type PatchDagsResponse = DAGCollectionResponse; -export type GetDagDetailsData = { +export type GetDagData = { dagId: string; }; -export type GetDagDetailsResponse = DAGDetailsResponse; +export type GetDagResponse = DAGResponse; export type PatchDagData = { dagId: string; @@ -274,6 +274,12 @@ export type PatchDagData = { export type PatchDagResponse = DAGResponse; +export type GetDagDetailsData = { + dagId: string; +}; + +export type GetDagDetailsResponse = DAGDetailsResponse; + export type DeleteConnectionData = { connectionId: string; }; @@ -379,14 +385,14 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/details": { + "/public/dags/{dag_id}": { get: { - req: GetDagDetailsData; + req: GetDagData; res: { /** * Successful Response */ - 200: DAGDetailsResponse; + 200: DAGResponse; /** * Bad Request */ @@ -409,8 +415,6 @@ export type $OpenApiTs = { 422: HTTPExceptionResponse; }; }; - }; - "/public/dags/{dag_id}": { patch: { req: PatchDagData; res: { @@ -441,6 +445,37 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/details": { + get: { + req: GetDagDetailsData; + res: { + /** + * Successful Response + */ + 200: DAGDetailsResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Unprocessable Entity + */ + 422: HTTPExceptionResponse; + }; + }; + }; "/public/connections/{connection_id}": { delete: { req: DeleteConnectionData; diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index 7ac93a2f2e07..5512f7bb1384 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -303,3 +303,51 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da "timezone": UTC_JSON_REPR, } assert res_json == expected + + +@pytest.mark.parametrize( + "query_params, dag_id, expected_status_code, dag_display_name", + [ + ({}, "fake_dag_id", 404, "fake_dag"), + ({}, DAG2_ID, 200, DAG2_DISPLAY_NAME), + ], +) +def test_get_dag(test_client, query_params, dag_id, expected_status_code, dag_display_name): + response = test_client.get(f"/public/dags/{dag_id}", params=query_params) + assert response.status_code == expected_status_code + if expected_status_code != 200: + return + + # Match expected and actual responses below. + res_json = response.json() + last_parsed_time = res_json["last_parsed_time"] + file_token = res_json["file_token"] + expected = { + "dag_id": dag_id, + "dag_display_name": dag_display_name, + "description": None, + "fileloc": "/opt/airflow/tests/api_fastapi/views/public/test_dags.py", + "file_token": file_token, + "is_paused": False, + "is_active": True, + "owners": ["airflow"], + "timetable_summary": None, + "tags": [], + "next_dagrun": None, + "has_task_concurrency_limits": True, + "next_dagrun_data_interval_start": None, + "next_dagrun_data_interval_end": None, + "max_active_runs": 16, + "max_consecutive_failed_dag_runs": 0, + "next_dagrun_create_after": None, + "last_expired": None, + "max_active_tasks": 16, + "last_pickled": None, + "default_view": "grid", + "last_parsed_time": last_parsed_time, + "scheduler_lock": None, + "timetable_description": "Never, external triggers only", + "has_import_errors": False, + "pickle_id": None, + } + assert res_json == expected