From fceacd2ff90341fe09061937fe50e15548f4a496 Mon Sep 17 00:00:00 2001 From: Robert Betts Date: Sun, 15 Oct 2023 00:11:47 +0100 Subject: [PATCH] increment version to 0.1.8 --- pyproject.toml | 2 +- src/nuropb/contexts/context_manager.py | 2 +- src/nuropb/contexts/describe.py | 23 ++-- src/nuropb/contexts/service_handlers.py | 26 ++-- src/nuropb/encodings/encryption.py | 4 +- src/nuropb/interface.py | 13 +- src/nuropb/nuropb_api.py | 114 ++++++++++-------- src/nuropb/rmq_api.py | 25 ++-- src/nuropb/rmq_lib.py | 39 +++--- src/nuropb/rmq_transport.py | 94 ++++++++++----- src/nuropb/service_runner.py | 3 - src/nuropb/testing/stubs.py | 13 +- src/nuropb/utils.py | 4 +- tests/contexts/test_describe.py | 45 ++++--- tests/encodings/test_json_serialisation.py | 7 +- tests/mesh/test_service_discover.py | 15 +-- tests/test_message_routing.py | 8 +- tests/test_nuropb_api.py | 18 +-- tests/test_service_container.py | 4 +- .../test_channel_state.py | 6 +- .../test_connection_properties.py | 15 +-- tests/transport_connection/test_rqm_api.py | 1 - .../test_tls_connection.py | 44 ++++--- 23 files changed, 291 insertions(+), 234 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5203864..47e5179 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "nuropb" -version = "0.1.7" +version = "0.1.8" description = "NuroPb - A Distributed Event Driven Service Mesh" authors = ["Robert Betts "] readme = "README.md" diff --git a/src/nuropb/contexts/context_manager.py b/src/nuropb/contexts/context_manager.py index 5fb5eff..753c41b 100644 --- a/src/nuropb/contexts/context_manager.py +++ b/src/nuropb/contexts/context_manager.py @@ -79,7 +79,7 @@ def error(self) -> Dict[str, Any] | None: } def add_event(self, event: Dict[str, Any]) -> None: - """ Add an event to the context manager. The event will be sent to the service mesh when the context manager + """Add an event to the context manager. The event will be sent to the service mesh when the context manager exits successfully. Event format diff --git a/src/nuropb/contexts/describe.py b/src/nuropb/contexts/describe.py index 38ebc4b..0129b8c 100644 --- a/src/nuropb/contexts/describe.py +++ b/src/nuropb/contexts/describe.py @@ -87,8 +87,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: def describe_service(class_instance: object) -> Dict[str, Any] | None: - """Returns a description of the class methods that will be exposed to the service mesh - """ + """Returns a description of the class methods that will be exposed to the service mesh""" service_info = { "service_name": "", "service_version": "", @@ -186,18 +185,22 @@ def map_argument(arg_props: Any) -> Tuple[str, Dict[str, Any]]: } methods.append((name, method_spec)) - service_info.update({ - "service_name": service_name, - "service_version": service_version, - "description": service_description, - "encrypted_methods": service_has_encrypted_methods, - "methods": dict(methods), - }) + service_info.update( + { + "service_name": service_name, + "service_version": service_version, + "description": service_description, + "encrypted_methods": service_has_encrypted_methods, + "methods": dict(methods), + } + ) if service_has_encrypted_methods: private_key = service_name = getattr(class_instance, "_private_key", None) if private_key is None: - service_info["warnings"].append("Service has encrypted methods but no private key has been set.") + service_info["warnings"].append( + "Service has encrypted methods but no private key has been set." + ) logger.debug( f"Service {service_name} has encrypted methods but no private key has been set" ) diff --git a/src/nuropb/contexts/service_handlers.py b/src/nuropb/contexts/service_handlers.py index 3b1a418..ed599c7 100644 --- a/src/nuropb/contexts/service_handlers.py +++ b/src/nuropb/contexts/service_handlers.py @@ -282,8 +282,8 @@ def handle_execution_result( acknowledgement = "reject" if isinstance(result, BaseException): - """ Create NuroPb response from an exception, and update acknowledgement type. - NOTE: Some exceptions are special cases and not necessarily errors. For example, + """Create NuroPb response from an exception, and update acknowledgement type. + NOTE: Some exceptions are special cases and not necessarily errors. For example, NuropbCallAgain and NuropbSuccess are not errors. """ ( @@ -298,15 +298,14 @@ def handle_execution_result( logger.exception(result) if service_message["nuropb_type"] in ("event", "command"): - """There is no requirement to handle the response of instance._event_handler result, only to - positively acknowledge the event. There is also no requirements to handle the response of + """There is no requirement to handle the response of instance._event_handler result, only to + positively acknowledge the event. There is also no requirements to handle the response of a command, only to positively acknowledge the command. """ pass # Do nothing if service_message["nuropb_type"] == "request": - """Create NuroPb response from the service call result - """ + """Create NuroPb response from the service call result""" if isinstance(error, BaseException): pyload_error = error_dict_from_exception(error) if verbose: @@ -366,11 +365,9 @@ def execute_request( result = None try: - payload = service_message["nuropb_payload"] if service_message["nuropb_type"] == "event": - payload = service_message["nuropb_payload"] topic = payload["topic"] event = payload["event"] @@ -405,7 +402,9 @@ def execute_request( payload=payload, exception=None, ) - handle_execution_result(service_message, exception_result, message_complete_callback) + handle_execution_result( + service_message, exception_result, message_complete_callback + ) return try: @@ -434,16 +433,19 @@ def execute_request( else: exception_result = err - handle_execution_result(service_message, exception_result, message_complete_callback) + handle_execution_result( + service_message, exception_result, message_complete_callback + ) return if asyncio.isfuture(result) or asyncio.iscoroutine(result): - if is_future(result): exception_result = ValueError( "Tornado Future detected, please use asyncio.Future instead" ) - handle_execution_result(service_message, exception_result, message_complete_callback) + handle_execution_result( + service_message, exception_result, message_complete_callback + ) return def future_done_callback(future: Awaitable[Any]) -> None: diff --git a/src/nuropb/encodings/encryption.py b/src/nuropb/encodings/encryption.py index 34da85a..6ea8340 100644 --- a/src/nuropb/encodings/encryption.py +++ b/src/nuropb/encodings/encryption.py @@ -92,9 +92,7 @@ def new_symmetric_key(cls) -> bytes: """ return Fernet.generate_key() - def add_public_key( - self, service_name: str, public_key: rsa.RSAPublicKey - ) -> None: + def add_public_key(self, service_name: str, public_key: rsa.RSAPublicKey) -> None: """Add a public key for a service :param service_name: str :param public_key: rsa.RSAPublicKey diff --git a/src/nuropb/interface.py b/src/nuropb/interface.py index ea263f9..c00b100 100644 --- a/src/nuropb/interface.py +++ b/src/nuropb/interface.py @@ -15,7 +15,7 @@ logger = logging.getLogger(__name__) -NUROPB_VERSION = "0.1.7" +NUROPB_VERSION = "0.1.8" NUROPB_PROTOCOL_VERSION = "0.1.1" NUROPB_PROTOCOL_VERSIONS_SUPPORTED = ("0.1.1",) NUROPB_MESSAGE_TYPES = ( @@ -260,14 +260,15 @@ class NuropbTimeoutError(NuropbException): class NuropbTransportError(NuropbException): """NuropbTransportError: represents an error that inside the plumbing.""" + _close_connection: bool def __init__( - self, - description: Optional[str] = None, - payload: Optional[PayloadDict] = None, - exception: Optional[BaseException] = None, - close_connection: bool = False, + self, + description: Optional[str] = None, + payload: Optional[PayloadDict] = None, + exception: Optional[BaseException] = None, + close_connection: bool = False, ): super().__init__( description=description, diff --git a/src/nuropb/nuropb_api.py b/src/nuropb/nuropb_api.py index 9c9f501..d4f4d19 100644 --- a/src/nuropb/nuropb_api.py +++ b/src/nuropb/nuropb_api.py @@ -6,13 +6,20 @@ from uuid import uuid4 from nuropb.rmq_api import RMQAPI -from nuropb.rmq_lib import configure_nuropb_rmq, create_virtual_host, build_amqp_url, build_rmq_api_url, \ - rmq_api_url_from_amqp_url +from nuropb.rmq_lib import ( + configure_nuropb_rmq, + create_virtual_host, + build_amqp_url, + build_rmq_api_url, + rmq_api_url_from_amqp_url, +) logger = logging.getLogger(__name__) -def default_connection_properties(connection_properties: Dict[str, Any]) -> Dict[str, Any]: +def default_connection_properties( + connection_properties: Dict[str, Any] +) -> Dict[str, Any]: if "host" not in connection_properties: connection_properties["host"] = "localhost" if "username" not in connection_properties: @@ -34,13 +41,13 @@ def default_connection_properties(connection_properties: Dict[str, Any]) -> Dict def create_client( - name: Optional[str] = None, - instance_id: Optional[str] = None, - connection_properties: Optional[Dict[str, Any]] = None, - transport_settings: Optional[str | Dict[str, Any]] = None, - transport: Optional[RMQAPI] = RMQAPI, + name: Optional[str] = None, + instance_id: Optional[str] = None, + connection_properties: Optional[Dict[str, Any]] = None, + transport_settings: Optional[str | Dict[str, Any]] = None, + transport: Optional[RMQAPI] = RMQAPI, ) -> RMQAPI: - """ Create a client api instance for the nuropb service mesh. This caller of this function + """Create a client api instance for the nuropb service mesh. This caller of this function will have to implement the asyncio call to connect to the service mesh: await client_api.connect() @@ -53,11 +60,13 @@ def create_client( """ if connection_properties is None: - connection_properties = default_connection_properties({ - "vhost": "nuropb", - "ssl": False, - "verify": False, - }) + connection_properties = default_connection_properties( + { + "vhost": "nuropb", + "ssl": False, + "verify": False, + } + ) elif isinstance(connection_properties, dict): connection_properties = default_connection_properties(connection_properties) @@ -84,19 +93,21 @@ async def connect(instance_id: Optional[str] = None): def configure_mesh( - mesh_name: Optional[str] = None, - connection_properties: Optional[Dict[str, Any]] = None, - transport_settings: Optional[str | Dict[str, Any]] = None, + mesh_name: Optional[str] = None, + connection_properties: Optional[Dict[str, Any]] = None, + transport_settings: Optional[str | Dict[str, Any]] = None, ): if mesh_name is None: mesh_name = "nuropb" if connection_properties is None: - connection_properties = default_connection_properties({ - "vhost": mesh_name, - "ssl": False, - "verify": False, - }) + connection_properties = default_connection_properties( + { + "vhost": mesh_name, + "ssl": False, + "verify": False, + } + ) if isinstance(connection_properties, str): amqp_url = connection_properties @@ -115,9 +126,7 @@ def configure_mesh( password = connection_properties["password"] vhost = connection_properties["vhost"] - amqp_url = build_amqp_url( - host, port, username, password, vhost, rmq_scheme - ) + amqp_url = build_amqp_url(host, port, username, password, vhost, rmq_scheme) else: raise ValueError("connection_properties must be a str or dict") @@ -148,21 +157,22 @@ def configure_mesh( class MeshService: - """ A generic service class that can be used to create a connection only service instance for the + """A generic service class that can be used to create a connection only service instance for the nuropb service mesh. This class could also be used as a template or to define a subclass for creating a service instance. """ + _service_name: str _instance_id: str _event_bindings: list[str] _event_callback: Optional[Callable] def __init__( - self, - service_name: str, - instance_id: Optional[str] = None, - event_bindings: Optional[list[str]] = None, - event_callback: Optional[Callable] = None, + self, + service_name: str, + instance_id: Optional[str] = None, + event_bindings: Optional[list[str]] = None, + event_callback: Optional[Callable] = None, ): self._service_name = service_name self._instance_id = instance_id or uuid4().hex @@ -170,12 +180,12 @@ def __init__( self._event_callback = event_callback async def _handle_event_( - self, - topic: str, - event: dict, - target: list[str] | None = None, - context: dict | None = None, - trace_id: str | None = None, + self, + topic: str, + event: dict, + target: list[str] | None = None, + context: dict | None = None, + trace_id: str | None = None, ): _ = self if self._event_callback is not None: @@ -183,16 +193,16 @@ async def _handle_event_( def create_service( - name: str, - instance_id: Optional[str] = None, - service_instance: Optional[object] = None, - connection_properties: Optional[Dict[str, Any]] = None, - transport_settings: Optional[str | Dict[str, Any]] = None, - transport: Optional[RMQAPI] = RMQAPI, - event_bindings: Optional[list[str]] = None, - event_callback: Optional[Callable] = None, + name: str, + instance_id: Optional[str] = None, + service_instance: Optional[object] = None, + connection_properties: Optional[Dict[str, Any]] = None, + transport_settings: Optional[str | Dict[str, Any]] = None, + transport: Optional[RMQAPI] = RMQAPI, + event_bindings: Optional[list[str]] = None, + event_callback: Optional[Callable] = None, ) -> RMQAPI: - """ Create a client api instance for the nuropb service mesh. This caller of this function + """Create a client api instance for the nuropb service mesh. This caller of this function will have to implement the asyncio call to connect to the service mesh: await client_api.connect() @@ -215,11 +225,13 @@ def create_service( """ if connection_properties is None: - connection_properties = default_connection_properties({ - "vhost": "nuropb", - "ssl": False, - "verify": False, - }) + connection_properties = default_connection_properties( + { + "vhost": "nuropb", + "ssl": False, + "verify": False, + } + ) elif isinstance(connection_properties, dict): connection_properties = default_connection_properties(connection_properties) diff --git a/src/nuropb/rmq_api.py b/src/nuropb/rmq_api.py index 1809017..0bce672 100644 --- a/src/nuropb/rmq_api.py +++ b/src/nuropb/rmq_api.py @@ -252,15 +252,15 @@ def receive_transport_message( """ The logic below is only relevant for incoming service messages """ if self._service_instance is None: - error_description = ( - f"No service instance configured to handle the {service_message['nuropb_type']} instruction" - ) + error_description = f"No service instance configured to handle the {service_message['nuropb_type']} instruction" logger.warning(error_description) response = NuropbHandlingError( description=error_description, payload=service_message["nuropb_payload"], ) - handle_execution_result(service_message, response, message_complete_callback) + handle_execution_result( + service_message, response, message_complete_callback + ) return if service_message["nuropb_type"] in ("request", "command", "event"): @@ -277,12 +277,11 @@ def receive_transport_message( @classmethod def _handle_immediate_request_error( - cls, - rpc_response: bool, - payload: RequestPayloadDict | ResponsePayloadDict, - error: Dict[str, Any] | BaseException + cls, + rpc_response: bool, + payload: RequestPayloadDict | ResponsePayloadDict, + error: Dict[str, Any] | BaseException, ) -> ResponsePayloadDict: - if rpc_response and isinstance(error, BaseException): raise error elif rpc_response: @@ -388,7 +387,9 @@ async def request( return self._handle_immediate_request_error(rpc_response, message, err) if response["error"] is not None: - return self._handle_immediate_request_error(rpc_response, response, response["error"]) + return self._handle_immediate_request_error( + rpc_response, response, response["error"] + ) if rpc_response is True: return response["result"] else: @@ -532,7 +533,9 @@ async def describe_service( ) return service_info except Exception as err: - raise ValueError(f"error loading the public key for {service_name}: {err}") + raise ValueError( + f"error loading the public key for {service_name}: {err}" + ) async def requires_encryption(self, service_name: str, method_name: str) -> bool: """requires_encryption: Queries the service discovery information for the service_name diff --git a/src/nuropb/rmq_lib.py b/src/nuropb/rmq_lib.py index 4cfd42b..1e1fe28 100644 --- a/src/nuropb/rmq_lib.py +++ b/src/nuropb/rmq_lib.py @@ -12,13 +12,23 @@ from pika.channel import Channel from pika.credentials import PlainCredentials -from nuropb.interface import PayloadDict, NuropbTransportError, NUROPB_VERSION, NUROPB_PROTOCOL_VERSION +from nuropb.interface import ( + PayloadDict, + NuropbTransportError, + NUROPB_VERSION, + NUROPB_PROTOCOL_VERSION, +) logger = logging.getLogger(__name__) def build_amqp_url( - host: str, port: str | int, username: str, password: str, vhost: str, scheme: str = "amqp" + host: str, + port: str | int, + username: str, + password: str, + vhost: str, + scheme: str = "amqp", ) -> str: """Creates an AMQP URL for connecting to RabbitMQ""" if username: @@ -69,9 +79,9 @@ def rmq_api_url_from_amqp_url( def get_client_connection_properties( - name: Optional[str] = None, - instance_id: Optional[str] = None, - client_only: Optional[bool] = None, + name: Optional[str] = None, + instance_id: Optional[str] = None, + client_only: Optional[bool] = None, ) -> Dict[str, str]: """Returns the client connection properties for the transport""" try: @@ -98,11 +108,11 @@ def get_client_connection_properties( def get_connection_parameters( - amqp_url: str | Dict[str, Any], - name: Optional[str] = None, - instance_id: Optional[str] = None, - client_only: Optional[bool] = None, - **overrides: Any + amqp_url: str | Dict[str, Any], + name: Optional[str] = None, + instance_id: Optional[str] = None, + client_only: Optional[bool] = None, + **overrides: Any, ) -> pika.ConnectionParameters | pika.URLParameters: """Return the connection parameters for the transport :param amqp_url: the AMQP URL or connection parameters to use @@ -147,8 +157,7 @@ def get_connection_parameters( # For client x509 certificate authentication if amqp_url.get("certfile"): context.load_cert_chain( - certfile=amqp_url.get("certfile"), - keyfile=amqp_url.get("keyfile") + certfile=amqp_url.get("certfile"), keyfile=amqp_url.get("keyfile") ) # Whether to disable SSL certificate verification @@ -159,10 +168,7 @@ def get_connection_parameters( context.check_hostname = True context.verify_mode = ssl.CERT_REQUIRED - ssl_options = pika.SSLOptions( - context=context, - server_hostname=host - ) + ssl_options = pika.SSLOptions(context=context, server_hostname=host) pika_parameters["ssl_options"] = ssl_options if pika_parameters["port"] is None and use_ssl: @@ -170,7 +176,6 @@ def get_connection_parameters( elif pika_parameters["port"] is None: pika_parameters["port"] = 5672 - if amqp_url.get("username", None): credentials = PlainCredentials(amqp_url["username"], amqp_url["password"]) pika_parameters["credentials"] = credentials diff --git a/src/nuropb/rmq_transport.py b/src/nuropb/rmq_transport.py index 9733908..09bd71f 100644 --- a/src/nuropb/rmq_transport.py +++ b/src/nuropb/rmq_transport.py @@ -8,7 +8,11 @@ from pika import connection from pika.adapters.asyncio_connection import AsyncioConnection from pika.channel import Channel -from pika.exceptions import ChannelClosedByBroker, ProbableAccessDeniedError, ChannelClosedByClient +from pika.exceptions import ( + ChannelClosedByBroker, + ProbableAccessDeniedError, + ChannelClosedByClient, +) import pika.spec from pika.frame import Method @@ -24,11 +28,14 @@ AcknowledgeAction, NUROPB_PROTOCOL_VERSION, NUROPB_VERSION, - NuropbCallAgainReject, RequestPayloadDict, ResponsePayloadDict, + NuropbCallAgainReject, + RequestPayloadDict, + ResponsePayloadDict, ) from nuropb.rmq_lib import ( create_virtual_host, - configure_nuropb_rmq, get_connection_parameters, + configure_nuropb_rmq, + get_connection_parameters, ) from nuropb.contexts import service_handlers from nuropb.contexts.service_handlers import ( @@ -112,11 +119,12 @@ class ServiceNotConfigured(Exception): """Raised when a service is not properly configured on the RabbitMQ broker. the leader will be expected to configure the Exchange and service queues """ + pass class RMQTransport: - """ RMQTransport is the base class for the RabbitMQ transport. It wraps the + """RMQTransport is the base class for the RabbitMQ transport. It wraps the NuroPb service mesh patterns and rules over the AMQP protocol. When RabbitMQ closes the connection, this class will stop and alert that @@ -127,6 +135,7 @@ class RMQTransport: TODO: Configure the Pika client connection attributes in the pika client properties. """ + _service_name: str _instance_id: str _amqp_url: str | Dict[str, Any] @@ -239,17 +248,27 @@ def __init__( self._instance_id = instance_id self._amqp_url = amqp_url self._rpc_exchange = kwargs.get("rpc_exchange", None) or "nuropb-rpc-exchange" - self._events_exchange = kwargs.get("events_exchange", None) or "nuropb-events-exchange" + self._events_exchange = ( + kwargs.get("events_exchange", None) or "nuropb-events-exchange" + ) self._dl_exchange = kwargs.get("dl_exchange", None) or "nuropb-dl-exchange" - self._dl_queue = kwargs.get("dl_queue", None) or f"nuropb-{self._service_name}-dl" - self._service_queue = kwargs.get("service_queue", None) or f"nuropb-{self._service_name}-sq" + self._dl_queue = ( + kwargs.get("dl_queue", None) or f"nuropb-{self._service_name}-dl" + ) + self._service_queue = ( + kwargs.get("service_queue", None) or f"nuropb-{self._service_name}-sq" + ) self._response_queue = ( - kwargs.get("response_queue", None) - or f"nuropb-{self._service_name}-{self._instance_id}-rq" + kwargs.get("response_queue", None) + or f"nuropb-{self._service_name}-{self._instance_id}-rq" ) self._rpc_bindings = rpc_bindings self._event_bindings = event_bindings - self._prefetch_count = 1 if kwargs.get("prefetch_count", None) is None else kwargs.get("prefetch_count", 1) + self._prefetch_count = ( + 1 + if kwargs.get("prefetch_count", None) is None + else kwargs.get("prefetch_count", 1) + ) self._default_ttl = default_ttl or 60 * 60 * 1000 * 12 # 12 hours self._message_callback = message_callback self._rpc_bindings.add(self._service_name) @@ -400,15 +419,18 @@ async def start(self) -> None: ) raise err except NuropbTransportError as err: - """ Logging already captured, handle the error, likely a channel closed by broker - """ + """Logging already captured, handle the error, likely a channel closed by broker""" if not self._connected_future.done(): self._connected_future.set_exception(err) if err.close_connection: await self.stop() except Exception as err: - logger.exception("General failure connecting to RabbitMQ. %s: %s", type(err).__name__, err) + logger.exception( + "General failure connecting to RabbitMQ. %s: %s", + type(err).__name__, + err, + ) if not self._connected_future.done(): self._connected_future.set_exception(err) @@ -510,7 +532,9 @@ def on_connection_open(self, _connection: AsyncioConnection) -> None: self.open_channel() - def on_connection_open_error(self, conn: AsyncioConnection, reason: Exception) -> None: + def on_connection_open_error( + self, conn: AsyncioConnection, reason: Exception + ) -> None: """This method is called by pika if the connection to RabbitMQ can't be established. :param pika.adapters.asyncio_connection.AsyncioConnection conn: @@ -522,11 +546,13 @@ def on_connection_open_error(self, conn: AsyncioConnection, reason: Exception) - close_connection = True else: close_connection = False - self._connected_future.set_exception(NuropbTransportError( - description=f"Connection open Error. {type(reason).__name__}: {reason}", - exception=reason, - close_connection=close_connection, - )) + self._connected_future.set_exception( + NuropbTransportError( + description=f"Connection open Error. {type(reason).__name__}: {reason}", + exception=reason, + close_connection=close_connection, + ) + ) if self._connecting: self._connecting = False @@ -616,7 +642,10 @@ def on_channel_closed(self, channel: Channel, reason: Exception) -> None: " There is already a response queue setup with the same name and instance_id," " and hence this service is considered single instance only" ) - elif reason.reply_code == 403 and "Provided JWT token has expired" in reason.reply_text: + elif ( + reason.reply_code == 403 + and "Provided JWT token has expired" in reason.reply_text + ): reason_description = ( f"RabbitMQ channel closed by the broker ({reason.reply_code})." f" AuthenticationExpired: {reason.reply_text}" @@ -629,8 +658,8 @@ def on_channel_closed(self, channel: Channel, reason: Exception) -> None: logger.critical(reason_description) if self._connected_future and not self._connected_future.done(): - """ the Connection is still in press and when the channel was closed by the broker - so treat as a serious error and close the connection + """the Connection is still in press and when the channel was closed by the broker + so treat as a serious error and close the connection """ self._connected_future.set_exception( NuropbTransportError( @@ -641,22 +670,19 @@ def on_channel_closed(self, channel: Channel, reason: Exception) -> None: ) elif self._connected_future and self._connected_future.done(): - """ The channel was close after the connection was established - """ + """The channel was close after the connection was established""" if reason.reply_code in (403, 404): - """ There is no point in auto reconnecting when access is refused, so + """There is no point in auto reconnecting when access is refused, so shut the connection down. """ asyncio.create_task(self.stop()) else: - """ It's ok to try and re-open the channel - """ + """It's ok to try and re-open the channel""" logging.info("Re-opening channel") self.open_channel() elif not isinstance(reason, ChannelClosedByClient): - """ Log the reason for the channel close and allow the re-open process to continue - """ + """Log the reason for the channel close and allow the re-open process to continue""" reason_description = ( f"RabbitMQ channel closed ({reason.reply_code})." f"{type(reason).__name__}: {reason}" @@ -1501,7 +1527,9 @@ async def stop_consuming(self) -> None: if self._channel is None or self._channel.is_closed: return else: - logger.info("Stopping consumers and sending a Basic.Cancel command to RabbitMQ") + logger.info( + "Stopping consumers and sending a Basic.Cancel command to RabbitMQ" + ) all_consumers_closed: Awaitable[bool] = asyncio.Future() def _on_cancel_ok(frame: pika.frame.Method) -> None: @@ -1516,11 +1544,11 @@ def _on_cancel_ok(frame: pika.frame.Method) -> None: try: logger.info( - "Waiting for %ss for consumers to close", CONSUMER_CLOSED_WAIT_TIMEOUT + "Waiting for %ss for consumers to close", + CONSUMER_CLOSED_WAIT_TIMEOUT, ) await asyncio.wait_for( - all_consumers_closed, - timeout=CONSUMER_CLOSED_WAIT_TIMEOUT + all_consumers_closed, timeout=CONSUMER_CLOSED_WAIT_TIMEOUT ) logger.info("Consumers to gracefully closed") except asyncio.TimeoutError: diff --git a/src/nuropb/service_runner.py b/src/nuropb/service_runner.py index a5c4cfc..72a1a10 100644 --- a/src/nuropb/service_runner.py +++ b/src/nuropb/service_runner.py @@ -118,7 +118,6 @@ def __init__( self._etcd_lease = None self._etcd_watcher = None - if not self._etcd_config: logger.info("etcd features are disabled") self.running_state = "running-standalone" @@ -129,7 +128,6 @@ def __init__( ) ) - # ***NOTE*** MOVED THIS CODE to self.start() # if self._etcd_config: # """asyncio NOTE: the etcd3 client is initialized as an asyncio task and will run @@ -137,7 +135,6 @@ def __init__( # """ # task = asyncio.create_task(self.init_etcd(on_startup=True)) - @property def running_state(self) -> ContainerRunningState: """running_state: the current running state of the service container.""" diff --git a/src/nuropb/testing/stubs.py b/src/nuropb/testing/stubs.py index 7dff52e..f6c3144 100644 --- a/src/nuropb/testing/stubs.py +++ b/src/nuropb/testing/stubs.py @@ -34,16 +34,17 @@ class ServiceStub: _private_key: rsa.RSAPrivateKey def __init__( - self, - service_name: str, - instance_id: Optional[str] = None, - private_key: Optional[rsa.RSAPrivateKey] = None, + self, + service_name: str, + instance_id: Optional[str] = None, + private_key: Optional[rsa.RSAPrivateKey] = None, ): self._service_name = service_name self._instance_id = instance_id or uuid4().hex self._private_key = private_key or rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) + @property def service_name(self) -> str: return self._service_name @@ -95,7 +96,9 @@ def test_success_error(self, **kwargs: Any) -> None: @nuropb_context @publish_to_mesh(authorize_func=get_claims_from_token) - def test_requires_user_claims(self, ctx: NuropbContextManager, **kwargs: Any) -> Any: + def test_requires_user_claims( + self, ctx: NuropbContextManager, **kwargs: Any + ) -> Any: assert isinstance(self, ServiceExample) assert isinstance(ctx, NuropbContextManager) self._method_call_count += 1 diff --git a/src/nuropb/utils.py b/src/nuropb/utils.py index fb8e27e..087f250 100644 --- a/src/nuropb/utils.py +++ b/src/nuropb/utils.py @@ -21,7 +21,9 @@ def obfuscate_credentials(url_with_credentials: str | Dict[str, Any]) -> str: else: scheme = "amqp" - return "{scheme}://{username}:@{host}{port}/{vhost}".format(scheme=scheme, **url_with_credentials) + return "{scheme}://{username}:@{host}{port}/{vhost}".format( + scheme=scheme, **url_with_credentials + ) pattern = r"(:.*?@)" result = re.sub( diff --git a/tests/contexts/test_describe.py b/tests/contexts/test_describe.py index 9416923..f2b309b 100644 --- a/tests/contexts/test_describe.py +++ b/tests/contexts/test_describe.py @@ -71,10 +71,10 @@ "description": "The date of the order", }, }, - "required": ["account", "security", "quantity", "side"] - } + "required": ["account", "security", "quantity", "side"], + }, } - ] + ], } @@ -102,6 +102,7 @@ class OrderManagementService: """ Some useful documentation to describe the characteristic of the service and its purpose """ + _service_name = "oms_v2" _instance_id = uuid4().hex _version = "2.0.1" @@ -112,13 +113,13 @@ class OrderManagementService: @nuropb_context @publish_to_mesh(requires_encryption=True) async def get_orders( - self, - ctx: NuropbContextManager, - order_date: datetime.datetime, - account: Optional[str] = None, - status: Optional[str] = "", - security: Optional[str] = None, - side: Optional[str] = None + self, + ctx: NuropbContextManager, + order_date: datetime.datetime, + account: Optional[str] = None, + status: Optional[str] = "", + security: Optional[str] = None, + side: Optional[str] = None, ) -> List[Order]: _ = order_date, account, status, security, side assert isinstance(self, OrderManagementService) @@ -130,10 +131,9 @@ async def get_orders( async def create_order(self, ctx: NuropbContextManager) -> Order: assert isinstance(self, OrderManagementService) assert isinstance(ctx, NuropbContextManager) - new_order = Order(account="ABC1234", - security="SSE.L", - quantity=1000, - side="sell") + new_order = Order( + account="ABC1234", security="SSE.L", quantity=1000, side="sell" + ) return new_order @nuropb_context @@ -149,8 +149,8 @@ async def undecorated_method(self) -> str: class Service: - """ Some useful documentation to describe the characteristic of the service and its purpose - """ + """Some useful documentation to describe the characteristic of the service and its purpose""" + service_name = "describe_service" def hello(self, _param1: str, param2: str = "value2") -> str: @@ -159,13 +159,13 @@ def hello(self, _param1: str, param2: str = "value2") -> str: @nuropb_context async def do_some_transaction( - self, - ctx: NuropbContextManager, - order_no: str, - order_date: datetime.datetime, - order_amount: float + self, + ctx: NuropbContextManager, + order_no: str, + order_date: datetime.datetime, + order_amount: float, ) -> str: - """ Some useful documentation for this method + """Some useful documentation for this method :param ctx: :param order_no: @@ -188,4 +188,3 @@ def test_instance_describe(): assert len(result["methods"]) == 3 assert len(result["methods"]["create_order"]) == 3 assert result["methods"]["get_orders"]["requires_encryption"] is True - diff --git a/tests/encodings/test_json_serialisation.py b/tests/encodings/test_json_serialisation.py index 46b777f..59b8288 100644 --- a/tests/encodings/test_json_serialisation.py +++ b/tests/encodings/test_json_serialisation.py @@ -64,10 +64,9 @@ def generic_payload(): "datetime": datetime.datetime(2020, 1, 1, 0, 0, 0), "set": {1, 2, Decimal(3)}, }, - "dataclass": Order(account="ABC1234", - security="SSE.L", - quantity=1000, - side="sell") + "dataclass": Order( + account="ABC1234", security="SSE.L", quantity=1000, side="sell" + ), } diff --git a/tests/mesh/test_service_discover.py b/tests/mesh/test_service_discover.py index 4b70663..5093ca8 100644 --- a/tests/mesh/test_service_discover.py +++ b/tests/mesh/test_service_discover.py @@ -10,7 +10,6 @@ @pytest.mark.asyncio async def test_requires_user_token(mesh_client, mesh_service): - await mesh_service.connect() assert mesh_service.connected is True logger.info("SERVICE API CONNECTED") @@ -45,7 +44,7 @@ async def test_requires_user_token(mesh_client, mesh_service): @pytest.mark.asyncio async def mesh_service_describe(mesh_client, mesh_service): - """ Call the describe function for a service on the mesh. This should return a dictionary + """Call the describe function for a service on the mesh. This should return a dictionary describing the service and its methods. """ @@ -74,7 +73,7 @@ async def mesh_service_describe(mesh_client, mesh_service): @pytest.mark.asyncio async def mesh_service_describe(mesh_client, mesh_service): - """ user the service mesh api helper function to call the describe function for a service on the mesh. + """user the service mesh api helper function to call the describe function for a service on the mesh. Test that service metta information is cached in the mesh client. """ @@ -97,8 +96,7 @@ async def mesh_service_describe(mesh_client, mesh_service): service = mesh_service.service_name method = "test_requires_encryption" public_key = await mesh_client.requires_encryption( - service_name=service, - method_name=method + service_name=service, method_name=method ) params = {} context = {} @@ -115,7 +113,7 @@ async def mesh_service_describe(mesh_client, mesh_service): @pytest.mark.asyncio(async_timeout=10) async def mesh_service_encrypt(mesh_client, mesh_service): - """ user the service mesh api helper function to call the describe function for a service on the mesh. + """user the service mesh api helper function to call the describe function for a service on the mesh. Test that service metta information is cached in the mesh client. """ @@ -130,9 +128,7 @@ async def mesh_service_encrypt(mesh_client, mesh_service): encrypted = await mesh_client.requires_encryption(service, method) assert encrypted is True params = {} - context = { - "Authorization": "Bearer: user_token" - } + context = {"Authorization": "Bearer: user_token"} rpc_response = await mesh_service.request( service=service, method=method, @@ -142,4 +138,3 @@ async def mesh_service_encrypt(mesh_client, mesh_service): encrypted=encrypted, ) logger.info(f"response: {pformat(rpc_response)}") - diff --git a/tests/test_message_routing.py b/tests/test_message_routing.py index 6fd19ca..85f1897 100644 --- a/tests/test_message_routing.py +++ b/tests/test_message_routing.py @@ -11,7 +11,7 @@ @pytest.mark.asyncio async def test_call_self(test_settings, rmq_settings, service_instance): - """ Currently this test passes, as there is no check for the service name in the request method. + """Currently this test passes, as there is no check for the service name in the request method. Restricting the service name to be different from the service name of the service instance is under consideration for a future release. """ @@ -59,8 +59,10 @@ async def test_call_self(test_settings, rmq_settings, service_instance): @pytest.mark.asyncio -async def test_subscribe_to_events_from_self(test_settings, rmq_settings, service_instance): - """ Currently this test passes, as there is no check to restrict binding to the service queue +async def test_subscribe_to_events_from_self( + test_settings, rmq_settings, service_instance +): + """Currently this test passes, as there is no check to restrict binding to the service queue for events that originate from the service. This restriction is under consideration for a future release. """ diff --git a/tests/test_nuropb_api.py b/tests/test_nuropb_api.py index b9af0e5..18ca7e0 100644 --- a/tests/test_nuropb_api.py +++ b/tests/test_nuropb_api.py @@ -7,12 +7,13 @@ IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true" if IN_GITHUB_ACTIONS: - pytest.skip("Skipping model tests when run in Github Actions", allow_module_level=True) + pytest.skip( + "Skipping model tests when run in Github Actions", allow_module_level=True + ) @pytest.mark.asyncio async def test_client_and_service_api_quick_setup(test_settings, rmq_settings): - transport_settings = dict( dl_exchange=test_settings["dl_exchange"], prefetch_count=test_settings["prefetch_count"], @@ -49,7 +50,6 @@ async def test_client_and_service_api_quick_setup(test_settings, rmq_settings): @pytest.mark.asyncio async def test_client_and_service_api_quick_setup_raw_defaults(rmq_settings): - configure_mesh( connection_properties={ "port": rmq_settings["port"], @@ -61,13 +61,15 @@ async def test_client_and_service_api_quick_setup_raw_defaults(rmq_settings): connection_properties={ "port": rmq_settings["port"], "host": rmq_settings["host"], - } + }, ) await service_api.connect() - client_api = create_client(connection_properties={ - "port": rmq_settings["port"], - "host": rmq_settings["host"], - }) + client_api = create_client( + connection_properties={ + "port": rmq_settings["port"], + "host": rmq_settings["host"], + } + ) await client_api.connect() await client_api.disconnect() diff --git a/tests/test_service_container.py b/tests/test_service_container.py index eb14d89..6742789 100644 --- a/tests/test_service_container.py +++ b/tests/test_service_container.py @@ -11,7 +11,9 @@ @pytest.mark.asyncio -async def test_rmq_api_client_mode(test_settings, rmq_settings, test_api_url, etcd_config): +async def test_rmq_api_client_mode( + test_settings, rmq_settings, test_api_url, etcd_config +): instance_id = uuid4().hex transport_settings = dict( dl_exchange=test_settings["dl_exchange"], diff --git a/tests/transport_connection/test_channel_state.py b/tests/transport_connection/test_channel_state.py index 278ba4e..1f32a51 100644 --- a/tests/transport_connection/test_channel_state.py +++ b/tests/transport_connection/test_channel_state.py @@ -30,7 +30,9 @@ async def test_closed_channel_message_in_flight(mesh_service, mesh_client): await mesh_client.connect() async def close_channel(): - logger.info("closing service's transport channel, by closing the connection after 1 seconds") + logger.info( + "closing service's transport channel, by closing the connection after 1 seconds" + ) await asyncio.sleep(1) mesh_service.transport._connection.close() # nudge the event_loop @@ -49,5 +51,3 @@ async def close_channel(): ) assert result is not None - - diff --git a/tests/transport_connection/test_connection_properties.py b/tests/transport_connection/test_connection_properties.py index e163b7c..542a7cb 100644 --- a/tests/transport_connection/test_connection_properties.py +++ b/tests/transport_connection/test_connection_properties.py @@ -13,7 +13,9 @@ def test_ampq_url_to_api_url(): - api_url = rmq_api_url_from_amqp_url("amqp://guest:guest@localhost:5672/nuropb-example") + api_url = rmq_api_url_from_amqp_url( + "amqp://guest:guest@localhost:5672/nuropb-example" + ) assert api_url == "http://guest:guest@localhost:15672/api" api_url = rmq_api_url_from_amqp_url("amqp://guest@localhost:5672/nuropb-example") @@ -22,7 +24,9 @@ def test_ampq_url_to_api_url(): api_url = rmq_api_url_from_amqp_url("amqp:///nuropb-example") assert api_url == "http://localhost:15672/api" - api_url = rmq_api_url_from_amqp_url("amqps://guest:guest@localhost:5672/nuropb-example") + api_url = rmq_api_url_from_amqp_url( + "amqps://guest:guest@localhost:5672/nuropb-example" + ) assert api_url == "https://guest:guest@localhost:15672/api" api_url = rmq_api_url_from_amqp_url("amqps://guest:guest@localhost/nuropb-example") @@ -83,8 +87,7 @@ def message_callback(*args, **kwargs): @pytest.mark.asyncio async def test_single_instance_connection(rmq_settings, test_settings): - """Test Single instance connections - """ + """Test Single instance connections""" amqp_url = rmq_settings.copy() transport_settings = dict( dl_exchange=test_settings["dl_exchange"], @@ -133,7 +136,6 @@ def message_callback(*args, **kwargs): @pytest.mark.asyncio async def test_bad_credentials(rmq_settings, test_settings): - amqp_url = rmq_settings.copy() amqp_url["username"] = "bad-username" transport_settings = dict( @@ -164,7 +166,6 @@ async def test_bad_credentials(rmq_settings, test_settings): @pytest.mark.asyncio async def test_bad_vhost(rmq_settings, test_settings): - amqp_url = rmq_settings.copy() amqp_url["vhost"] = "bad-vhost" transport_settings = dict( @@ -190,4 +191,4 @@ async def test_bad_vhost(rmq_settings, test_settings): ) await api.connect() logger.info("Connected : %s", api.connected) - assert api.connected is False \ No newline at end of file + assert api.connected is False diff --git a/tests/transport_connection/test_rqm_api.py b/tests/transport_connection/test_rqm_api.py index 90f05b2..be896f7 100644 --- a/tests/transport_connection/test_rqm_api.py +++ b/tests/transport_connection/test_rqm_api.py @@ -43,7 +43,6 @@ async def test_instantiate_api(test_settings, rmq_settings): amqp_url=test_url, ) - rmq_api = RMQAPI( amqp_url=rmq_settings, ) diff --git a/tests/transport_connection/test_tls_connection.py b/tests/transport_connection/test_tls_connection.py index 934d838..9d9a13a 100644 --- a/tests/transport_connection/test_tls_connection.py +++ b/tests/transport_connection/test_tls_connection.py @@ -9,20 +9,24 @@ IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true" if IN_GITHUB_ACTIONS: - pytest.skip("Skipping model tests when run in Github Actions", allow_module_level=True) + pytest.skip( + "Skipping model tests when run in Github Actions", allow_module_level=True + ) @pytest.mark.asyncio async def test_tls_connect(rmq_settings, test_settings): - def message_callback(message): print(message) + amqp_url = rmq_settings.copy() - amqp_url.update({ - "ssl": True, - "port": 5671, - "verify": False, - }) + amqp_url.update( + { + "ssl": True, + "port": 5671, + "verify": False, + } + ) transport_settings = dict( dl_exchange=test_settings["dl_exchange"], rpc_bindings=test_settings["rpc_bindings"], @@ -41,6 +45,7 @@ def message_callback(message): ) await transport1.start() from pika.adapters.utils.io_services_utils import _AsyncSSLTransport + assert isinstance(transport1._connection._transport, _AsyncSSLTransport) assert transport1.connected is True @@ -53,7 +58,7 @@ def message_callback(message): instance_id=service.instance_id, service_instance=service, amqp_url=amqp_url, - transport_settings=transport_settings + transport_settings=transport_settings, ) await api.connect() assert api.connected is True @@ -66,7 +71,6 @@ def message_callback(message): @pytest.mark.asyncio async def test_tls_connect_with_cafile(rmq_settings, test_settings): - def message_callback(message): print(message) @@ -75,13 +79,15 @@ def message_callback(message): keyfile = os.path.join(os.path.dirname(__file__), "key-2.pem") amqp_url = rmq_settings.copy() - amqp_url.update({ - "cafile": cacertfile, - "port": 5671, - "verify": False, - "certfile": certfile, - "keyfile": keyfile, - }) + amqp_url.update( + { + "cafile": cacertfile, + "port": 5671, + "verify": False, + "certfile": certfile, + "keyfile": keyfile, + } + ) transport_settings = dict( dl_exchange=test_settings["dl_exchange"], rpc_bindings=test_settings["rpc_bindings"], @@ -100,6 +106,7 @@ def message_callback(message): ) await transport1.start() from pika.adapters.utils.io_services_utils import _AsyncSSLTransport + assert isinstance(transport1._connection._transport, _AsyncSSLTransport) assert transport1.connected is True @@ -112,7 +119,7 @@ def message_callback(message): instance_id=service.instance_id, service_instance=service, amqp_url=amqp_url, - transport_settings=transport_settings + transport_settings=transport_settings, ) await api.connect() assert api.connected is True @@ -121,6 +128,3 @@ def message_callback(message): assert transport1.connected is False await api.disconnect() assert api.connected is False - - -