Skip to content

Commit

Permalink
AIP-84 Migrate GET Dag Run endpoint to FastAPI (apache#42725)
Browse files Browse the repository at this point in the history
* get dag_run init

* add serializer

* Merge branch 'main' of https://github.com/apache/airflow into kalyan/AIP-84/get_dag_run

* add types

* add test

* working tests

* add note to DagRunResponse

* add note

* add test to test non Null note

* Update airflow/api_fastapi/views/public/dag_run.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* Update airflow/api_fastapi/views/public/dag_run.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* Merge branch 'main' of https://github.com/apache/airflow into kalyan/AIP-84/get_dag_run

* add 404 test

---------

Co-authored-by: Pierre Jeambrun <[email protected]>
  • Loading branch information
2 people authored and Lorin committed Oct 17, 2024
1 parent fe86dfa commit 78b5a01
Show file tree
Hide file tree
Showing 13 changed files with 759 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from airflow.models import DagModel, DagRun
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -90,6 +91,7 @@ def delete_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSI
return NoContent, HTTPStatus.NO_CONTENT


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_run(
Expand Down
153 changes: 153 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,56 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}:
get:
tags:
- DagRun
summary: Get Dag Run
operationId: get_dag_run
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
ConnectionResponse:
Expand Down Expand Up @@ -1097,6 +1147,87 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunResponse:
properties:
run_id:
anyOf:
- type: string
- type: 'null'
title: Run Id
dag_id:
type: string
title: Dag Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval Start
data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval End
last_scheduling_decision:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Scheduling Decision
run_type:
$ref: '#/components/schemas/DagRunType'
state:
$ref: '#/components/schemas/DagRunState'
external_trigger:
type: boolean
title: External Trigger
triggered_by:
$ref: '#/components/schemas/DagRunTriggeredByType'
conf:
type: object
title: Conf
note:
anyOf:
- type: string
- type: 'null'
title: Note
type: object
required:
- run_id
- dag_id
- logical_date
- start_date
- end_date
- data_interval_start
- data_interval_end
- last_scheduling_decision
- run_type
- state
- external_trigger
- triggered_by
- conf
- note
title: DAGRunResponse
description: DAG Run serializer for responses.
DAGRunStates:
properties:
queued:
Expand Down Expand Up @@ -1157,6 +1288,28 @@ components:
so please ensure that their values always match the ones with the
same name in TaskInstanceState.'
DagRunTriggeredByType:
type: string
enum:
- cli
- operator
- rest_api
- ui
- test
- timetable
- dataset
- backfill
title: DagRunTriggeredByType
description: Class with TriggeredBy types for DagRun.
DagRunType:
type: string
enum:
- backfill
- scheduled
- manual
- dataset_triggered
title: DagRunType
description: Class with DagRun types.
DagTagPydantic:
properties:
name:
Expand Down
44 changes: 44 additions & 0 deletions airflow/api_fastapi/serializers/dag_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

dag_run_id: str | None = Field(alias="run_id")
dag_id: str
logical_date: datetime | None
start_date: datetime | None
end_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
last_scheduling_decision: datetime | None
run_type: DagRunType
state: DagRunState
external_trigger: bool
triggered_by: DagRunTriggeredByType
conf: dict
note: str | None
2 changes: 2 additions & 0 deletions airflow/api_fastapi/views/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from airflow.api_fastapi.views.public.connections import connections_router
from airflow.api_fastapi.views.public.dag_run import dag_run_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.public.variables import variables_router
from airflow.api_fastapi.views.router import AirflowRouter
Expand All @@ -28,3 +29,4 @@
public_router.include_router(dags_router)
public_router.include_router(connections_router)
public_router.include_router(variables_router)
public_router.include_router(dag_run_router)
44 changes: 44 additions & 0 deletions airflow/api_fastapi/views/public/dag_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")


@dag_run_router.get("/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]))
async def get_dag_run(
dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]
) -> DAGRunResponse:
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
DagRunService,
DagService,
DashboardService,
VariableService,
Expand Down Expand Up @@ -166,6 +167,24 @@ export const UseVariableServiceGetVariableKeyFn = (
},
queryKey?: Array<unknown>,
) => [useVariableServiceGetVariableKey, ...(queryKey ?? [{ variableKey }])];
export type DagRunServiceGetDagRunDefaultResponse = Awaited<
ReturnType<typeof DagRunService.getDagRun>
>;
export type DagRunServiceGetDagRunQueryResult<
TData = DagRunServiceGetDagRunDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun";
export const UseDagRunServiceGetDagRunKeyFn = (
{
dagId,
dagRunId,
}: {
dagId: string;
dagRunId: string;
},
queryKey?: Array<unknown>,
) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
Expand Down
23 changes: 23 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { type QueryClient } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
DagRunService,
DagService,
DashboardService,
VariableService,
Expand Down Expand Up @@ -206,3 +207,25 @@ export const prefetchUseVariableServiceGetVariable = (
queryKey: Common.UseVariableServiceGetVariableKeyFn({ variableKey }),
queryFn: () => VariableService.getVariable({ variableKey }),
});
/**
* Get Dag Run
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagRunServiceGetDagRun = (
queryClient: QueryClient,
{
dagId,
dagRunId,
}: {
dagId: string;
dagRunId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
});
Loading

0 comments on commit 78b5a01

Please sign in to comment.