From 95b612935a782fdc58cabb31949e75f739990b16 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 4 Oct 2024 11:56:50 +0530 Subject: [PATCH 01/13] get dag_run init --- .../endpoints/dag_run_endpoint.py | 2 + airflow/api_fastapi/openapi/v1-generated.yaml | 49 +++++++++++++++++++ airflow/api_fastapi/views/public/__init__.py | 2 + airflow/api_fastapi/views/public/dag_run.py | 44 +++++++++++++++++ airflow/ui/openapi-gen/queries/common.ts | 19 +++++++ airflow/ui/openapi-gen/queries/prefetch.ts | 28 ++++++++++- airflow/ui/openapi-gen/queries/queries.ts | 32 ++++++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 37 +++++++++++++- .../ui/openapi-gen/requests/services.gen.ts | 31 ++++++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 34 +++++++++++++ 10 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 airflow/api_fastapi/views/public/dag_run.py diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 44891c0ef2c8..a862b7c96950 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -63,6 +63,7 @@ from airflow.models import DagModel, DagRun from airflow.timetables.base import DataInterval from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.api_migration import mark_fastapi_migration_done from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState @@ -90,6 +91,7 @@ def delete_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSI return NoContent, HTTPStatus.NO_CONTENT +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.RUN) @provide_session def get_dag_run( diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 23c4ecf545d9..9650864aecf8 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -411,6 +411,55 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}: + get: + tags: + - DagRun + summary: Get Dag Run + operationId: get_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: DAGCollectionResponse: diff --git a/airflow/api_fastapi/views/public/__init__.py b/airflow/api_fastapi/views/public/__init__.py index 9c0eefebb875..904d02cc713e 100644 --- a/airflow/api_fastapi/views/public/__init__.py +++ b/airflow/api_fastapi/views/public/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations from airflow.api_fastapi.views.public.connections import connections_router +from airflow.api_fastapi.views.public.dag_run import dag_run_router from airflow.api_fastapi.views.public.dags import dags_router from airflow.api_fastapi.views.router import AirflowRouter @@ -26,3 +27,4 @@ public_router.include_router(dags_router) public_router.include_router(connections_router) +public_router.include_router(dag_run_router) diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py new file mode 100644 index 000000000000..7214d3e29a70 --- /dev/null +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from fastapi import Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.db.common import get_session +from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.views.router import AirflowRouter +from airflow.models import DagRun + +dag_run_router = AirflowRouter(tags=["DagRun"]) + + +@dag_run_router.get( + "/dags/{dag_id}/dagRuns/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]) +) +async def get_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]): + dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) + + if dag_run is None: + raise HTTPException( + 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" + ) + + return dag_run diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index f5bc875a56b6..276be7e078e9 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -90,6 +91,24 @@ export const UseDagServiceGetDagDetailsKeyFn = ( }, queryKey?: Array, ) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])]; +export type DagRunServiceGetDagRunDefaultResponse = Awaited< + ReturnType +>; +export type DagRunServiceGetDagRunQueryResult< + TData = DagRunServiceGetDagRunDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun"; +export const UseDagRunServiceGetDagRunKeyFn = ( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: Array, +) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 210c4f7f77dd..d57fcdb53b98 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1,7 +1,11 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.0 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, DagService } from "../requests/services.gen"; +import { + AssetService, + DagRunService, + DagService, +} from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; import * as Common from "./common"; @@ -114,3 +118,25 @@ export const prefetchUseDagServiceGetDagDetails = ( queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }), queryFn: () => DagService.getDagDetails({ dagId }), }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns unknown Successful Response + * @throws ApiError + */ +export const prefetchUseDagRunServiceGetDagRun = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index f12a6504c848..5b45989f6484 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -9,6 +9,7 @@ import { import { AssetService, ConnectionService, + DagRunService, DagService, } from "../requests/services.gen"; import { DAGPatchBody, DagRunState } from "../requests/types.gen"; @@ -144,6 +145,37 @@ export const useDagServiceGetDagDetails = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRun = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index d9d62d35e3ca..2d3256801f97 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1,7 +1,11 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.0 import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; -import { AssetService, DagService } from "../requests/services.gen"; +import { + AssetService, + DagRunService, + DagService, +} from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; import * as Common from "./common"; @@ -135,3 +139,34 @@ export const useDagServiceGetDagDetailsSuspense = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRunSuspense = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 0e91fa416571..0fe2be971490 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -15,6 +15,8 @@ import type { PatchDagResponse, DeleteConnectionData, DeleteConnectionResponse, + GetDagRunData, + GetDagRunResponse, } from "./types.gen"; export class AssetService { @@ -218,3 +220,32 @@ export class ConnectionService { }); } } + +export class DagRunService { + /** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns unknown Successful Response + * @throws ApiError + */ + public static getDagRun( + data: GetDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b87a17236358..29bbd82ba8c7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -202,6 +202,13 @@ export type DeleteConnectionData = { export type DeleteConnectionResponse = void; +export type GetDagRunData = { + dagId: string; + dagRunId: string; +}; + +export type GetDagRunResponse = unknown; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -353,4 +360,31 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}": { + get: { + req: GetDagRunData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; From d1e76815f7b66a495a758b4174e5bd0296ad9437 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 4 Oct 2024 12:16:48 +0530 Subject: [PATCH 02/13] add serializer --- airflow/api_fastapi/openapi/v1-generated.yaml | 77 +++++++++- airflow/api_fastapi/serializers/dag_run.py | 38 +++++ airflow/api_fastapi/views/public/dag_run.py | 5 +- airflow/ui/openapi-gen/queries/prefetch.ts | 2 +- airflow/ui/openapi-gen/queries/queries.ts | 2 +- airflow/ui/openapi-gen/queries/suspense.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 131 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 25 +++- 9 files changed, 276 insertions(+), 8 deletions(-) create mode 100644 airflow/api_fastapi/serializers/dag_run.py diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 9650864aecf8..87f20382b4b5 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -435,7 +435,8 @@ paths: description: Successful Response content: application/json: - schema: {} + schema: + $ref: '#/components/schemas/DAGRunResponse' '401': content: application/json: @@ -878,6 +879,80 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. + DAGRunResponse: + properties: + dag_run_id: + anyOf: + - type: string + - type: 'null' + title: Dag Run Id + dag_id: + type: string + title: Dag Id + logical_date: + anyOf: + - type: string + - type: 'null' + title: Logical Date + start_date: + anyOf: + - type: string + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + - type: 'null' + title: End Date + data_interval_start: + anyOf: + - type: string + - type: 'null' + title: Data Interval Start + data_interval_end: + anyOf: + - type: string + - type: 'null' + title: Data Interval End + last_scheduling_decision: + anyOf: + - type: string + - type: 'null' + title: Last Scheduling Decision + run_type: + type: string + title: Run Type + state: + type: string + title: State + external_trigger: + type: boolean + title: External Trigger + conf: + type: object + title: Conf + notes: + anyOf: + - type: string + - type: 'null' + title: Notes + type: object + required: + - dag_run_id + - dag_id + - logical_date + - start_date + - end_date + - data_interval_start + - data_interval_end + - last_scheduling_decision + - run_type + - state + - external_trigger + - conf + - notes + title: DAGRunResponse + description: DAG Run serializer for responses. DagRunState: type: string enum: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py new file mode 100644 index 000000000000..ec525405c90b --- /dev/null +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from pydantic import BaseModel + + +class DAGRunResponse(BaseModel): + """DAG Run serializer for responses.""" + + dag_run_id: str | None + dag_id: str + logical_date: str | None + start_date: str | None + end_date: str | None + data_interval_start: str | None + data_interval_end: str | None + last_scheduling_decision: str | None + run_type: str # Enum + state: str # Enum + external_trigger: bool + conf: dict + notes: str | None diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index 7214d3e29a70..ada8507cc7e4 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -24,6 +24,7 @@ from airflow.api_fastapi.db.common import get_session from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.serializers.dag_run import DAGRunResponse from airflow.api_fastapi.views.router import AirflowRouter from airflow.models import DagRun @@ -33,7 +34,9 @@ @dag_run_router.get( "/dags/{dag_id}/dagRuns/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]) ) -async def get_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]): +async def get_dag_run( + dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] +) -> DAGRunResponse: dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) if dag_run is None: diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index d57fcdb53b98..60c5e0dcac23 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -123,7 +123,7 @@ export const prefetchUseDagServiceGetDagDetails = ( * @param data The data for the request. * @param data.dagId * @param data.dagRunId - * @returns unknown Successful Response + * @returns DAGRunResponse Successful Response * @throws ApiError */ export const prefetchUseDagRunServiceGetDagRun = ( diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 5b45989f6484..aef35e72242b 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -150,7 +150,7 @@ export const useDagServiceGetDagDetails = < * @param data The data for the request. * @param data.dagId * @param data.dagRunId - * @returns unknown Successful Response + * @returns DAGRunResponse Successful Response * @throws ApiError */ export const useDagRunServiceGetDagRun = < diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 2d3256801f97..c9a1c6d70c47 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -144,7 +144,7 @@ export const useDagServiceGetDagDetailsSuspense = < * @param data The data for the request. * @param data.dagId * @param data.dagRunId - * @returns unknown Successful Response + * @returns DAGRunResponse Successful Response * @throws ApiError */ export const useDagRunServiceGetDagRunSuspense = < diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e8aae616be06..43e32648a695 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -692,6 +692,137 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; +export const $DAGRunResponse = { + properties: { + dag_run_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Dag Run Id", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + logical_date: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, + start_date: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Start Date", + }, + end_date: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "End Date", + }, + data_interval_start: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Data Interval Start", + }, + data_interval_end: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Data Interval End", + }, + last_scheduling_decision: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Last Scheduling Decision", + }, + run_type: { + type: "string", + title: "Run Type", + }, + state: { + type: "string", + title: "State", + }, + external_trigger: { + type: "boolean", + title: "External Trigger", + }, + conf: { + type: "object", + title: "Conf", + }, + notes: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Notes", + }, + }, + type: "object", + required: [ + "dag_run_id", + "dag_id", + "logical_date", + "start_date", + "end_date", + "data_interval_start", + "data_interval_end", + "last_scheduling_decision", + "run_type", + "state", + "external_trigger", + "conf", + "notes", + ], + title: "DAGRunResponse", + description: "DAG Run serializer for responses.", +} as const; + export const $DagRunState = { type: "string", enum: ["queued", "running", "success", "failed"], diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 0fe2be971490..f98a398ea121 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -227,7 +227,7 @@ export class DagRunService { * @param data The data for the request. * @param data.dagId * @param data.dagRunId - * @returns unknown Successful Response + * @returns DAGRunResponse Successful Response * @throws ApiError */ public static getDagRun( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 29bbd82ba8c7..bbf9c69a5aed 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -106,6 +106,27 @@ export type DAGResponse = { readonly file_token: string; }; +/** + * DAG Run serializer for responses. + */ +export type DAGRunResponse = { + dag_run_id: string | null; + dag_id: string; + logical_date: string | null; + start_date: string | null; + end_date: string | null; + data_interval_start: string | null; + data_interval_end: string | null; + last_scheduling_decision: string | null; + run_type: string; + state: string; + external_trigger: boolean; + conf: { + [key: string]: unknown; + }; + notes: string | null; +}; + /** * All possible states that a DagRun can be in. * @@ -207,7 +228,7 @@ export type GetDagRunData = { dagRunId: string; }; -export type GetDagRunResponse = unknown; +export type GetDagRunResponse = DAGRunResponse; export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { @@ -367,7 +388,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: unknown; + 200: DAGRunResponse; /** * Unauthorized */ From 8c43a30e9ce797046298648229dafd056fd1d6fb Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 4 Oct 2024 16:27:22 +0530 Subject: [PATCH 03/13] Merge branch 'main' of https://github.com/apache/airflow into kalyan/AIP-84/get_dag_run --- airflow/api_fastapi/openapi/v1-generated.yaml | 50 +++++++++++++++++++ airflow/ui/openapi-gen/queries/common.ts | 19 +++++++ airflow/ui/openapi-gen/queries/prefetch.ts | 23 +++++++++ airflow/ui/openapi-gen/queries/queries.ts | 32 ++++++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 32 ++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 31 ++++++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 34 +++++++++++++ 7 files changed, 221 insertions(+) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index acf80c9699c5..14e5c8d8949b 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -455,6 +455,56 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}: + get: + tags: + - DagRun + summary: Get Dag Run + operationId: get_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: ConnectionResponse: diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index ff7bb3995cb4..2e5ebe4c2c7d 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -110,6 +111,24 @@ export const UseConnectionServiceGetConnectionKeyFn = ( useConnectionServiceGetConnectionKey, ...(queryKey ?? [{ connectionId }]), ]; +export type DagRunServiceGetDagRunDefaultResponse = Awaited< + ReturnType +>; +export type DagRunServiceGetDagRunQueryResult< + TData = DagRunServiceGetDagRunDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun"; +export const UseDagRunServiceGetDagRunKeyFn = ( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: Array, +) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index cbb43cca3abb..dbd0f8b54870 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -4,6 +4,7 @@ import { type QueryClient } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -138,3 +139,25 @@ export const prefetchUseConnectionServiceGetConnection = ( queryKey: Common.UseConnectionServiceGetConnectionKeyFn({ connectionId }), queryFn: () => ConnectionService.getConnection({ connectionId }), }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagRunServiceGetDagRun = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 4aa627d74fd0..a5b626ea6ded 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -9,6 +9,7 @@ import { import { AssetService, ConnectionService, + DagRunService, DagService, } from "../requests/services.gen"; import { DAGPatchBody, DagRunState } from "../requests/types.gen"; @@ -173,6 +174,37 @@ export const useConnectionServiceGetConnection = < queryFn: () => ConnectionService.getConnection({ connectionId }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRun = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 04d7eb94b320..f75fe7cb046a 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -4,6 +4,7 @@ import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -168,3 +169,34 @@ export const useConnectionServiceGetConnectionSuspense = < queryFn: () => ConnectionService.getConnection({ connectionId }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRunSuspense = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 023a2a458dd7..75d2d2a598a9 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -17,6 +17,8 @@ import type { DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, + GetDagRunData, + GetDagRunResponse, } from "./types.gen"; export class AssetService { @@ -246,3 +248,32 @@ export class ConnectionService { }); } } + +export class DagRunService { + /** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ + public static getDagRun( + data: GetDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 5f16d3dab602..a5ad2acc34ea 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -243,6 +243,13 @@ export type GetConnectionData = { export type GetConnectionResponse = ConnectionResponse; +export type GetDagRunData = { + dagId: string; + dagRunId: string; +}; + +export type GetDagRunResponse = DAGRunResponse; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -419,4 +426,31 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}": { + get: { + req: GetDagRunData; + res: { + /** + * Successful Response + */ + 200: DAGRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; From f8332d1d80e67ed98f5d043800174be1dff22657 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 4 Oct 2024 16:43:28 +0530 Subject: [PATCH 04/13] add types --- airflow/api_fastapi/openapi/v1-generated.yaml | 15 +++++++++++---- airflow/api_fastapi/serializers/dag_run.py | 7 +++++-- airflow/ui/openapi-gen/requests/schemas.gen.ts | 13 +++++++++---- airflow/ui/openapi-gen/requests/types.gen.ts | 13 +++++++++++-- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 14e5c8d8949b..9b52fdeff7d0 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -1014,11 +1014,9 @@ components: - type: 'null' title: Last Scheduling Decision run_type: - type: string - title: Run Type + $ref: '#/components/schemas/DagRunType' state: - type: string - title: State + $ref: '#/components/schemas/DagRunState' external_trigger: type: boolean title: External Trigger @@ -1063,6 +1061,15 @@ components: so please ensure that their values always match the ones with the same name in TaskInstanceState.' + DagRunType: + type: string + enum: + - backfill + - scheduled + - manual + - dataset_triggered + title: DagRunType + description: Class with DagRun types. DagTagPydantic: properties: name: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index ec525405c90b..3de560485ca1 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -19,6 +19,9 @@ from pydantic import BaseModel +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunType + class DAGRunResponse(BaseModel): """DAG Run serializer for responses.""" @@ -31,8 +34,8 @@ class DAGRunResponse(BaseModel): data_interval_start: str | None data_interval_end: str | None last_scheduling_decision: str | None - run_type: str # Enum - state: str # Enum + run_type: DagRunType + state: DagRunState external_trigger: bool conf: dict notes: str | None diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 3d2cbd8d4b40..c38773a402a6 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -868,12 +868,10 @@ export const $DAGRunResponse = { title: "Last Scheduling Decision", }, run_type: { - type: "string", - title: "Run Type", + $ref: "#/components/schemas/DagRunType", }, state: { - type: "string", - title: "State", + $ref: "#/components/schemas/DagRunState", }, external_trigger: { type: "boolean", @@ -926,6 +924,13 @@ so please ensure that their values always match the ones with the same name in TaskInstanceState.`, } as const; +export const $DagRunType = { + type: "string", + enum: ["backfill", "scheduled", "manual", "dataset_triggered"], + title: "DagRunType", + description: "Class with DagRun types.", +} as const; + export const $DagTagPydantic = { properties: { name: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index a5ad2acc34ea..a3e20b4b2f17 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -132,8 +132,8 @@ export type DAGRunResponse = { data_interval_start: string | null; data_interval_end: string | null; last_scheduling_decision: string | null; - run_type: string; - state: string; + run_type: DagRunType; + state: DagRunState; external_trigger: boolean; conf: { [key: string]: unknown; @@ -150,6 +150,15 @@ export type DAGRunResponse = { */ export type DagRunState = "queued" | "running" | "success" | "failed"; +/** + * Class with DagRun types. + */ +export type DagRunType = + | "backfill" + | "scheduled" + | "manual" + | "dataset_triggered"; + /** * Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. */ From 939153e34b8a9aeb5421df1e2068998899cfb585 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 4 Oct 2024 21:06:32 +0530 Subject: [PATCH 05/13] add test --- airflow/api_fastapi/openapi/v1-generated.yaml | 22 ++- airflow/api_fastapi/serializers/dag_run.py | 7 +- .../ui/openapi-gen/requests/schemas.gen.ts | 26 +++- airflow/ui/openapi-gen/requests/types.gen.ts | 16 ++- .../api_fastapi/views/public/test_dag_run.py | 127 ++++++++++++++++++ 5 files changed, 188 insertions(+), 10 deletions(-) create mode 100644 tests/api_fastapi/views/public/test_dag_run.py diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 9b52fdeff7d0..27b896067410 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -975,11 +975,11 @@ components: description: DAG serializer for responses. DAGRunResponse: properties: - dag_run_id: + run_id: anyOf: - type: string - type: 'null' - title: Dag Run Id + title: Run Id dag_id: type: string title: Dag Id @@ -1020,6 +1020,8 @@ components: external_trigger: type: boolean title: External Trigger + triggered_by: + $ref: '#/components/schemas/DagRunTriggeredByType' conf: type: object title: Conf @@ -1030,7 +1032,7 @@ components: title: Notes type: object required: - - dag_run_id + - run_id - dag_id - logical_date - start_date @@ -1041,6 +1043,7 @@ components: - run_type - state - external_trigger + - triggered_by - conf - notes title: DAGRunResponse @@ -1061,6 +1064,19 @@ components: so please ensure that their values always match the ones with the same name in TaskInstanceState.' + DagRunTriggeredByType: + type: string + enum: + - cli + - operator + - rest_api + - ui + - test + - timetable + - dataset + - backfill + title: DagRunTriggeredByType + description: Class with TriggeredBy types for DagRun. DagRunType: type: string enum: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index 3de560485ca1..e8528fc7f545 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -17,16 +17,16 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, Field from airflow.utils.state import DagRunState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType class DAGRunResponse(BaseModel): """DAG Run serializer for responses.""" - dag_run_id: str | None + dag_run_id: str | None = Field(alias="run_id") dag_id: str logical_date: str | None start_date: str | None @@ -37,5 +37,6 @@ class DAGRunResponse(BaseModel): run_type: DagRunType state: DagRunState external_trigger: bool + triggered_by: DagRunTriggeredByType conf: dict notes: str | None diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index c38773a402a6..eff1ee3962e2 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -786,7 +786,7 @@ export const $DAGResponse = { export const $DAGRunResponse = { properties: { - dag_run_id: { + run_id: { anyOf: [ { type: "string", @@ -795,7 +795,7 @@ export const $DAGRunResponse = { type: "null", }, ], - title: "Dag Run Id", + title: "Run Id", }, dag_id: { type: "string", @@ -877,6 +877,9 @@ export const $DAGRunResponse = { type: "boolean", title: "External Trigger", }, + triggered_by: { + $ref: "#/components/schemas/DagRunTriggeredByType", + }, conf: { type: "object", title: "Conf", @@ -895,7 +898,7 @@ export const $DAGRunResponse = { }, type: "object", required: [ - "dag_run_id", + "run_id", "dag_id", "logical_date", "start_date", @@ -906,6 +909,7 @@ export const $DAGRunResponse = { "run_type", "state", "external_trigger", + "triggered_by", "conf", "notes", ], @@ -924,6 +928,22 @@ so please ensure that their values always match the ones with the same name in TaskInstanceState.`, } as const; +export const $DagRunTriggeredByType = { + type: "string", + enum: [ + "cli", + "operator", + "rest_api", + "ui", + "test", + "timetable", + "dataset", + "backfill", + ], + title: "DagRunTriggeredByType", + description: "Class with TriggeredBy types for DagRun.", +} as const; + export const $DagRunType = { type: "string", enum: ["backfill", "scheduled", "manual", "dataset_triggered"], diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index a3e20b4b2f17..6bb9311fa79c 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -124,7 +124,7 @@ export type DAGResponse = { * DAG Run serializer for responses. */ export type DAGRunResponse = { - dag_run_id: string | null; + run_id: string | null; dag_id: string; logical_date: string | null; start_date: string | null; @@ -135,6 +135,7 @@ export type DAGRunResponse = { run_type: DagRunType; state: DagRunState; external_trigger: boolean; + triggered_by: DagRunTriggeredByType; conf: { [key: string]: unknown; }; @@ -150,6 +151,19 @@ export type DAGRunResponse = { */ export type DagRunState = "queued" | "running" | "success" | "failed"; +/** + * Class with TriggeredBy types for DagRun. + */ +export type DagRunTriggeredByType = + | "cli" + | "operator" + | "rest_api" + | "ui" + | "test" + | "timetable" + | "dataset" + | "backfill"; + /** * Class with DagRun types. */ diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py new file mode 100644 index 000000000000..8c07d0ed1dec --- /dev/null +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from airflow.operators.empty import EmptyOperator +from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunTriggeredByType, DagRunType +from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags + +pytestmark = pytest.mark.db_test + +DAG1_ID = "test_dag1" +DAG2_ID = "test_dag2" +DAG1_RUN1_ID = "dag_run_1" +DAG1_RUN2_ID = "dag_run_2" +DAG2_RUN1_ID = "dag_run_3" +DAG2_RUN2_ID = "dag_run_4" +DAG1_RUN1_STATE = DagRunState.SUCCESS +DAG1_RUN2_STATE = DagRunState.FAILED +DAG2_RUN1_STATE = DagRunState.SUCCESS +DAG2_RUN2_STATE = DagRunState.SUCCESS +DAG1_RUN1_RUN_TYPE = DagRunType.MANUAL +DAG1_RUN2_RUN_TYPE = DagRunType.SCHEDULED +DAG2_RUN1_RUN_TYPE = DagRunType.BACKFILL_JOB +DAG2_RUN2_RUN_TYPE = DagRunType.DATASET_TRIGGERED +DAG1_RUN1_TRIGGERED_BY = DagRunTriggeredByType.UI +DAG1_RUN2_TRIGGERED_BY = DagRunTriggeredByType.DATASET +DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI +DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API +START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) +EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc) + + +@pytest.fixture(autouse=True) +@provide_session +def setup(dag_maker, session=None): + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + with dag_maker( + DAG1_ID, + schedule="@daily", + start_date=START_DATE, + ): + EmptyOperator(task_id="task_1") + dag_maker.create_dagrun( + run_id=DAG1_RUN1_ID, + state=DAG1_RUN1_STATE, + run_type=DAG1_RUN1_RUN_TYPE, + triggered_by=DAG1_RUN1_TRIGGERED_BY, + ) + dag_maker.create_dagrun( + run_id=DAG1_RUN2_ID, + state=DAG1_RUN2_STATE, + run_type=DAG1_RUN2_RUN_TYPE, + triggered_by=DAG1_RUN2_TRIGGERED_BY, + execution_date=EXECUTION_DATE, + ) + + with dag_maker( + DAG2_ID, + schedule=None, + start_date=START_DATE, + ): + EmptyOperator(task_id="task_2") + dag_maker.create_dagrun( + run_id=DAG2_RUN1_ID, + state=DAG2_RUN1_STATE, + run_type=DAG2_RUN1_RUN_TYPE, + triggered_by=DAG2_RUN1_TRIGGERED_BY, + execution_date=EXECUTION_DATE, + ) + dag_maker.create_dagrun( + run_id=DAG2_RUN2_ID, + state=DAG2_RUN2_STATE, + run_type=DAG2_RUN2_RUN_TYPE, + triggered_by=DAG2_RUN2_TRIGGERED_BY, + execution_date=EXECUTION_DATE, + ) + + dag_maker.dagbag.sync_to_db() + dag_maker.dag_model + dag_maker.dag_model.has_task_concurrency_limits = True + session.merge(dag_maker.dag_model) + session.commit() + + +@pytest.mark.parametrize( + "dag_id, run_id, state, run_type, triggered_by", + [ + (DAG1_ID, DAG1_RUN1_ID, DAG1_RUN1_STATE, DAG1_RUN1_RUN_TYPE, DAG1_RUN1_TRIGGERED_BY), + (DAG1_ID, DAG1_RUN2_ID, DAG1_RUN2_STATE, DAG1_RUN2_RUN_TYPE, DAG1_RUN2_TRIGGERED_BY), + (DAG2_ID, DAG2_RUN1_ID, DAG2_RUN1_STATE, DAG2_RUN1_RUN_TYPE, DAG2_RUN1_TRIGGERED_BY), + (DAG2_ID, DAG2_RUN2_ID, DAG2_RUN2_STATE, DAG2_RUN2_RUN_TYPE, DAG2_RUN2_TRIGGERED_BY), + ], +) +def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by): + print(dag_id, run_id) + response = test_client.get(f"/api/v1/dags/{dag_id}/dagRuns/{run_id}") + assert response.status_code == 200 + body = response.json() + assert body["dag_id"] == dag_id + assert body["run_id"] == run_id + assert body["state"] == state + assert body["run_type"] == run_type + assert body["triggered_by"] == triggered_by From 0633e4c520d1fab8f9a504b4acf7165a49027974 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 4 Oct 2024 21:28:26 +0530 Subject: [PATCH 06/13] working tests --- airflow/api_fastapi/openapi/v1-generated.yaml | 12 ++++++------ airflow/api_fastapi/serializers/dag_run.py | 15 ++++++++------- airflow/api_fastapi/views/public/dag_run.py | 2 +- airflow/ui/openapi-gen/requests/schemas.gen.ts | 18 ++++++------------ airflow/ui/openapi-gen/requests/types.gen.ts | 1 - tests/api_fastapi/views/public/test_dag_run.py | 5 ++--- 6 files changed, 23 insertions(+), 30 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 27b896067410..eb9af3489073 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -986,31 +986,37 @@ components: logical_date: anyOf: - type: string + format: date-time - type: 'null' title: Logical Date start_date: anyOf: - type: string + format: date-time - type: 'null' title: Start Date end_date: anyOf: - type: string + format: date-time - type: 'null' title: End Date data_interval_start: anyOf: - type: string + format: date-time - type: 'null' title: Data Interval Start data_interval_end: anyOf: - type: string + format: date-time - type: 'null' title: Data Interval End last_scheduling_decision: anyOf: - type: string + format: date-time - type: 'null' title: Last Scheduling Decision run_type: @@ -1025,11 +1031,6 @@ components: conf: type: object title: Conf - notes: - anyOf: - - type: string - - type: 'null' - title: Notes type: object required: - run_id @@ -1045,7 +1046,6 @@ components: - external_trigger - triggered_by - conf - - notes title: DAGRunResponse description: DAG Run serializer for responses. DagRunState: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index e8528fc7f545..4e177be6dd0f 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -17,6 +17,8 @@ from __future__ import annotations +from datetime import datetime + from pydantic import BaseModel, Field from airflow.utils.state import DagRunState @@ -28,15 +30,14 @@ class DAGRunResponse(BaseModel): dag_run_id: str | None = Field(alias="run_id") dag_id: str - logical_date: str | None - start_date: str | None - end_date: str | None - data_interval_start: str | None - data_interval_end: str | None - last_scheduling_decision: str | None + logical_date: datetime | None + start_date: datetime | None + end_date: datetime | None + data_interval_start: datetime | None + data_interval_end: datetime | None + last_scheduling_decision: datetime | None run_type: DagRunType state: DagRunState external_trigger: bool triggered_by: DagRunTriggeredByType conf: dict - notes: str | None diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index ada8507cc7e4..34565102f735 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -44,4 +44,4 @@ async def get_dag_run( 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" ) - return dag_run + return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index eff1ee3962e2..811102d4c7c2 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -805,6 +805,7 @@ export const $DAGRunResponse = { anyOf: [ { type: "string", + format: "date-time", }, { type: "null", @@ -816,6 +817,7 @@ export const $DAGRunResponse = { anyOf: [ { type: "string", + format: "date-time", }, { type: "null", @@ -827,6 +829,7 @@ export const $DAGRunResponse = { anyOf: [ { type: "string", + format: "date-time", }, { type: "null", @@ -838,6 +841,7 @@ export const $DAGRunResponse = { anyOf: [ { type: "string", + format: "date-time", }, { type: "null", @@ -849,6 +853,7 @@ export const $DAGRunResponse = { anyOf: [ { type: "string", + format: "date-time", }, { type: "null", @@ -860,6 +865,7 @@ export const $DAGRunResponse = { anyOf: [ { type: "string", + format: "date-time", }, { type: "null", @@ -884,17 +890,6 @@ export const $DAGRunResponse = { type: "object", title: "Conf", }, - notes: { - anyOf: [ - { - type: "string", - }, - { - type: "null", - }, - ], - title: "Notes", - }, }, type: "object", required: [ @@ -911,7 +906,6 @@ export const $DAGRunResponse = { "external_trigger", "triggered_by", "conf", - "notes", ], title: "DAGRunResponse", description: "DAG Run serializer for responses.", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 6bb9311fa79c..b46720273139 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -139,7 +139,6 @@ export type DAGRunResponse = { conf: { [key: string]: unknown; }; - notes: string | null; }; /** diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py index 8c07d0ed1dec..bda188f92970 100644 --- a/tests/api_fastapi/views/public/test_dag_run.py +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -116,12 +116,11 @@ def setup(dag_maker, session=None): ], ) def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by): - print(dag_id, run_id) - response = test_client.get(f"/api/v1/dags/{dag_id}/dagRuns/{run_id}") + response = test_client.get(f"/public/dags/{dag_id}/dagRuns/{run_id}") assert response.status_code == 200 body = response.json() assert body["dag_id"] == dag_id assert body["run_id"] == run_id assert body["state"] == state assert body["run_type"] == run_type - assert body["triggered_by"] == triggered_by + assert body["triggered_by"] == triggered_by.value From 2ee6bbc648f6c680cb4417af367fe302a2f87132 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Mon, 7 Oct 2024 21:06:44 +0530 Subject: [PATCH 07/13] add note to DagRunResponse --- airflow/api_fastapi/serializers/dag_run.py | 2 ++ tests/api_fastapi/views/public/test_dag_run.py | 1 + 2 files changed, 3 insertions(+) diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index 4e177be6dd0f..edbd08fc3467 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -21,6 +21,7 @@ from pydantic import BaseModel, Field +# from airflow.models.dagrun import DagRunNote from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -41,3 +42,4 @@ class DAGRunResponse(BaseModel): external_trigger: bool triggered_by: DagRunTriggeredByType conf: dict + # note: DagRunNote diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py index bda188f92970..b628b1bca6e9 100644 --- a/tests/api_fastapi/views/public/test_dag_run.py +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -97,6 +97,7 @@ def setup(dag_maker, session=None): run_type=DAG2_RUN2_RUN_TYPE, triggered_by=DAG2_RUN2_TRIGGERED_BY, execution_date=EXECUTION_DATE, + note="hello!", ) dag_maker.dagbag.sync_to_db() From 3763684d3488041f4f5445e4b53491adecef07a3 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 8 Oct 2024 05:48:51 +0530 Subject: [PATCH 08/13] add note --- airflow/api_fastapi/openapi/v1-generated.yaml | 38 +++++++++++++ airflow/api_fastapi/serializers/dag_run.py | 4 +- airflow/api_fastapi/views/public/dag_run.py | 1 - airflow/serialization/pydantic/dag_run.py | 12 ++++ .../ui/openapi-gen/requests/schemas.gen.ts | 57 +++++++++++++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 12 ++++ .../api_fastapi/views/public/test_dag_run.py | 2 +- 7 files changed, 123 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index eb9af3489073..e706ac4ce235 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -1031,6 +1031,10 @@ components: conf: type: object title: Conf + note: + anyOf: + - $ref: '#/components/schemas/DagRunNotePydantic' + - type: 'null' type: object required: - run_id @@ -1046,8 +1050,42 @@ components: - external_trigger - triggered_by - conf + - note title: DAGRunResponse description: DAG Run serializer for responses. + DagRunNotePydantic: + properties: + dag_run_id: + type: string + title: Dag Run Id + content: + anyOf: + - type: string + - type: 'null' + title: Content + created_at: + type: string + format: date-time + title: Created At + updated_at: + type: string + format: date-time + title: Updated At + user_id: + anyOf: + - type: integer + - type: 'null' + title: User Id + type: object + required: + - dag_run_id + - content + - created_at + - updated_at + - user_id + title: DagRunNotePydantic + description: Serializable representation of the DagRunNote ORM SqlAlchemyModel + used by internal API. DagRunState: type: string enum: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index edbd08fc3467..7f6a1b92f951 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -21,6 +21,8 @@ from pydantic import BaseModel, Field +from airflow.serialization.pydantic.dag_run import DagRunNotePydantic + # from airflow.models.dagrun import DagRunNote from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -42,4 +44,4 @@ class DAGRunResponse(BaseModel): external_trigger: bool triggered_by: DagRunTriggeredByType conf: dict - # note: DagRunNote + note: DagRunNotePydantic | None diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index 34565102f735..bc69fb51f8cf 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -38,7 +38,6 @@ async def get_dag_run( dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] ) -> DAGRunResponse: dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) - if dag_run is None: raise HTTPException( 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" diff --git a/airflow/serialization/pydantic/dag_run.py b/airflow/serialization/pydantic/dag_run.py index 86857452e831..f3851b777472 100644 --- a/airflow/serialization/pydantic/dag_run.py +++ b/airflow/serialization/pydantic/dag_run.py @@ -115,3 +115,15 @@ def get_log_template(self, session: Session): DagRunPydantic.model_rebuild() + + +class DagRunNotePydantic(BaseModelPydantic): + """Serializable representation of the DagRunNote ORM SqlAlchemyModel used by internal API.""" + + dag_run_id: str + content: str | None + created_at: datetime + updated_at: datetime + user_id: int | None + + model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 811102d4c7c2..b7399229bca7 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -890,6 +890,16 @@ export const $DAGRunResponse = { type: "object", title: "Conf", }, + note: { + anyOf: [ + { + $ref: "#/components/schemas/DagRunNotePydantic", + }, + { + type: "null", + }, + ], + }, }, type: "object", required: [ @@ -906,11 +916,58 @@ export const $DAGRunResponse = { "external_trigger", "triggered_by", "conf", + "note", ], title: "DAGRunResponse", description: "DAG Run serializer for responses.", } as const; +export const $DagRunNotePydantic = { + properties: { + dag_run_id: { + type: "string", + title: "Dag Run Id", + }, + content: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Content", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + updated_at: { + type: "string", + format: "date-time", + title: "Updated At", + }, + user_id: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "User Id", + }, + }, + type: "object", + required: ["dag_run_id", "content", "created_at", "updated_at", "user_id"], + title: "DagRunNotePydantic", + description: + "Serializable representation of the DagRunNote ORM SqlAlchemyModel used by internal API.", +} as const; + export const $DagRunState = { type: "string", enum: ["queued", "running", "success", "failed"], diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b46720273139..372b1440affc 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -139,6 +139,18 @@ export type DAGRunResponse = { conf: { [key: string]: unknown; }; + note: DagRunNotePydantic | null; +}; + +/** + * Serializable representation of the DagRunNote ORM SqlAlchemyModel used by internal API. + */ +export type DagRunNotePydantic = { + dag_run_id: string; + content: string | null; + created_at: string; + updated_at: string; + user_id: number | null; }; /** diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py index b628b1bca6e9..e08640be0527 100644 --- a/tests/api_fastapi/views/public/test_dag_run.py +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -97,7 +97,6 @@ def setup(dag_maker, session=None): run_type=DAG2_RUN2_RUN_TYPE, triggered_by=DAG2_RUN2_TRIGGERED_BY, execution_date=EXECUTION_DATE, - note="hello!", ) dag_maker.dagbag.sync_to_db() @@ -125,3 +124,4 @@ def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by) assert body["state"] == state assert body["run_type"] == run_type assert body["triggered_by"] == triggered_by.value + assert body["note"] is None From b8847749977ebbe0f56e40ce33f90105634583da Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 8 Oct 2024 23:36:52 +0530 Subject: [PATCH 09/13] add test to test non Null note --- airflow/api_fastapi/openapi/v1-generated.yaml | 36 +------------- airflow/api_fastapi/serializers/dag_run.py | 12 +---- .../ui/openapi-gen/requests/schemas.gen.ts | 49 +------------------ airflow/ui/openapi-gen/requests/types.gen.ts | 13 +---- .../api_fastapi/views/public/test_dag_run.py | 19 ++++--- 5 files changed, 17 insertions(+), 112 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 4c180a71f3af..a59cc41640b9 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -1074,8 +1074,9 @@ components: title: Conf note: anyOf: - - $ref: '#/components/schemas/DagRunNotePydantic' + - type: string - type: 'null' + title: Note type: object required: - run_id @@ -1094,39 +1095,6 @@ components: - note title: DAGRunResponse description: DAG Run serializer for responses. - DagRunNotePydantic: - properties: - dag_run_id: - type: string - title: Dag Run Id - content: - anyOf: - - type: string - - type: 'null' - title: Content - created_at: - type: string - format: date-time - title: Created At - updated_at: - type: string - format: date-time - title: Updated At - user_id: - anyOf: - - type: integer - - type: 'null' - title: User Id - type: object - required: - - dag_run_id - - content - - created_at - - updated_at - - user_id - title: DagRunNotePydantic - description: Serializable representation of the DagRunNote ORM SqlAlchemyModel - used by internal API. DagRunState: type: string enum: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index d1f222529939..4622fac645c0 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -41,14 +41,4 @@ class DAGRunResponse(BaseModel): external_trigger: bool triggered_by: DagRunTriggeredByType conf: dict - note: DagRunNotePydantic | None - - -class DagRunNotePydantic(BaseModel): - """Serializable representation of the DagRunNote ORM SqlAlchemyModel used by internal API.""" - - dag_run_id: str - content: str | None - created_at: datetime - updated_at: datetime - user_id: int | None + note: str | None diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index b7399229bca7..6bd89d5615ac 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -893,12 +893,13 @@ export const $DAGRunResponse = { note: { anyOf: [ { - $ref: "#/components/schemas/DagRunNotePydantic", + type: "string", }, { type: "null", }, ], + title: "Note", }, }, type: "object", @@ -922,52 +923,6 @@ export const $DAGRunResponse = { description: "DAG Run serializer for responses.", } as const; -export const $DagRunNotePydantic = { - properties: { - dag_run_id: { - type: "string", - title: "Dag Run Id", - }, - content: { - anyOf: [ - { - type: "string", - }, - { - type: "null", - }, - ], - title: "Content", - }, - created_at: { - type: "string", - format: "date-time", - title: "Created At", - }, - updated_at: { - type: "string", - format: "date-time", - title: "Updated At", - }, - user_id: { - anyOf: [ - { - type: "integer", - }, - { - type: "null", - }, - ], - title: "User Id", - }, - }, - type: "object", - required: ["dag_run_id", "content", "created_at", "updated_at", "user_id"], - title: "DagRunNotePydantic", - description: - "Serializable representation of the DagRunNote ORM SqlAlchemyModel used by internal API.", -} as const; - export const $DagRunState = { type: "string", enum: ["queued", "running", "success", "failed"], diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 6c6ab57c0b5f..357fc2597269 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -139,18 +139,7 @@ export type DAGRunResponse = { conf: { [key: string]: unknown; }; - note: DagRunNotePydantic | null; -}; - -/** - * Serializable representation of the DagRunNote ORM SqlAlchemyModel used by internal API. - */ -export type DagRunNotePydantic = { - dag_run_id: string; - content: string | null; - created_at: string; - updated_at: string; - user_id: number | null; + note: string | null; }; /** diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py index e08640be0527..a6bafb7a7408 100644 --- a/tests/api_fastapi/views/public/test_dag_run.py +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -49,6 +49,7 @@ DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc) +DAG1_NOTE = "test_note" @pytest.fixture(autouse=True) @@ -64,12 +65,14 @@ def setup(dag_maker, session=None): start_date=START_DATE, ): EmptyOperator(task_id="task_1") - dag_maker.create_dagrun( + dag1 = dag_maker.create_dagrun( run_id=DAG1_RUN1_ID, state=DAG1_RUN1_STATE, run_type=DAG1_RUN1_RUN_TYPE, triggered_by=DAG1_RUN1_TRIGGERED_BY, ) + dag1.note = (DAG1_NOTE, 1) + dag_maker.create_dagrun( run_id=DAG1_RUN2_ID, state=DAG1_RUN2_STATE, @@ -107,15 +110,15 @@ def setup(dag_maker, session=None): @pytest.mark.parametrize( - "dag_id, run_id, state, run_type, triggered_by", + "dag_id, run_id, state, run_type, triggered_by, dag_run_note", [ - (DAG1_ID, DAG1_RUN1_ID, DAG1_RUN1_STATE, DAG1_RUN1_RUN_TYPE, DAG1_RUN1_TRIGGERED_BY), - (DAG1_ID, DAG1_RUN2_ID, DAG1_RUN2_STATE, DAG1_RUN2_RUN_TYPE, DAG1_RUN2_TRIGGERED_BY), - (DAG2_ID, DAG2_RUN1_ID, DAG2_RUN1_STATE, DAG2_RUN1_RUN_TYPE, DAG2_RUN1_TRIGGERED_BY), - (DAG2_ID, DAG2_RUN2_ID, DAG2_RUN2_STATE, DAG2_RUN2_RUN_TYPE, DAG2_RUN2_TRIGGERED_BY), + (DAG1_ID, DAG1_RUN1_ID, DAG1_RUN1_STATE, DAG1_RUN1_RUN_TYPE, DAG1_RUN1_TRIGGERED_BY, DAG1_NOTE), + (DAG1_ID, DAG1_RUN2_ID, DAG1_RUN2_STATE, DAG1_RUN2_RUN_TYPE, DAG1_RUN2_TRIGGERED_BY, None), + (DAG2_ID, DAG2_RUN1_ID, DAG2_RUN1_STATE, DAG2_RUN1_RUN_TYPE, DAG2_RUN1_TRIGGERED_BY, None), + (DAG2_ID, DAG2_RUN2_ID, DAG2_RUN2_STATE, DAG2_RUN2_RUN_TYPE, DAG2_RUN2_TRIGGERED_BY, None), ], ) -def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by): +def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by, dag_run_note): response = test_client.get(f"/public/dags/{dag_id}/dagRuns/{run_id}") assert response.status_code == 200 body = response.json() @@ -124,4 +127,4 @@ def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by) assert body["state"] == state assert body["run_type"] == run_type assert body["triggered_by"] == triggered_by.value - assert body["note"] is None + assert body["note"] == dag_run_note From 56a574c2d4b43dc4ee1420943f2bfacbe905202d Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Wed, 9 Oct 2024 15:13:27 +0530 Subject: [PATCH 10/13] Update airflow/api_fastapi/views/public/dag_run.py Co-authored-by: Pierre Jeambrun --- airflow/api_fastapi/views/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index bc69fb51f8cf..be62716e8f8c 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -28,7 +28,7 @@ from airflow.api_fastapi.views.router import AirflowRouter from airflow.models import DagRun -dag_run_router = AirflowRouter(tags=["DagRun"]) +dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") @dag_run_router.get( From 57d2d41fce00308e515c5dee8d622379599d736a Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Wed, 9 Oct 2024 21:16:50 +0530 Subject: [PATCH 11/13] Update airflow/api_fastapi/views/public/dag_run.py Co-authored-by: Pierre Jeambrun --- airflow/api_fastapi/views/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index be62716e8f8c..256463d8ceb8 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -32,7 +32,7 @@ @dag_run_router.get( - "/dags/{dag_id}/dagRuns/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]) + "/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]) ) async def get_dag_run( dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] From 1afae1cb2bc125f24a53cdcf10dc994dff4f6e47 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 9 Oct 2024 21:31:29 +0530 Subject: [PATCH 12/13] Merge branch 'main' of https://github.com/apache/airflow into kalyan/AIP-84/get_dag_run --- airflow/api_fastapi/openapi/v1-generated.yaml | 131 +++++++++++++++++ airflow/api_fastapi/views/public/dag_run.py | 4 +- airflow/ui/openapi-gen/queries/common.ts | 19 +++ airflow/ui/openapi-gen/queries/prefetch.ts | 23 +++ airflow/ui/openapi-gen/queries/queries.ts | 32 ++++ airflow/ui/openapi-gen/queries/suspense.ts | 32 ++++ .../ui/openapi-gen/requests/schemas.gen.ts | 139 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 31 ++++ airflow/ui/openapi-gen/requests/types.gen.ts | 56 +++++++ 9 files changed, 464 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index cfd8ec73668f..5533e2803b20 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -579,6 +579,56 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}: + get: + tags: + - DagRun + summary: Get Dag Run + operationId: get_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: ConnectionResponse: @@ -1047,6 +1097,87 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. + DAGRunResponse: + properties: + run_id: + anyOf: + - type: string + - type: 'null' + title: Run Id + dag_id: + type: string + title: Dag Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + start_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: End Date + data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval Start + data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval End + last_scheduling_decision: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Scheduling Decision + run_type: + $ref: '#/components/schemas/DagRunType' + state: + $ref: '#/components/schemas/DagRunState' + external_trigger: + type: boolean + title: External Trigger + triggered_by: + $ref: '#/components/schemas/DagRunTriggeredByType' + conf: + type: object + title: Conf + note: + anyOf: + - type: string + - type: 'null' + title: Note + type: object + required: + - run_id + - dag_id + - logical_date + - start_date + - end_date + - data_interval_start + - data_interval_end + - last_scheduling_decision + - run_type + - state + - external_trigger + - triggered_by + - conf + - note + title: DAGRunResponse + description: DAG Run serializer for responses. DAGRunStates: properties: queued: diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index 256463d8ceb8..d39fb6f2f331 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -31,9 +31,7 @@ dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") -@dag_run_router.get( - "/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]) -) +@dag_run_router.get("/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404])) async def get_dag_run( dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] ) -> DAGRunResponse: diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 72fd0ef9ccde..4224c3b43a25 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -150,6 +151,24 @@ export const UseVariableServiceGetVariableKeyFn = ( }, queryKey?: Array, ) => [useVariableServiceGetVariableKey, ...(queryKey ?? [{ variableKey }])]; +export type DagRunServiceGetDagRunDefaultResponse = Awaited< + ReturnType +>; +export type DagRunServiceGetDagRunQueryResult< + TData = DagRunServiceGetDagRunDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun"; +export const UseDagRunServiceGetDagRunKeyFn = ( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: Array, +) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index a114b9dc92c6..a1d966f7291a 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -4,6 +4,7 @@ import { type QueryClient } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -186,3 +187,25 @@ export const prefetchUseVariableServiceGetVariable = ( queryKey: Common.UseVariableServiceGetVariableKeyFn({ variableKey }), queryFn: () => VariableService.getVariable({ variableKey }), }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagRunServiceGetDagRun = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a3ce02257160..b9ba156d12c0 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -9,6 +9,7 @@ import { import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -237,6 +238,37 @@ export const useVariableServiceGetVariable = < queryFn: () => VariableService.getVariable({ variableKey }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRun = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index fbef843c6e0a..7c11ae730a77 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -4,6 +4,7 @@ import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -232,3 +233,34 @@ export const useVariableServiceGetVariableSuspense = < queryFn: () => VariableService.getVariable({ variableKey }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRunSuspense = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e4cc18086b2e..18df5284651b 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -784,6 +784,145 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; +export const $DAGRunResponse = { + properties: { + run_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Run Id", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, + start_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Start Date", + }, + end_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "End Date", + }, + data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Data Interval Start", + }, + data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Data Interval End", + }, + last_scheduling_decision: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Scheduling Decision", + }, + run_type: { + $ref: "#/components/schemas/DagRunType", + }, + state: { + $ref: "#/components/schemas/DagRunState", + }, + external_trigger: { + type: "boolean", + title: "External Trigger", + }, + triggered_by: { + $ref: "#/components/schemas/DagRunTriggeredByType", + }, + conf: { + type: "object", + title: "Conf", + }, + note: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Note", + }, + }, + type: "object", + required: [ + "run_id", + "dag_id", + "logical_date", + "start_date", + "end_date", + "data_interval_start", + "data_interval_end", + "last_scheduling_decision", + "run_type", + "state", + "external_trigger", + "triggered_by", + "conf", + "note", + ], + title: "DAGRunResponse", + description: "DAG Run serializer for responses.", +} as const; + export const $DAGRunStates = { properties: { queued: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 7f61fd32f349..30dc5c910fe9 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -23,6 +23,8 @@ import type { DeleteVariableResponse, GetVariableData, GetVariableResponse, + GetDagRunData, + GetDagRunResponse, } from "./types.gen"; export class AssetService { @@ -334,3 +336,32 @@ export class VariableService { }); } } + +export class DagRunService { + /** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ + public static getDagRun( + data: GetDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1e2d28314980..c1b4e7f9dc12 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -120,6 +120,28 @@ export type DAGResponse = { readonly file_token: string; }; +/** + * DAG Run serializer for responses. + */ +export type DAGRunResponse = { + run_id: string | null; + dag_id: string; + logical_date: string | null; + start_date: string | null; + end_date: string | null; + data_interval_start: string | null; + data_interval_end: string | null; + last_scheduling_decision: string | null; + run_type: DagRunType; + state: DagRunState; + external_trigger: boolean; + triggered_by: DagRunTriggeredByType; + conf: { + [key: string]: unknown; + }; + note: string | null; +}; + /** * DAG Run States for responses. */ @@ -320,6 +342,13 @@ export type GetVariableData = { export type GetVariableResponse = VariableResponse; +export type GetDagRunData = { + dagId: string; + dagRunId: string; +}; + +export type GetDagRunResponse = DAGRunResponse; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -567,4 +596,31 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}": { + get: { + req: GetDagRunData; + res: { + /** + * Successful Response + */ + 200: DAGRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; From 2bd6625e6603043ae100ec7a2fc971ed82032a24 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 9 Oct 2024 21:42:57 +0530 Subject: [PATCH 13/13] add 404 test --- tests/api_fastapi/views/public/test_dag_run.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py index a6bafb7a7408..dab81907068e 100644 --- a/tests/api_fastapi/views/public/test_dag_run.py +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -128,3 +128,10 @@ def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by, assert body["run_type"] == run_type assert body["triggered_by"] == triggered_by.value assert body["note"] == dag_run_note + + +def test_get_dag_run_not_found(test_client): + response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns/invalid") + assert response.status_code == 404 + body = response.json() + assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found"