Skip to content

Commit

Permalink
Merge pull request #29 from robertbetts/develop
Browse files Browse the repository at this point in the history
Connection properties assume tls connection #27
  • Loading branch information
robertbetts authored Oct 14, 2023
2 parents 808d83f + 0c58b14 commit 7f0948c
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 71 deletions.
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ poetry = "^1.5.1"
pytest = "^7.3.1"
certifi = "^2023.7.22"
coverage = {extras = ["toml"], version = "^7.2.7"}
pytest-dotenv = "^0.5.2"
pytest-dotenv = "0.5.2"
black = "^23.3.0"
mypy = "^1.4.1"
pytest-asyncio = "^0.21.1"
Expand Down Expand Up @@ -67,10 +67,10 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest]
env_files = [".env_test"]
testpaths = ["tests"]

[tool.pytest.ini_options]
env_files = [".env"]
testpaths = ["tests"]
asyncio_mode = "strict"
log_cli = true
log_level = "DEBUG"
Expand Down
248 changes: 248 additions & 0 deletions src/nuropb/nuropb_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
"""
Factory functions for instantiating nuropb api's.
"""
import logging
from typing import Optional, Dict, Any, Callable
from uuid import uuid4

from nuropb.rmq_api import RMQAPI
from nuropb.rmq_lib import configure_nuropb_rmq, create_virtual_host, build_amqp_url, build_rmq_api_url, \
rmq_api_url_from_amqp_url

logger = logging.getLogger(__name__)


def default_connection_properties(connection_properties: Dict[str, Any]) -> Dict[str, Any]:
if "host" not in connection_properties:
connection_properties["host"] = "localhost"
if "username" not in connection_properties:
connection_properties["username"] = "guest"
if "password" not in connection_properties:
connection_properties["password"] = "guest"
if "vhost" not in connection_properties:
connection_properties["vhost"] = "nuropb"
if "verify" not in connection_properties:
connection_properties["verify"] = False
if "ssl" not in connection_properties:
connection_properties["ssl"] = False
if "port" not in connection_properties and connection_properties["ssl"]:
connection_properties["port"] = 5671
elif "port" not in connection_properties:
connection_properties["port"] = 5672

return connection_properties


def create_client(
name: Optional[str] = None,
instance_id: Optional[str] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
transport: Optional[RMQAPI] = RMQAPI,
) -> RMQAPI:
""" Create a client api instance for the nuropb service mesh. This caller of this function
will have to implement the asyncio call to connect to the service mesh:
await client_api.connect()
:param name: used to identify the api connection to the service mesh
:param instance_id: used to create the service mesh response queue for this api connection
:param connection_properties: str or dict with values as required for the chosen transport api client
:param transport_settings: dict with values as required for the underlying transport api
:param transport: the class of the transport api client to use
:return:
"""

if connection_properties is None:
connection_properties = default_connection_properties({
"vhost": "nuropb",
"ssl": False,
"verify": False,
})
elif isinstance(connection_properties, dict):
connection_properties = default_connection_properties(connection_properties)

if transport is None:
transport = RMQAPI
if transport_settings is None:
transport_settings = {}

client_api: RMQAPI = transport(
amqp_url=connection_properties,
service_name=name,
instance_id=instance_id,
transport_settings=transport_settings,
)
return client_api


async def connect(instance_id: Optional[str] = None):
client_api = create_client(
instance_id=instance_id,
)
await client_api.connect()
return client_api


def configure_mesh(
mesh_name: Optional[str] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
):
if mesh_name is None:
mesh_name = "nuropb"

if connection_properties is None:
connection_properties = default_connection_properties({
"vhost": mesh_name,
"ssl": False,
"verify": False,
})

if isinstance(connection_properties, str):
amqp_url = connection_properties

elif isinstance(connection_properties, dict):
connection_properties = default_connection_properties(connection_properties)

if connection_properties["ssl"]:
rmq_scheme = "amqps"
else:
rmq_scheme = "amqp"

host = connection_properties["host"]
port = connection_properties["port"]
username = connection_properties["username"]
password = connection_properties["password"]
vhost = connection_properties["vhost"]

amqp_url = build_amqp_url(
host, port, username, password, vhost, rmq_scheme
)
else:
raise ValueError("connection_properties must be a str or dict")

rmq_api_url = rmq_api_url_from_amqp_url(amqp_url)
create_virtual_host(
api_url=rmq_api_url,
vhost_url=amqp_url,
)

if transport_settings is None:
transport_settings = {}
if "rpc_exchange" not in transport_settings:
transport_settings["rpc_exchange"] = "nuropb-rpc-exchange"
if "events_exchange" not in transport_settings:
transport_settings["events_exchange"] = "nuropb-events-exchange"
if "dl_exchange" not in transport_settings:
transport_settings["dl_exchange"] = "nuropb-dl-exchange"
if "dl_queue" not in transport_settings:
transport_settings["dl_queue"] = "nuropb-dl-queue"

configure_nuropb_rmq(
rmq_url=connection_properties,
events_exchange=transport_settings["events_exchange"],
rpc_exchange=transport_settings["rpc_exchange"],
dl_exchange=transport_settings["dl_exchange"],
dl_queue=transport_settings["dl_queue"],
)


class MeshService:
""" A generic service class that can be used to create a connection only service instance for the
nuropb service mesh. This class could also be used as a template or to define a subclass for
creating a service instance.
"""
_service_name: str
_instance_id: str
_event_bindings: list[str]
_event_callback: Optional[Callable]

def __init__(
self,
service_name: str,
instance_id: Optional[str] = None,
event_bindings: Optional[list[str]] = None,
event_callback: Optional[Callable] = None,
):
self._service_name = service_name
self._instance_id = instance_id or uuid4().hex
self._event_bindings = event_bindings or []
self._event_callback = event_callback

async def _handle_event_(
self,
topic: str,
event: dict,
target: list[str] | None = None,
context: dict | None = None,
trace_id: str | None = None,
):
_ = self
if self._event_callback is not None:
await self._event_callback(topic, event, target, context, trace_id)


def create_service(
name: str,
instance_id: Optional[str] = None,
service_instance: Optional[object] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
transport: Optional[RMQAPI] = RMQAPI,
event_bindings: Optional[list[str]] = None,
event_callback: Optional[Callable] = None,
) -> RMQAPI:
""" Create a client api instance for the nuropb service mesh. This caller of this function
will have to implement the asyncio call to connect to the service mesh:
await client_api.connect()
:param name: used to identify this service to the service mesh
:param instance_id: used to create the service mesh response queue for this individual api
connection
:param service_instance: the instance of the service class that is intended to be exposed
to the service mesh
:param connection_properties: str or dict with values as required for the chosen transport
api client
:param transport_settings: dict with values as required for the underlying transport api
:param transport: the class of the transport api client to use
:param event_bindings: when service_instance is None, a list of event topics that this
service will subscribe to.
when service_instance is not None, the list will override the event_bindings of the
transport_settings if any are defined.
:param event_callback: when service_instance is None, a callback function that will be
called when an event is received
:return:
"""

if connection_properties is None:
connection_properties = default_connection_properties({
"vhost": "nuropb",
"ssl": False,
"verify": False,
})
elif isinstance(connection_properties, dict):
connection_properties = default_connection_properties(connection_properties)

if transport is None:
transport = RMQAPI
if transport_settings is None:
transport_settings = {}

if service_instance is None:
service_instance = MeshService(
service_name=name,
instance_id=instance_id,
event_bindings=event_bindings,
event_callback=event_callback,
)
elif event_bindings is not None:
transport_settings["event_bindings"] = event_bindings

service_api: RMQAPI = transport(
amqp_url=connection_properties,
service_name=name,
service_instance=service_instance,
instance_id=instance_id,
transport_settings=transport_settings,
)
return service_api
42 changes: 34 additions & 8 deletions src/nuropb/rmq_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@


def build_amqp_url(
host: str, port: str | int, username: str, password: str, vhost: str
host: str, port: str | int, username: str, password: str, vhost: str, scheme: str = "amqp"
) -> str:
"""Creates an AMQP URL for connecting to RabbitMQ"""
return f"amqp://{username}:{password}@{host}:{port}/{vhost}"
if username:
password = f":{password}" if password.strip() else ""
return f"{scheme}://{username}{password}@{host}:{port}/{vhost}"
else:
return f"{scheme}://{host}:{port}/{vhost}"


def build_rmq_api_url(
scheme: str, host: str, port: str | int, username: str | None, password: str | None
) -> str:
"""Creates an HTTP URL for connecting to RabbitMQ management API"""
if username is None or password is None:
if username:
if password:
password = f":{password}"
else:
password = ""
return f"{scheme}://{username}{password}@{host}:{port}/api"
else:
return f"{scheme}://{host}:{port}/api"
return f"{scheme}://{username}:{password}@{host}:{port}/api"


def rmq_api_url_from_amqp_url(
Expand All @@ -43,11 +52,19 @@ def rmq_api_url_from_amqp_url(
:return: the RabbitMQ management API URL
"""
url_parts = urlparse(amqp_url)
scheme = scheme or url_parts.scheme
scheme = "https" if scheme == "amqps" else "http"
username = url_parts.username
password = url_parts.password
port = port or url_parts.port
host = url_parts.hostname if url_parts.hostname else "localhost"
port = 15672 if port is None else port
scheme = "http" if scheme is None else scheme
if port:
port = int(port) + 10000
elif not port and scheme == "https":
port = 15671
elif not port:
port = 15672

return build_rmq_api_url(scheme, host, port, username, password)


Expand Down Expand Up @@ -96,7 +113,7 @@ def get_connection_parameters(
"""
if isinstance(amqp_url, dict):
# create TLS connection parameters

use_ssl = amqp_url.get("ssl", False)
host = amqp_url.get("host", None)
port = amqp_url.get("port", None)
pika_parameters = {
Expand All @@ -113,7 +130,10 @@ def get_connection_parameters(
if vhost:
pika_parameters["virtual_host"] = vhost

if amqp_url.get("cafile", None) or amqp_url.get("certfile"):
""" By specifying cafile, it is assumed that the connection will be over SSL/TLS
"""
if use_ssl or amqp_url.get("cafile", None):
use_ssl = True
cafile = amqp_url.get("cafile", None)
if cafile: # pragma: no cover
context = ssl.create_default_context(
Expand Down Expand Up @@ -145,6 +165,12 @@ def get_connection_parameters(
)
pika_parameters["ssl_options"] = ssl_options

if pika_parameters["port"] is None and use_ssl:
pika_parameters["port"] = 5671
elif pika_parameters["port"] is None:
pika_parameters["port"] = 5672


if amqp_url.get("username", None):
credentials = PlainCredentials(amqp_url["username"], amqp_url["password"])
pika_parameters["credentials"] = credentials
Expand Down
4 changes: 2 additions & 2 deletions src/nuropb/rmq_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(
- int prefetch_count: The number of messages to prefetch defaults to 1, unlimited is 0.
Experiment with larger values for higher throughput in your user case.
When an existing transport initialised and connected, and a subsequent transport
When an existing transport is initialised and connected, and a subsequent transport
instance is connected with the same service_name and instance_id as the first, the broker
will shut down the channel of subsequent instances when they attempt to configure their
response queue. This is because the response queue is opened in exclusive mode. The
Expand Down Expand Up @@ -456,7 +456,7 @@ def connect(self) -> asyncio.Future[bool]:
self._connected_future = asyncio.Future()

connection_parameters = get_connection_parameters(
amqp_url = self._amqp_url,
amqp_url=self._amqp_url,
name=self._service_name,
instance_id=self._instance_id,
client_only=self._client_only,
Expand Down
Loading

0 comments on commit 7f0948c

Please sign in to comment.