Skip to content

Commit

Permalink
Fix old-style load events missing parameter. Also fixed that exceptio…
Browse files Browse the repository at this point in the history
…n in it could cause the reader loop to silently stall.
  • Loading branch information
joncar committed Sep 29, 2024
1 parent 2b94ecb commit 14f169b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2016-2023 Jon Caruana
Copyright (c) 2016-2024 Jon Caruana

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
36 changes: 28 additions & 8 deletions mock_mcp.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
import logging
from enum import Enum
from enum import Flag, auto

_LOGGER = logging.getLogger(__name__)

class MockSystem(Enum):
LITEJET = 1
LITEJET_48 = 2
class MockSystem(Flag):
# Models (choose one)
MODEL_LITEJET = auto()

# Quirks (choose zero or more)
QUIRK_DUAL = auto()
QUIRK_OLD_LOAD_EVENT = auto()

# Standard combinations
LITEJET = MODEL_LITEJET
LITEJET_48 = MODEL_LITEJET | QUIRK_DUAL
LITEJET_OLD = MODEL_LITEJET | QUIRK_OLD_LOAD_EVENT

def is_dual(self):
return MockSystem.QUIRK_DUAL in self
def old_load_event(self):
return MockSystem.QUIRK_OLD_LOAD_EVENT in self

class MockMCP:
def __init__(self, system: MockSystem = MockSystem.LITEJET):
_LOGGER.info(f"System: {system}")
self._system = system
self._load_levels = {}
self._switch_pressed = {}
self._broadcast_receivers = []
self._other = None
self._prefix = ""

if system == MockSystem.LITEJET_48:
if system.is_dual():
self._prefix = "Alpha "
self._other = MockMCP(MockSystem.LITEJET)
self._other = MockMCP(system & ~MockSystem.QUIRK_DUAL)
self._other._prefix = "Bravo "
self._other._other = self

Expand All @@ -37,8 +52,13 @@ def add_listener(self, r):

def set_load(self, number: int, level: int):
self._load_levels[number] = level
self._broadcast(f"^K{number:03d}{level:02d}\r")
self._broadcast_other(f"^K{number+40:03d}{level:02d}\r")
if self._system.old_load_event():
event = "F" if level == 0 else "N"
self._broadcast(f"{event}{number:03d}\r")
self._broadcast_other(f"{event}{number+40:03d}\r")
else:
self._broadcast(f"^K{number:03d}{level:02d}\r")
self._broadcast_other(f"^K{number+40:03d}{level:02d}\r")

def set_switch(self, number: int, pressed: bool):
if self._switch_pressed.get(number, False) == pressed:
Expand Down
9 changes: 7 additions & 2 deletions mock_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ async def run_server():
parser.add_argument(
"--litejet48",
help="Emulate a LiteJet 48 (two LiteJet boards)",
action="store_true"
action="store_const", dest="system", const=MockSystem.LITEJET_48
)
parser.add_argument(
"--oldlitejet",
help="Emulate an old LiteJet (sends load on/off event only)",
action="store_const", dest="system", const=MockSystem.LITEJET_OLD
)
args = parser.parse_args()

mcp = MockMCP(MockSystem.LITEJET_48 if args.litejet48 else MockSystem.LITEJET)
mcp = MockMCP(args.system or MockSystem.LITEJET)

asyncio.run(run_server())
39 changes: 23 additions & 16 deletions pylitejet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,32 @@ async def _reader_impl(self):
while self._reader_active:
try:
line = await self._adapter.read()
except:
except Exception as exc:
_LOGGER.debug(f"Read error: {exc}")
await asyncio.sleep(5)
continue

line = line[0:-1].decode("utf-8")

_LOGGER.debug(f'Read "{line}" ({len(line)})')
if len(line) == 4 and (line[0] == "P" or line[0] == "R"):
self._notify_event(line)
elif len(line) == 4 and (line[0] == "F" or line[0] == "N"):
self._notify_event(line)
elif len(line) == 7 and line[0] == "^" and line[1] == "K":
new_level = line[5:7]
_LOGGER.debug("Dim event: '" + line[2:5] + "' '" + new_level + "'")
event_name = "F" if new_level == "00" else "N"
self._notify_event(event_name + line[2:5], int(new_level))
else:
self._recv_line = line
self._recv_event.set()
try:
line = line[0:-1].decode("utf-8")

_LOGGER.debug(f'Read "{line}" ({len(line)})')
if len(line) == 4 and (line[0] == "P" or line[0] == "R"):
# Switch press/release event
self._notify_event(line)
elif len(line) == 4 and (line[0] == "F" or line[0] == "N"):
# Load on/off event
self._notify_event(line, None)
elif len(line) == 7 and line[0] == "^" and line[1] == "K":
# Dimming event
new_level = line[5:7]
event_name = "F" if new_level == "00" else "N"
self._notify_event(event_name + line[2:5], int(new_level))
else:
# Possible command response
self._recv_line = line
self._recv_event.set()
except Exception as exc:
_LOGGER.error(f"While handling '{line}': {exc}")

async def close(self):
self._open = False
Expand Down

0 comments on commit 14f169b

Please sign in to comment.