From e95d08b75b03d469866058e2e4150c4a1797ff2f Mon Sep 17 00:00:00 2001 From: Amogh Date: Tue, 12 Nov 2024 20:16:40 +0530 Subject: [PATCH 01/13] AIP-84: Migrating GET queued asset events for DAG to fastAPI --- .../api_connexion/endpoints/asset_endpoint.py | 2 + .../api_fastapi/core_api/datamodels/dags.py | 15 ++++ .../core_api/openapi/v1-generated.yaml | 86 +++++++++++++++++++ .../core_api/routes/public/dags.py | 47 ++++++++++ airflow/ui/openapi-gen/queries/common.ts | 22 +++++ airflow/ui/openapi-gen/queries/prefetch.ts | 26 ++++++ airflow/ui/openapi-gen/queries/queries.ts | 33 +++++++ airflow/ui/openapi-gen/queries/suspense.ts | 33 +++++++ .../ui/openapi-gen/requests/schemas.gen.ts | 42 +++++++++ .../ui/openapi-gen/requests/services.gen.ts | 32 +++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 51 +++++++++++ 11 files changed, 389 insertions(+) diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 1ea1db2b3bbb..ebb14f27c413 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -47,6 +47,7 @@ from airflow.assets.manager import asset_manager from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel from airflow.utils import timezone +from airflow.utils.api_migration import mark_fastapi_migration_done from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session from airflow.www.decorators import action_logging @@ -218,6 +219,7 @@ def delete_dag_asset_queued_event( ) +@mark_fastapi_migration_done @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session diff --git a/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow/api_fastapi/core_api/datamodels/dags.py index 27cc3ad47356..415367174855 100644 --- a/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow/api_fastapi/core_api/datamodels/dags.py @@ -159,3 +159,18 @@ class DAGTagCollectionResponse(BaseModel): tags: list[str] total_entries: int + + +class QueuedEventResponse(BaseModel): + """QueuedEvent serializer for responses..""" + + uri: str + dag_id: str + created_at: datetime + + +class QueuedEventCollectionResponse(BaseModel): + """QueuedEventCollection serializer for responses.""" + + queued_events: list[QueuedEventResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 1f8831b4eeae..a03eafcfdec0 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -959,6 +959,57 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/assets/queuedEvent: + get: + tags: + - DAG + summary: Get Dag Asset Queued Events + description: Get queued asset events for a DAG. + operationId: get_dag_asset_queued_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: before + in: query + required: false + schema: + type: string + title: Before + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/connections/{connection_id}: delete: tags: @@ -4780,6 +4831,41 @@ components: - version title: ProviderResponse description: Provider serializer for responses. + QueuedEventCollectionResponse: + properties: + queued_events: + items: + $ref: '#/components/schemas/QueuedEventResponse' + type: array + title: Queued Events + total_entries: + type: integer + title: Total Entries + type: object + required: + - queued_events + - total_entries + title: QueuedEventCollectionResponse + description: QueuedEventCollection serializer for responses. + QueuedEventResponse: + properties: + uri: + type: string + title: Uri + dag_id: + type: string + title: Dag Id + created_at: + type: string + format: date-time + title: Created At + type: object + required: + - uri + - dag_id + - created_at + title: QueuedEventResponse + description: QueuedEvent serializer for responses.. ReprocessBehavior: type: string enum: diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 6027c1d0e4c1..b490a03fe17a 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -50,10 +50,14 @@ DAGPatchBody, DAGResponse, DAGTagCollectionResponse, + QueuedEventCollectionResponse, + QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel, DagTag +from airflow.models.asset import AssetDagRunQueue, AssetModel +from airflow.utils import timezone dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") @@ -314,3 +318,46 @@ def delete_dag( status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running" ) return Response(status_code=status.HTTP_204_NO_CONTENT) + + +@dags_router.get( + "/{dag_id}/assets/queuedEvent", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_401_UNAUTHORIZED, + status.HTTP_403_FORBIDDEN, + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_dag_asset_queued_events( + dag_id: str, + session: Annotated[Session, Depends(get_session)], + # move it to DateTimeQuery + before: str = Query(None), +) -> QueuedEventCollectionResponse: + """Get queued asset events for a DAG.""" + where_clause = [AssetDagRunQueue.target_dag_id == dag_id] + if before: + before_parsed = timezone.parse(before) + where_clause.append(AssetDagRunQueue.created_at < before_parsed) + query = ( + select(AssetDagRunQueue, AssetModel.uri) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + result = session.execute(query).all() + total_entries = len(result) + if not result: + raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Queue event with dag_id: `{dag_id}` was not found") + queued_events = [ + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + for adrq, uri in result + ] + return QueuedEventCollectionResponse( + queued_events=[ + QueuedEventResponse.model_validate(queued_event, from_attributes=True) + for queued_event in queued_events + ], + total_entries=total_entries, + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 5e9d12a78d9b..16c389b1c354 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -262,6 +262,28 @@ export const UseDagServiceGetDagDetailsKeyFn = ( }, queryKey?: Array, ) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])]; +export type DagServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< + ReturnType +>; +export type DagServiceGetDagAssetQueuedEventsQueryResult< + TData = DagServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagServiceGetDagAssetQueuedEventsKey = + "DagServiceGetDagAssetQueuedEvents"; +export const UseDagServiceGetDagAssetQueuedEventsKeyFn = ( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: Array, +) => [ + useDagServiceGetDagAssetQueuedEventsKey, + ...(queryKey ?? [{ before, dagId }]), +]; export type ConnectionServiceGetConnectionDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index ad690171afc0..2e30ba203b68 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -339,6 +339,32 @@ export const prefetchUseDagServiceGetDagDetails = ( queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }), queryFn: () => DagService.getDagDetails({ dagId }), }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagServiceGetDagAssetQueuedEvents = ( + queryClient: QueryClient, + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn({ + before, + dagId, + }), + queryFn: () => DagService.getDagAssetQueuedEvents({ before, dagId }), + }); /** * Get Connection * Get a connection entry. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e20730af0d6f..08c956826796 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -419,6 +419,39 @@ export const useDagServiceGetDagDetails = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagAssetQueuedEvents = < + TData = Common.DagServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + DagService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Get Connection * Get a connection entry. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index ca4fca9c15c2..f1fa966f241a 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -404,6 +404,39 @@ export const useDagServiceGetDagDetailsSuspense = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagAssetQueuedEventsSuspense = < + TData = Common.DagServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + DagService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Get Connection * Get a connection entry. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index dc630be5dd1e..0f40c09b93ac 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2426,6 +2426,48 @@ export const $ProviderResponse = { description: "Provider serializer for responses.", } as const; +export const $QueuedEventCollectionResponse = { + properties: { + queued_events: { + items: { + $ref: "#/components/schemas/QueuedEventResponse", + }, + type: "array", + title: "Queued Events", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["queued_events", "total_entries"], + title: "QueuedEventCollectionResponse", + description: "QueuedEventCollection serializer for responses.", +} as const; + +export const $QueuedEventResponse = { + properties: { + uri: { + type: "string", + title: "Uri", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + }, + type: "object", + required: ["uri", "dag_id", "created_at"], + title: "QueuedEventResponse", + description: "QueuedEvent serializer for responses..", +} as const; + export const $ReprocessBehavior = { type: "string", enum: ["failed", "completed", "none"], diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 39c3e8d7bc1c..132dfc1571f8 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -35,6 +35,8 @@ import type { DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, + GetDagAssetQueuedEventsData, + GetDagAssetQueuedEventsResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, @@ -593,6 +595,36 @@ export class DagService { }, }); } + + /** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getDagAssetQueuedEvents( + data: GetDagAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/assets/queuedEvent", + path: { + dag_id: data.dagId, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class ConnectionService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index d38dda245bcb..c9c6c9a3f8c9 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -578,6 +578,23 @@ export type ProviderResponse = { version: string; }; +/** + * QueuedEventCollection serializer for responses. + */ +export type QueuedEventCollectionResponse = { + queued_events: Array; + total_entries: number; +}; + +/** + * QueuedEvent serializer for responses.. + */ +export type QueuedEventResponse = { + uri: string; + dag_id: string; + created_at: string; +}; + /** * Internal enum for setting reprocess behavior in a backfill. * @@ -910,6 +927,13 @@ export type GetDagDetailsData = { export type GetDagDetailsResponse = DAGDetailsResponse; +export type GetDagAssetQueuedEventsData = { + before?: string; + dagId: string; +}; + +export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; + export type DeleteConnectionData = { connectionId: string; }; @@ -1624,6 +1648,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/assets/queuedEvent": { + get: { + req: GetDagAssetQueuedEventsData; + res: { + /** + * Successful Response + */ + 200: QueuedEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/connections/{connection_id}": { delete: { req: DeleteConnectionData; From 79430764b004ef1c5cbb90a2cc02e9aec54a60ec Mon Sep 17 00:00:00 2001 From: Amogh Date: Wed, 13 Nov 2024 13:51:55 +0530 Subject: [PATCH 02/13] adding test cases --- airflow/api_fastapi/common/parameters.py | 6 +- .../core_api/routes/public/dags.py | 3 +- .../core_api/routes/public/test_dags.py | 70 +++++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index c1d7624b37ab..5f7b919e86d4 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -29,7 +29,11 @@ from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states from airflow.models import Base, Connection -from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference +from airflow.models.asset import ( + AssetModel, + DagScheduleAssetReference, + TaskOutletAssetReference, +) from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning, DagWarningType diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index b490a03fe17a..33ced9d83410 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -333,7 +333,6 @@ def delete_dag( def get_dag_asset_queued_events( dag_id: str, session: Annotated[Session, Depends(get_session)], - # move it to DateTimeQuery before: str = Query(None), ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" @@ -349,7 +348,7 @@ def get_dag_asset_queued_events( result = session.execute(query).all() total_entries = len(result) if not result: - raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Queue event with dag_id: `{dag_id}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") queued_events = [ QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) for adrq, uri in result diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index f913fd36e4bb..2f91dc1b0064 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -17,13 +17,17 @@ from __future__ import annotations from datetime import datetime, timezone +from typing import Generator import pendulum import pytest +import time_machine +from airflow.models.asset import AssetDagRunQueue, AssetModel from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun from airflow.operators.empty import EmptyOperator +from airflow.utils import timezone as tz from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -557,3 +561,69 @@ def test_delete_dag( details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details") assert details_response.status_code == status_code_details + + +class TestQueuedEventEndpoint: + default_time = "2020-06-11T18:00:00+00:00" + + @pytest.fixture + def time_freezer(self) -> Generator: + freezer = time_machine.travel(self.default_time, tick=False) + freezer.start() + + yield + + freezer.stop() + + def _create_asset_dag_run_queues(self, dag_id, asset_id, session): + adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id) + session.add(adrq) + session.commit() + return adrq + + def _create_asset(self, session): + asset_model = AssetModel( + id=1, + uri="s3://bucket/key", + extra={"foo": "bar"}, + created_at=tz.parse(self.default_time), + updated_at=tz.parse(self.default_time), + ) + session.add(asset_model) + session.commit() + return asset_model + + +class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + asset_id = self._create_asset(session).id + self._create_asset_dag_run_queues(dag_id, asset_id, session) + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 200 + assert response.json() == { + "queued_events": [ + { + "created_at": self.default_time.replace("+00:00", "Z"), + "uri": "s3://bucket/key", + "dag_id": "dag", + } + ], + "total_entries": 1, + } + + def test_should_respond_404(self, test_client): + dag_id = "not_exists" + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found" From 123bb65d41fe9710c55e6d3d7960fc380e89fc9a Mon Sep 17 00:00:00 2001 From: Amogh Date: Thu, 14 Nov 2024 16:36:19 +0530 Subject: [PATCH 03/13] adding setup and teardown --- tests/api_fastapi/core_api/routes/public/test_dags.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index 66cbac3ef149..333dccf522d8 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -32,7 +32,7 @@ from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType -from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags +from tests_common.test_utils.db import clear_db_assets, clear_db_dags, clear_db_runs, clear_db_serialized_dags pytestmark = pytest.mark.db_test @@ -566,6 +566,13 @@ def test_delete_dag( class TestQueuedEventEndpoint: default_time = "2020-06-11T18:00:00+00:00" + @pytest.fixture(autouse=True) + def setup(self) -> None: + clear_db_assets() + + def teardown_method(self) -> None: + clear_db_assets() + @pytest.fixture def time_freezer(self) -> Generator: freezer = time_machine.travel(self.default_time, tick=False) From 01898bfcae936312e4e51974067fd67335111e33 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 11:21:47 +0530 Subject: [PATCH 04/13] review comments part 1 --- airflow/api_fastapi/core_api/datamodels/dags.py | 4 ++-- airflow/api_fastapi/core_api/openapi/v1-generated.yaml | 4 ++-- airflow/api_fastapi/core_api/routes/public/dags.py | 2 -- airflow/ui/openapi-gen/requests/schemas.gen.ts | 4 ++-- airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++-- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow/api_fastapi/core_api/datamodels/dags.py index 415367174855..fbea9bc482a3 100644 --- a/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow/api_fastapi/core_api/datamodels/dags.py @@ -162,7 +162,7 @@ class DAGTagCollectionResponse(BaseModel): class QueuedEventResponse(BaseModel): - """QueuedEvent serializer for responses..""" + """Queued Event serializer for responses..""" uri: str dag_id: str @@ -170,7 +170,7 @@ class QueuedEventResponse(BaseModel): class QueuedEventCollectionResponse(BaseModel): - """QueuedEventCollection serializer for responses.""" + """Queued Event Collection serializer for responses.""" queued_events: list[QueuedEventResponse] total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index a44eff09d2c2..6921c5efed77 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -5539,7 +5539,7 @@ components: - queued_events - total_entries title: QueuedEventCollectionResponse - description: QueuedEventCollection serializer for responses. + description: Queued Event Collection serializer for responses. QueuedEventResponse: properties: uri: @@ -5558,7 +5558,7 @@ components: - dag_id - created_at title: QueuedEventResponse - description: QueuedEvent serializer for responses.. + description: Queued Event serializer for responses.. ReprocessBehavior: type: string enum: diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 0b1ddcd188bb..1ebbb31c9565 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -313,8 +313,6 @@ def delete_dag( "/{dag_id}/assets/queuedEvent", responses=create_openapi_http_exception_doc( [ - status.HTTP_401_UNAUTHORIZED, - status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND, ] ), diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index dc86d578e5db..6db4ec1e1b7d 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2861,7 +2861,7 @@ export const $QueuedEventCollectionResponse = { type: "object", required: ["queued_events", "total_entries"], title: "QueuedEventCollectionResponse", - description: "QueuedEventCollection serializer for responses.", + description: "Queued Event Collection serializer for responses.", } as const; export const $QueuedEventResponse = { @@ -2883,7 +2883,7 @@ export const $QueuedEventResponse = { type: "object", required: ["uri", "dag_id", "created_at"], title: "QueuedEventResponse", - description: "QueuedEvent serializer for responses..", + description: "Queued Event serializer for responses..", } as const; export const $ReprocessBehavior = { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 0f207fcd0fdb..21b7ad000b9d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -696,7 +696,7 @@ export type ProviderResponse = { }; /** - * QueuedEventCollection serializer for responses. + * Queued Event Collection serializer for responses. */ export type QueuedEventCollectionResponse = { queued_events: Array; @@ -704,7 +704,7 @@ export type QueuedEventCollectionResponse = { }; /** - * QueuedEvent serializer for responses.. + * Queued Event serializer for responses.. */ export type QueuedEventResponse = { uri: string; From 2a684f6a2397f2dc6c80bfa826d13cb4b7ac72f6 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 11:39:38 +0530 Subject: [PATCH 05/13] introducing _generate_queued_event_where_clause --- airflow/api_fastapi/common/utils.py | 36 +++++++++++++++++++ .../core_api/routes/public/dags.py | 28 ++++++++++++--- 2 files changed, 59 insertions(+), 5 deletions(-) create mode 100644 airflow/api_fastapi/common/utils.py diff --git a/airflow/api_fastapi/common/utils.py b/airflow/api_fastapi/common/utils.py new file mode 100644 index 000000000000..615ea7130e72 --- /dev/null +++ b/airflow/api_fastapi/common/utils.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from fastapi import HTTPException, status +from pendulum.parsing import ParserError + +from airflow.utils import timezone + + +def format_datetime(value: str) -> datetime: + """ + Format datetime objects. + + If it can't be parsed, it returns an HTTP 400 exception. + """ + try: + return timezone.parse(value) + except (ParserError, TypeError) as err: + raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Incorrect datetime argument: {err}") diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 1ebbb31c9565..06a2c3902a39 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -44,6 +44,7 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.dags import ( DAGCollectionResponse, DAGDetailsResponse, @@ -57,11 +58,31 @@ from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel, DagTag from airflow.models.asset import AssetDagRunQueue, AssetModel -from airflow.utils import timezone dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") +def _generate_queued_event_where_clause( + *, + dag_id: str | None = None, + uri: str | None = None, + before: str | None = None, +) -> list: + """Get AssetDagRunQueue where clause.""" + where_clause = [] + if dag_id is not None: + where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) + if uri is not None: + where_clause.append( + AssetDagRunQueue.asset_id.in_( + select(AssetModel.id).where(AssetModel.uri == uri), + ), + ) + if before is not None: + where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) + return where_clause + + @dags_router.get("/") def get_dags( limit: QueryLimit, @@ -323,10 +344,7 @@ def get_dag_asset_queued_events( before: str = Query(None), ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" - where_clause = [AssetDagRunQueue.target_dag_id == dag_id] - if before: - before_parsed = timezone.parse(before) - where_clause.append(AssetDagRunQueue.created_at < before_parsed) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) query = ( select(AssetDagRunQueue, AssetModel.uri) .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) From 1c48a6f2cdc55ade30f30bf36e2f5187748a1d9b Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 11:42:38 +0530 Subject: [PATCH 06/13] changing to bad request --- airflow/api_fastapi/common/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/common/utils.py b/airflow/api_fastapi/common/utils.py index 615ea7130e72..5eb2601fb151 100644 --- a/airflow/api_fastapi/common/utils.py +++ b/airflow/api_fastapi/common/utils.py @@ -33,4 +33,4 @@ def format_datetime(value: str) -> datetime: try: return timezone.parse(value) except (ParserError, TypeError) as err: - raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Incorrect datetime argument: {err}") + raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=f"Incorrect datetime argument: {err}") From 9a7574c659034ed2511f66d2406bfe2378599885 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 12:02:01 +0530 Subject: [PATCH 07/13] adding paginated_select --- .../api_fastapi/core_api/routes/public/dags.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 06a2c3902a39..7796bc08b77e 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -350,14 +350,21 @@ def get_dag_asset_queued_events( .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) .where(*where_clause) ) - result = session.execute(query).all() - total_entries = len(result) - if not result: + + dag_asset_queued_events_select, total_entries = paginated_select( + query, + [], + ) + adrqs = session.execute(dag_asset_queued_events_select).all() + + if not adrqs: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") + queued_events = [ QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in result + for adrq, uri in adrqs ] + return QueuedEventCollectionResponse( queued_events=[ QueuedEventResponse.model_validate(queued_event, from_attributes=True) From c1a7eed0fdf8ef396eb0a77db5eb90033f2ec654 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 12:20:03 +0530 Subject: [PATCH 08/13] moving _generate_queued_event_where_clause to assets.py --- .../core_api/routes/public/assets.py | 24 ++++++++++++++++++- .../core_api/routes/public/dags.py | 23 +----------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 67218c471615..12b4bbe9e7a3 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -37,6 +37,7 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.assets import ( AssetCollectionResponse, AssetEventCollectionResponse, @@ -44,11 +45,32 @@ AssetResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.models.asset import AssetEvent, AssetModel +from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") +def _generate_queued_event_where_clause( + *, + dag_id: str | None = None, + uri: str | None = None, + before: str | None = None, +) -> list: + """Get AssetDagRunQueue where clause.""" + where_clause = [] + if dag_id is not None: + where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) + if uri is not None: + where_clause.append( + AssetDagRunQueue.asset_id.in_( + select(AssetModel.id).where(AssetModel.uri == uri), + ), + ) + if before is not None: + where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) + return where_clause + + @assets_router.get( "/", responses=create_openapi_http_exception_doc([401, 403, 404]), diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 7796bc08b77e..d9fb6c28ad26 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -44,7 +44,6 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.dags import ( DAGCollectionResponse, DAGDetailsResponse, @@ -55,6 +54,7 @@ QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.routes.public.assets import _generate_queued_event_where_clause from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel, DagTag from airflow.models.asset import AssetDagRunQueue, AssetModel @@ -62,27 +62,6 @@ dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") -def _generate_queued_event_where_clause( - *, - dag_id: str | None = None, - uri: str | None = None, - before: str | None = None, -) -> list: - """Get AssetDagRunQueue where clause.""" - where_clause = [] - if dag_id is not None: - where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) - if before is not None: - where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) - return where_clause - - @dags_router.get("/") def get_dags( limit: QueryLimit, From 8301a1b8d2b149bc841b69eb6a6332ff9b089acf Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 12:42:24 +0530 Subject: [PATCH 09/13] moving datamodels to assets --- airflow/api_fastapi/core_api/datamodels/assets.py | 15 +++++++++++++++ airflow/api_fastapi/core_api/datamodels/dags.py | 15 --------------- .../api_fastapi/core_api/routes/public/dags.py | 3 +-- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 85e41ff7b569..9ac4528964e6 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -99,3 +99,18 @@ class AssetEventCollectionResponse(BaseModel): asset_events: list[AssetEventResponse] total_entries: int + + +class QueuedEventResponse(BaseModel): + """Queued Event serializer for responses..""" + + uri: str + dag_id: str + created_at: datetime + + +class QueuedEventCollectionResponse(BaseModel): + """Queued Event Collection serializer for responses.""" + + queued_events: list[QueuedEventResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow/api_fastapi/core_api/datamodels/dags.py index fbea9bc482a3..27cc3ad47356 100644 --- a/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow/api_fastapi/core_api/datamodels/dags.py @@ -159,18 +159,3 @@ class DAGTagCollectionResponse(BaseModel): tags: list[str] total_entries: int - - -class QueuedEventResponse(BaseModel): - """Queued Event serializer for responses..""" - - uri: str - dag_id: str - created_at: datetime - - -class QueuedEventCollectionResponse(BaseModel): - """Queued Event Collection serializer for responses.""" - - queued_events: list[QueuedEventResponse] - total_entries: int diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index d9fb6c28ad26..2ee1b8f6c67c 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -44,14 +44,13 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.assets import QueuedEventCollectionResponse, QueuedEventResponse from airflow.api_fastapi.core_api.datamodels.dags import ( DAGCollectionResponse, DAGDetailsResponse, DAGPatchBody, DAGResponse, DAGTagCollectionResponse, - QueuedEventCollectionResponse, - QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.routes.public.assets import _generate_queued_event_where_clause From c593a3f86c400738a743b3739608ed7e95a462dc Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 12:59:45 +0530 Subject: [PATCH 10/13] moving tests to assets --- .../core_api/routes/public/test_assets.py | 83 ++++++++++++++++++- .../core_api/routes/public/test_dags.py | 79 +----------------- 2 files changed, 83 insertions(+), 79 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 95ad658ba4d9..f37b038e938d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -17,11 +17,19 @@ from __future__ import annotations import urllib +from typing import Generator import pytest +import time_machine from airflow.models import DagModel -from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference +from airflow.models.asset import ( + AssetDagRunQueue, + AssetEvent, + AssetModel, + DagScheduleAssetReference, + TaskOutletAssetReference, +) from airflow.models.dagrun import DagRun from airflow.utils import timezone from airflow.utils.session import provide_session @@ -459,3 +467,76 @@ def test_should_respond_404(self, test_client): ) assert response.status_code == 404 assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + + +class TestQueuedEventEndpoint: + default_time = "2020-06-11T18:00:00+00:00" + + @pytest.fixture(autouse=True) + def setup(self) -> None: + clear_db_assets() + + def teardown_method(self) -> None: + clear_db_assets() + + @pytest.fixture + def time_freezer(self) -> Generator: + freezer = time_machine.travel(self.default_time, tick=False) + freezer.start() + + yield + + freezer.stop() + + def _create_asset_dag_run_queues(self, dag_id, asset_id, session): + adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id) + session.add(adrq) + session.commit() + return adrq + + def _create_asset(self, session): + asset_model = AssetModel( + id=1, + uri="s3://bucket/key", + extra={"foo": "bar"}, + created_at=timezone.parse(self.default_time), + updated_at=timezone.parse(self.default_time), + ) + session.add(asset_model) + session.commit() + return asset_model + + +class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + asset_id = self._create_asset(session).id + self._create_asset_dag_run_queues(dag_id, asset_id, session) + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 200 + assert response.json() == { + "queued_events": [ + { + "created_at": self.default_time.replace("+00:00", "Z"), + "uri": "s3://bucket/key", + "dag_id": "dag", + } + ], + "total_entries": 1, + } + + def test_should_respond_404(self, test_client): + dag_id = "not_exists" + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found" diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index 333dccf522d8..e200763c2a72 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -17,22 +17,18 @@ from __future__ import annotations from datetime import datetime, timezone -from typing import Generator import pendulum import pytest -import time_machine -from airflow.models.asset import AssetDagRunQueue, AssetModel from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun from airflow.operators.empty import EmptyOperator -from airflow.utils import timezone as tz from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType -from tests_common.test_utils.db import clear_db_assets, clear_db_dags, clear_db_runs, clear_db_serialized_dags +from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags pytestmark = pytest.mark.db_test @@ -561,76 +557,3 @@ def test_delete_dag( details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details") assert details_response.status_code == status_code_details - - -class TestQueuedEventEndpoint: - default_time = "2020-06-11T18:00:00+00:00" - - @pytest.fixture(autouse=True) - def setup(self) -> None: - clear_db_assets() - - def teardown_method(self) -> None: - clear_db_assets() - - @pytest.fixture - def time_freezer(self) -> Generator: - freezer = time_machine.travel(self.default_time, tick=False) - freezer.start() - - yield - - freezer.stop() - - def _create_asset_dag_run_queues(self, dag_id, asset_id, session): - adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id) - session.add(adrq) - session.commit() - return adrq - - def _create_asset(self, session): - asset_model = AssetModel( - id=1, - uri="s3://bucket/key", - extra={"foo": "bar"}, - created_at=tz.parse(self.default_time), - updated_at=tz.parse(self.default_time), - ) - session.add(asset_model) - session.commit() - return asset_model - - -class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): - @pytest.mark.usefixtures("time_freezer") - def test_should_respond_200(self, test_client, session, create_dummy_dag): - dag, _ = create_dummy_dag() - dag_id = dag.dag_id - asset_id = self._create_asset(session).id - self._create_asset_dag_run_queues(dag_id, asset_id, session) - - response = test_client.get( - f"/public/dags/{dag_id}/assets/queuedEvent", - ) - - assert response.status_code == 200 - assert response.json() == { - "queued_events": [ - { - "created_at": self.default_time.replace("+00:00", "Z"), - "uri": "s3://bucket/key", - "dag_id": "dag", - } - ], - "total_entries": 1, - } - - def test_should_respond_404(self, test_client): - dag_id = "not_exists" - - response = test_client.get( - f"/public/dags/{dag_id}/assets/queuedEvent", - ) - - assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found" From 4ee4dffd158bdcce48a64115a14dded02abe3c51 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 13:12:45 +0530 Subject: [PATCH 11/13] reuse fixtures from TestGetAssets and inherit classes to simplify --- .../core_api/routes/public/test_assets.py | 28 +++---------------- 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index f37b038e938d..36efc75340c3 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -469,16 +469,7 @@ def test_should_respond_404(self, test_client): assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" -class TestQueuedEventEndpoint: - default_time = "2020-06-11T18:00:00+00:00" - - @pytest.fixture(autouse=True) - def setup(self) -> None: - clear_db_assets() - - def teardown_method(self) -> None: - clear_db_assets() - +class TestQueuedEventEndpoint(TestAssets): @pytest.fixture def time_freezer(self) -> Generator: freezer = time_machine.travel(self.default_time, tick=False) @@ -494,25 +485,14 @@ def _create_asset_dag_run_queues(self, dag_id, asset_id, session): session.commit() return adrq - def _create_asset(self, session): - asset_model = AssetModel( - id=1, - uri="s3://bucket/key", - extra={"foo": "bar"}, - created_at=timezone.parse(self.default_time), - updated_at=timezone.parse(self.default_time), - ) - session.add(asset_model) - session.commit() - return asset_model - class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): @pytest.mark.usefixtures("time_freezer") def test_should_respond_200(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - asset_id = self._create_asset(session).id + self.create_assets(session=session, num=1) + asset_id = 1 self._create_asset_dag_run_queues(dag_id, asset_id, session) response = test_client.get( @@ -524,7 +504,7 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): "queued_events": [ { "created_at": self.default_time.replace("+00:00", "Z"), - "uri": "s3://bucket/key", + "uri": "s3://bucket/key/1", "dag_id": "dag", } ], From 79caf8617c6d31456524b0e8df3918c3704fcccd Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Fri, 15 Nov 2024 18:25:49 +0100 Subject: [PATCH 12/13] Move route to assets module --- .../core_api/openapi/v1-generated.yaml | 106 +++++++++--------- .../core_api/routes/public/assets.py | 54 ++++++++- .../core_api/routes/public/dags.py | 47 -------- airflow/ui/openapi-gen/queries/common.ts | 44 ++++---- airflow/ui/openapi-gen/queries/prefetch.ts | 52 ++++----- airflow/ui/openapi-gen/queries/queries.ts | 66 +++++------ airflow/ui/openapi-gen/queries/suspense.ts | 66 +++++------ .../ui/openapi-gen/requests/services.gen.ts | 66 +++++------ airflow/ui/openapi-gen/requests/types.gen.ts | 70 ++++++------ 9 files changed, 286 insertions(+), 285 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 6921c5efed77..b3caee5e3ee3 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -170,7 +170,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/: + /public/assets: get: tags: - Asset @@ -391,6 +391,59 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/assets/queuedEvent: + get: + tags: + - Asset + summary: Get Dag Asset Queued Events + description: Get queued asset events for a DAG. + operationId: get_dag_asset_queued_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: before + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Before + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/backfills/: get: tags: @@ -1864,57 +1917,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvent: - get: - tags: - - DAG - summary: Get Dag Asset Queued Events - description: Get queued asset events for a DAG. - operationId: get_dag_asset_queued_events - parameters: - - name: dag_id - in: path - required: true - schema: - type: string - title: Dag Id - - name: before - in: query - required: false - schema: - type: string - title: Before - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/QueuedEventCollectionResponse' - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /public/eventLogs/{event_log_id}: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 12b4bbe9e7a3..03adc7993d29 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -43,11 +43,13 @@ AssetEventCollectionResponse, AssetEventResponse, AssetResponse, + QueuedEventCollectionResponse, + QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel -assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") +assets_router = AirflowRouter(tags=["Asset"]) def _generate_queued_event_where_clause( @@ -72,7 +74,7 @@ def _generate_queued_event_where_clause( @assets_router.get( - "/", + "/assets", responses=create_openapi_http_exception_doc([401, 403, 404]), ) def get_assets( @@ -107,7 +109,7 @@ def get_assets( @assets_router.get( - "/events", + "/assets/events", responses=create_openapi_http_exception_doc([404]), ) def get_asset_events( @@ -157,7 +159,7 @@ def get_asset_events( @assets_router.get( - "/{uri:path}", + "/assets/{uri:path}", responses=create_openapi_http_exception_doc([401, 403, 404]), ) def get_asset( @@ -175,3 +177,47 @@ def get_asset( raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") return AssetResponse.model_validate(asset, from_attributes=True) + + +@assets_router.get( + "/dags/{dag_id}/assets/queuedEvent", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_dag_asset_queued_events( + dag_id: str, + session: Annotated[Session, Depends(get_session)], + before: str | None = None, +) -> QueuedEventCollectionResponse: + """Get queued asset events for a DAG.""" + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) + query = ( + select(AssetDagRunQueue, AssetModel.uri) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + + dag_asset_queued_events_select, total_entries = paginated_select( + query, + [], + ) + adrqs = session.execute(dag_asset_queued_events_select).all() + + if not adrqs: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") + + queued_events = [ + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + for adrq, uri in adrqs + ] + + return QueuedEventCollectionResponse( + queued_events=[ + QueuedEventResponse.model_validate(queued_event, from_attributes=True) + for queued_event in queued_events + ], + total_entries=total_entries, + ) diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 2ee1b8f6c67c..8ab1ae3798d6 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -44,7 +44,6 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.core_api.datamodels.assets import QueuedEventCollectionResponse, QueuedEventResponse from airflow.api_fastapi.core_api.datamodels.dags import ( DAGCollectionResponse, DAGDetailsResponse, @@ -53,10 +52,8 @@ DAGTagCollectionResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.routes.public.assets import _generate_queued_event_where_clause from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel, DagTag -from airflow.models.asset import AssetDagRunQueue, AssetModel dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") @@ -306,47 +303,3 @@ def delete_dag( status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running" ) return Response(status_code=status.HTTP_204_NO_CONTENT) - - -@dags_router.get( - "/{dag_id}/assets/queuedEvent", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), -) -def get_dag_asset_queued_events( - dag_id: str, - session: Annotated[Session, Depends(get_session)], - before: str = Query(None), -) -> QueuedEventCollectionResponse: - """Get queued asset events for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) - - dag_asset_queued_events_select, total_entries = paginated_select( - query, - [], - ) - adrqs = session.execute(dag_asset_queued_events_select).all() - - if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") - - queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs - ] - - return QueuedEventCollectionResponse( - queued_events=[ - QueuedEventResponse.model_validate(queued_event, from_attributes=True) - for queued_event in queued_events - ], - total_entries=total_entries, - ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index e968c4ede491..761d07f352e3 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -129,6 +129,28 @@ export const UseAssetServiceGetAssetKeyFn = ( }, queryKey?: Array, ) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; +export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetDagAssetQueuedEventsQueryResult< + TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetDagAssetQueuedEventsKey = + "AssetServiceGetDagAssetQueuedEvents"; +export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = ( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: Array, +) => [ + useAssetServiceGetDagAssetQueuedEventsKey, + ...(queryKey ?? [{ before, dagId }]), +]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; @@ -478,28 +500,6 @@ export const UseDagServiceGetDagDetailsKeyFn = ( }, queryKey?: Array, ) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])]; -export type DagServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< - ReturnType ->; -export type DagServiceGetDagAssetQueuedEventsQueryResult< - TData = DagServiceGetDagAssetQueuedEventsDefaultResponse, - TError = unknown, -> = UseQueryResult; -export const useDagServiceGetDagAssetQueuedEventsKey = - "DagServiceGetDagAssetQueuedEvents"; -export const UseDagServiceGetDagAssetQueuedEventsKeyFn = ( - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, - queryKey?: Array, -) => [ - useDagServiceGetDagAssetQueuedEventsKey, - ...(queryKey ?? [{ before, dagId }]), -]; export type EventLogServiceGetEventLogDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 0ac1b812f83b..51998a375065 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -165,6 +165,32 @@ export const prefetchUseAssetServiceGetAsset = ( queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), queryFn: () => AssetService.getAsset({ uri }), }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( + queryClient: QueryClient, + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({ + before, + dagId, + }), + queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }), + }); /** * Historical Metrics * Return cluster activity historical metrics. @@ -622,32 +648,6 @@ export const prefetchUseDagServiceGetDagDetails = ( queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }), queryFn: () => DagService.getDagDetails({ dagId }), }); -/** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const prefetchUseDagServiceGetDagAssetQueuedEvents = ( - queryClient: QueryClient, - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, -) => - queryClient.prefetchQuery({ - queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn({ - before, - dagId, - }), - queryFn: () => DagService.getDagAssetQueuedEvents({ before, dagId }), - }); /** * Get Event Log * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index f39290037c2f..93df8f104ac2 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -210,6 +210,39 @@ export const useAssetServiceGetAsset = < queryFn: () => AssetService.getAsset({ uri }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetDagAssetQueuedEvents = < + TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. @@ -766,39 +799,6 @@ export const useDagServiceGetDagDetails = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); -/** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const useDagServiceGetDagAssetQueuedEvents = < - TData = Common.DagServiceGetDagAssetQueuedEventsDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useQuery({ - queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn( - { before, dagId }, - queryKey, - ), - queryFn: () => - DagService.getDagAssetQueuedEvents({ before, dagId }) as TData, - ...options, - }); /** * Get Event Log * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index aca120c28fe0..7ec5ecc8319c 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -195,6 +195,39 @@ export const useAssetServiceGetAssetSuspense = < queryFn: () => AssetService.getAsset({ uri }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetDagAssetQueuedEventsSuspense = < + TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. @@ -751,39 +784,6 @@ export const useDagServiceGetDagDetailsSuspense = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); -/** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const useDagServiceGetDagAssetQueuedEventsSuspense = < - TData = Common.DagServiceGetDagAssetQueuedEventsDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useSuspenseQuery({ - queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn( - { before, dagId }, - queryKey, - ), - queryFn: () => - DagService.getDagAssetQueuedEvents({ before, dagId }) as TData, - ...options, - }); /** * Get Event Log * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2da448df6fcf..4df1042ea9bd 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -11,6 +11,8 @@ import type { GetAssetEventsResponse, GetAssetData, GetAssetResponse, + GetDagAssetQueuedEventsData, + GetDagAssetQueuedEventsResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -65,8 +67,6 @@ import type { DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, - GetDagAssetQueuedEventsData, - GetDagAssetQueuedEventsResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, @@ -159,7 +159,7 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/", + url: "/public/assets", query: { limit: data.limit, offset: data.offset, @@ -241,6 +241,36 @@ export class AssetService { }, }); } + + /** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getDagAssetQueuedEvents( + data: GetDagAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/assets/queuedEvent", + path: { + dag_id: data.dagId, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class DashboardService { @@ -1070,36 +1100,6 @@ export class DagService { }, }); } - - /** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ - public static getDagAssetQueuedEvents( - data: GetDagAssetQueuedEventsData, - ): CancelablePromise { - return __request(OpenAPI, { - method: "GET", - url: "/public/dags/{dag_id}/assets/queuedEvent", - path: { - dag_id: data.dagId, - }, - query: { - before: data.before, - }, - errors: { - 401: "Unauthorized", - 403: "Forbidden", - 404: "Not Found", - 422: "Validation Error", - }, - }); - } } export class EventLogService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 21b7ad000b9d..99b6e98d7cec 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1007,6 +1007,13 @@ export type GetAssetData = { export type GetAssetResponse = AssetResponse; +export type GetDagAssetQueuedEventsData = { + before?: string | null; + dagId: string; +}; + +export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; + export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1219,13 +1226,6 @@ export type GetDagDetailsData = { export type GetDagDetailsResponse = DAGDetailsResponse; -export type GetDagAssetQueuedEventsData = { - before?: string; - dagId: string; -}; - -export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; - export type GetEventLogData = { eventLogId: number; }; @@ -1474,7 +1474,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/": { + "/public/assets": { get: { req: GetAssetsData; res: { @@ -1555,6 +1555,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/assets/queuedEvent": { + get: { + req: GetDagAssetQueuedEventsData; + res: { + /** + * Successful Response + */ + 200: QueuedEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/ui/dashboard/historical_metrics_data": { get: { req: HistoricalMetricsData; @@ -2286,33 +2313,6 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvent": { - get: { - req: GetDagAssetQueuedEventsData; - res: { - /** - * Successful Response - */ - 200: QueuedEventCollectionResponse; - /** - * Unauthorized - */ - 401: HTTPExceptionResponse; - /** - * Forbidden - */ - 403: HTTPExceptionResponse; - /** - * Not Found - */ - 404: HTTPExceptionResponse; - /** - * Validation Error - */ - 422: HTTPValidationError; - }; - }; - }; "/public/eventLogs/{event_log_id}": { get: { req: GetEventLogData; From ee0ee76ea716854a20339826f38a1cb74931a711 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Fri, 15 Nov 2024 18:52:21 +0100 Subject: [PATCH 13/13] Small adjustments --- airflow/api_fastapi/common/parameters.py | 26 ++++++++++++-- airflow/api_fastapi/common/utils.py | 36 ------------------- .../core_api/openapi/v1-generated.yaml | 3 ++ .../core_api/routes/public/assets.py | 9 ++--- 4 files changed, 32 insertions(+), 42 deletions(-) delete mode 100644 airflow/api_fastapi/common/utils.py diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index abf6378ac5b2..41ef99804332 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -19,7 +19,7 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar +from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload from fastapi import Depends, HTTPException, Query from pendulum.parsing.exceptions import ParserError @@ -373,6 +373,27 @@ def _safe_parse_datetime(date_to_check: str) -> datetime: """ if not date_to_check: raise ValueError(f"{date_to_check} cannot be None.") + return _safe_parse_datetime_optional(date_to_check) + + +@overload +def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ... + + +@overload +def _safe_parse_datetime_optional(date_to_check: None) -> None: ... + + +def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None: + """ + Parse datetime and raise error for invalid dates. + + Allow None values. + + :param date_to_check: the string value to be parsed + """ + if date_to_check is None: + return None try: return timezone.parse(date_to_check, strict=True) except (TypeError, ParserError): @@ -578,7 +599,8 @@ def depends_float( # Common Safe DateTime -DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)] +DateTimeQuery = Annotated[datetime, AfterValidator(_safe_parse_datetime)] +OptionalDateTimeQuery = Annotated[Union[datetime, None], AfterValidator(_safe_parse_datetime_optional)] # DAG QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)] diff --git a/airflow/api_fastapi/common/utils.py b/airflow/api_fastapi/common/utils.py deleted file mode 100644 index 5eb2601fb151..000000000000 --- a/airflow/api_fastapi/common/utils.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from datetime import datetime - -from fastapi import HTTPException, status -from pendulum.parsing import ParserError - -from airflow.utils import timezone - - -def format_datetime(value: str) -> datetime: - """ - Format datetime objects. - - If it can't be parsed, it returns an HTTP 400 exception. - """ - try: - return timezone.parse(value) - except (ParserError, TypeError) as err: - raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=f"Incorrect datetime argument: {err}") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index b3caee5e3ee3..8429616f70b9 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -47,12 +47,14 @@ paths: required: true schema: type: string + format: date-time title: Start Date - name: end_date in: query required: true schema: type: string + format: date-time title: End Date responses: '200': @@ -411,6 +413,7 @@ paths: schema: anyOf: - type: string + format: date-time - type: 'null' title: Before responses: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 03adc7993d29..d4597f9994df 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -17,6 +17,7 @@ from __future__ import annotations +from datetime import datetime from typing import Annotated from fastapi import Depends, HTTPException, status @@ -25,6 +26,7 @@ from airflow.api_fastapi.common.db.common import get_session, paginated_select from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, QueryAssetDagIdPatternSearch, QueryAssetIdFilter, QueryLimit, @@ -37,7 +39,6 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.assets import ( AssetCollectionResponse, AssetEventCollectionResponse, @@ -56,7 +57,7 @@ def _generate_queued_event_where_clause( *, dag_id: str | None = None, uri: str | None = None, - before: str | None = None, + before: datetime | None = None, ) -> list: """Get AssetDagRunQueue where clause.""" where_clause = [] @@ -69,7 +70,7 @@ def _generate_queued_event_where_clause( ), ) if before is not None: - where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) + where_clause.append(AssetDagRunQueue.created_at < before) return where_clause @@ -190,7 +191,7 @@ def get_asset( def get_dag_asset_queued_events( dag_id: str, session: Annotated[Session, Depends(get_session)], - before: str | None = None, + before: OptionalDateTimeQuery = None, ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)