Re-queue tasks when they are stuck in queued (apache#43520)
The old "stuck in queued" logic just failed the tasks.  Now we requeue them.  We accomplish this by revoking the task from executor and setting state to scheduled.  We'll re-queue it up to 2 times.  Number of times is configurable by hidden config.

We added a `revoke_task` method to the base executor because it's a discrete operation required for this feature, and it might be useful in other cases, e.g. when tasks are detected as zombies. We set the state to failed or scheduled directly from the scheduler (rather than sending it through the event buffer) because the event buffer makes more sense for handling external events -- why round-trip through the executor and back to the scheduler when the scheduler is initiating the action? This also avoids having to deal with "state mismatch" issues when processing events.
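
In outline, the new scheduler-side flow works as sketched below. This is a condensed, illustrative version of the code in the diff that follows (the method names are the real ones; logging, the Log-table audit entries, and the fallback path for executors that only implement the deprecated `cleanup_stuck_queued_tasks` are omitted):

```python
# Condensed sketch of the new flow, not the verbatim implementation.
def _handle_tasks_stuck_in_queued(self, session):
    stuck_tis = self._get_tis_stuck_in_queued(session)  # QUEUED longer than task_queued_timeout
    for executor, tis in self._executor_to_tis(stuck_tis).items():
        for ti in tis:
            executor.revoke_task(ti=ti)  # best-effort removal from worker and executor internals
            if self._get_num_times_stuck_in_queued(ti, session) < self._num_stuck_queued_retries:
                self._reschedule_stuck_task(ti)  # state -> SCHEDULED, queued_dttm cleared
            else:
                ti.set_state(TaskInstanceState.FAILED, session=session)  # attempts exhausted
```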

---------

Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
3 people authored Nov 16, 2024
1 parent bf124d0 commit a41feeb
Showing 12 changed files with 507 additions and 94 deletions.
27 changes: 25 additions & 2 deletions airflow/executors/base_executor.py
@@ -25,9 +25,11 @@
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum
from deprecated import deprecated

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
@@ -552,7 +554,12 @@ def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
@deprecated(
reason="Replaced by function `revoke_task`.",
category=RemovedInAirflow3Warning,
action="ignore",
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
@@ -563,7 +570,23 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
raise NotImplementedError()
raise NotImplementedError

def revoke_task(self, *, ti: TaskInstance):
"""
Attempt to remove task from executor.
It should attempt to ensure that the task is no longer running on the worker,
and ensure that it is cleared out from internal data structures.
It should *not* change the state of the task in airflow, or add any events
to the event buffer.
It should not raise any error.
:param ti: Task instance to remove
"""
raise NotImplementedError

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
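
For executors maintained outside the Airflow repo, adopting the new hook is straightforward. Below is a minimal, hypothetical sketch: the class name and the `self._client.cancel(...)` backend call are invented for illustration, while `self.running`, `self.queued_tasks`, and `self.log` are real `BaseExecutor` attributes (the Celery implementation further down follows the same shape):

```python
from airflow.executors.base_executor import BaseExecutor


class MyQueueExecutor(BaseExecutor):
    """Hypothetical executor illustrating the revoke_task contract."""

    def revoke_task(self, *, ti):
        # Best-effort: ask the backend to stop the task. revoke_task must not
        # raise, so backend errors are logged and swallowed.
        try:
            self._client.cancel(ti.key)  # hypothetical backend API
        except Exception:
            self.log.exception("Could not revoke task instance %s", ti.key)
        # Clear internal bookkeeping. Per the contract above, do not change the
        # task's state in the metadata DB and do not add to the event buffer;
        # the scheduler drives the state transition itself.
        self.running.discard(ti.key)
        self.queued_tasks.pop(ti.key, None)
```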
165 changes: 131 additions & 34 deletions airflow/jobs/scheduler_job_runner.py
@@ -25,12 +25,14 @@
import sys
import time
from collections import Counter, defaultdict, deque
from contextlib import suppress
from datetime import timedelta
from functools import lru_cache, partial
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from deprecated import deprecated
from sqlalchemy import and_, delete, exists, func, not_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
@@ -40,7 +42,7 @@
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
from airflow.exceptions import UnknownExecutorException
from airflow.exceptions import RemovedInAirflow3Warning, UnknownExecutorException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
@@ -99,6 +101,9 @@
DR = DagRun
DM = DagModel

TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
""":meta private:"""


class ConcurrencyMap:
"""
@@ -184,8 +189,15 @@ def __init__(
self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")

self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")

# this param is intentionally undocumented
self._num_stuck_queued_retries = conf.getint(
section="scheduler",
key="num_stuck_in_queued_retries",
fallback=2,
)

if self._enable_tracemalloc:
import tracemalloc

@@ -1046,7 +1058,7 @@ def _run_scheduler_loop(self) -> None:

timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
self._fail_tasks_stuck_in_queued,
self._handle_tasks_stuck_in_queued,
)

timers.call_regular_interval(
@@ -1098,6 +1110,7 @@ def _run_scheduler_loop(self) -> None:
for executor in self.job.executors:
try:
# this is backcompat check if executor does not inherit from BaseExecutor
# todo: remove in airflow 3.0
if not hasattr(executor, "_task_event_logs"):
continue
with create_session() as session:
@@ -1767,48 +1780,132 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques
self.log.debug("callback is empty")

@provide_session
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
"""
Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
Handle the scenario where a task is queued for longer than `task_queued_timeout`.
Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
should not be stuck in queued for a long time. This will mark tasks stuck in
queued for longer than `self._task_queued_timeout` as failed. If the task has
available retries, it will be retried.
should not be stuck in queued for a long time.
We will attempt to requeue the task (by revoking it from the executor and setting it
back to scheduled) up to 2 times before failing the task.
"""
self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
for ti in stuck_tis:
executor.revoke_task(ti=ti)
self._maybe_requeue_stuck_ti(
ti=ti,
session=session,
)
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
# just continue immediately.
self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
continue

tasks_stuck_in_queued = session.scalars(
def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
"""Query db for TIs that are stuck in queued."""
return session.scalars(
select(TI).where(
TI.state == TaskInstanceState.QUEUED,
TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
TI.queued_by_job_id == self.job.id,
)
).all()
)

for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self.log.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
)
session.add(
Log(
event="stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
def _maybe_requeue_stuck_ti(self, *, ti, session):
"""
Requeue task if it has not been attempted too many times.
Otherwise, fail it.
"""
num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
if num_times_stuck < self._num_stuck_queued_retries:
self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id)
session.add(
Log(
event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
task_instance=ti.key,
extra=(
f"Task was in queued state for longer than {self._task_queued_timeout} "
"seconds; task state will be set back to scheduled."
),
)
)
self._reschedule_stuck_task(ti)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
)
)
ti.set_state(TaskInstanceState.FAILED, session=session)

@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
category=RemovedInAirflow3Warning,
action="ignore",
)
def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
"""
Try to invoke stuck in queued cleanup for older executor interface.
TODO: remove in airflow 3.0
Here we handle the case where the executor pre-dates the interface change that
introduced `revoke_task` and deprecated `cleanup_stuck_queued_tasks`.
"""
with suppress(NotImplementedError):
for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
self.log.warning(
"Task instance %s stuck in queued. Will be set to failed.",
ti_repr,
)

@provide_session
def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
session.execute(
update(TI)
.where(TI.filter_for_tis([ti]))
.values(
state=TaskInstanceState.SCHEDULED,
queued_dttm=None,
)
.execution_options(synchronize_session=False)
)

@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
"""
Check the Log table to see how many times a task instance has been stuck in queued.
We can then use this information to determine whether to reschedule a task or fail it.
"""
return (
session.query(Log)
.where(
Log.task_id == ti.task_id,
Log.dag_id == ti.dag_id,
Log.run_id == ti.run_id,
Log.map_index == ti.map_index,
Log.try_number == ti.try_number,
Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
)
.count()
)

@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
@@ -2167,7 +2264,7 @@ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
session.add(warning)
existing_warned_dag_ids.add(warning.dag_id)

def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
_executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
for ti in tis:
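
The requeue limit used above comes from the intentionally undocumented `[scheduler]` option introduced in the `__init__` hunk. A deployment that wants a different limit could set it in `airflow.cfg`; the key name is taken from the diff, and the value `3` is purely illustrative:

```ini
[scheduler]
# Hidden option added by this change; defaults to 2.
num_stuck_in_queued_retries = 3
```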
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -1375,6 +1375,7 @@ repos
repr
req
reqs
requeued
Reserialize
reserialize
reserialized
44 changes: 24 additions & 20 deletions providers/src/airflow/providers/celery/executors/celery_executor.py
@@ -35,6 +35,7 @@
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple

from celery import states as celery_states
from deprecated import deprecated
from packaging.version import Version

from airflow import __version__ as airflow_version
@@ -52,7 +53,7 @@
lazy_load_command,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowTaskTimeout
from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor
from airflow.stats import Stats
from airflow.utils.state import TaskInstanceState
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task

return not_adopted_tis

@deprecated(
reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.",
category=AirflowProviderDeprecationWarning,
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
Remove tasks stuck in queued from executor and fail them.
Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
This method is deprecated. Use `revoke_task` instead.
"""
readable_tis = []
reprs = []
for ti in tis:
reprs.append(repr(ti))
self.revoke_task(ti=ti)
self.fail(ti.key)
return reprs

def revoke_task(self, *, ti: TaskInstance):
from airflow.providers.celery.executors.celery_executor_utils import app

for ti in tis:
readable_tis.append(repr(ti))
task_instance_key = ti.key
self.fail(task_instance_key, None)
celery_async_result = self.tasks.pop(task_instance_key, None)
if celery_async_result:
try:
app.control.revoke(celery_async_result.task_id)
except Exception as ex:
self.log.error("Error revoking task instance %s from celery: %s", task_instance_key, ex)
return readable_tis
celery_async_result = self.tasks.pop(ti.key, None)
if celery_async_result:
try:
app.control.revoke(celery_async_result.task_id)
except Exception:
self.log.exception("Error revoking task instance %s from celery", ti.key)
self.running.discard(ti.key)
self.queued_tasks.pop(ti.key, None)

@staticmethod
def get_cli_commands() -> list[GroupCommand]: