Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feedback requested: Python SDK config and status object for debugging #1082

Draft
wants to merge 5 commits into
base: v3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 186 additions & 5 deletions azure-iot-device/azure/iot/device/common/mqtt_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from . import transport_exceptions as exceptions
from enum import Enum
import socks
import dataclasses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this requires Python 3.7, there should be an update to the setup.py to exclude Python 3.6 (and a removal of the associated classifier)

import datetime

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,6 +51,104 @@
}


@dataclasses.dataclass
class PahoStatus(object):
connect_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes returned from the `connect` method"""
Copy link
Member

@cartertinney cartertinney Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is.... super nitpicky... (but also kind of not) but you aren't supposed to use """ to make comments anywhere except a docstring. As I recall, they are interpreted differently by the runtime, because the """ is a string literal, whereas a # comment just gets completely ignored.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not nitpicky. These string literals end up taking space at runtime, so the distinction is important. I did it this way on purpose. Even though docstrings on attributes are technically not part of the Python standard, some tools will pick these up (if you put them after the attribute definition). I think PyCharm will pick these up, and maybe some versions of Sphinx.


on_connect_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes passed into the `on_connect` handler"""

connect_exceptions: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of exceptions raised by the `connect` method"""

disconnect_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes returned from the `disconnect` method"""

on_disconnect_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes passed into the `on_disconnect` handler"""

disconnect_exceptions: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of exceptions raised by the `disconnect` method"""

publish_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes returned from the `publish` method"""

publish_exceptions: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of exceptions raised by the `publish` method"""

subscribe_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes returned from the `subscribe` method"""

subscribe_exceptions: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of exceptions raised by the `subscribe` method"""

unsubscribe_rc_codes: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of rc codes returned from the `unsubscribe` method"""

unsubscribe_exceptions: dict = dataclasses.field(default_factory=dict)
"""Value->count dictionary of exceptions raised by the `unsubscribe` method"""

count_message_received: int = 0

count_subscribe: int = 0
count_suback: int = 0

count_unsubscribe: int = 0
count_unsuback: int = 0

count_publish: int = 0
count_puback: int = 0

shut_down: bool = False

time_since_last_paho_traffic_in: str = ""
time_since_last_paho_traffic_out: str = ""
client_object_id: int = 0
thread_name: str = ""
thread_is_alive: bool = False
len_out_mesage_queue: int = 0
len_in_message_queue: int = 0
len_out_pakcet_queue: int = 0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling?

thread_terminate: bool = False
paho_connection_state: int = 0

def to_dict(self):
return dataclasses.asdict(self)


@dataclasses.dataclass(order=True)
class PahoConfig:
    """
    Plain record of Paho client configuration values, intended for debugging output.

    Every field has a default, so an instance can be created empty and filled in afterwards.
    Ordering comparisons are generated by the dataclass decorator (`order=True`).
    """

    # Connection settings
    transport: str = ""
    protocol: str = ""
    keepalive: int = 0
    connect_timeout: int = 0

    # Reconnect settings
    reconnect_on_failure: bool = False
    reconnect_delay_min: int = 0
    reconnect_delay_max: int = 0

    # Endpoint and proxy settings
    host: str = ""
    port: int = 0
    proxy_args: dict = dataclasses.field(default_factory=dict)

    # Socket description
    socket_class: str = ""
    socket_name: str = ""

    def to_dict(self):
        """Return the fields of this object as a dictionary keyed by field name."""
        as_mapping = dataclasses.asdict(self)
        return as_mapping


def add_count_to_dict(dikt, key):
    """
    Increment the occurrence count stored in `dikt` for `key`.

    Exception instances are counted under the name of their class; any other key is used
    unchanged. A key that is not yet present is counted starting from zero.

    :param dict dikt: Value->count dictionary that is updated in place.
    :param key: The value (or Exception instance) to count.
    """
    bucket = type(key).__name__ if isinstance(key, Exception) else key
    previous = dikt.get(bucket, 0)
    dikt[bucket] = previous + 1


def format_time_delta(s):
    """
    Return, as a string, the time elapsed between timestamp `s` and `mqtt.time_func()`.

    :param s: A timestamp comparable with the value returned by `mqtt.time_func()`.
        Any falsy value (e.g. None or 0) is reported as "infinity".

    :returns: The string form of a `datetime.timedelta`, or "infinity".
    """
    if not s:
        return "infinity"
    elapsed_seconds = mqtt.time_func() - s
    return str(datetime.timedelta(seconds=elapsed_seconds))


def _create_error_from_connack_rc_code(rc):
"""
Given a paho CONNACK rc code, return an Exception that can be raised
Expand Down Expand Up @@ -131,6 +231,8 @@ def __init__(

self._mqtt_client = self._create_mqtt_client()

self._paho_status = PahoStatus()

def _create_mqtt_client(self):
"""
Create the MQTT client object and assign all necessary event handler callbacks.
Expand Down Expand Up @@ -176,6 +278,8 @@ def on_connect(client, userdata, flags, rc):
this = self_weakref()
logger.info("connected with result code: {}".format(rc))

add_count_to_dict(this._paho_status.on_connect_rc_codes, rc)

if rc: # i.e. if there is an error
if this.on_mqtt_connection_failure_handler:
try:
Expand Down Expand Up @@ -204,6 +308,9 @@ def on_disconnect(client, userdata, rc):
this = self_weakref()
logger.info("disconnected with result code: {}".format(rc))

if this:
add_count_to_dict(this._paho_status.on_disconnect_rc_codes, rc)

cause = None
if rc: # i.e. if there is an error
logger.debug("".join(traceback.format_stack()))
Expand Down Expand Up @@ -239,27 +346,31 @@ def on_disconnect(client, userdata, rc):
def on_subscribe(client, userdata, mid, granted_qos):
this = self_weakref()
logger.info("suback received for {}".format(mid))
this._paho_status.count_suback += 1
# subscribe failures are returned from the subscribe() call. This is just
# a notification that a SUBACK was received, so there is no failure case here
this._op_manager.complete_operation(OperationType.SUBSCRIBE, mid)

def on_unsubscribe(client, userdata, mid):
this = self_weakref()
logger.info("UNSUBACK received for {}".format(mid))
this._paho_status.count_unsuback += 1
# unsubscribe failures are returned from the unsubscribe() call. This is just
# a notification that an UNSUBACK was received, so there is no failure case here
this._op_manager.complete_operation(OperationType.UNSUBSCRIBE, mid)

def on_publish(client, userdata, mid):
this = self_weakref()
logger.info("payload published for {}".format(mid))
this._paho_status.count_puback += 1
# publish failures are returned from the publish() call. This is just
# a notification that a PUBACK was received, so there is no failure case here
this._op_manager.complete_operation(OperationType.PUBLISH, mid)

def on_message(client, userdata, mqtt_message):
this = self_weakref()
logger.info("message received on {}".format(mqtt_message.topic))
this._paho_status.count_message_received += 1

if this.on_mqtt_message_received_handler:
try:
Expand Down Expand Up @@ -362,6 +473,7 @@ def _create_ssl_context(self):

def shutdown(self):
"""Shut down the transport. This is (currently) irreversible."""
self._paho_status.shut_down = True
# Remove the disconnect handler from Paho. We don't want to trigger any events in response
# to the shutdown and confuse the higher level layers of code. Just end it.
self._mqtt_client.on_disconnect = None
Expand Down Expand Up @@ -405,7 +517,9 @@ def connect(self, password=None):
rc = self._mqtt_client.connect(
host=self._hostname, port=8883, keepalive=self._keep_alive
)
add_count_to_dict(self._paho_status.connect_rc_codes, rc)
except socket.error as e:
add_count_to_dict(self._paho_status.connect_exceptions, e)
self._force_transport_disconnect_and_cleanup()

# Only this type will raise a special error
Expand All @@ -428,6 +542,7 @@ def connect(self, password=None):
raise exceptions.ConnectionFailedError() from e

except Exception as e:
add_count_to_dict(self._paho_status.connect_exceptions, e)
self._force_transport_disconnect_and_cleanup()

raise exceptions.ProtocolClientError("Unexpected Paho failure during connect") from e
Expand All @@ -451,6 +566,7 @@ def disconnect(self, clear_inflight=False):
try:
rc = self._mqtt_client.disconnect()
except Exception as e:
add_count_to_dict(self._paho_status.disconnect_exceptions, e)
raise exceptions.ProtocolClientError("Unexpected Paho failure during disconnect") from e
finally:
self._mqtt_client.loop_stop()
Expand All @@ -459,6 +575,8 @@ def disconnect(self, clear_inflight=False):
logger.debug("in paho thread. nulling _thread")
self._mqtt_client._thread = None

add_count_to_dict(self._paho_status.disconnect_rc_codes, rc)

logger.debug("_mqtt_client.disconnect returned rc={}".format(rc))
if rc:
# This could result in ConnectionDroppedError or ProtocolClientError
Expand Down Expand Up @@ -488,14 +606,18 @@ def subscribe(self, topic, qos=1, callback=None):
:raises: ProtocolClientError if there is some other client error.
:raises: NoConnectionError if the client isn't actually connected.
"""
self._paho_status.count_subscribe += 1
logger.info("subscribing to {} with qos {}".format(topic, qos))
try:
(rc, mid) = self._mqtt_client.subscribe(topic, qos=qos)
except ValueError:
except ValueError as e:
add_count_to_dict(self._paho_status.subscribe_exceptions, e)
raise
except Exception as e:
add_count_to_dict(self._paho_status.subscribe_exceptions, e)
raise exceptions.ProtocolClientError("Unexpected Paho failure during subscribe") from e
logger.debug("_mqtt_client.subscribe returned rc={}".format(rc))
add_count_to_dict(self._paho_status.subscribe_rc_codes, rc)
if rc:
# This could result in ConnectionDroppedError or ProtocolClientError
raise _create_error_from_rc_code(rc)
Expand All @@ -513,16 +635,20 @@ def unsubscribe(self, topic, callback=None):
:raises: ProtocolClientError if there is some other client error.
:raises: NoConnectionError if the client isn't actually connected.
"""
self._paho_status.count_unsubscribe += 1
logger.info("unsubscribing from {}".format(topic))
try:
(rc, mid) = self._mqtt_client.unsubscribe(topic)
except ValueError:
except ValueError as e:
add_count_to_dict(self._paho_status.unsubscribe_exceptions, e)
raise
except Exception as e:
add_count_to_dict(self._paho_status.unsubscribe_exceptions, e)
raise exceptions.ProtocolClientError(
"Unexpected Paho failure during unsubscribe"
) from e
logger.debug("_mqtt_client.unsubscribe returned rc={}".format(rc))
add_count_to_dict(self._paho_status.unsubscribe_rc_codes, rc)
if rc:
# This could result in ConnectionDroppedError or ProtocolClientError
raise _create_error_from_rc_code(rc)
Expand All @@ -546,16 +672,18 @@ def publish(self, topic, payload, qos=1, callback=None):
:raises: ConnectionDroppedError if connection is dropped during execution.
:raises: ProtocolClientError if there is some other client error.
"""
self._paho_status.count_publish += 1
logger.info("publishing on {}".format(topic))
try:
(rc, mid) = self._mqtt_client.publish(topic=topic, payload=payload, qos=qos)
except ValueError:
raise
except TypeError:
except (ValueError, TypeError) as e:
add_count_to_dict(self._paho_status.publish_exceptions, e)
raise
except Exception as e:
add_count_to_dict(self._paho_status.publish_exceptions, e)
raise exceptions.ProtocolClientError("Unexpected Paho failure during publish") from e
logger.debug("_mqtt_client.publish returned rc={}".format(rc))
add_count_to_dict(self._paho_status.publish_rc_codes, rc)
if rc:
# Even though Paho returns a rc code indicating an error, it still stores the message
# and will publish on connect, so this isn't really a failure - it just hangs.
Expand All @@ -565,6 +693,59 @@ def publish(self, topic, payload, qos=1, callback=None):
raise _create_error_from_rc_code(rc)
self._op_manager.establish_operation(OperationType.PUBLISH, mid, callback)

def get_debug_status(self):
    """
    Return an informational status object that describes the current state of the Paho transport

    The object returned is the transport's own status record, refreshed in place with values
    read from the Paho client each time this method is called.

    :returns: The PahoStatus object belonging to this transport.
    """
    client = self._mqtt_client
    status = self._paho_status

    # Read the thread reference exactly once. `_thread` can be reset to None elsewhere
    # (e.g. by `disconnect`), so testing it and then reading it again could end up
    # dereferencing None.
    thread = client._thread

    status.time_since_last_paho_traffic_in = format_time_delta(client._last_msg_in)
    status.time_since_last_paho_traffic_out = format_time_delta(client._last_msg_out)

    status.client_object_id = id(client)
    status.thread_name = thread.name if thread else "None"
    # NOTE: this is reported as a string ("True" / "False" / "No thread"), not as a bool
    status.thread_is_alive = str(thread.is_alive()) if thread else "No thread"
    status.len_out_mesage_queue = len(client._out_messages)
    status.len_in_message_queue = len(client._in_messages)
    status.len_out_pakcet_queue = len(client._out_packet)
    status.thread_terminate = client._thread_terminate
    status.paho_connection_state = client._state

    return status

def get_debug_config(self):
"""
Return an informational config object that describes the configuration of the Paho transport
"""
config = PahoConfig()

config.transport = self._mqtt_client._transport
Copy link
Member

@cartertinney cartertinney Nov 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel as though accessing convention-private attributes of Paho is dangerous. There is no guarantee they don't change in an update to Paho, which could break anyone using our library until we make an update. Furthermore, that risk doesn't really seem worth it for a logging utility.

I could be convinced of the utility if we were to build some exception handling in to protect against this possibility, but even still, seems dicey.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it's dangerous. I also think it's super useful -- at least some of these are. Using getattr() with a default would be much safer.

config.protocol = str(self._mqtt_client._protocol)
config.keepalive = self._mqtt_client._keepalive
config.connect_timeout = self._mqtt_client._connect_timeout
config.reconnect_on_failure = self._mqtt_client._reconnect_on_failure
config.reconnect_delay_min = self._mqtt_client._reconnect_min_delay
config.reconnect_delay_max = self._mqtt_client._reconnect_max_delay
config.host = self._mqtt_client._host
config.port = self._mqtt_client._port
config.proxy_args = self._mqtt_client._proxy
config.socket_class = type(self._mqtt_client.socket()).__name__
config.socket_name = (
str(self._mqtt_client.socket().getsockname())
if self._mqtt_client.socket()
and getattr(self._mqtt_client.socket(), "getsockname", None)
else "No socket name"
)

return config


class OperationType(Enum):
PUBLISH = "PUBLISH"
Expand Down
27 changes: 19 additions & 8 deletions dev_utils/dev_utils/iptables.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,39 @@ def transport_to_port(transport):
)


def disconnect_output_port(disconnect_type, transport, host):
def disconnect_output_port(disconnect_type, transport, host=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great change. Was thinking of doing this myself.

"""
Disconnect the port for a given transport. disconnect_type can either be "DROP" to drop
packets sent to that port, or it can be "REJECT" to reject packets sent to that port.
"""
# sudo -n iptables -A OUTPUT -p tcp --dport 8883 --destination 20.21.22.23 -j DROP
ip = get_ip(host)
port = transport_to_port(transport)
run_shell_command(
"{}iptables -A OUTPUT -p tcp --dport {} --destination {} -j {}".format(
get_sudo_prefix(), port, ip, disconnect_type

if host:
ip = get_ip(host)
run_shell_command(
"{}iptables -A OUTPUT -p tcp --dport {} --destination {} -j {}".format(
get_sudo_prefix(), port, ip, disconnect_type
)
)
else:
run_shell_command(
"{}iptables -A OUTPUT -p tcp --dport {} -j {}".format(
get_sudo_prefix(), port, disconnect_type
)
)
)


def reconnect_all(transport, host):
def reconnect_all(transport, host=None):
"""
Reconnect all disconnects for this host and transport. Effectively, clean up
anything that this module may have done.
"""
if not sys.platform.startswith("win"):
ip = get_ip(host)
if host:
ip = get_ip(host)
else:
ip = ""
port = transport_to_port(transport)
for disconnect_type in all_disconnect_types:
# sudo -n iptables -L OUTPUT -n -v --line-numbers
Expand Down
1 change: 1 addition & 0 deletions longhaul/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
logs/**/*
Loading