diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 44891c0ef2c84..a862b7c969503 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -63,6 +63,7 @@ from airflow.models import DagModel, DagRun from airflow.timetables.base import DataInterval from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.api_migration import mark_fastapi_migration_done from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState @@ -90,6 +91,7 @@ def delete_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSI return NoContent, HTTPStatus.NO_CONTENT +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.RUN) @provide_session def get_dag_run( diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index fb19c1abd1c12..7debfbb1008af 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -629,6 +629,56 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}: + get: + tags: + - DagRun + summary: Get Dag Run + operationId: get_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: ConnectionResponse: @@ -1097,6 +1147,87 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. + DAGRunResponse: + properties: + run_id: + anyOf: + - type: string + - type: 'null' + title: Run Id + dag_id: + type: string + title: Dag Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + start_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: End Date + data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval Start + data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval End + last_scheduling_decision: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Scheduling Decision + run_type: + $ref: '#/components/schemas/DagRunType' + state: + $ref: '#/components/schemas/DagRunState' + external_trigger: + type: boolean + title: External Trigger + triggered_by: + $ref: '#/components/schemas/DagRunTriggeredByType' + conf: + type: object + title: Conf + note: + anyOf: + - type: string + - type: 'null' + title: Note + type: object + required: + - run_id + - dag_id + - logical_date + - start_date + - end_date + - data_interval_start + - data_interval_end + - last_scheduling_decision + - run_type + - state + - external_trigger + - triggered_by + - conf + - note + title: DAGRunResponse + description: DAG Run serializer for responses. DAGRunStates: properties: queued: @@ -1157,6 +1288,28 @@ components: so please ensure that their values always match the ones with the same name in TaskInstanceState.' + DagRunTriggeredByType: + type: string + enum: + - cli + - operator + - rest_api + - ui + - test + - timetable + - dataset + - backfill + title: DagRunTriggeredByType + description: Class with TriggeredBy types for DagRun. + DagRunType: + type: string + enum: + - backfill + - scheduled + - manual + - dataset_triggered + title: DagRunType + description: Class with DagRun types. DagTagPydantic: properties: name: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py new file mode 100644 index 0000000000000..4622fac645c07 --- /dev/null +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunTriggeredByType, DagRunType + + +class DAGRunResponse(BaseModel): + """DAG Run serializer for responses.""" + + dag_run_id: str | None = Field(alias="run_id") + dag_id: str + logical_date: datetime | None + start_date: datetime | None + end_date: datetime | None + data_interval_start: datetime | None + data_interval_end: datetime | None + last_scheduling_decision: datetime | None + run_type: DagRunType + state: DagRunState + external_trigger: bool + triggered_by: DagRunTriggeredByType + conf: dict + note: str | None diff --git a/airflow/api_fastapi/views/public/__init__.py b/airflow/api_fastapi/views/public/__init__.py index 4e02d9ab43bcf..9d90a0966802c 100644 --- a/airflow/api_fastapi/views/public/__init__.py +++ b/airflow/api_fastapi/views/public/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations from airflow.api_fastapi.views.public.connections import connections_router +from airflow.api_fastapi.views.public.dag_run import dag_run_router from airflow.api_fastapi.views.public.dags import dags_router from airflow.api_fastapi.views.public.variables import variables_router from airflow.api_fastapi.views.router import AirflowRouter @@ -28,3 +29,4 @@ public_router.include_router(dags_router) public_router.include_router(connections_router) public_router.include_router(variables_router) +public_router.include_router(dag_run_router) diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py new file mode 100644 index 0000000000000..d39fb6f2f331c --- /dev/null +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from fastapi import Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.db.common import get_session +from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.serializers.dag_run import DAGRunResponse +from airflow.api_fastapi.views.router import AirflowRouter +from airflow.models import DagRun + +dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") + + +@dag_run_router.get("/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404])) +async def get_dag_run( + dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] +) -> DAGRunResponse: + dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) + if dag_run is None: + raise HTTPException( + 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" + ) + + return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index a4d65c69003f1..aaff196c0791d 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -166,6 +167,24 @@ export const UseVariableServiceGetVariableKeyFn = ( }, queryKey?: Array, ) => [useVariableServiceGetVariableKey, ...(queryKey ?? [{ variableKey }])]; +export type DagRunServiceGetDagRunDefaultResponse = Awaited< + ReturnType +>; +export type DagRunServiceGetDagRunQueryResult< + TData = DagRunServiceGetDagRunDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun"; +export const UseDagRunServiceGetDagRunKeyFn = ( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: Array, +) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 8bd691ca33be4..3e194302f4be0 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -4,6 +4,7 @@ import { type QueryClient } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -206,3 +207,25 @@ export const prefetchUseVariableServiceGetVariable = ( queryKey: Common.UseVariableServiceGetVariableKeyFn({ variableKey }), queryFn: () => VariableService.getVariable({ variableKey }), }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagRunServiceGetDagRun = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 51b8f4fb051d7..19bb17b342a84 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -9,6 +9,7 @@ import { import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -263,6 +264,37 @@ export const useVariableServiceGetVariable = < queryFn: () => VariableService.getVariable({ variableKey }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRun = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index b437007468f20..79ad479f0a42f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -4,6 +4,7 @@ import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; import { AssetService, ConnectionService, + DagRunService, DagService, DashboardService, VariableService, @@ -258,3 +259,34 @@ export const useVariableServiceGetVariableSuspense = < queryFn: () => VariableService.getVariable({ variableKey }) as TData, ...options, }); +/** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetDagRunSuspense = < + TData = Common.DagRunServiceGetDagRunDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagRunServiceGetDagRunKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 8f76ebd13c40f..18df5284651b7 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -784,6 +784,145 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; +export const $DAGRunResponse = { + properties: { + run_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Run Id", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, + start_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Start Date", + }, + end_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "End Date", + }, + data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Data Interval Start", + }, + data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Data Interval End", + }, + last_scheduling_decision: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Scheduling Decision", + }, + run_type: { + $ref: "#/components/schemas/DagRunType", + }, + state: { + $ref: "#/components/schemas/DagRunState", + }, + external_trigger: { + type: "boolean", + title: "External Trigger", + }, + triggered_by: { + $ref: "#/components/schemas/DagRunTriggeredByType", + }, + conf: { + type: "object", + title: "Conf", + }, + note: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Note", + }, + }, + type: "object", + required: [ + "run_id", + "dag_id", + "logical_date", + "start_date", + "end_date", + "data_interval_start", + "data_interval_end", + "last_scheduling_decision", + "run_type", + "state", + "external_trigger", + "triggered_by", + "conf", + "note", + ], + title: "DAGRunResponse", + description: "DAG Run serializer for responses.", +} as const; + export const $DAGRunStates = { properties: { queued: { @@ -845,6 +984,29 @@ so please ensure that their values always match the ones with the same name in TaskInstanceState.`, } as const; +export const $DagRunTriggeredByType = { + type: "string", + enum: [ + "cli", + "operator", + "rest_api", + "ui", + "test", + "timetable", + "dataset", + "backfill", + ], + title: "DagRunTriggeredByType", + description: "Class with TriggeredBy types for DagRun.", +} as const; + +export const $DagRunType = { + type: "string", + enum: ["backfill", "scheduled", "manual", "dataset_triggered"], + title: "DagRunType", + description: "Class with DagRun types.", +} as const; + export const $DagTagPydantic = { properties: { name: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 24fbb9c29c46a..9a126aef25fbc 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -25,6 +25,8 @@ import type { DeleteVariableResponse, GetVariableData, GetVariableResponse, + GetDagRunData, + GetDagRunResponse, } from "./types.gen"; export class AssetService { @@ -361,3 +363,32 @@ export class VariableService { }); } } + +export class DagRunService { + /** + * Get Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ + public static getDagRun( + data: GetDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 368c981b9da1e..45bfa51aec9c4 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -120,6 +120,28 @@ export type DAGResponse = { readonly file_token: string; }; +/** + * DAG Run serializer for responses. + */ +export type DAGRunResponse = { + run_id: string | null; + dag_id: string; + logical_date: string | null; + start_date: string | null; + end_date: string | null; + data_interval_start: string | null; + data_interval_end: string | null; + last_scheduling_decision: string | null; + run_type: DagRunType; + state: DagRunState; + external_trigger: boolean; + triggered_by: DagRunTriggeredByType; + conf: { + [key: string]: unknown; + }; + note: string | null; +}; + /** * DAG Run States for responses. */ @@ -149,6 +171,28 @@ export type DAGRunTypes = { */ export type DagRunState = "queued" | "running" | "success" | "failed"; +/** + * Class with TriggeredBy types for DagRun. + */ +export type DagRunTriggeredByType = + | "cli" + | "operator" + | "rest_api" + | "ui" + | "test" + | "timetable" + | "dataset" + | "backfill"; + +/** + * Class with DagRun types. + */ +export type DagRunType = + | "backfill" + | "scheduled" + | "manual" + | "dataset_triggered"; + /** * Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. */ @@ -304,6 +348,13 @@ export type GetVariableData = { export type GetVariableResponse = VariableResponse; +export type GetDagRunData = { + dagId: string; + dagRunId: string; +}; + +export type GetDagRunResponse = DAGRunResponse; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -580,4 +631,31 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}": { + get: { + req: GetDagRunData; + res: { + /** + * Successful Response + */ + 200: DAGRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py new file mode 100644 index 0000000000000..dab81907068e7 --- /dev/null +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from airflow.operators.empty import EmptyOperator +from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunTriggeredByType, DagRunType +from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags + +pytestmark = pytest.mark.db_test + +DAG1_ID = "test_dag1" +DAG2_ID = "test_dag2" +DAG1_RUN1_ID = "dag_run_1" +DAG1_RUN2_ID = "dag_run_2" +DAG2_RUN1_ID = "dag_run_3" +DAG2_RUN2_ID = "dag_run_4" +DAG1_RUN1_STATE = DagRunState.SUCCESS +DAG1_RUN2_STATE = DagRunState.FAILED +DAG2_RUN1_STATE = DagRunState.SUCCESS +DAG2_RUN2_STATE = DagRunState.SUCCESS +DAG1_RUN1_RUN_TYPE = DagRunType.MANUAL +DAG1_RUN2_RUN_TYPE = DagRunType.SCHEDULED +DAG2_RUN1_RUN_TYPE = DagRunType.BACKFILL_JOB +DAG2_RUN2_RUN_TYPE = DagRunType.DATASET_TRIGGERED +DAG1_RUN1_TRIGGERED_BY = DagRunTriggeredByType.UI +DAG1_RUN2_TRIGGERED_BY = DagRunTriggeredByType.DATASET +DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI +DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API +START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) +EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc) +DAG1_NOTE = "test_note" + + +@pytest.fixture(autouse=True) +@provide_session +def setup(dag_maker, session=None): + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + with dag_maker( + DAG1_ID, + schedule="@daily", + start_date=START_DATE, + ): + EmptyOperator(task_id="task_1") + dag1 = dag_maker.create_dagrun( + run_id=DAG1_RUN1_ID, + state=DAG1_RUN1_STATE, + run_type=DAG1_RUN1_RUN_TYPE, + triggered_by=DAG1_RUN1_TRIGGERED_BY, + ) + dag1.note = (DAG1_NOTE, 1) + + dag_maker.create_dagrun( + run_id=DAG1_RUN2_ID, + state=DAG1_RUN2_STATE, + run_type=DAG1_RUN2_RUN_TYPE, + triggered_by=DAG1_RUN2_TRIGGERED_BY, + execution_date=EXECUTION_DATE, + ) + + with dag_maker( + DAG2_ID, + schedule=None, + start_date=START_DATE, + ): + EmptyOperator(task_id="task_2") + dag_maker.create_dagrun( + run_id=DAG2_RUN1_ID, + state=DAG2_RUN1_STATE, + run_type=DAG2_RUN1_RUN_TYPE, + triggered_by=DAG2_RUN1_TRIGGERED_BY, + execution_date=EXECUTION_DATE, + ) + dag_maker.create_dagrun( + run_id=DAG2_RUN2_ID, + state=DAG2_RUN2_STATE, + run_type=DAG2_RUN2_RUN_TYPE, + triggered_by=DAG2_RUN2_TRIGGERED_BY, + execution_date=EXECUTION_DATE, + ) + + dag_maker.dagbag.sync_to_db() + dag_maker.dag_model + dag_maker.dag_model.has_task_concurrency_limits = True + session.merge(dag_maker.dag_model) + session.commit() + + +@pytest.mark.parametrize( + "dag_id, run_id, state, run_type, triggered_by, dag_run_note", + [ + (DAG1_ID, DAG1_RUN1_ID, DAG1_RUN1_STATE, DAG1_RUN1_RUN_TYPE, DAG1_RUN1_TRIGGERED_BY, DAG1_NOTE), + (DAG1_ID, DAG1_RUN2_ID, DAG1_RUN2_STATE, DAG1_RUN2_RUN_TYPE, DAG1_RUN2_TRIGGERED_BY, None), + (DAG2_ID, DAG2_RUN1_ID, DAG2_RUN1_STATE, DAG2_RUN1_RUN_TYPE, DAG2_RUN1_TRIGGERED_BY, None), + (DAG2_ID, DAG2_RUN2_ID, DAG2_RUN2_STATE, DAG2_RUN2_RUN_TYPE, DAG2_RUN2_TRIGGERED_BY, None), + ], +) +def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by, dag_run_note): + response = test_client.get(f"/public/dags/{dag_id}/dagRuns/{run_id}") + assert response.status_code == 200 + body = response.json() + assert body["dag_id"] == dag_id + assert body["run_id"] == run_id + assert body["state"] == state + assert body["run_type"] == run_type + assert body["triggered_by"] == triggered_by.value + assert body["note"] == dag_run_note + + +def test_get_dag_run_not_found(test_client): + response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns/invalid") + assert response.status_code == 404 + body = response.json() + assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found"