Skip to content

Commit

Permalink
Merge branch 'reconnect'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelarnauts committed Apr 2, 2024
2 parents 6d7145a + 4c35053 commit 48a5a83
Show file tree
Hide file tree
Showing 5 changed files with 902 additions and 828 deletions.
23 changes: 15 additions & 8 deletions aiocomfoconnect/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,10 @@ def sensor_callback(sensor, value):
print("Sending keepalive...")
# Use cmd_time_request as a keepalive since cmd_keepalive doesn't send back a reply we can wait for
await comfoconnect.cmd_time_request()

except (AioComfoConnectNotConnected, AioComfoConnectTimeout):
# Reconnect when connection has been dropped
try:
await comfoconnect.connect(uuid)
except AioComfoConnectTimeout:
_LOGGER.warning("Connection timed out. Retrying later...")
except AioComfoConnectNotConnected:
print("Got AioComfoConnectNotConnected")
except AioComfoConnectTimeout:
print("Got AioComfoConnectTimeout")

except KeyboardInterrupt:
pass
Expand Down Expand Up @@ -310,7 +307,17 @@ def sensor_callback(sensor_, value):
if follow:
try:
while True:
await asyncio.sleep(1)
# Wait for updates and send a keepalive every 30 seconds
await asyncio.sleep(30)

try:
print("Sending keepalive...")
# Use cmd_time_request as a keepalive since cmd_keepalive doesn't send back a reply we can wait for
await comfoconnect.cmd_time_request()
except AioComfoConnectNotConnected:
print("Got AioComfoConnectNotConnected")
except AioComfoConnectTimeout:
print("Got AioComfoConnectTimeout")

except KeyboardInterrupt:
pass
Expand Down
121 changes: 59 additions & 62 deletions aiocomfoconnect/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
import logging
import struct
from asyncio import IncompleteReadError, StreamReader, StreamWriter
from asyncio import StreamReader, StreamWriter
from typing import Awaitable

from google.protobuf.message import DecodeError
Expand Down Expand Up @@ -71,7 +71,6 @@ def __init__(self, host: str, uuid: str, loop=None):
self._reference = None

self._event_bus: EventBus = None
self._read_task: asyncio.Task = None

self.__sensor_callback_fn: callable = None
self.__alarm_callback_fn: callable = None
Expand All @@ -89,43 +88,43 @@ def set_alarm_callback(self, callback: callable):
"""Set a callback to be called when an alarm is received."""
self.__alarm_callback_fn = callback

async def connect(self, uuid: str):
async def _connect(self, uuid: str):
"""Connect to the bridge."""
await self.disconnect()

_LOGGER.debug("Connecting to bridge %s", self.host)
try:
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self.host, self.PORT), TIMEOUT)
except asyncio.TimeoutError as exc:
raise AioComfoConnectTimeout() from exc
_LOGGER.warning("Timeout while connecting to bridge %s", self.host)
raise AioComfoConnectTimeout from exc

self._reference = 1
self._local_uuid = uuid
self._event_bus = EventBus()

# We are connected, start the background task
self._read_task = self._loop.create_task(self._read_messages())
async def _read_messages():
while True:
try:
# Keep processing messages until we are disconnected or shutting down
await self._process_message()

_LOGGER.debug("Connected to bridge %s", self.host)
except asyncio.exceptions.CancelledError:
# We are shutting down. Return to stop the background task
return False

async def disconnect(self):
"""Disconnect from the bridge."""
_LOGGER.debug("Disconnecting from bridge %s", self.host)
except AioComfoConnectNotConnected:
# We have been disconnected
raise

if self._read_task:
# Cancel the background task
self._read_task.cancel()
read_task = self._loop.create_task(_read_messages())
_LOGGER.debug("Connected to bridge %s", self.host)

# Wait for background task to finish
try:
await self._read_task
except asyncio.CancelledError:
pass
return read_task

async def _disconnect(self):
"""Disconnect from the bridge."""
if self._writer:
self._writer.close()

_LOGGER.debug("Disconnected from bridge %s", self.host)
await self._writer.wait_closed()

def is_connected(self) -> bool:
"""Returns True if the bridge is connected."""
Expand All @@ -135,7 +134,7 @@ async def _send(self, request, request_type, params: dict = None, reply: bool =
"""Sends a command and wait for a response if the request is known to return a result."""
# Check if we are actually connected
if not self.is_connected():
raise AioComfoConnectNotConnected()
raise AioComfoConnectNotConnected

# Construct the message
cmd = zehnder_pb2.GatewayOperation() # pylint: disable=no-member
Expand All @@ -160,6 +159,7 @@ async def _send(self, request, request_type, params: dict = None, reply: bool =
# Send the message
_LOGGER.debug("TX %s", message)
self._writer.write(message.encode())
await self._writer.drain()

# Increase message reference for next message
self._reference += 1
Expand All @@ -168,6 +168,7 @@ async def _send(self, request, request_type, params: dict = None, reply: bool =
return await asyncio.wait_for(fut, TIMEOUT)
except asyncio.TimeoutError as exc:
_LOGGER.warning("Timeout while waiting for response from bridge")
await self._disconnect()
raise AioComfoConnectTimeout from exc

async def _read(self) -> Message:
Expand Down Expand Up @@ -206,55 +207,51 @@ async def _read(self) -> Message:

return message

async def _read_messages(self):
"""Receive a message from the bridge."""
while self._read_task.cancelled() is False:
try:
message = await self._read()

# pylint: disable=no-member
if message.cmd.type == zehnder_pb2.GatewayOperation.CnRpdoNotificationType:
if self.__sensor_callback_fn:
self.__sensor_callback_fn(message.msg.pdid, int.from_bytes(message.msg.data, byteorder="little", signed=True))
else:
_LOGGER.info("Unhandled CnRpdoNotificationType since no callback is registered.")
async def _process_message(self):
"""Process a message from the bridge."""
try:
message = await self._read()

elif message.cmd.type == zehnder_pb2.GatewayOperation.GatewayNotificationType:
_LOGGER.debug("Unhandled GatewayNotificationType")
# pylint: disable=no-member
if message.cmd.type == zehnder_pb2.GatewayOperation.CnRpdoNotificationType:
if self.__sensor_callback_fn:
self.__sensor_callback_fn(message.msg.pdid, int.from_bytes(message.msg.data, byteorder="little", signed=True))
else:
_LOGGER.info("Unhandled CnRpdoNotificationType since no callback is registered.")

elif message.cmd.type == zehnder_pb2.GatewayOperation.CnNodeNotificationType:
_LOGGER.debug("Unhandled CnNodeNotificationType")
elif message.cmd.type == zehnder_pb2.GatewayOperation.GatewayNotificationType:
_LOGGER.debug("Unhandled GatewayNotificationType")

elif message.cmd.type == zehnder_pb2.GatewayOperation.CnAlarmNotificationType:
if self.__alarm_callback_fn:
self.__alarm_callback_fn(message.msg.nodeId, message.msg)
else:
_LOGGER.info("Unhandled CnAlarmNotificationType since no callback is registered.")
elif message.cmd.type == zehnder_pb2.GatewayOperation.CnNodeNotificationType:
_LOGGER.debug("Unhandled CnNodeNotificationType")

elif message.cmd.type == zehnder_pb2.GatewayOperation.CloseSessionRequestType:
_LOGGER.info("The Bridge has asked us to close the connection.")
return # Stop the background task
elif message.cmd.type == zehnder_pb2.GatewayOperation.CnAlarmNotificationType:
if self.__alarm_callback_fn:
self.__alarm_callback_fn(message.msg.nodeId, message.msg)
else:
_LOGGER.info("Unhandled CnAlarmNotificationType since no callback is registered.")

elif message.cmd.reference:
# Emit to the event bus
self._event_bus.emit(message.cmd.reference, message.msg)
elif message.cmd.type == zehnder_pb2.GatewayOperation.CloseSessionRequestType:
_LOGGER.info("The Bridge has asked us to close the connection.")

else:
_LOGGER.warning("Unhandled message type %s: %s", message.cmd.type, message)
elif message.cmd.reference:
# Emit to the event bus
self._event_bus.emit(message.cmd.reference, message.msg)

except asyncio.exceptions.CancelledError:
return # Stop the background task
else:
_LOGGER.warning("Unhandled message type %s: %s", message.cmd.type, message)

except IncompleteReadError:
_LOGGER.info("The connection was closed.")
return # Stop the background task
except asyncio.exceptions.IncompleteReadError:
_LOGGER.info("The connection was closed.")
await self._disconnect()
raise AioComfoConnectNotConnected

except ComfoConnectError as exc:
if exc.message.cmd.reference:
self._event_bus.emit(exc.message.cmd.reference, exc)
except ComfoConnectError as exc:
if exc.message.cmd.reference:
self._event_bus.emit(exc.message.cmd.reference, exc)

except DecodeError as exc:
_LOGGER.error("Failed to decode message: %s", exc)
except DecodeError as exc:
_LOGGER.error("Failed to decode message: %s", exc)

def cmd_start_session(self, take_over: bool = False) -> Awaitable[Message]:
"""Starts the session on the device by logging in and optionally disconnecting an already existing session."""
Expand Down
71 changes: 58 additions & 13 deletions aiocomfoconnect/comfoconnect.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
""" ComfoConnect Bridge API abstraction """
from __future__ import annotations

import asyncio
import logging
from asyncio import Future
from typing import Callable, Dict, List, Literal

from aiocomfoconnect import Bridge
Expand All @@ -15,17 +17,21 @@
SUBUNIT_06,
SUBUNIT_07,
SUBUNIT_08,
PdoType,
UNIT_ERROR,
UNIT_SCHEDULE,
UNIT_TEMPHUMCONTROL,
UNIT_VENTILATIONCONFIG,
PdoType,
VentilationBalance,
VentilationMode,
VentilationSetting,
VentilationSpeed,
VentilationTemperatureProfile,
)
from aiocomfoconnect.exceptions import (
AioComfoConnectNotConnected,
AioComfoConnectTimeout,
)
from aiocomfoconnect.properties import Property
from aiocomfoconnect.sensors import Sensor
from aiocomfoconnect.util import bytearray_to_bits, bytestring, encode_pdo_value
Expand All @@ -50,31 +56,70 @@ def __init__(self, host: str, uuid: str, loop=None, sensor_callback=None, alarm_
self._sensors_values: Dict[int, any] = {}
self._sensor_hold = None

self._tasks = set()

def _unhold_sensors(self):
"""Unhold the sensors."""
_LOGGER.debug("Unholding sensors")
self._sensor_hold = None

# Emit the current cached values of the sensors, by now, they will have received a correct update.
# Emit the current cached values of the sensors, by now, they should have received a correct update.
for sensor_id, _ in self._sensors.items():
if self._sensors_values[sensor_id] is not None:
self._sensor_callback(sensor_id, self._sensors_values[sensor_id])

async def connect(self, uuid: str, start_session=True):
async def connect(self, uuid: str):
"""Connect to the bridge."""
await super().connect(uuid)
connected: Future = Future()

async def _reconnect_loop():
while True:
try:
# Connect to the bridge
read_task = await self._connect(uuid)

# Start session
await self.cmd_start_session(True)

# Wait for a specified amount of seconds to buffer sensor values.
# This is to work around a bug where the bridge sends invalid sensor values when connecting.
if self.sensor_delay:
_LOGGER.debug("Holding sensors for %s second(s)", self.sensor_delay)
self._sensors_values = {}
self._sensor_hold = self._loop.call_later(self.sensor_delay, self._unhold_sensors)

# Register the sensors again (in case we lost the connection)
for sensor in self._sensors.values():
await self.cmd_rpdo_request(sensor.id, sensor.type)

if not connected.done():
connected.set_result(True)

# Wait for the read task to finish or throw an exception
await read_task

if read_task.result() is False:
# We are shutting down.
return

except AioComfoConnectTimeout:
# Reconnect after 5 seconds when we could not connect
_LOGGER.info("Could not reconnect. Retrying after 5 seconds.")
await asyncio.sleep(5)

except AioComfoConnectNotConnected:
# Reconnect when connection has been dropped
_LOGGER.info("We got disconnected. Reconnecting.")
pass

if start_session:
await self.cmd_start_session(True)
reconnect_task = self._loop.create_task(_reconnect_loop())
self._tasks.add(reconnect_task)
reconnect_task.add_done_callback(self._tasks.discard)

if self.sensor_delay:
_LOGGER.debug("Holding sensors for %s second(s)", self.sensor_delay)
self._sensor_hold = self._loop.call_later(self.sensor_delay, self._unhold_sensors)
await connected

# Register the sensors again (in case we lost the connection)
self._sensors_values = {}
for sensor in self._sensors.values():
await self.cmd_rpdo_request(sensor.id, sensor.type)
async def disconnect(self):
await self._disconnect()

async def register_sensor(self, sensor: Sensor):
"""Register a sensor on the bridge."""
Expand Down
7 changes: 1 addition & 6 deletions aiocomfoconnect/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@

from dataclasses import dataclass

from .const import (
PdoType,
UNIT_NODE,
UNIT_NODECONFIGURATION,
UNIT_TEMPHUMCONTROL,
)
from .const import UNIT_NODE, UNIT_NODECONFIGURATION, UNIT_TEMPHUMCONTROL, PdoType


@dataclass
Expand Down
Loading

0 comments on commit 48a5a83

Please sign in to comment.