diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 7915bf8b034b..ff47db883879 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -222,6 +222,7 @@ def delete_dag_asset_queued_event( ) +@mark_fastapi_migration_done @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 337d85547c3d..c573996eafd1 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -19,7 +19,7 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar +from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload from fastapi import Depends, HTTPException, Query from pendulum.parsing.exceptions import ParserError @@ -409,6 +409,27 @@ def _safe_parse_datetime(date_to_check: str) -> datetime: """ if not date_to_check: raise ValueError(f"{date_to_check} cannot be None.") + return _safe_parse_datetime_optional(date_to_check) + + +@overload +def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ... + + +@overload +def _safe_parse_datetime_optional(date_to_check: None) -> None: ... + + +def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None: + """ + Parse datetime and raise error for invalid dates. + + Allow None values. + + :param date_to_check: the string value to be parsed + """ + if date_to_check is None: + return None try: return timezone.parse(date_to_check, strict=True) except (TypeError, ParserError): @@ -614,7 +635,8 @@ def depends_float( # Common Safe DateTime -DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)] +DateTimeQuery = Annotated[datetime, AfterValidator(_safe_parse_datetime)] +OptionalDateTimeQuery = Annotated[Union[datetime, None], AfterValidator(_safe_parse_datetime_optional)] # DAG QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)] diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index e5ac10715ed4..bfdbb2d7fc88 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -101,6 +101,21 @@ class AssetEventCollectionResponse(BaseModel): total_entries: int +class QueuedEventResponse(BaseModel): + """Queued Event serializer for responses..""" + + uri: str + dag_id: str + created_at: datetime + + +class QueuedEventCollectionResponse(BaseModel): + """Queued Event Collection serializer for responses.""" + + queued_events: list[QueuedEventResponse] + total_entries: int + + class CreateAssetEventsBody(BaseModel): """Create asset events request.""" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index e7762392a0c8..bdf1b8aef1bd 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -47,12 +47,14 @@ paths: required: true schema: type: string + format: date-time title: Start Date - name: end_date in: query required: true schema: type: string + format: date-time title: End Date responses: '200': @@ -170,7 +172,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/: + /public/assets: get: tags: - Asset @@ -346,6 +348,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/events: post: tags: - Asset @@ -353,11 +356,11 @@ paths: description: Create asset events. operationId: create_asset_event requestBody: - required: true content: application/json: schema: $ref: '#/components/schemas/CreateAssetEventsBody' + required: true responses: '200': description: Successful Response @@ -366,23 +369,23 @@ paths: schema: $ref: '#/components/schemas/AssetEventResponse' '401': + description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized '403': + description: Forbidden content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden '404': + description: Not Found content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found '422': description: Validation Error content: @@ -434,6 +437,60 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/assets/queuedEvent: + get: + tags: + - Asset + summary: Get Dag Asset Queued Events + description: Get queued asset events for a DAG. + operationId: get_dag_asset_queued_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: before + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Before + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/backfills/: get: tags: @@ -5730,6 +5787,41 @@ components: - version title: ProviderResponse description: Provider serializer for responses. + QueuedEventCollectionResponse: + properties: + queued_events: + items: + $ref: '#/components/schemas/QueuedEventResponse' + type: array + title: Queued Events + total_entries: + type: integer + title: Total Entries + type: object + required: + - queued_events + - total_entries + title: QueuedEventCollectionResponse + description: Queued Event Collection serializer for responses. + QueuedEventResponse: + properties: + uri: + type: string + title: Uri + dag_id: + type: string + title: Dag Id + created_at: + type: string + format: date-time + title: Created At + type: object + required: + - uri + - dag_id + - created_at + title: QueuedEventResponse + description: Queued Event serializer for responses.. ReprocessBehavior: type: string enum: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 326a387f0089..0900b0400987 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -17,6 +17,7 @@ from __future__ import annotations +from datetime import datetime from typing import Annotated from fastapi import Depends, HTTPException, status @@ -25,6 +26,7 @@ from airflow.api_fastapi.common.db.common import get_session, paginated_select from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, QueryAssetDagIdPatternSearch, QueryAssetIdFilter, QueryLimit, @@ -43,18 +45,41 @@ AssetEventResponse, AssetResponse, CreateAssetEventsBody, + QueuedEventCollectionResponse, + QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.assets import Asset from airflow.assets.manager import asset_manager -from airflow.models.asset import AssetEvent, AssetModel +from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel from airflow.utils import timezone -assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") +assets_router = AirflowRouter(tags=["Asset"]) + + +def _generate_queued_event_where_clause( + *, + dag_id: str | None = None, + uri: str | None = None, + before: datetime | None = None, +) -> list: + """Get AssetDagRunQueue where clause.""" + where_clause = [] + if dag_id is not None: + where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) + if uri is not None: + where_clause.append( + AssetDagRunQueue.asset_id.in_( + select(AssetModel.id).where(AssetModel.uri == uri), + ), + ) + if before is not None: + where_clause.append(AssetDagRunQueue.created_at < before) + return where_clause @assets_router.get( - "/", + "/assets", responses=create_openapi_http_exception_doc([401, 403, 404]), ) def get_assets( @@ -89,7 +114,7 @@ def get_assets( @assets_router.get( - "/events", + "/assets/events", responses=create_openapi_http_exception_doc([404]), ) def get_asset_events( @@ -165,7 +190,7 @@ def create_asset_event( @assets_router.get( - "/{uri:path}", + "/assets/{uri:path}", responses=create_openapi_http_exception_doc([401, 403, 404]), ) def get_asset( @@ -183,3 +208,47 @@ def get_asset( raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") return AssetResponse.model_validate(asset, from_attributes=True) + + +@assets_router.get( + "/dags/{dag_id}/assets/queuedEvent", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_dag_asset_queued_events( + dag_id: str, + session: Annotated[Session, Depends(get_session)], + before: OptionalDateTimeQuery = None, +) -> QueuedEventCollectionResponse: + """Get queued asset events for a DAG.""" + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) + query = ( + select(AssetDagRunQueue, AssetModel.uri) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + + dag_asset_queued_events_select, total_entries = paginated_select( + query, + [], + ) + adrqs = session.execute(dag_asset_queued_events_select).all() + + if not adrqs: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") + + queued_events = [ + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + for adrq, uri in adrqs + ] + + return QueuedEventCollectionResponse( + queued_events=[ + QueuedEventResponse.model_validate(queued_event, from_attributes=True) + for queued_event in queued_events + ], + total_entries=total_entries, + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 46940bfd3189..7b23e33f0ab4 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -129,6 +129,28 @@ export const UseAssetServiceGetAssetKeyFn = ( }, queryKey?: Array, ) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; +export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetDagAssetQueuedEventsQueryResult< + TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetDagAssetQueuedEventsKey = + "AssetServiceGetDagAssetQueuedEvents"; +export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = ( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: Array, +) => [ + useAssetServiceGetDagAssetQueuedEventsKey, + ...(queryKey ?? [{ before, dagId }]), +]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 4c541670258f..0c522f36e433 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -165,6 +165,32 @@ export const prefetchUseAssetServiceGetAsset = ( queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), queryFn: () => AssetService.getAsset({ uri }), }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( + queryClient: QueryClient, + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({ + before, + dagId, + }), + queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }), + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a96b09e12a79..8ec0ea9234ac 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -213,6 +213,39 @@ export const useAssetServiceGetAsset = < queryFn: () => AssetService.getAsset({ uri }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetDagAssetQueuedEvents = < + TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 43331b187fe0..1b8142228153 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -195,6 +195,39 @@ export const useAssetServiceGetAssetSuspense = < queryFn: () => AssetService.getAsset({ uri }) as TData, ...options, }); +/** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetDagAssetQueuedEventsSuspense = < + TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + }: { + before?: string; + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn( + { before, dagId }, + queryKey, + ), + queryFn: () => + AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e5ac0441a2aa..1f83e434286b 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2880,6 +2880,48 @@ export const $ProviderResponse = { description: "Provider serializer for responses.", } as const; +export const $QueuedEventCollectionResponse = { + properties: { + queued_events: { + items: { + $ref: "#/components/schemas/QueuedEventResponse", + }, + type: "array", + title: "Queued Events", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["queued_events", "total_entries"], + title: "QueuedEventCollectionResponse", + description: "Queued Event Collection serializer for responses.", +} as const; + +export const $QueuedEventResponse = { + properties: { + uri: { + type: "string", + title: "Uri", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + }, + type: "object", + required: ["uri", "dag_id", "created_at"], + title: "QueuedEventResponse", + description: "Queued Event serializer for responses..", +} as const; + export const $ReprocessBehavior = { type: "string", enum: ["failed", "completed", "none"], diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 53bb3527d142..c39dce38d34d 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -13,6 +13,8 @@ import type { CreateAssetEventResponse, GetAssetData, GetAssetResponse, + GetDagAssetQueuedEventsData, + GetDagAssetQueuedEventsResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -167,7 +169,7 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/", + url: "/public/assets", query: { limit: data.limit, offset: data.offset, @@ -237,7 +239,7 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "POST", - url: "/public/assets/events", + url: "/public/events", body: data.requestBody, mediaType: "application/json", errors: { @@ -274,6 +276,36 @@ export class AssetService { }, }); } + + /** + * Get Dag Asset Queued Events + * Get queued asset events for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getDagAssetQueuedEvents( + data: GetDagAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/assets/queuedEvent", + path: { + dag_id: data.dagId, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class DashboardService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 078699cc0f2b..96d6b812897f 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -712,6 +712,23 @@ export type ProviderResponse = { version: string; }; +/** + * Queued Event Collection serializer for responses. + */ +export type QueuedEventCollectionResponse = { + queued_events: Array; + total_entries: number; +}; + +/** + * Queued Event serializer for responses.. + */ +export type QueuedEventResponse = { + uri: string; + dag_id: string; + created_at: string; +}; + /** * Internal enum for setting reprocess behavior in a backfill. * @@ -1045,6 +1062,13 @@ export type GetAssetData = { export type GetAssetResponse = AssetResponse; +export type GetDagAssetQueuedEventsData = { + before?: string | null; + dagId: string; +}; + +export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; + export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1535,7 +1559,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/": { + "/public/assets": { get: { req: GetAssetsData; res: { @@ -1588,6 +1612,8 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + }; + "/public/events": { post: { req: CreateAssetEventData; res: { @@ -1641,6 +1667,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/assets/queuedEvent": { + get: { + req: GetDagAssetQueuedEventsData; + res: { + /** + * Successful Response + */ + 200: QueuedEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/ui/dashboard/historical_metrics_data": { get: { req: HistoricalMetricsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 42b7acd908ff..b5744e47edf8 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -24,7 +24,13 @@ import time_machine from airflow.models import DagModel -from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference +from airflow.models.asset import ( + AssetDagRunQueue, + AssetEvent, + AssetModel, + DagScheduleAssetReference, + TaskOutletAssetReference, +) from airflow.models.dagrun import DagRun from airflow.utils import timezone from airflow.utils.session import provide_session @@ -464,7 +470,7 @@ def test_should_respond_404(self, test_client): assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" -class TestPostAssetEvents(TestAssets): +class TestQueuedEventEndpoint(TestAssets): @pytest.fixture def time_freezer(self) -> Generator: freezer = time_machine.travel(self.default_time, tick=False) @@ -474,6 +480,50 @@ def time_freezer(self) -> Generator: freezer.stop() + def _create_asset_dag_run_queues(self, dag_id, asset_id, session): + adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id) + session.add(adrq) + session.commit() + return adrq + + +class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + self.create_assets(session=session, num=1) + asset_id = 1 + self._create_asset_dag_run_queues(dag_id, asset_id, session) + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 200 + assert response.json() == { + "queued_events": [ + { + "created_at": self.default_time.replace("+00:00", "Z"), + "uri": "s3://bucket/key/1", + "dag_id": "dag", + } + ], + "total_entries": 1, + } + + def test_should_respond_404(self, test_client): + dag_id = "not_exists" + + response = test_client.get( + f"/public/dags/{dag_id}/assets/queuedEvent", + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found" + + +class TestPostAssetEvents(TestAssets): @pytest.mark.usefixtures("time_freezer") def test_should_respond_200(self, test_client, session): self.create_assets()