diff --git a/CHANGELOG.md b/CHANGELOG.md index fa52f4cde4..01d9411162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- `opentelemetry-instrumentation-aio-pika` and `opentelemetry-instrumentation-pika` Fix missing trace context propagation when trace not recording. + ([#1969](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1969)) - Fix version of Flask dependency `werkzeug` ([#1980](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1980)) - `opentelemetry-resource-detector-azure` Using new Cloud Resource ID attribute. diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py index cae834a031..03937290ee 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py @@ -45,8 +45,7 @@ async def decorated_publish( if not span: return await publish(message, routing_key, **kwargs) with trace.use_span(span, end_on_exit=True): - if span.is_recording(): - propagate.inject(message.properties.headers) + propagate.inject(message.properties.headers) return_value = await publish(message, routing_key, **kwargs) return return_value diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index d5291e07d9..41cd11d5a6 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -14,6 +14,7 @@ import asyncio from typing import Type from unittest import TestCase, mock, skipIf +from unittest.mock import MagicMock from aio_pika import Exchange, RobustExchange @@ -92,6 +93,36 @@ def test_publish(self): def test_robust_publish(self): self._test_publish(RobustExchange) + def _test_publish_works_with_not_recording_span(self, exchange_type): + exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + mocked_not_recording_span = MagicMock() + mocked_not_recording_span.is_recording.return_value = False + mock_get_publish_span.return_value = mocked_not_recording_span + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + with mock.patch( + "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject" + ) as mock_inject: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + mock_inject.assert_called_once() + + def test_publish_works_with_not_recording_span(self): + self._test_publish_works_with_not_recording_span(Exchange) + + def test_publish_works_with_not_recording_span_robust(self): + self._test_publish_works_with_not_recording_span(RobustExchange) + @skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8") class TestInstrumentedExchangeAioRmq8(TestCase): @@ -144,3 +175,33 @@ def test_publish(self): def test_robust_publish(self): self._test_publish(RobustExchange) + + def _test_publish_works_with_not_recording_span(self, exchange_type): + exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + mocked_not_recording_span = MagicMock() + mocked_not_recording_span.is_recording.return_value = False + mock_get_publish_span.return_value = mocked_not_recording_span + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + with mock.patch( + "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject" + ) as mock_inject: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + mock_inject.assert_called_once() + + def test_publish_works_with_not_recording_span(self): + self._test_publish_works_with_not_recording_span(Exchange) + + def test_publish_works_with_not_recording_span_robust(self): + self._test_publish_works_with_not_recording_span(RobustExchange) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index e9f819f2d6..881149dbac 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -113,12 +113,11 @@ def decorated_function( exchange, routing_key, body, properties, mandatory ) with trace.use_span(span, end_on_exit=True): - if span.is_recording(): - propagate.inject(properties.headers) - try: - publish_hook(span, body, properties) - except Exception as hook_exception: # pylint: disable=W0703 - _LOG.exception(hook_exception) + propagate.inject(properties.headers) + try: + publish_hook(span, body, properties) + except Exception as hook_exception: # pylint: disable=W0703 + _LOG.exception(hook_exception) retval = original_function( exchange, routing_key, body, properties, mandatory ) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index 9b1aed7f49..ed33593389 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -292,7 +292,6 @@ def test_decorate_basic_publish( use_span.assert_called_once_with( get_span.return_value, end_on_exit=True ) - get_span.return_value.is_recording.assert_called_once() inject.assert_called_once_with(properties.headers) callback.assert_called_once_with( exchange_name, routing_key, mock_body, properties, False @@ -323,7 +322,6 @@ def test_decorate_basic_publish_no_properties( use_span.assert_called_once_with( get_span.return_value, end_on_exit=True ) - get_span.return_value.is_recording.assert_called_once() inject.assert_called_once_with(basic_properties.return_value.headers) self.assertEqual(retval, callback.return_value) @@ -393,7 +391,55 @@ def test_decorate_basic_publish_with_hook( use_span.assert_called_once_with( get_span.return_value, end_on_exit=True ) - get_span.return_value.is_recording.assert_called_once() + inject.assert_called_once_with(properties.headers) + publish_hook.assert_called_once_with( + get_span.return_value, mock_body, properties + ) + callback.assert_called_once_with( + exchange_name, routing_key, mock_body, properties, False + ) + self.assertEqual(retval, callback.return_value) + + @mock.patch("opentelemetry.instrumentation.pika.utils._get_span") + @mock.patch("opentelemetry.propagate.inject") + @mock.patch("opentelemetry.trace.use_span") + def test_decorate_basic_publish_when_span_is_not_recording( + self, + use_span: mock.MagicMock, + inject: mock.MagicMock, + get_span: mock.MagicMock, + ) -> None: + callback = mock.MagicMock() + tracer = mock.MagicMock() + channel = mock.MagicMock(spec=Channel) + exchange_name = "test-exchange" + routing_key = "test-routing-key" + properties = mock.MagicMock() + mock_body = b"mock_body" + publish_hook = mock.MagicMock() + + mocked_span = mock.MagicMock() + mocked_span.is_recording.return_value = False + get_span.return_value = mocked_span + + decorated_basic_publish = utils._decorate_basic_publish( + callback, channel, tracer, publish_hook + ) + retval = decorated_basic_publish( + exchange_name, routing_key, mock_body, properties + ) + get_span.assert_called_once_with( + tracer, + channel, + properties, + destination=exchange_name, + span_kind=SpanKind.PRODUCER, + task_name="(temporary)", + operation=None, + ) + use_span.assert_called_once_with( + get_span.return_value, end_on_exit=True + ) inject.assert_called_once_with(properties.headers) publish_hook.assert_called_once_with( get_span.return_value, mock_body, properties