diff --git a/.gitignore b/.gitignore index 146736ff..d5cda880 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ build/ coverage.xml docs/_themes docs/_build +*.swp diff --git a/src/waitress/adjustments.py b/src/waitress/adjustments.py index 9eb17947..8bd4c1a3 100644 --- a/src/waitress/adjustments.py +++ b/src/waitress/adjustments.py @@ -133,6 +133,7 @@ class Adjustments: ("sockets", as_socket_list), ("channel_request_lookahead", int), ("server_name", str), + ("metrics_collector", lambda x: x), ) _param_map = dict(_params) @@ -289,6 +290,9 @@ class Adjustments: # (or when using the Proxy settings, without forwarding a Host header) server_name = "waitress.invalid" + # Metrics collector. This allow to provide the listener that collects metrics + metrics_collector = None + def __init__(self, **kw): if "listen" in kw and ("host" in kw or "port" in kw): diff --git a/src/waitress/observability.py b/src/waitress/observability.py new file mode 100644 index 00000000..80505849 --- /dev/null +++ b/src/waitress/observability.py @@ -0,0 +1,25 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Interfaces for metrics collection +""" + +from typing import Protocol + + +class TasksMetricsCollector(Protocol): + def report_queue_length(self, lenght): + ... # pragma: no cover + + def report_task_wait_time(self, wait_time_ns): + ... # pragma: no cover diff --git a/src/waitress/server.py b/src/waitress/server.py index bbb68191..32a43793 100644 --- a/src/waitress/server.py +++ b/src/waitress/server.py @@ -34,7 +34,7 @@ def create_server( _start=True, # test shim _sock=None, # test shim _dispatcher=None, # test shim - **kw # adjustments + **kw, # adjustments ): """ if __name__ == '__main__': @@ -53,7 +53,7 @@ def create_server( dispatcher = _dispatcher if dispatcher is None: - dispatcher = ThreadedTaskDispatcher() + dispatcher = ThreadedTaskDispatcher(metrics_collector=adj.metrics_collector) dispatcher.set_thread_count(adj.threads) if adj.unix_socket and hasattr(socket, "AF_UNIX"): @@ -194,7 +194,7 @@ def __init__( adj=None, # adjustments sockinfo=None, # opaque object bind_socket=True, - **kw + **kw, ): if adj is None: adj = Adjustments(**kw) @@ -388,7 +388,7 @@ def __init__( dispatcher=None, # dispatcher adj=None, # adjustments sockinfo=None, # opaque object - **kw + **kw, ): if sockinfo is None: sockinfo = (socket.AF_UNIX, socket.SOCK_STREAM, None, None) diff --git a/src/waitress/task.py b/src/waitress/task.py index 956c0c0f..b19515cd 100644 --- a/src/waitress/task.py +++ b/src/waitress/task.py @@ -47,13 +47,16 @@ class ThreadedTaskDispatcher: active_count = 0 # Number of currently active threads logger = logger queue_logger = queue_logger + metrics_collector = None - def __init__(self): + def __init__(self, metrics_collector=None): self.threads = set() self.queue = deque() self.lock = threading.Lock() self.queue_cv = threading.Condition(self.lock) self.thread_exit_cv = threading.Condition(self.lock) + self.metrics_collector = metrics_collector + self.report_queue_length(0) def start_new_thread(self, target, thread_no): t = threading.Thread( @@ -79,7 +82,9 @@ def handler_thread(self, thread_no): self.thread_exit_cv.notify() break - task = self.queue.popleft() + task, create_time_ns = self.queue.popleft() + self.report_queue_length(len(self.queue)) + self.report_task_wait_time(task, create_time_ns) try: task.service() except BaseException: @@ -106,9 +111,10 @@ def set_thread_count(self, count): def add_task(self, task): with self.lock: - self.queue.append(task) + self.queue.append((task, time.time_ns())) self.queue_cv.notify() queue_size = len(self.queue) + self.report_queue_length(queue_size) idle_threads = len(self.threads) - self.stop_count - self.active_count if queue_size > idle_threads: self.queue_logger.warning( @@ -132,12 +138,22 @@ def shutdown(self, cancel_pending=True, timeout=5): if len(queue) > 0: self.logger.warning("Canceling %d pending task(s)", len(queue)) while queue: - task = queue.popleft() + task, _ = queue.popleft() task.cancel() self.queue_cv.notify_all() return True return False + def report_queue_length(self, length): + if self.metrics_collector: + self.metrics_collector.report_queue_length(length) + + def report_task_wait_time(self, task, create_time_ns): + if self.metrics_collector: + self.metrics_collector.report_task_wait_time( + time.time_ns() - create_time_ns + ) + class Task: close_on_finish = False diff --git a/tests/test_adjustments.py b/tests/test_adjustments.py index cbbb0063..09b26b24 100644 --- a/tests/test_adjustments.py +++ b/tests/test_adjustments.py @@ -107,6 +107,7 @@ def _makeOne(self, **kw): return Adjustments(**kw) def test_goodvars(self): + test_metrics_collector = object() inst = self._makeOne( host="localhost", port="8080", @@ -135,6 +136,7 @@ def test_goodvars(self): url_prefix="///foo/", ipv4=True, ipv6=False, + metrics_collector=test_metrics_collector, ) self.assertEqual(inst.host, "localhost") @@ -164,6 +166,7 @@ def test_goodvars(self): self.assertEqual(inst.url_prefix, "/foo") self.assertEqual(inst.ipv4, True) self.assertEqual(inst.ipv6, False) + self.assertEqual(inst.metrics_collector, test_metrics_collector) bind_pairs = [ sockaddr[:2] diff --git a/tests/test_server.py b/tests/test_server.py index 6edc3b24..f3b5bea3 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -17,6 +17,7 @@ def _makeOne( _start=True, _sock=None, _server=None, + metrics_collector=None, ): from waitress.server import create_server @@ -25,6 +26,7 @@ def _makeOne( host=host, port=port, map=map, + metrics_collector=metrics_collector, _dispatcher=_dispatcher, _start=_start, _sock=_sock, @@ -109,6 +111,16 @@ def test_ctor_makes_dispatcher(self): inst.task_dispatcher.__class__.__name__, "ThreadedTaskDispatcher" ) + def test_ctor_passes_metrics_collector_to_dispatcher(self): + from waitress.observability import TasksMetricsCollector + + class DummyMetricsCollector(TasksMetricsCollector): + pass + + metrics_collector = DummyMetricsCollector() + inst = self._makeOne(_start=False, map={}, metrics_collector=metrics_collector) + self.assertEqual(inst.task_dispatcher.metrics_collector, metrics_collector) + def test_ctor_start_false(self): inst = self._makeOneWithMap(_start=False) self.assertEqual(inst.accepting, False) diff --git a/tests/test_task.py b/tests/test_task.py index 47868e15..5d5c2174 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,12 +1,16 @@ import io +import time import unittest class TestThreadedTaskDispatcher(unittest.TestCase): - def _makeOne(self): + def _makeOne(self, **kwargs): from waitress.task import ThreadedTaskDispatcher - return ThreadedTaskDispatcher() + return ThreadedTaskDispatcher(**kwargs) + + def _append_task(self, dispatcher, task): + dispatcher.queue.append((task, time.time_ns())) def test_handler_thread_task_raises(self): inst = self._makeOne() @@ -21,7 +25,7 @@ def service(self): task = BadDummyTask() inst.logger = DummyLogger() - inst.queue.append(task) + self._append_task(inst, task) inst.active_count += 1 inst.handler_thread(0) self.assertEqual(inst.stop_count, 0) @@ -81,7 +85,7 @@ def test_shutdown_one_thread(self): inst.threads.add(0) inst.logger = DummyLogger() task = DummyTask() - inst.queue.append(task) + self._append_task(inst, task) self.assertEqual(inst.shutdown(timeout=0.01), True) self.assertEqual( inst.logger.logged, @@ -100,6 +104,53 @@ def test_shutdown_no_cancel_pending(self): inst = self._makeOne() self.assertEqual(inst.shutdown(cancel_pending=False, timeout=0.01), False) + def _make_test_collector(self): + from waitress.observability import TasksMetricsCollector + + class TestMetricsCollector(TasksMetricsCollector): + def __init__(self): + self.queue_lengths = [] + self.wait_times = [] + + def report_queue_length(self, length): + self.queue_lengths.append(length) + + def report_task_wait_time(self, wait_time_ns): + self.wait_times.append(wait_time_ns) + + return TestMetricsCollector() + + def test_add_task_reports_length_to_metrics_collector(self): + test_collector = self._make_test_collector() + inst = self._makeOne(metrics_collector=test_collector) + + self.assertEqual(test_collector.queue_lengths, [0]) + + inst.add_task(DummyTask()) + + self.assertEqual(test_collector.queue_lengths, [0, 1]) + + def test_task_start_reports_length_and_wait_to_metrics_collector(self): + class StopLoopDummyTask(DummyTask): + def service(self): + super().service() + inst.stop_count += 1 + raise Exception + + test_collector = self._make_test_collector() + inst = self._makeOne(metrics_collector=test_collector) + + inst.active_count += 1 + inst.add_task(StopLoopDummyTask()) + + self.assertEqual(test_collector.wait_times, []) + self.assertEqual(test_collector.queue_lengths, [0, 1]) + + inst.handler_thread(0) + + self.assertEqual(test_collector.queue_lengths, [0, 1, 0]) + self.assertEqual(len(test_collector.wait_times), 1) + class TestTask(unittest.TestCase): def _makeOne(self, channel=None, request=None):