Skip to content

Commit

Permalink
Add retry on error 502 and 504 (apache#42994)
Browse files Browse the repository at this point in the history
* Add retry on error 502 and 504

* fix mypy findings

* Add pytest

* Convert response code to HTTPStatus

* Add docs to retriable exception

* extend docs for AirflowHttpException

* Fix syntax and typos

* fix pytest

* fix static checks

* fix some static checks

* Fix ruff

* fix pre-commit

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
  • Loading branch information
2 people authored and PaulKobow7536 committed Oct 24, 2024
1 parent 2b253d8 commit 0346615
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
33 changes: 30 additions & 3 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import json
import logging
from functools import wraps
from http import HTTPStatus
from typing import Callable, TypeVar
from urllib.parse import urlparse

Expand All @@ -40,6 +41,14 @@
logger = logging.getLogger(__name__)


class AirflowHttpException(AirflowException):
"""Raise when there is a problem during an http request on the internal API decorator."""

def __init__(self, message: str, status_code: HTTPStatus):
super().__init__(message)
self.status_code = status_code


class InternalApiConfig:
"""Stores and caches configuration for Internal API."""

Expand Down Expand Up @@ -105,10 +114,27 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
"""
from requests.exceptions import ConnectionError

def _is_retryable_exception(exception: BaseException) -> bool:
"""
Evaluate which exception types to retry.
This is especially demanded for cases where an application gateway or Kubernetes ingress can
not find a running instance of a webserver hosting the API (HTTP 502+504) or when the
HTTP request fails in general on network level.
Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop.
"""
retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT)
return (
isinstance(exception, AirflowHttpException)
and exception.status_code in retryable_status_codes
or isinstance(exception, (ConnectionError, NewConnectionError))
)

@tenacity.retry(
stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_exponential(min=1),
retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)),
retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
)
def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
Expand All @@ -126,9 +152,10 @@ def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()
response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers)
if response.status_code != 200:
raise AirflowException(
raise AirflowHttpException(
f"Got {response.status_code}:{response.reason} when sending "
f"the internal api request: {response.text}"
f"the internal api request: {response.text}",
HTTPStatus(response.status_code),
)
return response.content

Expand Down
23 changes: 23 additions & 0 deletions tests/api_internal/test_internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import pytest
import requests
from tenacity import RetryError

from airflow.__main__ import configure_internal_api
from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
Expand Down Expand Up @@ -266,6 +267,28 @@ def test_remote_classmethod_call_with_params(self, mock_requests):
assert call_kwargs["headers"]["Content-Type"] == "application/json"
assert "Authorization" in call_kwargs["headers"]

@conf_vars(
{
("core", "database_access_isolation"): "true",
("core", "internal_api_url"): "http://localhost:8888",
("database", "sql_alchemy_conn"): "none://",
}
)
@mock.patch("airflow.api_internal.internal_api_call.requests")
@mock.patch("tenacity.time.sleep")
def test_retry_on_bad_gateway(self, mock_sleep, mock_requests):
configure_internal_api(Namespace(subcommand="dag-processor"), conf)
response = requests.Response()
response.status_code = 502
response.reason = "Bad Gateway"
response._content = b"Bad Gateway"

mock_sleep = lambda *_, **__: None # noqa: F841
mock_requests.post.return_value = response
with pytest.raises(RetryError):
TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session")
assert mock_requests.post.call_count == 10

@conf_vars(
{
("core", "database_access_isolation"): "true",
Expand Down

0 comments on commit 0346615

Please sign in to comment.