Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Quorum Queues to RabbitMQ Broker #2

Open
wants to merge 1 commit into
base: es/add-support-for-deleting-queues
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ class RabbitmqBroker(Broker):
.. _ConnectionParameters: https://pika.readthedocs.io/en/0.12.0/modules/parameters.html
"""

def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None, **kwargs):
def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None,
quorum_queues=False, **kwargs):
super().__init__(middleware=middleware)

if max_priority is not None and quorum_queues:
raise ValueError("Quorum queues don't support message priority")

if max_priority is not None and not (0 < max_priority <= 255):
raise ValueError("max_priority must be a value between 0 and 255")

Expand All @@ -114,6 +118,7 @@ def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_pri
self.parameters = pika.ConnectionParameters(**kwargs)

self.confirm_delivery = confirm_delivery
self.quorum_queues = quorum_queues
self.max_priority = max_priority
self.connections = set()
self.channels = set()
Expand Down Expand Up @@ -276,9 +281,13 @@ def _build_queue_arguments(self, queue_name):
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": xq_name(queue_name),
}

if self.max_priority:
arguments["x-max-priority"] = self.max_priority

if self.quorum_queues:
arguments["x-queue-type"] = "quorum"

return arguments

def _declare_queue(self, queue_name):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def rel(*xs):
"pytest",
"pytest-benchmark[histogram]",
"pytest-cov",
"requests",
"tox",
]

Expand Down
44 changes: 44 additions & 0 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import os
import time
from threading import Event
from typing import Any
from unittest.mock import Mock, patch
from urllib.parse import quote

import pika.exceptions
import pytest
import requests
from requests.auth import HTTPBasicAuth

import dramatiq
from dramatiq import Message, QueueJoinTimeout, Worker
Expand All @@ -14,6 +18,14 @@
from .common import RABBITMQ_CREDENTIALS, RABBITMQ_PASSWORD, RABBITMQ_USERNAME


def get_queue_details(queue_name: str, vhost: str = "/") -> Any:
res = requests.get(
f'http://127.0.0.1:15672/api/queues/{quote(vhost, safe="")}/{queue_name}',
auth=HTTPBasicAuth(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
)
return res.json()


def test_urlrabbitmq_creates_instances_of_rabbitmq_broker():
# Given a URL connection string
url = "amqp://%s:%[email protected]:5672" % (RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
Expand Down Expand Up @@ -535,3 +547,35 @@ def test_rabbitmq_flush_all_true_deletes_the_queue():
for queue_name in queues:
with pytest.raises(pika.exceptions.ChannelClosedByBroker, match=r"NOT_FOUND - no queue"):
broker.connection.channel().queue_declare(queue=queue_name, passive=True)


def test_rabbitmq_max_priority_must_be_between_0_and_255():
with pytest.raises(ValueError, match="max_priority must be a value between 0 and 255"):
RabbitmqBroker(host="127.0.0.1", max_priority=300)

with pytest.raises(ValueError, match="max_priority must be a value between 0 and 255"):
RabbitmqBroker(host="127.0.0.1", max_priority=-2)


def test_rabbitmq_max_priority_cant_be_enabled_with_quorum_queues():
assert RabbitmqBroker(host="127.0.0.1", max_priority=10)

with pytest.raises(ValueError, match="Quorum queues don't support message priority"):
RabbitmqBroker(host="127.0.0.1", max_priority=10, quorum_queues=True)


def test_rabbitmq_support_for_quorum_queues():
queue_name = f"random_queue_{current_millis()}"
url = "amqp://%s:%[email protected]:5672" % (RABBITMQ_USERNAME, RABBITMQ_PASSWORD)

broker = RabbitmqBroker(url=url, quorum_queues=False)
broker.declare_queue(queue_name, ensure=True)
queue = get_queue_details(queue_name)
assert "quorum" != queue["type"]
broker.flush(queue_name, delete_queue=True)

broker = RabbitmqBroker(url=url, quorum_queues=True)
broker.declare_queue(queue_name, ensure=True)
queue = get_queue_details(queue_name)
assert "quorum" == queue["type"]
broker.flush(queue_name, delete_queue=True)