diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 7cb6d76ff2a3..b8e7f36d1fd4 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -115,6 +115,7 @@ def get_dag_run( raise BadRequest("DAGRunSchema error", detail=str(e)) +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.RUN) @security.requires_access_asset("GET") @provide_session diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 1295b33fbf76..3e317a4c7e35 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -73,7 +73,7 @@ class DagRunAssetReference(BaseModel): dag_id: str execution_date: datetime = Field(alias="logical_date") start_date: datetime - end_date: datetime + end_date: datetime | None state: str data_interval_start: datetime data_interval_end: datetime diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 23d4eeb6e408..94bb60b8efbd 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1164,6 +1164,58 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents: + get: + tags: + - DagRun + summary: Get Upstream Asset Events + description: If dag run is asset-triggered, return the asset events that triggered + it. + operationId: get_upstream_asset_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/dagRuns/{dag_run_id}/clear: post: tags: @@ -4919,8 +4971,10 @@ components: format: date-time title: Start Date end_date: - type: string - format: date-time + anyOf: + - type: string + format: date-time + - type: 'null' title: End Date state: type: string diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index d95cf76f69ae..decc7ff2b285 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -30,6 +30,7 @@ ) from airflow.api_fastapi.common.db.common import get_session from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse, AssetEventResponse from airflow.api_fastapi.core_api.datamodels.dag_run import ( DAGRunClearBody, DAGRunPatchBody, @@ -149,6 +150,38 @@ def patch_dag_run( return DAGRunResponse.model_validate(dag_run, from_attributes=True) +@dag_run_router.get( + "/{dag_run_id}/upstreamAssetEvents", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_upstream_asset_events( + dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] +) -> AssetEventCollectionResponse: + """If dag run is asset-triggered, return the asset events that triggered it.""" + dag_run: DagRun | None = session.scalar( + select(DagRun).where( + DagRun.dag_id == dag_id, + DagRun.run_id == dag_run_id, + ) + ) + if dag_run is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + ) + events = dag_run.consumed_asset_events + return AssetEventCollectionResponse( + asset_events=[ + AssetEventResponse.model_validate(asset_event, from_attributes=True) for asset_event in events + ], + total_entries=len(events), + ) + + @dag_run_router.post( "/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]) ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 60464a7580e4..6175179d11cc 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -305,6 +305,28 @@ export const UseDagRunServiceGetDagRunKeyFn = ( }, queryKey?: Array, ) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; +export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = Awaited< + ReturnType +>; +export type DagRunServiceGetUpstreamAssetEventsQueryResult< + TData = DagRunServiceGetUpstreamAssetEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagRunServiceGetUpstreamAssetEventsKey = + "DagRunServiceGetUpstreamAssetEvents"; +export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: Array, +) => [ + useDagRunServiceGetUpstreamAssetEventsKey, + ...(queryKey ?? [{ dagId, dagRunId }]), +]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index ac1cd93db37c..4c541670258f 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -386,6 +386,32 @@ export const prefetchUseDagRunServiceGetDagRun = ( queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), }); +/** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagRunServiceGetUpstreamAssetEvents = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ + dagId, + dagRunId, + }), + queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }), + }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 68a31b0a7a11..3c6a426ee07f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -485,6 +485,39 @@ export const useDagRunServiceGetDagRun = < queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options, }); +/** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetUpstreamAssetEvents = < + TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => + DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData, + ...options, + }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 0c162cc42cd5..43331b187fe0 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -469,6 +469,39 @@ export const useDagRunServiceGetDagRunSuspense = < queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options, }); +/** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetUpstreamAssetEventsSuspense = < + TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => + DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData, + ...options, + }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 61007167cdd9..17b4cf787218 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1936,8 +1936,15 @@ export const $DagRunAssetReference = { title: "Start Date", }, end_date: { - type: "string", - format: "date-time", + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], title: "End Date", }, state: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 451eac3365d0..1ebbac6aa6cc 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -45,6 +45,8 @@ import type { DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, + GetUpstreamAssetEventsData, + GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagSourceData, @@ -740,6 +742,34 @@ export class DagRunService { }); } + /** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getUpstreamAssetEvents( + data: GetUpstreamAssetEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Clear Dag Run * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 0b221ab4ae7e..05b6e84adfca 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -426,7 +426,7 @@ export type DagRunAssetReference = { dag_id: string; logical_date: string; start_date: string; - end_date: string; + end_date: string | null; state: string; data_interval_start: string; data_interval_end: string; @@ -1129,6 +1129,13 @@ export type PatchDagRunData = { export type PatchDagRunResponse = DAGRunResponse; +export type GetUpstreamAssetEventsData = { + dagId: string; + dagRunId: string; +}; + +export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse; + export type ClearDagRunData = { dagId: string; dagRunId: string; @@ -2010,6 +2017,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents": { + get: { + req: GetUpstreamAssetEventsData; + res: { + /** + * Successful Response + */ + 200: AssetEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": { post: { req: ClearDagRunData; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index f6ee3d2dee9c..7d28a9237e34 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -22,7 +22,9 @@ import pytest from sqlalchemy import select +from airflow import Asset from airflow.models import DagRun +from airflow.models.asset import AssetEvent, AssetModel from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, State @@ -51,6 +53,7 @@ DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) +END_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc) DAG1_RUN1_NOTE = "test_note" @@ -264,6 +267,78 @@ def test_delete_dag_run_not_found(self, test_client): assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" +class TestGetDagRunAssetTriggerEvents: + def test_should_respond_200(self, test_client, dag_maker, session): + asset1 = Asset(uri="ds1") + + with dag_maker(dag_id="source_dag", start_date=START_DATE, session=session): + EmptyOperator(task_id="task", outlets=[asset1]) + dr = dag_maker.create_dagrun() + ti = dr.task_instances[0] + + asset1_id = session.query(AssetModel.id).filter_by(uri=asset1.uri).scalar() + event = AssetEvent( + asset_id=asset1_id, + source_task_id=ti.task_id, + source_dag_id=ti.dag_id, + source_run_id=ti.run_id, + source_map_index=ti.map_index, + ) + session.add(event) + + with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE, session=session): + pass + dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID", run_type=DagRunType.ASSET_TRIGGERED) + dr.consumed_asset_events.append(event) + + session.commit() + assert event.timestamp + + response = test_client.get( + "/public/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents", + ) + assert response.status_code == 200 + expected_response = { + "asset_events": [ + { + "timestamp": event.timestamp.isoformat().replace("+00:00", "Z"), + "asset_id": asset1_id, + "uri": asset1.uri, + "extra": {}, + "id": event.id, + "source_dag_id": ti.dag_id, + "source_map_index": ti.map_index, + "source_run_id": ti.run_id, + "source_task_id": ti.task_id, + "created_dagruns": [ + { + "dag_id": "TEST_DAG_ID", + "run_id": "TEST_DAG_RUN_ID", + "data_interval_end": dr.data_interval_end.isoformat().replace("+00:00", "Z"), + "data_interval_start": dr.data_interval_start.isoformat().replace("+00:00", "Z"), + "end_date": None, + "logical_date": dr.logical_date.isoformat().replace("+00:00", "Z"), + "start_date": dr.start_date.isoformat().replace("+00:00", "Z"), + "state": "running", + } + ], + } + ], + "total_entries": 1, + } + assert response.json() == expected_response + + def test_should_respond_404(self, test_client): + response = test_client.get( + "public/dags/invalid-id/dagRuns/invalid-run-id/upstreamAssetEvents", + ) + assert response.status_code == 404 + assert ( + "The DagRun with dag_id: `invalid-id` and run_id: `invalid-run-id` was not found" + == response.json()["detail"] + ) + + class TestClearDagRun: def test_clear_dag_run(self, test_client): response = test_client.post(