Skip to content

Commit

Permalink
AIP-84 Migrate delete a connection to FastAPI API (#42571)
Browse files Browse the repository at this point in the history
* Include connections router and migrate delete a connection endpoint to fastapi

* Mark tests as db_test

* Use only pyfixture session

* make method async

* setup method to setup_attrs

* Convert APIRouter tags, make setup method unified

* Use AirflowRouter over fastapi.APIRouter
  • Loading branch information
bugraoz93 authored Oct 2, 2024
1 parent 9822060 commit 0c911a9
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 2 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.security import permissions
from airflow.utils import helpers
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.log.action_logger import action_event_from_permission
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.strings import get_random_string
Expand All @@ -53,6 +54,7 @@
RESOURCE_EVENT_PREFIX = "connection"


@mark_fastapi_migration_done
@security.requires_access_connection("DELETE")
@provide_session
@action_logging(
Expand Down
41 changes: 41 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,47 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/connections/{connection_id}:
delete:
tags:
- Connection
summary: Delete Connection
description: Delete a connection entry.
operationId: delete_connection
parameters:
- name: connection_id
in: path
required: true
schema:
type: string
title: Connection Id
responses:
'204':
description: Successful Response
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
DAGCollectionResponse:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/views/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

from __future__ import annotations

from airflow.api_fastapi.views.public.connections import connections_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.router import AirflowRouter

public_router = AirflowRouter(prefix="/public")


public_router.include_router(dags_router)
public_router.include_router(connections_router)
47 changes: 47 additions & 0 deletions airflow/api_fastapi/views/public/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from fastapi import Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import Connection

connections_router = AirflowRouter(tags=["Connection"])


@connections_router.delete(
"/connections/{connection_id}",
status_code=204,
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
async def delete_connection(
connection_id: str,
session: Annotated[Session, Depends(get_session)],
):
"""Delete a connection entry."""
connection = session.scalar(select(Connection).filter_by(conn_id=connection_id))

if connection is None:
raise HTTPException(404, f"The Connection with connection_id: `{connection_id}` was not found")

session.delete(connection)
9 changes: 8 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// generated with @7nohe/[email protected]
import { UseQueryResult } from "@tanstack/react-query";

import { AssetService, DagService } from "../requests/services.gen";
import {
AssetService,
ConnectionService,
DagService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";

export type AssetServiceNextRunAssetsDefaultResponse = Awaited<
Expand Down Expand Up @@ -76,3 +80,6 @@ export type DagServicePatchDagsMutationResult = Awaited<
export type DagServicePatchDagMutationResult = Awaited<
ReturnType<typeof DagService.patchDag>
>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
45 changes: 44 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
UseQueryOptions,
} from "@tanstack/react-query";

import { AssetService, DagService } from "../requests/services.gen";
import {
AssetService,
ConnectionService,
DagService,
} from "../requests/services.gen";
import { DAGPatchBody, DagRunState } from "../requests/types.gen";
import * as Common from "./common";

Expand Down Expand Up @@ -247,3 +251,42 @@ export const useDagServicePatchDag = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Connection
* Delete a connection entry.
* @param data The data for the request.
* @param data.connectionId
* @returns void Successful Response
* @throws ApiError
*/
export const useConnectionServiceDeleteConnection = <
TData = Common.ConnectionServiceDeleteConnectionMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
connectionId: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
connectionId: string;
},
TContext
>({
mutationFn: ({ connectionId }) =>
ConnectionService.deleteConnection({
connectionId,
}) as unknown as Promise<TData>,
...options,
});
30 changes: 30 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import type {
PatchDagsResponse,
PatchDagData,
PatchDagResponse,
DeleteConnectionData,
DeleteConnectionResponse,
} from "./types.gen";

export class AssetService {
Expand Down Expand Up @@ -159,3 +161,31 @@ export class DagService {
});
}
}

export class ConnectionService {
/**
* Delete Connection
* Delete a connection entry.
* @param data The data for the request.
* @param data.connectionId
* @returns void Successful Response
* @throws ApiError
*/
public static deleteConnection(
data: DeleteConnectionData,
): CancelablePromise<DeleteConnectionResponse> {
return __request(OpenAPI, {
method: "DELETE",
url: "/public/connections/{connection_id}",
path: {
connection_id: data.connectionId,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ export type PatchDagData = {

export type PatchDagResponse = DAGResponse;

export type DeleteConnectionData = {
connectionId: string;
};

export type DeleteConnectionResponse = void;

export type $OpenApiTs = {
"/ui/next_run_datasets/{dag_id}": {
get: {
Expand Down Expand Up @@ -227,4 +233,31 @@ export type $OpenApiTs = {
};
};
};
"/public/connections/{connection_id}": {
delete: {
req: DeleteConnectionData;
res: {
/**
* Successful Response
*/
204: void;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
};
63 changes: 63 additions & 0 deletions tests/api_fastapi/views/public/test_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from airflow.models import Connection
from airflow.utils.session import provide_session
from tests.test_utils.db import clear_db_connections

pytestmark = pytest.mark.db_test

TEST_CONN_ID = "test_connection_id"
TEST_CONN_TYPE = "test_type"


@provide_session
def _create_connection(session) -> None:
connection_model = Connection(conn_id=TEST_CONN_ID, conn_type=TEST_CONN_TYPE)
session.add(connection_model)


class TestConnectionEndpoint:
@pytest.fixture(autouse=True)
def setup(self) -> None:
clear_db_connections(False)

def teardown_method(self) -> None:
clear_db_connections()

def create_connection(self):
_create_connection()


class TestDeleteConnection(TestConnectionEndpoint):
def test_delete_should_respond_204(self, test_client, session):
self.create_connection()
conns = session.query(Connection).all()
assert len(conns) == 1
response = test_client.delete(f"/public/connections/{TEST_CONN_ID}")
assert response.status_code == 204
connection = session.query(Connection).all()
assert len(connection) == 0

def test_delete_should_respond_404(self, test_client):
response = test_client.delete(f"/public/connections/{TEST_CONN_ID}")
assert response.status_code == 404
body = response.json()
assert f"The Connection with connection_id: `{TEST_CONN_ID}` was not found" == body["detail"]

0 comments on commit 0c911a9

Please sign in to comment.