Skip to content

Commit

Permalink
Merge pull request #7 from robertbetts/develop
Browse files Browse the repository at this point in the history
Context Managers, service mesh decorators and Encryption
  • Loading branch information
robertbetts authored Sep 19, 2023
2 parents 221f847 + 80dc750 commit ee19c74
Show file tree
Hide file tree
Showing 35 changed files with 2,572 additions and 678 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# NuroPb

## A Distributed Event Driven Service Mesh
## The neural plumbing for an Asynchronous, Distributed, Event Driven Service Mesh

[![codecov](https://codecov.io/gh/robertbetts/nuropb/branch/main/graph/badge.svg?token=DVSBZY794D)](https://codecov.io/gh/robertbetts/nuropb)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![CodeFactor](https://www.codefactor.io/repository/github/robertbetts/nuropb/badge)](https://www.codefactor.io/repository/github/robertbetts/nuropb)
[![License: MIT](https://img.shields.io/pypi/l/giteo)](https://www.apache.org/licenses/LICENSE-2.0.txt)
[![License: Apache 2.0](https://img.shields.io/pypi/l/giteo)](https://www.apache.org/licenses/LICENSE-2.0.txt)

You have a Python class that you want to make available as a service to consumers.
* You potentially want to scale this service horizontally many times over, likely at an unknown scale.
Expand Down Expand Up @@ -36,13 +36,14 @@ and ordered event streaming over Kafka. Kafka has also proved a great tool for a
messages.

Where does the name come from? NuroPb is a contraction of the word neural and the scientific symbol for Lead. Lead
associated with plumbing. So NuroPb is a a system's neural plumbing framework.
associated with plumbing. So NuroPb is a system's neural plumbing framework.

## Getting started

Install the Python package
```
pip install nuropb
```




37 changes: 25 additions & 12 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@

async def make_request(api: RMQAPI):
service = "sandbox"
method = "test_method"
# method = "test_method"
method = "test_encrypt_method"
params = {"param1": "value1"}
context = {"context1": "value1"}
ttl = 60 * 30 * 1000
trace_id = uuid4().hex
encrypted = await api.requires_encryption(service, method)
response = await api.request(
service=service,
method=method,
params=params,
context=context,
ttl=ttl,
trace_id=trace_id,
encrypted=encrypted,
)
return response == f"response from {service}.{method}"

Expand All @@ -34,13 +37,15 @@ async def make_command(api: RMQAPI):
context = {"context1": "value1"}
ttl = 60 * 30 * 1000
trace_id = uuid4().hex
encrypted = await api.requires_encryption(service, method)
api.command(
service=service,
method=method,
params=params,
context=context,
ttl=ttl,
trace_id=trace_id,
encrypted=encrypted,
)


Expand All @@ -61,16 +66,24 @@ async def main():
amqp_url = "amqp://guest:[email protected]:5672/sandbox"
api = RMQAPI(
amqp_url=amqp_url,
transport_settings={
"prefetch_count": 1,
}
)
await api.connect()

total_seconds = 0
total_sample_count = 0

batch_size = 500
batch_size = 10000
number_of_batches = 5
ioloop = asyncio.get_event_loop()

service = "sandbox"
method = "test_method"
encrypted = await api.requires_encryption(service, method)
logger.info(f"encryption is : {encrypted}")

for _ in range(number_of_batches):
start_time = datetime.datetime.utcnow()
logger.info(f"Starting: {batch_size} at {start_time}")
Expand All @@ -82,15 +95,15 @@ async def main():
loop_batch_size += batch_size
# logger.info(f"Request complete: {result[0]}")

tasks = [ioloop.create_task(make_command(api)) for _ in range(batch_size)]
logger.info("Waiting for command tasks to complete")
await asyncio.wait(tasks)
loop_batch_size += batch_size
# tasks = [ioloop.create_task(make_command(api)) for _ in range(batch_size)]
# logger.info("Waiting for command tasks to complete")
# await asyncio.wait(tasks)
# loop_batch_size += batch_size

tasks = [ioloop.create_task(publish_event(api)) for _ in range(batch_size)]
logger.info("Waiting for publish tasks to complete")
await asyncio.wait(tasks)
loop_batch_size += batch_size
# tasks = [ioloop.create_task(publish_event(api)) for _ in range(batch_size)]
# logger.info("Waiting for publish tasks to complete")
# await asyncio.wait(tasks)
# loop_batch_size += batch_size

end_time = datetime.datetime.utcnow()
time_taken = end_time - start_time
Expand All @@ -104,8 +117,8 @@ async def main():
total_seconds,
total_sample_count / total_seconds,
)
fut = asyncio.Future()
await fut
# fut = asyncio.Future()
# await fut
logging.info("Client Done")


Expand Down
30 changes: 30 additions & 0 deletions examples/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import logging
import asyncio
from uuid import uuid4
import os
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

from nuropb.rmq_api import RMQAPI
from nuropb.service_runner import ServiceContainer
Expand All @@ -15,14 +19,40 @@ async def main():
service_name = "sandbox"
instance_id = uuid4().hex

""" load private_key and create one if it done not exist
"""
primary_key_filename = "key.pem"
private_key = None
if os.path.exists(primary_key_filename):
with open(primary_key_filename, "rb") as key_file:
private_key = serialization.load_pem_private_key(
data=key_file.read(),
backend=default_backend(),
password=None,
)
if private_key is None:
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
primary_key_data: bytes = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(primary_key_filename, "wt") as f:
f.write(primary_key_data.decode("utf-8"))


transport_settings = dict(
rpc_bindings=[service_name],
event_bindings=[],
prefetch_count=1,
)

service_example = ServiceExample(
service_name=service_name,
instance_id=instance_id,
private_key=private_key,
)

api = RMQAPI(
Expand Down
21 changes: 18 additions & 3 deletions examples/service_example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from typing import List
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend


from nuropb.contexts.describe import publish_to_mesh
from nuropb.interface import NuropbException, NuropbSuccess, NuropbCallAgain, EventType


Expand All @@ -11,11 +13,13 @@
class ServiceExample:
_service_name: str
_instance_id: str
_private_key: rsa.RSAPrivateKey
_method_call_count: int

def __init__(self, service_name: str, instance_id: str):
def __init__(self, service_name: str, instance_id: str, private_key: rsa.RSAPrivateKey):
self._service_name = service_name
self._instance_id = instance_id
self._private_key = private_key
self._method_call_count = 0

@classmethod
Expand All @@ -30,10 +34,21 @@ def _handle_event_(
_ = target, context, trace_id
logger.debug(f"Received event {topic}:{event}")

@publish_to_mesh(requires_encryption=False)
def test_method(self, **kwargs) -> str:
self._method_call_count += 1

success_result = f"response from {self._service_name}.test_method"
return success_result

@publish_to_mesh(requires_encryption=True)
def test_encrypt_method(self, **kwargs) -> str:
self._method_call_count += 1
success_result = f"response from {self._service_name}.test_encrypt_method"
return success_result

def test_exception_method(self, **kwargs) -> str:
self._method_call_count += 1
success_result = f"response from {self._service_name}.test_exception_method"

if self._method_call_count % 400 == 0:
events: List[EventType] = [
Expand Down
Loading

0 comments on commit ee19c74

Please sign in to comment.