From dc34c58e6800d84397538f2c3ffc301355ae50d0 Mon Sep 17 00:00:00 2001 From: rob miller Date: Mon, 25 Apr 2022 09:43:04 +0800 Subject: [PATCH] reformat with black --- amqtt/broker.py | 38 +++++++++++++++++++++------ amqtt/client.py | 2 +- amqtt/mqtt/connack.py | 10 ++++--- amqtt/mqtt/connect.py | 10 +++++-- amqtt/mqtt/protocol/broker_handler.py | 11 ++++++-- amqtt/mqtt/protocol/client_handler.py | 9 +++++-- amqtt/mqtt/protocol/handler.py | 11 +++++--- amqtt/session.py | 6 ++--- 8 files changed, 73 insertions(+), 24 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index b99ad7d8..c41bf135 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -59,7 +59,13 @@ class RetainedApplicationMessage: __slots__ = ("source_session", "topic", "data", "qos") - def __init__(self, source_session: Optional[Session], topic: str, data: bytes, qos: int = None): + def __init__( + self, + source_session: Optional[Session], + topic: str, + data: bytes, + qos: int = None, + ): self.source_session = source_session self.topic = topic self.data = data @@ -126,7 +132,9 @@ def __init__(self, broker: "Broker") -> None: self.config = None self._broker_instance = broker - async def broadcast_message(self, topic: str, data: bytes, qos: Optional[int] = None): + async def broadcast_message( + self, topic: str, data: bytes, qos: Optional[int] = None + ): await self._broker_instance.internal_message_broadcast(topic, data, qos) def retain_message(self, topic_name: str, data: bytes, qos: Optional[int] = None): @@ -170,8 +178,10 @@ class Broker: _sessions: Dict[str, Tuple[Session, BrokerProtocolHandler]] _subscriptions: Dict[str, Tuple[Session, int]] _retained_messages: Dict[str, RetainedApplicationMessage] - - def __init__(self, config=None, loop: AbstractEventLoop = None, plugin_namespace: str = None): + + def __init__( + self, config=None, loop: AbstractEventLoop = None, plugin_namespace: str = None + ): self.logger = logging.getLogger(__name__) self.config = _defaults if config is not None: @@ -388,7 +398,9 @@ async def shutdown(self): await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN) self.transitions.stopping_success() - async def internal_message_broadcast(self, topic: str, data: bytes, qos: Optional[int] = None): + async def internal_message_broadcast( + self, topic: str, data: bytes, qos: Optional[int] = None + ): return await self._broadcast_message(None, topic, data) async def ws_connected(self, websocket, uri, listener_name): @@ -659,7 +671,9 @@ async def client_connected( self.logger.debug("%s Client disconnected" % client_session.client_id) server.release_connection() - def _init_handler(self, session: Session, reader: Type[ReaderAdapter], writer: Type[WriterAdapter]): + def _init_handler( + self, session: Session, reader: Type[ReaderAdapter], writer: Type[WriterAdapter] + ): """ Create a BrokerProtocolHandler and attach to a session :return: @@ -948,7 +962,13 @@ async def _broadcast_loop(self): await asyncio.wait(running_tasks) raise # reraise per CancelledError semantics - async def _broadcast_message(self, session: Optional[Session], topic: str, data: bytes, force_qos: Optional[bool] = None): + async def _broadcast_message( + self, + session: Optional[Session], + topic: str, + data: bytes, + force_qos: Optional[bool] = None, + ): broadcast = {"session": session, "topic": topic, "data": data} if force_qos: broadcast["qos"] = force_qos @@ -976,7 +996,9 @@ async def publish_session_retained_messages(self, session: Session): if publish_tasks: await asyncio.wait(publish_tasks) - async def publish_retained_messages_for_subscription(self, subscription, session: Session): + async def publish_retained_messages_for_subscription( + self, subscription, session: Session + ): self.logger.debug( "Begin broadcasting messages retained due to subscription on '%s' from %s" % (subscription[0], format_client_message(session=session)) diff --git a/amqtt/client.py b/amqtt/client.py index 5b4d46a7..ddabceff 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -311,7 +311,7 @@ def get_retain_and_qos(): ) @mqtt_connected - async def subscribe(self, topics: List[Tuple[str,int]]): + async def subscribe(self, topics: List[Tuple[str, int]]): """ Subscribe to some topics. diff --git a/amqtt/mqtt/connack.py b/amqtt/mqtt/connack.py index c114a3e6..4e1bb610 100644 --- a/amqtt/mqtt/connack.py +++ b/amqtt/mqtt/connack.py @@ -22,7 +22,9 @@ class ConnackVariableHeader(MQTTVariableHeader): __slots__ = ("session_parent", "return_code") - def __init__(self, session_parent: Optional[int] = None, return_code: Optional[int] = None): + def __init__( + self, session_parent: Optional[int] = None, return_code: Optional[int] = None + ): super().__init__() self.session_parent = session_parent self.return_code = return_code @@ -76,7 +78,7 @@ def __init__( self, fixed: Optional[MQTTFixedHeader] = None, variable_header: Optional[ConnackVariableHeader] = None, - payload = None, + payload=None, ): if fixed is None: header = MQTTFixedHeader(CONNACK, 0x00) @@ -92,7 +94,9 @@ def __init__( self.payload = None @classmethod - def build(cls, session_parent: int = None, return_code: int = None) -> ConnackPacket: + def build( + cls, session_parent: int = None, return_code: int = None + ) -> ConnackPacket: v_header = ConnackVariableHeader(session_parent, return_code) packet = ConnackPacket(variable_header=v_header) return packet diff --git a/amqtt/mqtt/connect.py b/amqtt/mqtt/connect.py index 61773cd7..39a022a3 100644 --- a/amqtt/mqtt/connect.py +++ b/amqtt/mqtt/connect.py @@ -39,7 +39,11 @@ class ConnectVariableHeader(MQTTVariableHeader): RESERVED_FLAG: int = 0x01 def __init__( - self, connect_flags: int = 0x00, keep_alive: int = 0, proto_name: str = "MQTT", proto_level: int = 0x04 + self, + connect_flags: int = 0x00, + keep_alive: int = 0, + proto_name: str = "MQTT", + proto_level: int = 0x04, ): super().__init__() self.proto_name = proto_name @@ -118,7 +122,9 @@ def will_qos(self, val: int): self.flags |= val << 3 @classmethod - async def from_stream(cls, reader: ReaderAdapter, fixed_header: MQTTFixedHeader) -> ConnectVariableHeader: + async def from_stream( + cls, reader: ReaderAdapter, fixed_header: MQTTFixedHeader + ) -> ConnectVariableHeader: # protocol name protocol_name = await decode_string(reader) diff --git a/amqtt/mqtt/protocol/broker_handler.py b/amqtt/mqtt/protocol/broker_handler.py index be03a68b..15865233 100644 --- a/amqtt/mqtt/protocol/broker_handler.py +++ b/amqtt/mqtt/protocol/broker_handler.py @@ -33,7 +33,10 @@ class BrokerProtocolHandler(ProtocolHandler): def __init__( - self, plugins_manager: PluginManager, session: Session = None, loop: AbstractEventLoop = None + self, + plugins_manager: PluginManager, + session: Session = None, + loop: AbstractEventLoop = None, ): super().__init__(plugins_manager, session, loop) self._disconnect_waiter = None @@ -121,7 +124,11 @@ async def mqtt_connack_authorize(self, authorize: bool): @classmethod async def init_from_connect( - cls, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager, loop: AbstractEventLoop = None + cls, + reader: ReaderAdapter, + writer: WriterAdapter, + plugins_manager, + loop: AbstractEventLoop = None, ) -> Tuple[BrokerProtocolHandler, Session]: """ diff --git a/amqtt/mqtt/protocol/client_handler.py b/amqtt/mqtt/protocol/client_handler.py index 9189b1f8..0a00170b 100644 --- a/amqtt/mqtt/protocol/client_handler.py +++ b/amqtt/mqtt/protocol/client_handler.py @@ -20,7 +20,10 @@ class ClientProtocolHandler(ProtocolHandler): def __init__( - self, plugins_manager: PluginManager, session: Session = None, loop: AbstractEventLoop = None + self, + plugins_manager: PluginManager, + session: Session = None, + loop: AbstractEventLoop = None, ): super().__init__(plugins_manager, session, loop=loop) self._ping_task = None @@ -94,7 +97,9 @@ def handle_write_timeout(self): def handle_read_timeout(self): pass - async def mqtt_subscribe(self, topics: List[Tuple[str,int]], packet_id: int) -> List[int]: + async def mqtt_subscribe( + self, topics: List[Tuple[str, int]], packet_id: int + ) -> List[int]: """ :param topics: array of topics [('$SYS/broker/uptime', QOS_1), ('$SYS/broker/load/#', QOS_2),] :return: diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index ca91711e..ddecede1 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -8,7 +8,7 @@ import itertools import asyncio from asyncio import InvalidStateError, AbstractEventLoop -from typing import Type, TypeVar +from typing import Type from amqtt.mqtt import packet_class from amqtt.mqtt.connack import ConnackPacket @@ -73,7 +73,10 @@ class ProtocolHandler: """ def __init__( - self, plugins_manager: PluginManager, session: Session = None, loop: AbstractEventLoop = None + self, + plugins_manager: PluginManager, + session: Session = None, + loop: AbstractEventLoop = None, ): self.logger = logging.getLogger(__name__) if session: @@ -193,7 +196,9 @@ async def _retry_deliveries(self): ) self.logger.debug("End messages delivery retries") - async def mqtt_publish(self, topic: str, data: bytes, qos: int, retain: bool, ack_timeout: int = None): + async def mqtt_publish( + self, topic: str, data: bytes, qos: int, retain: bool, ack_timeout: int = None + ): """ Sends a MQTT publish message and manages messages flows. This methods doesn't return until the message has been acknowledged by receiver or timeout occur diff --git a/amqtt/session.py b/amqtt/session.py index fab97cc0..21706b3b 100644 --- a/amqtt/session.py +++ b/amqtt/session.py @@ -36,7 +36,7 @@ class ApplicationMessage: "pubrel_packet", "pubcomp_packet", ) - + def __init__(self, packet_id: int, topic: str, qos: int, data: bytes, retain: bool): self.packet_id = packet_id """ Publish message `packet identifier `_""" @@ -130,8 +130,8 @@ class Session: capath = None cadata = None _packet_id: int = 0 - parent: int = 0 - + parent: int = 0 + def __init__(self): self._init_states() self.remote_address = None