Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add service manager infrastructure #14150

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from

Conversation

gtsiam
Copy link
Contributor

@gtsiam gtsiam commented Oct 4, 2024

This is still very much a work in progress. Please don't merge just yet. I'm posting the PR now so we can discuss the approach. The code I've added is still (intentionally) not called from anywhere.

I'm making the PR now since it's getting big again so it doesn't come out of nowhere. But this time it can wait for 0.16 however long it needs to since I haven't touched anything outside of the new frigate/service_manager/ folder (Well, I did change mypy.ini, but I can live with merging that).

TODO

  • Add the ability to restart services.
  • Add service heartbeat. If a service does not send a heartbeat often enough, it is restarted.
  • Document some of this inside in the source, not here. (Since the API might still change, all of this was faster to write, believe it or not)

High level overview

Add a service class that can spawn and tear down processes blazingly 🔥 fast (Sorry, I had to).

In more detail

I'll just paste the singular commit message here for now.

    The changes are (This will be a bit long):
    - A ServiceManager class that spawns a background thread and deals with
      service lifecycle management. The idea is that service lifecycle code
      will run in async functions, so a single thread is enough to manage
      any (reasonable) amount of services.
    
    - A Service class, that offers start() and stop() methods that just
      notify the service manager to... well. Start and stop a service.
    
    (!) Warning: Note that this differs from mp.Process.start/stop in that
      the service commands are sent asynchronously and will complete
      "eventually". This is good because it means that business logic is
      fast when booting up and shutting down, but we need to make sure
      that code does not rely on start() and stop() being instant
      (Mainly pid assignments).
    
      Subclasses of the Service class should use the on_start and on_stop
      methods to monitor for service events. These will be run by the
      service manager thread, so we need to be careful not to block
      execution here. Standard async stuff.
    
    (!) Note on service names: Service names should be unique within a
      ServiceManager. Make sure that you pass the name you want to
      super().__init__(name="...") if you plan to spawn multiple instances
      of a service.
    
    - A ServiceProcess class: A Service that wraps a multiprocessing.Process
      into a Service. It offers a run() method subclasses can override.
    
    And finally, I lied a bit about this whole thing using a single thread.
    I can't find any way to run python multiprocessing in async, so there is
    a MultiprocessingWaiter thread that waits for multiprocessing events and
    notifies any pending futures. This was uhhh... fun? No, not really.
    But it works. Using this part of the code just involves calling the
    provided wait method. See the implementation of ServiceProcess for more
    details.

Type of change

  • Dependency upgrade
  • Bugfix (non-breaking change which fixes an issue)
  • New feature
  • Breaking change (fix/feature causing existing functionality to break)
  • Code quality improvements to existing code

Toy example to put into main if you wanna play with the service manager:

from frigate.service_manager import ServiceProcess

class TestSvc(ServiceProcess):
    def __init__(self, idx: int):
        super().__init__(name=f"TestSvc ({idx})")
        self.logger = logging.getLogger(self.name)

    def run(self):
        stop_event = threading.Event()

        signal.signal(signal.SIGTERM, lambda sig, frame: stop_event.set())
        signal.signal(signal.SIGINT, lambda sig, frame: stop_event.set())

        stop_event.wait()
        self.logger.info("Got stop event")
        # time.sleep(15) # Service stop timeout is currently hardcoded to 10 seconds.
        self.logger.info("Exiting process")

TestSvc(1).start()
TestSvc(2).start()
TestSvc(3).start()
TestSvc(4).start()
TestSvc(0).start()
TestSvc(5).start()
TestSvc(6).start()
TestSvc(7).start()
TestSvc(8).start()
TestSvc(9).start()
time.sleep(2)

Copy link

netlify bot commented Oct 4, 2024

Deploy Preview for frigate-docs canceled.

Name Link
🔨 Latest commit e187eea
🔍 Latest deploy log https://app.netlify.com/sites/frigate-docs/deploys/670f740fd7558300082f4baa

@NickM-27
Copy link
Sponsor Collaborator

NickM-27 commented Oct 4, 2024

I think it would be great to see what the other maintainers think, but I'll leave my thoughts:

I like this idea. Personally, I think it would work best to implement the framework first and then gradually move some of the processes over as it makes sense (as opposed to converting lots of the current processes at once).

This approach seems like it would be a great way to go about implementing #1911 in which case we will sometimes deliberately want the service to stop indefinitely before it starts again at some point in the future.

@blakeblackshear
Copy link
Owner

Looks great to me. Really appreciate the detailed explanation and thoughtful staging of the changes.

The changes are (This will be a bit long):
- A ServiceManager class that spawns a background thread and deals with
  service lifecycle management. The idea is that service lifecycle code
  will run in async functions, so a single thread is enough to manage
  any (reasonable) amount of services.

- A Service class, that offers start() and stop() methods that just
  notify the service manager to... well. Start and stop a service.

(!) Warning: Note that this differs from mp.Process.start/stop in that
  the service commands are sent asynchronously and will complete
  "eventually". This is good because it means that business logic is
  fast when booting up and shutting down, but we need to make sure
  that code does not rely on start() and stop() being instant
  (Mainly pid assignments).

  Subclasses of the Service class should use the on_start and on_stop
  methods to monitor for service events. These will be run by the
  service manager thread, so we need to be careful not to block
  execution here. Standard async stuff.

(!) Note on service names: Service names should be unique within a
  ServiceManager. Make sure that you pass the name you want to
  super().__init__(name="...") if you plan to spawn multiple instances
  of a service.

- A ServiceProcess class: A Service that wraps a multiprocessing.Process
  into a Service. It offers a run() method subclasses can override.

And finally, I lied a bit about this whole thing using a single thread.
I can't find any way to run python multiprocessing in async, so there is
a MultiprocessingWaiter thread that waits for multiprocessing events and
notifies any pending futures. This was uhhh... fun? No, not really.
But it works. Using this part of the code just involves calling the
provided wait method. See the implementation of ServiceProcess for more
details.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants