Add support for quorum queues and max_attempts_at_message #37

Open · wants to merge 1 commit into base: develop
8 changes: 8 additions & 0 deletions .flake8
@@ -68,6 +68,14 @@ ignore =
    WPS229,
    ; Found function with too much cognitive complexity
    WPS231,
    ; Found walrus operator
    WPS332,
    ; Found multiline conditions
    WPS337,
    ; Found multi-line function type annotation
    WPS320,
    ; Found `in` used with a non-set container
    WPS510,

per-file-ignores =
    ; all tests
37 changes: 31 additions & 6 deletions README.md
@@ -116,6 +116,32 @@ async def main():

```

## Queue Types and Message Reliability

AioPikaBroker supports both classic and quorum queues. Quorum queues are a more modern RabbitMQ queue type that provides stronger reliability and data safety guarantees.

```python
from taskiq_aio_pika import AioPikaBroker, QueueType

broker = AioPikaBroker(
    queue_type=QueueType.QUORUM,  # Use quorum queues for better reliability
    max_attempts_at_message=3,  # Limit redelivery attempts
)
```

### Message Redelivery Control

When a consumer crashes before it has had the chance to acknowledge a message, positively or negatively (for example an OOM condition leads to a SIGKILL, the network fails, or some other infrastructure problem intervenes), taskiq's retry middleware never gets a chance to schedule a retry. Instead, RabbitMQ requeues the message at the front of the queue and redelivers it. With quorum queues, you can control how many times such a message will be redelivered:

- Set `max_attempts_at_message` to limit delivery attempts.
- Set `max_attempts_at_message=None` for unlimited attempts.
- This operates at the message delivery level, not the application retry level. For application-level retries on exceptions that can be caught (e.g., temporary API failures), use taskiq's retry middleware instead (see the sketch after this list).
- After max attempts, the message is logged and discarded.
- `max_attempts_at_message` requires using quorum queues (`queue_type=QueueType.QUORUM`).

This is particularly useful for preventing infinite redelivery loops of messages that consistently crash the consumer ([poison messages](https://www.rabbitmq.com/docs/quorum-queues#poison-message-handling)) and can cause the queue to back up.
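A minimal sketch of how the delivery-level limit combines with application-level retries (this assumes taskiq's `SimpleRetryMiddleware`; check your taskiq version for the exact names and defaults):

```python
from taskiq import SimpleRetryMiddleware

from taskiq_aio_pika import AioPikaBroker, QueueType

broker = AioPikaBroker(
    queue_type=QueueType.QUORUM,
    max_attempts_at_message=3,  # delivery level: redeliveries after crashes
).with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),  # app level: caught exceptions
)


@broker.task(retry_on_error=True)
async def call_flaky_api() -> None:
    """Exceptions raised here go through middleware retries, not redelivery."""
```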


## Configuration

AioPikaBroker parameters:
@@ -125,13 +151,12 @@ AioPikaBroker parameters:
* `exchange_name` - name of the exchange used to send messages.
* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
* `queue_name` - queue used to receive incoming messages.
* `queue_type` - type of RabbitMQ queue to use: `classic` or `quorum`. Defaults to `classic`.
* `routing_key` - routing key used to bind the queue to the exchange.
* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
* `max_priority` - maximum priority for messages.
* `delay_queue_name` - custom delay queue name.
This queue is used to deliver messages with delays.
* `dead_letter_queue_name` - custom dead letter queue name.
This queue is used to receive negatively acknowledged messages from the main queue.
* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
* `dead_letter_queue_name` - custom dead letter queue name. This queue is used to receive negatively acknowledged messages from the main queue.
* `qos` - number of messages that worker can prefetch.
* `declare_queues` - whether you want to declare queues even on
client side. May be useful for message persistence.
* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
* `max_attempts_at_message` - maximum number of attempts at processing the same message. Requires the queue type to be set to `QueueType.QUORUM`. Defaults to `20` for quorum queues and to `None` for classic queues. This is not the same as task retries. Pass `None` for unlimited attempts.
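
For example, a quorum-queue broker that allows unlimited delivery attempts (a sketch; the URL and queue name are placeholders):

```python
from taskiq_aio_pika import AioPikaBroker, QueueType

broker = AioPikaBroker(
    url="amqp://guest:guest@localhost:5672",
    queue_name="my_queue",
    queue_type=QueueType.QUORUM,
    max_attempts_at_message=None,  # unlimited delivery attempts
    qos=10,
)
```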
1 change: 1 addition & 0 deletions taskiq_aio_pika/__init__.py
@@ -1,4 +1,5 @@
"""Taskiq integration with aio-pika."""

from importlib.metadata import version

from taskiq_aio_pika.broker import AioPikaBroker
120 changes: 107 additions & 13 deletions taskiq_aio_pika/broker.py
@@ -1,17 +1,41 @@
import asyncio
import copy
from datetime import timedelta
from enum import Enum
from logging import getLogger
from typing import Any, AsyncGenerator, Callable, Dict, Optional, TypeVar
from typing import (
    Any,
    AsyncGenerator,
    Callable,
    Dict,
    Literal,
    Optional,
    TypeVar,
    Union,
)

from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust
from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage
from taskiq import (
    AckableMessage,
    AckableMessageWithDeliveryCount,
    AsyncBroker,
    AsyncResultBackend,
    BrokerMessage,
)

_T = TypeVar("_T") # noqa: WPS111

logger = getLogger("taskiq.aio_pika_broker")


class QueueType(Enum):
    """Type of RabbitMQ queue."""

    CLASSIC = "classic"
    QUORUM = "quorum"


def parse_val(
    parse_func: Callable[[str], _T],
    target: Optional[str] = None,
@@ -35,7 +59,7 @@ def parse_val(
class AioPikaBroker(AsyncBroker):
    """Broker that works with RabbitMQ."""

    def __init__(  # noqa: WPS211
    def __init__(  # noqa: C901, WPS211
        self,
        url: Optional[str] = None,
        result_backend: Optional[AsyncResultBackend[_T]] = None,
@@ -44,6 +68,7 @@ def __init__( # noqa: WPS211
        loop: Optional[asyncio.AbstractEventLoop] = None,
        exchange_name: str = "taskiq",
        queue_name: str = "taskiq",
        queue_type: QueueType = QueueType.CLASSIC,
        dead_letter_queue_name: Optional[str] = None,
        delay_queue_name: Optional[str] = None,
        declare_exchange: bool = True,
@@ -54,6 +79,7 @@ def __init__( # noqa: WPS211
        delayed_message_exchange_plugin: bool = False,
        declare_exchange_kwargs: Optional[Dict[Any, Any]] = None,
        declare_queues_kwargs: Optional[Dict[Any, Any]] = None,
        max_attempts_at_message: Union[Optional[int], Literal["default"]] = "default",
        **connection_kwargs: Any,
    ) -> None:
        """
@@ -62,12 +88,13 @@ def __init__( # noqa: WPS211
        :param url: url to rabbitmq. If None,
            the default "amqp://guest:guest@localhost:5672" is used.
        :param result_backend: custom result backend.

        :param task_id_generator: custom task_id generator.
        :param qos: number of messages that worker can prefetch.
        :param loop: specific event loop.
        :param exchange_name: name of the exchange used to send messages.
        :param queue_name: queue used to receive incoming messages.
        :param queue_type: type of RabbitMQ queue to use: `classic` or `quorum`.
            defaults to `classic`.
        :param dead_letter_queue_name: custom name for dead-letter queue.
            by default it is set to {queue_name}.dead_letter.
        :param delay_queue_name: custom name for queue that is used to
@@ -86,6 +113,11 @@ def __init__( # noqa: WPS211
        :param declare_queues_kwargs: additional keyword arguments
            for AbstractChannel.declare_queue.
        :param connection_kwargs: additional keyword arguments
            for the connect_robust method of aio-pika.
        :param max_attempts_at_message: maximum number of attempts at processing
            the same message. Requires the queue type to be set to `QueueType.QUORUM`.
            Defaults to `20` for quorum queues and to `None` for classic queues.
            This is not the same as task retries. Pass `None` for unlimited attempts.
        :raises ValueError: if inappropriate arguments were passed.
        """
        super().__init__(result_backend, task_id_generator)

@@ -104,6 +136,52 @@ def __init__( # noqa: WPS211
        self._max_priority = max_priority
        self._delayed_message_exchange_plugin = delayed_message_exchange_plugin

        if self._declare_queues_kwargs.get("arguments", {}).get(
            "x-queue-type",
        ) or self._declare_queues_kwargs.get("arguments", {}).get("x-delivery-limit"):
            raise ValueError(
                "Use the `queue_type` and `max_attempts_at_message` parameters of "
                "`AioPikaBroker.__init__` instead of `x-queue-type` and "
                "`x-delivery-limit`",
            )
        if queue_type == QueueType.QUORUM:
            self._declare_queues_kwargs.setdefault("arguments", {})[
                "x-queue-type"
            ] = "quorum"
            self._declare_queues_kwargs["durable"] = True
        else:
            self._declare_queues_kwargs.setdefault("arguments", {})[
                "x-queue-type"
            ] = "classic"

        if queue_type != QueueType.QUORUM and max_attempts_at_message not in (
            "default",
            None,
        ):
            raise ValueError(
                "`max_attempts_at_message` requires `queue_type` to be set to "
                "`QueueType.QUORUM`.",
            )

        if max_attempts_at_message == "default":
            if queue_type == QueueType.QUORUM:
                self.max_attempts_at_message = 20
            else:
                self.max_attempts_at_message = None
        else:
            self.max_attempts_at_message = max_attempts_at_message

        if queue_type == QueueType.QUORUM:
            if self.max_attempts_at_message is None:
                # no limit
                self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = "-1"
            else:
                # the final attempt will be handled in `taskiq.Receiver`
                # to generate visible logs
                self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = (
                    self.max_attempts_at_message + 1
                )

        self._dead_letter_queue_name = f"{queue_name}.dead_letter"
        if dead_letter_queue_name:
            self._dead_letter_queue_name = dead_letter_queue_name
@@ -183,9 +261,15 @@ async def declare_queues(
        :param channel: channel used for declaration.
        :return: main queue instance.
        """
        declare_queues_kwargs_ex_arguments = copy.copy(self._declare_queues_kwargs)
        declare_queue_arguments = declare_queues_kwargs_ex_arguments.pop(
            "arguments",
            {},
        )
        await channel.declare_queue(
            self._dead_letter_queue_name,
            **self._declare_queues_kwargs,
            **declare_queues_kwargs_ex_arguments,
            arguments=declare_queue_arguments,
        )
        args: "Dict[str, Any]" = {
            "x-dead-letter-exchange": "",
@@ -195,8 +279,8 @@
args["x-max-priority"] = self._max_priority
queue = await channel.declare_queue(
self._queue_name,
arguments=args,
**self._declare_queues_kwargs,
arguments=args | declare_queue_arguments,
**declare_queues_kwargs_ex_arguments,
)
if self._delayed_message_exchange_plugin:
await queue.bind(
@@ -209,8 +293,9 @@
            arguments={
                "x-dead-letter-exchange": "",
                "x-dead-letter-routing-key": self._queue_name,
            },
            **self._declare_queues_kwargs,
            }
            | declare_queue_arguments,
            **declare_queues_kwargs_ex_arguments,
        )

        await queue.bind(
@@ -291,7 +376,16 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
        queue = await self.declare_queues(self.read_channel)
        async with queue.iterator() as iterator:
            async for message in iterator:
                yield AckableMessage(
                    data=message.body,
                    ack=message.ack,
                )
                if (
                    delivery_count := message.headers.get("x-delivery-count")
                ) is not None:
                    yield AckableMessageWithDeliveryCount(
                        data=message.body,
                        ack=message.ack,
                        delivery_count=delivery_count,
                    )
                else:
                    yield AckableMessage(
                        data=message.body,
                        ack=message.ack,
                    )
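
As a usage sketch of the validation and defaults this diff introduces (hypothetical snippet, not part of the change itself):

```python
from taskiq_aio_pika import AioPikaBroker, QueueType

# Setting x-queue-type or x-delivery-limit via declare_queues_kwargs now
# raises ValueError; queue_type / max_attempts_at_message are the
# supported path.
try:
    AioPikaBroker(declare_queues_kwargs={"arguments": {"x-queue-type": "quorum"}})
except ValueError as exc:
    print(exc)

# Quorum queue with a delivery limit: internally this declares the queue
# with x-queue-type=quorum, durable=True and x-delivery-limit=4 (one extra
# attempt so taskiq's Receiver can log the final failure).
broker = AioPikaBroker(queue_type=QueueType.QUORUM, max_attempts_at_message=3)
```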
11 changes: 11 additions & 0 deletions tests/conftest.py
@@ -1,4 +1,5 @@
import os
from contextlib import suppress
from typing import AsyncGenerator
from uuid import uuid4

@@ -229,3 +230,13 @@ async def broker_with_delayed_message_plugin(
        if_empty=False,
        if_unused=False,
    )


@pytest.fixture(autouse=True, scope="function")
async def cleanup_rabbitmq(test_channel: Channel) -> AsyncGenerator[None, None]:
    """Delete the queues a test may have declared, ignoring missing ones."""
    yield

    for queue_name in ["taskiq", "taskiq.dead_letter", "taskiq.delay"]:
        with suppress(Exception):
            queue = await test_channel.get_queue(queue_name, ensure=False)
            await queue.delete(if_unused=False, if_empty=False)