Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Migrate GET Dag Run endpoint to FastAPI #42725

Merged
merged 25 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
95b6129
get dag_run init
rawwar Oct 4, 2024
d1e7681
add serializer
rawwar Oct 4, 2024
a8c3684
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 4, 2024
8c43a30
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 4, 2024
f8332d1
add types
rawwar Oct 4, 2024
6c382dc
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 4, 2024
939153e
add test
rawwar Oct 4, 2024
0633e4c
working tests
rawwar Oct 4, 2024
12ee3d2
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 4, 2024
c05dc0a
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 7, 2024
2ee6bbc
add note to DagRunResponse
rawwar Oct 7, 2024
3763684
add note
rawwar Oct 8, 2024
8d259df
move DagRunNotePydantic to api_fastapi
rawwar Oct 8, 2024
b884774
add test to test non Null note
rawwar Oct 8, 2024
0454687
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 8, 2024
9c641a7
Merge branch 'main' into kalyan/AIP-84/get_dag_run
rawwar Oct 8, 2024
5ddb577
Merge branch 'main' into kalyan/AIP-84/get_dag_run
rawwar Oct 8, 2024
5165ba4
Merge branch 'main' into kalyan/AIP-84/get_dag_run
rawwar Oct 8, 2024
56a574c
Update airflow/api_fastapi/views/public/dag_run.py
rawwar Oct 9, 2024
57d2d41
Update airflow/api_fastapi/views/public/dag_run.py
rawwar Oct 9, 2024
ac3ea2e
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 9, 2024
8aecafe
Merge branch 'kalyan/AIP-84/get_dag_run' of github.com:rawwar/airflow…
rawwar Oct 9, 2024
1afae1c
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 9, 2024
2bd6625
add 404 test
rawwar Oct 9, 2024
8ccf78f
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from airflow.models import DagModel, DagRun
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -90,6 +91,7 @@ def delete_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSI
return NoContent, HTTPStatus.NO_CONTENT


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_run(
Expand Down
153 changes: 153 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,56 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}:
get:
tags:
- DagRun
summary: Get Dag Run
operationId: get_dag_run
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
ConnectionResponse:
Expand Down Expand Up @@ -1047,6 +1097,87 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunResponse:
properties:
run_id:
anyOf:
- type: string
- type: 'null'
title: Run Id
dag_id:
type: string
title: Dag Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval Start
data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval End
last_scheduling_decision:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Scheduling Decision
run_type:
$ref: '#/components/schemas/DagRunType'
state:
$ref: '#/components/schemas/DagRunState'
external_trigger:
type: boolean
title: External Trigger
triggered_by:
$ref: '#/components/schemas/DagRunTriggeredByType'
conf:
type: object
title: Conf
note:
anyOf:
- type: string
- type: 'null'
title: Note
type: object
required:
- run_id
- dag_id
- logical_date
- start_date
- end_date
- data_interval_start
- data_interval_end
- last_scheduling_decision
- run_type
- state
- external_trigger
- triggered_by
- conf
- note
title: DAGRunResponse
description: DAG Run serializer for responses.
DAGRunStates:
properties:
queued:
Expand Down Expand Up @@ -1107,6 +1238,28 @@ components:
so please ensure that their values always match the ones with the

same name in TaskInstanceState.'
DagRunTriggeredByType:
type: string
enum:
- cli
- operator
- rest_api
- ui
- test
- timetable
- dataset
- backfill
title: DagRunTriggeredByType
description: Class with TriggeredBy types for DagRun.
DagRunType:
type: string
enum:
- backfill
- scheduled
- manual
- dataset_triggered
title: DagRunType
description: Class with DagRun types.
DagTagPydantic:
properties:
name:
Expand Down
44 changes: 44 additions & 0 deletions airflow/api_fastapi/serializers/dag_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

dag_run_id: str | None = Field(alias="run_id")
dag_id: str
logical_date: datetime | None
start_date: datetime | None
end_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
last_scheduling_decision: datetime | None
run_type: DagRunType
state: DagRunState
external_trigger: bool
triggered_by: DagRunTriggeredByType
conf: dict
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
note: str | None
2 changes: 2 additions & 0 deletions airflow/api_fastapi/views/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from airflow.api_fastapi.views.public.connections import connections_router
from airflow.api_fastapi.views.public.dag_run import dag_run_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.public.variables import variables_router
from airflow.api_fastapi.views.router import AirflowRouter
Expand All @@ -28,3 +29,4 @@
public_router.include_router(dags_router)
public_router.include_router(connections_router)
public_router.include_router(variables_router)
public_router.include_router(dag_run_router)
44 changes: 44 additions & 0 deletions airflow/api_fastapi/views/public/dag_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")


@dag_run_router.get("/{dag_run_id}", responses=create_openapi_http_exception_doc([401, 403, 404]))
async def get_dag_run(
dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]
) -> DAGRunResponse:
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
DagRunService,
DagService,
DashboardService,
VariableService,
Expand Down Expand Up @@ -150,6 +151,24 @@ export const UseVariableServiceGetVariableKeyFn = (
},
queryKey?: Array<unknown>,
) => [useVariableServiceGetVariableKey, ...(queryKey ?? [{ variableKey }])];
export type DagRunServiceGetDagRunDefaultResponse = Awaited<
ReturnType<typeof DagRunService.getDagRun>
>;
export type DagRunServiceGetDagRunQueryResult<
TData = DagRunServiceGetDagRunDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun";
export const UseDagRunServiceGetDagRunKeyFn = (
{
dagId,
dagRunId,
}: {
dagId: string;
dagRunId: string;
},
queryKey?: Array<unknown>,
) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
Expand Down
23 changes: 23 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { type QueryClient } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
DagRunService,
DagService,
DashboardService,
VariableService,
Expand Down Expand Up @@ -186,3 +187,25 @@ export const prefetchUseVariableServiceGetVariable = (
queryKey: Common.UseVariableServiceGetVariableKeyFn({ variableKey }),
queryFn: () => VariableService.getVariable({ variableKey }),
});
/**
* Get Dag Run
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagRunServiceGetDagRun = (
queryClient: QueryClient,
{
dagId,
dagRunId,
}: {
dagId: string;
dagRunId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
});
Loading