Skip to content

Commit

Permalink
feature: add middleware after_{consumer,worker}_thread_boot hooks
Browse files Browse the repository at this point in the history
This allows the middleware to run code in the context of a worker or
consumer thread before it enters its run loop. This could be used to set
up thread-local resources, or as in the author's case, to get a reference
to the thread before it does any work.

This was proposed on the mailing list [0] with Bogdan accepting the idea
in principle [1].

[0]: https://groups.io/g/dramatiq-users/topic/105311701
[1]: https://groups.io/g/dramatiq-users/topic/105311701#258
  • Loading branch information
kamalmarhubi committed Apr 11, 2024
1 parent 68c6257 commit b0289da
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
10 changes: 7 additions & 3 deletions dramatiq/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,22 @@ def after_worker_shutdown(self, broker, worker):
"""Called after the worker process shuts down.
"""

def after_consumer_thread_boot(self, broker, thread):
"""Called from a consumer thread after it starts but before it starts its run loop.
"""

def before_consumer_thread_shutdown(self, broker, thread):
"""Called before a consumer thread shuts down. This may be
used to clean up thread-local resources (such as Django
database connections).
"""

There is no ``after_consumer_thread_boot``.
def after_worker_thread_boot(self, broker, thread):
"""Called from a worker thread after it starts but before it starts its run loop.
"""

def before_worker_thread_shutdown(self, broker, thread):
"""Called before a worker thread shuts down. This may be used
to clean up thread-local resources (such as Django database
connections).
There is no ``after_worker_thread_boot``.
"""
2 changes: 2 additions & 0 deletions dramatiq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def __init__(self, *, broker, queue_name, prefetch, work_queue, worker_timeout):
def run(self):
self.logger.debug("Running consumer thread...")
self.running = True
self.broker.emit_after("consumer_thread_boot", self)
while self.running:
if self.paused:
self.logger.debug("Consumer is paused. Sleeping for %.02fms...", self.worker_timeout)
Expand Down Expand Up @@ -448,6 +449,7 @@ def __init__(self, *, broker, consumers, work_queue, worker_timeout):
def run(self):
self.logger.debug("Running worker thread...")
self.running = True
self.broker.emit_after("worker_thread_boot", self)
while self.running:
if self.paused:
self.logger.debug("Worker is paused. Sleeping for %.02f...", self.timeout)
Expand Down

0 comments on commit b0289da

Please sign in to comment.