Skip to content

Commit

Permalink
Merge branch 'main' into 42559/aip-84/delete-connection-to-fast-api
Browse files Browse the repository at this point in the history
# Conflicts:
#	airflow/api_fastapi/views/public/__init__.py
#	airflow/ui/openapi-gen/requests/services.gen.ts
  • Loading branch information
bugraoz93 committed Oct 1, 2024
2 parents c5e598b + 05c43ee commit 7f1fb27
Show file tree
Hide file tree
Showing 107 changed files with 3,762 additions and 1,722 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ run-name: >
Build images for ${{ github.event.pull_request.title }} ${{ github.event.pull_request._links.html.href }}
on: # yamllint disable-line rule:truthy
pull_request_target:
branches:
- main
- v2-10-stable
- v2-10-test
permissions:
# all other permissions are set to none
contents: read
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ celerybeat-schedule

# dotenv
.env
.env.local
.autoenv*.zsh

# virtualenv
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ paths:
tags:
- Asset
summary: Next Run Assets
operationId: next_run_assets_ui_next_run_datasets__dag_id__get
operationId: next_run_assets
parameters:
- name: dag_id
in: path
Expand All @@ -27,7 +27,7 @@ paths:
application/json:
schema:
type: object
title: Response Next Run Assets Ui Next Run Datasets Dag Id Get
title: Response Next Run Assets
'422':
description: Validation Error
content:
Expand All @@ -40,7 +40,7 @@ paths:
- DAG
summary: Get Dags
description: Get all DAGs.
operationId: get_dags_public_dags_get
operationId: get_dags
parameters:
- name: limit
in: query
Expand Down Expand Up @@ -136,7 +136,7 @@ paths:
- DAG
summary: Patch Dags
description: Patch multiple DAGs.
operationId: patch_dags_public_dags_patch
operationId: patch_dags
parameters:
- name: update_mask
in: query
Expand Down Expand Up @@ -258,7 +258,7 @@ paths:
- DAG
summary: Patch Dag
description: Patch the specific DAG.
operationId: patch_dag_public_dags__dag_id__patch
operationId: patch_dag
parameters:
- name: dag_id
in: path
Expand Down
7 changes: 2 additions & 5 deletions airflow/api_fastapi/views/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

from __future__ import annotations

from fastapi import APIRouter

from airflow.api_fastapi.views.public.connections import connections_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.router import AirflowRouter

public_router = APIRouter(prefix="/public")
public_router = AirflowRouter(prefix="/public")


public_router.include_router(dags_router)
public_router.include_router(connections_router)
5 changes: 3 additions & 2 deletions airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from __future__ import annotations

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import Depends, HTTPException, Query
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing_extensions import Annotated
Expand All @@ -42,9 +42,10 @@
SortParam,
)
from airflow.api_fastapi.serializers.dags import DAGCollectionResponse, DAGPatchBody, DAGResponse
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import DagModel

dags_router = APIRouter(tags=["DAG"])
dags_router = AirflowRouter(tags=["DAG"])


@dags_router.get("/dags")
Expand Down
93 changes: 93 additions & 0 deletions airflow/api_fastapi/views/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from enum import Enum
from typing import Any, Callable, Sequence

from fastapi import APIRouter, params
from fastapi.datastructures import Default
from fastapi.routing import APIRoute
from fastapi.types import DecoratedCallable, IncEx
from fastapi.utils import generate_unique_id
from starlette.responses import JSONResponse, Response
from starlette.routing import BaseRoute


class AirflowRouter(APIRouter):
"""Extends the FastAPI default router."""

def api_route(
self,
path: str,
*,
response_model: Any = Default(None),
status_code: int | None = None,
tags: list[str | Enum] | None = None,
dependencies: Sequence[params.Depends] | None = None,
summary: str | None = None,
description: str | None = None,
response_description: str = "Successful Response",
responses: dict[int | str, dict[str, Any]] | None = None,
deprecated: bool | None = None,
methods: list[str] | None = None,
operation_id: str | None = None,
response_model_include: IncEx | None = None,
response_model_exclude: IncEx | None = None,
response_model_by_alias: bool = True,
response_model_exclude_unset: bool = False,
response_model_exclude_defaults: bool = False,
response_model_exclude_none: bool = False,
include_in_schema: bool = True,
response_class: type[Response] = Default(JSONResponse),
name: str | None = None,
callbacks: list[BaseRoute] | None = None,
openapi_extra: dict[str, Any] | None = None,
generate_unique_id_function: Callable[[APIRoute], str] = Default(generate_unique_id),
) -> Callable[[DecoratedCallable], DecoratedCallable]:
def decorator(func: DecoratedCallable) -> DecoratedCallable:
self.add_api_route(
path,
func,
response_model=response_model,
status_code=status_code,
tags=tags,
dependencies=dependencies,
summary=summary,
description=description,
response_description=response_description,
responses=responses,
deprecated=deprecated,
methods=methods,
operation_id=operation_id or func.__name__,
response_model_include=response_model_include,
response_model_exclude=response_model_exclude,
response_model_by_alias=response_model_by_alias,
response_model_exclude_unset=response_model_exclude_unset,
response_model_exclude_defaults=response_model_exclude_defaults,
response_model_exclude_none=response_model_exclude_none,
include_in_schema=include_in_schema,
response_class=response_class,
name=name,
callbacks=callbacks,
openapi_extra=openapi_extra,
generate_unique_id_function=generate_unique_id_function,
)
return func

return decorator
5 changes: 2 additions & 3 deletions airflow/api_fastapi/views/ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
# under the License.
from __future__ import annotations

from fastapi import APIRouter

from airflow.api_fastapi.views.router import AirflowRouter
from airflow.api_fastapi.views.ui.assets import assets_router

ui_router = APIRouter(prefix="/ui")
ui_router = AirflowRouter(prefix="/ui")

ui_router.include_router(assets_router)
5 changes: 3 additions & 2 deletions airflow/api_fastapi/views/ui/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

from __future__ import annotations

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi import Depends, HTTPException, Request
from sqlalchemy import and_, func, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import DagModel
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel, DagScheduleAssetReference

assets_router = APIRouter(tags=["Asset"])
assets_router = AirflowRouter(tags=["Asset"])


@assets_router.get("/next_run_datasets/{dag_id}", include_in_schema=False)
Expand Down
7 changes: 6 additions & 1 deletion airflow/auth/managers/simple/simple_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,12 @@ def _is_authorized(
user = self.get_user()
if not user:
return False
role_str = user.get_role().upper()

user_role = user.get_role()
if not user_role:
return False

role_str = user_role.upper()
role = SimpleAuthManagerRole[role_str]
if role == SimpleAuthManagerRole.ADMIN:
return True
Expand Down
6 changes: 3 additions & 3 deletions airflow/auth/managers/simple/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ class SimpleAuthManagerUser(BaseUser):
User model for users managed by the simple auth manager.
:param username: The username
:param role: The role associated to the user
:param role: The role associated to the user. If not provided, the user has no permission
"""

def __init__(self, *, username: str, role: str) -> None:
def __init__(self, *, username: str, role: str | None) -> None:
self.username = username
self.role = role

Expand All @@ -37,5 +37,5 @@ def get_id(self) -> str:
def get_name(self) -> str:
return self.username

def get_role(self):
def get_role(self) -> str | None:
return self.role
5 changes: 3 additions & 2 deletions airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from opentelemetry.metrics import Observation
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
Expand All @@ -40,6 +40,7 @@
get_validator,
stat_name_otel_handler,
)
from airflow.utils.net import get_hostname

if TYPE_CHECKING:
from opentelemetry.metrics import Instrument
Expand Down Expand Up @@ -410,7 +411,7 @@ def get_otel_logger(cls) -> SafeOtelLogger:
debug = conf.getboolean("metrics", "otel_debugging_on")
service_name = conf.get("metrics", "otel_service")

resource = Resource(attributes={SERVICE_NAME: service_name})
resource = Resource.create(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: service_name})

protocol = "https" if ssl_active else "http"
endpoint = f"{protocol}://{host}:{port}/v1/metrics"
Expand Down
52 changes: 52 additions & 0 deletions airflow/migrations/versions/0034_3_0_0_update_user_id_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Update dag_run_note.user_id and task_instance_note.user_id columns to String.
Revision ID: 44eabb1904b4
Revises: 16cbcb1c8c36
Create Date: 2024-09-27 09:57:29.830521
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "44eabb1904b4"
down_revision = "16cbcb1c8c36"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
with op.batch_alter_table("dag_run_note") as batch_op:
batch_op.alter_column("user_id", type_=sa.String(length=128))
with op.batch_alter_table("task_instance_note") as batch_op:
batch_op.alter_column("user_id", type_=sa.String(length=128))


def downgrade():
with op.batch_alter_table("dag_run_note") as batch_op:
batch_op.alter_column("user_id", type_=sa.Integer(), postgresql_using="user_id::integer")
with op.batch_alter_table("task_instance_note") as batch_op:
batch_op.alter_column("user_id", type_=sa.Integer(), postgresql_using="user_id::integer")
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
also rename the one on DatasetAliasModel here for consistency.
Revision ID: 0d9e73a75ee4
Revises: 16cbcb1c8c36
Revises: 44eabb1904b4
Create Date: 2024-08-13 09:45:32.213222
"""

Expand All @@ -42,7 +42,7 @@

# revision identifiers, used by Alembic.
revision = "0d9e73a75ee4"
down_revision = "16cbcb1c8c36"
down_revision = "44eabb1904b4"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1687,7 +1687,7 @@ class DagRunNote(Base):

__tablename__ = "dag_run_note"

user_id = Column(Integer, nullable=True)
user_id = Column(String(128), nullable=True)
dag_run_id = Column(Integer, primary_key=True, nullable=False)
content = Column(String(1000).with_variant(Text(1000), "mysql"))
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4002,7 +4002,7 @@ class TaskInstanceNote(TaskInstanceDependencies):

__tablename__ = "task_instance_note"

user_id = Column(Integer, nullable=True)
user_id = Column(String(128), nullable=True)
task_id = Column(StringID(), primary_key=True, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
run_id = Column(StringID(), primary_key=True, nullable=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def requires_authentication(function: T):

@wraps(function)
def decorated(*args, **kwargs):
if auth_current_user() is not None or current_app.appbuilder.get_app.config.get(
"AUTH_ROLE_PUBLIC", None
):
if auth_current_user() is not None or current_app.config.get("AUTH_ROLE_PUBLIC", None):
return function(*args, **kwargs)
else:
return Response("Unauthorized", 401, {"WWW-Authenticate": "Basic"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def requires_authentication(function: T, find_user: Callable[[str], BaseUser] |

@wraps(function)
def decorated(*args, **kwargs):
if current_app.appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC", None):
if current_app.config.get("AUTH_ROLE_PUBLIC", None):
response = function(*args, **kwargs)
return make_response(response)

Expand Down
Loading

0 comments on commit 7f1fb27

Please sign in to comment.