diff --git a/CHANGELOG.md b/CHANGELOG.md index e7f7bc6900..3006046690 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Fix metrics export with exemplar and no context and filtering observable instruments + ([#4251](https://github.com/open-telemetry/opentelemetry-python/pull/4251)) + ## Version 1.28.0/0.49b0 (2024-11-05) - Removed superfluous py.typed markers and added them where they were missing diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py index 9f2b27d5a5..0c2b153b3b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. import logging from os import environ -from typing import Dict +from typing import Dict, List from opentelemetry.exporter.otlp.proto.common._internal import ( _encode_attributes, @@ -34,6 +34,7 @@ ) from opentelemetry.sdk.metrics import ( Counter, + Exemplar, Histogram, ObservableCounter, ObservableGauge, @@ -341,7 +342,7 @@ def _encode_metric(metric, pb2_metric): ) -def _encode_exemplars(sdk_exemplars: list) -> list: +def _encode_exemplars(sdk_exemplars: List[Exemplar]) -> List[pb2.Exemplar]: """ Converts a list of SDK Exemplars into a list of protobuf Exemplars. @@ -353,14 +354,26 @@ def _encode_exemplars(sdk_exemplars: list) -> list: """ pb_exemplars = [] for sdk_exemplar in sdk_exemplars: - pb_exemplar = pb2.Exemplar( - time_unix_nano=sdk_exemplar.time_unix_nano, - span_id=_encode_span_id(sdk_exemplar.span_id), - trace_id=_encode_trace_id(sdk_exemplar.trace_id), - filtered_attributes=_encode_attributes( - sdk_exemplar.filtered_attributes - ), - ) + if ( + sdk_exemplar.span_id is not None + and sdk_exemplar.trace_id is not None + ): + pb_exemplar = pb2.Exemplar( + time_unix_nano=sdk_exemplar.time_unix_nano, + span_id=_encode_span_id(sdk_exemplar.span_id), + trace_id=_encode_trace_id(sdk_exemplar.trace_id), + filtered_attributes=_encode_attributes( + sdk_exemplar.filtered_attributes + ), + ) + else: + pb_exemplar = pb2.Exemplar( + time_unix_nano=sdk_exemplar.time_unix_nano, + filtered_attributes=_encode_attributes( + sdk_exemplar.filtered_attributes + ), + ) + # Assign the value based on its type in the SDK exemplar if isinstance(sdk_exemplar.value, float): pb_exemplar.as_double = sdk_exemplar.value diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py index 44092bdc18..cdf1296485 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=protected-access +# pylint: disable=protected-access,too-many-lines import unittest from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( @@ -33,6 +33,7 @@ from opentelemetry.proto.resource.v1.resource_pb2 import ( Resource as OTLPResource, ) +from opentelemetry.sdk.metrics import Exemplar from opentelemetry.sdk.metrics.export import ( AggregationTemporality, Buckets, @@ -55,6 +56,9 @@ class TestOTLPMetricsEncoder(unittest.TestCase): + span_id = int("6e0c63257de34c92", 16) + trace_id = int("d4cda95b652f4a1592b449d5929fda1b", 16) + histogram = Metric( name="histogram", description="foo", @@ -65,6 +69,22 @@ class TestOTLPMetricsEncoder(unittest.TestCase): attributes={"a": 1, "b": True}, start_time_unix_nano=1641946016139533244, time_unix_nano=1641946016139533244, + exemplars=[ + Exemplar( + {"filtered": "banana"}, + 298.0, + 1641946016139533400, + span_id, + trace_id, + ), + Exemplar( + {"filtered": "banana"}, + 298.0, + 1641946016139533400, + None, + None, + ), + ], count=5, sum=67, bucket_counts=[1, 4], @@ -460,7 +480,34 @@ def test_encode_histogram(self): sum=67, bucket_counts=[1, 4], explicit_bounds=[10.0, 20.0], - exemplars=[], + exemplars=[ + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + span_id=b"n\x0cc%}\xe3L\x92", + trace_id=b"\xd4\xcd\xa9[e/J\x15\x92\xb4I\xd5\x92\x9f\xda\x1b", + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + ], max=18.0, min=8.0, ) @@ -563,7 +610,34 @@ def test_encode_multiple_scope_histogram(self): sum=67, bucket_counts=[1, 4], explicit_bounds=[10.0, 20.0], - exemplars=[], + exemplars=[ + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + span_id=b"n\x0cc%}\xe3L\x92", + trace_id=b"\xd4\xcd\xa9[e/J\x15\x92\xb4I\xd5\x92\x9f\xda\x1b", + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + ], max=18.0, min=8.0, ) @@ -598,7 +672,34 @@ def test_encode_multiple_scope_histogram(self): sum=67, bucket_counts=[1, 4], explicit_bounds=[10.0, 20.0], - exemplars=[], + exemplars=[ + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + span_id=b"n\x0cc%}\xe3L\x92", + trace_id=b"\xd4\xcd\xa9[e/J\x15\x92\xb4I\xd5\x92\x9f\xda\x1b", + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + ], max=18.0, min=8.0, ) @@ -640,7 +741,34 @@ def test_encode_multiple_scope_histogram(self): sum=67, bucket_counts=[1, 4], explicit_bounds=[10.0, 20.0], - exemplars=[], + exemplars=[ + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + span_id=b"n\x0cc%}\xe3L\x92", + trace_id=b"\xd4\xcd\xa9[e/J\x15\x92\xb4I\xd5\x92\x9f\xda\x1b", + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + ], max=18.0, min=8.0, ) @@ -682,7 +810,34 @@ def test_encode_multiple_scope_histogram(self): sum=67, bucket_counts=[1, 4], explicit_bounds=[10.0, 20.0], - exemplars=[], + exemplars=[ + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + span_id=b"n\x0cc%}\xe3L\x92", + trace_id=b"\xd4\xcd\xa9[e/J\x15\x92\xb4I\xd5\x92\x9f\xda\x1b", + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + pb2.Exemplar( + time_unix_nano=1641946016139533400, + as_double=298, + filtered_attributes=[ + KeyValue( + key="filtered", + value=AnyValue( + string_value="banana" + ), + ) + ], + ), + ], max=18.0, min=8.0, ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py index d3199c69ab..95582e1601 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py @@ -46,5 +46,5 @@ class Exemplar: filtered_attributes: Attributes value: Union[int, float] time_unix_nano: int - span_id: Optional[str] = None - trace_id: Optional[str] = None + span_id: Optional[int] = None + trace_id: Optional[int] = None diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py index 1dcbfe47da..c8fa7f1453 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py @@ -77,8 +77,8 @@ def __init__(self) -> None: self.__value: Union[int, float] = 0 self.__attributes: Attributes = None self.__time_unix_nano: int = 0 - self.__span_id: Optional[str] = None - self.__trace_id: Optional[str] = None + self.__span_id: Optional[int] = None + self.__trace_id: Optional[int] = None self.__offered: bool = False def offer( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 2acbe1734c..c651033051 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -78,15 +78,17 @@ def __init__( ] = [] def consume_measurement(self, measurement: Measurement) -> None: + should_sample_exemplar = ( + self._sdk_config.exemplar_filter.should_sample( + measurement.value, + measurement.time_unix_nano, + measurement.attributes, + measurement.context, + ) + ) for reader_storage in self._reader_storages.values(): reader_storage.consume_measurement( - measurement, - self._sdk_config.exemplar_filter.should_sample( - measurement.value, - measurement.time_unix_nano, - measurement.attributes, - measurement.context, - ), + measurement, should_sample_exemplar ) def register_asynchronous_instrument( @@ -126,7 +128,17 @@ def collect( ) for measurement in measurements: - metric_reader_storage.consume_measurement(measurement) + should_sample_exemplar = ( + self._sdk_config.exemplar_filter.should_sample( + measurement.value, + measurement.time_unix_nano, + measurement.attributes, + measurement.context, + ) + ) + metric_reader_storage.consume_measurement( + measurement, should_sample_exemplar + ) result = self._reader_storages[metric_reader].collect() diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_exemplars.py b/opentelemetry-sdk/tests/metrics/integration_test/test_exemplars.py new file mode 100644 index 0000000000..c4dabe9209 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_exemplars.py @@ -0,0 +1,317 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +from unittest import TestCase, mock + +from opentelemetry import trace as trace_api +from opentelemetry.sdk.metrics import Exemplar, MeterProvider +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + InMemoryMetricReader, + Metric, + NumberDataPoint, + Sum, +) +from opentelemetry.trace import SpanContext, TraceFlags + + +class TestExemplars(TestCase): + TRACE_ID = int("d4cda95b652f4a1592b449d5929fda1b", 16) + SPAN_ID = int("6e0c63257de34c92", 16) + + @mock.patch.dict(os.environ, {"OTEL_METRICS_EXEMPLAR_FILTER": "always_on"}) + def test_always_on_exemplars(self): + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], + ) + meter = meter_provider.get_meter("testmeter") + counter = meter.create_counter("testcounter") + counter.add(10, {"label": "value1"}) + data = reader.get_metrics_data() + metrics = data.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual( + metrics, + [ + Metric( + name="testcounter", + description="", + unit="", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"label": "value1"}, + start_time_unix_nano=mock.ANY, + time_unix_nano=mock.ANY, + value=10, + exemplars=[ + Exemplar( + filtered_attributes={}, + value=10, + time_unix_nano=mock.ANY, + span_id=None, + trace_id=None, + ), + ], + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + + @mock.patch.dict( + os.environ, {"OTEL_METRICS_EXEMPLAR_FILTER": "trace_based"} + ) + def test_trace_based_exemplars(self): + span_context = SpanContext( + trace_id=self.TRACE_ID, + span_id=self.SPAN_ID, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state={}, + ) + span = trace_api.NonRecordingSpan(span_context) + trace_api.set_span_in_context(span) + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], + ) + + meter = meter_provider.get_meter("testmeter") + counter = meter.create_counter("testcounter") + with trace_api.use_span(span): + counter.add(10, {"label": "value1"}) + data = reader.get_metrics_data() + metrics = data.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual( + metrics, + [ + Metric( + name="testcounter", + description="", + unit="", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"label": "value1"}, + start_time_unix_nano=mock.ANY, + time_unix_nano=mock.ANY, + value=10, + exemplars=[ + Exemplar( + filtered_attributes={}, + value=10, + time_unix_nano=mock.ANY, + span_id=self.SPAN_ID, + trace_id=self.TRACE_ID, + ), + ], + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + + def test_default_exemplar_filter_no_span(self): + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], + ) + + meter = meter_provider.get_meter("testmeter") + counter = meter.create_counter("testcounter") + counter.add(10, {"label": "value1"}) + data = reader.get_metrics_data() + metrics = data.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual( + metrics, + [ + Metric( + name="testcounter", + description="", + unit="", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"label": "value1"}, + start_time_unix_nano=mock.ANY, + time_unix_nano=mock.ANY, + value=10, + exemplars=[], + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + + def test_default_exemplar_filter(self): + span_context = SpanContext( + trace_id=self.TRACE_ID, + span_id=self.SPAN_ID, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state={}, + ) + span = trace_api.NonRecordingSpan(span_context) + trace_api.set_span_in_context(span) + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], + ) + + meter = meter_provider.get_meter("testmeter") + counter = meter.create_counter("testcounter") + with trace_api.use_span(span): + counter.add(10, {"label": "value1"}) + data = reader.get_metrics_data() + metrics = data.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual( + metrics, + [ + Metric( + name="testcounter", + description="", + unit="", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"label": "value1"}, + start_time_unix_nano=mock.ANY, + time_unix_nano=mock.ANY, + value=10, + exemplars=[ + Exemplar( + filtered_attributes={}, + value=10, + time_unix_nano=mock.ANY, + span_id=self.SPAN_ID, + trace_id=self.TRACE_ID, + ), + ], + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + + def test_exemplar_trace_based_manual_context(self): + span_context = SpanContext( + trace_id=self.TRACE_ID, + span_id=self.SPAN_ID, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state={}, + ) + span = trace_api.NonRecordingSpan(span_context) + ctx = trace_api.set_span_in_context(span) + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], + ) + + meter = meter_provider.get_meter("testmeter") + counter = meter.create_counter("testcounter") + counter.add(10, {"label": "value1"}, context=ctx) + data = reader.get_metrics_data() + metrics = data.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual( + metrics, + [ + Metric( + name="testcounter", + description="", + unit="", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"label": "value1"}, + start_time_unix_nano=mock.ANY, + time_unix_nano=mock.ANY, + value=10, + exemplars=[ + Exemplar( + filtered_attributes={}, + value=10, + time_unix_nano=mock.ANY, + span_id=self.SPAN_ID, + trace_id=self.TRACE_ID, + ), + ], + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + + @mock.patch.dict( + os.environ, {"OTEL_METRICS_EXEMPLAR_FILTER": "always_off"} + ) + def test_always_off_exemplars(self): + span_context = SpanContext( + trace_id=self.TRACE_ID, + span_id=self.SPAN_ID, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state={}, + ) + span = trace_api.NonRecordingSpan(span_context) + trace_api.set_span_in_context(span) + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], + ) + meter = meter_provider.get_meter("testmeter") + counter = meter.create_counter("testcounter") + with trace_api.use_span(span): + counter.add(10, {"label": "value1"}) + data = reader.get_metrics_data() + metrics = data.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual( + metrics, + [ + Metric( + name="testcounter", + description="", + unit="", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"label": "value1"}, + start_time_unix_nano=mock.ANY, + time_unix_nano=mock.ANY, + value=10, + exemplars=[], + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index fe58ec4aca..22abfbd3cf 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -100,7 +100,7 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage): MockMetricReaderStorage.return_value = reader_storage_mock consumer = SynchronousMeasurementConsumer( SdkConfiguration( - exemplar_filter=Mock(), + exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), metric_readers=[reader_mock], views=Mock(), @@ -121,6 +121,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage): self.assertEqual( len(reader_storage_mock.consume_measurement.mock_calls), 5 ) + # assert consume_measurement was called with at least 2 arguments the second + # matching the mocked exemplar filter + self.assertFalse(reader_storage_mock.consume_measurement.call_args[1]) def test_collect_timeout(self, MockMetricReaderStorage): reader_mock = Mock()