diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 837225209590..b15eb1ce9e5a 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -222,6 +222,7 @@ def delete_dag_asset_queued_event( ) +@mark_fastapi_migration_done @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session @@ -250,6 +251,7 @@ def get_dag_asset_queued_events( ) +@mark_fastapi_migration_done @security.requires_access_asset("DELETE") @security.requires_access_dag("GET") @action_logging diff --git a/airflow/api_fastapi/common/utils.py b/airflow/api_fastapi/common/utils.py new file mode 100644 index 000000000000..5eb2601fb151 --- /dev/null +++ b/airflow/api_fastapi/common/utils.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from fastapi import HTTPException, status +from pendulum.parsing import ParserError + +from airflow.utils import timezone + + +def format_datetime(value: str) -> datetime: + """ + Format datetime objects. + + If it can't be parsed, it returns an HTTP 400 exception. + """ + try: + return timezone.parse(value) + except (ParserError, TypeError) as err: + raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=f"Incorrect datetime argument: {err}") diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 85e41ff7b569..9ac4528964e6 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -99,3 +99,18 @@ class AssetEventCollectionResponse(BaseModel): asset_events: list[AssetEventResponse] total_entries: int + + +class QueuedEventResponse(BaseModel): + """Queued Event serializer for responses..""" + + uri: str + dag_id: str + created_at: datetime + + +class QueuedEventCollectionResponse(BaseModel): + """Queued Event Collection serializer for responses.""" + + queued_events: list[QueuedEventResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index b99b389de51f..9fd965302816 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1864,6 +1864,108 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/assets/queuedEvent: + get: + tags: + - DAG + summary: Get Dag Asset Queued Events + description: Get queued asset events for a DAG. + operationId: get_dag_asset_queued_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: before + in: query + required: false + schema: + type: string + title: Before + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - DAG + summary: Delete Dag Asset Queued Events + operationId: delete_dag_asset_queued_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: before + in: query + required: false + schema: + type: string + title: Before + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/eventLogs/{event_log_id}: get: tags: @@ -5473,6 +5575,41 @@ components: - version title: ProviderResponse description: Provider serializer for responses. + QueuedEventCollectionResponse: + properties: + queued_events: + items: + $ref: '#/components/schemas/QueuedEventResponse' + type: array + title: Queued Events + total_entries: + type: integer + title: Total Entries + type: object + required: + - queued_events + - total_entries + title: QueuedEventCollectionResponse + description: Queued Event Collection serializer for responses. + QueuedEventResponse: + properties: + uri: + type: string + title: Uri + dag_id: + type: string + title: Dag Id + created_at: + type: string + format: date-time + title: Created At + type: object + required: + - uri + - dag_id + - created_at + title: QueuedEventResponse + description: Queued Event serializer for responses.. ReprocessBehavior: type: string enum: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 67218c471615..12b4bbe9e7a3 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -37,6 +37,7 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.assets import ( AssetCollectionResponse, AssetEventCollectionResponse, @@ -44,11 +45,32 @@ AssetResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.models.asset import AssetEvent, AssetModel +from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") +def _generate_queued_event_where_clause( + *, + dag_id: str | None = None, + uri: str | None = None, + before: str | None = None, +) -> list: + """Get AssetDagRunQueue where clause.""" + where_clause = [] + if dag_id is not None: + where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) + if uri is not None: + where_clause.append( + AssetDagRunQueue.asset_id.in_( + select(AssetModel.id).where(AssetModel.uri == uri), + ), + ) + if before is not None: + where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) + return where_clause + + @assets_router.get( "/", responses=create_openapi_http_exception_doc([401, 403, 404]), diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 8ab1ae3798d6..c140361d28db 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -20,7 +20,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, Query, Request, Response, status -from sqlalchemy import select, update +from sqlalchemy import delete, select, update from sqlalchemy.orm import Session from airflow.api.common import delete_dag as delete_dag_module @@ -44,6 +44,7 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.assets import QueuedEventCollectionResponse, QueuedEventResponse from airflow.api_fastapi.core_api.datamodels.dags import ( DAGCollectionResponse, DAGDetailsResponse, @@ -52,8 +53,10 @@ DAGTagCollectionResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.routes.public.assets import _generate_queued_event_where_clause from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel, DagTag +from airflow.models.asset import AssetDagRunQueue, AssetModel dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") @@ -303,3 +306,73 @@ def delete_dag( status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running" ) return Response(status_code=status.HTTP_204_NO_CONTENT) + + +@dags_router.get( + "/{dag_id}/assets/queuedEvent", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_dag_asset_queued_events( + dag_id: str, + session: Annotated[Session, Depends(get_session)], + before: str = Query(None), +) -> QueuedEventCollectionResponse: + """Get queued asset events for a DAG.""" + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) + query = ( + select(AssetDagRunQueue, AssetModel.uri) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + + dag_asset_queued_events_select, total_entries = paginated_select( + query, + [], + ) + adrqs = session.execute(dag_asset_queued_events_select).all() + + if not adrqs: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") + + queued_events = [ + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + for adrq, uri in adrqs + ] + + return QueuedEventCollectionResponse( + queued_events=[ + QueuedEventResponse.model_validate(queued_event, from_attributes=True) + for queued_event in queued_events + ], + total_entries=total_entries, + ) + + +@dags_router.delete( + "/{dag_id}/assets/queuedEvent", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_401_UNAUTHORIZED, + status.HTTP_403_FORBIDDEN, + status.HTTP_404_NOT_FOUND, + ] + ), +) +def delete_dag_asset_queued_events( + dag_id: str, + session: Annotated[Session, Depends(get_session)], + before: str = Query(None), +): + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) + delete_statement = delete(AssetDagRunQueue).where(*where_clause) + result = session.execute(delete_statement) + if result.rowcount == 0: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"Queue event with dag_id: `{dag_id}` was not found" + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index bc3cabf37929..380e73af5fc9 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -478,6 +478,28 @@ export const UseDagServiceGetDagDetailsKeyFn = ( }, queryKey?: Array, ) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])]; +export type DagServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< + ReturnType +>; +export type DagServiceGetDagAssetQueuedEventsQueryResult< + TData = DagServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagServiceGetDagAssetQueuedEventsKey = + "DagServiceGetDagAssetQueuedEvents"; +export const UseDagServiceGetDagAssetQueuedEventsKeyFn = ( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: Array, +) => [ + useDagServiceGetDagAssetQueuedEventsKey, + ...(queryKey ?? [{ before, dagId }]), +]; export type EventLogServiceGetEventLogDefaultResponse = Awaited< ReturnType >; @@ -1096,6 +1118,9 @@ export type DagRunServiceDeleteDagRunMutationResult = Awaited< export type DagServiceDeleteDagMutationResult = Awaited< ReturnType >; +export type DagServiceDeleteDagAssetQueuedEventsMutationResult = Awaited< + ReturnType +>; export type PoolServiceDeletePoolMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index f5f120e55554..0ac1b812f83b 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -622,6 +622,32 @@ export const prefetchUseDagServiceGetDagDetails = ( queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }), queryFn: () => DagService.getDagDetails({ dagId }), }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagServiceGetDagAssetQueuedEvents = ( + queryClient: QueryClient, + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn({ + before, + dagId, + }), + queryFn: () => DagService.getDagAssetQueuedEvents({ before, dagId }), + }); /** * Get Event Log * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index f16ebde095fb..6fdcc61b55f8 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -766,6 +766,39 @@ export const useDagServiceGetDagDetails = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagAssetQueuedEvents = < + TData = Common.DagServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + DagService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Get Event Log * @param data The data for the request. @@ -2448,6 +2481,48 @@ export const useDagServiceDeleteDag = < DagService.deleteDag({ dagId }) as unknown as Promise, ...options, }); +/** + * Delete Dag Asset Queued Events + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns void Successful Response + * @throws ApiError + */ +export const useDagServiceDeleteDagAssetQueuedEvents = < + TData = Common.DagServiceDeleteDagAssetQueuedEventsMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + before?: string; + dagId: string; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + before?: string; + dagId: string; + }, + TContext + >({ + mutationFn: ({ before, dagId }) => + DagService.deleteDagAssetQueuedEvents({ + before, + dagId, + }) as unknown as Promise, + ...options, + }); /** * Delete Pool * Delete a pool entry. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index e1d8d3f9d4ee..aca120c28fe0 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -751,6 +751,39 @@ export const useDagServiceGetDagDetailsSuspense = < queryFn: () => DagService.getDagDetails({ dagId }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagServiceGetDagAssetQueuedEventsSuspense = < + TData = Common.DagServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + DagService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Get Event Log * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 7bf8f4b02966..6db4ec1e1b7d 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2844,6 +2844,48 @@ export const $ProviderResponse = { description: "Provider serializer for responses.", } as const; +export const $QueuedEventCollectionResponse = { + properties: { + queued_events: { + items: { + $ref: "#/components/schemas/QueuedEventResponse", + }, + type: "array", + title: "Queued Events", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["queued_events", "total_entries"], + title: "QueuedEventCollectionResponse", + description: "Queued Event Collection serializer for responses.", +} as const; + +export const $QueuedEventResponse = { + properties: { + uri: { + type: "string", + title: "Uri", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + }, + type: "object", + required: ["uri", "dag_id", "created_at"], + title: "QueuedEventResponse", + description: "Queued Event serializer for responses..", +} as const; + export const $ReprocessBehavior = { type: "string", enum: ["failed", "completed", "none"], diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index ee4ed1e4c41c..93490b6e8613 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -65,6 +65,10 @@ import type { DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, + GetDagAssetQueuedEventsData, + GetDagAssetQueuedEventsResponse, + DeleteDagAssetQueuedEventsData, + DeleteDagAssetQueuedEventsResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, @@ -1068,6 +1072,66 @@ export class DagService { }, }); } + + /** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getDagAssetQueuedEvents( + data: GetDagAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/assets/queuedEvent", + path: { + dag_id: data.dagId, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + + /** + * Delete Dag Asset Queued Events + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns void Successful Response + * @throws ApiError + */ + public static deleteDagAssetQueuedEvents( + data: DeleteDagAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "DELETE", + url: "/public/dags/{dag_id}/assets/queuedEvent", + path: { + dag_id: data.dagId, + }, + query: { + before: data.before, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class EventLogService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index c4eee6b97c21..9f74fb5db659 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -695,6 +695,23 @@ export type ProviderResponse = { version: string; }; +/** + * Queued Event Collection serializer for responses. + */ +export type QueuedEventCollectionResponse = { + queued_events: Array; + total_entries: number; +}; + +/** + * Queued Event serializer for responses.. + */ +export type QueuedEventResponse = { + uri: string; + dag_id: string; + created_at: string; +}; + /** * Internal enum for setting reprocess behavior in a backfill. * @@ -1202,6 +1219,20 @@ export type GetDagDetailsData = { export type GetDagDetailsResponse = DAGDetailsResponse; +export type GetDagAssetQueuedEventsData = { + before?: string; + dagId: string; +}; + +export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; + +export type DeleteDagAssetQueuedEventsData = { + before?: string; + dagId: string; +}; + +export type DeleteDagAssetQueuedEventsResponse = void; + export type GetEventLogData = { eventLogId: number; }; @@ -2262,6 +2293,62 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/assets/queuedEvent": { + get: { + req: GetDagAssetQueuedEventsData; + res: { + /** + * Successful Response + */ + 200: QueuedEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + delete: { + req: DeleteDagAssetQueuedEventsData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/eventLogs/{event_log_id}": { get: { req: GetEventLogData; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 95ad658ba4d9..187137f7d82f 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -17,11 +17,19 @@ from __future__ import annotations import urllib +from typing import Generator import pytest +import time_machine from airflow.models import DagModel -from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference +from airflow.models.asset import ( + AssetDagRunQueue, + AssetEvent, + AssetModel, + DagScheduleAssetReference, + TaskOutletAssetReference, +) from airflow.models.dagrun import DagRun from airflow.utils import timezone from airflow.utils.session import provide_session @@ -459,3 +467,81 @@ def test_should_respond_404(self, test_client): ) assert response.status_code == 404 assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + + +class TestQueuedEventEndpoint(TestAssets): + @pytest.fixture + def time_freezer(self) -> Generator: + freezer = time_machine.travel(self.default_time, tick=False) + freezer.start() + + yield + + freezer.stop() + + def _create_asset_dag_run_queues(self, dag_id, asset_id, session): + adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id) + session.add(adrq) + session.commit() + return adrq + + +class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + self.create_assets(session=session, num=1) + asset_id = 1 + self._create_asset_dag_run_queues(dag_id, asset_id, session) + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 200 + assert response.json() == { + "queued_events": [ + { + "created_at": self.default_time.replace("+00:00", "Z"), + "uri": "s3://bucket/key/1", + "dag_id": "dag", + } + ], + "total_entries": 1, + } + + def test_should_respond_404(self, test_client): + dag_id = "not_exists" + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found" + + +class TestDeleteDagDatasetQueuedEvents(TestQueuedEventEndpoint): + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + asset_id = self._create_asset(session).id + self._create_asset_dag_run_queues(dag_id, asset_id, session) + + response = test_client.delete( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 204 + + def test_should_respond_404(self, test_client): + dag_id = "not_exists" + + response = test_client.delete( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found"