Skip to content

Commit

Permalink
AIP-84 Migrate patch dags to FastAPI API (#42545)
Browse files Browse the repository at this point in the history
* AIP-84 Migrate patch dags to FastAPI API

* Fix CI
  • Loading branch information
pierrejeambrun authored Sep 30, 2024
1 parent 97c7d2c commit 7af76b8
Show file tree
Hide file tree
Showing 13 changed files with 596 additions and 105 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def patch_dag(*, dag_id: str, update_mask: UpdateMask = None, session: Session =
return dag_schema.dump(dag)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT")
@format_parameters({"limit": check_limit})
@action_logging
Expand Down
16 changes: 16 additions & 0 deletions airflow/api_fastapi/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
83 changes: 83 additions & 0 deletions airflow/api_fastapi/db/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, create_session, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.api_fastapi.parameters import BaseParam


async def get_session() -> Session:
"""
Dependency for providing a session.
For non route function please use the :class:`airflow.utils.session.provide_session` decorator.
Example usage:
.. code:: python
@router.get("/your_path")
def your_route(session: Annotated[Session, Depends(get_session)]):
pass
"""
with create_session() as session:
yield session


def apply_filters_to_select(base_select: Select, filters: Sequence[BaseParam | None]) -> Select:
base_select = base_select
for filter in filters:
if filter is None:
continue
base_select = filter.to_orm(base_select)

return base_select


@provide_session
def paginated_select(
base_select: Select,
filters: Sequence[BaseParam],
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: Session = NEW_SESSION,
) -> Select:
base_select = apply_filters_to_select(
base_select,
filters,
)

total_entries = get_query_count(base_select, session=session)

# TODO: Re-enable when permissions are handled. Readable / writable entities,
# for instance:
# readable_dags = get_auth_manager().get_permitted_dag_ids(user=g.user)
# dags_select = dags_select.where(DagModel.dag_id.in_(readable_dags))

base_select = apply_filters_to_select(base_select, [order_by, offset, limit])

return base_select, total_entries
54 changes: 18 additions & 36 deletions airflow/api_fastapi/db.py → airflow/api_fastapi/db/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,31 @@

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import func, select

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.api_fastapi.parameters import BaseParam


async def get_session() -> Session:
"""
Dependency for providing a session.
For non route function please use the :class:`airflow.utils.session.provide_session` decorator.
Example usage:
.. code:: python
@router.get("/your_path")
def your_route(session: Annotated[Session, Depends(get_session)]):
pass
"""
with create_session() as session:
yield session


def apply_filters_to_select(base_select: Select, filters: list[BaseParam]) -> Select:
select = base_select
for filter in filters:
select = filter.to_orm(select)

return select


latest_dag_run_per_dag_id_cte = (
select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
.where()
.group_by(DagRun.dag_id)
.cte()
)


dags_select_with_latest_dag_run = (
select(DagModel)
.join(
latest_dag_run_per_dag_id_cte,
DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
isouter=True,
)
.join(
DagRun,
DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date
and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
isouter=True,
)
.order_by(DagModel.dag_id)
)
123 changes: 122 additions & 1 deletion airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,133 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- DAG
summary: Patch Dags
description: Patch multiple DAGs.
operationId: patch_dags_public_dags_patch
parameters:
- name: update_mask
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Update Mask
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: tags
in: query
required: false
schema:
type: array
items:
type: string
title: Tags
- name: owners
in: query
required: false
schema:
type: array
items:
type: string
title: Owners
- name: dag_id_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id Pattern
- name: only_active
in: query
required: false
schema:
type: boolean
default: true
title: Only Active
- name: paused
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
title: Paused
- name: last_dag_run_state
in: query
required: false
schema:
anyOf:
- $ref: '#/components/schemas/DagRunState'
- type: 'null'
title: Last Dag Run State
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGPatchBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGCollectionResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}:
patch:
tags:
- DAG
summary: Patch Dag
description: Update the specific DAG.
description: Patch the specific DAG.
operationId: patch_dag_public_dags__dag_id__patch
parameters:
- name: dag_id
Expand Down
Loading

0 comments on commit 7af76b8

Please sign in to comment.