From 5de1ccbfe296abeb79a46d3a895eaf34a758c62d Mon Sep 17 00:00:00 2001
From: Mateusz Soltysik
Date: Thu, 17 Oct 2024 20:11:22 +0200
Subject: [PATCH] Fix memory leak in exporter and reader (#4224)

---
Reviewer note (ignored by git am, not part of the commit message):
the after-fork hook in export/__init__.py now resolves its weak
reference and only calls the result when it is not None. Hooks passed
to os.register_at_fork() cannot be unregistered, so without the check a
forked child would try to call None once the owning object has been
garbage collected. The hunk header and diffstat below were adjusted for
the extra lines; the blob ids on the "index" lines were not regenerated.

 CHANGELOG.md                                  |  2 +
 .../sdk/metrics/_internal/__init__.py         |  4 +-
 .../sdk/metrics/_internal/export/__init__.py  | 13 ++-
 .../test_provider_shutdown.py                 | 86 +++++++++++++++++++
 .../tests/metrics/test_metrics.py             |  4 +-
 5 files changed, 104 insertions(+), 5 deletions(-)
 create mode 100644 opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbe2ca994b..deef579cfa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   ([#4222](https://github.com/open-telemetry/opentelemetry-python/pull/4222))
 - Record logger name as the instrumentation scope name
   ([#4208](https://github.com/open-telemetry/opentelemetry-python/pull/4208))
+- Fix memory leak in exporter and reader
+  ([#4224](https://github.com/open-telemetry/opentelemetry-python/pull/4224))
 - Drop `OTEL_PYTHON_EXPERIMENTAL_DISABLE_PROMETHEUS_UNIT_NORMALIZATION` environment variable
   ([#4217](https://github.com/open-telemetry/opentelemetry-python/pull/4217))
 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py
index 7740ba1b5f..1ef8dcfaa6 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import weakref
 from atexit import register, unregister
 from logging import getLogger
 from os import environ
@@ -386,7 +386,7 @@ class MeterProvider(APIMeterProvider):
     """
 
     _all_metric_readers_lock = Lock()
-    _all_metric_readers = set()
+    _all_metric_readers = weakref.WeakSet()
 
     def __init__(
         self,
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py
index cd9aafaa19..89bb8f3eb7 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py
@@ -14,6 +14,7 @@
 
 import math
 import os
+import weakref
 from abc import ABC, abstractmethod
 from enum import Enum
 from logging import getLogger
@@ -488,7 +489,17 @@ def __init__(
             )
             self._daemon_thread.start()
             if hasattr(os, "register_at_fork"):
-                os.register_at_fork(after_in_child=self._at_fork_reinit)  # pylint: disable=protected-access
+                weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)
+
+                def _reinit_if_alive():
+                    # The weak reference resolves to None once the owning
+                    # object has been garbage collected; at-fork hooks
+                    # cannot be unregistered, so a stale hook must no-op.
+                    at_fork_reinit = weak_at_fork()
+                    if at_fork_reinit is not None:
+                        at_fork_reinit()
+
+                os.register_at_fork(after_in_child=_reinit_if_alive)
         elif self._export_interval_millis <= 0:
             raise ValueError(
                 f"interval value {self._export_interval_millis} is invalid \
diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py
new file mode 100644
index 0000000000..1f4a16d7f6
--- /dev/null
+++ b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py
@@ -0,0 +1,86 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import gc
+import time
+import weakref
+from typing import Sequence
+from unittest import TestCase
+
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import (
+    Metric,
+    MetricExporter,
+    MetricExportResult,
+    PeriodicExportingMetricReader,
+)
+
+
+class FakeMetricsExporter(MetricExporter):
+    def __init__(
+        self, wait=0, preferred_temporality=None, preferred_aggregation=None
+    ):
+        self.wait = wait
+        self.metrics = []
+        self._shutdown = False
+        super().__init__(
+            preferred_temporality=preferred_temporality,
+            preferred_aggregation=preferred_aggregation,
+        )
+
+    def export(
+        self,
+        metrics_data: Sequence[Metric],
+        timeout_millis: float = 10_000,
+        **kwargs,
+    ) -> MetricExportResult:
+        time.sleep(self.wait)
+        self.metrics.extend(metrics_data)
+        return MetricExportResult.SUCCESS  # match the annotated return type
+
+    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
+        self._shutdown = True
+
+    def force_flush(self, timeout_millis: float = 10_000) -> bool:
+        return True
+
+
+class TestMeterProviderShutdown(TestCase):
+    def test_meter_provider_shutdown_cleans_up_successfully(self):
+        def create_and_shutdown():
+            exporter = FakeMetricsExporter()
+            exporter_wr = weakref.ref(exporter)
+
+            reader = PeriodicExportingMetricReader(exporter)
+            reader_wr = weakref.ref(reader)
+
+            provider = MeterProvider(metric_readers=[reader])
+            provider_wr = weakref.ref(provider)
+
+            provider.shutdown()
+
+            return exporter_wr, reader_wr, provider_wr
+
+        # When: the provider is shutdown
+        (
+            exporter_weakref,
+            reader_weakref,
+            provider_weakref,
+        ) = create_and_shutdown()
+        gc.collect()  # run a full collection before checking the weakrefs
+
+        # Then: the provider, exporter and reader should be garbage collected
+        self.assertIsNone(exporter_weakref())
+        self.assertIsNone(reader_weakref())
+        self.assertIsNone(provider_weakref())
diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py
index 400087774c..4ba0c2fde8 100644
--- a/opentelemetry-sdk/tests/metrics/test_metrics.py
+++ b/opentelemetry-sdk/tests/metrics/test_metrics.py
@@ -14,7 +14,7 @@
 
 # pylint: disable=protected-access,no-self-use
 
-
+import weakref
 from logging import WARNING
 from time import sleep
 from typing import Iterable, Sequence
@@ -66,7 +66,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
 
 class TestMeterProvider(ConcurrencyTestBase, TestCase):
     def tearDown(self):
-        MeterProvider._all_metric_readers = set()
+        MeterProvider._all_metric_readers = weakref.WeakSet()
 
     @patch.object(Resource, "create")
     def test_init_default(self, resource_patch):