Skip to content

Commit

Permalink
Add service manager infrastructure
Browse files Browse the repository at this point in the history
The changes are (This will be a bit long):
- A ServiceManager class that spawns a background thread and deals with
  service lifecycle management. The idea is that service lifecycle code
  will run in async functions, so a single thread is enough to manage
  any (reasonable) amount of services.

- A Service class, that offers start() and stop() methods that just
  notify the service manager to... well. Start and stop a service.

(!) Warning: Note that this differs from mp.Process.start/stop in that
  the service commands are sent asynchronously and will complete
  "eventually". This is good because it means that business logic is
  fast when booting up and shutting down, but we need to make sure
  that code does not rely on start() and stop() being instant
  (Mainly pid assignments).

  Subclasses of the Service class should use the on_start and on_stop
  methods to monitor for service events. These will be run by the
  service manager thread, so we need to be careful not to block
  execution here. Standard async stuff.

(!) Note on service names: Service names should be unique within a
  ServiceManager. Make sure that you pass the name you want to
  super().__init__(name="...") if you plan to spawn multiple instances
  of a service.

- A ServiceProcess class: A Service that wraps a multiprocessing.Process
  into a Service. It offers a run() method subclasses can override.

And finally, I lied a bit about this whole thing using a single thread.
I can't find any way to run python multiprocessing in async, so there is
a MultiprocessingWaiter thread that waits for multiprocessing events and
notifies any pending futures. This was uhhh... fun? No, not really.
But it works. Using this part of the code just involves calling the
provided wait method. See the implementation of ServiceProcess for more
details.
  • Loading branch information
gtsiam committed Oct 16, 2024
1 parent 3f1ab66 commit e187eea
Show file tree
Hide file tree
Showing 6 changed files with 475 additions and 0 deletions.
1 change: 1 addition & 0 deletions .cspell/frigate-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ codeproject
colormap
colorspace
comms
coro
ctypeslib
CUDA
Cuvid
Expand Down
4 changes: 4 additions & 0 deletions frigate/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ ignore_errors = false
[mypy-frigate.watchdog]
ignore_errors = false
disallow_untyped_calls = false


[mypy-frigate.service_manager.*]
ignore_errors = false
4 changes: 4 additions & 0 deletions frigate/service_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .multiprocessing import ServiceProcess
from .service import Service, ServiceManager

__all__ = ["Service", "ServiceProcess", "ServiceManager"]
67 changes: 67 additions & 0 deletions frigate/service_manager/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import asyncio
import multiprocessing as mp
from abc import ABC, abstractmethod
from asyncio.exceptions import TimeoutError
from typing import Optional

from .multiprocessing_waiter import wait as mp_wait
from .service import Service, ServiceManager

DEFAULT_STOP_TIMEOUT = 10 # seconds


class ServiceProcess(Service, ABC):
_process: mp.Process

def __init__(
self,
name: Optional[str] = None,
manager: Optional[ServiceManager] = None,
) -> None:
super().__init__(name=name, manager=manager)

self._process_lock = asyncio.Lock()

async def on_start(self) -> None:
async with self._process_lock:
if hasattr(self, "_process"):
if self._process.is_alive():
self.manager.logger.debug(
"Attempted to start already running process"
f" {self.name} (pid: {self._process.pid})"
)
return
else:
self._process.close()

# At this point, the process is either stopped or dead, so we can recreate it.
self._process = mp.Process(name=self.name, target=self.run, daemon=True)
self._process.start()
self.manager.logger.info(f"Started {self.name} (pid: {self._process.pid})")

async def on_stop(self, *, timeout: Optional[float] = None) -> None:
if timeout is None:
timeout = DEFAULT_STOP_TIMEOUT

async with self._process_lock:
if not hasattr(self, "_process"):
return # Already stopped.

self._process.terminate()
try:
await asyncio.wait_for(mp_wait(self._process), timeout)
except TimeoutError:
self.manager.logger.warning(
f"{self.name} is still running after "
f"{timeout} seconds. Killing."
)
self._process.kill()
await mp_wait(self._process)

del self._process

self.manager.logger.info(f"{self.name} stopped")

@abstractmethod
def run(self) -> None:
pass
150 changes: 150 additions & 0 deletions frigate/service_manager/multiprocessing_waiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import asyncio
import functools
import logging
import multiprocessing as mp
import queue
import threading
from multiprocessing.connection import Connection
from multiprocessing.connection import wait as mp_wait
from socket import socket
from typing import Any, Optional, Union

logger = logging.getLogger(__name__)


class MultiprocessingWaiter(threading.Thread):
"""A background thread that manages futures for the multiprocessing.connection.wait() method."""

def __init__(self) -> None:
super().__init__(daemon=True)

# Queue of objects to wait for and futures to set results for.
self._queue: queue.Queue[tuple[Any, asyncio.Future[None]]] = queue.Queue()

# This is required to get mp_wait() to wake up when new objects to wait for are received.
receive, send = mp.Pipe(duplex=False)
self._receive_connection = receive
self._send_connection = send

def wait_for_sentinel(self, sentinel: Any) -> asyncio.Future[None]:
"""Create an asyncio.Future tracking a sentinel for multiprocessing.connection.wait()
Warning: This method is NOT thread-safe.
"""
# This would be incredibly stupid, but you never know.
assert sentinel != self._receive_connection

# Send the future to the background thread for processing.
future = asyncio.get_running_loop().create_future()
self._queue.put((sentinel, future))

# Notify the background thread.
#
# This is the non-thread-safe part, but since this method is not really meant to be called
# by users, we can get away with not adding a lock at this point (to avoid adding 2 locks).
self._send_connection.send_bytes(b".")

return future

def run(self) -> None:
logger.debug("Started background thread")

wait_dict: dict[Any, set[asyncio.Future[None]]] = {
self._receive_connection: set()
}
while True:
for ready_obj in mp_wait(wait_dict.keys()):
# Make sure we never remove the receive connection from the wait dict
if ready_obj is self._receive_connection:
continue

logger.debug(
f"Sentinel {ready_obj!r} is ready. "
f"Notifying {len(wait_dict[ready_obj])} future(s)."
)

# Go over all the futures attached to this object and mark them as ready.
for fut in wait_dict.pop(ready_obj):
if fut.cancelled():
logger.debug(
f"A future for sentinel {ready_obj!r} is ready, "
"but the future is cancelled. Skipping."
)
else:
fut.get_loop().call_soon_threadsafe(
# Note: We need to check fut.cancelled() again, since it might
# have been set before the event loop's definition of "soon".
functools.partial(
lambda fut: fut.cancelled() or fut.set_result(None), fut
)
)

# Check for cancellations in the remaining futures.
done_objects = []
for obj, fut_set in wait_dict.items():
if obj is self._receive_connection:
continue

# Find any cancelled futures and remove them.
cancelled = [fut for fut in fut_set if fut.cancelled()]
fut_set.difference_update(cancelled)
logger.debug(
f"Removing {len(cancelled)} future(s) from sentinel: {obj!r}"
)

# Mark objects with no remaining futures for removal.
if len(fut_set) == 0:
done_objects.append(obj)

# Remove any objects that are done after removing cancelled futures.
for obj in done_objects:
logger.debug(
f"Sentinel {obj!r} no longer has any futures waiting for it."
)
del wait_dict[obj]

# Get new objects to wait for from the queue.
while True:
try:
obj, fut = self._queue.get_nowait()
self._receive_connection.recv_bytes(maxlength=1)
self._queue.task_done()

logger.debug(f"Received new sentinel: {obj!r}")

wait_dict.setdefault(obj, set()).add(fut)
except queue.Empty:
break


waiter_lock = threading.Lock()
waiter_thread: Optional[MultiprocessingWaiter] = None


async def wait(object: Union[mp.Process, Connection, socket]) -> None:
"""Wait for the supplied object to be ready.
Under the hood, this uses multiprocessing.connection.wait() and a background thread manage the
returned futures.
"""
global waiter_thread, waiter_lock

sentinel: Union[Connection, socket, int]
if isinstance(object, mp.Process):
sentinel = object.sentinel
elif isinstance(object, Connection) or isinstance(object, socket):
sentinel = object
else:
raise ValueError(f"Cannot wait for object of type {type(object).__qualname__}")

with waiter_lock:
if waiter_thread is None:
# Start a new waiter thread.
waiter_thread = MultiprocessingWaiter()
waiter_thread.start()

# Create the future while still holding the lock,
# since wait_for_sentinel() is not thread safe.
fut = waiter_thread.wait_for_sentinel(sentinel)

await fut
Loading

0 comments on commit e187eea

Please sign in to comment.