Skip to content

Commit

Permalink
Migrate public endpoint Get DAG Stats to FastAPI, with main resynced
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss committed Oct 28, 2024
1 parent b5b00ef commit 440180c
Show file tree
Hide file tree
Showing 12 changed files with 877 additions and 0 deletions.
98 changes: 98 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,59 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/VersionInfo'
/public/dagStats/:
get:
tags:
- DagStats
summary: Get Dag Stats
description: Get Dag statistics.
operationId: get_dag_stats
parameters:
- name: dag_ids
in: query
required: false
schema:
type: array
items:
type: string
title: Dag Ids
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DagStatsCollectionResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
components:
schemas:
AppBuilderMenuItemResponse:
Expand Down Expand Up @@ -2400,6 +2453,51 @@ components:
- asset_triggered
title: DagRunType
description: Class with DagRun types.
DagStatsCollectionResponse:
properties:
dags:
items:
$ref: '#/components/schemas/DagStatsResponse'
type: array
title: Dags
total_entries:
type: integer
title: Total Entries
type: object
required:
- dags
- total_entries
title: DagStatsCollectionResponse
description: DAG Stats Collection serializer for responses.
DagStatsResponse:
properties:
dag_id:
type: string
title: Dag Id
stats:
items:
$ref: '#/components/schemas/DagStatsStateResponse'
type: array
title: Stats
type: object
required:
- dag_id
- stats
title: DagStatsResponse
description: DAG Stats serializer for responses.
DagStatsStateResponse:
properties:
state:
$ref: '#/components/schemas/DagRunState'
count:
type: integer
title: Count
type: object
required:
- state
- count
title: DagStatsStateResponse
description: DagStatsState serializer for responses.
DagTagPydantic:
properties:
name:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router
Expand All @@ -40,3 +41,4 @@
public_router.include_router(providers_router)
public_router.include_router(plugins_router)
public_router.include_router(version_router)
public_router.include_router(dag_stats_router)
80 changes: 80 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import Depends, Query
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_stats import (
DagStatsCollectionResponse,
)
from airflow.models.dagrun import DagRun
from airflow.utils.state import DagRunState

dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats")


@dag_stats_router.get(
"/",
responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]),
)
async def get_dag_stats(
session: Annotated[Session, Depends(get_session)],
dag_ids: list[str] = Query(None),
) -> DagStatsCollectionResponse:
"""Get Dag statistics."""
query = select(
DagRun.dag_id,
DagRun.state,
func.count(DagRun.state),
)
if dag_ids:
query = query.where(DagRun.dag_id.in_(dag_ids))
else:
dag_ids = []
query = query.group_by(DagRun.dag_id, DagRun.state).order_by(DagRun.dag_id)
query_result = session.execute(query)

result_dag_ids = []
dag_state_data = {}
for dag_id, state, count in query_result:
dag_state_data[(dag_id, state)] = count
if dag_id not in result_dag_ids:
result_dag_ids.append(dag_id)

dags = [
{
"dag_id": dag_id,
"stats": [
{
"state": state,
"count": dag_state_data.get((dag_id, state), 0),
}
for state in DagRunState
],
}
for dag_id in result_dag_ids
]
return DagStatsCollectionResponse(dags=dags, total_entries=len(dags))
43 changes: 43 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dag_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from pydantic import BaseModel

from airflow.utils.state import DagRunState


class DagStatsStateResponse(BaseModel):
"""DagStatsState serializer for responses."""

state: DagRunState
count: int


class DagStatsResponse(BaseModel):
"""DAG Stats serializer for responses."""

dag_id: str
stats: list[DagStatsStateResponse]


class DagStatsCollectionResponse(BaseModel):
"""DAG Stats Collection serializer for responses."""

dags: list[DagStatsResponse]
total_entries: int
17 changes: 17 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ConnectionService,
DagRunService,
DagService,
DagStatsService,
DagsService,
DashboardService,
MonitorService,
Expand Down Expand Up @@ -409,6 +410,22 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array<unknown>) => [
useVersionServiceGetVersionKey,
...(queryKey ?? []),
];
export type DagStatsServiceGetDagStatsDefaultResponse = Awaited<
ReturnType<typeof DagStatsService.getDagStats>
>;
export type DagStatsServiceGetDagStatsQueryResult<
TData = DagStatsServiceGetDagStatsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagStatsServiceGetDagStatsKey = "DagStatsServiceGetDagStats";
export const UseDagStatsServiceGetDagStatsKeyFn = (
{
dagIds,
}: {
dagIds?: string[];
} = {},
queryKey?: Array<unknown>,
) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])];
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
Expand Down
21 changes: 21 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ConnectionService,
DagRunService,
DagService,
DagStatsService,
DagsService,
DashboardService,
MonitorService,
Expand Down Expand Up @@ -512,3 +513,23 @@ export const prefetchUseVersionServiceGetVersion = (queryClient: QueryClient) =>
queryKey: Common.UseVersionServiceGetVersionKeyFn(),
queryFn: () => VersionService.getVersion(),
});
/**
* Get Dag Stats
* Get Dag statistics.
* @param data The data for the request.
* @param data.dagIds
* @returns DagStatsCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagStatsServiceGetDagStats = (
queryClient: QueryClient,
{
dagIds,
}: {
dagIds?: string[];
} = {},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }),
queryFn: () => DagStatsService.getDagStats({ dagIds }),
});
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ConnectionService,
DagRunService,
DagService,
DagStatsService,
DagsService,
DashboardService,
MonitorService,
Expand Down Expand Up @@ -663,6 +664,32 @@ export const useVersionServiceGetVersion = <
queryFn: () => VersionService.getVersion() as TData,
...options,
});
/**
* Get Dag Stats
* Get Dag statistics.
* @param data The data for the request.
* @param data.dagIds
* @returns DagStatsCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagStatsServiceGetDagStats = <
TData = Common.DagStatsServiceGetDagStatsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagIds,
}: {
dagIds?: string[];
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey),
queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
...options,
});
/**
* Post Variable
* Create a variable.
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ConnectionService,
DagRunService,
DagService,
DagStatsService,
DagsService,
DashboardService,
MonitorService,
Expand Down Expand Up @@ -652,3 +653,29 @@ export const useVersionServiceGetVersionSuspense = <
queryFn: () => VersionService.getVersion() as TData,
...options,
});
/**
* Get Dag Stats
* Get Dag statistics.
* @param data The data for the request.
* @param data.dagIds
* @returns DagStatsCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagStatsServiceGetDagStatsSuspense = <
TData = Common.DagStatsServiceGetDagStatsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagIds,
}: {
dagIds?: string[];
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey),
queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
...options,
});
Loading

0 comments on commit 440180c

Please sign in to comment.