Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics collector with queue length & wait time reporting #1

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ build/
coverage.xml
docs/_themes
docs/_build
*.swp
4 changes: 4 additions & 0 deletions src/waitress/adjustments.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class Adjustments:
("sockets", as_socket_list),
("channel_request_lookahead", int),
("server_name", str),
("metrics_collector", lambda x: x),
)

_param_map = dict(_params)
Expand Down Expand Up @@ -289,6 +290,9 @@ class Adjustments:
# (or when using the Proxy settings, without forwarding a Host header)
server_name = "waitress.invalid"

# Metrics collector. This allows providing a listener that collects metrics.
metrics_collector = None

def __init__(self, **kw):

if "listen" in kw and ("host" in kw or "port" in kw):
Expand Down
25 changes: 25 additions & 0 deletions src/waitress/observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interfaces for metrics collection
"""

from typing import Protocol


class TasksMetricsCollector(Protocol):
    """Structural interface for objects that receive task-queue metrics.

    Any object providing these two methods satisfies the protocol; the
    method bodies here are placeholders only.
    """

    def report_queue_length(self, length):
        """Receive the current number of tasks in the queue."""
        ...  # pragma: no cover

    def report_task_wait_time(self, wait_time_ns):
        """Receive the time, in nanoseconds, that a task waited in the queue."""
        ...  # pragma: no cover
8 changes: 4 additions & 4 deletions src/waitress/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create_server(
_start=True, # test shim
_sock=None, # test shim
_dispatcher=None, # test shim
**kw # adjustments
**kw, # adjustments
):
"""
if __name__ == '__main__':
Expand All @@ -53,7 +53,7 @@ def create_server(

dispatcher = _dispatcher
if dispatcher is None:
dispatcher = ThreadedTaskDispatcher()
dispatcher = ThreadedTaskDispatcher(metrics_collector=adj.metrics_collector)
dispatcher.set_thread_count(adj.threads)

if adj.unix_socket and hasattr(socket, "AF_UNIX"):
Expand Down Expand Up @@ -194,7 +194,7 @@ def __init__(
adj=None, # adjustments
sockinfo=None, # opaque object
bind_socket=True,
**kw
**kw,
):
if adj is None:
adj = Adjustments(**kw)
Expand Down Expand Up @@ -388,7 +388,7 @@ def __init__(
dispatcher=None, # dispatcher
adj=None, # adjustments
sockinfo=None, # opaque object
**kw
**kw,
):
if sockinfo is None:
sockinfo = (socket.AF_UNIX, socket.SOCK_STREAM, None, None)
Expand Down
24 changes: 20 additions & 4 deletions src/waitress/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ class ThreadedTaskDispatcher:
active_count = 0 # Number of currently active threads
logger = logger
queue_logger = queue_logger
metrics_collector = None

def __init__(self):
def __init__(self, metrics_collector=None):
self.threads = set()
self.queue = deque()
self.lock = threading.Lock()
self.queue_cv = threading.Condition(self.lock)
self.thread_exit_cv = threading.Condition(self.lock)
self.metrics_collector = metrics_collector
self.report_queue_length(0)

def start_new_thread(self, target, thread_no):
t = threading.Thread(
Expand All @@ -79,7 +82,9 @@ def handler_thread(self, thread_no):
self.thread_exit_cv.notify()
break

task = self.queue.popleft()
task, create_time_ns = self.queue.popleft()
self.report_queue_length(len(self.queue))
self.report_task_wait_time(task, create_time_ns)
try:
task.service()
except BaseException:
Expand All @@ -106,9 +111,10 @@ def set_thread_count(self, count):

def add_task(self, task):
with self.lock:
self.queue.append(task)
self.queue.append((task, time.time_ns()))
self.queue_cv.notify()
queue_size = len(self.queue)
self.report_queue_length(queue_size)
idle_threads = len(self.threads) - self.stop_count - self.active_count
if queue_size > idle_threads:
self.queue_logger.warning(
Expand All @@ -132,12 +138,22 @@ def shutdown(self, cancel_pending=True, timeout=5):
if len(queue) > 0:
self.logger.warning("Canceling %d pending task(s)", len(queue))
while queue:
task = queue.popleft()
task, _ = queue.popleft()
task.cancel()
self.queue_cv.notify_all()
return True
return False

def report_queue_length(self, length):
    """Forward the current queue length to the metrics collector, if any.

    ``length`` is passed through unchanged. Nothing happens when
    ``self.metrics_collector`` is ``None``.
    """
    collector = self.metrics_collector
    # Compare against None explicitly: a collector that happens to be falsy
    # (e.g. one defining __len__ or __bool__) must still receive reports.
    if collector is not None:
        collector.report_queue_length(length)

def report_task_wait_time(self, task, create_time_ns):
    """Report how long a task waited before being picked up.

    The reported value is ``time.time_ns()`` at call time minus
    ``create_time_ns``, i.e. a duration in nanoseconds. ``task`` is
    accepted but not used. Nothing happens when
    ``self.metrics_collector`` is ``None``.
    """
    collector = self.metrics_collector
    # Compare against None explicitly: a collector that happens to be falsy
    # (e.g. one defining __len__ or __bool__) must still receive reports.
    if collector is not None:
        collector.report_task_wait_time(time.time_ns() - create_time_ns)


class Task:
close_on_finish = False
Expand Down
3 changes: 3 additions & 0 deletions tests/test_adjustments.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def _makeOne(self, **kw):
return Adjustments(**kw)

def test_goodvars(self):
test_metrics_collector = object()
inst = self._makeOne(
host="localhost",
port="8080",
Expand Down Expand Up @@ -135,6 +136,7 @@ def test_goodvars(self):
url_prefix="///foo/",
ipv4=True,
ipv6=False,
metrics_collector=test_metrics_collector,
)

self.assertEqual(inst.host, "localhost")
Expand Down Expand Up @@ -164,6 +166,7 @@ def test_goodvars(self):
self.assertEqual(inst.url_prefix, "/foo")
self.assertEqual(inst.ipv4, True)
self.assertEqual(inst.ipv6, False)
self.assertEqual(inst.metrics_collector, test_metrics_collector)

bind_pairs = [
sockaddr[:2]
Expand Down
12 changes: 12 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def _makeOne(
_start=True,
_sock=None,
_server=None,
metrics_collector=None,
):
from waitress.server import create_server

Expand All @@ -25,6 +26,7 @@ def _makeOne(
host=host,
port=port,
map=map,
metrics_collector=metrics_collector,
_dispatcher=_dispatcher,
_start=_start,
_sock=_sock,
Expand Down Expand Up @@ -109,6 +111,16 @@ def test_ctor_makes_dispatcher(self):
inst.task_dispatcher.__class__.__name__, "ThreadedTaskDispatcher"
)

def test_ctor_passes_metrics_collector_to_dispatcher(self):
    # The collector given to the server factory must end up on the
    # dispatcher that the server creates.
    from waitress.observability import TasksMetricsCollector

    class DummyMetricsCollector(TasksMetricsCollector):
        pass

    collector = DummyMetricsCollector()
    server = self._makeOne(
        _start=False,
        map={},
        metrics_collector=collector,
    )
    dispatcher = server.task_dispatcher
    self.assertEqual(dispatcher.metrics_collector, collector)

def test_ctor_start_false(self):
inst = self._makeOneWithMap(_start=False)
self.assertEqual(inst.accepting, False)
Expand Down
59 changes: 55 additions & 4 deletions tests/test_task.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import io
import time
import unittest


class TestThreadedTaskDispatcher(unittest.TestCase):
def _makeOne(self):
def _makeOne(self, **kwargs):
from waitress.task import ThreadedTaskDispatcher

return ThreadedTaskDispatcher()
return ThreadedTaskDispatcher(**kwargs)

def _append_task(self, dispatcher, task):
    # Queue entries are (task, enqueue timestamp in nanoseconds) pairs.
    entry = (task, time.time_ns())
    dispatcher.queue.append(entry)

def test_handler_thread_task_raises(self):
inst = self._makeOne()
Expand All @@ -21,7 +25,7 @@ def service(self):

task = BadDummyTask()
inst.logger = DummyLogger()
inst.queue.append(task)
self._append_task(inst, task)
inst.active_count += 1
inst.handler_thread(0)
self.assertEqual(inst.stop_count, 0)
Expand Down Expand Up @@ -81,7 +85,7 @@ def test_shutdown_one_thread(self):
inst.threads.add(0)
inst.logger = DummyLogger()
task = DummyTask()
inst.queue.append(task)
self._append_task(inst, task)
self.assertEqual(inst.shutdown(timeout=0.01), True)
self.assertEqual(
inst.logger.logged,
Expand All @@ -100,6 +104,53 @@ def test_shutdown_no_cancel_pending(self):
inst = self._makeOne()
self.assertEqual(inst.shutdown(cancel_pending=False, timeout=0.01), False)

def _make_test_collector(self):
    # Build a collector that records every reported value in order, so
    # tests can assert on the exact sequence of reports.
    from waitress.observability import TasksMetricsCollector

    class TestMetricsCollector(TasksMetricsCollector):
        def __init__(self):
            self.queue_lengths, self.wait_times = [], []

        def report_queue_length(self, length):
            recorded = self.queue_lengths
            recorded.append(length)

        def report_task_wait_time(self, wait_time_ns):
            recorded = self.wait_times
            recorded.append(wait_time_ns)

    collector = TestMetricsCollector()
    return collector

def test_add_task_reports_length_to_metrics_collector(self):
    collector = self._make_test_collector()
    dispatcher = self._makeOne(metrics_collector=collector)

    # Only the initial zero-length report is expected before any task
    # has been added.
    expected_before = [0]
    self.assertEqual(collector.queue_lengths, expected_before)

    dispatcher.add_task(DummyTask())

    # Adding a single task reports the new queue length.
    expected_after = [0, 1]
    self.assertEqual(collector.queue_lengths, expected_after)

def test_task_start_reports_length_and_wait_to_metrics_collector(self):
    collector = self._make_test_collector()
    dispatcher = self._makeOne(metrics_collector=collector)

    class StopLoopDummyTask(DummyTask):
        # Bumps stop_count while being serviced and then raises, so that
        # the handler loop has a reason to exit after this one task.
        def service(self):
            super().service()
            dispatcher.stop_count += 1
            raise Exception

    dispatcher.active_count += 1
    dispatcher.add_task(StopLoopDummyTask())

    # Before the handler runs: no wait time yet, lengths are [initial, 1].
    self.assertEqual(collector.wait_times, [])
    self.assertEqual(collector.queue_lengths, [0, 1])

    dispatcher.handler_thread(0)

    # After the task is dequeued: length drops to 0 and exactly one wait
    # time has been reported.
    self.assertEqual(collector.queue_lengths, [0, 1, 0])
    reported_waits = len(collector.wait_times)
    self.assertEqual(reported_waits, 1)


class TestTask(unittest.TestCase):
def _makeOne(self, channel=None, request=None):
Expand Down