From 440180c9d8c187ada41e7bd63456aac3c6b805bf Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Mon, 28 Oct 2024 21:19:54 +0530 Subject: [PATCH] Migrate public endpoint Get DAG Stats to FastAPI, with main resynced --- .../core_api/openapi/v1-generated.yaml | 98 +++++ .../core_api/routes/public/__init__.py | 2 + .../core_api/routes/public/dag_stats.py | 80 ++++ .../core_api/serializers/dag_stats.py | 43 ++ airflow/ui/openapi-gen/queries/common.ts | 17 + airflow/ui/openapi-gen/queries/prefetch.ts | 21 + airflow/ui/openapi-gen/queries/queries.ts | 27 ++ airflow/ui/openapi-gen/queries/suspense.ts | 27 ++ .../ui/openapi-gen/requests/schemas.gen.ts | 56 +++ .../ui/openapi-gen/requests/services.gen.ts | 31 ++ airflow/ui/openapi-gen/requests/types.gen.ts | 61 +++ .../core_api/routes/public/test_dag_stats.py | 414 ++++++++++++++++++ 12 files changed, 877 insertions(+) create mode 100644 airflow/api_fastapi/core_api/routes/public/dag_stats.py create mode 100644 airflow/api_fastapi/core_api/serializers/dag_stats.py create mode 100644 tests/api_fastapi/core_api/routes/public/test_dag_stats.py diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 136d6a4ed0271..f86ffa90382c1 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1506,6 +1506,59 @@ paths: application/json: schema: $ref: '#/components/schemas/VersionInfo' + /public/dagStats/: + get: + tags: + - DagStats + summary: Get Dag Stats + description: Get Dag statistics. + operationId: get_dag_stats + parameters: + - name: dag_ids + in: query + required: false + schema: + type: array + items: + type: string + title: Dag Ids + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DagStatsCollectionResponse' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unprocessable Entity components: schemas: AppBuilderMenuItemResponse: @@ -2400,6 +2453,51 @@ components: - asset_triggered title: DagRunType description: Class with DagRun types. + DagStatsCollectionResponse: + properties: + dags: + items: + $ref: '#/components/schemas/DagStatsResponse' + type: array + title: Dags + total_entries: + type: integer + title: Total Entries + type: object + required: + - dags + - total_entries + title: DagStatsCollectionResponse + description: DAG Stats Collection serializer for responses. + DagStatsResponse: + properties: + dag_id: + type: string + title: Dag Id + stats: + items: + $ref: '#/components/schemas/DagStatsStateResponse' + type: array + title: Stats + type: object + required: + - dag_id + - stats + title: DagStatsResponse + description: DAG Stats serializer for responses. + DagStatsStateResponse: + properties: + state: + $ref: '#/components/schemas/DagRunState' + count: + type: integer + title: Count + type: object + required: + - state + - count + title: DagStatsStateResponse + description: DagStatsState serializer for responses. DagTagPydantic: properties: name: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index ab307409add0a..d5bcee35237ce 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -20,6 +20,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.routes.public.connections import connections_router from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router +from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router from airflow.api_fastapi.core_api.routes.public.dags import dags_router from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router @@ -40,3 +41,4 @@ public_router.include_router(providers_router) public_router.include_router(plugins_router) public_router.include_router(version_router) +public_router.include_router(dag_stats_router) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_stats.py b/airflow/api_fastapi/core_api/routes/public/dag_stats.py new file mode 100644 index 0000000000000..8acecd0ad72e8 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/public/dag_stats.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from fastapi import Depends, Query +from sqlalchemy import func, select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import ( + get_session, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.serializers.dag_stats import ( + DagStatsCollectionResponse, +) +from airflow.models.dagrun import DagRun +from airflow.utils.state import DagRunState + +dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats") + + +@dag_stats_router.get( + "/", + responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]), +) +async def get_dag_stats( + session: Annotated[Session, Depends(get_session)], + dag_ids: list[str] = Query(None), +) -> DagStatsCollectionResponse: + """Get Dag statistics.""" + query = select( + DagRun.dag_id, + DagRun.state, + func.count(DagRun.state), + ) + if dag_ids: + query = query.where(DagRun.dag_id.in_(dag_ids)) + else: + dag_ids = [] + query = query.group_by(DagRun.dag_id, DagRun.state).order_by(DagRun.dag_id) + query_result = session.execute(query) + + result_dag_ids = [] + dag_state_data = {} + for dag_id, state, count in query_result: + dag_state_data[(dag_id, state)] = count + if dag_id not in result_dag_ids: + result_dag_ids.append(dag_id) + + dags = [ + { + "dag_id": dag_id, + "stats": [ + { + "state": state, + "count": dag_state_data.get((dag_id, state), 0), + } + for state in DagRunState + ], + } + for dag_id in result_dag_ids + ] + return DagStatsCollectionResponse(dags=dags, total_entries=len(dags)) diff --git a/airflow/api_fastapi/core_api/serializers/dag_stats.py b/airflow/api_fastapi/core_api/serializers/dag_stats.py new file mode 100644 index 0000000000000..0d768c2cbac07 --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/dag_stats.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from pydantic import BaseModel + +from airflow.utils.state import DagRunState + + +class DagStatsStateResponse(BaseModel): + """DagStatsState serializer for responses.""" + + state: DagRunState + count: int + + +class DagStatsResponse(BaseModel): + """DAG Stats serializer for responses.""" + + dag_id: str + stats: list[DagStatsStateResponse] + + +class DagStatsCollectionResponse(BaseModel): + """DAG Stats Collection serializer for responses.""" + + dags: list[DagStatsResponse] + total_entries: int diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 5fa46e4e91f09..e385157f54c0f 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagStatsService, DagsService, DashboardService, MonitorService, @@ -409,6 +410,22 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array) => [ useVersionServiceGetVersionKey, ...(queryKey ?? []), ]; +export type DagStatsServiceGetDagStatsDefaultResponse = Awaited< + ReturnType +>; +export type DagStatsServiceGetDagStatsQueryResult< + TData = DagStatsServiceGetDagStatsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagStatsServiceGetDagStatsKey = "DagStatsServiceGetDagStats"; +export const UseDagStatsServiceGetDagStatsKeyFn = ( + { + dagIds, + }: { + dagIds?: string[]; + } = {}, + queryKey?: Array, +) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])]; export type VariableServicePostVariableMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 72b4376751f7f..5815e57ea5869 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagStatsService, DagsService, DashboardService, MonitorService, @@ -512,3 +513,23 @@ export const prefetchUseVersionServiceGetVersion = (queryClient: QueryClient) => queryKey: Common.UseVersionServiceGetVersionKeyFn(), queryFn: () => VersionService.getVersion(), }); +/** + * Get Dag Stats + * Get Dag statistics. + * @param data The data for the request. + * @param data.dagIds + * @returns DagStatsCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagStatsServiceGetDagStats = ( + queryClient: QueryClient, + { + dagIds, + }: { + dagIds?: string[]; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }), + queryFn: () => DagStatsService.getDagStats({ dagIds }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index ce319bc6cd676..3972ec98729c5 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -11,6 +11,7 @@ import { ConnectionService, DagRunService, DagService, + DagStatsService, DagsService, DashboardService, MonitorService, @@ -663,6 +664,32 @@ export const useVersionServiceGetVersion = < queryFn: () => VersionService.getVersion() as TData, ...options, }); +/** + * Get Dag Stats + * Get Dag statistics. + * @param data The data for the request. + * @param data.dagIds + * @returns DagStatsCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagStatsServiceGetDagStats = < + TData = Common.DagStatsServiceGetDagStatsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagIds, + }: { + dagIds?: string[]; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey), + queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData, + ...options, + }); /** * Post Variable * Create a variable. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index cd7bc95fa5b59..726c43e95a72f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagStatsService, DagsService, DashboardService, MonitorService, @@ -652,3 +653,29 @@ export const useVersionServiceGetVersionSuspense = < queryFn: () => VersionService.getVersion() as TData, ...options, }); +/** + * Get Dag Stats + * Get Dag statistics. + * @param data The data for the request. + * @param data.dagIds + * @returns DagStatsCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagStatsServiceGetDagStatsSuspense = < + TData = Common.DagStatsServiceGetDagStatsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagIds, + }: { + dagIds?: string[]; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey), + queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4bbf448ca8eec..26977fd935083 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1431,6 +1431,62 @@ export const $DagRunType = { description: "Class with DagRun types.", } as const; +export const $DagStatsCollectionResponse = { + properties: { + dags: { + items: { + $ref: "#/components/schemas/DagStatsResponse", + }, + type: "array", + title: "Dags", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["dags", "total_entries"], + title: "DagStatsCollectionResponse", + description: "DAG Stats Collection serializer for responses.", +} as const; + +export const $DagStatsResponse = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + stats: { + items: { + $ref: "#/components/schemas/DagStatsStateResponse", + }, + type: "array", + title: "Stats", + }, + }, + type: "object", + required: ["dag_id", "stats"], + title: "DagStatsResponse", + description: "DAG Stats serializer for responses.", +} as const; + +export const $DagStatsStateResponse = { + properties: { + state: { + $ref: "#/components/schemas/DagRunState", + }, + count: { + type: "integer", + title: "Count", + }, + }, + type: "object", + required: ["state", "count"], + title: "DagStatsStateResponse", + description: "DagStatsState serializer for responses.", +} as const; + export const $DagTagPydantic = { properties: { name: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 08e93457c4f67..3ce67b453c40a 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -59,6 +59,8 @@ import type { GetPluginsData, GetPluginsResponse, GetVersionResponse, + GetDagStatsData, + GetDagStatsResponse, } from "./types.gen"; export class AssetService { @@ -894,3 +896,32 @@ export class VersionService { }); } } + +export class DagStatsService { + /** + * Get Dag Stats + * Get Dag statistics. + * @param data The data for the request. + * @param data.dagIds + * @returns DagStatsCollectionResponse Successful Response + * @throws ApiError + */ + public static getDagStats( + data: GetDagStatsData = {}, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dagStats/", + query: { + dag_ids: data.dagIds, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Unprocessable Entity", + }, + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e3df16ea82980..57c85d9c2d8e7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -286,6 +286,30 @@ export type DagRunType = | "manual" | "asset_triggered"; +/** + * DAG Stats Collection serializer for responses. + */ +export type DagStatsCollectionResponse = { + dags: Array; + total_entries: number; +}; + +/** + * DAG Stats serializer for responses. + */ +export type DagStatsResponse = { + dag_id: string; + stats: Array; +}; + +/** + * DagStatsState serializer for responses. + */ +export type DagStatsStateResponse = { + state: DagRunState; + count: number; +}; + /** * Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. */ @@ -715,6 +739,12 @@ export type GetPluginsResponse = PluginCollectionResponse; export type GetVersionResponse = VersionInfo; +export type GetDagStatsData = { + dagIds?: Array; +}; + +export type GetDagStatsResponse = DagStatsCollectionResponse; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -1397,4 +1427,35 @@ export type $OpenApiTs = { }; }; }; + "/public/dagStats/": { + get: { + req: GetDagStatsData; + res: { + /** + * Successful Response + */ + 200: DagStatsCollectionResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Unprocessable Entity + */ + 422: HTTPExceptionResponse; + }; + }; + }; }; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_stats.py b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py new file mode 100644 index 0000000000000..a6a2e7289ce44 --- /dev/null +++ b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py @@ -0,0 +1,414 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timedelta + +import pytest + +from airflow.models.dag import DagModel +from airflow.models.dagrun import DagRun +from airflow.utils import timezone +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunType + +from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags + +pytestmark = pytest.mark.db_test + +DAG1_ID = "test_dag1" +DAG2_ID = "test_dag2" +DAG3_ID = "test_dag3" +TASK_ID = "op1" +API_PREFIX = "/public/dagStats" + + +class TestDagStatsEndpoint: + default_time = "2020-06-11T18:00:00+00:00" + + @staticmethod + def _clear_db(): + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + def _create_dag_and_runs(self, session=None): + dag_1 = DagModel( + dag_id=DAG1_ID, + fileloc="/tmp/dag_stats_1.py", + timetable_summary="2 2 * * *", + is_active=False, + is_paused=True, + owners="test_owner,another_test_owner", + next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + ) + dag_1_run_1 = DagRun( + dag_id=DAG1_ID, + run_id="test_dag_run_id_1", + run_type=DagRunType.MANUAL, + execution_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + external_trigger=True, + state="running", + ) + dag_1_run_2 = DagRun( + dag_id=dag_1.dag_id, + run_id="test_dag_run_id_2", + run_type=DagRunType.MANUAL, + execution_date=timezone.parse(self.default_time) + timedelta(days=1), + start_date=timezone.parse(self.default_time), + external_trigger=True, + state="failed", + ) + dag_2 = DagModel( + dag_id=DAG2_ID, + fileloc="/tmp/dag_stats_2.py", + timetable_summary="2 2 * * *", + is_active=False, + is_paused=True, + owners="test_owner,another_test_owner", + next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + ) + dag_2_run_1 = DagRun( + dag_id=dag_2.dag_id, + run_id="test_dag_2_run_id_1", + run_type=DagRunType.MANUAL, + execution_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + external_trigger=True, + state="queued", + ) + dag_3 = DagModel( + dag_id=DAG3_ID, + fileloc="/tmp/dag_stats_3.py", + timetable_summary="2 2 * * *", + is_active=False, + is_paused=True, + owners="test_owner,another_test_owner", + next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + ) + dag_3_run_1 = DagRun( + dag_id=dag_3.dag_id, + run_id="test_dag_3_run_id_1", + run_type=DagRunType.MANUAL, + execution_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + external_trigger=True, + state="success", + ) + entities = ( + dag_1, + dag_1_run_1, + dag_1_run_2, + dag_2, + dag_2_run_1, + dag_3, + dag_3_run_1, + ) + session.add_all(entities) + session.commit() + + @pytest.fixture(autouse=True) + def setup(self) -> None: + self._clear_db() + + def teardown_method(self) -> None: + self._clear_db() + + +class TestGetDagStats(TestDagStatsEndpoint): + """Unit tests for Get DAG Stats.""" + + def test_should_respond_200(self, client, session): + self._create_dag_and_runs(session) + exp_payload = { + "dags": [ + { + "dag_id": DAG1_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 1, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 1, + }, + ], + }, + { + "dag_id": DAG2_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 1, + }, + { + "state": DagRunState.RUNNING, + "count": 0, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 0, + }, + ], + }, + ], + "total_entries": 2, + } + + response = client().get(f"{API_PREFIX}?dag_ids={DAG1_ID}&dag_ids={DAG2_ID}") + assert response.status_code == 200 + res_json = response.json() + assert res_json["total_entries"] == len(res_json["dags"]) + assert res_json["dags"] == exp_payload["dags"] + assert res_json["total_entries"] == exp_payload["total_entries"] + + def test_all_dags_should_respond_200(self, client, session): + self._create_dag_and_runs(session) + exp_payload = { + "dags": [ + { + "dag_id": DAG1_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 1, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 1, + }, + ], + }, + { + "dag_id": DAG2_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 1, + }, + { + "state": DagRunState.RUNNING, + "count": 0, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 0, + }, + ], + }, + { + "dag_id": DAG3_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 0, + }, + { + "state": DagRunState.SUCCESS, + "count": 1, + }, + { + "state": DagRunState.FAILED, + "count": 0, + }, + ], + }, + ], + "total_entries": 3, + } + + response = client().get(API_PREFIX) + assert response.status_code == 200 + res_json = response.json() + assert res_json["total_entries"] == len(res_json["dags"]) + assert res_json["dags"] == exp_payload["dags"] + assert res_json["total_entries"] == exp_payload["total_entries"] + + @pytest.mark.parametrize( + "url, exp_payload", + [ + ( + f"{API_PREFIX}?dag_ids={DAG1_ID}&dag_ids={DAG2_ID}&dag_ids={DAG3_ID}", + { + "dag_ids": [DAG1_ID, DAG2_ID, DAG3_ID], + "dags": [ + { + "dag_id": DAG1_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 1, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 1, + }, + ], + }, + { + "dag_id": DAG2_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 1, + }, + { + "state": DagRunState.RUNNING, + "count": 0, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 0, + }, + ], + }, + { + "dag_id": DAG3_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 0, + }, + { + "state": DagRunState.SUCCESS, + "count": 1, + }, + { + "state": DagRunState.FAILED, + "count": 0, + }, + ], + }, + ], + "total_entries": 3, + }, + ), + ( + f"{API_PREFIX}?dag_ids={DAG1_ID}", + { + "dag_ids": [DAG1_ID], + "dags": [ + { + "dag_id": DAG1_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 1, + }, + { + "state": DagRunState.SUCCESS, + "count": 0, + }, + { + "state": DagRunState.FAILED, + "count": 1, + }, + ], + } + ], + "total_entries": 1, + }, + ), + ( + f"{API_PREFIX}?dag_ids={DAG3_ID}", + { + "dags": [ + { + "dag_id": DAG3_ID, + "stats": [ + { + "state": DagRunState.QUEUED, + "count": 0, + }, + { + "state": DagRunState.RUNNING, + "count": 0, + }, + { + "state": DagRunState.SUCCESS, + "count": 1, + }, + { + "state": DagRunState.FAILED, + "count": 0, + }, + ], + }, + ], + "total_entries": 1, + }, + ), + ], + ) + def test_single_dag_in_dag_ids(self, client, session, url, exp_payload): + self._create_dag_and_runs(session) + response = client().get(url) + assert response.status_code == 200 + res_json = response.json() + assert res_json["total_entries"] == len(res_json["dags"]) + assert res_json["dags"] == exp_payload["dags"] + assert res_json["total_entries"] == exp_payload["total_entries"]