diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py b/airflow/api_connexion/endpoints/event_log_endpoint.py index ef55ad145c75..8084c2ecab67 100644 --- a/airflow/api_connexion/endpoints/event_log_endpoint.py +++ b/airflow/api_connexion/endpoints/event_log_endpoint.py @@ -31,6 +31,7 @@ from airflow.auth.managers.models.resource_details import DagAccessEntity from airflow.models import Log from airflow.utils import timezone +from airflow.utils.api_migration import mark_fastapi_migration_done from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session @@ -40,6 +41,7 @@ from airflow.api_connexion.types import APIResponse +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.AUDIT_LOG) @provide_session def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIResponse: diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 136d6a4ed027..3d1b1611ad8c 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1506,6 +1506,50 @@ paths: application/json: schema: $ref: '#/components/schemas/VersionInfo' + /public/eventLogs/{event_log_id}: + get: + tags: + - Event Log + summary: Get Event Log + operationId: get_event_log + parameters: + - name: event_log_id + in: path + required: true + schema: + type: integer + title: Event Log Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/EventLogResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: AppBuilderMenuItemResponse: @@ -2415,6 +2459,74 @@ components: title: DagTagPydantic description: Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. + EventLogResponse: + properties: + event_log_id: + type: integer + title: Event Log Id + when: + type: string + format: date-time + title: When + dag_id: + anyOf: + - type: string + - type: 'null' + title: Dag Id + task_id: + anyOf: + - type: string + - type: 'null' + title: Task Id + run_id: + anyOf: + - type: string + - type: 'null' + title: Run Id + map_index: + anyOf: + - type: integer + - type: 'null' + title: Map Index + try_number: + anyOf: + - type: integer + - type: 'null' + title: Try Number + event: + type: string + title: Event + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + owner: + anyOf: + - type: string + - type: 'null' + title: Owner + extra: + anyOf: + - type: string + - type: 'null' + title: Extra + type: object + required: + - event_log_id + - when + - dag_id + - task_id + - run_id + - map_index + - try_number + - event + - logical_date + - owner + - extra + title: EventLogResponse + description: Event Log Response. FastAPIAppResponse: properties: app: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index ab307409add0..fe32ecc9e27d 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -21,6 +21,7 @@ from airflow.api_fastapi.core_api.routes.public.connections import connections_router from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router from airflow.api_fastapi.core_api.routes.public.dags import dags_router +from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router from airflow.api_fastapi.core_api.routes.public.pools import pools_router @@ -40,3 +41,4 @@ public_router.include_router(providers_router) public_router.include_router(plugins_router) public_router.include_router(version_router) +public_router.include_router(event_logs_router) diff --git a/airflow/api_fastapi/core_api/routes/public/event_logs.py b/airflow/api_fastapi/core_api/routes/public/event_logs.py new file mode 100644 index 000000000000..75f12cbefb03 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/public/event_logs.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from fastapi import Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import ( + get_session, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.serializers.event_logs import ( + EventLogResponse, +) +from airflow.models import Log + +event_logs_router = AirflowRouter(tags=["Event Log"], prefix="/eventLogs") + + +@event_logs_router.get( + "/{event_log_id}", + responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_event_log( + event_log_id: int, + session: Annotated[Session, Depends(get_session)], +) -> EventLogResponse: + event_log = session.scalar(select(Log).where(Log.id == event_log_id)) + if event_log is None: + raise HTTPException(404, f"The Event Log with id: `{event_log_id}` not found") + return EventLogResponse.model_validate( + event_log, + from_attributes=True, + ) diff --git a/airflow/api_fastapi/core_api/serializers/event_logs.py b/airflow/api_fastapi/core_api/serializers/event_logs.py new file mode 100644 index 000000000000..e295dc35061f --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/event_logs.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + + +class EventLogResponse(BaseModel): + """Event Log Response.""" + + id: int = Field(alias="event_log_id") + dttm: datetime = Field(alias="when") + dag_id: str | None + task_id: str | None + run_id: str | None + map_index: int | None + try_number: int | None + event: str + execution_date: datetime | None = Field(alias="logical_date") + owner: str | None + extra: str | None + + model_config = ConfigDict(populate_by_name=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 5fa46e4e91f0..959b476718d2 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -8,6 +8,7 @@ import { DagService, DagsService, DashboardService, + EventLogService, MonitorService, PluginService, PoolService, @@ -409,6 +410,22 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array) => [ useVersionServiceGetVersionKey, ...(queryKey ?? []), ]; +export type EventLogServiceGetEventLogDefaultResponse = Awaited< + ReturnType +>; +export type EventLogServiceGetEventLogQueryResult< + TData = EventLogServiceGetEventLogDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useEventLogServiceGetEventLogKey = "EventLogServiceGetEventLog"; +export const UseEventLogServiceGetEventLogKeyFn = ( + { + eventLogId, + }: { + eventLogId: number; + }, + queryKey?: Array, +) => [useEventLogServiceGetEventLogKey, ...(queryKey ?? [{ eventLogId }])]; export type VariableServicePostVariableMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 72b4376751f7..350f9deddcf7 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -8,6 +8,7 @@ import { DagService, DagsService, DashboardService, + EventLogService, MonitorService, PluginService, PoolService, @@ -512,3 +513,22 @@ export const prefetchUseVersionServiceGetVersion = (queryClient: QueryClient) => queryKey: Common.UseVersionServiceGetVersionKeyFn(), queryFn: () => VersionService.getVersion(), }); +/** + * Get Event Log + * @param data The data for the request. + * @param data.eventLogId + * @returns EventLogResponse Successful Response + * @throws ApiError + */ +export const prefetchUseEventLogServiceGetEventLog = ( + queryClient: QueryClient, + { + eventLogId, + }: { + eventLogId: number; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseEventLogServiceGetEventLogKeyFn({ eventLogId }), + queryFn: () => EventLogService.getEventLog({ eventLogId }), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index ce319bc6cd67..a3aed2e79371 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -13,6 +13,7 @@ import { DagService, DagsService, DashboardService, + EventLogService, MonitorService, PluginService, PoolService, @@ -663,6 +664,34 @@ export const useVersionServiceGetVersion = < queryFn: () => VersionService.getVersion() as TData, ...options, }); +/** + * Get Event Log + * @param data The data for the request. + * @param data.eventLogId + * @returns EventLogResponse Successful Response + * @throws ApiError + */ +export const useEventLogServiceGetEventLog = < + TData = Common.EventLogServiceGetEventLogDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + eventLogId, + }: { + eventLogId: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseEventLogServiceGetEventLogKeyFn( + { eventLogId }, + queryKey, + ), + queryFn: () => EventLogService.getEventLog({ eventLogId }) as TData, + ...options, + }); /** * Post Variable * Create a variable. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index cd7bc95fa5b5..16f8ca003038 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -8,6 +8,7 @@ import { DagService, DagsService, DashboardService, + EventLogService, MonitorService, PluginService, PoolService, @@ -652,3 +653,31 @@ export const useVersionServiceGetVersionSuspense = < queryFn: () => VersionService.getVersion() as TData, ...options, }); +/** + * Get Event Log + * @param data The data for the request. + * @param data.eventLogId + * @returns EventLogResponse Successful Response + * @throws ApiError + */ +export const useEventLogServiceGetEventLogSuspense = < + TData = Common.EventLogServiceGetEventLogDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + eventLogId, + }: { + eventLogId: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseEventLogServiceGetEventLogKeyFn( + { eventLogId }, + queryKey, + ), + queryFn: () => EventLogService.getEventLog({ eventLogId }) as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4bbf448ca8ee..b1a5b267e11e 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1449,6 +1449,129 @@ export const $DagTagPydantic = { "Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API.", } as const; +export const $EventLogResponse = { + properties: { + event_log_id: { + type: "integer", + title: "Event Log Id", + }, + when: { + type: "string", + format: "date-time", + title: "When", + }, + dag_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Dag Id", + }, + task_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Task Id", + }, + run_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Run Id", + }, + map_index: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Map Index", + }, + try_number: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Try Number", + }, + event: { + type: "string", + title: "Event", + }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, + owner: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Owner", + }, + extra: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Extra", + }, + }, + type: "object", + required: [ + "event_log_id", + "when", + "dag_id", + "task_id", + "run_id", + "map_index", + "try_number", + "event", + "logical_date", + "owner", + "extra", + ], + title: "EventLogResponse", + description: "Event Log Response.", +} as const; + export const $FastAPIAppResponse = { properties: { app: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 08e93457c4f6..4db1e052a202 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -59,6 +59,8 @@ import type { GetPluginsData, GetPluginsResponse, GetVersionResponse, + GetEventLogData, + GetEventLogResponse, } from "./types.gen"; export class AssetService { @@ -894,3 +896,30 @@ export class VersionService { }); } } + +export class EventLogService { + /** + * Get Event Log + * @param data The data for the request. + * @param data.eventLogId + * @returns EventLogResponse Successful Response + * @throws ApiError + */ + public static getEventLog( + data: GetEventLogData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/eventLogs/{event_log_id}", + path: { + event_log_id: data.eventLogId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e3df16ea8298..288916b3928e 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -294,6 +294,23 @@ export type DagTagPydantic = { dag_id: string; }; +/** + * Event Log Response. + */ +export type EventLogResponse = { + event_log_id: number; + when: string; + dag_id: string | null; + task_id: string | null; + run_id: string | null; + map_index: number | null; + try_number: number | null; + event: string; + logical_date: string | null; + owner: string | null; + extra: string | null; +}; + /** * Serializer for Plugin FastAPI App responses. */ @@ -715,6 +732,12 @@ export type GetPluginsResponse = PluginCollectionResponse; export type GetVersionResponse = VersionInfo; +export type GetEventLogData = { + eventLogId: number; +}; + +export type GetEventLogResponse = EventLogResponse; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -1397,4 +1420,31 @@ export type $OpenApiTs = { }; }; }; + "/public/eventLogs/{event_log_id}": { + get: { + req: GetEventLogData; + res: { + /** + * Successful Response + */ + 200: EventLogResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; diff --git a/tests/api_fastapi/core_api/routes/public/test_event_logs.py b/tests/api_fastapi/core_api/routes/public/test_event_logs.py new file mode 100644 index 000000000000..c329015b9f9e --- /dev/null +++ b/tests/api_fastapi/core_api/routes/public/test_event_logs.py @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from airflow.models.log import Log +from airflow.utils.session import provide_session + +from tests_common.test_utils.db import clear_db_logs, clear_db_runs + +pytestmark = pytest.mark.db_test + +DAG_ID = "TEST_DAG_ID" +DAG_RUN_ID = "TEST_DAG_RUN_ID" +TASK_ID = "TEST_TASK_ID" +DAG_EXECUTION_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) +OWNER = "TEST_OWNER" +OWNER_DISPLAY_NAME = "Test Owner" +OWNER_AIRFLOW = "airflow" +TASK_INSTANCE_EVENT = "TASK_INSTANCE_EVENT" +TASK_INSTANCE_OWNER = "TASK_INSTANCE_OWNER" +TASK_INSTANCE_OWNER_DISPLAY_NAME = "Task Instance Owner" + + +EVENT_NORMAL = "NORMAL_EVENT" +EVENT_WITH_OWNER = "EVENT_WITH_OWNER" +EVENT_WITH_TASK_INSTANCE = "EVENT_WITH_TASK_INSTANCE" +EVENT_WITH_OWNER_AND_TASK_INSTANCE = "EVENT_WITH_OWNER_AND_TASK_INSTANCE" +EVENT_NON_EXISTED_ID = 9999 + + +class TestEventLogsEndpoint: + """Common class for /public/eventLogs related unit tests.""" + + @staticmethod + def _clear_db(): + clear_db_logs() + clear_db_runs() + + @pytest.fixture(autouse=True) + @provide_session + def setup(self, create_task_instance, session=None) -> dict[str, Log]: + """ + Setup event logs for testing. + :return: Dictionary with event log keys and their corresponding IDs. + """ + self._clear_db() + # create task instances for testing + task_instance = create_task_instance( + session=session, + dag_id=DAG_ID, + task_id=TASK_ID, + run_id=DAG_RUN_ID, + execution_date=DAG_EXECUTION_DATE, + ) + normal_log = Log( + event=EVENT_NORMAL, + ) + log_with_owner = Log( + event=EVENT_WITH_OWNER, + owner=OWNER, + owner_display_name=OWNER_DISPLAY_NAME, + ) + log_with_task_instance = Log( + event=TASK_INSTANCE_EVENT, + task_instance=task_instance, + ) + log_with_owner_and_task_instance = Log( + event=EVENT_WITH_OWNER_AND_TASK_INSTANCE, + owner=OWNER, + owner_display_name=OWNER_DISPLAY_NAME, + task_instance=task_instance, + ) + session.add_all( + [normal_log, log_with_owner, log_with_task_instance, log_with_owner_and_task_instance] + ) + session.commit() + return { + EVENT_NORMAL: normal_log, + EVENT_WITH_OWNER: log_with_owner, + TASK_INSTANCE_EVENT: log_with_task_instance, + EVENT_WITH_OWNER_AND_TASK_INSTANCE: log_with_owner_and_task_instance, + } + + def teardown_method(self) -> None: + self._clear_db() + + +class TestGetEventLog(TestEventLogsEndpoint): + @pytest.mark.parametrize( + "event_log_key, expected_status_code, expected_body", + [ + ( + EVENT_NORMAL, + 200, + { + "event": EVENT_NORMAL, + }, + ), + ( + EVENT_WITH_OWNER, + 200, + { + "event": EVENT_WITH_OWNER, + "owner": OWNER, + }, + ), + ( + TASK_INSTANCE_EVENT, + 200, + { + "dag_id": DAG_ID, + "event": TASK_INSTANCE_EVENT, + "map_index": -1, + "owner": OWNER_AIRFLOW, + "run_id": DAG_RUN_ID, + "task_id": TASK_ID, + }, + ), + ( + EVENT_WITH_OWNER_AND_TASK_INSTANCE, + 200, + { + "dag_id": DAG_ID, + "event": EVENT_WITH_OWNER_AND_TASK_INSTANCE, + "map_index": -1, + "owner": OWNER, + "run_id": DAG_RUN_ID, + "task_id": TASK_ID, + "try_number": 0, + }, + ), + ("not_existed_event_log_key", 404, {}), + ], + ) + def test_get_event_log(self, test_client, setup, event_log_key, expected_status_code, expected_body): + event_log: Log | None = setup.get(event_log_key, None) + event_log_id = event_log.id if event_log else EVENT_NON_EXISTED_ID + response = test_client.get(f"/public/eventLogs/{event_log_id}") + assert response.status_code == expected_status_code + if expected_status_code != 200: + return + + expected_json = { + "event_log_id": event_log_id, + "when": event_log.dttm.isoformat().replace("+00:00", "Z") if event_log.dttm else None, + "dag_id": expected_body.get("dag_id"), + "task_id": expected_body.get("task_id"), + "run_id": expected_body.get("run_id"), + "map_index": event_log.map_index, + "try_number": event_log.try_number, + "event": expected_body.get("event"), + "logical_date": event_log.execution_date.isoformat().replace("+00:00", "Z") + if event_log.execution_date + else None, + "owner": expected_body.get("owner"), + "extra": expected_body.get("extra"), + } + + assert response.json() == expected_json