diff --git a/LICENSE b/LICENSE index df34e5e..3669087 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2016-2023 Jon Caruana +Copyright (c) 2016-2024 Jon Caruana Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/mock_mcp.py b/mock_mcp.py index a34da31..bb0032b 100644 --- a/mock_mcp.py +++ b/mock_mcp.py @@ -1,14 +1,29 @@ import logging -from enum import Enum +from enum import Flag, auto _LOGGER = logging.getLogger(__name__) -class MockSystem(Enum): - LITEJET = 1 - LITEJET_48 = 2 +class MockSystem(Flag): + # Models (choose one) + MODEL_LITEJET = auto() + + # Quirks (choose zero or more) + QUIRK_DUAL = auto() + QUIRK_OLD_LOAD_EVENT = auto() + + # Standard combinations + LITEJET = MODEL_LITEJET + LITEJET_48 = MODEL_LITEJET | QUIRK_DUAL + LITEJET_OLD = MODEL_LITEJET | QUIRK_OLD_LOAD_EVENT + + def is_dual(self): + return MockSystem.QUIRK_DUAL in self + def old_load_event(self): + return MockSystem.QUIRK_OLD_LOAD_EVENT in self class MockMCP: def __init__(self, system: MockSystem = MockSystem.LITEJET): + _LOGGER.info(f"System: {system}") self._system = system self._load_levels = {} self._switch_pressed = {} @@ -16,9 +31,9 @@ def __init__(self, system: MockSystem = MockSystem.LITEJET): self._other = None self._prefix = "" - if system == MockSystem.LITEJET_48: + if system.is_dual(): self._prefix = "Alpha " - self._other = MockMCP(MockSystem.LITEJET) + self._other = MockMCP(system & ~MockSystem.QUIRK_DUAL) self._other._prefix = "Bravo " self._other._other = self @@ -37,8 +52,13 @@ def add_listener(self, r): def set_load(self, number: int, level: int): self._load_levels[number] = level - self._broadcast(f"^K{number:03d}{level:02d}\r") - self._broadcast_other(f"^K{number+40:03d}{level:02d}\r") + if self._system.old_load_event(): + event = "F" if level == 0 else "N" + self._broadcast(f"{event}{number:03d}\r") + self._broadcast_other(f"{event}{number+40:03d}\r") + else: + self._broadcast(f"^K{number:03d}{level:02d}\r") + self._broadcast_other(f"^K{number+40:03d}{level:02d}\r") def set_switch(self, number: int, pressed: bool): if self._switch_pressed.get(number, False) == pressed: diff --git a/mock_server.py b/mock_server.py index 3f57195..b52616f 100644 --- a/mock_server.py +++ b/mock_server.py @@ -47,10 +47,15 @@ async def run_server(): parser.add_argument( "--litejet48", help="Emulate a LiteJet 48 (two LiteJet boards)", - action="store_true" + action="store_const", dest="system", const=MockSystem.LITEJET_48 +) +parser.add_argument( + "--oldlitejet", + help="Emulate an old LiteJet (sends load on/off event only)", + action="store_const", dest="system", const=MockSystem.LITEJET_OLD ) args = parser.parse_args() -mcp = MockMCP(MockSystem.LITEJET_48 if args.litejet48 else MockSystem.LITEJET) +mcp = MockMCP(args.system or MockSystem.LITEJET) asyncio.run(run_server()) diff --git a/pylitejet/__init__.py b/pylitejet/__init__.py index 35f68ba..b633005 100644 --- a/pylitejet/__init__.py +++ b/pylitejet/__init__.py @@ -268,25 +268,32 @@ async def _reader_impl(self): while self._reader_active: try: line = await self._adapter.read() - except: + except Exception as exc: + _LOGGER.debug(f"Read error: {exc}") await asyncio.sleep(5) continue - line = line[0:-1].decode("utf-8") - - _LOGGER.debug(f'Read "{line}" ({len(line)})') - if len(line) == 4 and (line[0] == "P" or line[0] == "R"): - self._notify_event(line) - elif len(line) == 4 and (line[0] == "F" or line[0] == "N"): - self._notify_event(line) - elif len(line) == 7 and line[0] == "^" and line[1] == "K": - new_level = line[5:7] - _LOGGER.debug("Dim event: '" + line[2:5] + "' '" + new_level + "'") - event_name = "F" if new_level == "00" else "N" - self._notify_event(event_name + line[2:5], int(new_level)) - else: - self._recv_line = line - self._recv_event.set() + try: + line = line[0:-1].decode("utf-8") + + _LOGGER.debug(f'Read "{line}" ({len(line)})') + if len(line) == 4 and (line[0] == "P" or line[0] == "R"): + # Switch press/release event + self._notify_event(line) + elif len(line) == 4 and (line[0] == "F" or line[0] == "N"): + # Load on/off event + self._notify_event(line, None) + elif len(line) == 7 and line[0] == "^" and line[1] == "K": + # Dimming event + new_level = line[5:7] + event_name = "F" if new_level == "00" else "N" + self._notify_event(event_name + line[2:5], int(new_level)) + else: + # Possible command response + self._recv_line = line + self._recv_event.set() + except Exception as exc: + _LOGGER.error(f"While handling '{line}': {exc}") async def close(self): self._open = False