From c63cc69323d8020bbebf450292eecbbcb2579212 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Fri, 8 Nov 2024 00:55:12 +0800 Subject: [PATCH] AIP-84 Get TI and Mapped TI dependencies (#43788) --- .../endpoints/task_instance_endpoint.py | 1 + .../core_api/openapi/v1-generated.yaml | 153 ++++++++++++++++++ .../core_api/routes/public/task_instances.py | 62 +++++++ .../core_api/serializers/task_instances.py | 13 ++ airflow/ui/openapi-gen/queries/common.ts | 50 ++++++ airflow/ui/openapi-gen/queries/prefetch.ts | 80 +++++++++ airflow/ui/openapi-gen/queries/queries.ts | 88 ++++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 88 ++++++++++ .../ui/openapi-gen/requests/schemas.gen.ts | 34 ++++ .../ui/openapi-gen/requests/services.gen.ts | 70 ++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 89 ++++++++++ .../routes/public/test_task_instances.py | 81 ++++++++++ 12 files changed, 809 insertions(+) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 87b29e90ca70..f23f37213c3d 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -721,6 +721,7 @@ def get_task_instance_dependencies( return task_dependencies_collection_schema.dump({"dependencies": deps}) +@mark_fastapi_migration_done def get_mapped_task_instance_dependencies( *, dag_id: str, dag_run_id: str, task_id: str, map_index: int ) -> APIResponse: diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index aacedc583aa4..0e9221444af2 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2407,6 +2407,133 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies: + get: + tags: + - Task Instance + summary: Get Task Instance Dependencies + description: Get dependencies blocking task from getting scheduled. + operationId: get_task_instance_dependencies + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: map_index + in: path + required: true + schema: + type: integer + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskDependencyCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies: + get: + tags: + - Task Instance + summary: Get Task Instance Dependencies + description: Get dependencies blocking task from getting scheduled. + operationId: get_task_instance_dependencies + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: map_index + in: query + required: false + schema: + type: integer + default: -1 + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskDependencyCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}: get: tags: @@ -4583,6 +4710,32 @@ components: - latest_scheduler_heartbeat title: SchedulerInfoSchema description: Schema for Scheduler info. + TaskDependencyCollectionResponse: + properties: + dependencies: + items: + $ref: '#/components/schemas/TaskDependencyResponse' + type: array + title: Dependencies + type: object + required: + - dependencies + title: TaskDependencyCollectionResponse + description: Task scheduling dependencies collection serializer for responses. + TaskDependencyResponse: + properties: + name: + type: string + title: Name + reason: + type: string + title: Reason + type: object + required: + - name + - reason + title: TaskDependencyResponse + description: Task Dependency serializer for responses. TaskInstanceCollectionResponse: properties: task_instances: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index 5de0bc89954b..be76d1922548 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -38,12 +38,16 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.serializers.task_instances import ( + TaskDependencyCollectionResponse, TaskInstanceCollectionResponse, TaskInstanceResponse, ) from airflow.exceptions import TaskNotFound from airflow.models.taskinstance import TaskInstance as TI +from airflow.ti_deps.dep_context import DepContext +from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.utils.db import get_query_count +from airflow.utils.state import TaskInstanceState task_instances_router = AirflowRouter( tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" @@ -168,6 +172,64 @@ async def get_mapped_task_instances( ) +@task_instances_router.get( + "/{task_id}/dependencies", + responses=create_openapi_http_exception_doc( + [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] + ), +) +@task_instances_router.get( + "/{task_id}/{map_index}/dependencies", + responses=create_openapi_http_exception_doc( + [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] + ), +) +async def get_task_instance_dependencies( + dag_id: str, + dag_run_id: str, + task_id: str, + session: Annotated[Session, Depends(get_session)], + request: Request, + map_index: int = -1, +) -> TaskDependencyCollectionResponse: + """Get dependencies blocking task from getting scheduled.""" + query = select(TI).where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id) + + if map_index == -1: + query = query.where(TI.map_index == -1) + else: + query = query.where(TI.map_index == map_index) + + result = session.execute(query).one_or_none() + + if result is None: + error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}" + raise HTTPException(status.HTTP_404_NOT_FOUND, error_message) + + ti = result[0] + deps = [] + + if ti.state in [None, TaskInstanceState.SCHEDULED]: + dag = request.app.state.dag_bag.get_dag(ti.dag_id) + + if dag: + try: + ti.task = dag.get_task(ti.task_id) + except TaskNotFound: + pass + else: + dep_context = DepContext(SCHEDULER_QUEUED_DEPS) + deps = sorted( + [ + {"name": dep.dep_name, "reason": dep.reason} + for dep in ti.get_failed_dep_statuses(dep_context=dep_context, session=session) + ], + key=lambda x: x["name"], + ) + + return TaskDependencyCollectionResponse.model_validate({"dependencies": deps}) + + @task_instances_router.get( "/{task_id}/{map_index}", responses=create_openapi_http_exception_doc( diff --git a/airflow/api_fastapi/core_api/serializers/task_instances.py b/airflow/api_fastapi/core_api/serializers/task_instances.py index 07ef42d8e24b..47a48c88b768 100644 --- a/airflow/api_fastapi/core_api/serializers/task_instances.py +++ b/airflow/api_fastapi/core_api/serializers/task_instances.py @@ -70,3 +70,16 @@ class TaskInstanceCollectionResponse(BaseModel): task_instances: list[TaskInstanceResponse] total_entries: int + + +class TaskDependencyResponse(BaseModel): + """Task Dependency serializer for responses.""" + + name: str + reason: str + + +class TaskDependencyCollectionResponse(BaseModel): + """Task scheduling dependencies collection serializer for responses.""" + + dependencies: list[TaskDependencyResponse] diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 53d4c49419e4..953bb16291fc 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -684,6 +684,56 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = ( }, ]), ]; +export type TaskInstanceServiceGetTaskInstanceDependenciesDefaultResponse = + Awaited>; +export type TaskInstanceServiceGetTaskInstanceDependenciesQueryResult< + TData = TaskInstanceServiceGetTaskInstanceDependenciesDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskInstanceServiceGetTaskInstanceDependenciesKey = + "TaskInstanceServiceGetTaskInstanceDependencies"; +export const UseTaskInstanceServiceGetTaskInstanceDependenciesKeyFn = ( + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + }, + queryKey?: Array, +) => [ + useTaskInstanceServiceGetTaskInstanceDependenciesKey, + ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]), +]; +export type TaskInstanceServiceGetTaskInstanceDependencies1DefaultResponse = + Awaited>; +export type TaskInstanceServiceGetTaskInstanceDependencies1QueryResult< + TData = TaskInstanceServiceGetTaskInstanceDependencies1DefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskInstanceServiceGetTaskInstanceDependencies1Key = + "TaskInstanceServiceGetTaskInstanceDependencies1"; +export const UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn = ( + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + }, + queryKey?: Array, +) => [ + useTaskInstanceServiceGetTaskInstanceDependencies1Key, + ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]), +]; export type TaskInstanceServiceGetMappedTaskInstanceDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 7b66bccef7c7..b8dc71dd821c 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -895,6 +895,86 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = ( updatedAtLte, }), }); +/** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseTaskInstanceServiceGetTaskInstanceDependencies = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceDependenciesKeyFn({ + dagId, + dagRunId, + mapIndex, + taskId, + }), + queryFn: () => + TaskInstanceService.getTaskInstanceDependencies({ + dagId, + dagRunId, + mapIndex, + taskId, + }), + }); +/** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseTaskInstanceServiceGetTaskInstanceDependencies1 = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn({ + dagId, + dagRunId, + mapIndex, + taskId, + }), + queryFn: () => + TaskInstanceService.getTaskInstanceDependencies1({ + dagId, + dagRunId, + mapIndex, + taskId, + }), + }); /** * Get Mapped Task Instance * Get task instance. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 19d049cf12d6..c6f4bd09dd69 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1095,6 +1095,94 @@ export const useTaskInstanceServiceGetMappedTaskInstances = < }) as TData, ...options, }); +/** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetTaskInstanceDependencies = < + TData = Common.TaskInstanceServiceGetTaskInstanceDependenciesDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceDependenciesKeyFn( + { dagId, dagRunId, mapIndex, taskId }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getTaskInstanceDependencies({ + dagId, + dagRunId, + mapIndex, + taskId, + }) as TData, + ...options, + }); +/** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetTaskInstanceDependencies1 = < + TData = Common.TaskInstanceServiceGetTaskInstanceDependencies1DefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn( + { dagId, dagRunId, mapIndex, taskId }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getTaskInstanceDependencies1({ + dagId, + dagRunId, + mapIndex, + taskId, + }) as TData, + ...options, + }); /** * Get Mapped Task Instance * Get task instance. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index aa566e88562b..3672219a23f2 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1080,6 +1080,94 @@ export const useTaskInstanceServiceGetMappedTaskInstancesSuspense = < }) as TData, ...options, }); +/** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetTaskInstanceDependenciesSuspense = < + TData = Common.TaskInstanceServiceGetTaskInstanceDependenciesDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceDependenciesKeyFn( + { dagId, dagRunId, mapIndex, taskId }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getTaskInstanceDependencies({ + dagId, + dagRunId, + mapIndex, + taskId, + }) as TData, + ...options, + }); +/** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetTaskInstanceDependencies1Suspense = < + TData = Common.TaskInstanceServiceGetTaskInstanceDependencies1DefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn( + { dagId, dagRunId, mapIndex, taskId }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getTaskInstanceDependencies1({ + dagId, + dagRunId, + mapIndex, + taskId, + }) as TData, + ...options, + }); /** * Get Mapped Task Instance * Get task instance. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 9acae85c690e..44bd279a1625 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2448,6 +2448,40 @@ export const $SchedulerInfoSchema = { description: "Schema for Scheduler info.", } as const; +export const $TaskDependencyCollectionResponse = { + properties: { + dependencies: { + items: { + $ref: "#/components/schemas/TaskDependencyResponse", + }, + type: "array", + title: "Dependencies", + }, + }, + type: "object", + required: ["dependencies"], + title: "TaskDependencyCollectionResponse", + description: + "Task scheduling dependencies collection serializer for responses.", +} as const; + +export const $TaskDependencyResponse = { + properties: { + name: { + type: "string", + title: "Name", + }, + reason: { + type: "string", + title: "Reason", + }, + }, + type: "object", + required: ["name", "reason"], + title: "TaskDependencyResponse", + description: "Task Dependency serializer for responses.", +} as const; + export const $TaskInstanceCollectionResponse = { properties: { task_instances: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index b37deac2009b..45be069d310b 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -82,6 +82,10 @@ import type { GetTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, + GetTaskInstanceDependenciesData, + GetTaskInstanceDependenciesResponse, + GetTaskInstanceDependencies1Data, + GetTaskInstanceDependencies1Response, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, GetTaskInstancesData, @@ -1335,6 +1339,72 @@ export class TaskInstanceService { }); } + /** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ + public static getTaskInstanceDependencies( + data: GetTaskInstanceDependenciesData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + map_index: data.mapIndex, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + + /** + * Get Task Instance Dependencies + * Get dependencies blocking task from getting scheduled. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns TaskDependencyCollectionResponse Successful Response + * @throws ApiError + */ + public static getTaskInstanceDependencies1( + data: GetTaskInstanceDependencies1Data, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + }, + query: { + map_index: data.mapIndex, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Get Mapped Task Instance * Get task instance. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 033c7a7a9bbe..8dc0a3188ca3 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -592,6 +592,21 @@ export type SchedulerInfoSchema = { latest_scheduler_heartbeat: string | null; }; +/** + * Task scheduling dependencies collection serializer for responses. + */ +export type TaskDependencyCollectionResponse = { + dependencies: Array; +}; + +/** + * Task Dependency serializer for responses. + */ +export type TaskDependencyResponse = { + name: string; + reason: string; +}; + /** * Task Instance Collection serializer for responses. */ @@ -1065,6 +1080,26 @@ export type GetMappedTaskInstancesData = { export type GetMappedTaskInstancesResponse = TaskInstanceCollectionResponse; +export type GetTaskInstanceDependenciesData = { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; +}; + +export type GetTaskInstanceDependenciesResponse = + TaskDependencyCollectionResponse; + +export type GetTaskInstanceDependencies1Data = { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; +}; + +export type GetTaskInstanceDependencies1Response = + TaskDependencyCollectionResponse; + export type GetMappedTaskInstanceData = { dagId: string; dagRunId: string; @@ -2154,6 +2189,60 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies": { + get: { + req: GetTaskInstanceDependenciesData; + res: { + /** + * Successful Response + */ + 200: TaskDependencyCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies": { + get: { + req: GetTaskInstanceDependencies1Data; + res: { + /** + * Successful Response + */ + 200: TaskDependencyCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}": { get: { req: GetMappedTaskInstanceData; diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index def34cbd4348..88d2b62ed79d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1046,3 +1046,84 @@ def test_should_respond_200_for_pagination(self, test_client, session): assert response_batch1.json()["total_entries"] == response_batch2.json()["total_entries"] == ti_count assert (num_entries_batch1 + num_entries_batch2) == ti_count assert response_batch1 != response_batch2 + + +class TestGetTaskDependencies(TestTaskInstanceEndpoint): + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + def test_should_respond_empty_non_scheduled(self, test_client, session): + self.create_task_instances(session) + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + "print_the_context/dependencies", + ) + assert response.status_code == 200, response.text + assert response.json() == {"dependencies": []} + + @pytest.mark.parametrize( + "state, dependencies", + [ + ( + State.SCHEDULED, + { + "dependencies": [ + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + ], + }, + ), + ( + State.NONE, + { + "dependencies": [ + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + {"name": "Task Instance State", "reason": "Task is in the 'None' state."}, + ] + }, + ), + ], + ) + def test_should_respond_dependencies(self, test_client, session, state, dependencies): + self.create_task_instances(session, task_instances=[{"state": state}], update_extras=True) + + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + "print_the_context/dependencies", + ) + assert response.status_code == 200, response.text + assert response.json() == dependencies + + def test_should_respond_dependencies_mapped(self, test_client, session): + tis = self.create_task_instances( + session, task_instances=[{"state": State.SCHEDULED}], update_extras=True + ) + old_ti = tis[0] + + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=0, state=old_ti.state) + session.add(ti) + session.commit() + + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + "print_the_context/0/dependencies", + ) + assert response.status_code == 200, response.text