Skip to content

Commit

Permalink
Add support to instrument httpx when using proxy (#2664)
Browse files Browse the repository at this point in the history
  • Loading branch information
emdneto authored Jul 17, 2024
1 parent 5a7935f commit e6c27e0
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 7 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2630](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2630))
- `opentelemetry-instrumentation-system-metrics` Add support for capture open file descriptors
([#2652](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2652))
- `opentelemetry-instrumentation-httpx` Add support for instrument client with proxy
([#2664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2664))
- `opentelemetry-instrumentation-aiohttp-client` Implement new semantic convention opt-in migration
([#2673](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2673))
- `opentelemetry-instrumentation-django` Add `http.target` to Django duration metric attributes
Expand Down Expand Up @@ -63,7 +65,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-asgi` Fix generation of `http.target` and `http.url` attributes for ASGI apps
using sub apps
([#2477](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2477))
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
([#2612](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2612))
- `opentelemetry-instrumentation-asyncio` instrumented `asyncio.wait_for` properly raises `asyncio.TimeoutError` as expected
([#2637](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2637))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self._original_transport = self._transport
self._original_mounts = self._mounts.copy()
self._is_instrumented_by_opentelemetry = True

self._transport = SyncOpenTelemetryTransport(
Expand All @@ -648,6 +649,21 @@ def __init__(self, *args, **kwargs):
request_hook=_InstrumentedClient._request_hook,
response_hook=_InstrumentedClient._response_hook,
)
self._mounts.update(
{
url_pattern: (
SyncOpenTelemetryTransport(
transport,
tracer_provider=_InstrumentedClient._tracer_provider,
request_hook=_InstrumentedClient._request_hook,
response_hook=_InstrumentedClient._response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in self._original_mounts.items()
}
)


class _InstrumentedAsyncClient(httpx.AsyncClient):
Expand All @@ -659,6 +675,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self._original_transport = self._transport
self._original_mounts = self._mounts.copy()
self._is_instrumented_by_opentelemetry = True

self._transport = AsyncOpenTelemetryTransport(
Expand All @@ -668,6 +685,22 @@ def __init__(self, *args, **kwargs):
response_hook=_InstrumentedAsyncClient._response_hook,
)

self._mounts.update(
{
url_pattern: (
AsyncOpenTelemetryTransport(
transport,
tracer_provider=_InstrumentedAsyncClient._tracer_provider,
request_hook=_InstrumentedAsyncClient._request_hook,
response_hook=_InstrumentedAsyncClient._response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in self._original_mounts.items()
}
)


class HTTPXClientInstrumentor(BaseInstrumentor):
# pylint: disable=protected-access,attribute-defined-outside-init
Expand Down Expand Up @@ -752,6 +785,7 @@ def instrument_client(
if not client._is_instrumented_by_opentelemetry:
if isinstance(client, httpx.Client):
client._original_transport = client._transport
client._original_mounts = client._mounts.copy()
transport = client._transport or httpx.HTTPTransport()
client._transport = SyncOpenTelemetryTransport(
transport,
Expand All @@ -760,15 +794,47 @@ def instrument_client(
response_hook=response_hook,
)
client._is_instrumented_by_opentelemetry = True
client._mounts.update(
{
url_pattern: (
SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in client._original_mounts.items()
}
)

if isinstance(client, httpx.AsyncClient):
transport = client._transport or httpx.AsyncHTTPTransport()
client._original_mounts = client._mounts.copy()
client._transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
client._is_instrumented_by_opentelemetry = True
client._mounts.update(
{
url_pattern: (
AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in client._original_mounts.items()
}
)
else:
_logger.warning(
"Attempting to instrument Httpx client while already instrumented"
Expand All @@ -787,6 +853,9 @@ def uninstrument_client(
client._transport = client._original_transport
del client._original_transport
client._is_instrumented_by_opentelemetry = False
if hasattr(client, "_original_mounts"):
client._mounts = client._original_mounts.copy()
del client._original_mounts
else:
_logger.warning(
"Attempting to uninstrument Httpx "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ def create_transport(
tracer_provider: typing.Optional["TracerProvider"] = None,
request_hook: typing.Optional["RequestHook"] = None,
response_hook: typing.Optional["ResponseHook"] = None,
**kwargs,
):
pass

Expand All @@ -539,6 +540,7 @@ def create_client(
transport: typing.Union[
SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None
] = None,
**kwargs,
):
pass

Expand Down Expand Up @@ -643,22 +645,70 @@ def test_not_recording_not_set_attribute_in_exception_new_semconv(
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

@respx.mock
def test_client_mounts_with_instrumented_transport(self):
https_url = "https://mock/status/200"
respx.get(https_url).mock(httpx.Response(200))
proxy_mounts = {
"http://": self.create_transport(
proxy=httpx.Proxy("http://localhost:8080")
),
"https://": self.create_transport(
proxy=httpx.Proxy("http://localhost:8443")
),
}
client1 = self.create_client(mounts=proxy_mounts)
client2 = self.create_client(mounts=proxy_mounts)
self.perform_request(self.URL, client=client1)
self.perform_request(https_url, client=client2)
spans = self.assert_span(num_spans=2)
self.assertEqual(
spans[0].attributes[SpanAttributes.HTTP_URL], self.URL
)
self.assertEqual(
spans[1].attributes[SpanAttributes.HTTP_URL], https_url
)

class BaseInstrumentorTest(BaseTest, metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_client(
self,
transport: typing.Union[
SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None
] = None,
**kwargs,
):
pass

@abc.abstractmethod
def create_proxy_transport(self, url: str):
pass

def setUp(self):
super().setUp()
HTTPXClientInstrumentor().instrument()
self.client = self.create_client()
HTTPXClientInstrumentor().uninstrument()

def create_proxy_mounts(self):
return {
"http://": self.create_proxy_transport(
"http://localhost:8080"
),
"https://": self.create_proxy_transport(
"http://localhost:8080"
),
}

def assert_proxy_mounts(self, mounts, num_mounts, transport_type):
self.assertEqual(len(mounts), num_mounts)
for transport in mounts:
with self.subTest(transport):
self.assertIsInstance(
transport,
transport_type,
)

def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
result = self.create_tracer_provider(resource=resource)
Expand Down Expand Up @@ -855,6 +905,71 @@ def test_uninstrument_new_client(self):
self.assertEqual(result.text, "Hello!")
self.assert_span()

def test_instrument_proxy(self):
proxy_mounts = self.create_proxy_mounts()
HTTPXClientInstrumentor().instrument()
client = self.create_client(mounts=proxy_mounts)
self.perform_request(self.URL, client=client)
self.assert_span(num_spans=1)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
)
HTTPXClientInstrumentor().uninstrument()

def test_instrument_client_with_proxy(self):
proxy_mounts = self.create_proxy_mounts()
client = self.create_client(mounts=proxy_mounts)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(httpx.HTTPTransport, httpx.AsyncHTTPTransport),
)
HTTPXClientInstrumentor().instrument_client(client)
result = self.perform_request(self.URL, client=client)
self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=1)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
)
HTTPXClientInstrumentor().uninstrument_client(client)

def test_uninstrument_client_with_proxy(self):
proxy_mounts = self.create_proxy_mounts()
HTTPXClientInstrumentor().instrument()
client = self.create_client(mounts=proxy_mounts)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
)

HTTPXClientInstrumentor().uninstrument_client(client)
result = self.perform_request(self.URL, client=client)

self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=0)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(httpx.HTTPTransport, httpx.AsyncHTTPTransport),
)
# Test that other clients as well as instance client is still
# instrumented
client2 = self.create_client()
result = self.perform_request(self.URL, client=client2)
self.assertEqual(result.text, "Hello!")
self.assert_span()

self.memory_exporter.clear()

result = self.perform_request(self.URL)
self.assertEqual(result.text, "Hello!")
self.assert_span()


class TestSyncIntegration(BaseTestCases.BaseManualTest):
def setUp(self):
Expand All @@ -871,8 +986,9 @@ def create_transport(
tracer_provider: typing.Optional["TracerProvider"] = None,
request_hook: typing.Optional["RequestHook"] = None,
response_hook: typing.Optional["ResponseHook"] = None,
**kwargs,
):
transport = httpx.HTTPTransport()
transport = httpx.HTTPTransport(**kwargs)
telemetry_transport = SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
Expand All @@ -884,8 +1000,9 @@ def create_transport(
def create_client(
self,
transport: typing.Optional[SyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.Client(transport=transport)
return httpx.Client(transport=transport, **kwargs)

def perform_request(
self,
Expand Down Expand Up @@ -921,8 +1038,9 @@ def create_transport(
tracer_provider: typing.Optional["TracerProvider"] = None,
request_hook: typing.Optional["AsyncRequestHook"] = None,
response_hook: typing.Optional["AsyncResponseHook"] = None,
**kwargs,
):
transport = httpx.AsyncHTTPTransport()
transport = httpx.AsyncHTTPTransport(**kwargs)
telemetry_transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
Expand All @@ -934,8 +1052,9 @@ def create_transport(
def create_client(
self,
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.AsyncClient(transport=transport)
return httpx.AsyncClient(transport=transport, **kwargs)

def perform_request(
self,
Expand Down Expand Up @@ -977,8 +1096,9 @@ class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
def create_client(
self,
transport: typing.Optional[SyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.Client()
return httpx.Client(**kwargs)

def perform_request(
self,
Expand All @@ -991,6 +1111,9 @@ def perform_request(
return self.client.request(method, url, headers=headers)
return client.request(method, url, headers=headers)

def create_proxy_transport(self, url):
return httpx.HTTPTransport(proxy=httpx.Proxy(url))


class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
response_hook = staticmethod(_async_response_hook)
Expand All @@ -1007,8 +1130,9 @@ def setUp(self):
def create_client(
self,
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.AsyncClient()
return httpx.AsyncClient(**kwargs)

def perform_request(
self,
Expand All @@ -1027,6 +1151,9 @@ async def _perform_request():

return _async_call(_perform_request())

def create_proxy_transport(self, url):
return httpx.AsyncHTTPTransport(proxy=httpx.Proxy(url))

def test_basic_multiple(self):
# We need to create separate clients because in httpx >= 0.19,
# closing the client after "with" means the second http call fails
Expand Down

0 comments on commit e6c27e0

Please sign in to comment.