Skip to content

Commit

Permalink
server: introduce notifications.
Browse files Browse the repository at this point in the history
Notifications use a Unix pipe to interrupt select() whenever a reply is
enqueued.
  • Loading branch information
xavierog committed Jan 28, 2025
1 parent ec8a31c commit 9e3d895
Showing 1 changed file with 32 additions and 2 deletions.
34 changes: 32 additions & 2 deletions src/moulti/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from socket import socket as Socket
from queue import Queue, Empty
Expand All @@ -15,6 +16,7 @@

TYPE_ACCEPT = 1
TYPE_READ_WRITE = 2
TYPE_NOTIFY = 3

class MoultiServer:
"""
Expand Down Expand Up @@ -51,6 +53,11 @@ def __init__(
self.ready_callback = ready_callback
self.security_callback = security_callback
self.replies: Queue = Queue()
"""Queue used to receive replies that the network loop should send to clients."""
# That queue cannot be polled through the selectors module: combine it with a Unix pipe:
self.notifications, self.notify = os.pipe()
os.set_blocking(self.notifications, False)
os.set_blocking(self.notify, False)

def log(self, data: str) -> None:
if self.log_callback is not None:
Expand All @@ -68,17 +75,17 @@ def network_loop(self) -> None:
if not self.is_listening():
return
try:
self.server_selector.register(self.notifications, EVENT_READ, {'type': TYPE_NOTIFY})
assert self.server_socket is not None
self.server_selector.register(self.server_socket, EVENT_READ, {'type': TYPE_ACCEPT})

if self.ready_callback is not None:
self.ready_callback()

while self.loop_callback():
events = self.server_selector.select(0.01)
events = self.server_selector.select(0.1)
# This loop is not entirely event-based: it also dequeues replies, hence the relatively short duration
# in select().
self.handle_replies()
for key, mask in events:
assert isinstance(key.data, dict)
if key.data['type'] == TYPE_ACCEPT:
Expand All @@ -89,6 +96,9 @@ def network_loop(self) -> None:
self.write(key.data['writer'])
if mask & EVENT_READ:
self.read(key.data['reader'])
elif key.data['type'] == TYPE_NOTIFY:
self.clear_notifications()
self.handle_replies()
except Exception as exc:
self.log(f'network loop: {exc}')
finally:
Expand All @@ -97,8 +107,27 @@ def network_loop(self) -> None:
# Python process does not exit after this App has finished running.
if self.server_socket is not None:
self.server_socket.close()
self.server_socket = None
os.close(self.notify)
clean_selector(self.server_selector, close_fds=True, close=True)

def send_notification(self) -> None:
if not self.is_listening():
return
try:
os.write(self.notify, b'x')
except BlockingIOError:
# The pipe is full, but this can be ignored.
pass

def clear_notifications(self) -> None:
try:
while True:
os.read(self.notifications, 1024)
except BlockingIOError:
# All done: the pipe is empty.
pass

def accept(self, socket: Socket) -> None:
raddr = None
try:
Expand Down Expand Up @@ -160,6 +189,7 @@ def got_tlv(self, socket: Socket, raddr: str, data_type: str, data: bytes, file_
def reply(self, socket: Socket, message: Message) -> None:
# Delegate to the network loop through a queue:
self.replies.put_nowait((socket, message))
self.send_notification()
# This approach makes this function thread-safe.

def handle_replies(self) -> None:
Expand Down

0 comments on commit 9e3d895

Please sign in to comment.