Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support throwing SkipMessage in before enqueue, make return type of send and send_with_options Optional #673

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 61 additions & 60 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,64 @@ By adding your name to the list below you disavow any rights or claims
to any changes submitted to the Dramatiq project and assign copyright
of those changes to CLEARTYPE SRL.

| Username | Name |
|:-------------------------------------------------------|:-----------------------|
| [@bendemaree](https://github.com/bendemaree) | Ben Demaree |
| [@whalesalad](https://github.com/whalesalad) | Michael Whalen |
| [@rakanalh](https://github.com/rakanalh) | Rakan Alhneiti |
| [@jssuzanne](https://github.com/jssuzanne) | Jean-Sébastien Suzanne |
| [@chen2aaron](https://github.com/chen2aaron) | xixijun |
| [@aequitas](https://github.com/aequitas) | Johan Bloemberg |
| [@najamansari](https://github.com/najamansari) | Najam Ahmed Ansari |
| [@rpkilby](https://github.com/rpkilby) | Ryan P Kilby |
| [@2miksyn](https://github.com/2miksyn) | Mikhail Smirnov |
| [@gdvalle](https://github.com/gdvalle) | Greg Dallavalle |
| [@viiicky](https://github.com/viiicky) | Vikas Prasad |
| [@xdmiodz](https://github.com/xdmiodz) | Dmitry Odzerikho |
| [@ryansm1](https://github.com/ryansm1) | Ryan Smith |
| [@aericson](https://github.com/aericson) | André Ericson |
| [@maerteijn](https://github.com/maerteijn) | Martijn Jacobs |
| [@ryanhiebert](https://github.com/ryanhiebert) | Ryan Hiebert |
| [@davidt99](https://github.com/davidt99) | davidt99 |
| [@brownan](https://github.com/brownan) | Andrew Brown |
| [@gilbsgilbs](https://github.com/gilbsgilbs) | gilbsgilbs |
| [@MihaiBalint](https://github.com/MihaiBalint) | Mihai Balint |
| [@xelhark](https://github.com/xelhark) | Gabriele Platania |
| [@bersace](https://github.com/bersace) | Étienne Bersac |
| [@metheoryt](https://github.com/metheoryt) | Maxim Romanyuk |
| [@douglasmiranda](https://github.com/douglasmiranda) | Douglas Miranda |
| [@srecnig](https://github.com/srecnig) | Martin Sereinig |
| [@wsantos](https://github.com/wsantos) | Waldecir Santos |
| [@jonathanlintott](http://github.com/jonathanlintott) | Jonathan Lintott |
| [@evstratbg](https://github.com/evstratbg) | Bogdan Evstratenko |
| [@CapedHero](https://github.com/CapedHero) | Maciej Wrześniewski |
| [@synweap15](https://github.com/synweap15) | Paweł Werda |
| [@asavoy](https://github.com/asavoy) | Alvin Savoy |
| [@benekastah](https://github.com/benekastah) | Paul Harper |
| [@timdrijvers](https://github.com/timdrijvers) | Tim Drijvers |
| [@takhs91](https://github.com/takhs91) | Takis Panagopoulos |
| [@swidoff](https://github.com/swidoff) | Seth Widoff |
| [@CaselIT](https://github.com/CaselIT) | Federico Caselli |
| [@omegacoleman](https://github.com/omegacoleman) | You Cai |
| [@denhai](https://github.com/denhai) | Hayden Bartlett |
| [@rouge8](https://github.com/rouge8) | Andy Freeland |
| [@thomazthz](https://github.com/thomazthz) | Thomaz Soares |
| [@FinnLidbetter](https://github.com/FinnLidbetter) | Finn Lidbetter |
| [@giuppep](https://github.com/giuppep) | Giuseppe Papallo |
| [@ethervoid](https://github.com/ethervoid) | Mario de Frutos |
| [@pcrockett](https://github.com/pcrockett) | Phil Crockett |
| [@nathanielobrown](https://github.com/nathanielobrown) | Nathaniel Brown |
| [@staticdev](https://github.com/staticdev) | Thiago |
| [@kurtmckee](https://github.com/kurtmckee) | Kurt McKee |
| [@cmflynn](https://github.com/cmflynn) | Cody Flynn |
| [@dancardin](https://github.com/dancardin) | Dan Cardin |
| [@caspervdw](https://github.com/caspervdw) | Casper van der Wel |
| [@jenstroeger](https://github.com/jenstroeger/) | Jens Troeger |
| [@h3nnn4n](https://github.com/h3nnn4n/) | Renan S Silva |
| [@DiegoPomares](https://github.com/DiegoPomares/) | Diego Pomares |
| [@pahrohfit](https://github.com/pahrohfit/) | Robert Dailey |
| [@nhairs](https://github.com/nhairs) | Nicholas Hairs |
| [@5tefan](https://github.com/5tefan/) | Stefan Codrescu |
| [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk |
| [@dbowring](https://github.com/dbowring/) | Daniel Bowring |
| Username | Name |
|:----------------------------------------------------------|:-----------------------|
| [@bendemaree](https://github.com/bendemaree) | Ben Demaree |
| [@whalesalad](https://github.com/whalesalad) | Michael Whalen |
| [@rakanalh](https://github.com/rakanalh) | Rakan Alhneiti |
| [@jssuzanne](https://github.com/jssuzanne) | Jean-Sébastien Suzanne |
| [@chen2aaron](https://github.com/chen2aaron) | xixijun |
| [@aequitas](https://github.com/aequitas) | Johan Bloemberg |
| [@najamansari](https://github.com/najamansari) | Najam Ahmed Ansari |
| [@rpkilby](https://github.com/rpkilby) | Ryan P Kilby |
| [@2miksyn](https://github.com/2miksyn) | Mikhail Smirnov |
| [@gdvalle](https://github.com/gdvalle) | Greg Dallavalle |
| [@viiicky](https://github.com/viiicky) | Vikas Prasad |
| [@xdmiodz](https://github.com/xdmiodz) | Dmitry Odzerikho |
| [@ryansm1](https://github.com/ryansm1) | Ryan Smith |
| [@aericson](https://github.com/aericson) | André Ericson |
| [@maerteijn](https://github.com/maerteijn) | Martijn Jacobs |
| [@ryanhiebert](https://github.com/ryanhiebert) | Ryan Hiebert |
| [@davidt99](https://github.com/davidt99) | davidt99 |
| [@brownan](https://github.com/brownan) | Andrew Brown |
| [@gilbsgilbs](https://github.com/gilbsgilbs) | gilbsgilbs |
| [@MihaiBalint](https://github.com/MihaiBalint) | Mihai Balint |
| [@xelhark](https://github.com/xelhark) | Gabriele Platania |
| [@bersace](https://github.com/bersace) | Étienne Bersac |
| [@metheoryt](https://github.com/metheoryt) | Maxim Romanyuk |
| [@douglasmiranda](https://github.com/douglasmiranda) | Douglas Miranda |
| [@srecnig](https://github.com/srecnig) | Martin Sereinig |
| [@wsantos](https://github.com/wsantos) | Waldecir Santos |
| [@jonathanlintott](http://github.com/jonathanlintott) | Jonathan Lintott |
| [@evstratbg](https://github.com/evstratbg) | Bogdan Evstratenko |
| [@CapedHero](https://github.com/CapedHero) | Maciej Wrześniewski |
| [@synweap15](https://github.com/synweap15) | Paweł Werda |
| [@asavoy](https://github.com/asavoy) | Alvin Savoy |
| [@benekastah](https://github.com/benekastah) | Paul Harper |
| [@timdrijvers](https://github.com/timdrijvers) | Tim Drijvers |
| [@takhs91](https://github.com/takhs91) | Takis Panagopoulos |
| [@swidoff](https://github.com/swidoff) | Seth Widoff |
| [@CaselIT](https://github.com/CaselIT) | Federico Caselli |
| [@omegacoleman](https://github.com/omegacoleman) | You Cai |
| [@denhai](https://github.com/denhai) | Hayden Bartlett |
| [@rouge8](https://github.com/rouge8) | Andy Freeland |
| [@thomazthz](https://github.com/thomazthz) | Thomaz Soares |
| [@FinnLidbetter](https://github.com/FinnLidbetter) | Finn Lidbetter |
| [@giuppep](https://github.com/giuppep) | Giuseppe Papallo |
| [@ethervoid](https://github.com/ethervoid) | Mario de Frutos |
| [@pcrockett](https://github.com/pcrockett) | Phil Crockett |
| [@nathanielobrown](https://github.com/nathanielobrown) | Nathaniel Brown |
| [@staticdev](https://github.com/staticdev) | Thiago |
| [@kurtmckee](https://github.com/kurtmckee) | Kurt McKee |
| [@cmflynn](https://github.com/cmflynn) | Cody Flynn |
| [@dancardin](https://github.com/dancardin) | Dan Cardin |
| [@caspervdw](https://github.com/caspervdw) | Casper van der Wel |
| [@jenstroeger](https://github.com/jenstroeger/) | Jens Troeger |
| [@h3nnn4n](https://github.com/h3nnn4n/) | Renan S Silva |
| [@DiegoPomares](https://github.com/DiegoPomares/) | Diego Pomares |
| [@pahrohfit](https://github.com/pahrohfit/) | Robert Dailey |
| [@nhairs](https://github.com/nhairs) | Nicholas Hairs |
| [@5tefan](https://github.com/5tefan/) | Stefan Codrescu |
| [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk |
| [@dbowring](https://github.com/dbowring/) | Daniel Bowring |
| [@PavelSilnaHealth](https://github.com/PavelSilnaHealth/) | Pavel Asparouhov |
4 changes: 2 additions & 2 deletions dramatiq/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def message_with_options(
options=options,
)

def send(self, *args: P.args, **kwargs: P.kwargs) -> Message[R]:
def send(self, *args: P.args, **kwargs: P.kwargs) -> Optional[Message[R]]:
"""Asynchronously send a message to this actor.

Parameters:
Expand All @@ -147,7 +147,7 @@ def send_with_options(
kwargs: Optional[Dict[str, Any]] = None,
delay: Optional[timedelta | int] = None,
**options,
) -> Message[R]:
) -> Optional[Message[R]]:
"""Asynchronously send a message to this actor, along with an
arbitrary set of processing options for the broker and
middleware.
Expand Down
12 changes: 11 additions & 1 deletion dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ..errors import ConnectionClosed, DecodeError, QueueJoinTimeout
from ..logging import get_logger
from ..message import Message, get_encoder
from ..middleware import SkipMessage

#: The maximum amount of time a message can be in the dead queue.
DEAD_MESSAGE_TTL = int(os.getenv("dramatiq_dead_message_ttl", 86400000 * 7))
Expand Down Expand Up @@ -326,7 +327,16 @@ def enqueue(self, message, *, delay=None):
try:
self.declare_queue(canonical_queue_name, ensure=True)
self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)

try:
self.emit_before("enqueue", message, delay)
except SkipMessage:
self.logger.warning("Message %s was skipped during enqueue.", message)
proxy = _RabbitmqMessage(False, None, message) # redelivered=False, tag=None
proxy.fail()
self.emit_after("skip_message", proxy)
return None

self.channel.basic_publish(
exchange="",
routing_key=queue_name,
Expand Down
12 changes: 11 additions & 1 deletion dramatiq/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ..errors import ConnectionClosed, QueueJoinTimeout
from ..logging import get_logger
from ..message import Message
from ..middleware import SkipMessage

MAINTENANCE_SCALE = 1000000
MAINTENANCE_COMMAND_BLACKLIST = {"ack", "nack"}
Expand Down Expand Up @@ -182,7 +183,16 @@ def enqueue(self, message, *, delay=None):
)

self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)

try:
self.emit_before("enqueue", message, delay)
except SkipMessage:
self.logger.warning("Message %s was skipped during enqueue.", message)
proxy = MessageProxy(message)
proxy.fail() # Mark it as failed
self.emit_after("skip_message", proxy)
return None

self.do_enqueue(queue_name, message.options["redis_message_id"], message.encode())
self.emit_after("enqueue", message, delay)
return message
Expand Down
11 changes: 10 additions & 1 deletion dramatiq/brokers/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ..common import current_millis, dq_name, iter_queue, join_queue
from ..errors import QueueNotFound
from ..message import Message
from ..middleware import SkipMessage


class StubBroker(Broker):
Expand Down Expand Up @@ -108,7 +109,15 @@ def enqueue(self, message, *, delay=None):
if queue_name not in self.queues:
raise QueueNotFound(queue_name)

self.emit_before("enqueue", message, delay)
try:
self.emit_before("enqueue", message, delay)
except SkipMessage:
self.logger.warning("Message %s was skipped during enqueue.", message)
proxy = _StubMessageProxy(message)
proxy.fail()
self.emit_after("skip_message", proxy)
return None

self.queues[queue_name].put(message.encode())
self.emit_after("enqueue", message, delay)
return message
Expand Down
2 changes: 1 addition & 1 deletion dramatiq/results/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def after_skip_message(self, broker, message):
Let after_nack handle the case where the message was skipped and failed.
"""
store_results, result_ttl = self._lookup_options(broker, message)
if store_results and not message.failed:
if store_results and (not message or not message.failed):
self.backend.store_result(message, None, result_ttl)

def after_nack(self, broker, message):
Expand Down
31 changes: 31 additions & 0 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,37 @@ def track_call():
assert sum(skipped_messages) == 1


def test_messages_can_be_skipped_during_enqueue(stub_broker):
# Given that I have a middleware that skips messages during enqueue
skipped_messages = []

class SkipEnqueueMiddleware(Middleware):
def before_enqueue(self, broker, message, delay):
raise SkipMessage()

def after_skip_message(self, broker, message):
skipped_messages.append(message)

stub_broker.add_middleware(SkipEnqueueMiddleware())

# And an actor that keeps track of its calls
calls = []

@dramatiq.actor
def track_call():
calls.append(1)

# When I send that actor a message
result = track_call.send()

# Then I expect the message to have been skipped
assert result is None
assert len(skipped_messages) == 1

# And the message should not be in the queue
assert stub_broker.queues["default"].qsize() == 0


def test_workers_can_be_paused(stub_broker, stub_worker):
# Given a paused worker
stub_worker.pause()
Expand Down
33 changes: 33 additions & 0 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dramatiq import Message, Middleware, QueueJoinTimeout, Worker
from dramatiq.brokers.rabbitmq import RabbitmqBroker, URLRabbitmqBroker, _IgnoreScaryLogs
from dramatiq.common import current_millis
from dramatiq.middleware import SkipMessage

from .common import RABBITMQ_CREDENTIALS, RABBITMQ_PASSWORD, RABBITMQ_USERNAME, skip_unless_rabbit_mq

Expand Down Expand Up @@ -556,3 +557,35 @@ def put():

assert len(rabbitmq_broker.queues) == 1
assert put.queue_name in rabbitmq_broker.queues


def test_rabbitmq_messages_can_be_skipped_during_enqueue(rabbitmq_broker):
# Given that I have a middleware that skips messages during enqueue
skipped_messages = []

class SkipEnqueueMiddleware(Middleware):
def before_enqueue(self, broker, message, delay):
raise SkipMessage()

def after_skip_message(self, broker, message):
skipped_messages.append(message)

rabbitmq_broker.add_middleware(SkipEnqueueMiddleware())

# And an actor that keeps track of its calls
calls = []

@dramatiq.actor
def track_call():
calls.append(1)

# When I send that actor a message
result = track_call.send()

# Then I expect the message to have been skipped
assert result is None
assert len(skipped_messages) == 1

# And no messages should be in the queue
queue_count, _, _ = rabbitmq_broker.get_queue_message_counts(track_call.queue_name)
assert queue_count == 0
34 changes: 33 additions & 1 deletion tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import redis

import dramatiq
from dramatiq import Message, QueueJoinTimeout
from dramatiq import Message, QueueJoinTimeout, Middleware
from dramatiq.brokers.redis import MAINTENANCE_SCALE, RedisBroker
from dramatiq.common import current_millis, dq_name, xq_name
from dramatiq.errors import ConnectionError
from dramatiq.middleware import SkipMessage

from .common import worker

Expand Down Expand Up @@ -522,3 +523,34 @@ def go_again():

assert called
assert size == [4, 4, 2, 2]


def test_redis_messages_can_be_skipped_during_enqueue(redis_broker, redis_worker):
# Given that I have a middleware that skips messages during enqueue
skipped_messages = []

class SkipEnqueueMiddleware(Middleware):
def before_enqueue(self, broker, message, delay):
raise SkipMessage()

def after_skip_message(self, broker, message):
skipped_messages.append(message)

redis_broker.add_middleware(SkipEnqueueMiddleware())

# And an actor that keeps track of its calls
calls = []

@dramatiq.actor
def track_call():
calls.append(1)

# When I send that actor a message
result = track_call.send()

# Then I expect the message to have been skipped
assert result is None
assert len(skipped_messages) == 1

# And no messages should be in the queue
assert redis_broker.client.llen("dramatiq:%s" % track_call.queue_name) == 0