diff --git a/instrumentation/opentelemetry-instrumentation-asgi/test-requirements.txt b/instrumentation/opentelemetry-instrumentation-asgi/test-requirements.txt index ebe439d1d2..18a11500e1 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/test-requirements.txt +++ b/instrumentation/opentelemetry-instrumentation-asgi/test-requirements.txt @@ -1,4 +1,4 @@ -asgiref==3.7.2 +asgiref==3.8.1 Deprecated==1.2.14 importlib-metadata==6.11.0 iniconfig==2.0.0 diff --git a/instrumentation/opentelemetry-instrumentation-asgi/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-asgi/tests/__init__.py index e69de29bb2..1f32846c43 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/tests/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/tests/__init__.py @@ -0,0 +1,58 @@ +import asyncio +from unittest import IsolatedAsyncioTestCase + +from asgiref.testing import ApplicationCommunicator + +from opentelemetry.test.test_base import TestBase + + +def setup_testing_defaults(scope): + scope.update( + { + "client": ("127.0.0.1", 32767), + "headers": [], + "http_version": "1.0", + "method": "GET", + "path": "/", + "query_string": b"", + "scheme": "http", + "server": ("127.0.0.1", 80), + "type": "http", + } + ) + + +class AsyncAsgiTestBase(TestBase, IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + + self.scope = {} + setup_testing_defaults(self.scope) + self.communicator = None + + def tearDown(self): + if self.communicator: + asyncio.get_event_loop().run_until_complete( + self.communicator.wait() + ) + + def seed_app(self, app): + self.communicator = ApplicationCommunicator(app, self.scope) + + async def send_input(self, message): + await self.communicator.send_input(message) + + async def send_default_request(self): + await self.send_input({"type": "http.request", "body": b""}) + + async def get_output(self, timeout=1): + return await self.communicator.receive_output(timeout) + + async def get_all_output(self, timeout=1): + outputs = [] + while True: + try: + outputs.append(await self.communicator.receive_output(timeout)) + except asyncio.TimeoutError: + break + return outputs diff --git a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_custom_headers.py b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_custom_headers.py index 5394d62ff0..42e7ea2f51 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_custom_headers.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_custom_headers.py @@ -1,7 +1,6 @@ import os import opentelemetry.instrumentation.asgi as otel_asgi -from opentelemetry.test.asgitestutil import AsgiTestBase from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind from opentelemetry.util.http import ( @@ -10,8 +9,11 @@ OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, ) +from . import AsyncAsgiTestBase from .test_asgi_middleware import simple_asgi +_TIMEOUT = 0.01 + async def http_app_with_custom_headers(scope, receive, send): message = await receive() @@ -90,7 +92,7 @@ async def websocket_app_with_custom_headers(scope, receive, send): break -class TestCustomHeaders(AsgiTestBase, TestBase): +class TestCustomHeaders(AsyncAsgiTestBase): constructor_params = {} __test__ = False @@ -108,7 +110,7 @@ def setUp(self): **self.constructor_params, ) - def test_http_custom_request_headers_in_span_attributes(self): + async def test_http_custom_request_headers_in_span_attributes(self): self.scope["headers"].extend( [ (b"custom-test-header-1", b"test-header-value-1"), @@ -119,8 +121,8 @@ def test_http_custom_request_headers_in_span_attributes(self): ] ) self.seed_app(self.app) - self.send_default_request() - self.get_all_output() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.request.header.custom_test_header_1": ( @@ -139,7 +141,7 @@ def test_http_custom_request_headers_in_span_attributes(self): if span.kind == SpanKind.SERVER: self.assertSpanHasAttributes(span, expected) - def test_http_repeat_request_headers_in_span_attributes(self): + async def test_http_repeat_request_headers_in_span_attributes(self): self.scope["headers"].extend( [ (b"custom-test-header-1", b"test-header-value-1"), @@ -147,8 +149,8 @@ def test_http_repeat_request_headers_in_span_attributes(self): ] ) self.seed_app(self.app) - self.send_default_request() - self.get_all_output() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.request.header.custom_test_header_1": ( @@ -159,15 +161,15 @@ def test_http_repeat_request_headers_in_span_attributes(self): span = next(span for span in span_list if span.kind == SpanKind.SERVER) self.assertSpanHasAttributes(span, expected) - def test_http_custom_request_headers_not_in_span_attributes(self): + async def test_http_custom_request_headers_not_in_span_attributes(self): self.scope["headers"].extend( [ (b"custom-test-header-1", b"test-header-value-1"), ] ) self.seed_app(self.app) - self.send_default_request() - self.get_all_output() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.request.header.custom_test_header_1": ( @@ -185,15 +187,15 @@ def test_http_custom_request_headers_not_in_span_attributes(self): for key, _ in not_expected.items(): self.assertNotIn(key, span.attributes) - def test_http_custom_response_headers_in_span_attributes(self): + async def test_http_custom_response_headers_in_span_attributes(self): self.app = otel_asgi.OpenTelemetryMiddleware( http_app_with_custom_headers, tracer_provider=self.tracer_provider, **self.constructor_params, ) self.seed_app(self.app) - self.send_default_request() - self.get_all_output() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.response.header.custom_test_header_1": ( @@ -214,15 +216,15 @@ def test_http_custom_response_headers_in_span_attributes(self): if span.kind == SpanKind.SERVER: self.assertSpanHasAttributes(span, expected) - def test_http_repeat_response_headers_in_span_attributes(self): + async def test_http_repeat_response_headers_in_span_attributes(self): self.app = otel_asgi.OpenTelemetryMiddleware( http_app_with_repeat_headers, tracer_provider=self.tracer_provider, **self.constructor_params, ) self.seed_app(self.app) - self.send_default_request() - self.get_all_output() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.response.header.custom_test_header_1": ( @@ -233,15 +235,15 @@ def test_http_repeat_response_headers_in_span_attributes(self): span = next(span for span in span_list if span.kind == SpanKind.SERVER) self.assertSpanHasAttributes(span, expected) - def test_http_custom_response_headers_not_in_span_attributes(self): + async def test_http_custom_response_headers_not_in_span_attributes(self): self.app = otel_asgi.OpenTelemetryMiddleware( http_app_with_custom_headers, tracer_provider=self.tracer_provider, **self.constructor_params, ) self.seed_app(self.app) - self.send_default_request() - self.get_all_output() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() not_expected = { "http.response.header.custom_test_header_3": ( @@ -253,7 +255,7 @@ def test_http_custom_response_headers_not_in_span_attributes(self): for key, _ in not_expected.items(): self.assertNotIn(key, span.attributes) - def test_websocket_custom_request_headers_in_span_attributes(self): + async def test_websocket_custom_request_headers_in_span_attributes(self): self.scope = { "type": "websocket", "http_version": "1.1", @@ -271,11 +273,11 @@ def test_websocket_custom_request_headers_in_span_attributes(self): "server": ("127.0.0.1", 80), } self.seed_app(self.app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.request.header.custom_test_header_1": ( @@ -294,7 +296,9 @@ def test_websocket_custom_request_headers_in_span_attributes(self): if span.kind == SpanKind.SERVER: self.assertSpanHasAttributes(span, expected) - def test_websocket_custom_request_headers_not_in_span_attributes(self): + async def test_websocket_custom_request_headers_not_in_span_attributes( + self, + ): self.scope = { "type": "websocket", "http_version": "1.1", @@ -309,11 +313,11 @@ def test_websocket_custom_request_headers_not_in_span_attributes(self): "server": ("127.0.0.1", 80), } self.seed_app(self.app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() not_expected = { "http.request.header.custom_test_header_3": ( @@ -325,7 +329,7 @@ def test_websocket_custom_request_headers_not_in_span_attributes(self): for key, _ in not_expected.items(): self.assertNotIn(key, span.attributes) - def test_websocket_custom_response_headers_in_span_attributes(self): + async def test_websocket_custom_response_headers_in_span_attributes(self): self.scope = { "type": "websocket", "http_version": "1.1", @@ -342,10 +346,10 @@ def test_websocket_custom_response_headers_in_span_attributes(self): **self.constructor_params, ) self.seed_app(self.app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() expected = { "http.response.header.custom_test_header_1": ( @@ -366,7 +370,9 @@ def test_websocket_custom_response_headers_in_span_attributes(self): if span.kind == SpanKind.SERVER: self.assertSpanHasAttributes(span, expected) - def test_websocket_custom_response_headers_not_in_span_attributes(self): + async def test_websocket_custom_response_headers_not_in_span_attributes( + self, + ): self.scope = { "type": "websocket", "http_version": "1.1", @@ -383,10 +389,10 @@ def test_websocket_custom_response_headers_not_in_span_attributes(self): **self.constructor_params, ) self.seed_app(self.app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + await self.get_all_output(_TIMEOUT) span_list = self.exporter.get_finished_spans() not_expected = { "http.response.header.custom_test_header_3": ( diff --git a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py index af51faa808..d287eccc2e 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py @@ -14,7 +14,6 @@ # pylint: disable=too-many-lines -import asyncio import sys import time import unittest @@ -66,13 +65,11 @@ USER_AGENT_ORIGINAL, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.test.asgitestutil import ( - AsgiTestBase, - setup_testing_defaults, -) from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind, format_span_id, format_trace_id +from . import AsyncAsgiTestBase, setup_testing_defaults + _expected_metric_names_old = [ "http.server.active_requests", "http.server.duration", @@ -109,6 +106,7 @@ ) _SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S = 0.01 +_TIMEOUT = 0.01 async def http_app(scope, receive, send): @@ -278,7 +276,7 @@ async def error_asgi(scope, receive, send): # pylint: disable=too-many-public-methods -class TestAsgiApplication(AsgiTestBase): +class TestAsgiApplication(AsyncAsgiTestBase): def setUp(self): super().setUp() @@ -482,31 +480,31 @@ def validate_outputs( "opentelemetry.instrumentation.asgi", ) - def test_basic_asgi_call(self): + async def test_basic_asgi_call_test(self): """Test that spans are emitted as expected.""" app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs) - def test_basic_asgi_call_new_semconv(self): + async def test_basic_asgi_call_new_semconv(self): """Test that spans are emitted as expected.""" app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, old_sem_conv=False, new_sem_conv=True) - def test_basic_asgi_call_both_semconv(self): + async def test_basic_asgi_call_both_semconv(self): """Test that spans are emitted as expected.""" app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, old_sem_conv=True, new_sem_conv=True) - def test_asgi_not_recording(self): + async def test_asgi_not_recording(self): mock_tracer = mock.Mock() mock_span = mock.Mock() mock_span.is_recording.return_value = False @@ -519,21 +517,21 @@ def test_asgi_not_recording(self): tracer.return_value = mock_tracer app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() + await self.send_default_request() self.assertFalse(mock_span.is_recording()) self.assertTrue(mock_span.is_recording.called) self.assertFalse(mock_span.set_attribute.called) self.assertFalse(mock_span.set_status.called) - def test_asgi_exc_info(self): + async def test_asgi_exc_info(self): """Test that exception information is emitted as expected.""" app = otel_asgi.OpenTelemetryMiddleware(error_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, error=ValueError) - def test_long_response(self): + async def test_long_response(self): """Test that the server span is ended on the final response body message. If the server span is ended early then this test will fail due @@ -541,8 +539,8 @@ def test_long_response(self): """ app = otel_asgi.OpenTelemetryMiddleware(long_response_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) def add_more_body_spans(expected: list): more_body_span = { @@ -556,12 +554,12 @@ def add_more_body_spans(expected: list): self.validate_outputs(outputs, modifiers=[add_more_body_spans]) - def test_background_execution(self): + async def test_background_execution(self): """Test that the server span is ended BEFORE the background task is finished.""" app = otel_asgi.OpenTelemetryMiddleware(background_execution_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs) span_list = self.memory_exporter.get_finished_spans() server_span = span_list[-1] @@ -572,15 +570,15 @@ def test_background_execution(self): _SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9, ) - def test_trailers(self): + async def test_trailers(self): """Test that trailers are emitted as expected and that the server span is ended BEFORE the background task is finished.""" app = otel_asgi.OpenTelemetryMiddleware( background_execution_trailers_asgi ) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) def add_body_and_trailer_span(expected: list): body_span = { @@ -607,7 +605,7 @@ def add_body_and_trailer_span(expected: list): _SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9, ) - def test_override_span_name(self): + async def test_override_span_name(self): """Test that default span_names can be overwritten by our callback function.""" span_name = "Dymaxion" @@ -628,11 +626,11 @@ def update_expected_span_name(expected): simple_asgi, default_span_details=get_predefined_span_details ) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, modifiers=[update_expected_span_name]) - def test_custom_tracer_provider_otel_asgi(self): + async def test_custom_tracer_provider_otel_asgi(self): resource = resources.Resource.create({"service-test-key": "value"}) result = TestBase.create_tracer_provider(resource=resource) tracer_provider, exporter = result @@ -641,28 +639,28 @@ def test_custom_tracer_provider_otel_asgi(self): simple_asgi, tracer_provider=tracer_provider ) self.seed_app(app) - self.send_default_request() + await self.send_default_request() span_list = exporter.get_finished_spans() for span in span_list: self.assertEqual( span.resource.attributes["service-test-key"], "value" ) - def test_no_op_tracer_provider_otel_asgi(self): + async def test_no_op_tracer_provider_otel_asgi(self): app = otel_asgi.OpenTelemetryMiddleware( simple_asgi, tracer_provider=trace_api.NoOpTracerProvider() ) self.seed_app(app) - self.send_default_request() + await self.send_default_request() - response_start, response_body, *_ = self.get_all_output() + response_start, response_body, *_ = await self.get_all_output(_TIMEOUT) self.assertEqual(response_body["body"], b"*") self.assertEqual(response_start["status"], 200) span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 0) - def test_behavior_with_scope_server_as_none(self): + async def test_behavior_with_scope_server_as_none(self): """Test that middleware is ok when server is none in scope.""" def update_expected_server(expected): @@ -678,11 +676,11 @@ def update_expected_server(expected): self.scope["server"] = None app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, modifiers=[update_expected_server]) - def test_behavior_with_scope_server_as_none_new_semconv(self): + async def test_behavior_with_scope_server_as_none_new_semconv(self): """Test that middleware is ok when server is none in scope.""" def update_expected_server(expected): @@ -697,8 +695,8 @@ def update_expected_server(expected): self.scope["server"] = None app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs( outputs, modifiers=[update_expected_server], @@ -706,7 +704,7 @@ def update_expected_server(expected): new_sem_conv=True, ) - def test_behavior_with_scope_server_as_none_both_semconv(self): + async def test_behavior_with_scope_server_as_none_both_semconv(self): """Test that middleware is ok when server is none in scope.""" def update_expected_server(expected): @@ -724,8 +722,8 @@ def update_expected_server(expected): self.scope["server"] = None app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs( outputs, modifiers=[update_expected_server], @@ -733,7 +731,7 @@ def update_expected_server(expected): new_sem_conv=True, ) - def test_host_header(self): + async def test_host_header(self): """Test that host header is converted to http.server_name.""" hostname = b"server_name_1" @@ -746,11 +744,11 @@ def update_expected_server(expected): self.scope["headers"].append([b"host", hostname]) app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, modifiers=[update_expected_server]) - def test_host_header_both_semconv(self): + async def test_host_header_both_semconv(self): """Test that host header is converted to http.server_name.""" hostname = b"server_name_1" @@ -763,8 +761,8 @@ def update_expected_server(expected): self.scope["headers"].append([b"host", hostname]) app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs( outputs, modifiers=[update_expected_server], @@ -772,7 +770,7 @@ def update_expected_server(expected): new_sem_conv=True, ) - def test_user_agent(self): + async def test_user_agent(self): """Test that host header is converted to http.server_name.""" user_agent = b"test-agent" @@ -785,11 +783,11 @@ def update_expected_user_agent(expected): self.scope["headers"].append([b"user-agent", user_agent]) app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs(outputs, modifiers=[update_expected_user_agent]) - def test_user_agent_new_semconv(self): + async def test_user_agent_new_semconv(self): """Test that host header is converted to http.server_name.""" user_agent = b"test-agent" @@ -802,8 +800,8 @@ def update_expected_user_agent(expected): self.scope["headers"].append([b"user-agent", user_agent]) app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs( outputs, modifiers=[update_expected_user_agent], @@ -811,7 +809,7 @@ def update_expected_user_agent(expected): new_sem_conv=True, ) - def test_user_agent_both_semconv(self): + async def test_user_agent_both_semconv(self): """Test that host header is converted to http.server_name.""" user_agent = b"test-agent" @@ -827,8 +825,8 @@ def update_expected_user_agent(expected): self.scope["headers"].append([b"user-agent", user_agent]) app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs( outputs, modifiers=[update_expected_user_agent], @@ -836,7 +834,7 @@ def update_expected_user_agent(expected): new_sem_conv=True, ) - def test_traceresponse_header(self): + async def test_traceresponse_header(self): """Test a traceresponse header is sent when a global propagator is set.""" orig = get_global_response_propagator() @@ -844,12 +842,12 @@ def test_traceresponse_header(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + response_start, response_body, *_ = await self.get_all_output(_TIMEOUT) span = self.memory_exporter.get_finished_spans()[-1] self.assertEqual(trace_api.SpanKind.SERVER, span.kind) - response_start, response_body, *_ = self.get_all_output() self.assertEqual(response_body["body"], b"*") self.assertEqual(response_start["status"], 200) @@ -869,7 +867,7 @@ def test_traceresponse_header(self): set_global_response_propagator(orig) - def test_websocket(self): + async def test_websocket(self): self.scope = { "method": "GET", "type": "websocket", @@ -883,10 +881,10 @@ def test_websocket(self): } app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + await self.get_all_output(_TIMEOUT) span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 6) expected = [ @@ -943,7 +941,7 @@ def test_websocket(self): self.assertEqual(span.kind, expected["kind"]) self.assertDictEqual(dict(span.attributes), expected["attributes"]) - def test_websocket_new_semconv(self): + async def test_websocket_new_semconv(self): self.scope = { "method": "GET", "type": "websocket", @@ -957,10 +955,10 @@ def test_websocket_new_semconv(self): } app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + await self.get_all_output(_TIMEOUT) span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 6) expected = [ @@ -1016,7 +1014,7 @@ def test_websocket_new_semconv(self): self.assertEqual(span.kind, expected["kind"]) self.assertDictEqual(dict(span.attributes), expected["attributes"]) - def test_websocket_both_semconv(self): + async def test_websocket_both_semconv(self): self.scope = { "method": "GET", "type": "websocket", @@ -1030,10 +1028,10 @@ def test_websocket_both_semconv(self): } app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + await self.get_all_output(_TIMEOUT) span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 6) expected = [ @@ -1101,7 +1099,7 @@ def test_websocket_both_semconv(self): self.assertEqual(span.kind, expected["kind"]) self.assertDictEqual(dict(span.attributes), expected["attributes"]) - def test_websocket_traceresponse_header(self): + async def test_websocket_traceresponse_header(self): """Test a traceresponse header is set for websocket messages""" orig = get_global_response_propagator() @@ -1119,10 +1117,10 @@ def test_websocket_traceresponse_header(self): } app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - _, socket_send, *_ = self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + _, socket_send, *_ = await self.get_all_output(_TIMEOUT) span = self.memory_exporter.get_finished_spans()[-1] self.assertEqual(trace_api.SpanKind.SERVER, span.kind) @@ -1141,15 +1139,15 @@ def test_websocket_traceresponse_header(self): set_global_response_propagator(orig) - def test_lifespan(self): + async def test_lifespan(self): self.scope["type"] = "lifespan" app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() + await self.send_default_request() span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 0) - def test_hooks(self): + async def test_hooks(self): def server_request_hook(span, scope): span.update_name("name from server hook") @@ -1176,20 +1174,27 @@ def update_expected_hook_results(expected): client_response_hook=client_response_hook, ) self.seed_app(app) - self.send_default_request() - outputs = self.get_all_output() + await self.send_default_request() + outputs = await self.get_all_output(_TIMEOUT) self.validate_outputs( outputs, modifiers=[update_expected_hook_results] ) - def test_asgi_metrics(self): + async def test_asgi_metrics(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) + self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) + self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) + self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) + metrics_list = self.memory_metrics_reader.get_metrics_data() number_data_point_seen = False histogram_data_point_seen = False @@ -1218,14 +1223,17 @@ def test_asgi_metrics(self): ) self.assertTrue(number_data_point_seen and histogram_data_point_seen) - def test_asgi_metrics_new_semconv(self): + async def test_asgi_metrics_new_semconv(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) metrics_list = self.memory_metrics_reader.get_metrics_data() number_data_point_seen = False histogram_data_point_seen = False @@ -1254,14 +1262,17 @@ def test_asgi_metrics_new_semconv(self): ) self.assertTrue(number_data_point_seen and histogram_data_point_seen) - def test_asgi_metrics_both_semconv(self): + async def test_asgi_metrics_both_semconv(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) metrics_list = self.memory_metrics_reader.get_metrics_data() number_data_point_seen = False histogram_data_point_seen = False @@ -1290,12 +1301,13 @@ def test_asgi_metrics_both_semconv(self): ) self.assertTrue(number_data_point_seen and histogram_data_point_seen) - def test_basic_metric_success(self): + async def test_basic_metric_success(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) start = default_timer() - self.send_default_request() + await self.send_default_request() duration = max(round((default_timer() - start) * 1000), 0) + await self.get_all_output(_TIMEOUT) expected_duration_attributes = { "http.method": "GET", "http.host": "127.0.0.1", @@ -1337,7 +1349,7 @@ def test_basic_metric_success(self): ) self.assertEqual(point.value, 0) - def test_basic_metric_success_nonrecording_span(self): + async def test_basic_metric_success_nonrecording_span(self): mock_tracer = mock.Mock() mock_span = mock.Mock() mock_span.is_recording.return_value = False @@ -1351,8 +1363,9 @@ def test_basic_metric_success_nonrecording_span(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) start = default_timer() - self.send_default_request() + await self.send_default_request() duration = max(round((default_timer() - start) * 1000), 0) + await self.get_all_output(_TIMEOUT) expected_duration_attributes = { "http.method": "GET", "http.host": "127.0.0.1", @@ -1381,7 +1394,7 @@ def test_basic_metric_success_nonrecording_span(self): self.assertEqual(point.count, 1) if metric.name == "http.server.duration": self.assertAlmostEqual( - duration, point.sum, delta=5 + duration, point.sum, delta=15 ) elif ( metric.name == "http.server.response.size" @@ -1396,12 +1409,13 @@ def test_basic_metric_success_nonrecording_span(self): ) self.assertEqual(point.value, 0) - def test_basic_metric_success_new_semconv(self): + async def test_basic_metric_success_new_semconv(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) start = default_timer() - self.send_default_request() + await self.send_default_request() duration_s = max(default_timer() - start, 0) + await self.get_all_output(_TIMEOUT) expected_duration_attributes = { "http.request.method": "GET", "url.scheme": "http", @@ -1443,13 +1457,14 @@ def test_basic_metric_success_new_semconv(self): ) self.assertEqual(point.value, 0) - def test_basic_metric_success_both_semconv(self): + async def test_basic_metric_success_both_semconv(self): app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) start = default_timer() - self.send_default_request() + await self.send_default_request() duration = max(round((default_timer() - start) * 1000), 0) duration_s = max(default_timer() - start, 0) + await self.get_all_output(_TIMEOUT) expected_duration_attributes_old = { "http.method": "GET", "http.host": "127.0.0.1", @@ -1531,7 +1546,7 @@ def test_basic_metric_success_both_semconv(self): ) self.assertEqual(point.value, 0) - def test_metric_target_attribute(self): + async def test_metric_target_attribute(self): expected_target = "/api/user/{id}" class TestRoute: @@ -1547,7 +1562,8 @@ async def target_asgi(scope, receive, send): app = otel_asgi.OpenTelemetryMiddleware(target_asgi) self.seed_app(app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) metrics_list = self.memory_metrics_reader.get_metrics_data() assertions = 0 for resource_metric in metrics_list.resource_metrics: @@ -1564,7 +1580,7 @@ async def target_asgi(scope, receive, send): assertions += 1 self.assertEqual(assertions, 3) - def test_no_metric_for_websockets(self): + async def test_no_metric_for_websockets(self): self.scope = { "type": "websocket", "http_version": "1.1", @@ -1577,10 +1593,10 @@ def test_no_metric_for_websockets(self): } app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) self.seed_app(app) - self.send_input({"type": "websocket.connect"}) - self.send_input({"type": "websocket.receive", "text": "ping"}) - self.send_input({"type": "websocket.disconnect"}) - self.get_all_output() + await self.send_input({"type": "websocket.connect"}) + await self.send_input({"type": "websocket.receive", "text": "ping"}) + await self.send_input({"type": "websocket.disconnect"}) + await self.get_all_output(_TIMEOUT) self.assertIsNone(self.memory_metrics_reader.get_metrics_data()) @@ -1799,8 +1815,10 @@ def test_collect_target_attribute_fastapi_starlette_invalid(self): ) -class TestWrappedApplication(AsgiTestBase): - def test_mark_span_internal_in_presence_of_span_from_other_framework(self): +class TestWrappedApplication(AsyncAsgiTestBase): + async def test_mark_span_internal_in_presence_of_span_from_other_framework( + self, + ): tracer_provider, exporter = TestBase.create_tracer_provider() tracer = tracer_provider.get_tracer(__name__) app = otel_asgi.OpenTelemetryMiddleware( @@ -1815,7 +1833,8 @@ async def wrapped_app(scope, receive, send): await app(scope, receive, send) self.seed_app(wrapped_app) - self.send_default_request() + await self.send_default_request() + await self.get_all_output(_TIMEOUT) span_list = exporter.get_finished_spans() self.assertEqual(SpanKind.INTERNAL, span_list[0].kind) @@ -1832,11 +1851,11 @@ async def wrapped_app(scope, receive, send): ) -class TestAsgiApplicationRaisingError(AsgiTestBase): +class TestAsgiApplicationRaisingError(AsyncAsgiTestBase): def tearDown(self): pass - def test_asgi_issue_1883(self): + async def test_asgi_value_error_exception(self): """ Test that exception UnboundLocalError local variable 'start' referenced before assignment is not raised See https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1883 @@ -1847,11 +1866,9 @@ async def bad_app(_scope, _receive, _send): app = otel_asgi.OpenTelemetryMiddleware(bad_app) self.seed_app(app) - self.send_default_request() + await self.send_default_request() try: - asyncio.get_event_loop().run_until_complete( - self.communicator.stop() - ) + await self.communicator.wait() except ValueError as exc_info: self.assertEqual(exc_info.args[0], "whatever") except Exception as exc_info: # pylint: disable=W0703