Skip to content

Commit

Permalink
AIP-84 Migrate the public endpoint DAG Details to FastAPI (#42631)
Browse files Browse the repository at this point in the history
* Migrate the public endpoint DAG Details to FastAPI

* Add comment for computed_field, remove unused import

* Update pendulum import path

* Re-run breeze static checks

* Add openapi-gen for DAG Details API

* Resolve review comments, test has_task_concurrency_limits

* Remove unused import

* Use lambda for aliases, re-run breeze static checks

* Remove duplicate entry

* Use simpler approach for alias
  • Loading branch information
omkar-foss authored Oct 3, 2024
1 parent db6eb16 commit b5712e7
Show file tree
Hide file tree
Showing 12 changed files with 1,075 additions and 11 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def get_dag(
)


@mark_fastapi_migration_done
@security.requires_access_dag("GET")
@provide_session
def get_dag_details(
Expand Down
288 changes: 288 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/details:
get:
tags:
- DAG
summary: Get Dag Details
description: Get details of DAG.
operationId: get_dag_details
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGDetailsResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}:
patch:
tags:
Expand Down Expand Up @@ -378,6 +429,243 @@ components:
- total_entries
title: DAGCollectionResponse
description: DAG Collection serializer for responses.
DAGDetailsResponse:
properties:
dag_id:
type: string
title: Dag Id
dag_display_name:
type: string
title: Dag Display Name
is_paused:
type: boolean
title: Is Paused
is_active:
type: boolean
title: Is Active
last_parsed_time:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Parsed Time
last_pickled:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Pickled
last_expired:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Expired
scheduler_lock:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Scheduler Lock
pickle_id:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Pickle Id
default_view:
anyOf:
- type: string
- type: 'null'
title: Default View
fileloc:
type: string
title: Fileloc
description:
anyOf:
- type: string
- type: 'null'
title: Description
timetable_summary:
anyOf:
- type: string
- type: 'null'
title: Timetable Summary
timetable_description:
anyOf:
- type: string
- type: 'null'
title: Timetable Description
tags:
items:
$ref: '#/components/schemas/DagTagPydantic'
type: array
title: Tags
max_active_tasks:
type: integer
title: Max Active Tasks
max_active_runs:
anyOf:
- type: integer
- type: 'null'
title: Max Active Runs
max_consecutive_failed_dag_runs:
type: integer
title: Max Consecutive Failed Dag Runs
has_task_concurrency_limits:
type: boolean
title: Has Task Concurrency Limits
has_import_errors:
type: boolean
title: Has Import Errors
next_dagrun:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun
next_dagrun_data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Data Interval Start
next_dagrun_data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Data Interval End
next_dagrun_create_after:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Create After
owners:
items:
type: string
type: array
title: Owners
catchup:
type: boolean
title: Catchup
dag_run_timeout:
anyOf:
- type: string
format: duration
- type: 'null'
title: Dag Run Timeout
dataset_expression:
anyOf:
- type: object
- type: 'null'
title: Dataset Expression
doc_md:
anyOf:
- type: string
- type: 'null'
title: Doc Md
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
is_paused_upon_creation:
anyOf:
- type: boolean
- type: 'null'
title: Is Paused Upon Creation
orientation:
type: string
title: Orientation
params:
anyOf:
- type: object
- type: 'null'
title: Params
render_template_as_native_obj:
type: boolean
title: Render Template As Native Obj
template_search_path:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Template Search Path
timezone:
anyOf:
- type: string
- type: 'null'
title: Timezone
last_parsed:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Parsed
file_token:
type: string
title: File Token
description: Return file token.
readOnly: true
concurrency:
type: integer
title: Concurrency
description: Return max_active_tasks as concurrency.
readOnly: true
type: object
required:
- dag_id
- dag_display_name
- is_paused
- is_active
- last_parsed_time
- last_pickled
- last_expired
- scheduler_lock
- pickle_id
- default_view
- fileloc
- description
- timetable_summary
- timetable_description
- tags
- max_active_tasks
- max_active_runs
- max_consecutive_failed_dag_runs
- has_task_concurrency_limits
- has_import_errors
- next_dagrun
- next_dagrun_data_interval_start
- next_dagrun_data_interval_end
- next_dagrun_create_after
- owners
- catchup
- dag_run_timeout
- dataset_expression
- doc_md
- start_date
- end_date
- is_paused_upon_creation
- orientation
- params
- render_template_as_native_obj
- template_search_path
- timezone
- last_parsed
- file_token
- concurrency
title: DAGDetailsResponse
description: Specific serializer for DAG Details responses.
DAGPatchBody:
properties:
is_paused:
Expand Down
61 changes: 59 additions & 2 deletions airflow/api_fastapi/serializers/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

from __future__ import annotations

from datetime import datetime
from typing import Any
from collections import abc
from datetime import datetime, timedelta
from typing import Any, Iterable

from itsdangerous import URLSafeSerializer
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic import (
AliasChoices,
BaseModel,
Field,
computed_field,
field_validator,
)
Expand Down Expand Up @@ -93,3 +97,56 @@ class DAGCollectionResponse(BaseModel):

dags: list[DAGResponse]
total_entries: int


class DAGDetailsResponse(DAGResponse):
"""Specific serializer for DAG Details responses."""

catchup: bool
dag_run_timeout: timedelta | None = Field(
validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout")
)
dataset_expression: dict | None
doc_md: str | None
start_date: datetime | None
end_date: datetime | None
is_paused_upon_creation: bool | None
orientation: str
params: abc.MutableMapping | None
render_template_as_native_obj: bool
template_search_path: Iterable[str] | None = Field(
validation_alias=AliasChoices("template_search_path", "template_searchpath")
)
timezone: str | None
last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded"))

@field_validator("timezone", mode="before")
@classmethod
def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None:
"""Convert timezone attribute to string representation."""
if tz is None:
return None
return str(tz)

@field_validator("timetable_summary", mode="before")
@classmethod
def get_timetable_summary(cls, tts: str | None) -> str | None:
"""Validate the string representation of timetable_summary."""
if tts is None or tts == "None":
return None
return str(tts)

@field_validator("params", mode="before")
@classmethod
def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
"""Convert params attribute to dict representation."""
if params is None:
return None
return {param_name: param_val.dump() for param_name, param_val in params.items()}

# Mypy issue https://github.com/python/mypy/issues/1362
@computed_field # type: ignore[misc]
@property
def concurrency(self) -> int:
"""Return max_active_tasks as concurrency."""
return self.max_active_tasks
Loading

0 comments on commit b5712e7

Please sign in to comment.