Skip to content

Commit

Permalink
AIP-84 Get TI and Mapped TI dependencies (#43788)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun authored Nov 7, 2024
1 parent e502065 commit c63cc69
Show file tree
Hide file tree
Showing 12 changed files with 809 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ def get_task_instance_dependencies(
return task_dependencies_collection_schema.dump({"dependencies": deps})


@mark_fastapi_migration_done
def get_mapped_task_instance_dependencies(
*, dag_id: str, dag_run_id: str, task_id: str, map_index: int
) -> APIResponse:
Expand Down
153 changes: 153 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2407,6 +2407,133 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
# Path variant that takes map_index as a required integer path parameter.
# NOTE(review): the operationId below is identical to the one declared on the
# sibling ".../taskInstances/{task_id}/dependencies" path. The OpenAPI
# specification requires operationId to be unique among all operations, so
# client generators have to disambiguate the two themselves - confirm this is
# intended before relying on the generated names.
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies:
get:
tags:
- Task Instance
summary: Get Task Instance Dependencies
description: Get dependencies blocking task from getting scheduled.
operationId: get_task_instance_dependencies
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: map_index
in: path
required: true
schema:
type: integer
title: Map Index
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskDependencyCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
# Path variant without a map_index path segment: map_index is exposed as an
# optional integer query parameter that defaults to -1.
# NOTE(review): the operationId below is identical to the one declared on the
# sibling ".../taskInstances/{task_id}/{map_index}/dependencies" path. The
# OpenAPI specification requires operationId to be unique among all
# operations - confirm this duplication is intended.
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies:
get:
tags:
- Task Instance
summary: Get Task Instance Dependencies
description: Get dependencies blocking task from getting scheduled.
operationId: get_task_instance_dependencies
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskDependencyCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}:
get:
tags:
Expand Down Expand Up @@ -4583,6 +4710,32 @@ components:
- latest_scheduler_heartbeat
title: SchedulerInfoSchema
description: Schema for Scheduler info.
# Response body of the two ".../dependencies" operations: an object with a
# single required "dependencies" array of TaskDependencyResponse items.
TaskDependencyCollectionResponse:
properties:
dependencies:
items:
$ref: '#/components/schemas/TaskDependencyResponse'
type: array
title: Dependencies
type: object
required:
- dependencies
title: TaskDependencyCollectionResponse
description: Task scheduling dependencies collection serializer for responses.
# Item type of TaskDependencyCollectionResponse.dependencies: two required
# string properties, "name" and "reason".
TaskDependencyResponse:
properties:
name:
type: string
title: Name
reason:
type: string
title: Reason
type: object
required:
- name
- reason
title: TaskDependencyResponse
description: Task Dependency serializer for responses.
TaskInstanceCollectionResponse:
properties:
task_instances:
Expand Down
62 changes: 62 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.task_instances import (
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceResponse,
)
from airflow.exceptions import TaskNotFound
from airflow.models.taskinstance import TaskInstance as TI
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.utils.db import get_query_count
from airflow.utils.state import TaskInstanceState

task_instances_router = AirflowRouter(
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
Expand Down Expand Up @@ -168,6 +172,64 @@ async def get_mapped_task_instances(
)


# NOTE: this handler is registered on two routes. The route without a
# ``{map_index}`` path segment relies on the ``map_index`` default below.
@task_instances_router.get(
    "/{task_id}/dependencies",
    responses=create_openapi_http_exception_doc(
        [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
    ),
)
@task_instances_router.get(
    "/{task_id}/{map_index}/dependencies",
    responses=create_openapi_http_exception_doc(
        [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
    ),
)
async def get_task_instance_dependencies(
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    session: Annotated[Session, Depends(get_session)],
    request: Request,
    map_index: int = -1,
) -> TaskDependencyCollectionResponse:
    """Get dependencies blocking task from getting scheduled."""
    # The docstring above is kept to a single line on purpose: it is emitted
    # verbatim as the operation ``description`` in the generated OpenAPI spec.
    #
    # Parameters:
    #   dag_id, dag_run_id, task_id: identify the task instance to inspect.
    #   session: database session injected through ``Depends(get_session)``.
    #   request: used to reach the DagBag stored on ``request.app.state``.
    #   map_index: index of the task instance to look up; defaults to -1 when
    #     the route without a ``{map_index}`` segment is used.
    # Returns:
    #   A ``TaskDependencyCollectionResponse`` whose ``dependencies`` list is
    #   sorted by dependency name. The list is empty when the task instance is
    #   not in the ``None``/``SCHEDULED`` state, when its DAG is not found in
    #   the DagBag, or when the task is no longer part of that DAG.
    # Raises:
    #   HTTPException (404) when no matching task instance exists.

    # A single equality filter on map_index covers both the default (-1) and
    # an explicit index, so no branching on the value is required.
    query = select(TI).where(
        TI.dag_id == dag_id,
        TI.run_id == dag_run_id,
        TI.task_id == task_id,
        TI.map_index == map_index,
    )

    # scalar_one_or_none() yields the TI entity directly (no row tuple to
    # unpack) and still raises if more than one row matches.
    ti = session.execute(query).scalar_one_or_none()

    if ti is None:
        error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}"
        raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)

    deps = []

    # Dependencies are only evaluated for task instances in one of these two
    # states; any other state yields an empty list.
    if ti.state in [None, TaskInstanceState.SCHEDULED]:
        dag = request.app.state.dag_bag.get_dag(ti.dag_id)

        if dag:
            try:
                ti.task = dag.get_task(ti.task_id)
            except TaskNotFound:
                # Deliberate best-effort: without a task definition there is
                # nothing to evaluate, so the empty list is returned.
                pass
            else:
                dep_context = DepContext(SCHEDULER_QUEUED_DEPS)
                failed_statuses = ti.get_failed_dep_statuses(dep_context=dep_context, session=session)
                deps = sorted(
                    ({"name": dep.dep_name, "reason": dep.reason} for dep in failed_statuses),
                    key=lambda x: x["name"],
                )

    return TaskDependencyCollectionResponse.model_validate({"dependencies": deps})


@task_instances_router.get(
"/{task_id}/{map_index}",
responses=create_openapi_http_exception_doc(
Expand Down
13 changes: 13 additions & 0 deletions airflow/api_fastapi/core_api/serializers/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,16 @@ class TaskInstanceCollectionResponse(BaseModel):

task_instances: list[TaskInstanceResponse]
total_entries: int


# One entry of a dependencies list: a dependency name paired with a reason.
# The class docstring is left untouched because it is emitted as the schema
# description in the generated OpenAPI document.
class TaskDependencyResponse(BaseModel):
"""Task Dependency serializer for responses."""

# Both fields are declared without defaults, so both must be supplied.
name: str
reason: str


# Wrapper model holding a list of TaskDependencyResponse entries.
# The class docstring is left untouched because it is emitted as the schema
# description in the generated OpenAPI document.
class TaskDependencyCollectionResponse(BaseModel):
"""Task scheduling dependencies collection serializer for responses."""

# Declared without a default, so the key must be present (it may be an empty list).
dependencies: list[TaskDependencyResponse]
50 changes: 50 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,56 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = (
},
]),
];
// Resolved return type of TaskInstanceService.getTaskInstanceDependencies.
export type TaskInstanceServiceGetTaskInstanceDependenciesDefaultResponse =
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceDependencies>>;
// UseQueryResult alias whose data type defaults to the response type above.
export type TaskInstanceServiceGetTaskInstanceDependenciesQueryResult<
TData = TaskInstanceServiceGetTaskInstanceDependenciesDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
// Constant used as the first element of every query key built below.
export const useTaskInstanceServiceGetTaskInstanceDependenciesKey =
"TaskInstanceServiceGetTaskInstanceDependencies";
// Builds the query key: the constant above followed by the caller-supplied
// queryKey entries, or, when queryKey is null/undefined, by a single object
// holding the request parameters. mapIndex is required in this variant.
export const UseTaskInstanceServiceGetTaskInstanceDependenciesKeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceDependenciesKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
// Resolved return type of TaskInstanceService.getTaskInstanceDependencies1.
export type TaskInstanceServiceGetTaskInstanceDependencies1DefaultResponse =
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceDependencies1>>;
// UseQueryResult alias whose data type defaults to the response type above.
export type TaskInstanceServiceGetTaskInstanceDependencies1QueryResult<
TData = TaskInstanceServiceGetTaskInstanceDependencies1DefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
// Constant used as the first element of every query key built below.
export const useTaskInstanceServiceGetTaskInstanceDependencies1Key =
"TaskInstanceServiceGetTaskInstanceDependencies1";
// Builds the query key: the constant above followed by the caller-supplied
// queryKey entries, or, when queryKey is null/undefined, by a single object
// holding the request parameters. mapIndex is optional in this variant, so it
// may appear as undefined inside that object.
export const UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceDependencies1Key,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetMappedTaskInstanceDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getMappedTaskInstance>
>;
Expand Down
Loading

0 comments on commit c63cc69

Please sign in to comment.