Skip to content

Commit

Permalink
Fix memory leak in exporter and reader (#4224)
Browse files Browse the repository at this point in the history
  • Loading branch information
msoltysik authored Oct 17, 2024
1 parent 679297f commit 5de1ccb
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4222](https://github.com/open-telemetry/opentelemetry-python/pull/4222))
- Record logger name as the instrumentation scope name
([#4208](https://github.com/open-telemetry/opentelemetry-python/pull/4208))
- Fix memory leak in exporter and reader
([#4224](https://github.com/open-telemetry/opentelemetry-python/pull/4224))
- Drop `OTEL_PYTHON_EXPERIMENTAL_DISABLE_PROMETHEUS_UNIT_NORMALIZATION` environment variable
([#4217](https://github.com/open-telemetry/opentelemetry-python/pull/4217))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import weakref
from atexit import register, unregister
from logging import getLogger
from os import environ
Expand Down Expand Up @@ -386,7 +386,7 @@ class MeterProvider(APIMeterProvider):
"""

_all_metric_readers_lock = Lock()
_all_metric_readers = set()
_all_metric_readers = weakref.WeakSet()

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import math
import os
import weakref
from abc import ABC, abstractmethod
from enum import Enum
from logging import getLogger
Expand Down Expand Up @@ -488,7 +489,11 @@ def __init__(
)
self._daemon_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)

os.register_at_fork(
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access
)
elif self._export_interval_millis <= 0:
raise ValueError(
f"interval value {self._export_interval_millis} is invalid \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import time
import weakref
from typing import Sequence
from unittest import TestCase

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
Metric,
MetricExporter,
MetricExportResult,
PeriodicExportingMetricReader,
)


class FakeMetricsExporter(MetricExporter):
def __init__(
self, wait=0, preferred_temporality=None, preferred_aggregation=None
):
self.wait = wait
self.metrics = []
self._shutdown = False
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)

def export(
self,
metrics_data: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
time.sleep(self.wait)
self.metrics.extend(metrics_data)
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._shutdown = True

def force_flush(self, timeout_millis: float = 10_000) -> bool:
return True


class TestMeterProviderShutdown(TestCase):
def test_meter_provider_shutdown_cleans_up_successfully(self):
def create_and_shutdown():
exporter = FakeMetricsExporter()
exporter_wr = weakref.ref(exporter)

reader = PeriodicExportingMetricReader(exporter)
reader_wr = weakref.ref(reader)

provider = MeterProvider(metric_readers=[reader])
provider_wr = weakref.ref(provider)

provider.shutdown()

return exporter_wr, reader_wr, provider_wr

# When: the provider is shutdown
(
exporter_weakref,
reader_weakref,
provider_weakref,
) = create_and_shutdown()
gc.collect()

# Then: the provider, exporter and reader should be garbage collected
self.assertIsNone(exporter_weakref())
self.assertIsNone(reader_weakref())
self.assertIsNone(provider_weakref())
4 changes: 2 additions & 2 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# pylint: disable=protected-access,no-self-use


import weakref
from logging import WARNING
from time import sleep
from typing import Iterable, Sequence
Expand Down Expand Up @@ -66,7 +66,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:

class TestMeterProvider(ConcurrencyTestBase, TestCase):
def tearDown(self):
MeterProvider._all_metric_readers = set()
MeterProvider._all_metric_readers = weakref.WeakSet()

@patch.object(Resource, "create")
def test_init_default(self, resource_patch):
Expand Down

0 comments on commit 5de1ccb

Please sign in to comment.