Skip to content

Commit

Permalink
Merge pull request #6 from robertbetts/develop
Browse files Browse the repository at this point in the history
Improved logging and testing
  • Loading branch information
robertbetts authored Sep 5, 2023
2 parents e0c2d79 + 59e2cc0 commit 221f847
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 73 deletions.
4 changes: 2 additions & 2 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


async def make_request(api: RMQAPI):
service = "sandbox_service"
service = "sandbox"
method = "test_method"
params = {"param1": "value1"}
context = {"context1": "value1"}
Expand All @@ -28,7 +28,7 @@ async def make_request(api: RMQAPI):


async def make_command(api: RMQAPI):
service = "sandbox_service"
service = "sandbox"
method = "test_method"
params = {"param1": "value1"}
context = {"context1": "value1"}
Expand Down
10 changes: 5 additions & 5 deletions examples/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
async def main():
amqp_url = "amqp://guest:[email protected]:5672/sandbox"
api_url = "http://guest:guest@localhost:15672/api"
service_name = "sandbox_service"
service_name = "sandbox"
instance_id = uuid4().hex

transport_settings = dict(
Expand Down Expand Up @@ -40,10 +40,10 @@ async def main():
# port=2379,
# ),
)
await container.start()

fut = asyncio.Future()
await fut
started = await container.start()
if started:
fut = asyncio.Future()
await fut
logging.info("Server Done")


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nuropb"
version = "0.1.1"
version = "0.1.2"
description = "NuroPb - A Distributed Event Driven Service Mesh"
authors = ["Robert Betts <[email protected]>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion src/nuropb/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

logger = logging.getLogger(__name__)

NUROPB_VERSION = "0.1.1"
NUROPB_VERSION = "0.1.2"
NUROPB_PROTOCOL_VERSION = "0.1.0"
NUROPB_PROTOCOL_VERSIONS_SUPPORTED = ("0.1.0",)
NUROPB_MESSAGE_TYPES = (
Expand Down
19 changes: 2 additions & 17 deletions src/nuropb/rmq_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def __init__(

transport_settings.update(
{
"service_name": service_name,
"instance_id": instance_id,
"service_name": self._service_name,
"instance_id": self._instance_id,
"amqp_url": amqp_url,
"message_callback": self.receive_transport_message,
"rpc_exchange": rpc_exchange,
Expand Down Expand Up @@ -151,21 +151,6 @@ async def disconnect(self) -> None:
"""
await self._transport.stop()

async def keep_loop_active(self) -> None: # pragma: no cover
"""keep_loop_active: keeps the asyncio loop active while the transport is connected
The factor for introducing this method is that during pytest, the asyncio loop
is not always running when expected. there is an 1 cost with this delay, that will
impact the performance of the service.
TODO: investigate this further and only activate this method when required.
:return:
"""
logger.info("keeping_loop_active() is starting")
while self.connected:
await asyncio.sleep(0.001)
logger.debug("keeping_loop_active() is stopping")

def receive_transport_message(
self,
service_message: TransportServicePayload,
Expand Down
72 changes: 29 additions & 43 deletions src/nuropb/rmq_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ class RMQTransport:
_channel: Channel | None
_consumer_tags: Set[Any]
_consuming: bool
_connecting: bool
_closing: bool
_connected: bool
_reconnect: bool
_was_consuming: bool

def __init__(
Expand Down Expand Up @@ -225,9 +225,9 @@ def __init__(
Experiment with larger values for higher throughput in your user case.
:param int default_ttl: The default time to live for messages in milliseconds, defaults to 12 hours.
"""
self._reconnect = True
self._connected = False
self._closing = False
self._connecting = False
self._was_consuming = False
self._consuming = False

Expand Down Expand Up @@ -426,7 +426,6 @@ async def stop(self) -> None:
IOLoop will be buffered but not processed.
"""
self._reconnect = False
if not self._closing:
logger.info("Stopping")
if self._consuming:
Expand Down Expand Up @@ -477,7 +476,7 @@ def connect(self) -> Awaitable[bool]:
}
)
self._connection = conn

self._connecting = True
return self._connected_future

def disconnect(self) -> Awaitable[bool]:
Expand Down Expand Up @@ -512,6 +511,7 @@ def on_connection_open(self, _connection: AsyncioConnection) -> None:
The connection
"""
logger.info("Connection opened - now opening channel")

self.open_channel()

def on_connection_open_error(
Expand All @@ -528,6 +528,10 @@ def on_connection_open_error(
if self._connected_future is not None and not self._connected_future.done():
self._connected_future.set_exception(err)

if self._connecting:
self._connecting = False
# self.connect()

def on_connection_closed(
self, _connection: AsyncioConnection, reason: Exception
) -> None:
Expand All @@ -540,23 +544,21 @@ def on_connection_closed(
connection.
"""
logger.warning("connection closed for reason %s", reason)
self._channel = None
if not self._closing:
logger.warning("Possible reconnect necessary")

if self._connected_future is not None and not self._connected_future.done():
self._connected_future.set_exception(
RuntimeError(f"Connection closed for reason: {reason}")
)

logger.warning("Connection closed. reason: %s", reason)
if (
self._disconnected_future is not None
and not self._disconnected_future.done()
):
self._disconnected_future.set_result(True)

self._connected = False
self._channel = None

if self._closing:
self._closing = False
else:
self._connecting = False
self.connect()

def open_channel(self) -> None:
"""Open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ
Expand Down Expand Up @@ -588,33 +590,16 @@ def on_channel_closed(self, channel: Channel, reason: Exception) -> None:
:param Exception reason: why the channel was closed
"""
if isinstance(reason, ChannelClosedByBroker):
logger.critical("Channel %i was closed by broker: %s", channel, reason)
if reason.reply_code == 404:
logger.error(
f"""\n\n
RabbitMQ channel closed by broker with reply_code: {reason.reply_code} and reply_text: {reason.reply_text}
This is usually caused by a misconfiguration of the RabbitMQ broker.
Please check the RabbitMQ broker configuration and restart the service:
RabbitMQ url: {obfuscate_credentials(self._amqp_url)}
Check that the following exchanges, queues and bindings exist:
Exchange: {self._rpc_exchange}
Exchange: {self._events_exchange}
Exchange: {self._dl_exchange}
Queue: {self._dl_queue}
Queue: {self._service_queue}
Queue: {self._response_queue}
Bindings: {self._rpc_bindings}
Bindings: {self._event_bindings}
\n\n"""
)
if self._connected_future and not self._connected_future.done():
self._connected_future.set_exception(
ServiceNotConfigured(
f"RabbitMQ not properly configured: {reason}"
)
)
logger.critical(
f"RabbitMQ channel {channel} closed by broker with reply_code: {reason.reply_code} "
f"and reply_text: {reason.reply_text}"
)
if self._connected_future and not self._connected_future.done():
self._connected_future.set_exception(Exception)
self._connecting = False

# investigate reasons and methods automatically reopen the channel.
# until a solution is found it will be important to monitor for this condition

def declare_service_queue(self) -> None:
"""Refresh the request queue on RabbitMQ by invoking the Queue.Declare RPC command. When it
Expand All @@ -640,7 +625,7 @@ def declare_service_queue(self) -> None:
)
else:
logger.info("Client only, not declaring request queue")
# TODO: FIXME: Check that passing None in here is OK
# Check that passing None in here is OK
self.on_bindok(None, userdata=self._response_queue)

def on_service_queue_declareok(
Expand Down Expand Up @@ -767,7 +752,7 @@ def on_basic_qos_ok(self, _frame: pika.frame.Method) -> None:

# Start consuming the requests queue
if not self._client_only:
logger.info("Consuming Requests, Events and Commands")
logger.info("Ready to consume requests, events and commands")
self._consumer_tags.add(
self._channel.basic_consume(
on_message_callback=functools.partial(
Expand All @@ -782,6 +767,7 @@ def on_basic_qos_ok(self, _frame: pika.frame.Method) -> None:

if self._connected_future:
self._connected_future.set_result(True)
self._connecting = False
self._connected = True

def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
Expand Down
14 changes: 13 additions & 1 deletion src/nuropb/service_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from etcd3.stateful.lease import Lease
from etcd3.stateful.watch import Watcher, Event
from etcd3.stateful.transaction import Txn
from requests.exceptions import HTTPError
from pika.exceptions import AMQPConnectionError

from nuropb.rmq_api import RMQAPI
from nuropb.rmq_lib import configure_nuropb_rmq, create_virtual_host
Expand Down Expand Up @@ -401,19 +403,29 @@ async def startup_steps(self) -> None:

await self._instance.connect()

async def start(self) -> None:
async def start(self) -> bool:
"""start: starts the container service instance.
- primary entry point to start the service container.
:return: None
"""
started = False
try:
await self.startup_steps()
logger.info("Container startup complete")
started = True
except AMQPConnectionError as err:
logger.error(f"Startup error connecting to RabbitMQ: {err}")
except HTTPError as err:
logger.error(f"HTTP error calling RabbitMQ API: {err}")
except ValueError as err:
logger.error(f"Startup error, likely due to miss configuration: {err}")
except (asyncio.CancelledError, Exception) as err:
if isinstance(err, asyncio.CancelledError):
logger.info(f"container running future cancelled: {err}")
else:
logger.exception(f"Container running future runtime exception: {err}")
finally:
return started

async def stop(self) -> None:
"""stop: stops the container service instance.
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_rmq_url(test_settings):

create_virtual_host(api_url, rmq_url)

def message_callback(*args, **kwargs):
def message_callback(*args, **kwargs): # pragma: no cover
pass

transport_settings = dict(
Expand Down Expand Up @@ -132,7 +132,7 @@ def test_rmq_url_static(test_settings):

create_virtual_host(api_url, rmq_url)

def message_callback(*args, **kwargs):
def message_callback(*args, **kwargs): # pragma: no cover
pass

transport_settings = dict(
Expand Down
34 changes: 33 additions & 1 deletion tests/test_service_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

# @pytest.mark.skip
@pytest.mark.asyncio
async def test_rmq_api_service_mode(test_settings, test_rmq_url, test_api_url):
async def test_rmq_api_client_mode(test_settings, test_rmq_url, test_api_url):
instance_id = uuid4().hex
transport_settings = dict(
dl_exchange=test_settings["dl_exchange"],
Expand All @@ -34,8 +34,40 @@ async def test_rmq_api_service_mode(test_settings, test_rmq_url, test_api_url):
port=2379,
),
)
# must resolved the testing issue on github actions
# await container.start()

@pytest.mark.asyncio
async def test_rmq_api_service_mode(test_settings, test_rmq_url, test_api_url, service_instance):
instance_id = uuid4().hex
transport_settings = dict(
dl_exchange=test_settings["dl_exchange"],
rpc_bindings=test_settings["rpc_bindings"],
event_bindings=test_settings["event_bindings"],
prefetch_count=test_settings["prefetch_count"],
default_ttl=test_settings["default_ttl"],
)
rmq_api = RMQAPI(
service_name=test_settings["service_name"],
service_instance=service_instance,
instance_id=instance_id,
amqp_url=test_rmq_url,
rpc_exchange=test_settings["rpc_exchange"],
events_exchange=test_settings["events_exchange"],
transport_settings=transport_settings,
)
container = ServiceContainer(
rmq_api_url=test_api_url,
instance=rmq_api,
etcd_config=dict(
host="localhost",
port=2379,
),
)
# must resolved the testing issue on github actions
# await container.start()



@pytest.mark.asyncio
async def test_rmq_api_service_mode_no_etcd(test_settings, test_rmq_url, test_api_url):
Expand Down

0 comments on commit 221f847

Please sign in to comment.