From 46d5f9de768e9ccc974cebed487e1084aaee365d Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Mon, 21 Nov 2022 11:37:41 -0800 Subject: [PATCH 1/5] pulled from main --- longhaul/longhaul.py | 553 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 553 insertions(+) create mode 100644 longhaul/longhaul.py diff --git a/longhaul/longhaul.py b/longhaul/longhaul.py new file mode 100644 index 000000000..9ed584407 --- /dev/null +++ b/longhaul/longhaul.py @@ -0,0 +1,553 @@ +# st ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import os +import sys +import asyncio +import logging +import logging.handlers +import functools +import json +import random +import dataclasses +import time +import datetime +import gc +import collections +import glob +from blessings import Terminal +from azure.iot.device.aio import IoTHubDeviceClient +from azure.iot.device import Message, X509 + +DEVICE_ID = os.environ["IOTHUB_DEVICE_ID"] + +USE_WEBSOCKETS = True if os.environ.get("IOTHUB_WEBSOCKETS", False) else False + +# Maximum number of seconds between reconnect retries +MAX_WAIT_TIME_BETWEEN_RECONNECT_ATTEMPTS = 20 + +# How long to sleep between telemetry sends +MESSAGE_SEND_SLEEP_TIME = 1 + +# How often do we start taking heap snapshots, in seconds +HEAP_HISTORY_STARTING_INTERVAL = 10 + +# How many heap counts do we keep in the history? +HEAP_HISTORY_LENGTH = 4 + +# Interval, in seconds, for updating the display +DISPLAY_INTERVAL = 1 + +# Interval for rotating logs, in seconds +LOG_ROTATION_INTERVAL = 3600 + +# How many logs to keep before recycling +LOG_BACKUP_COUNT = 6 + +# Directory for storing log files +LOG_DIRECTORY = "./logs/{}".format(DEVICE_ID) + +# Prepare the log directory +os.makedirs(LOG_DIRECTORY, exist_ok=True) +for filename in glob.glob("{}/*.log".format(LOG_DIRECTORY)): + os.remove(filename) + + +log_formatter = logging.Formatter( + "%(asctime)s %(levelname)-5s (%(threadName)s) %(filename)s:%(funcName)s():%(message)s" +) + +paho_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/paho.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +paho_log_handler.setLevel(level=logging.DEBUG) +paho_log_handler.setFormatter(log_formatter) + +info_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/info.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +info_log_handler.setLevel(level=logging.INFO) +info_log_handler.setFormatter(log_formatter) + +debug_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/debug.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +debug_log_handler.setLevel(level=logging.DEBUG) +debug_log_handler.setFormatter(log_formatter) + +longhaul_log_handler = logging.FileHandler(filename="{}/longhaul.log".format(LOG_DIRECTORY)) +longhaul_log_handler.setLevel(level=logging.DEBUG) +longhaul_log_handler.setFormatter(log_formatter) + +root_logger = logging.getLogger() +root_logger.setLevel(level=logging.DEBUG) +root_logger.addHandler(info_log_handler) +root_logger.addHandler(debug_log_handler) + +paho_logger = logging.getLogger("paho") +paho_logger.addHandler(paho_log_handler) + + +logger = logging.getLogger(__name__) +logger.addHandler(longhaul_log_handler) + +term = Terminal() + +try: + # Copy Paho so time deltas work + time_func = time.monotonic +except AttributeError: + time_func = time.time + + +@dataclasses.dataclass +class HeapHistoryItem(object): + time: str + object_count: int + + +@dataclasses.dataclass(order=True) +class HeapHistoryStatus(object): + snapshot_interval: int + next_heap_snapshot: int + history: list + + def __init__(self): + super(HeapHistoryStatus, self).__init__() + self.snapshot_interval = HEAP_HISTORY_STARTING_INTERVAL + self.next_heap_snapshot = time_func() + self.snapshot_interval + self.history = [] + + +@dataclasses.dataclass(order=True) +class ReconnectStatus(object): + connect_loop_status: str = "new" + pipeline_connection_status: str = "" + + connect_count: int = 0 + disconnect_count: int = 0 + max_disconencted_time: int = 0 + + last_connect_time: int = 0 + last_disconnect_time: int = 0 + time_since_last_connect: str = "" + time_since_last_disconnect: str = "" + + +@dataclasses.dataclass(order=True) +class ExceptionStatus(object): + connect_exceptions: dict = dataclasses.field(default_factory=dict) + send_exceptions: dict = dataclasses.field(default_factory=dict) + + +@dataclasses.dataclass(order=True) +class PahoStatus(object): + time_since_last_paho_traffic_in: str = "" + time_since_last_paho_traffic_out: str = "" + client_object_id: int = 0 + thread_name: str = "" + thread_is_alive: bool = False + len_out_mesage_queue: int = 0 + len_in_message_queue: int = 0 + len_out_pakcet_queue: int = 0 + thread_terminate: bool = False + connection_state: int = 0 + + def to_dict(self): + return dataclasses.asdict(self) + + +@dataclasses.dataclass(order=True) +class PahoConfig(object): + transport: str = "" + protocol: str = "" + keepalive: int = 0 + connect_timeout: int = 0 + reconnect_on_failure: bool = False + reconnect_delay_min: int = 0 + reconnect_delay_max: int = 0 + host: str = "" + port: int = 0 + proxy_args: dict = dataclasses.field(default_factory=dict) + socket_class: str = "" + socket_name: str = "" + + def to_dict(self): + return dataclasses.asdict(self) + + +@dataclasses.dataclass(order=True) +class IoTHubClientConfig(object): + pass + + +@dataclasses.dataclass(order=True) +class IoTHubClientStatus(object): + pass + + +@dataclasses.dataclass(order=True) +class SendMessageStatus(object): + messages_sent: int = 0 + messages_queued: int = 0 + + last_message_sent_time: int = 0 + time_since_last_message_sent: str = "" + + +@dataclasses.dataclass(order=True) +class ClientStatus(object): + reconnect: ReconnectStatus + exception: ExceptionStatus + paho_status: PahoStatus + paho_config: PahoConfig + send_message: SendMessageStatus + heap_history: HeapHistoryStatus + + start_time: int = 0 + time_since_start: str = "" + device_id: str = DEVICE_ID + websockets: bool = USE_WEBSOCKETS + + def __init__(self): + super(ClientStatus, self).__init__() + self.reconnect = ReconnectStatus() + self.exception = ExceptionStatus() + self.paho_status = PahoStatus() + self.paho_config = PahoConfig() + self.send_message = SendMessageStatus() + self.heap_history = HeapHistoryStatus() + + +def wrap_in_try_catch(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + logger.error( + "Exception in {}: {}".format(func.__name__, get_type_name(e)), exc_info=True + ) + raise e + + return wrapper + + +def get_type_name(e): + return type(e).__name__ + + +def get_paho_from_device_client(device_client): + pipeline_root = device_client._mqtt_pipeline._pipeline + stage = pipeline_root + while stage.next: + stage = stage.next + return stage.transport._mqtt_client + + +def get_paho_config(paho_client): + config = PahoConfig() + config.transport = paho_client._transport + config.protocol = str(paho_client._protocol) + config.keepalive = paho_client._keepalive + config.connect_timeout = paho_client._connect_timeout + config.reconnect_on_failure = paho_client._reconnect_on_failure + config.reconnect_delay_min = paho_client._reconnect_min_delay + config.reconnect_delay_max = paho_client._reconnect_max_delay + config.host = paho_client._host + config.port = paho_client._port + config.proxy_args = paho_client._proxy + config.socket_class = str(type(paho_client.socket())) + config.socket_name = ( + str(paho_client.socket().getsockname()) if paho_client.socket() else "No socket" + ) + return config + + +def format_time_delta(s): + if s: + return str(datetime.timedelta(seconds=time_func() - s)) + else: + return "infinity" + + +def get_paho_status(paho_client): + status = PahoStatus() + + status.time_since_last_paho_traffic_in = format_time_delta(paho_client._last_msg_in) + status.time_since_last_paho_traffic_out = format_time_delta(paho_client._last_msg_out) + + status.client_object_id = id(paho_client) + status.thread_name = paho_client._thread.name if paho_client._thread else "None" + status.thread_is_alive = ( + str(paho_client._thread.is_alive()) if paho_client._thread else "No thread" + ) + status.len_out_mesage_queue = len(paho_client._out_messages) + status.len_in_message_queue = len(paho_client._in_messages) + status.len_out_pakcet_queue = len(paho_client._out_packet) + status.thread_terminate = paho_client._thread_terminate + status.connection_state = paho_client._state + + return status + + +class Client(object): + async def init(self): + self.device_client = None + + self.outgoing_message_queue = asyncio.Queue() + + self.disconnected_event = asyncio.Event() + self.connected_event = asyncio.Event() + self.exit_app_event = asyncio.Event() + + self.disconnected_event.set() + self.status = ClientStatus() + + self.first_connect = True + + @wrap_in_try_catch + async def send_message_loop(self): + + while True: + await self.connected_event.wait() + next_message = await self.outgoing_message_queue.get() + self.status.send_message.messages_queued = self.outgoing_message_queue.qsize() + try: + await self.device_client.send_message(next_message) + self.status.send_message.last_message_sent_time = time_func() + except Exception as e: + t = get_type_name(e) + self.status.exception.send_exceptions[t] = ( + self.status.exception.send_exceptions.get(t, 0) + 1 + ) + await self.outgoing_message_queue.put(next_message) + else: + self.status.send_message.messages_sent += 1 + # TODO: queue here + + @wrap_in_try_catch + async def queue_message_loop(self): + messageId = 0 + while True: + messageId += 1 + message = Message( + json.dumps( + {"messageId": messageId, "message": "This is message #{}".format(messageId)} + ) + ) + await self.outgoing_message_queue.put(message) + self.status.send_message.messages_queued = self.outgoing_message_queue.qsize() + await asyncio.sleep(MESSAGE_SEND_SLEEP_TIME) + if self.exit_app_event.is_set(): + return + + @wrap_in_try_catch + async def reconnect_loop(self): + while True: + done, pending = await asyncio.wait( + [ + self.disconnected_event.wait(), + self.exit_app_event.wait(), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + await asyncio.gather(*done) + [x.cancel() for x in pending] + + if self.exit_app_event.is_set(): + self.status.reconnect.connect_loop_status = "exiting while connected" + return + + if self.first_connect: + sleep_time = 0 + self.first_connect = False + else: + sleep_time = random.random() * MAX_WAIT_TIME_BETWEEN_RECONNECT_ATTEMPTS + + self.status.reconnect.connect_loop_status = ( + "disconencted. waiting for {} seconds".format(round(sleep_time, 2)) + ) + + done, pending = await asyncio.wait( + [ + self.connected_event.wait(), + self.exit_app_event.wait(), + asyncio.sleep(sleep_time), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + await asyncio.gather(*done) + [x.cancel() for x in pending] + + if self.exit_app_event.is_set(): + self.status.reconnect.connect_loop_status = "exiting while disconnected" + return + + if self.device_client.connected: + self.status.reconnect.connect_loop_status = "connected" + else: + try: + self.status.reconnect.connect_loop_status = "connecting" + await self.device_client.connect() + self.status.reconnect.connect_loop_status = "connected" + except Exception as e: + t = get_type_name(e) + self.status.reconnect.connect_loop_status = "connect exception {}".format(t) + self.status.exception.connect_exceptions[t] = ( + self.status.exception.connect_exceptions.get(t, 0) + 1 + ) + + @property + def paho(self): + return get_paho_from_device_client(self.device_client) + + @wrap_in_try_catch + async def display_loop(self): + last_heap_counts = collections.Counter({}) + + while True: + + self.status.time_since_start = format_time_delta(self.status.start_time) + + self.status.paho_config = get_paho_config(self.paho) + self.status.paho_status = get_paho_status(self.paho) + + self.status.reconnect.time_since_last_connect = format_time_delta( + self.status.reconnect.last_connect_time + ) + self.status.reconnect.time_since_last_disconnect = format_time_delta( + self.status.reconnect.last_disconnect_time + ) + self.status.send_message.time_since_last_message_sent = format_time_delta( + self.status.send_message.last_message_sent_time + ) + + if time_func() >= self.status.heap_history.next_heap_snapshot: + gc.collect(2) + + self.status.heap_history.snapshot_interval *= 2 + self.status.heap_history.next_heap_snapshot = ( + time_func() + self.status.heap_history.snapshot_interval + ) + self.status.heap_history.history.append( + HeapHistoryItem( + time=str(datetime.datetime.now()), + object_count=len(gc.get_objects()), + ) + ) + self.status.heap_history.history = self.status.heap_history.history[ + -HEAP_HISTORY_LENGTH: + ] + + logger.info( + "Current Status: {}".format( + json.dumps(dataclasses.asdict(self.status), indent=2) + ) + ) + + heap_counts = collections.Counter([type(x).__name__ for x in gc.get_objects()]) + delta = collections.Counter(heap_counts) + delta.subtract(last_heap_counts) + + for key in list(delta.keys()): + if delta[key] == 0: + del delta[key] + + logger.info("Heap Delta: {}".format(json.dumps(delta, indent=2))) + last_heap_counts = heap_counts + + print(term.clear()) + print(json.dumps(dataclasses.asdict(self.status), indent=2)) + + done, pending = await asyncio.wait( + [ + self.exit_app_event.wait(), + asyncio.sleep(DISPLAY_INTERVAL), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + await asyncio.gather(*done) + [x.cancel() for x in pending] + + if self.exit_app_event.is_set(): + return + + @wrap_in_try_catch + async def handle_connection_state_change(self): + if self.device_client.connected: + self.status.reconnect.connect_count += 1 + self.status.reconnect.last_connect_time = time_func() + self.status.reconnect.pipeline_connection_status = "connected" + self.disconnected_event.clear() + self.connected_event.set() + else: + self.status.reconnect.disconnect_count += 1 + self.status.reconnect.last_disconnect_time = time_func() + self.status.reconnect.pipeline_connection_status = "disconnected" + self.disconnected_event.set() + self.connected_event.clear() + + async def main(self): + # Make sure this was run with `python -X dev longhaul.py`. + if not sys.flags.dev_mode: + print("please re-run with -X dev command line arguments") + sys.exit(1) + + await self.init() + + self.status.start_time = time_func() + + if "IOTHUB_DEVICE_CERT" in os.environ: + self.device_client = IoTHubDeviceClient.create_from_x509_certificate( + device_id=os.environ["IOTHUB_DEVICE_ID"], + hostname=os.environ["IOTHUB_HOSTNAME"], + x509=X509( + cert_file=os.environ["IOTHUB_DEVICE_CERT"], + key_file=os.environ["IOTHUB_DEVICE_KEY"], + ), + websockets=USE_WEBSOCKETS, + ) + else: + conn_str = os.getenv("IOTHUB_DEVICE_CONNECTION_STRING") + self.device_client = IoTHubDeviceClient.create_from_connection_string( + conn_str, websockets=USE_WEBSOCKETS + ) + self.device_client.on_connection_state_change = self.handle_connection_state_change + + tasks = [ + self.send_message_loop(), + self.queue_message_loop(), + self.reconnect_loop(), + self.display_loop(), + ] + + try: + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + await asyncio.gather(*done) + except Exception as e: + logger.error("Exception in main loop: {}".format(get_type_name(e))) + finally: + logger.warning("Exiting app") + self.exit_app_event.set() + logger.info("Waiting for all coroutines to exit") + await asyncio.wait_for( + asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED), timeout=5 + ) + await self.device_client.shutdown() + + +if __name__ == "__main__": + asyncio.run(Client().main()) From 86d6df692618e87217706f44a36e414c73c4972c Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Mon, 21 Nov 2022 14:50:45 -0800 Subject: [PATCH 2/5] add iothub config --- longhaul/.gitignore | 1 + longhaul/longhaul.py | 65 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 longhaul/.gitignore diff --git a/longhaul/.gitignore b/longhaul/.gitignore new file mode 100644 index 000000000..9d5389b73 --- /dev/null +++ b/longhaul/.gitignore @@ -0,0 +1 @@ +logs/**/* diff --git a/longhaul/longhaul.py b/longhaul/longhaul.py index 9ed584407..6ad0ae9d3 100644 --- a/longhaul/longhaul.py +++ b/longhaul/longhaul.py @@ -190,12 +190,27 @@ def to_dict(self): @dataclasses.dataclass(order=True) class IoTHubClientConfig(object): - pass + client_class: str = "" + server_verification_cert: bool = False + gateway_hostname: str = "" + websockets: bool = False + cipher: str = "" + product_info: str = "" + proxy_options: dict = dataclasses.field(default_factory=dict) + sastoken_ttl: int = 0 + keep_alive: int = 0 + connection_retry: bool = False + connection_retry_interval: int = 0 + device_id: str = "" + module_id: str = "" + x509: bool = False + sastoken_class: str = "" + sastoken_signing_mechanism_class: str = "" @dataclasses.dataclass(order=True) class IoTHubClientStatus(object): - pass + connection_state: str = "" @dataclasses.dataclass(order=True) @@ -213,6 +228,8 @@ class ClientStatus(object): exception: ExceptionStatus paho_status: PahoStatus paho_config: PahoConfig + iothub_client_status: IoTHubClientStatus + iothub_client_config: IoTHubClientConfig send_message: SendMessageStatus heap_history: HeapHistoryStatus @@ -227,6 +244,8 @@ def __init__(self): self.exception = ExceptionStatus() self.paho_status = PahoStatus() self.paho_config = PahoConfig() + self.iothub_client_status = IoTHubClientStatus() + self.iothub_client_config = IoTHubClientConfig() self.send_message = SendMessageStatus() self.heap_history = HeapHistoryStatus() @@ -303,6 +322,42 @@ def get_paho_status(paho_client): return status +def get_iothub_client_config(iothub_client): + internal_config_object = iothub_client._mqtt_pipeline._nucleus.pipeline_configuration + config = IoTHubClientConfig() + + config.client_class = str(type(iothub_client)) + config.server_verification_cert = ( + True if internal_config_object.server_verification_cert else False + ) + config.gateway_hostname = internal_config_object.gateway_hostname + config.websockets = internal_config_object.websockets + config.cipher = str(internal_config_object.cipher) + config.product_info = internal_config_object.product_info + config.proxy_options = internal_config_object.proxy_options + config.keep_alive = internal_config_object.keep_alive + config.connection_retry = internal_config_object.connection_retry + config.connection_retry_interval = internal_config_object.connection_retry_interval + config.device_id = internal_config_object.device_id + config.module_id = internal_config_object.module_id + config.x509 = True if internal_config_object.x509 else False + sastoken = internal_config_object.sastoken + config.sastoken_ttl = sastoken.ttl if sastoken else 0 + config.sastoken_class = str(type(sastoken)) if sastoken else None + config.sastoken_signing_mechanism_class = ( + str(type(sastoken._signing_mechanism)) if sastoken and sastoken._signing_mechanism else None + ) + + return config + + +def get_iothub_client_status(iothub_client): + status = IoTHubClientStatus() + nucleus = iothub_client._mqtt_pipeline._nucleus + status.connection_state = str(nucleus.connection_state) + return status + + class Client(object): async def init(self): self.device_client = None @@ -424,6 +479,8 @@ async def display_loop(self): self.status.paho_config = get_paho_config(self.paho) self.status.paho_status = get_paho_status(self.paho) + self.status.iothub_client_config = get_iothub_client_config(self.device_client) + self.status.iothub_client_status = get_iothub_client_status(self.device_client) self.status.reconnect.time_since_last_connect = format_time_delta( self.status.reconnect.last_connect_time @@ -470,6 +527,10 @@ async def display_loop(self): last_heap_counts = heap_counts print(term.clear()) + if time_func() - self.status.start_time > 50: + self.status.paho_config = None + self.status.iothub_client_config = None + print(json.dumps(dataclasses.asdict(self.status), indent=2)) done, pending = await asyncio.wait( From 2b29f1da3a0c80e40e391ceb7a37a7eb432ada17 Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Mon, 21 Nov 2022 15:33:12 -0800 Subject: [PATCH 3/5] paho stats --- .../azure/iot/device/common/mqtt_transport.py | 74 +++++++++++++++++-- longhaul/longhaul.py | 21 ++++-- 2 files changed, 84 insertions(+), 11 deletions(-) diff --git a/azure-iot-device/azure/iot/device/common/mqtt_transport.py b/azure-iot-device/azure/iot/device/common/mqtt_transport.py index 7ee90b84d..0c5185f4e 100644 --- a/azure-iot-device/azure/iot/device/common/mqtt_transport.py +++ b/azure-iot-device/azure/iot/device/common/mqtt_transport.py @@ -14,6 +14,7 @@ from . import transport_exceptions as exceptions from enum import Enum import socks +import dataclasses logger = logging.getLogger(__name__) @@ -49,6 +50,43 @@ } +@dataclasses.dataclass +class TransportStats(object): + connect_rc_codes: dict = dataclasses.field(default_factory=dict) + on_connect_rc_codes: dict = dataclasses.field(default_factory=dict) + connect_exceptions: dict = dataclasses.field(default_factory=dict) + + disconnect_rc_codes: dict = dataclasses.field(default_factory=dict) + on_disconnect_rc_codes: dict = dataclasses.field(default_factory=dict) + disconnect_exceptions: dict = dataclasses.field(default_factory=dict) + + publish_rc_codes: dict = dataclasses.field(default_factory=dict) + publish_exceptions: dict = dataclasses.field(default_factory=dict) + + subscribe_rc_codes: dict = dataclasses.field(default_factory=dict) + subscribe_exceptions: dict = dataclasses.field(default_factory=dict) + + unsubscribe_rc_codes: dict = dataclasses.field(default_factory=dict) + unsubscribe_exceptions: dict = dataclasses.field(default_factory=dict) + + count_message_received: int = 0 + + count_subscribe: int = 0 + count_suback: int = 0 + + count_unsubscribe: int = 0 + count_unsuback: int = 0 + + count_publish: int = 0 + count_puback: int = 0 + + +def add_count_to_dict(dikt, key): + if isinstance(key, Exception): + key = type(key).__name__ + dikt[key] = dikt.get(key, 0) + 1 + + def _create_error_from_connack_rc_code(rc): """ Given a paho CONNACK rc code, return an Exception that can be raised @@ -131,6 +169,8 @@ def __init__( self._mqtt_client = self._create_mqtt_client() + self.stats = TransportStats() + def _create_mqtt_client(self): """ Create the MQTT client object and assign all necessary event handler callbacks. @@ -176,6 +216,8 @@ def on_connect(client, userdata, flags, rc): this = self_weakref() logger.info("connected with result code: {}".format(rc)) + add_count_to_dict(this.stats.on_connect_rc_codes, rc) + if rc: # i.e. if there is an error if this.on_mqtt_connection_failure_handler: try: @@ -204,6 +246,8 @@ def on_disconnect(client, userdata, rc): this = self_weakref() logger.info("disconnected with result code: {}".format(rc)) + add_count_to_dict(this.stats.on_disconnect_rc_codes, rc) + cause = None if rc: # i.e. if there is an error logger.debug("".join(traceback.format_stack())) @@ -239,6 +283,7 @@ def on_disconnect(client, userdata, rc): def on_subscribe(client, userdata, mid, granted_qos): this = self_weakref() logger.info("suback received for {}".format(mid)) + this.stats.count_suback += 1 # subscribe failures are returned from the subscribe() call. This is just # a notification that a SUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.SUBSCRIBE, mid) @@ -246,6 +291,7 @@ def on_subscribe(client, userdata, mid, granted_qos): def on_unsubscribe(client, userdata, mid): this = self_weakref() logger.info("UNSUBACK received for {}".format(mid)) + this.stats.count_unsuback += 1 # unsubscribe failures are returned from the unsubscribe() call. This is just # a notification that a SUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.UNSUBSCRIBE, mid) @@ -253,6 +299,7 @@ def on_unsubscribe(client, userdata, mid): def on_publish(client, userdata, mid): this = self_weakref() logger.info("payload published for {}".format(mid)) + this.stats.count_puback += 1 # publish failures are returned from the publish() call. This is just # a notification that a PUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.PUBLISH, mid) @@ -260,6 +307,7 @@ def on_publish(client, userdata, mid): def on_message(client, userdata, mqtt_message): this = self_weakref() logger.info("message received on {}".format(mqtt_message.topic)) + this.stats.count_message_received += 1 if this.on_mqtt_message_received_handler: try: @@ -405,7 +453,9 @@ def connect(self, password=None): rc = self._mqtt_client.connect( host=self._hostname, port=8883, keepalive=self._keep_alive ) + add_count_to_dict(self.stats.connect_rc_codes, rc) except socket.error as e: + add_count_to_dict(self.stats.connect_exceptions, e) self._force_transport_disconnect_and_cleanup() # Only this type will raise a special error @@ -428,6 +478,7 @@ def connect(self, password=None): raise exceptions.ConnectionFailedError() from e except Exception as e: + add_count_to_dict(self.stats.connect_exceptions, e) self._force_transport_disconnect_and_cleanup() raise exceptions.ProtocolClientError("Unexpected Paho failure during connect") from e @@ -451,6 +502,7 @@ def disconnect(self, clear_inflight=False): try: rc = self._mqtt_client.disconnect() except Exception as e: + add_count_to_dict(self.stats.disconnect_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during disconnect") from e finally: self._mqtt_client.loop_stop() @@ -459,6 +511,8 @@ def disconnect(self, clear_inflight=False): logger.debug("in paho thread. nulling _thread") self._mqtt_client._thread = None + add_count_to_dict(self.stats.disconnect_rc_codes, rc) + logger.debug("_mqtt_client.disconnect returned rc={}".format(rc)) if rc: # This could result in ConnectionDroppedError or ProtocolClientError @@ -488,14 +542,18 @@ def subscribe(self, topic, qos=1, callback=None): :raises: ProtocolClientError if there is some other client error. :raises: NoConnectionError if the client isn't actually connected. """ + self.stats.count_subscribe += 1 logger.info("subscribing to {} with qos {}".format(topic, qos)) try: (rc, mid) = self._mqtt_client.subscribe(topic, qos=qos) - except ValueError: + except ValueError as e: + add_count_to_dict(self.stats.subscribe_exceptions, e) raise except Exception as e: + add_count_to_dict(self.stats.subscribe_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during subscribe") from e logger.debug("_mqtt_client.subscribe returned rc={}".format(rc)) + add_count_to_dict(self.stats.subscribe_rc_codes, rc) if rc: # This could result in ConnectionDroppedError or ProtocolClientError raise _create_error_from_rc_code(rc) @@ -513,16 +571,20 @@ def unsubscribe(self, topic, callback=None): :raises: ProtocolClientError if there is some other client error. :raises: NoConnectionError if the client isn't actually connected. """ + self.stats.count_unsubscribe += 1 logger.info("unsubscribing from {}".format(topic)) try: (rc, mid) = self._mqtt_client.unsubscribe(topic) - except ValueError: + except ValueError as e: + add_count_to_dict(self.stats.unsubscribe_exceptions, e) raise except Exception as e: + add_count_to_dict(self.stats.unsubscribe_exceptions, e) raise exceptions.ProtocolClientError( "Unexpected Paho failure during unsubscribe" ) from e logger.debug("_mqtt_client.unsubscribe returned rc={}".format(rc)) + add_count_to_dict(self.stats.unsubscribe_rc_codes, rc) if rc: # This could result in ConnectionDroppedError or ProtocolClientError raise _create_error_from_rc_code(rc) @@ -546,16 +608,18 @@ def publish(self, topic, payload, qos=1, callback=None): :raises: ConnectionDroppedError if connection is dropped during execution. :raises: ProtocolClientError if there is some other client error. """ + self.stats.count_publish += 1 logger.info("publishing on {}".format(topic)) try: (rc, mid) = self._mqtt_client.publish(topic=topic, payload=payload, qos=qos) - except ValueError: - raise - except TypeError: + except (ValueError, TypeError) as e: + add_count_to_dict(self.stats.publish_exceptions, e) raise except Exception as e: + add_count_to_dict(self.stats.publish_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during publish") from e logger.debug("_mqtt_client.publish returned rc={}".format(rc)) + add_count_to_dict(self.stats.publish_rc_codes, rc) if rc: # Even though Paho returns a rc code indicating an error, it still stores the message # and will publish on connect, so this isn't really a failure - it just hangs. diff --git a/longhaul/longhaul.py b/longhaul/longhaul.py index 6ad0ae9d3..cb0d6e3f8 100644 --- a/longhaul/longhaul.py +++ b/longhaul/longhaul.py @@ -231,6 +231,7 @@ class ClientStatus(object): iothub_client_status: IoTHubClientStatus iothub_client_config: IoTHubClientConfig send_message: SendMessageStatus + transport_stats: dataclasses.dataclass heap_history: HeapHistoryStatus start_time: int = 0 @@ -248,6 +249,7 @@ def __init__(self): self.iothub_client_config = IoTHubClientConfig() self.send_message = SendMessageStatus() self.heap_history = HeapHistoryStatus() + self.transport_class = None def wrap_in_try_catch(func): @@ -268,12 +270,16 @@ def get_type_name(e): return type(e).__name__ -def get_paho_from_device_client(device_client): +def get_transport_from_device_client(device_client): pipeline_root = device_client._mqtt_pipeline._pipeline stage = pipeline_root while stage.next: stage = stage.next - return stage.transport._mqtt_client + return stage.transport + + +def get_paho_from_device_client(device_client): + return get_transport_from_device_client(device_client)._mqtt_client def get_paho_config(paho_client): @@ -288,7 +294,7 @@ def get_paho_config(paho_client): config.host = paho_client._host config.port = paho_client._port config.proxy_args = paho_client._proxy - config.socket_class = str(type(paho_client.socket())) + config.socket_class = get_type_name(paho_client.socket()) config.socket_name = ( str(paho_client.socket().getsockname()) if paho_client.socket() else "No socket" ) @@ -326,7 +332,7 @@ def get_iothub_client_config(iothub_client): internal_config_object = iothub_client._mqtt_pipeline._nucleus.pipeline_configuration config = IoTHubClientConfig() - config.client_class = str(type(iothub_client)) + config.client_class = get_type_name(iothub_client) config.server_verification_cert = ( True if internal_config_object.server_verification_cert else False ) @@ -343,9 +349,11 @@ def get_iothub_client_config(iothub_client): config.x509 = True if internal_config_object.x509 else False sastoken = internal_config_object.sastoken config.sastoken_ttl = sastoken.ttl if sastoken else 0 - config.sastoken_class = str(type(sastoken)) if sastoken else None + config.sastoken_class = get_type_name(sastoken) if sastoken else None config.sastoken_signing_mechanism_class = ( - str(type(sastoken._signing_mechanism)) if sastoken and sastoken._signing_mechanism else None + get_type_name(sastoken._signing_mechanism) + if sastoken and sastoken._signing_mechanism + else None ) return config @@ -481,6 +489,7 @@ async def display_loop(self): self.status.paho_status = get_paho_status(self.paho) self.status.iothub_client_config = get_iothub_client_config(self.device_client) self.status.iothub_client_status = get_iothub_client_status(self.device_client) + self.status.transport_stats = get_transport_from_device_client(self.device_client).stats self.status.reconnect.time_since_last_connect = format_time_delta( self.status.reconnect.last_connect_time From ee0c9c9eccdaa63f6eef21496256280cc49ac1a9 Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Wed, 23 Nov 2022 10:55:54 -0800 Subject: [PATCH 4/5] move transport code into transport --- .../azure/iot/device/common/mqtt_transport.py | 167 +++++++++++++++--- longhaul/longhaul.py | 112 ++---------- 2 files changed, 157 insertions(+), 122 deletions(-) diff --git a/azure-iot-device/azure/iot/device/common/mqtt_transport.py b/azure-iot-device/azure/iot/device/common/mqtt_transport.py index 0c5185f4e..855ff3e4c 100644 --- a/azure-iot-device/azure/iot/device/common/mqtt_transport.py +++ b/azure-iot-device/azure/iot/device/common/mqtt_transport.py @@ -15,6 +15,7 @@ from enum import Enum import socks import dataclasses +import datetime logger = logging.getLogger(__name__) @@ -51,23 +52,42 @@ @dataclasses.dataclass -class TransportStats(object): +class PahoStatus(object): connect_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes returned from the `connect` method""" + on_connect_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes passed into the `on_connect` handler""" + connect_exceptions: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of exceptions raised by the `connect` method""" disconnect_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes returned from the `disconnect` method""" + on_disconnect_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes passed into the `on_disconnect` handler""" + disconnect_exceptions: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of exceptions raised by the `disconnect` method""" publish_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes returned from the `publish` method""" + publish_exceptions: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of exceptions raised by the `publish` method""" subscribe_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes returned from the `subscribe` method""" + subscribe_exceptions: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of exceptions raised by the `subscribe` method""" unsubscribe_rc_codes: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of rc codes returned from the `unsubscribe` method""" + unsubscribe_exceptions: dict = dataclasses.field(default_factory=dict) + """Value->count dictionary of exceptions raised by the `unsubscribe` method""" count_message_received: int = 0 @@ -80,6 +100,41 @@ class TransportStats(object): count_publish: int = 0 count_puback: int = 0 + shut_down: bool = False + + time_since_last_paho_traffic_in: str = "" + time_since_last_paho_traffic_out: str = "" + client_object_id: int = 0 + thread_name: str = "" + thread_is_alive: bool = False + len_out_mesage_queue: int = 0 + len_in_message_queue: int = 0 + len_out_pakcet_queue: int = 0 + thread_terminate: bool = False + paho_connection_state: int = 0 + + def to_dict(self): + return dataclasses.asdict(self) + + +@dataclasses.dataclass(order=True) +class PahoConfig(object): + transport: str = "" + protocol: str = "" + keepalive: int = 0 + connect_timeout: int = 0 + reconnect_on_failure: bool = False + reconnect_delay_min: int = 0 + reconnect_delay_max: int = 0 + host: str = "" + port: int = 0 + proxy_args: dict = dataclasses.field(default_factory=dict) + socket_class: str = "" + socket_name: str = "" + + def to_dict(self): + return dataclasses.asdict(self) + def add_count_to_dict(dikt, key): if isinstance(key, Exception): @@ -87,6 +142,13 @@ def add_count_to_dict(dikt, key): dikt[key] = dikt.get(key, 0) + 1 +def format_time_delta(s): + if s: + return str(datetime.timedelta(seconds=mqtt.time_func() - s)) + else: + return "infinity" + + def _create_error_from_connack_rc_code(rc): """ Given a paho CONNACK rc code, return an Exception that can be raised @@ -169,7 +231,7 @@ def __init__( self._mqtt_client = self._create_mqtt_client() - self.stats = TransportStats() + self._paho_status = PahoStatus() def _create_mqtt_client(self): """ @@ -216,7 +278,7 @@ def on_connect(client, userdata, flags, rc): this = self_weakref() logger.info("connected with result code: {}".format(rc)) - add_count_to_dict(this.stats.on_connect_rc_codes, rc) + add_count_to_dict(this._paho_status.on_connect_rc_codes, rc) if rc: # i.e. if there is an error if this.on_mqtt_connection_failure_handler: @@ -246,7 +308,8 @@ def on_disconnect(client, userdata, rc): this = self_weakref() logger.info("disconnected with result code: {}".format(rc)) - add_count_to_dict(this.stats.on_disconnect_rc_codes, rc) + if this: + add_count_to_dict(this._paho_status.on_disconnect_rc_codes, rc) cause = None if rc: # i.e. if there is an error @@ -283,7 +346,7 @@ def on_disconnect(client, userdata, rc): def on_subscribe(client, userdata, mid, granted_qos): this = self_weakref() logger.info("suback received for {}".format(mid)) - this.stats.count_suback += 1 + this._paho_status.count_suback += 1 # subscribe failures are returned from the subscribe() call. This is just # a notification that a SUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.SUBSCRIBE, mid) @@ -291,7 +354,7 @@ def on_subscribe(client, userdata, mid, granted_qos): def on_unsubscribe(client, userdata, mid): this = self_weakref() logger.info("UNSUBACK received for {}".format(mid)) - this.stats.count_unsuback += 1 + this._paho_status.count_unsuback += 1 # unsubscribe failures are returned from the unsubscribe() call. This is just # a notification that a SUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.UNSUBSCRIBE, mid) @@ -299,7 +362,7 @@ def on_unsubscribe(client, userdata, mid): def on_publish(client, userdata, mid): this = self_weakref() logger.info("payload published for {}".format(mid)) - this.stats.count_puback += 1 + this._paho_status.count_puback += 1 # publish failures are returned from the publish() call. This is just # a notification that a PUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.PUBLISH, mid) @@ -307,7 +370,7 @@ def on_publish(client, userdata, mid): def on_message(client, userdata, mqtt_message): this = self_weakref() logger.info("message received on {}".format(mqtt_message.topic)) - this.stats.count_message_received += 1 + this._paho_status.count_message_received += 1 if this.on_mqtt_message_received_handler: try: @@ -410,6 +473,7 @@ def _create_ssl_context(self): def shutdown(self): """Shut down the transport. This is (currently) irreversible.""" + self._paho_status.shut_down = True # Remove the disconnect handler from Paho. We don't want to trigger any events in response # to the shutdown and confuse the higher level layers of code. Just end it. self._mqtt_client.on_disconnect = None @@ -453,9 +517,9 @@ def connect(self, password=None): rc = self._mqtt_client.connect( host=self._hostname, port=8883, keepalive=self._keep_alive ) - add_count_to_dict(self.stats.connect_rc_codes, rc) + add_count_to_dict(self._paho_status.connect_rc_codes, rc) except socket.error as e: - add_count_to_dict(self.stats.connect_exceptions, e) + add_count_to_dict(self._paho_status.connect_exceptions, e) self._force_transport_disconnect_and_cleanup() # Only this type will raise a special error @@ -478,7 +542,7 @@ def connect(self, password=None): raise exceptions.ConnectionFailedError() from e except Exception as e: - add_count_to_dict(self.stats.connect_exceptions, e) + add_count_to_dict(self._paho_status.connect_exceptions, e) self._force_transport_disconnect_and_cleanup() raise exceptions.ProtocolClientError("Unexpected Paho failure during connect") from e @@ -502,7 +566,7 @@ def disconnect(self, clear_inflight=False): try: rc = self._mqtt_client.disconnect() except Exception as e: - add_count_to_dict(self.stats.disconnect_exceptions, e) + add_count_to_dict(self._paho_status.disconnect_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during disconnect") from e finally: self._mqtt_client.loop_stop() @@ -511,7 +575,7 @@ def disconnect(self, clear_inflight=False): logger.debug("in paho thread. nulling _thread") self._mqtt_client._thread = None - add_count_to_dict(self.stats.disconnect_rc_codes, rc) + add_count_to_dict(self._paho_status.disconnect_rc_codes, rc) logger.debug("_mqtt_client.disconnect returned rc={}".format(rc)) if rc: @@ -542,18 +606,18 @@ def subscribe(self, topic, qos=1, callback=None): :raises: ProtocolClientError if there is some other client error. :raises: NoConnectionError if the client isn't actually connected. """ - self.stats.count_subscribe += 1 + self._paho_status.count_subscribe += 1 logger.info("subscribing to {} with qos {}".format(topic, qos)) try: (rc, mid) = self._mqtt_client.subscribe(topic, qos=qos) except ValueError as e: - add_count_to_dict(self.stats.subscribe_exceptions, e) + add_count_to_dict(self._paho_status.subscribe_exceptions, e) raise except Exception as e: - add_count_to_dict(self.stats.subscribe_exceptions, e) + add_count_to_dict(self._paho_status.subscribe_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during subscribe") from e logger.debug("_mqtt_client.subscribe returned rc={}".format(rc)) - add_count_to_dict(self.stats.subscribe_rc_codes, rc) + add_count_to_dict(self._paho_status.subscribe_rc_codes, rc) if rc: # This could result in ConnectionDroppedError or ProtocolClientError raise _create_error_from_rc_code(rc) @@ -571,20 +635,20 @@ def unsubscribe(self, topic, callback=None): :raises: ProtocolClientError if there is some other client error. :raises: NoConnectionError if the client isn't actually connected. """ - self.stats.count_unsubscribe += 1 + self._paho_status.count_unsubscribe += 1 logger.info("unsubscribing from {}".format(topic)) try: (rc, mid) = self._mqtt_client.unsubscribe(topic) except ValueError as e: - add_count_to_dict(self.stats.unsubscribe_exceptions, e) + add_count_to_dict(self._paho_status.unsubscribe_exceptions, e) raise except Exception as e: - add_count_to_dict(self.stats.unsubscribe_exceptions, e) + add_count_to_dict(self._paho_status.unsubscribe_exceptions, e) raise exceptions.ProtocolClientError( "Unexpected Paho failure during unsubscribe" ) from e logger.debug("_mqtt_client.unsubscribe returned rc={}".format(rc)) - add_count_to_dict(self.stats.unsubscribe_rc_codes, rc) + add_count_to_dict(self._paho_status.unsubscribe_rc_codes, rc) if rc: # This could result in ConnectionDroppedError or ProtocolClientError raise _create_error_from_rc_code(rc) @@ -608,18 +672,18 @@ def publish(self, topic, payload, qos=1, callback=None): :raises: ConnectionDroppedError if connection is dropped during execution. :raises: ProtocolClientError if there is some other client error. """ - self.stats.count_publish += 1 + self._paho_status.count_publish += 1 logger.info("publishing on {}".format(topic)) try: (rc, mid) = self._mqtt_client.publish(topic=topic, payload=payload, qos=qos) except (ValueError, TypeError) as e: - add_count_to_dict(self.stats.publish_exceptions, e) + add_count_to_dict(self._paho_status.publish_exceptions, e) raise except Exception as e: - add_count_to_dict(self.stats.publish_exceptions, e) + add_count_to_dict(self._paho_status.publish_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during publish") from e logger.debug("_mqtt_client.publish returned rc={}".format(rc)) - add_count_to_dict(self.stats.publish_rc_codes, rc) + add_count_to_dict(self._paho_status.publish_rc_codes, rc) if rc: # Even though Paho returns a rc code indicating an error, it still stores the message # and will publish on connect, so this isn't really a failure - it just hangs. @@ -629,6 +693,59 @@ def publish(self, topic, payload, qos=1, callback=None): raise _create_error_from_rc_code(rc) self._op_manager.establish_operation(OperationType.PUBLISH, mid, callback) + def get_debug_status(self): + """ + Return an infomrational status object that describes the current state of the Paho transport + """ + + self._paho_status.time_since_last_paho_traffic_in = format_time_delta( + self._mqtt_client._last_msg_in + ) + self._paho_status.time_since_last_paho_traffic_out = format_time_delta( + self._mqtt_client._last_msg_out + ) + + self._paho_status.client_object_id = id(self._mqtt_client) + self._paho_status.thread_name = ( + self._mqtt_client._thread.name if self._mqtt_client._thread else "None" + ) + self._paho_status.thread_is_alive = ( + str(self._mqtt_client._thread.is_alive()) if self._mqtt_client._thread else "No thread" + ) + self._paho_status.len_out_mesage_queue = len(self._mqtt_client._out_messages) + self._paho_status.len_in_message_queue = len(self._mqtt_client._in_messages) + self._paho_status.len_out_pakcet_queue = len(self._mqtt_client._out_packet) + self._paho_status.thread_terminate = self._mqtt_client._thread_terminate + self._paho_status.paho_connection_state = self._mqtt_client._state + + return self._paho_status + + def get_debug_config(self): + """ + Return an infomrational status object that describes the configuration of the Paho transport + """ + config = PahoConfig() + + config.transport = self._mqtt_client._transport + config.protocol = str(self._mqtt_client._protocol) + config.keepalive = self._mqtt_client._keepalive + config.connect_timeout = self._mqtt_client._connect_timeout + config.reconnect_on_failure = self._mqtt_client._reconnect_on_failure + config.reconnect_delay_min = self._mqtt_client._reconnect_min_delay + config.reconnect_delay_max = self._mqtt_client._reconnect_max_delay + config.host = self._mqtt_client._host + config.port = self._mqtt_client._port + config.proxy_args = self._mqtt_client._proxy + config.socket_class = type(self._mqtt_client.socket()).__name__ + config.socket_name = ( + str(self._mqtt_client.socket().getsockname()) + if self._mqtt_client.socket() + and getattr(self._mqtt_client.socket(), "getsockname", None) + else "No socket name" + ) + + return config + class OperationType(Enum): PUBLISH = "PUBLISH" diff --git a/longhaul/longhaul.py b/longhaul/longhaul.py index cb0d6e3f8..20491e268 100644 --- a/longhaul/longhaul.py +++ b/longhaul/longhaul.py @@ -112,6 +112,13 @@ time_func = time.time +def format_time_delta(s): + if s: + return str(datetime.timedelta(seconds=time_func() - s)) + else: + return "infinity" + + @dataclasses.dataclass class HeapHistoryItem(object): time: str @@ -152,42 +159,6 @@ class ExceptionStatus(object): send_exceptions: dict = dataclasses.field(default_factory=dict) -@dataclasses.dataclass(order=True) -class PahoStatus(object): - time_since_last_paho_traffic_in: str = "" - time_since_last_paho_traffic_out: str = "" - client_object_id: int = 0 - thread_name: str = "" - thread_is_alive: bool = False - len_out_mesage_queue: int = 0 - len_in_message_queue: int = 0 - len_out_pakcet_queue: int = 0 - thread_terminate: bool = False - connection_state: int = 0 - - def to_dict(self): - return dataclasses.asdict(self) - - -@dataclasses.dataclass(order=True) -class PahoConfig(object): - transport: str = "" - protocol: str = "" - keepalive: int = 0 - connect_timeout: int = 0 - reconnect_on_failure: bool = False - reconnect_delay_min: int = 0 - reconnect_delay_max: int = 0 - host: str = "" - port: int = 0 - proxy_args: dict = dataclasses.field(default_factory=dict) - socket_class: str = "" - socket_name: str = "" - - def to_dict(self): - return dataclasses.asdict(self) - - @dataclasses.dataclass(order=True) class IoTHubClientConfig(object): client_class: str = "" @@ -226,12 +197,11 @@ class SendMessageStatus(object): class ClientStatus(object): reconnect: ReconnectStatus exception: ExceptionStatus - paho_status: PahoStatus - paho_config: PahoConfig + paho_status: dataclasses.dataclass + paho_config: dataclasses.dataclass iothub_client_status: IoTHubClientStatus iothub_client_config: IoTHubClientConfig send_message: SendMessageStatus - transport_stats: dataclasses.dataclass heap_history: HeapHistoryStatus start_time: int = 0 @@ -243,13 +213,12 @@ def __init__(self): super(ClientStatus, self).__init__() self.reconnect = ReconnectStatus() self.exception = ExceptionStatus() - self.paho_status = PahoStatus() - self.paho_config = PahoConfig() + self.paho_status = None + self.paho_config = None self.iothub_client_status = IoTHubClientStatus() self.iothub_client_config = IoTHubClientConfig() self.send_message = SendMessageStatus() self.heap_history = HeapHistoryStatus() - self.transport_class = None def wrap_in_try_catch(func): @@ -278,56 +247,6 @@ def get_transport_from_device_client(device_client): return stage.transport -def get_paho_from_device_client(device_client): - return get_transport_from_device_client(device_client)._mqtt_client - - -def get_paho_config(paho_client): - config = PahoConfig() - config.transport = paho_client._transport - config.protocol = str(paho_client._protocol) - config.keepalive = paho_client._keepalive - config.connect_timeout = paho_client._connect_timeout - config.reconnect_on_failure = paho_client._reconnect_on_failure - config.reconnect_delay_min = paho_client._reconnect_min_delay - config.reconnect_delay_max = paho_client._reconnect_max_delay - config.host = paho_client._host - config.port = paho_client._port - config.proxy_args = paho_client._proxy - config.socket_class = get_type_name(paho_client.socket()) - config.socket_name = ( - str(paho_client.socket().getsockname()) if paho_client.socket() else "No socket" - ) - return config - - -def format_time_delta(s): - if s: - return str(datetime.timedelta(seconds=time_func() - s)) - else: - return "infinity" - - -def get_paho_status(paho_client): - status = PahoStatus() - - status.time_since_last_paho_traffic_in = format_time_delta(paho_client._last_msg_in) - status.time_since_last_paho_traffic_out = format_time_delta(paho_client._last_msg_out) - - status.client_object_id = id(paho_client) - status.thread_name = paho_client._thread.name if paho_client._thread else "None" - status.thread_is_alive = ( - str(paho_client._thread.is_alive()) if paho_client._thread else "No thread" - ) - status.len_out_mesage_queue = len(paho_client._out_messages) - status.len_in_message_queue = len(paho_client._in_messages) - status.len_out_pakcet_queue = len(paho_client._out_packet) - status.thread_terminate = paho_client._thread_terminate - status.connection_state = paho_client._state - - return status - - def get_iothub_client_config(iothub_client): internal_config_object = iothub_client._mqtt_pipeline._nucleus.pipeline_configuration config = IoTHubClientConfig() @@ -474,8 +393,8 @@ async def reconnect_loop(self): ) @property - def paho(self): - return get_paho_from_device_client(self.device_client) + def transport(self): + return get_transport_from_device_client(self.device_client) @wrap_in_try_catch async def display_loop(self): @@ -485,11 +404,10 @@ async def display_loop(self): self.status.time_since_start = format_time_delta(self.status.start_time) - self.status.paho_config = get_paho_config(self.paho) - self.status.paho_status = get_paho_status(self.paho) + self.status.paho_config = self.transport.get_debug_config() + self.status.paho_status = self.transport.get_debug_status() self.status.iothub_client_config = get_iothub_client_config(self.device_client) self.status.iothub_client_status = get_iothub_client_status(self.device_client) - self.status.transport_stats = get_transport_from_device_client(self.device_client).stats self.status.reconnect.time_since_last_connect = format_time_delta( self.status.reconnect.last_connect_time From 8372fb7d5d27645b3f02b1c9188d3a3b69a44876 Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Wed, 23 Nov 2022 11:01:37 -0800 Subject: [PATCH 5/5] move scripts from v2 longhaul branch to v3 longhaul branch --- dev_utils/dev_utils/iptables.py | 27 +++++++++++++++++++-------- scripts/packet_drop | 5 +++++ scripts/packet_reject | 5 +++++ scripts/packet_restore | 5 +++++ 4 files changed, 34 insertions(+), 8 deletions(-) create mode 100755 scripts/packet_drop create mode 100755 scripts/packet_reject create mode 100755 scripts/packet_restore diff --git a/dev_utils/dev_utils/iptables.py b/dev_utils/dev_utils/iptables.py index 00eea78ab..f346176fb 100644 --- a/dev_utils/dev_utils/iptables.py +++ b/dev_utils/dev_utils/iptables.py @@ -64,28 +64,39 @@ def transport_to_port(transport): ) -def disconnect_output_port(disconnect_type, transport, host): +def disconnect_output_port(disconnect_type, transport, host=None): """ Disconnect the port for a given transport. disconnect_type can either be "DROP" to drop packets sent to that port, or it can be "REJECT" to reject packets sent to that port. """ # sudo -n iptables -A OUTPUT -p tcp --dport 8883 --destination 20.21.22.23 -j DROP - ip = get_ip(host) port = transport_to_port(transport) - run_shell_command( - "{}iptables -A OUTPUT -p tcp --dport {} --destination {} -j {}".format( - get_sudo_prefix(), port, ip, disconnect_type + + if host: + ip = get_ip(host) + run_shell_command( + "{}iptables -A OUTPUT -p tcp --dport {} --destination {} -j {}".format( + get_sudo_prefix(), port, ip, disconnect_type + ) + ) + else: + run_shell_command( + "{}iptables -A OUTPUT -p tcp --dport {} -j {}".format( + get_sudo_prefix(), port, disconnect_type + ) ) - ) -def reconnect_all(transport, host): +def reconnect_all(transport, host=None): """ Reconnect all disconnects for this host and transport. Effectively, clean up anything that this module may have done. """ if not sys.platform.startswith("win"): - ip = get_ip(host) + if host: + ip = get_ip(host) + else: + ip = "" port = transport_to_port(transport) for disconnect_type in all_disconnect_types: # sudo -n iptables -L OUTPUT -n -v --line-numbers diff --git a/scripts/packet_drop b/scripts/packet_drop new file mode 100755 index 000000000..f9bf2ddd8 --- /dev/null +++ b/scripts/packet_drop @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from dev_utils import iptables + +iptables.disconnect_output_port("DROP", "mqtt") +iptables.disconnect_output_port("DROP", "mqttws") diff --git a/scripts/packet_reject b/scripts/packet_reject new file mode 100755 index 000000000..35f642d5e --- /dev/null +++ b/scripts/packet_reject @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from dev_utils import iptables + +iptables.disconnect_output_port("REJECT", "mqtt") +iptables.disconnect_output_port("REJECT", "mqttws") diff --git a/scripts/packet_restore b/scripts/packet_restore new file mode 100755 index 000000000..db8fbd41d --- /dev/null +++ b/scripts/packet_restore @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from dev_utils import iptables + +iptables.reconnect_all("mqtt") +iptables.reconnect_all("mqttws")