-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmock_mcp.py
151 lines (132 loc) · 4.95 KB
/
mock_mcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import logging
from enum import Flag, auto
_LOGGER = logging.getLogger(__name__)
class MockSystem(Flag):
    """Bit flags describing which system variant a mock should imitate.

    A value combines exactly one ``MODEL_*`` flag with any number of
    ``QUIRK_*`` flags; the ready-made combinations below cover the
    standard configurations.
    """

    # Base model -- pick exactly one.
    MODEL_LITEJET = auto()

    # Optional quirks -- combine freely with a model.
    QUIRK_DUAL = auto()
    QUIRK_OLD_LOAD_EVENT = auto()

    # Ready-made combinations.
    LITEJET = MODEL_LITEJET
    LITEJET_48 = MODEL_LITEJET | QUIRK_DUAL
    LITEJET_OLD = MODEL_LITEJET | QUIRK_OLD_LOAD_EVENT

    def is_dual(self) -> bool:
        """Return True when the ``QUIRK_DUAL`` bit is set on this value."""
        return bool(self & MockSystem.QUIRK_DUAL)

    def old_load_event(self) -> bool:
        """Return True when the ``QUIRK_OLD_LOAD_EVENT`` bit is set."""
        return bool(self & MockSystem.QUIRK_OLD_LOAD_EVENT)
class MockMCP:
    """In-memory mock of an MCP that answers ``+``-prefixed text commands.

    The mock remembers load levels and switch states, notifies registered
    listeners of changes, and produces replies for query commands.  When the
    system has the dual quirk, a second ``MockMCP`` is created as a peer:
    this instance names its items with the prefix ``"Alpha "``, the peer
    uses ``"Bravo "``, and lowercase command letters are routed to the peer.
    """

    def __init__(self, system: MockSystem = MockSystem.LITEJET):
        """Create the mock and, for a dual system, its linked peer.

        Args:
            system: Flags selecting the model and quirks to imitate.
        """
        _LOGGER.info(f"System: {system}")
        self._system = system
        self._load_levels = {}  # load number -> last level set
        self._switch_pressed = {}  # switch number -> True while pressed
        self._broadcast_receivers = []  # callables invoked with event bytes
        self._other = None  # peer MockMCP, only for dual systems
        self._prefix = ""
        if system.is_dual():
            self._prefix = "Alpha "
            # The peer is built without the dual quirk so that constructing
            # it does not recurse into creating yet another peer.
            self._other = MockMCP(system & ~MockSystem.QUIRK_DUAL)
            self._other._prefix = "Bravo "
            self._other._other = self

    def _broadcast(self, line: str):
        """Encode ``line`` as UTF-8 and pass it to every listener."""
        payload = line.encode("utf-8")
        _LOGGER.info(f"bcst MCP: {payload}")
        for receiver in self._broadcast_receivers:
            receiver(payload)

    def _broadcast_other(self, line: str):
        """Broadcast ``line`` through the peer's listeners, if a peer exists."""
        if self._other:
            self._other._broadcast(line)

    def add_listener(self, r):
        """Register ``r``; it is called with the bytes of each broadcast."""
        self._broadcast_receivers.append(r)

    def set_load(self, number: int, level: int):
        """Store a load level and broadcast the change.

        With the old-load-event quirk the event is ``F`` (level 0) or ``N``
        (any other level) followed by the load number; otherwise it is
        ``^K`` followed by the number and the level.  The peer's listeners
        receive the same event with the load number offset by 40.
        """
        self._load_levels[number] = level
        if self._system.old_load_event():
            event = "F" if level == 0 else "N"
            self._broadcast(f"{event}{number:03d}\r")
            self._broadcast_other(f"{event}{number+40:03d}\r")
        else:
            self._broadcast(f"^K{number:03d}{level:02d}\r")
            self._broadcast_other(f"^K{number+40:03d}{level:02d}\r")

    def set_switch(self, number: int, pressed: bool):
        """Record a switch state; broadcast ``P``/``R`` only when it changes."""
        if self._switch_pressed.get(number, False) == pressed:
            return
        self._switch_pressed[number] = pressed
        event = "P" if pressed else "R"
        self._broadcast(f"{event}{number:03d}\r")

    def get_load(self, number: int):
        """Return the stored level of a load, or 0 if it was never set."""
        return self._load_levels.get(number, 0)

    def get_switch_name(self, number: int):
        """Return the display name of switch ``number``."""
        return f"{self._prefix}Switch #{number}"

    def get_load_name(self, number: int):
        """Return the display name of load ``number``."""
        return f"{self._prefix}Load #{number}"

    def get_scene_name(self, number: int):
        """Return the display name of scene ``number``."""
        return f"{self._prefix}Scene #{number}"

    def handle_input(self, data: bytes):
        """Process the first command found at the start of ``data``.

        If ``data`` does not begin with the ``+`` start marker, nothing is
        executed and the number of characters before the marker (or the
        whole length when there is no marker) is returned so the caller can
        drop them.

        Args:
            data: UTF-8 encoded input; expected to hold a complete command
                once it starts with ``+``.

        Returns:
            A ``(length, response)`` tuple: ``length`` is the number of
            characters that were skipped or that make up the handled
            command, and ``response`` is the UTF-8 encoded reply or ``None``
            when the command produces no reply.

        Raises:
            IndexError: If ``data`` is empty or ends right after ``+``.
            ValueError: If a numeric field of the command is not a number.
        """
        _LOGGER.info(f" to MCP: {data}")
        text = data.decode("utf-8")

        # Skip until a command start marker.
        marker = text.find("+")
        skipped = len(text) if marker < 0 else marker
        if skipped != 0:
            _LOGGER.debug(f"Skipped {skipped} bytes")
            return skipped, None

        command = text[1]
        response = None
        mcp = self
        if command.islower():
            # Lowercase letters address the peer when there is one; without
            # a peer they are handled by this instance.
            if self._other:
                mcp = self._other
            command = command.upper()

        # These commands carry a three-digit item number after the letter.
        if command in "ABCDEFIJKLM":
            number = int(text[2:5])

        if command == "A":
            mcp.set_load(number, 99)
            command_length = 5
        elif command == "B":
            mcp.set_load(number, 0)
            command_length = 5
        elif command == "C" or command == "D":
            _LOGGER.warning("Scenes not supported")
            # Same frame width as the other numbered commands.  Without this
            # assignment the final return raised UnboundLocalError.
            command_length = 5
        elif command == "E":
            level = int(text[5:7])
            # The rate field is parsed only so malformed frames still raise;
            # the mock applies the level immediately.
            _rate = int(text[7:9])
            mcp.set_load(number, level)
            command_length = 9
        elif command == "F":
            response = f"{mcp.get_load(number):02d}\r"
            command_length = 5
        elif command == "G":
            _LOGGER.warning("Instant status not supported")
            response = "000000000000000000000000000000000000000000000000\r"
            command_length = 2
        elif command == "H":
            _LOGGER.warning("Instant status not supported")
            response = "00000000000000000000000000000000000000 0000000000000000000000000000000000000000000000000000000000\r"
            command_length = 2
        elif command == "I":
            mcp.set_switch(number, True)
            command_length = 5
        elif command == "J":
            mcp.set_switch(number, False)
            command_length = 5
        elif command == "K":
            response = f"{mcp.get_switch_name(number)}\r"
            command_length = 5
        elif command == "L":
            response = f"{mcp.get_load_name(number)}\r"
            command_length = 5
        elif command == "M":
            response = f"{mcp.get_scene_name(number)}\r"
            command_length = 5
        else:
            _LOGGER.warning(f"Unknown command '{command}'")
            command_length = 2

        if response:
            response = response.encode("utf-8")
            _LOGGER.info(f"from MCP: {response}")
        return command_length, response