From d5c54e3abeafe5c2b81a85be5659ff38162f0a96 Mon Sep 17 00:00:00 2001 From: Jeevan Opel Date: Mon, 14 Oct 2024 20:45:20 -0700 Subject: [PATCH] Record logger name as the instrumentation scope name (#4208) --- CHANGELOG.md | 2 + .../logs/test_benchmark_logging_handler.py | 38 ++++++ .../sdk/_logs/_internal/__init__.py | 65 +++++++--- opentelemetry-sdk/tests/logs/test_export.py | 32 +++++ .../tests/logs/test_logger_provider_cache.py | 111 ++++++++++++++++++ 5 files changed, 231 insertions(+), 17 deletions(-) create mode 100644 opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py create mode 100644 opentelemetry-sdk/tests/logs/test_logger_provider_cache.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 72721ef626a..5340583fdfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4206](https://github.com/open-telemetry/opentelemetry-python/pull/4206)) - Update environment variable descriptions to match signal ([#4222](https://github.com/open-telemetry/opentelemetry-python/pull/4222)) +- Record logger name as the instrumentation scope name + ([#4208](https://github.com/open-telemetry/opentelemetry-python/pull/4208)) ## Version 1.27.0/0.48b0 (2024-08-28) diff --git a/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py b/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py new file mode 100644 index 00000000000..d1e8c4e39f6 --- /dev/null +++ b/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py @@ -0,0 +1,38 @@ +import logging + +import pytest + +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, + SimpleLogRecordProcessor, +) + + +def _set_up_logging_handler(level): + logger_provider = LoggerProvider() + exporter = InMemoryLogExporter() + processor = SimpleLogRecordProcessor(exporter=exporter) + logger_provider.add_log_record_processor(processor) + handler = LoggingHandler(level=level, logger_provider=logger_provider) + return handler + + +def _create_logger(handler, name): + logger = logging.getLogger(name) + logger.addHandler(handler) + return logger + + +@pytest.mark.parametrize("num_loggers", [1, 10, 100, 1000]) +def test_simple_get_logger_different_names(benchmark, num_loggers): + handler = _set_up_logging_handler(level=logging.DEBUG) + loggers = [ + _create_logger(handler, str(f"logger_{i}")) for i in range(num_loggers) + ] + + def benchmark_get_logger(): + for index in range(1000): + loggers[index % num_loggers].warning("test message") + + benchmark(benchmark_get_logger) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index b7e56eda388..dc054f50e32 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -21,6 +21,7 @@ import traceback import warnings from os import environ +from threading import Lock from time import time_ns from typing import Any, Callable, Optional, Tuple, Union # noqa @@ -471,9 +472,6 @@ def __init__( ) -> None: super().__init__(level=level) self._logger_provider = logger_provider or get_logger_provider() - self._logger = get_logger( - __name__, logger_provider=self._logger_provider - ) @staticmethod def _get_attributes(record: logging.LogRecord) -> Attributes: @@ -558,6 +556,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord: "WARN" if record.levelname == "WARNING" else record.levelname ) + logger = get_logger(record.name, logger_provider=self._logger_provider) return LogRecord( timestamp=timestamp, observed_timestamp=observered_timestamp, @@ -567,7 +566,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord: severity_text=level_name, severity_number=severity_number, body=body, - resource=self._logger.resource, + resource=logger.resource, attributes=attributes, ) @@ -577,14 +576,17 @@ def emit(self, record: logging.LogRecord) -> None: The record is translated to OTel format, and then sent across the pipeline. """ - if not isinstance(self._logger, NoOpLogger): - self._logger.emit(self._translate(record)) + logger = get_logger(record.name, logger_provider=self._logger_provider) + if not isinstance(logger, NoOpLogger): + logger.emit(self._translate(record)) def flush(self) -> None: """ - Flushes the logging output. Skip flushing if logger is NoOp. + Flushes the logging output. Skip flushing if logging_provider has no force_flush method. """ - if not isinstance(self._logger, NoOpLogger): + if hasattr(self._logger_provider, "force_flush") and callable( + self._logger_provider.force_flush + ): self._logger_provider.force_flush() @@ -642,26 +644,20 @@ def __init__( self._at_exit_handler = None if shutdown_on_exit: self._at_exit_handler = atexit.register(self.shutdown) + self._logger_cache = {} + self._logger_cache_lock = Lock() @property def resource(self): return self._resource - def get_logger( + def _get_logger_no_cache( self, name: str, version: Optional[str] = None, schema_url: Optional[str] = None, attributes: Optional[Attributes] = None, ) -> Logger: - if self._disabled: - _logger.warning("SDK is disabled.") - return NoOpLogger( - name, - version=version, - schema_url=schema_url, - attributes=attributes, - ) return Logger( self._resource, self._multi_log_record_processor, @@ -673,6 +669,41 @@ def get_logger( ), ) + def _get_logger_cached( + self, + name: str, + version: Optional[str] = None, + schema_url: Optional[str] = None, + ) -> Logger: + with self._logger_cache_lock: + key = (name, version, schema_url) + if key in self._logger_cache: + return self._logger_cache[key] + + self._logger_cache[key] = self._get_logger_no_cache( + name, version, schema_url + ) + return self._logger_cache[key] + + def get_logger( + self, + name: str, + version: Optional[str] = None, + schema_url: Optional[str] = None, + attributes: Optional[Attributes] = None, + ) -> Logger: + if self._disabled: + _logger.warning("SDK is disabled.") + return NoOpLogger( + name, + version=version, + schema_url=schema_url, + attributes=attributes, + ) + if attributes is None: + return self._get_logger_cached(name, version, schema_url) + return self._get_logger_no_cache(name, version, schema_url, attributes) + def add_log_record_processor( self, log_record_processor: LogRecordProcessor ): diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 0998ecfa5d6..ce31d3991fc 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -71,6 +71,9 @@ def test_simple_log_record_processor_default_level(self): self.assertEqual( warning_log_record.severity_number, SeverityNumber.WARN ) + self.assertEqual( + finished_logs[0].instrumentation_scope.name, "default_level" + ) def test_simple_log_record_processor_custom_level(self): exporter = InMemoryLogExporter() @@ -104,6 +107,12 @@ def test_simple_log_record_processor_custom_level(self): self.assertEqual( fatal_log_record.severity_number, SeverityNumber.FATAL ) + self.assertEqual( + finished_logs[0].instrumentation_scope.name, "custom_level" + ) + self.assertEqual( + finished_logs[1].instrumentation_scope.name, "custom_level" + ) def test_simple_log_record_processor_trace_correlation(self): exporter = InMemoryLogExporter() @@ -129,6 +138,9 @@ def test_simple_log_record_processor_trace_correlation(self): self.assertEqual( log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags ) + self.assertEqual( + finished_logs[0].instrumentation_scope.name, "trace_correlation" + ) exporter.clear() tracer = trace.TracerProvider().get_tracer(__name__) @@ -140,6 +152,10 @@ def test_simple_log_record_processor_trace_correlation(self): self.assertEqual(log_record.body, "Critical message within span") self.assertEqual(log_record.severity_text, "CRITICAL") self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + self.assertEqual( + finished_logs[0].instrumentation_scope.name, + "trace_correlation", + ) span_context = span.get_span_context() self.assertEqual(log_record.trace_id, span_context.trace_id) self.assertEqual(log_record.span_id, span_context.span_id) @@ -166,6 +182,9 @@ def test_simple_log_record_processor_shutdown(self): self.assertEqual( warning_log_record.severity_number, SeverityNumber.WARN ) + self.assertEqual( + finished_logs[0].instrumentation_scope.name, "shutdown" + ) exporter.clear() logger_provider.shutdown() with self.assertLogs(level=logging.WARNING): @@ -206,6 +225,10 @@ def test_simple_log_record_processor_different_msg_types(self): for item in finished_logs ] self.assertEqual(expected, emitted) + for item in finished_logs: + self.assertEqual( + item.instrumentation_scope.name, "different_msg_types" + ) def test_simple_log_record_processor_different_msg_types_with_formatter( self, @@ -428,6 +451,8 @@ def test_shutdown(self): for item in finished_logs ] self.assertEqual(expected, emitted) + for item in finished_logs: + self.assertEqual(item.instrumentation_scope.name, "shutdown") def test_force_flush(self): exporter = InMemoryLogExporter() @@ -447,6 +472,9 @@ def test_force_flush(self): log_record = finished_logs[0].log_record self.assertEqual(log_record.body, "Earth is burning") self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + self.assertEqual( + finished_logs[0].instrumentation_scope.name, "force_flush" + ) def test_log_record_processor_too_many_logs(self): exporter = InMemoryLogExporter() @@ -465,6 +493,8 @@ def test_log_record_processor_too_many_logs(self): self.assertTrue(log_record_processor.force_flush()) finised_logs = exporter.get_finished_logs() self.assertEqual(len(finised_logs), 1000) + for item in finised_logs: + self.assertEqual(item.instrumentation_scope.name, "many_logs") def test_with_multiple_threads(self): exporter = InMemoryLogExporter() @@ -492,6 +522,8 @@ def bulk_log_and_flush(num_logs): finished_logs = exporter.get_finished_logs() self.assertEqual(len(finished_logs), 2415) + for item in finished_logs: + self.assertEqual(item.instrumentation_scope.name, "threads") @unittest.skipUnless( hasattr(os, "fork"), diff --git a/opentelemetry-sdk/tests/logs/test_logger_provider_cache.py b/opentelemetry-sdk/tests/logs/test_logger_provider_cache.py new file mode 100644 index 00000000000..920c6679044 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_logger_provider_cache.py @@ -0,0 +1,111 @@ +import logging +import unittest + +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, + SimpleLogRecordProcessor, +) + + +def set_up_logging_handler(level): + logger_provider = LoggerProvider() + exporter = InMemoryLogExporter() + processor = SimpleLogRecordProcessor(exporter=exporter) + logger_provider.add_log_record_processor(processor) + handler = LoggingHandler(level=level, logger_provider=logger_provider) + return handler, logger_provider + + +def create_logger(handler, name): + logger = logging.getLogger(name) + logger.addHandler(handler) + return logger + + +class TestLoggerProviderCache(unittest.TestCase): + + def test_get_logger_single_handler(self): + handler, logger_provider = set_up_logging_handler(level=logging.DEBUG) + # pylint: disable=protected-access + logger_cache = logger_provider._logger_cache + logger = create_logger(handler, "test_logger") + + # Ensure logger is lazily cached + self.assertEqual(0, len(logger_cache)) + + with self.assertLogs(level=logging.WARNING): + logger.warning("test message") + + self.assertEqual(1, len(logger_cache)) + + # Ensure only one logger is cached + with self.assertLogs(level=logging.WARNING): + rounds = 100 + for _ in range(rounds): + logger.warning("test message") + + self.assertEqual(1, len(logger_cache)) + + def test_get_logger_multiple_loggers(self): + handler, logger_provider = set_up_logging_handler(level=logging.DEBUG) + # pylint: disable=protected-access + logger_cache = logger_provider._logger_cache + + num_loggers = 10 + loggers = [create_logger(handler, str(i)) for i in range(num_loggers)] + + # Ensure loggers are lazily cached + self.assertEqual(0, len(logger_cache)) + + with self.assertLogs(level=logging.WARNING): + for logger in loggers: + logger.warning("test message") + + self.assertEqual(num_loggers, len(logger_cache)) + + with self.assertLogs(level=logging.WARNING): + rounds = 100 + for _ in range(rounds): + for logger in loggers: + logger.warning("test message") + + self.assertEqual(num_loggers, len(logger_cache)) + + def test_provider_get_logger_no_cache(self): + _, logger_provider = set_up_logging_handler(level=logging.DEBUG) + # pylint: disable=protected-access + logger_cache = logger_provider._logger_cache + + logger_provider.get_logger( + name="test_logger", + version="version", + schema_url="schema_url", + attributes={"key": "value"}, + ) + + # Ensure logger is not cached if attributes is set + self.assertEqual(0, len(logger_cache)) + + def test_provider_get_logger_cached(self): + _, logger_provider = set_up_logging_handler(level=logging.DEBUG) + # pylint: disable=protected-access + logger_cache = logger_provider._logger_cache + + logger_provider.get_logger( + name="test_logger", + version="version", + schema_url="schema_url", + ) + + # Ensure only one logger is cached + self.assertEqual(1, len(logger_cache)) + + logger_provider.get_logger( + name="test_logger", + version="version", + schema_url="schema_url", + ) + + # Ensure only one logger is cached + self.assertEqual(1, len(logger_cache))