diff --git a/.flake8 b/.flake8 index 45a9c85..9ed90e5 100644 --- a/.flake8 +++ b/.flake8 @@ -68,6 +68,14 @@ ignore = WPS229, ; Found function with too much cognitive complexity WPS231, + ; Found walrus operator + WPS332 + ; Found multiline conditions + WPS337 + ; Found multi-line function type annotation + WPS320 + ; Found `in` used with a non-set container + WPS510 per-file-ignores = ; all tests diff --git a/README.md b/README.md index 3a6ea41..a2f78de 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,32 @@ async def main(): ``` +## Queue Types and Message Reliability + +AioPikaBroker supports both classic and quorum queues. Quorum queues are a more modern queue type in RabbitMQ that provides better reliability and data safety guarantees. + +```python +from taskiq_aio_pika import AioPikaBroker, QueueType + +broker = AioPikaBroker( + queue_type=QueueType.QUORUM, # Use quorum queues for better reliability + max_attempts_at_message=3 # Limit redelivery attempts +) +``` + +### Message Redelivery Control + +When message processing fails due to consumer crashes (e.g. due to an OOM condition resulting in a SIGKILL), network issues, or other infrastructure problems, before the consumer has had the chance to acknowledge, positively or negatively, the message (and schedule a retry via taskiq's retry middleware), RabbitMQ will requeue the message to the front of the queue and it will be redelivered. With quorum queues, you can control how many times such a message will be redelivered: + +- Set `max_attempts_at_message` to limit delivery attempts. +- Set `max_attempts_at_message=None` for unlimited attempts. +- This operates at the message delivery level, not application retry level. For application-level retries in case of exceptions that can be caught (e.g., temporary API failures), use taskiq's retry middleware instead. +- After max attempts, the message is logged and discarded. +- `max_attempts_at_message` requires using quorum queues (`queue_type=QueueType.QUORUM`). 
+ +This is particularly useful for preventing infinite loops of redeliveries of messages that consistently cause the consumer to crash ([poison messages](https://www.rabbitmq.com/docs/quorum-queues#poison-message-handling)) and can cause the queue to back up. + + ## Configuration AioPikaBroker parameters: @@ -125,13 +151,12 @@ AioPikaBroker parameters: * `exchange_name` - name of exchange that used to send messages. * `exchange_type` - type of the exchange. Used only if `declare_exchange` is True. * `queue_name` - queue that used to get incoming messages. +* `queue_type` - type of RabbitMQ queue to use: `classic` or `quorum`. defaults to `classic`. * `routing_key` - that used to bind that queue to the exchange. * `declare_exchange` - whether you want to declare new exchange if it doesn't exist. * `max_priority` - maximum priority for messages. -* `delay_queue_name` - custom delay queue name. - This queue is used to deliver messages with delays. -* `dead_letter_queue_name` - custom dead letter queue name. - This queue is used to receive negatively acknowleged messages from the main queue. +* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays. +* `dead_letter_queue_name` - custom dead letter queue name. This queue is used to receive negatively acknowledged messages from the main queue. * `qos` - number of messages that worker can prefetch. -* `declare_queues` - whether you want to declare queues even on - client side. May be useful for message persistance. +* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence. +* `max_attempts_at_message` - maximum number of attempts at processing the same message. requires the queue type to be set to `QueueType.QUORUM`. defaults to `20` for quorum queues and to `None` for classic queues. is not the same as task retries. pass `None` for unlimited attempts. 
diff --git a/taskiq_aio_pika/__init__.py b/taskiq_aio_pika/__init__.py index 4b40666..6bab083 100644 --- a/taskiq_aio_pika/__init__.py +++ b/taskiq_aio_pika/__init__.py @@ -1,4 +1,5 @@ """Taskiq integration with aio-pika.""" + from importlib.metadata import version from taskiq_aio_pika.broker import AioPikaBroker diff --git a/taskiq_aio_pika/broker.py b/taskiq_aio_pika/broker.py index 326e54a..30888f8 100644 --- a/taskiq_aio_pika/broker.py +++ b/taskiq_aio_pika/broker.py @@ -1,17 +1,41 @@ import asyncio +import copy from datetime import timedelta +from enum import Enum from logging import getLogger -from typing import Any, AsyncGenerator, Callable, Dict, Optional, TypeVar +from typing import ( + Any, + AsyncGenerator, + Callable, + Dict, + Literal, + Optional, + TypeVar, + Union, +) from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection -from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage +from taskiq import ( + AckableMessage, + AckableMessageWithDeliveryCount, + AsyncBroker, + AsyncResultBackend, + BrokerMessage, +) _T = TypeVar("_T") # noqa: WPS111 logger = getLogger("taskiq.aio_pika_broker") +class QueueType(Enum): + """Type of RabbitMQ queue.""" + + CLASSIC = "classic" + QUORUM = "quorum" + + def parse_val( parse_func: Callable[[str], _T], target: Optional[str] = None, @@ -35,7 +59,7 @@ def parse_val( class AioPikaBroker(AsyncBroker): """Broker that works with RabbitMQ.""" - def __init__( # noqa: WPS211 + def __init__( # noqa: C901, WPS211 self, url: Optional[str] = None, result_backend: Optional[AsyncResultBackend[_T]] = None, @@ -44,6 +68,7 @@ def __init__( # noqa: WPS211 loop: Optional[asyncio.AbstractEventLoop] = None, exchange_name: str = "taskiq", queue_name: str = "taskiq", + queue_type: QueueType = QueueType.CLASSIC, dead_letter_queue_name: Optional[str] = None, delay_queue_name: Optional[str] = None, declare_exchange: bool 
= True, @@ -54,6 +79,7 @@ def __init__( # noqa: WPS211 delayed_message_exchange_plugin: bool = False, declare_exchange_kwargs: Optional[Dict[Any, Any]] = None, declare_queues_kwargs: Optional[Dict[Any, Any]] = None, + max_attempts_at_message: Union[Optional[int], Literal["default"]] = "default", **connection_kwargs: Any, ) -> None: """ @@ -62,12 +88,13 @@ def __init__( # noqa: WPS211 :param url: url to rabbitmq. If None, the default "amqp://guest:guest@localhost:5672" is used. :param result_backend: custom result backend. - :param task_id_generator: custom task_id genertaor. :param qos: number of messages that worker can prefetch. :param loop: specific even loop. :param exchange_name: name of exchange that used to send messages. :param queue_name: queue that used to get incoming messages. + :param queue_type: type of RabbitMQ queue to use: `classic` or `quorum`. + defaults to `classic`. :param dead_letter_queue_name: custom name for dead-letter queue. by default it set to {queue_name}.dead_letter. :param delay_queue_name: custom name for queue that used to @@ -86,6 +113,11 @@ def __init__( # noqa: WPS211 :param declare_queues_kwargs: additional from AbstractChannel.declare_queue :param connection_kwargs: additional keyword arguments, for connect_robust method of aio-pika. + :param max_attempts_at_message: maximum number of attempts at processing + the same message. requires the queue type to be set to `QueueType.QUORUM`. + defaults to `20` for quorum queues and to `None` for classic queues. + is not the same as task retries. pass `None` for unlimited attempts. + :raises ValueError: if inappropriate arguments were passed. 
""" super().__init__(result_backend, task_id_generator) @@ -104,6 +136,52 @@ def __init__( # noqa: WPS211 self._max_priority = max_priority self._delayed_message_exchange_plugin = delayed_message_exchange_plugin + if self._declare_queues_kwargs.get("arguments", {}).get( + "x-queue-type", + ) or self._declare_queues_kwargs.get("arguments", {}).get("x-delivery-limit"): + raise ValueError( + "Use the `queue_type` and `max_attempts_at_message` parameters of " + "`AioPikaBroker.__init__` instead of `x-queue-type` and " + "`x-delivery-limit`", + ) + if queue_type == QueueType.QUORUM: + self._declare_queues_kwargs.setdefault("arguments", {})[ + "x-queue-type" + ] = "quorum" + self._declare_queues_kwargs["durable"] = True + else: + self._declare_queues_kwargs.setdefault("arguments", {})[ + "x-queue-type" + ] = "classic" + + if queue_type != QueueType.QUORUM and max_attempts_at_message not in ( + "default", + None, + ): + raise ValueError( + "`max_attempts_at_message` requires `queue_type` to be set to " + "`QueueType.QUORUM`.", + ) + + if max_attempts_at_message == "default": + if queue_type == QueueType.QUORUM: + self.max_attempts_at_message = 20 + else: + self.max_attempts_at_message = None + else: + self.max_attempts_at_message = max_attempts_at_message + + if queue_type == QueueType.QUORUM: + if self.max_attempts_at_message is None: + # no limit + self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = "-1" + else: + # the final attempt will be handled in `taskiq.Receiver` + # to generate visible logs + self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = ( + self.max_attempts_at_message + 1 + ) + self._dead_letter_queue_name = f"{queue_name}.dead_letter" if dead_letter_queue_name: self._dead_letter_queue_name = dead_letter_queue_name @@ -183,9 +261,15 @@ async def declare_queues( :param channel: channel to used for declaration. :return: main queue instance. 
""" + declare_queues_kwargs_ex_arguments = copy.copy(self._declare_queues_kwargs) + declare_queue_arguments = declare_queues_kwargs_ex_arguments.pop( + "arguments", + {}, + ) await channel.declare_queue( self._dead_letter_queue_name, - **self._declare_queues_kwargs, + **declare_queues_kwargs_ex_arguments, + arguments=declare_queue_arguments, ) args: "Dict[str, Any]" = { "x-dead-letter-exchange": "", @@ -195,8 +279,8 @@ async def declare_queues( args["x-max-priority"] = self._max_priority queue = await channel.declare_queue( self._queue_name, - arguments=args, - **self._declare_queues_kwargs, + arguments=args | declare_queue_arguments, + **declare_queues_kwargs_ex_arguments, ) if self._delayed_message_exchange_plugin: await queue.bind( @@ -209,8 +293,9 @@ async def declare_queues( arguments={ "x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._queue_name, - }, - **self._declare_queues_kwargs, + } + | declare_queue_arguments, + **declare_queues_kwargs_ex_arguments, ) await queue.bind( @@ -291,7 +376,16 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: queue = await self.declare_queues(self.read_channel) async with queue.iterator() as iterator: async for message in iterator: - yield AckableMessage( - data=message.body, - ack=message.ack, - ) + if ( + delivery_count := message.headers.get("x-delivery-count") + ) is not None: + yield AckableMessageWithDeliveryCount( + data=message.body, + ack=message.ack, + delivery_count=delivery_count, + ) + else: + yield AckableMessage( + data=message.body, + ack=message.ack, + ) diff --git a/tests/conftest.py b/tests/conftest.py index f677fb2..1a8aa92 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +from contextlib import suppress from typing import AsyncGenerator from uuid import uuid4 @@ -229,3 +230,13 @@ async def broker_with_delayed_message_plugin( if_empty=False, if_unused=False, ) + + +@pytest.fixture(autouse=True, scope="function") +async def 
cleanup_rabbitmq(test_channel: Channel) -> AsyncGenerator[None, None]: + yield + + for queue_name in ["taskiq", "taskiq.dead_letter", "taskiq.delay"]: + with suppress(Exception): + queue = await test_channel.get_queue(queue_name, ensure=False) + await queue.delete(if_unused=False, if_empty=False) diff --git a/tests/test_broker.py b/tests/test_broker.py index 07541c4..f6c217c 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -1,5 +1,6 @@ import asyncio import uuid +from typing import Any, Literal import pytest from aio_pika import Channel, Message @@ -7,7 +8,7 @@ from taskiq import AckableMessage, BrokerMessage from taskiq.utils import maybe_awaitable -from taskiq_aio_pika.broker import AioPikaBroker +from taskiq_aio_pika.broker import AioPikaBroker, QueueType async def get_first_task(broker: AioPikaBroker) -> AckableMessage: # type: ignore @@ -48,7 +49,7 @@ async def test_kick_success(broker: AioPikaBroker) -> None: message = await asyncio.wait_for(get_first_task(broker), timeout=0.4) assert message.data == sent.message - await maybe_awaitable(message.ack()) + await maybe_awaitable(message.ack()) # type: ignore[misc] @pytest.mark.anyio @@ -114,7 +115,7 @@ async def test_listen( message = await asyncio.wait_for(get_first_task(broker), timeout=0.4) assert message.data == b"test_message" - await maybe_awaitable(message.ack()) + await maybe_awaitable(message.ack()) # type: ignore[misc] @pytest.mark.anyio @@ -139,7 +140,7 @@ async def test_wrong_format( message = await asyncio.wait_for(get_first_task(broker), 0.4) assert message.data == b"wrong" - await maybe_awaitable(message.ack()) + await maybe_awaitable(message.ack()) # type: ignore[misc] with pytest.raises(QueueEmpty): await queue.get() @@ -219,3 +220,109 @@ async def test_delayed_message_with_plugin( await asyncio.sleep(2) assert await main_queue.get() + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "args, x_queue_type, x_delivery_limit, max_attempts_at_message, raised_exception", + [ + ( # 
custom `max_attempts_at_message` + { + "queue_type": QueueType.QUORUM, + "max_attempts_at_message": 4, + }, + "quorum", + 5, + 4, + False, + ), + ( # unlimited `max_attempts_at_message` + { + "queue_type": QueueType.QUORUM, + "max_attempts_at_message": None, + }, + "quorum", + "-1", + None, + False, + ), + ( # default `max_attempts_at_message` + { + "queue_type": QueueType.QUORUM, + }, + "quorum", + 21, + 20, + False, + ), + ( # classic queue type + { + "queue_type": QueueType.CLASSIC, + }, + "classic", + False, + None, + False, + ), + ( # classic queue type with `max_attempts_at_message` at `None` + { + "queue_type": QueueType.CLASSIC, + "max_attempts_at_message": None, + }, + "classic", + False, + None, + False, + ), + ({}, "classic", False, None, False), # default queue type + ( # `x-queue-type` should raise + {"declare_queues_kwargs": {"arguments": {"x-queue-type": "classic"}}}, + "classic", + False, + None, + True, + ), + ( # `x-delivery-limit` should raise + {"declare_queues_kwargs": {"arguments": {"x-delivery-limit": 3}}}, + None, + None, + None, + True, + ), + ( # classic queue type with `max_attempts_at_message` should raise + { + "queue_type": QueueType.CLASSIC, + "max_attempts_at_message": 3, + }, + "classic", + False, + None, + True, + ), + ], +) +async def test_broker_arguments( + amqp_url: str, + args: dict[str, Any], + x_delivery_limit: int | str | Literal[False], + x_queue_type: str, + max_attempts_at_message: int | None, + raised_exception: bool, +) -> None: + if raised_exception: + with pytest.raises(ValueError): + broker = AioPikaBroker(amqp_url, **args) + return + else: + broker = AioPikaBroker(amqp_url, **args) + await broker.startup() + queue = await broker.declare_queues(broker.write_channel) # type: ignore[arg-type] + + assert queue.arguments["x-dead-letter-exchange"] == "" # type: ignore[index] + assert queue.arguments["x-dead-letter-routing-key"] == "taskiq.dead_letter" # type: ignore[index] + assert queue.arguments["x-queue-type"] == 
x_queue_type # type: ignore[index] + if x_delivery_limit: + assert queue.arguments["x-delivery-limit"] == x_delivery_limit # type: ignore[index] + assert broker.max_attempts_at_message == max_attempts_at_message + + await broker.shutdown()