Skip to content

Commit

Permalink
reformat with black
Browse files Browse the repository at this point in the history
  • Loading branch information
rob miller committed Apr 25, 2022
1 parent 5dedd1d commit dc34c58
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 24 deletions.
38 changes: 30 additions & 8 deletions amqtt/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ class RetainedApplicationMessage:

__slots__ = ("source_session", "topic", "data", "qos")

def __init__(self, source_session: Optional[Session], topic: str, data: bytes, qos: int = None):
def __init__(
self,
source_session: Optional[Session],
topic: str,
data: bytes,
qos: int = None,
):
self.source_session = source_session
self.topic = topic
self.data = data
Expand Down Expand Up @@ -126,7 +132,9 @@ def __init__(self, broker: "Broker") -> None:
self.config = None
self._broker_instance = broker

async def broadcast_message(self, topic: str, data: bytes, qos: Optional[int] = None):
async def broadcast_message(
self, topic: str, data: bytes, qos: Optional[int] = None
):
await self._broker_instance.internal_message_broadcast(topic, data, qos)

def retain_message(self, topic_name: str, data: bytes, qos: Optional[int] = None):
Expand Down Expand Up @@ -170,8 +178,10 @@ class Broker:
_sessions: Dict[str, Tuple[Session, BrokerProtocolHandler]]
_subscriptions: Dict[str, Tuple[Session, int]]
_retained_messages: Dict[str, RetainedApplicationMessage]

def __init__(self, config=None, loop: AbstractEventLoop = None, plugin_namespace: str = None):

def __init__(
self, config=None, loop: AbstractEventLoop = None, plugin_namespace: str = None
):
self.logger = logging.getLogger(__name__)
self.config = _defaults
if config is not None:
Expand Down Expand Up @@ -388,7 +398,9 @@ async def shutdown(self):
await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN)
self.transitions.stopping_success()

async def internal_message_broadcast(self, topic: str, data: bytes, qos: Optional[int] = None):
async def internal_message_broadcast(
self, topic: str, data: bytes, qos: Optional[int] = None
):
return await self._broadcast_message(None, topic, data)

async def ws_connected(self, websocket, uri, listener_name):
Expand Down Expand Up @@ -659,7 +671,9 @@ async def client_connected(
self.logger.debug("%s Client disconnected" % client_session.client_id)
server.release_connection()

def _init_handler(self, session: Session, reader: Type[ReaderAdapter], writer: Type[WriterAdapter]):
def _init_handler(
self, session: Session, reader: Type[ReaderAdapter], writer: Type[WriterAdapter]
):
"""
Create a BrokerProtocolHandler and attach to a session
:return:
Expand Down Expand Up @@ -948,7 +962,13 @@ async def _broadcast_loop(self):
await asyncio.wait(running_tasks)
raise # reraise per CancelledError semantics

async def _broadcast_message(self, session: Optional[Session], topic: str, data: bytes, force_qos: Optional[bool] = None):
async def _broadcast_message(
self,
session: Optional[Session],
topic: str,
data: bytes,
force_qos: Optional[bool] = None,
):
broadcast = {"session": session, "topic": topic, "data": data}
if force_qos:
broadcast["qos"] = force_qos
Expand Down Expand Up @@ -976,7 +996,9 @@ async def publish_session_retained_messages(self, session: Session):
if publish_tasks:
await asyncio.wait(publish_tasks)

async def publish_retained_messages_for_subscription(self, subscription, session: Session):
async def publish_retained_messages_for_subscription(
self, subscription, session: Session
):
self.logger.debug(
"Begin broadcasting messages retained due to subscription on '%s' from %s"
% (subscription[0], format_client_message(session=session))
Expand Down
2 changes: 1 addition & 1 deletion amqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def get_retain_and_qos():
)

@mqtt_connected
async def subscribe(self, topics: List[Tuple[str,int]]):
async def subscribe(self, topics: List[Tuple[str, int]]):
"""
Subscribe to some topics.
Expand Down
10 changes: 7 additions & 3 deletions amqtt/mqtt/connack.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class ConnackVariableHeader(MQTTVariableHeader):

__slots__ = ("session_parent", "return_code")

def __init__(self, session_parent: Optional[int] = None, return_code: Optional[int] = None):
def __init__(
self, session_parent: Optional[int] = None, return_code: Optional[int] = None
):
super().__init__()
self.session_parent = session_parent
self.return_code = return_code
Expand Down Expand Up @@ -76,7 +78,7 @@ def __init__(
self,
fixed: Optional[MQTTFixedHeader] = None,
variable_header: Optional[ConnackVariableHeader] = None,
payload = None,
payload=None,
):
if fixed is None:
header = MQTTFixedHeader(CONNACK, 0x00)
Expand All @@ -92,7 +94,9 @@ def __init__(
self.payload = None

@classmethod
def build(cls, session_parent: int = None, return_code: int = None) -> ConnackPacket:
def build(
cls, session_parent: int = None, return_code: int = None
) -> ConnackPacket:
v_header = ConnackVariableHeader(session_parent, return_code)
packet = ConnackPacket(variable_header=v_header)
return packet
10 changes: 8 additions & 2 deletions amqtt/mqtt/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class ConnectVariableHeader(MQTTVariableHeader):
RESERVED_FLAG: int = 0x01

def __init__(
self, connect_flags: int = 0x00, keep_alive: int = 0, proto_name: str = "MQTT", proto_level: int = 0x04
self,
connect_flags: int = 0x00,
keep_alive: int = 0,
proto_name: str = "MQTT",
proto_level: int = 0x04,
):
super().__init__()
self.proto_name = proto_name
Expand Down Expand Up @@ -118,7 +122,9 @@ def will_qos(self, val: int):
self.flags |= val << 3

@classmethod
async def from_stream(cls, reader: ReaderAdapter, fixed_header: MQTTFixedHeader) -> ConnectVariableHeader:
async def from_stream(
cls, reader: ReaderAdapter, fixed_header: MQTTFixedHeader
) -> ConnectVariableHeader:
# protocol name
protocol_name = await decode_string(reader)

Expand Down
11 changes: 9 additions & 2 deletions amqtt/mqtt/protocol/broker_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@

class BrokerProtocolHandler(ProtocolHandler):
def __init__(
self, plugins_manager: PluginManager, session: Session = None, loop: AbstractEventLoop = None
self,
plugins_manager: PluginManager,
session: Session = None,
loop: AbstractEventLoop = None,
):
super().__init__(plugins_manager, session, loop)
self._disconnect_waiter = None
Expand Down Expand Up @@ -121,7 +124,11 @@ async def mqtt_connack_authorize(self, authorize: bool):

@classmethod
async def init_from_connect(
cls, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager, loop: AbstractEventLoop = None
cls,
reader: ReaderAdapter,
writer: WriterAdapter,
plugins_manager,
loop: AbstractEventLoop = None,
) -> Tuple[BrokerProtocolHandler, Session]:
"""
Expand Down
9 changes: 7 additions & 2 deletions amqtt/mqtt/protocol/client_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

class ClientProtocolHandler(ProtocolHandler):
def __init__(
self, plugins_manager: PluginManager, session: Session = None, loop: AbstractEventLoop = None
self,
plugins_manager: PluginManager,
session: Session = None,
loop: AbstractEventLoop = None,
):
super().__init__(plugins_manager, session, loop=loop)
self._ping_task = None
Expand Down Expand Up @@ -94,7 +97,9 @@ def handle_write_timeout(self):
def handle_read_timeout(self):
pass

async def mqtt_subscribe(self, topics: List[Tuple[str,int]], packet_id: int) -> List[int]:
async def mqtt_subscribe(
self, topics: List[Tuple[str, int]], packet_id: int
) -> List[int]:
"""
:param topics: array of topics [('$SYS/broker/uptime', QOS_1), ('$SYS/broker/load/#', QOS_2),]
:return:
Expand Down
11 changes: 8 additions & 3 deletions amqtt/mqtt/protocol/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import itertools
import asyncio
from asyncio import InvalidStateError, AbstractEventLoop
from typing import Type, TypeVar
from typing import Type

from amqtt.mqtt import packet_class
from amqtt.mqtt.connack import ConnackPacket
Expand Down Expand Up @@ -73,7 +73,10 @@ class ProtocolHandler:
"""

def __init__(
self, plugins_manager: PluginManager, session: Session = None, loop: AbstractEventLoop = None
self,
plugins_manager: PluginManager,
session: Session = None,
loop: AbstractEventLoop = None,
):
self.logger = logging.getLogger(__name__)
if session:
Expand Down Expand Up @@ -193,7 +196,9 @@ async def _retry_deliveries(self):
)
self.logger.debug("End messages delivery retries")

async def mqtt_publish(self, topic: str, data: bytes, qos: int, retain: bool, ack_timeout: int = None):
async def mqtt_publish(
self, topic: str, data: bytes, qos: int, retain: bool, ack_timeout: int = None
):
"""
Sends a MQTT publish message and manages messages flows.
This methods doesn't return until the message has been acknowledged by receiver or timeout occur
Expand Down
6 changes: 3 additions & 3 deletions amqtt/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ApplicationMessage:
"pubrel_packet",
"pubcomp_packet",
)

def __init__(self, packet_id: int, topic: str, qos: int, data: bytes, retain: bool):
self.packet_id = packet_id
""" Publish message `packet identifier <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718025>`_"""
Expand Down Expand Up @@ -130,8 +130,8 @@ class Session:
capath = None
cadata = None
_packet_id: int = 0
parent: int = 0
parent: int = 0

def __init__(self):
self._init_states()
self.remote_address = None
Expand Down

0 comments on commit dc34c58

Please sign in to comment.