Skip to content

Commit

Permalink
Merge pull request #30 from robertbetts/develop
Browse files Browse the repository at this point in the history
increment version to 0.1.8
  • Loading branch information
robertbetts authored Oct 14, 2023
2 parents 7f0948c + fceacd2 commit 828bbfa
Show file tree
Hide file tree
Showing 23 changed files with 291 additions and 234 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nuropb"
version = "0.1.7"
version = "0.1.8"
description = "NuroPb - A Distributed Event Driven Service Mesh"
authors = ["Robert Betts <[email protected]>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion src/nuropb/contexts/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def error(self) -> Dict[str, Any] | None:
}

def add_event(self, event: Dict[str, Any]) -> None:
""" Add an event to the context manager. The event will be sent to the service mesh when the context manager
"""Add an event to the context manager. The event will be sent to the service mesh when the context manager
exits successfully.
Event format
Expand Down
23 changes: 13 additions & 10 deletions src/nuropb/contexts/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:


def describe_service(class_instance: object) -> Dict[str, Any] | None:
"""Returns a description of the class methods that will be exposed to the service mesh
"""
"""Returns a description of the class methods that will be exposed to the service mesh"""
service_info = {
"service_name": "",
"service_version": "",
Expand Down Expand Up @@ -186,18 +185,22 @@ def map_argument(arg_props: Any) -> Tuple[str, Dict[str, Any]]:
}
methods.append((name, method_spec))

service_info.update({
"service_name": service_name,
"service_version": service_version,
"description": service_description,
"encrypted_methods": service_has_encrypted_methods,
"methods": dict(methods),
})
service_info.update(
{
"service_name": service_name,
"service_version": service_version,
"description": service_description,
"encrypted_methods": service_has_encrypted_methods,
"methods": dict(methods),
}
)

if service_has_encrypted_methods:
private_key = service_name = getattr(class_instance, "_private_key", None)
if private_key is None:
service_info["warnings"].append("Service has encrypted methods but no private key has been set.")
service_info["warnings"].append(
"Service has encrypted methods but no private key has been set."
)
logger.debug(
f"Service {service_name} has encrypted methods but no private key has been set"
)
Expand Down
26 changes: 14 additions & 12 deletions src/nuropb/contexts/service_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ def handle_execution_result(
acknowledgement = "reject"

if isinstance(result, BaseException):
""" Create NuroPb response from an exception, and update acknowledgement type.
NOTE: Some exceptions are special cases and not necessarily errors. For example,
"""Create NuroPb response from an exception, and update acknowledgement type.
NOTE: Some exceptions are special cases and not necessarily errors. For example,
NuropbCallAgain and NuropbSuccess are not errors.
"""
(
Expand All @@ -298,15 +298,14 @@ def handle_execution_result(
logger.exception(result)

if service_message["nuropb_type"] in ("event", "command"):
"""There is no requirement to handle the response of instance._event_handler result, only to
positively acknowledge the event. There is also no requirements to handle the response of
"""There is no requirement to handle the response of instance._event_handler result, only to
positively acknowledge the event. There is also no requirements to handle the response of
a command, only to positively acknowledge the command.
"""
pass # Do nothing

if service_message["nuropb_type"] == "request":
"""Create NuroPb response from the service call result
"""
"""Create NuroPb response from the service call result"""
if isinstance(error, BaseException):
pyload_error = error_dict_from_exception(error)
if verbose:
Expand Down Expand Up @@ -366,11 +365,9 @@ def execute_request(

result = None
try:

payload = service_message["nuropb_payload"]

if service_message["nuropb_type"] == "event":

payload = service_message["nuropb_payload"]
topic = payload["topic"]
event = payload["event"]
Expand Down Expand Up @@ -405,7 +402,9 @@ def execute_request(
payload=payload,
exception=None,
)
handle_execution_result(service_message, exception_result, message_complete_callback)
handle_execution_result(
service_message, exception_result, message_complete_callback
)
return

try:
Expand Down Expand Up @@ -434,16 +433,19 @@ def execute_request(
else:
exception_result = err

handle_execution_result(service_message, exception_result, message_complete_callback)
handle_execution_result(
service_message, exception_result, message_complete_callback
)
return

if asyncio.isfuture(result) or asyncio.iscoroutine(result):

if is_future(result):
exception_result = ValueError(
"Tornado Future detected, please use asyncio.Future instead"
)
handle_execution_result(service_message, exception_result, message_complete_callback)
handle_execution_result(
service_message, exception_result, message_complete_callback
)
return

def future_done_callback(future: Awaitable[Any]) -> None:
Expand Down
4 changes: 1 addition & 3 deletions src/nuropb/encodings/encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ def new_symmetric_key(cls) -> bytes:
"""
return Fernet.generate_key()

def add_public_key(
self, service_name: str, public_key: rsa.RSAPublicKey
) -> None:
def add_public_key(self, service_name: str, public_key: rsa.RSAPublicKey) -> None:
"""Add a public key for a service
:param service_name: str
:param public_key: rsa.RSAPublicKey
Expand Down
13 changes: 7 additions & 6 deletions src/nuropb/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

logger = logging.getLogger(__name__)

NUROPB_VERSION = "0.1.7"
NUROPB_VERSION = "0.1.8"
NUROPB_PROTOCOL_VERSION = "0.1.1"
NUROPB_PROTOCOL_VERSIONS_SUPPORTED = ("0.1.1",)
NUROPB_MESSAGE_TYPES = (
Expand Down Expand Up @@ -260,14 +260,15 @@ class NuropbTimeoutError(NuropbException):

class NuropbTransportError(NuropbException):
"""NuropbTransportError: represents an error that inside the plumbing."""

_close_connection: bool

def __init__(
self,
description: Optional[str] = None,
payload: Optional[PayloadDict] = None,
exception: Optional[BaseException] = None,
close_connection: bool = False,
self,
description: Optional[str] = None,
payload: Optional[PayloadDict] = None,
exception: Optional[BaseException] = None,
close_connection: bool = False,
):
super().__init__(
description=description,
Expand Down
114 changes: 63 additions & 51 deletions src/nuropb/nuropb_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@
from uuid import uuid4

from nuropb.rmq_api import RMQAPI
from nuropb.rmq_lib import configure_nuropb_rmq, create_virtual_host, build_amqp_url, build_rmq_api_url, \
rmq_api_url_from_amqp_url
from nuropb.rmq_lib import (
configure_nuropb_rmq,
create_virtual_host,
build_amqp_url,
build_rmq_api_url,
rmq_api_url_from_amqp_url,
)

logger = logging.getLogger(__name__)


def default_connection_properties(connection_properties: Dict[str, Any]) -> Dict[str, Any]:
def default_connection_properties(
connection_properties: Dict[str, Any]
) -> Dict[str, Any]:
if "host" not in connection_properties:
connection_properties["host"] = "localhost"
if "username" not in connection_properties:
Expand All @@ -34,13 +41,13 @@ def default_connection_properties(connection_properties: Dict[str, Any]) -> Dict


def create_client(
name: Optional[str] = None,
instance_id: Optional[str] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
transport: Optional[RMQAPI] = RMQAPI,
name: Optional[str] = None,
instance_id: Optional[str] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
transport: Optional[RMQAPI] = RMQAPI,
) -> RMQAPI:
""" Create a client api instance for the nuropb service mesh. This caller of this function
"""Create a client api instance for the nuropb service mesh. This caller of this function
will have to implement the asyncio call to connect to the service mesh:
await client_api.connect()
Expand All @@ -53,11 +60,13 @@ def create_client(
"""

if connection_properties is None:
connection_properties = default_connection_properties({
"vhost": "nuropb",
"ssl": False,
"verify": False,
})
connection_properties = default_connection_properties(
{
"vhost": "nuropb",
"ssl": False,
"verify": False,
}
)
elif isinstance(connection_properties, dict):
connection_properties = default_connection_properties(connection_properties)

Expand All @@ -84,19 +93,21 @@ async def connect(instance_id: Optional[str] = None):


def configure_mesh(
mesh_name: Optional[str] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
mesh_name: Optional[str] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
):
if mesh_name is None:
mesh_name = "nuropb"

if connection_properties is None:
connection_properties = default_connection_properties({
"vhost": mesh_name,
"ssl": False,
"verify": False,
})
connection_properties = default_connection_properties(
{
"vhost": mesh_name,
"ssl": False,
"verify": False,
}
)

if isinstance(connection_properties, str):
amqp_url = connection_properties
Expand All @@ -115,9 +126,7 @@ def configure_mesh(
password = connection_properties["password"]
vhost = connection_properties["vhost"]

amqp_url = build_amqp_url(
host, port, username, password, vhost, rmq_scheme
)
amqp_url = build_amqp_url(host, port, username, password, vhost, rmq_scheme)
else:
raise ValueError("connection_properties must be a str or dict")

Expand Down Expand Up @@ -148,51 +157,52 @@ def configure_mesh(


class MeshService:
""" A generic service class that can be used to create a connection only service instance for the
"""A generic service class that can be used to create a connection only service instance for the
nuropb service mesh. This class could also be used as a template or to define a subclass for
creating a service instance.
"""

_service_name: str
_instance_id: str
_event_bindings: list[str]
_event_callback: Optional[Callable]

def __init__(
self,
service_name: str,
instance_id: Optional[str] = None,
event_bindings: Optional[list[str]] = None,
event_callback: Optional[Callable] = None,
self,
service_name: str,
instance_id: Optional[str] = None,
event_bindings: Optional[list[str]] = None,
event_callback: Optional[Callable] = None,
):
self._service_name = service_name
self._instance_id = instance_id or uuid4().hex
self._event_bindings = event_bindings or []
self._event_callback = event_callback

async def _handle_event_(
self,
topic: str,
event: dict,
target: list[str] | None = None,
context: dict | None = None,
trace_id: str | None = None,
self,
topic: str,
event: dict,
target: list[str] | None = None,
context: dict | None = None,
trace_id: str | None = None,
):
_ = self
if self._event_callback is not None:
await self._event_callback(topic, event, target, context, trace_id)


def create_service(
name: str,
instance_id: Optional[str] = None,
service_instance: Optional[object] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
transport: Optional[RMQAPI] = RMQAPI,
event_bindings: Optional[list[str]] = None,
event_callback: Optional[Callable] = None,
name: str,
instance_id: Optional[str] = None,
service_instance: Optional[object] = None,
connection_properties: Optional[Dict[str, Any]] = None,
transport_settings: Optional[str | Dict[str, Any]] = None,
transport: Optional[RMQAPI] = RMQAPI,
event_bindings: Optional[list[str]] = None,
event_callback: Optional[Callable] = None,
) -> RMQAPI:
""" Create a client api instance for the nuropb service mesh. This caller of this function
"""Create a client api instance for the nuropb service mesh. This caller of this function
will have to implement the asyncio call to connect to the service mesh:
await client_api.connect()
Expand All @@ -215,11 +225,13 @@ def create_service(
"""

if connection_properties is None:
connection_properties = default_connection_properties({
"vhost": "nuropb",
"ssl": False,
"verify": False,
})
connection_properties = default_connection_properties(
{
"vhost": "nuropb",
"ssl": False,
"verify": False,
}
)
elif isinstance(connection_properties, dict):
connection_properties = default_connection_properties(connection_properties)

Expand Down
Loading

0 comments on commit 828bbfa

Please sign in to comment.