From a41feeb5aedad842be2b0f954e0be30c767dbc5e Mon Sep 17 00:00:00 2001 From: Daniel Imberman Date: Fri, 15 Nov 2024 20:42:58 -0800 Subject: [PATCH] Re-queue tasks when they are stuck in queued (#43520) The old "stuck in queued" logic just failed the tasks. Now we requeue them. We accomplish this by revoking the task from executor and setting state to scheduled. We'll re-queue it up to 2 times. Number of times is configurable by hidden config. We added a method to base executor revoke_task because it's a discrete operation that is required for this feature, and it might be useful in other cases e.g. when detecting as zombies etc. We set state to failed or scheduled directly from scheduler (rather than sending through the event buffer) because event buffer makes more sense for handling external events -- why round trip through the executor and back to scheduler when scheduler is initiating the action? Anyway this avoids having to deal with "state mismatch" issues when processing events. --------- Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/executors/base_executor.py | 27 ++- airflow/jobs/scheduler_job_runner.py | 165 ++++++++++++++---- docs/spelling_wordlist.txt | 1 + .../celery/executors/celery_executor.py | 44 ++--- .../executors/celery_kubernetes_executor.py | 29 ++- .../executors/kubernetes_executor.py | 53 ++++-- .../executors/kubernetes_executor_types.py | 10 ++ .../executors/kubernetes_executor_utils.py | 29 ++- .../executors/local_kubernetes_executor.py | 12 ++ .../celery/executors/test_celery_executor.py | 36 +++- .../executors/test_kubernetes_executor.py | 67 ++++++- tests/jobs/test_scheduler_job.py | 128 ++++++++++++-- 12 files changed, 507 insertions(+), 94 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 2d0198632f3b..d24fd4f15169 100644 --- 
a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -25,9 +25,11 @@ from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple import pendulum +from deprecated import deprecated from airflow.cli.cli_config import DefaultHelpParser from airflow.configuration import conf +from airflow.exceptions import RemovedInAirflow3Warning from airflow.executors.executor_loader import ExecutorLoader from airflow.models import Log from airflow.stats import Stats @@ -552,7 +554,12 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError - def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover + @deprecated( + reason="Replaced by function `revoke_task`.", + category=RemovedInAirflow3Warning, + action="ignore", + ) + def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: """ Handle remnants of tasks that were failed because they were stuck in queued. @@ -563,7 +570,23 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # p :param tis: List of Task Instances to clean up :return: List of readable task instances for a warning message """ - raise NotImplementedError() + raise NotImplementedError + + def revoke_task(self, *, ti: TaskInstance): + """ + Attempt to remove task from executor. + + It should attempt to ensure that the task is no longer running on the worker, + and ensure that it is cleared out from internal data structures. + + It should *not* change the state of the task in airflow, or add any events + to the event buffer. + + It should not raise any error. 
+ + :param ti: Task instance to remove + """ + raise NotImplementedError def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: """ diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 6c7887c643cb..de42ee01afda 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -25,12 +25,14 @@ import sys import time from collections import Counter, defaultdict, deque +from contextlib import suppress from datetime import timedelta from functools import lru_cache, partial from itertools import groupby from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator +from deprecated import deprecated from sqlalchemy import and_, delete, exists, func, not_, select, text, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload @@ -40,7 +42,7 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.callbacks.pipe_callback_sink import PipeCallbackSink from airflow.configuration import conf -from airflow.exceptions import UnknownExecutorException +from airflow.exceptions import RemovedInAirflow3Warning, UnknownExecutorException from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import Job, perform_heartbeat @@ -99,6 +101,9 @@ DR = DagRun DM = DagModel +TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule" +""":meta private:""" + class ConcurrencyMap: """ @@ -184,8 +189,15 @@ def __init__( self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration") self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout") - self._enable_tracemalloc = conf.getboolean("scheduler", 
"enable_tracemalloc") + + # this param is intentionally undocumented + self._num_stuck_queued_retries = conf.getint( + section="scheduler", + key="num_stuck_in_queued_retries", + fallback=2, + ) + if self._enable_tracemalloc: import tracemalloc @@ -1046,7 +1058,7 @@ def _run_scheduler_loop(self) -> None: timers.call_regular_interval( conf.getfloat("scheduler", "task_queued_timeout_check_interval"), - self._fail_tasks_stuck_in_queued, + self._handle_tasks_stuck_in_queued, ) timers.call_regular_interval( @@ -1098,6 +1110,7 @@ def _run_scheduler_loop(self) -> None: for executor in self.job.executors: try: # this is backcompat check if executor does not inherit from BaseExecutor + # todo: remove in airflow 3.0 if not hasattr(executor, "_task_event_logs"): continue with create_session() as session: @@ -1767,48 +1780,132 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session - def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: + def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ - Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks - should not be stuck in queued for a long time. This will mark tasks stuck in - queued for longer than `self._task_queued_timeout` as failed. If the task has - available retries, it will be retried. + should not be stuck in queued for a long time. + + We will attempt to requeue the task (by revoking it from executor and setting to + scheduled) up to 2 times before failing the task. 
""" - self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method") + tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session) + for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): + try: + for ti in stuck_tis: + executor.revoke_task(ti=ti) + self._maybe_requeue_stuck_ti( + ti=ti, + session=session, + ) + except NotImplementedError: + # this block only gets entered if the executor has not implemented `revoke_task`. + # in which case, we try the fallback logic + # todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0. + # after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should + # just continue immediately. + self._stuck_in_queued_backcompat_logic(executor, stuck_tis) + continue - tasks_stuck_in_queued = session.scalars( + def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]: + """Query db for TIs that are stuck in queued.""" + return session.scalars( select(TI).where( TI.state == TaskInstanceState.QUEUED, TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)), TI.queued_by_job_id == self.job.id, ) - ).all() + ) - for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): - try: - cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) - for ti in stuck_tis: - if repr(ti) in cleaned_up_task_instances: - self.log.warning( - "Marking task instance %s stuck in queued as failed. " - "If the task instance has available retries, it will be retried.", - ti, - ) - session.add( - Log( - event="stuck in queued", - task_instance=ti.key, - extra=( - "Task will be marked as failed. If the task instance has " - "available retries, it will be retried." - ), - ) - ) - except NotImplementedError: - self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") + def _maybe_requeue_stuck_ti(self, *, ti, session): + """ + Requeue task if it has not been attempted too many times. 
+ + Otherwise, fail it. + """ + num_times_stuck = self._get_num_times_stuck_in_queued(ti, session) + if num_times_stuck < self._num_stuck_queued_retries: + self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id) + session.add( + Log( + event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT, + task_instance=ti.key, + extra=( + f"Task was in queued state for longer than {self._task_queued_timeout} " + "seconds; task state will be set back to scheduled." + ), + ) + ) + self._reschedule_stuck_task(ti) + else: + self.log.info( + "Task requeue attempts exceeded max; marking failed. task_instance=%s", + ti, + ) + session.add( + Log( + event="stuck in queued tries exceeded", + task_instance=ti.key, + extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.", + ) + ) + ti.set_state(TaskInstanceState.FAILED, session=session) + + @deprecated( + reason="This is backcompat layer for older executor interface. Should be removed in 3.0", + category=RemovedInAirflow3Warning, + action="ignore", + ) + def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis): + """ + Try to invoke stuck in queued cleanup for older executor interface. + + TODO: remove in airflow 3.0 + + Here we handle the case where the executor pre-dates the interface change that + introduced `revoke_task` and deprecated `cleanup_stuck_queued_tasks`. + + """ + with suppress(NotImplementedError): + for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis): + self.log.warning( + "Task instance %s stuck in queued. 
Will be set to failed.", + ti_repr, + ) + + @provide_session + def _reschedule_stuck_task(self, ti, session=NEW_SESSION): + session.execute( + update(TI) + .where(TI.filter_for_tis([ti])) + .values( + state=TaskInstanceState.SCHEDULED, + queued_dttm=None, + ) + .execution_options(synchronize_session=False) + ) + + @provide_session + def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: + """ + Check the Log table to see how many times a taskinstance has been stuck in queued. + + We can then use this information to determine whether to reschedule a task or fail it. + """ + return ( + session.query(Log) + .where( + Log.task_id == ti.task_id, + Log.dag_id == ti.dag_id, + Log.run_id == ti.run_id, + Log.map_index == ti.map_index, + Log.try_number == ti.try_number, + Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT, + ) + .count() + ) @provide_session def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None: @@ -2167,7 +2264,7 @@ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]: session.add(warning) existing_warned_dag_ids.add(warning.dag_id) - def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]: + def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]: """Organize TIs into lists per their respective executor.""" _executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list) for ti in tis: diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index cc927fa15a3d..32787428cc5c 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1375,6 +1375,7 @@ repos repr req reqs +requeued Reserialize reserialize reserialized diff --git a/providers/src/airflow/providers/celery/executors/celery_executor.py b/providers/src/airflow/providers/celery/executors/celery_executor.py index 807c77ab9878..43ae2cc21339 100644 --- 
a/providers/src/airflow/providers/celery/executors/celery_executor.py +++ b/providers/src/airflow/providers/celery/executors/celery_executor.py @@ -35,6 +35,7 @@ from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple from celery import states as celery_states +from deprecated import deprecated from packaging.version import Version from airflow import __version__ as airflow_version @@ -52,7 +53,7 @@ lazy_load_command, ) from airflow.configuration import conf -from airflow.exceptions import AirflowTaskTimeout +from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowTaskTimeout from airflow.executors.base_executor import BaseExecutor from airflow.stats import Stats from airflow.utils.state import TaskInstanceState @@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task return not_adopted_tis + @deprecated( + reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: """ - Handle remnants of tasks that were failed because they were stuck in queued. + Remove tasks stuck in queued from executor and fail them. - Tasks can get stuck in queued. If such a task is detected, it will be marked - as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED` - if it doesn't. - - :param tis: List of Task Instances to clean up - :return: List of readable task instances for a warning message + This method is deprecated. Use `revoke_task` instead. 
""" - readable_tis = [] + reprs = [] + for ti in tis: + reprs.append(repr(ti)) + self.revoke_task(ti=ti) + self.fail(ti.key) + return reprs + + def revoke_task(self, *, ti: TaskInstance): from airflow.providers.celery.executors.celery_executor_utils import app - for ti in tis: - readable_tis.append(repr(ti)) - task_instance_key = ti.key - self.fail(task_instance_key, None) - celery_async_result = self.tasks.pop(task_instance_key, None) - if celery_async_result: - try: - app.control.revoke(celery_async_result.task_id) - except Exception as ex: - self.log.error("Error revoking task instance %s from celery: %s", task_instance_key, ex) - return readable_tis + celery_async_result = self.tasks.pop(ti.key, None) + if celery_async_result: + try: + app.control.revoke(celery_async_result.task_id) + except Exception: + self.log.exception("Error revoking task instance %s from celery", ti.key) + self.running.discard(ti.key) + self.queued_tasks.pop(ti.key, None) @staticmethod def get_cli_commands() -> list[GroupCommand]: diff --git a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index a8c69871ab9c..3715a37d3d86 100644 --- a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -20,15 +20,16 @@ from functools import cached_property from typing import TYPE_CHECKING, Sequence +from deprecated import deprecated + from airflow.configuration import conf +from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.providers.celery.executors.celery_executor import CeleryExecutor try: from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor except ImportError as e: - from airflow.exceptions import 
AirflowOptionalProviderFeatureException - raise AirflowOptionalProviderFeatureException(e) from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -246,6 +247,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task *self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis), ] + @deprecated( + reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + action="ignore", # ignoring since will get warning from the nested executors + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: celery_tis = [ti for ti in tis if ti.queue != self.kubernetes_queue] kubernetes_tis = [ti for ti in tis if ti.queue == self.kubernetes_queue] @@ -254,6 +260,25 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: *self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis), ] + def revoke_task(self, *, ti: TaskInstance): + if ti.queue == self.kubernetes_queue: + try: + self.kubernetes_executor.revoke_task(ti=ti) + except NotImplementedError: + self.log.warning( + "Your kubernetes provider version is old. Falling back to deprecated " + "function, `cleanup_stuck_queued_tasks`. You must upgrade k8s " + "provider to enable 'stuck in queue' retries and stuck in queue " + "event logging." + ) + for ti_repr in self.kubernetes_executor.cleanup_stuck_queued_tasks(tis=[ti]): + self.log.info( + "task stuck in queued and will be marked failed. 
task_instance=%s", + ti_repr, + ) + else: + self.celery_executor.revoke_task(ti=ti) + def end(self) -> None: """End celery and kubernetes executor.""" self.celery_executor.end() diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 3300f3cf96d4..c465548fddf6 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -35,6 +35,7 @@ from queue import Empty, Queue from typing import TYPE_CHECKING, Any, Sequence +from deprecated import deprecated from kubernetes.dynamic import DynamicClient from sqlalchemy import or_, select, update @@ -56,6 +57,7 @@ positive_int, ) from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_constants import KUBERNETES_EXECUTOR from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( @@ -610,6 +612,10 @@ def _iter_tis_to_flush(): tis_to_flush.extend(_iter_tis_to_flush()) return tis_to_flush + @deprecated( + reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: """ Handle remnants of tasks that were failed because they were stuck in queued. @@ -621,28 +627,39 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: :param tis: List of Task Instances to clean up :return: List of readable task instances for a warning message """ + reprs = [] + for ti in tis: + reprs.append(repr(ti)) + self.revoke_task(ti=ti) + self.fail(ti.key) + return reprs + + def revoke_task(self, *, ti: TaskInstance): + """ + Revoke task that may be running. 
+ + :param ti: task instance to revoke + """ if TYPE_CHECKING: assert self.kube_client assert self.kube_scheduler - readable_tis: list[str] = [] - if not tis: - return readable_tis + self.running.discard(ti.key) + self.queued_tasks.pop(ti.key, None) pod_combined_search_str_to_pod_map = self.get_pod_combined_search_str_to_pod_map() - for ti in tis: - # Build the pod selector - base_label_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}" - if ti.map_index >= 0: - # Old tasks _couldn't_ be mapped, so we don't have to worry about compat - base_label_selector += f",map_index={ti.map_index}" - - search_str = f"{base_label_selector},run_id={ti.run_id}" - pod = pod_combined_search_str_to_pod_map.get(search_str, None) - if not pod: - self.log.warning("Cannot find pod for ti %s", ti) - continue - readable_tis.append(repr(ti)) - self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace) - return readable_tis + # Build the pod selector + base_label_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}" + if ti.map_index >= 0: + # Old tasks _couldn't_ be mapped, so we don't have to worry about compat + base_label_selector += f",map_index={ti.map_index}" + + search_str = f"{base_label_selector},run_id={ti.run_id}" + pod = pod_combined_search_str_to_pod_map.get(search_str, None) + if not pod: + self.log.warning("Cannot find pod for ti %s", ti) + return + + self.kube_scheduler.patch_pod_revoked(pod_name=pod.metadata.name, namespace=pod.metadata.namespace) + self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace) def adopt_launched_task( self, diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py index 422913629802..1e29861c6c45 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py +++ 
b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py @@ -35,3 +35,13 @@ ALL_NAMESPACES = "ALL_NAMESPACES" POD_EXECUTOR_DONE_KEY = "airflow_executor_done" + +POD_REVOKED_KEY = "airflow_pod_revoked" +"""Label to indicate pod revoked by executor. + +When the executor revokes a task, the pod deletion is the result of +the revocation. So we don't want it to process that as an external deletion. +So we want events on a revoked pod to be ignored. + +:meta private: +""" diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index ace65466b23f..ad0207cdfa3b 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -33,6 +33,7 @@ ADOPTED, ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY, + POD_REVOKED_KEY, ) from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( @@ -205,9 +206,13 @@ def process_status( resource_version: str, event: Any, ) -> None: + """Process status response.""" pod = event["object"] + + if POD_REVOKED_KEY in pod.metadata.labels.keys(): + return + annotations_string = annotations_for_logging_task_metadata(annotations) - """Process status response.""" if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp: # This will happen only when the task pods are adopted by another executor. # So, there is no change in the pod state. 
@@ -433,7 +438,7 @@ def run_next(self, next_job: KubernetesJobType) -> None: def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist.""" try: - self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace) + self.log.info("Deleting pod %s in namespace %s", pod_name, namespace) self.kube_client.delete_namespaced_pod( pod_name, namespace, @@ -445,6 +450,26 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: if str(e.status) != "404": raise + def patch_pod_revoked(self, *, pod_name: str, namespace: str): + """ + Patch the pod with a label that ensures it's ignored by the kubernetes watcher. + + :meta private: + """ + self.log.info( + "Patching pod %s in namespace %s to note that we are revoking the task.", + pod_name, + namespace, + ) + try: + self.kube_client.patch_namespaced_pod( + name=pod_name, + namespace=namespace, + body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}}, + ) + except ApiException: + self.log.warning("Failed to patch pod %s with pod revoked key.", pod_name, exc_info=True) + def patch_pod_executor_done(self, *, pod_name: str, namespace: str): """Add a "done" annotation to ensure we don't continually adopt pods.""" self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace) diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index d24a59a95d10..fa1e584971ef 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Sequence +from deprecated import deprecated + from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.executors.base_executor import 
BaseExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor @@ -230,12 +233,21 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task *self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis), ] + @deprecated( + reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + action="ignore", # ignoring since will get warning from the nested executors + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # LocalExecutor doesn't have a cleanup_stuck_queued_tasks method, so we # will only run KubernetesExecutor's kubernetes_tis = [ti for ti in tis if ti.queue == self.KUBERNETES_QUEUE] return self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis) + def revoke_task(self, *, ti: TaskInstance): + if ti.queue == self.KUBERNETES_QUEUE: + self.kubernetes_executor.revoke_task(ti=ti) + def end(self) -> None: """End local and kubernetes executor.""" self.local_executor.end() diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 2fa72deab0aa..aa8dddcb363d 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -255,11 +255,43 @@ def test_cleanup_stuck_queued_tasks(self, mock_fail): executor.job_id = 1 executor.running = {ti.key} executor.tasks = {ti.key: AsyncResult("231")} - executor.cleanup_stuck_queued_tasks(tis) + assert executor.has_task(ti) + with pytest.warns(DeprecationWarning): + executor.cleanup_stuck_queued_tasks(tis=tis) executor.sync() assert executor.tasks == {} app.control.revoke.assert_called_once_with("231") - mock_fail.assert_called_once() + mock_fail.assert_called() + assert not executor.has_task(ti) + + @pytest.mark.backend("mysql", "postgres") + 
@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.fail") + def test_revoke_task(self, mock_fail): + start_date = timezone.utcnow() - timedelta(days=2) + + with DAG("test_revoke_task", schedule=None): + task = BaseOperator(task_id="task_1", start_date=start_date) + + ti = TaskInstance(task=task, run_id=None) + ti.external_executor_id = "231" + ti.state = State.QUEUED + ti.queued_dttm = timezone.utcnow() - timedelta(minutes=30) + ti.queued_by_job_id = 1 + tis = [ti] + with _prepare_app() as app: + app.control.revoke = mock.MagicMock() + executor = celery_executor.CeleryExecutor() + executor.job_id = 1 + executor.running = {ti.key} + executor.tasks = {ti.key: AsyncResult("231")} + assert executor.has_task(ti) + for ti in tis: + executor.revoke_task(ti=ti) + executor.sync() + app.control.revoke.assert_called_once_with("231") + assert executor.tasks == {} + assert not executor.has_task(ti) + mock_fail.assert_not_called() @conf_vars({("celery", "result_backend_sqlalchemy_engine_options"): '{"pool_recycle": 1800}'}) @mock.patch("celery.Celery") diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 14785fab37b9..e5cc8619a97b 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1220,7 +1220,14 @@ def test_not_adopt_unassigned_task(self, mock_kube_client): @pytest.mark.db_test @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") def test_cleanup_stuck_queued_tasks(self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session): - """Delete any pods associated with a task stuck in queued.""" + """ + This verifies legacy behavior. Remove when removing ``cleanup_stuck_queued_tasks``. 
+ + It's expected that that method, ``cleanup_stuck_queued_tasks`` will patch the pod + such that it is ignored by watcher, delete the pod, remove from running set, and + fail the task. + + """ mock_kube_client = mock.MagicMock() mock_kube_dynamic_client.return_value = mock.MagicMock() mock_pod_resource = mock.MagicMock() @@ -1261,8 +1268,64 @@ def test_cleanup_stuck_queued_tasks(self, mock_kube_dynamic_client, dag_maker, c executor.kube_scheduler = mock.MagicMock() ti.refresh_from_db() tis = [ti] - executor.cleanup_stuck_queued_tasks(tis) + with pytest.warns(DeprecationWarning): + executor.cleanup_stuck_queued_tasks(tis=tis) + executor.kube_scheduler.delete_pod.assert_called_once() + assert executor.running == set() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") + def test_revoke_task(self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session): + """ + It's expected that that ``revoke_tasks`` will patch the pod + such that it is ignored by watcher, delete the pod and remove from running set. 
+ """ + mock_kube_client = mock.MagicMock() + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource + mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList( + items=[ + k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + annotations={ + "dag_id": "test_cleanup_stuck_queued_tasks", + "task_id": "bash", + "run_id": "test", + "try_number": 0, + }, + labels={ + "role": "airflow-worker", + "dag_id": "test_cleanup_stuck_queued_tasks", + "task_id": "bash", + "airflow-worker": 123, + "run_id": "test", + "try_number": 0, + }, + ), + status=k8s.V1PodStatus(phase="Pending"), + ) + ] + ) + create_dummy_dag(dag_id="test_cleanup_stuck_queued_tasks", task_id="bash", with_dagrun_type=None) + dag_run = dag_maker.create_dagrun() + ti = dag_run.task_instances[0] + ti.state = State.QUEUED + ti.queued_by_job_id = 123 + session.flush() + + executor = self.kubernetes_executor + executor.job_id = 123 + executor.kube_client = mock_kube_client + executor.kube_scheduler = mock.MagicMock() + ti.refresh_from_db() + executor.running.add(ti.key) # so we can verify it gets removed after revoke + assert executor.has_task(task_instance=ti) + executor.revoke_task(ti=ti) + assert not executor.has_task(task_instance=ti) + executor.kube_scheduler.patch_pod_revoked.assert_called_once() executor.kube_scheduler.delete_pod.assert_called_once() + mock_kube_client.patch_namespaced_pod.calls[0] == [] assert executor.running == set() @pytest.mark.parametrize( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 2975fc49df5d..8c9096c6849e 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -28,6 +28,7 @@ from typing import Generator from unittest import mock from unittest.mock import MagicMock, PropertyMock, patch +from uuid import uuid4 import pendulum import psutil @@ -59,6 +60,7 @@ from 
airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest +from airflow.models.log import Log from airflow.models.pool import Pool from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance @@ -124,6 +126,19 @@ def load_examples(): # Patch the MockExecutor into the dict of known executors in the Loader +@contextlib.contextmanager +def _loader_mock(mock_executors): + with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock: + # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the + # correct object for the given input executor name. + loader_mock.side_effect = lambda *x: { + ("default_exec",): mock_executors[0], + (None,): mock_executors[0], + ("secondary_exec",): mock_executors[1], + }[x] + yield + + @patch.dict( ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"} ) @@ -2189,7 +2204,18 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_ # Second executor called for ti3 mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3]) - def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors): + def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session, mock_executors): + """ + Verify backward compatibility of the executor interface w.r.t. stuck queued. + + Prior to #43520, scheduler called method `cleanup_stuck_queued_tasks`, which failed tis. + + After #43520, scheduler calls `cleanup_tasks_stuck_in_queued`, which requeues tis. + + At Airflow 3.0, we should remove backcompat support for this old function. But for now + we verify that we call it as a fallback. 
+ """ + # todo: remove in airflow 3.0 with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): op1 = EmptyOperator(task_id="op1") op2 = EmptyOperator(task_id="op2", executor="default_exec") @@ -2206,26 +2232,102 @@ def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors): scheduler_job = Job() job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0) job_runner._task_queued_timeout = 300 + mock_exec_1 = mock_executors[0] + mock_exec_2 = mock_executors[1] + mock_exec_1.revoke_task.side_effect = NotImplementedError + mock_exec_2.revoke_task.side_effect = NotImplementedError with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock: # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the # correct object for the given input executor name. loader_mock.side_effect = lambda *x: { - ("default_exec",): mock_executors[0], - (None,): mock_executors[0], - ("secondary_exec",): mock_executors[1], + ("default_exec",): mock_exec_1, + (None,): mock_exec_1, + ("secondary_exec",): mock_exec_2, }[x] - job_runner._fail_tasks_stuck_in_queued() + job_runner._handle_tasks_stuck_in_queued() # Default executor is called for ti1 (no explicit executor override uses default) and ti2 (where we # explicitly marked that for execution by the default executor) try: - mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2]) + mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2]) except AssertionError: - mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1]) - mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3]) + mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1]) + mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3]) + + @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) + def 
test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors): + """Verify that tasks stuck in queued will be rescheduled up to N times.""" + with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): + EmptyOperator(task_id="op1") + EmptyOperator(task_id="op2", executor="default_exec") + + def _queue_tasks(tis): + for ti in tis: + ti.state = "queued" + ti.queued_dttm = timezone.utcnow() + session.commit() + + run_id = str(uuid4()) + dr = dag_maker.create_dagrun(run_id=run_id) + + tis = dr.get_task_instances(session=session) + _queue_tasks(tis=tis) + scheduler_job = Job() + scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0) + # job_runner._reschedule_stuck_task = MagicMock() + scheduler._task_queued_timeout = -300 # always in violation of timeout + + with _loader_mock(mock_executors): + scheduler._handle_tasks_stuck_in_queued(session=session) + + # If the task gets stuck in queued once, we reset it to scheduled + tis = dr.get_task_instances(session=session) + assert [x.state for x in tis] == ["scheduled", "scheduled"] + assert [x.queued_dttm for x in tis] == [None, None] + + _queue_tasks(tis=tis) + log_events = [x.event for x in session.scalars(select(Log)).all()] + assert log_events == [ + "stuck in queued reschedule", + "stuck in queued reschedule", + ] + + with _loader_mock(mock_executors): + scheduler._handle_tasks_stuck_in_queued(session=session) + session.commit() + + log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] + assert log_events == [ + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + ] + mock_executors[0].fail.assert_not_called() + tis = dr.get_task_instances(session=session) + assert [x.state for x in tis] == ["scheduled", "scheduled"] + _queue_tasks(tis=tis) + + with _loader_mock(mock_executors): + scheduler._handle_tasks_stuck_in_queued(session=session) + session.commit() + 
log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] + assert log_events == [ + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued tries exceeded", + "stuck in queued tries exceeded", + ] + + mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + states = [x.state for x in dr.get_task_instances(session=session)] + assert states == ["failed", "failed"] - def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog): + def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog): + """Test that if executor no implement revoke_task then we don't blow up.""" with dag_maker("test_fail_stuck_queued_tasks"): op1 = EmptyOperator(task_id="op1") @@ -2236,12 +2338,14 @@ def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session session.commit() from airflow.executors.local_executor import LocalExecutor + assert "revoke_task" in BaseExecutor.__dict__ + # this is just verifying that LocalExecutor is good enough for this test + # in that it does not implement revoke_task + assert "revoke_task" not in LocalExecutor.__dict__ scheduler_job = Job(executor=LocalExecutor()) job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0) job_runner._task_queued_timeout = 300 - with caplog.at_level(logging.DEBUG): - job_runner._fail_tasks_stuck_in_queued() - assert "Executor doesn't support cleanup of stuck queued tasks. Skipping." in caplog.text + job_runner._handle_tasks_stuck_in_queued() @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent") def test_executor_end_called(self, mock_processor_agent, mock_executors):