From 1f728133cb9102b3757fd3b0148745f40498c60b Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Feb 2023 21:42:05 +0100 Subject: [PATCH 1/2] metrics collector with queue len & wait time reporting --- .gitignore | 1 + src/waitress/adjustments.py | 4 +++ src/waitress/observability.py | 25 +++++++++++++++ src/waitress/server.py | 8 ++--- src/waitress/task.py | 20 +++++++++++- tests/test_adjustments.py | 3 ++ tests/test_server.py | 12 +++++++ tests/test_task.py | 59 +++++++++++++++++++++++++++++++++-- 8 files changed, 125 insertions(+), 7 deletions(-) create mode 100644 src/waitress/observability.py diff --git a/.gitignore b/.gitignore index 146736ff..d5cda880 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ build/ coverage.xml docs/_themes docs/_build +*.swp diff --git a/src/waitress/adjustments.py b/src/waitress/adjustments.py index 9eb17947..8bd4c1a3 100644 --- a/src/waitress/adjustments.py +++ b/src/waitress/adjustments.py @@ -133,6 +133,7 @@ class Adjustments: ("sockets", as_socket_list), ("channel_request_lookahead", int), ("server_name", str), + ("metrics_collector", lambda x: x), ) _param_map = dict(_params) @@ -289,6 +290,9 @@ class Adjustments: # (or when using the Proxy settings, without forwarding a Host header) server_name = "waitress.invalid" + # Metrics collector. This allow to provide the listener that collects metrics + metrics_collector = None + def __init__(self, **kw): if "listen" in kw and ("host" in kw or "port" in kw): diff --git a/src/waitress/observability.py b/src/waitress/observability.py new file mode 100644 index 00000000..80505849 --- /dev/null +++ b/src/waitress/observability.py @@ -0,0 +1,25 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Interfaces for metrics collection +""" + +from typing import Protocol + + +class TasksMetricsCollector(Protocol): + def report_queue_length(self, lenght): + ... # pragma: no cover + + def report_task_wait_time(self, wait_time_ns): + ... # pragma: no cover diff --git a/src/waitress/server.py b/src/waitress/server.py index bbb68191..32a43793 100644 --- a/src/waitress/server.py +++ b/src/waitress/server.py @@ -34,7 +34,7 @@ def create_server( _start=True, # test shim _sock=None, # test shim _dispatcher=None, # test shim - **kw # adjustments + **kw, # adjustments ): """ if __name__ == '__main__': @@ -53,7 +53,7 @@ def create_server( dispatcher = _dispatcher if dispatcher is None: - dispatcher = ThreadedTaskDispatcher() + dispatcher = ThreadedTaskDispatcher(metrics_collector=adj.metrics_collector) dispatcher.set_thread_count(adj.threads) if adj.unix_socket and hasattr(socket, "AF_UNIX"): @@ -194,7 +194,7 @@ def __init__( adj=None, # adjustments sockinfo=None, # opaque object bind_socket=True, - **kw + **kw, ): if adj is None: adj = Adjustments(**kw) @@ -388,7 +388,7 @@ def __init__( dispatcher=None, # dispatcher adj=None, # adjustments sockinfo=None, # opaque object - **kw + **kw, ): if sockinfo is None: sockinfo = (socket.AF_UNIX, socket.SOCK_STREAM, None, None) diff --git a/src/waitress/task.py b/src/waitress/task.py index 956c0c0f..bd659245 100644 --- a/src/waitress/task.py +++ b/src/waitress/task.py @@ -47,13 +47,16 @@ class ThreadedTaskDispatcher: active_count = 0 # Number of currently active threads logger = logger queue_logger = queue_logger + metrics_collector = None - def __init__(self): + def __init__(self, metrics_collector=None): self.threads = set() self.queue = deque() self.lock = threading.Lock() self.queue_cv = threading.Condition(self.lock) self.thread_exit_cv = threading.Condition(self.lock) + self.metrics_collector = metrics_collector + self.report_queue_length(0) def start_new_thread(self, target, thread_no): t = threading.Thread( @@ -80,6 +83,8 @@ def handler_thread(self, thread_no): break task = self.queue.popleft() + self.report_queue_length(len(self.queue)) + self.report_task_wait_time(task) try: task.service() except BaseException: @@ -109,6 +114,7 @@ def add_task(self, task): self.queue.append(task) self.queue_cv.notify() queue_size = len(self.queue) + self.report_queue_length(queue_size) idle_threads = len(self.threads) - self.stop_count - self.active_count if queue_size > idle_threads: self.queue_logger.warning( @@ -138,11 +144,22 @@ def shutdown(self, cancel_pending=True, timeout=5): return True return False + def report_queue_length(self, length): + if self.metrics_collector: + self.metrics_collector.report_queue_length(length) + + def report_task_wait_time(self, task): + if self.metrics_collector: + self.metrics_collector.report_task_wait_time( + time.time_ns() - task.create_time_ns + ) + class Task: close_on_finish = False status = "200 OK" wrote_header = False + create_time_ns = 0 start_time = 0 content_length = None content_bytes_written = 0 @@ -161,6 +178,7 @@ def __init__(self, channel, request): # fall back to a version we support. version = "1.0" self.version = version + self.create_time_ns = time.time_ns() def service(self): try: diff --git a/tests/test_adjustments.py b/tests/test_adjustments.py index cbbb0063..09b26b24 100644 --- a/tests/test_adjustments.py +++ b/tests/test_adjustments.py @@ -107,6 +107,7 @@ def _makeOne(self, **kw): return Adjustments(**kw) def test_goodvars(self): + test_metrics_collector = object() inst = self._makeOne( host="localhost", port="8080", @@ -135,6 +136,7 @@ def test_goodvars(self): url_prefix="///foo/", ipv4=True, ipv6=False, + metrics_collector=test_metrics_collector, ) self.assertEqual(inst.host, "localhost") @@ -164,6 +166,7 @@ def test_goodvars(self): self.assertEqual(inst.url_prefix, "/foo") self.assertEqual(inst.ipv4, True) self.assertEqual(inst.ipv6, False) + self.assertEqual(inst.metrics_collector, test_metrics_collector) bind_pairs = [ sockaddr[:2] diff --git a/tests/test_server.py b/tests/test_server.py index 6edc3b24..f3b5bea3 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -17,6 +17,7 @@ def _makeOne( _start=True, _sock=None, _server=None, + metrics_collector=None, ): from waitress.server import create_server @@ -25,6 +26,7 @@ def _makeOne( host=host, port=port, map=map, + metrics_collector=metrics_collector, _dispatcher=_dispatcher, _start=_start, _sock=_sock, @@ -109,6 +111,16 @@ def test_ctor_makes_dispatcher(self): inst.task_dispatcher.__class__.__name__, "ThreadedTaskDispatcher" ) + def test_ctor_passes_metrics_collector_to_dispatcher(self): + from waitress.observability import TasksMetricsCollector + + class DummyMetricsCollector(TasksMetricsCollector): + pass + + metrics_collector = DummyMetricsCollector() + inst = self._makeOne(_start=False, map={}, metrics_collector=metrics_collector) + self.assertEqual(inst.task_dispatcher.metrics_collector, metrics_collector) + def test_ctor_start_false(self): inst = self._makeOneWithMap(_start=False) self.assertEqual(inst.accepting, False) diff --git a/tests/test_task.py b/tests/test_task.py index 47868e15..17aa31e9 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,12 +1,13 @@ import io +import time import unittest class TestThreadedTaskDispatcher(unittest.TestCase): - def _makeOne(self): + def _makeOne(self, **kwargs): from waitress.task import ThreadedTaskDispatcher - return ThreadedTaskDispatcher() + return ThreadedTaskDispatcher(**kwargs) def test_handler_thread_task_raises(self): inst = self._makeOne() @@ -100,6 +101,53 @@ def test_shutdown_no_cancel_pending(self): inst = self._makeOne() self.assertEqual(inst.shutdown(cancel_pending=False, timeout=0.01), False) + def _make_test_collector(self): + from waitress.observability import TasksMetricsCollector + + class TestMetricsCollector(TasksMetricsCollector): + def __init__(self): + self.queue_lengths = [] + self.wait_times = [] + + def report_queue_length(self, length): + self.queue_lengths.append(length) + + def report_task_wait_time(self, wait_time_ns): + self.wait_times.append(wait_time_ns) + + return TestMetricsCollector() + + def test_add_task_reports_length_to_metrics_collector(self): + test_collector = self._make_test_collector() + inst = self._makeOne(metrics_collector=test_collector) + + self.assertEqual(test_collector.queue_lengths, [0]) + + inst.add_task(DummyTask()) + + self.assertEqual(test_collector.queue_lengths, [0, 1]) + + def test_task_start_reports_length_and_wait_to_metrics_collector(self): + class StopLoopDummyTask(DummyTask): + def service(self): + super().service() + inst.stop_count += 1 + raise Exception + + test_collector = self._make_test_collector() + inst = self._makeOne(metrics_collector=test_collector) + + inst.active_count += 1 + inst.add_task(StopLoopDummyTask()) + + self.assertEqual(test_collector.wait_times, []) + self.assertEqual(test_collector.queue_lengths, [0, 1]) + + inst.handler_thread(0) + + self.assertEqual(test_collector.queue_lengths, [0, 1, 0]) + self.assertEqual(len(test_collector.wait_times), 1) + class TestTask(unittest.TestCase): def _makeOne(self, channel=None, request=None): @@ -117,6 +165,10 @@ def test_ctor_version_not_in_known(self): inst = self._makeOne(request=request) self.assertEqual(inst.version, "1.0") + def test_ctor_captures_creation_time(self): + inst = self._makeOne() + self.assertLessEqual(inst.create_time_ns, time.time_ns()) + def test_build_response_header_bad_http_version(self): inst = self._makeOne() inst.request = DummyParser() @@ -931,6 +983,9 @@ class DummyTask: serviced = False cancelled = False + def __init__(self): + self.create_time_ns = time.time_ns() + def service(self): self.serviced = True From bf7f5608097cc8bc74853b7b7eb2497e56cfbe89 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Fri, 24 Feb 2023 14:58:18 +0100 Subject: [PATCH 2/2] Fixes the issue in the original implementation --- src/waitress/task.py | 14 ++++++-------- tests/test_task.py | 14 +++++--------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/waitress/task.py b/src/waitress/task.py index bd659245..b19515cd 100644 --- a/src/waitress/task.py +++ b/src/waitress/task.py @@ -82,9 +82,9 @@ def handler_thread(self, thread_no): self.thread_exit_cv.notify() break - task = self.queue.popleft() + task, create_time_ns = self.queue.popleft() self.report_queue_length(len(self.queue)) - self.report_task_wait_time(task) + self.report_task_wait_time(task, create_time_ns) try: task.service() except BaseException: @@ -111,7 +111,7 @@ def set_thread_count(self, count): def add_task(self, task): with self.lock: - self.queue.append(task) + self.queue.append((task, time.time_ns())) self.queue_cv.notify() queue_size = len(self.queue) self.report_queue_length(queue_size) @@ -138,7 +138,7 @@ def shutdown(self, cancel_pending=True, timeout=5): if len(queue) > 0: self.logger.warning("Canceling %d pending task(s)", len(queue)) while queue: - task = queue.popleft() + task, _ = queue.popleft() task.cancel() self.queue_cv.notify_all() return True @@ -148,10 +148,10 @@ def report_queue_length(self, length): if self.metrics_collector: self.metrics_collector.report_queue_length(length) - def report_task_wait_time(self, task): + def report_task_wait_time(self, task, create_time_ns): if self.metrics_collector: self.metrics_collector.report_task_wait_time( - time.time_ns() - task.create_time_ns + time.time_ns() - create_time_ns ) @@ -159,7 +159,6 @@ class Task: close_on_finish = False status = "200 OK" wrote_header = False - create_time_ns = 0 start_time = 0 content_length = None content_bytes_written = 0 @@ -178,7 +177,6 @@ def __init__(self, channel, request): # fall back to a version we support. version = "1.0" self.version = version - self.create_time_ns = time.time_ns() def service(self): try: diff --git a/tests/test_task.py b/tests/test_task.py index 17aa31e9..5d5c2174 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -9,6 +9,9 @@ def _makeOne(self, **kwargs): return ThreadedTaskDispatcher(**kwargs) + def _append_task(self, dispatcher, task): + dispatcher.queue.append((task, time.time_ns())) + def test_handler_thread_task_raises(self): inst = self._makeOne() inst.threads.add(0) @@ -22,7 +25,7 @@ def service(self): task = BadDummyTask() inst.logger = DummyLogger() - inst.queue.append(task) + self._append_task(inst, task) inst.active_count += 1 inst.handler_thread(0) self.assertEqual(inst.stop_count, 0) @@ -82,7 +85,7 @@ def test_shutdown_one_thread(self): inst.threads.add(0) inst.logger = DummyLogger() task = DummyTask() - inst.queue.append(task) + self._append_task(inst, task) self.assertEqual(inst.shutdown(timeout=0.01), True) self.assertEqual( inst.logger.logged, @@ -165,10 +168,6 @@ def test_ctor_version_not_in_known(self): inst = self._makeOne(request=request) self.assertEqual(inst.version, "1.0") - def test_ctor_captures_creation_time(self): - inst = self._makeOne() - self.assertLessEqual(inst.create_time_ns, time.time_ns()) - def test_build_response_header_bad_http_version(self): inst = self._makeOne() inst.request = DummyParser() @@ -983,9 +982,6 @@ class DummyTask: serviced = False cancelled = False - def __init__(self): - self.create_time_ns = time.time_ns() - def service(self): self.serviced = True