Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Migrate patch dags to FastAPI API #42545

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def patch_dag(*, dag_id: str, update_mask: UpdateMask = None, session: Session =
return dag_schema.dump(dag)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT")
@format_parameters({"limit": check_limit})
@action_logging
Expand Down
16 changes: 16 additions & 0 deletions airflow/api_fastapi/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
83 changes: 83 additions & 0 deletions airflow/api_fastapi/db/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, create_session, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.api_fastapi.parameters import BaseParam


async def get_session() -> Session:
"""
Dependency for providing a session.

For non route function please use the :class:`airflow.utils.session.provide_session` decorator.

Example usage:

.. code:: python

@router.get("/your_path")
def your_route(session: Annotated[Session, Depends(get_session)]):
pass
"""
with create_session() as session:
yield session


def apply_filters_to_select(base_select: Select, filters: Sequence[BaseParam | None]) -> Select:
base_select = base_select
for filter in filters:
if filter is None:
continue
base_select = filter.to_orm(base_select)

return base_select


@provide_session
def paginated_select(
base_select: Select,
filters: Sequence[BaseParam],
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: Session = NEW_SESSION,
) -> Select:
base_select = apply_filters_to_select(
base_select,
filters,
)

total_entries = get_query_count(base_select, session=session)

# TODO: Re-enable when permissions are handled. Readable / writable entities,
# for instance:
# readable_dags = get_auth_manager().get_permitted_dag_ids(user=g.user)
# dags_select = dags_select.where(DagModel.dag_id.in_(readable_dags))

base_select = apply_filters_to_select(base_select, [order_by, offset, limit])

return base_select, total_entries
54 changes: 18 additions & 36 deletions airflow/api_fastapi/db.py → airflow/api_fastapi/db/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,31 @@

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import func, select

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.api_fastapi.parameters import BaseParam


async def get_session() -> Session:
"""
Dependency for providing a session.

For non route function please use the :class:`airflow.utils.session.provide_session` decorator.

Example usage:

.. code:: python

@router.get("/your_path")
def your_route(session: Annotated[Session, Depends(get_session)]):
pass
"""
with create_session() as session:
yield session


def apply_filters_to_select(base_select: Select, filters: list[BaseParam]) -> Select:
select = base_select
for filter in filters:
select = filter.to_orm(select)

return select


latest_dag_run_per_dag_id_cte = (
select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
.where()
.group_by(DagRun.dag_id)
.cte()
)


dags_select_with_latest_dag_run = (
select(DagModel)
.join(
latest_dag_run_per_dag_id_cte,
DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
isouter=True,
)
.join(
DagRun,
DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date
and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
isouter=True,
)
.order_by(DagModel.dag_id)
)
123 changes: 122 additions & 1 deletion airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,133 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- DAG
summary: Patch Dags
description: Patch multiple DAGs.
operationId: patch_dags_public_dags_patch
parameters:
- name: update_mask
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Update Mask
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: tags
in: query
required: false
schema:
type: array
items:
type: string
title: Tags
- name: owners
in: query
required: false
schema:
type: array
items:
type: string
title: Owners
- name: dag_id_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id Pattern
- name: only_active
in: query
required: false
schema:
type: boolean
default: true
title: Only Active
- name: paused
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
title: Paused
- name: last_dag_run_state
in: query
required: false
schema:
anyOf:
- $ref: '#/components/schemas/DagRunState'
- type: 'null'
title: Last Dag Run State
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGPatchBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGCollectionResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}:
patch:
tags:
- DAG
summary: Patch Dag
description: Update the specific DAG.
description: Patch the specific DAG.
operationId: patch_dag_public_dags__dag_id__patch
parameters:
- name: dag_id
Expand Down
Loading