From e6c27e08007c4acf2a3fa9f903c52c4656b9ea62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Em=C3=ADdio=20Neto?= <9735060+emdneto@users.noreply.github.com> Date: Wed, 17 Jul 2024 14:46:31 -0300 Subject: [PATCH] Add support to instrument httpx when using proxy (#2664) --- CHANGELOG.md | 4 +- .../instrumentation/httpx/__init__.py | 69 +++++++++ .../tests/test_httpx_integration.py | 139 +++++++++++++++++- 3 files changed, 205 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2de76b2736..98bbd3a0bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2630](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2630)) - `opentelemetry-instrumentation-system-metrics` Add support for capture open file descriptors ([#2652](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2652)) +- `opentelemetry-instrumentation-httpx` Add support for instrument client with proxy + ([#2664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2664)) - `opentelemetry-instrumentation-aiohttp-client` Implement new semantic convention opt-in migration ([#2673](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2673)) - `opentelemetry-instrumentation-django` Add `http.target` to Django duration metric attributes @@ -63,7 +65,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-asgi` Fix generation of `http.target` and `http.url` attributes for ASGI apps using sub apps ([#2477](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2477)) -- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library. +- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library. ([#2612](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2612)) - `opentelemetry-instrumentation-asyncio` instrumented `asyncio.wait_for` properly raises `asyncio.TimeoutError` as expected ([#2637](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2637)) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index d2ff0be292..e3ce383d7e 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -640,6 +640,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._original_transport = self._transport + self._original_mounts = self._mounts.copy() self._is_instrumented_by_opentelemetry = True self._transport = SyncOpenTelemetryTransport( @@ -648,6 +649,21 @@ def __init__(self, *args, **kwargs): request_hook=_InstrumentedClient._request_hook, response_hook=_InstrumentedClient._response_hook, ) + self._mounts.update( + { + url_pattern: ( + SyncOpenTelemetryTransport( + transport, + tracer_provider=_InstrumentedClient._tracer_provider, + request_hook=_InstrumentedClient._request_hook, + response_hook=_InstrumentedClient._response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in self._original_mounts.items() + } + ) class _InstrumentedAsyncClient(httpx.AsyncClient): @@ -659,6 +675,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._original_transport = self._transport + self._original_mounts = self._mounts.copy() self._is_instrumented_by_opentelemetry = True self._transport = AsyncOpenTelemetryTransport( @@ -668,6 +685,22 @@ def __init__(self, *args, **kwargs): response_hook=_InstrumentedAsyncClient._response_hook, ) + self._mounts.update( + { + url_pattern: ( + AsyncOpenTelemetryTransport( + transport, + tracer_provider=_InstrumentedAsyncClient._tracer_provider, + request_hook=_InstrumentedAsyncClient._request_hook, + response_hook=_InstrumentedAsyncClient._response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in self._original_mounts.items() + } + ) + class HTTPXClientInstrumentor(BaseInstrumentor): # pylint: disable=protected-access,attribute-defined-outside-init @@ -752,6 +785,7 @@ def instrument_client( if not client._is_instrumented_by_opentelemetry: if isinstance(client, httpx.Client): client._original_transport = client._transport + client._original_mounts = client._mounts.copy() transport = client._transport or httpx.HTTPTransport() client._transport = SyncOpenTelemetryTransport( transport, @@ -760,8 +794,25 @@ def instrument_client( response_hook=response_hook, ) client._is_instrumented_by_opentelemetry = True + client._mounts.update( + { + url_pattern: ( + SyncOpenTelemetryTransport( + transport, + tracer_provider=tracer_provider, + request_hook=request_hook, + response_hook=response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in client._original_mounts.items() + } + ) + if isinstance(client, httpx.AsyncClient): transport = client._transport or httpx.AsyncHTTPTransport() + client._original_mounts = client._mounts.copy() client._transport = AsyncOpenTelemetryTransport( transport, tracer_provider=tracer_provider, @@ -769,6 +820,21 @@ def instrument_client( response_hook=response_hook, ) client._is_instrumented_by_opentelemetry = True + client._mounts.update( + { + url_pattern: ( + AsyncOpenTelemetryTransport( + transport, + tracer_provider=tracer_provider, + request_hook=request_hook, + response_hook=response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in client._original_mounts.items() + } + ) else: _logger.warning( "Attempting to instrument Httpx client while already instrumented" @@ -787,6 +853,9 @@ def uninstrument_client( client._transport = client._original_transport del client._original_transport client._is_instrumented_by_opentelemetry = False + if hasattr(client, "_original_mounts"): + client._mounts = client._original_mounts.copy() + del client._original_mounts else: _logger.warning( "Attempting to uninstrument Httpx " diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index 84bab598e6..03141e61b5 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -530,6 +530,7 @@ def create_transport( tracer_provider: typing.Optional["TracerProvider"] = None, request_hook: typing.Optional["RequestHook"] = None, response_hook: typing.Optional["ResponseHook"] = None, + **kwargs, ): pass @@ -539,6 +540,7 @@ def create_client( transport: typing.Union[ SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None ] = None, + **kwargs, ): pass @@ -643,6 +645,30 @@ def test_not_recording_not_set_attribute_in_exception_new_semconv( self.assertFalse(mock_span.set_attribute.called) self.assertFalse(mock_span.set_status.called) + @respx.mock + def test_client_mounts_with_instrumented_transport(self): + https_url = "https://mock/status/200" + respx.get(https_url).mock(httpx.Response(200)) + proxy_mounts = { + "http://": self.create_transport( + proxy=httpx.Proxy("http://localhost:8080") + ), + "https://": self.create_transport( + proxy=httpx.Proxy("http://localhost:8443") + ), + } + client1 = self.create_client(mounts=proxy_mounts) + client2 = self.create_client(mounts=proxy_mounts) + self.perform_request(self.URL, client=client1) + self.perform_request(https_url, client=client2) + spans = self.assert_span(num_spans=2) + self.assertEqual( + spans[0].attributes[SpanAttributes.HTTP_URL], self.URL + ) + self.assertEqual( + spans[1].attributes[SpanAttributes.HTTP_URL], https_url + ) + class BaseInstrumentorTest(BaseTest, metaclass=abc.ABCMeta): @abc.abstractmethod def create_client( @@ -650,15 +676,39 @@ def create_client( transport: typing.Union[ SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None ] = None, + **kwargs, ): pass + @abc.abstractmethod + def create_proxy_transport(self, url: str): + pass + def setUp(self): super().setUp() HTTPXClientInstrumentor().instrument() self.client = self.create_client() HTTPXClientInstrumentor().uninstrument() + def create_proxy_mounts(self): + return { + "http://": self.create_proxy_transport( + "http://localhost:8080" + ), + "https://": self.create_proxy_transport( + "http://localhost:8080" + ), + } + + def assert_proxy_mounts(self, mounts, num_mounts, transport_type): + self.assertEqual(len(mounts), num_mounts) + for transport in mounts: + with self.subTest(transport): + self.assertIsInstance( + transport, + transport_type, + ) + def test_custom_tracer_provider(self): resource = resources.Resource.create({}) result = self.create_tracer_provider(resource=resource) @@ -855,6 +905,71 @@ def test_uninstrument_new_client(self): self.assertEqual(result.text, "Hello!") self.assert_span() + def test_instrument_proxy(self): + proxy_mounts = self.create_proxy_mounts() + HTTPXClientInstrumentor().instrument() + client = self.create_client(mounts=proxy_mounts) + self.perform_request(self.URL, client=client) + self.assert_span(num_spans=1) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), + ) + HTTPXClientInstrumentor().uninstrument() + + def test_instrument_client_with_proxy(self): + proxy_mounts = self.create_proxy_mounts() + client = self.create_client(mounts=proxy_mounts) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (httpx.HTTPTransport, httpx.AsyncHTTPTransport), + ) + HTTPXClientInstrumentor().instrument_client(client) + result = self.perform_request(self.URL, client=client) + self.assertEqual(result.text, "Hello!") + self.assert_span(num_spans=1) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), + ) + HTTPXClientInstrumentor().uninstrument_client(client) + + def test_uninstrument_client_with_proxy(self): + proxy_mounts = self.create_proxy_mounts() + HTTPXClientInstrumentor().instrument() + client = self.create_client(mounts=proxy_mounts) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), + ) + + HTTPXClientInstrumentor().uninstrument_client(client) + result = self.perform_request(self.URL, client=client) + + self.assertEqual(result.text, "Hello!") + self.assert_span(num_spans=0) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (httpx.HTTPTransport, httpx.AsyncHTTPTransport), + ) + # Test that other clients as well as instance client is still + # instrumented + client2 = self.create_client() + result = self.perform_request(self.URL, client=client2) + self.assertEqual(result.text, "Hello!") + self.assert_span() + + self.memory_exporter.clear() + + result = self.perform_request(self.URL) + self.assertEqual(result.text, "Hello!") + self.assert_span() + class TestSyncIntegration(BaseTestCases.BaseManualTest): def setUp(self): @@ -871,8 +986,9 @@ def create_transport( tracer_provider: typing.Optional["TracerProvider"] = None, request_hook: typing.Optional["RequestHook"] = None, response_hook: typing.Optional["ResponseHook"] = None, + **kwargs, ): - transport = httpx.HTTPTransport() + transport = httpx.HTTPTransport(**kwargs) telemetry_transport = SyncOpenTelemetryTransport( transport, tracer_provider=tracer_provider, @@ -884,8 +1000,9 @@ def create_transport( def create_client( self, transport: typing.Optional[SyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.Client(transport=transport) + return httpx.Client(transport=transport, **kwargs) def perform_request( self, @@ -921,8 +1038,9 @@ def create_transport( tracer_provider: typing.Optional["TracerProvider"] = None, request_hook: typing.Optional["AsyncRequestHook"] = None, response_hook: typing.Optional["AsyncResponseHook"] = None, + **kwargs, ): - transport = httpx.AsyncHTTPTransport() + transport = httpx.AsyncHTTPTransport(**kwargs) telemetry_transport = AsyncOpenTelemetryTransport( transport, tracer_provider=tracer_provider, @@ -934,8 +1052,9 @@ def create_transport( def create_client( self, transport: typing.Optional[AsyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.AsyncClient(transport=transport) + return httpx.AsyncClient(transport=transport, **kwargs) def perform_request( self, @@ -977,8 +1096,9 @@ class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): def create_client( self, transport: typing.Optional[SyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.Client() + return httpx.Client(**kwargs) def perform_request( self, @@ -991,6 +1111,9 @@ def perform_request( return self.client.request(method, url, headers=headers) return client.request(method, url, headers=headers) + def create_proxy_transport(self, url): + return httpx.HTTPTransport(proxy=httpx.Proxy(url)) + class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): response_hook = staticmethod(_async_response_hook) @@ -1007,8 +1130,9 @@ def setUp(self): def create_client( self, transport: typing.Optional[AsyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.AsyncClient() + return httpx.AsyncClient(**kwargs) def perform_request( self, @@ -1027,6 +1151,9 @@ async def _perform_request(): return _async_call(_perform_request()) + def create_proxy_transport(self, url): + return httpx.AsyncHTTPTransport(proxy=httpx.Proxy(url)) + def test_basic_multiple(self): # We need to create separate clients because in httpx >= 0.19, # closing the client after "with" means the second http call fails