Skip to content

Commit

Permalink
Merge pull request #18 from robertbetts/develop
Browse files Browse the repository at this point in the history
TLS RabbitMQ connection support
  • Loading branch information
robertbetts authored Sep 25, 2023
2 parents 38cb213 + 140a01d commit d2da7cc
Show file tree
Hide file tree
Showing 17 changed files with 445 additions and 143 deletions.
4 changes: 0 additions & 4 deletions examples/scripted_mesh_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,11 @@ async def main():
create_virtual_host(rmq_api_url, amqp_url)
transport_settings = api.transport.rmq_configuration
configure_nuropb_rmq(
service_name=service_instance._service_name, # pragma: no cover
rmq_url=amqp_url,
events_exchange=transport_settings["events_exchange"],
rpc_exchange=transport_settings["rpc_exchange"],
dl_exchange=transport_settings["dl_exchange"],
dl_queue=transport_settings["dl_queue"],
service_queue=transport_settings["service_queue"],
rpc_bindings=transport_settings["rpc_bindings"],
event_bindings=transport_settings["event_bindings"],
)


Expand Down
30 changes: 18 additions & 12 deletions src/nuropb/rmq_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class RMQAPI(NuropbInterface):

def __init__(
self,
amqp_url: str,
amqp_url: str | Dict[str, Any],
service_name: str | None = None,
instance_id: str | None = None,
service_instance: object | None = None,
Expand All @@ -53,10 +53,14 @@ def __init__(
transport_settings: Optional[Dict[str, Any]] = None,
):
"""RMQAPI: A NuropbInterface implementation that uses RabbitMQ as the underlying transport."""
parts = amqp_url.split("/")
vhost = amqp_url.split("/")[-1]
if len(parts) < 4:
raise ValueError("Invalid amqp_url, missing vhost")
if isinstance(amqp_url, str):
parts = amqp_url.split("/")
vhost = amqp_url.split("/")[-1]
if len(parts) < 4:
raise ValueError("Invalid amqp_url, missing vhost")
else:
vhost = amqp_url["vhost"]

self._mesh_name = vhost

""" If a service_name is not provided, then the service is a client only and will not be able
Expand Down Expand Up @@ -219,7 +223,9 @@ def receive_transport_message(
""" The logic below is only relevant for incoming service messages
"""
if self._service_instance is None:
error_description = f"No service instance configured to handle the {service_message['nuropb_type']} instruction"
error_description = (
f"No service instance configured to handle the {service_message['nuropb_type']} instruction"
)
logger.warning(error_description)
response = NuropbHandlingError(
description=error_description,
Expand Down Expand Up @@ -424,12 +430,12 @@ def publish_event(
:param topic: str, The routing key on the events exchange
:param event: json-encodable Python Dict.
:param context: dict, The context around gent generation, example content includes:
- user_id: str # a unique user identifier or token of the user that made the request
- correlation_id: str # a unique identifier of the request used to correlate the response
- str user_id: # a unique user identifier or token of the user that made the request
- str correlation_id: # a unique identifier of the request used to correlate the response
# to the request
# or trace the request over the network (e.g. an uuid4 hex string)
- service: str
- method: str
- str service:
- str method:
:param trace_id: str optional
an identifier to trace the request over the network (e.g. an uuid4 hex string)
:param encrypted: bool, if True then the message will be encrypted in transit
Expand Down Expand Up @@ -502,8 +508,8 @@ async def requires_encryption(self, service_name: str, method_name: str) -> bool
and returns True if encryption is required else False.
none of encryption is not required.
:param service_name: str
:param method_name: str
:param service_name:
:param method_name:
:return: bool
"""
service_info = await self.describe_service(service_name)
Expand Down
123 changes: 70 additions & 53 deletions src/nuropb/rmq_lib.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
""" RabbitMQ Utility library for NuroPb
"""
import logging
from typing import Dict, Any, List, Optional
from typing import Dict, Any, Optional
from urllib.parse import urlparse
from contextlib import contextmanager
import ssl

import requests
import pika
from pika.channel import Channel
from pika.credentials import PlainCredentials

from nuropb.interface import PayloadDict, NuropbTransportError

Expand Down Expand Up @@ -48,6 +50,57 @@ def rmq_api_url_from_amqp_url(
return build_rmq_api_url(scheme, host, port, username, password)


def get_connection_parameters(amqp_url: str | Dict[str, Any]) -> pika.ConnectionParameters | pika.URLParameters:
"""Return the connection parameters for the transport"""
if isinstance(amqp_url, dict):
# create TLS connection parameters
cafile = amqp_url.get("cafile", None)
if cafile: # pragma: no cover
context = ssl.create_default_context(
cafile=cafile,
)
else:
context = ssl.create_default_context()

if amqp_url.get("certfile"):
context.load_cert_chain(
certfile=amqp_url.get("certfile"),
keyfile=amqp_url.get("keyfile")
)

if amqp_url.get("verify", True) is False:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
else:
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED

if amqp_url.get("username", None):
credentials = PlainCredentials(amqp_url["username"], amqp_url["password"])
else:
credentials = None

host = amqp_url.get("host", None)
port = amqp_url.get("port", None)
vhost = amqp_url.get("vhost", "/")
ssl_options = pika.SSLOptions(
context=context,
server_hostname=host
)
conn_params = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=vhost,
credentials=credentials,
ssl_options=ssl_options
)
return conn_params

else:
# create connection parameters from amqp_url alone
return pika.URLParameters(amqp_url)


def management_api_session_info(
scheme: str,
host: str,
Expand Down Expand Up @@ -82,11 +135,11 @@ def management_api_session_info(


@contextmanager
def blocking_rabbitmq_channel(rmq_url: str) -> pika.channel.Channel:
def blocking_rabbitmq_channel(rmq_url: str | Dict[str, Any]) -> pika.channel.Channel:
"""Useful for initialisation of queues / exchanges."""
connection = None
try:
parameters = pika.URLParameters(rmq_url)
parameters = get_connection_parameters(rmq_url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
yield channel
Expand All @@ -101,15 +154,11 @@ def blocking_rabbitmq_channel(rmq_url: str) -> pika.channel.Channel:


def configure_nuropb_rmq(
service_name: str,
rmq_url: str,
rmq_url: str | Dict[str, Any],
events_exchange: str,
rpc_exchange: str,
dl_exchange: str,
dl_queue: str,
service_queue: str,
rpc_bindings: List[str],
event_bindings: List[str],
**kwargs: Any,
) -> bool:
"""Configure the RabbitMQ broker for this transport.
Expand Down Expand Up @@ -149,15 +198,11 @@ def configure_nuropb_rmq(
- named instances of a service each with their own persistent response queue
- Notification and handling of dead letter messages relating to a service
:param str service_name: The name of the service, being configured for
:param str rmq_url: The URL of the RabbitMQ broker
:param str events_exchange: The name of the events exchange
:param str rpc_exchange: The name of the RPC exchange
:param str dl_exchange: The name of the dead letter exchange
:param str dl_queue: The name of the dead letter queue
:param str service_queue: The name of the requests queue
:param List[str] rpc_bindings: The list of RPC bindings
:param List[str] event_bindings: The list of events bindings
:param kwargs: Additional keyword argument overflow from the transport settings.
- client_only: bool - True if this is a client only service, False otherwise
:return: True if the RabbitMQ broker was configured successfully
Expand All @@ -166,9 +211,6 @@ def configure_nuropb_rmq(
logger.info("Client only service, not configuring RMQ")
return True

if len(rpc_bindings) == 0 or service_name not in rpc_bindings:
rpc_bindings.append(service_name)

with blocking_rabbitmq_channel(rmq_url) as channel:
logger.info(f"Declaring the dead letter exchange: {dl_exchange}")
""" Setting up dead letter handling - all requests are automatically sent to a dead letter queue.
Expand Down Expand Up @@ -199,37 +241,6 @@ def configure_nuropb_rmq(
exchange_type="direct",
durable=True,
)
logger.info(
f"Declaring the request queue: {service_queue} to the rpc exchange: {rpc_exchange}"
)
requests_queue_config = {
"durable": True,
"auto_delete": False,
"arguments": {"x-dead-letter-exchange": dl_exchange},
}
channel.queue_declare(queue=service_queue, **requests_queue_config)

""" This NOTE is here for reference only. A response queue is not durable by default
and is the responsibility of the client/service follower to declare on startup.
As the response queue is not durable, it is auto-deleted when the connection is closed.
This is because the response queue is only used for RPC responses, and we don't want to
keep stale responses around. See the notes above about etcd leader/follower configuration
for more information.
responses_queue_config = {"durable": False, "auto_delete": True}
"""

""" Any new bindings will be registered here, however, previous bindings will not be removed.
PLEASE TAKE NOTE, especially in the context of event bindings, this could lead to a lot of
unnecessary traffic and debugging support issues.
"""
for routing_key in rpc_bindings:
logger.info("binding to {}".format(routing_key))
channel.queue_bind(service_queue, rpc_exchange, routing_key)

for routing_key in event_bindings:
logger.info("binding to {}".format(routing_key))
channel.queue_bind(service_queue, events_exchange, routing_key)

rabbitmq_configured = True

Expand Down Expand Up @@ -321,7 +332,7 @@ def get_virtual_host_queues(api_url: str, vhost_url: str) -> Any | None:
return response.json()


def get_virtual_hosts(api_url: str, vhost_url: str) -> Any | None:
def get_virtual_hosts(api_url: str, vhost_url: str | Dict[str, Any]) -> Any | None:
"""Creates a virtual host on the RabbitMQ server using the REST API
:param api_url: the url to the RabbitMQ API
:param vhost_url: the virtual host to create
Expand All @@ -343,15 +354,18 @@ def get_virtual_hosts(api_url: str, vhost_url: str) -> Any | None:
return response.json()


def create_virtual_host(api_url: str, vhost_url: str) -> None:
def create_virtual_host(api_url: str, vhost_url: str | Dict[str, Any]) -> None:
"""Creates a virtual host on the RabbitMQ server using the REST API
:param api_url: the url to the RabbitMQ API
:param vhost_url: the virtual host to create
:return: None
"""
url_parts = urlparse(vhost_url)
vhost = url_parts.path[1:] if url_parts.path.startswith("/") else url_parts.path
if isinstance(vhost_url, dict):
vhost = vhost_url["vhost"]
else:
url_parts = urlparse(vhost_url)
vhost = url_parts.path[1:] if url_parts.path.startswith("/") else url_parts.path

vhost_data = get_virtual_hosts(api_url, vhost_url)
vhost_exists = False
Expand All @@ -376,15 +390,18 @@ def create_virtual_host(api_url: str, vhost_url: str) -> None:
response.raise_for_status()


def delete_virtual_host(api_url: str, vhost_url: str) -> None:
def delete_virtual_host(api_url: str, vhost_url: str | Dict[str, Any]) -> None:
"""Deletes a virtual host on the RabbitMQ server using the REST API
:param api_url: the url to the RabbitMQ API
:param vhost_url: the virtual host to delete
:return: None
"""
url_parts = urlparse(vhost_url)
vhost = url_parts.path[1:] if url_parts.path.startswith("/") else url_parts.path
if isinstance(vhost_url, dict):
vhost = vhost_url["vhost"]
else:
url_parts = urlparse(vhost_url)
vhost = url_parts.path[1:] if url_parts.path.startswith("/") else url_parts.path

vhost_data = get_virtual_hosts(api_url, vhost_url)
vhost_exists = False
Expand Down
Loading

0 comments on commit d2da7cc

Please sign in to comment.