diff --git a/providers/src/airflow/providers/http/operators/http.py b/providers/src/airflow/providers/http/operators/http.py index 0ec9fd136c71..358294af9879 100644 --- a/providers/src/airflow/providers/http/operators/http.py +++ b/providers/src/airflow/providers/http/operators/http.py @@ -114,6 +114,7 @@ def __init__( response_check: Callable[..., bool] | None = None, response_filter: Callable[..., Any] | None = None, extra_options: dict[str, Any] | None = None, + request_kwargs: dict[str, Any] | None = None, http_conn_id: str = "http_default", log_response: bool = False, auth_type: type[AuthBase] | None = None, @@ -143,6 +144,7 @@ def __init__( self.tcp_keep_alive_interval = tcp_keep_alive_interval self.deferrable = deferrable self.retry_args = retry_args + self.request_kwargs = request_kwargs or {} @property def hook(self) -> HttpHook: @@ -173,10 +175,21 @@ def execute_sync(self, context: Context) -> Any: self.log.info("Calling HTTP method") if self.retry_args: response = self.hook.run_with_advanced_retry( - self.retry_args, self.endpoint, self.data, self.headers, self.extra_options + self.retry_args, + self.endpoint, + self.data, + self.headers, + self.extra_options, + **self.request_kwargs, ) else: - response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options) + response = self.hook.run( + self.endpoint, + self.data, + self.headers, + self.extra_options, + **self.request_kwargs, + ) response = self.paginate_sync(response=response) return self.process_response(context=context, response=response) @@ -191,7 +204,8 @@ def paginate_sync(self, response: Response) -> Response | list[Response]: break if self.retry_args: response = self.hook.run_with_advanced_retry( - self.retry_args, **self._merge_next_page_parameters(next_page_params) + self.retry_args, + **self._merge_next_page_parameters(next_page_params), ) else: response = self.hook.run(**self._merge_next_page_parameters(next_page_params)) @@ -304,6 +318,7 @@ def _merge_next_page_parameters(self, next_page_params: dict) -> dict: data=data, headers=merge_dicts(self.headers, next_page_params.get("headers", {})), extra_options=merge_dicts(self.extra_options, next_page_params.get("extra_options", {})), + **self.request_kwargs, ) diff --git a/providers/src/airflow/providers/http/sensors/http.py b/providers/src/airflow/providers/http/sensors/http.py index 33b5e1d4defb..ef0d9fd380d0 100644 --- a/providers/src/airflow/providers/http/sensors/http.py +++ b/providers/src/airflow/providers/http/sensors/http.py @@ -94,6 +94,7 @@ def __init__( http_conn_id: str = "http_default", method: str = "GET", request_params: dict[str, Any] | None = None, + request_kwargs: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, response_error_codes_allowlist: list[str] | None = None, response_check: Callable[..., bool] | None = None, @@ -121,6 +122,7 @@ def __init__( self.tcp_keep_alive_count = tcp_keep_alive_count self.tcp_keep_alive_interval = tcp_keep_alive_interval self.deferrable = deferrable + self.request_kwargs = request_kwargs or {} def poke(self, context: Context) -> bool: from airflow.utils.operator_helpers import determine_kwargs @@ -141,6 +143,7 @@ def poke(self, context: Context) -> bool: data=self.request_params, headers=self.headers, extra_options=self.extra_options, + **self.request_kwargs, ) if self.response_check: