diff --git a/amqtt/broker.py b/amqtt/broker.py index 4d2442e0..c91a9af4 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -65,15 +65,11 @@ def __init__(self, source_session, topic, data, qos=None): class Server: - def __init__(self, listener_name, server_instance, max_connections=-1, loop=None): + def __init__(self, listener_name, server_instance, max_connections=-1): self.logger = logging.getLogger(__name__) self.instance = server_instance self.conn_count = 0 self.listener_name = listener_name - if loop is not None: - self._loop = loop - else: - self._loop = asyncio.get_event_loop() self.max_connections = max_connections if self.max_connections > 0: @@ -318,7 +314,7 @@ async def start(self) -> None: ssl=sc, ) self._servers[listener_name] = Server( - listener_name, instance, max_connections, self._loop + listener_name, instance, max_connections ) elif listener["type"] == "ws": cb_partial = partial(self.ws_connected, listener_name=listener_name) @@ -327,11 +323,10 @@ async def start(self) -> None: address, port, ssl=sc, - loop=self._loop, subprotocols=["mqtt"], ) self._servers[listener_name] = Server( - listener_name, instance, max_connections, self._loop + listener_name, instance, max_connections ) self.logger.info( diff --git a/amqtt/client.py b/amqtt/client.py index 013cdb1c..d14cb01a 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -95,11 +95,10 @@ class MQTTClient: :param client_id: MQTT client ID to use when connecting to the broker. If none, it will generated randomly by :func:`amqtt.utils.gen_client_id` :param config: Client configuration - :param loop: asynio loop to use :return: class instance """ - def __init__(self, client_id=None, config=None, loop=None): + def __init__(self, client_id=None, config=None): self.logger = logging.getLogger(__name__) self.config = copy.deepcopy(_defaults) if config is not None: @@ -112,10 +111,6 @@ def __init__(self, client_id=None, config=None, loop=None): self.client_id = gen_client_id() self.logger.debug("Using generated client ID : %s" % self.client_id) - if loop is not None: - self._loop = loop - else: - self._loop = asyncio.get_event_loop() self.session = None self._handler = None self._disconnect_task = None @@ -450,7 +445,6 @@ async def _connect_coro(self): websocket = await websockets.connect( self.session.broker_uri, subprotocols=["mqtt"], - loop=self._loop, extra_headers=self.extra_headers, **kwargs ) diff --git a/amqtt/mqtt/protocol/broker_handler.py b/amqtt/mqtt/protocol/broker_handler.py index 1efc7891..f7f6c5ad 100644 --- a/amqtt/mqtt/protocol/broker_handler.py +++ b/amqtt/mqtt/protocol/broker_handler.py @@ -191,7 +191,7 @@ async def init_from_connect( await writer.close() raise MQTTException(error_msg) - incoming_session = Session(loop) + incoming_session = Session() incoming_session.client_id = connect.client_id incoming_session.clean_session = connect.clean_session_flag incoming_session.will_flag = connect.will_flag diff --git a/amqtt/scripts/pub_script.py b/amqtt/scripts/pub_script.py index 632fa6ce..6597471b 100644 --- a/amqtt/scripts/pub_script.py +++ b/amqtt/scripts/pub_script.py @@ -177,7 +177,7 @@ def main(*args, **kwargs): config["will"]["qos"] = int(arguments["--will-qos"]) config["will"]["retain"] = arguments["--will-retain"] - client = MQTTClient(client_id=client_id, config=config, loop=loop) + client = MQTTClient(client_id=client_id, config=config) loop.run_until_complete(do_pub(client, arguments)) loop.close() diff --git a/amqtt/scripts/sub_script.py b/amqtt/scripts/sub_script.py index 456f3ceb..0ea89a6c 100644 --- a/amqtt/scripts/sub_script.py +++ b/amqtt/scripts/sub_script.py @@ -159,7 +159,7 @@ def main(*args, **kwargs): config["will"]["qos"] = int(arguments["--will-qos"]) config["will"]["retain"] = arguments["--will-retain"] - client = MQTTClient(client_id=client_id, config=config, loop=loop) + client = MQTTClient(client_id=client_id, config=config) loop.run_until_complete(do_sub(client, arguments)) loop.close() diff --git a/amqtt/session.py b/amqtt/session.py index d342773b..7213e223 100644 --- a/amqtt/session.py +++ b/amqtt/session.py @@ -1,7 +1,6 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. -import asyncio from transitions import Machine from asyncio import Queue from collections import OrderedDict @@ -106,7 +105,7 @@ def __init__(self, packet_id, topic, qos, data, retain): class Session: states = ["new", "connected", "disconnected"] - def __init__(self, loop=None): + def __init__(self): self._init_states() self.remote_address = None self.remote_port = None @@ -127,10 +126,6 @@ def __init__(self, loop=None): self.cadata = None self._packet_id = 0 self.parent = 0 - if loop is not None: - self._loop = loop - else: - self._loop = asyncio.get_event_loop() # Used to store outgoing ApplicationMessage while publish protocol flows self.inflight_out = OrderedDict()