-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmock_server.py
61 lines (47 loc) · 1.65 KB
/
mock_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
import logging
import argparse
from mock_mcp import MockMCP, MockSystem
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
async def handle_client(reader, writer):
    """Serve one connected client until it disconnects.

    Registers a broadcast listener on the module-level ``mcp`` so that data
    it broadcasts is written to this client, then repeatedly reads one
    newline-terminated chunk from the client and feeds it to
    ``mcp.handle_input``, writing back every non-empty response.

    The connection is always closed when this coroutine exits, whether the
    client reached end-of-stream or an error was raised while serving it.

    Args:
        reader: Stream reader for the client connection (as passed by
            ``asyncio.start_server``).
        writer: Stream writer for the client connection.
    """
    _LOGGER.info("Client connected")
    loop = asyncio.get_running_loop()

    async def async_write(data):
        """Write *data* to the client and wait for the buffer to flush."""
        writer.write(data)
        await writer.drain()

    def handle_broadcast(data):
        """Schedule *data* to be written to this client on the event loop."""
        # NOTE(review): run_coroutine_threadsafe suggests broadcasts may be
        # raised from outside the event-loop thread -- confirm against
        # mock_mcp. The returned future is discarded, so write errors from
        # broadcasts are not reported.
        asyncio.run_coroutine_threadsafe(async_write(data), loop)

    mcp.add_listener(handle_broadcast)
    # NOTE(review): the listener is never unregistered, so broadcasts keep
    # targeting this writer after the client is gone. Remove it in the
    # ``finally`` below if mock_mcp offers a way to do so.

    try:
        while True:
            try:
                request = await reader.readuntil(separator=b"\n")
            except asyncio.IncompleteReadError:
                # End of stream before another separator: client is gone.
                break

            # One read may hold several commands; handle_input reports how
            # many bytes it consumed so the remainder can be re-submitted.
            while request:
                length, response = mcp.handle_input(request)
                if response:
                    await async_write(response)
                if length <= 0:
                    # No progress was made; re-submitting the same bytes
                    # would spin here forever and starve the event loop.
                    _LOGGER.warning(
                        "Discarding %d unconsumed byte(s) of input",
                        len(request),
                    )
                    break
                request = request[length:]
    finally:
        # Close on every exit path so a failing client cannot leak its
        # transport.
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # The peer dropped the connection abruptly; nothing to flush.
            _LOGGER.debug("Connection lost while closing", exc_info=True)
        _LOGGER.info("Client disconnected")
async def run_server(port=9999):
    """Listen for client connections and serve them until cancelled.

    Each accepted connection is handled by ``handle_client``.

    Args:
        port: TCP port to listen on. Defaults to 9999, the port this
            script has always used.
    """
    _LOGGER.info("Listening for connections on port %s", port)
    server = await asyncio.start_server(handle_client, port=port)
    # The context manager closes the listening socket when serving stops.
    async with server:
        await server.serve_forever()
# Script body: configure logging, parse the command line, create the
# emulated MCP used by handle_client, and run the server.
logging.basicConfig(level=logging.DEBUG)

# The text must be passed as ``description``: the first positional parameter
# of ArgumentParser is ``prog``, which would put this sentence into the
# usage line in place of the program name.
parser = argparse.ArgumentParser(
    description="Emulate a LiteJet lighting system."
)
# Both flags store into ``system``; if both are given the last one wins.
parser.add_argument(
    "--litejet48",
    help="Emulate a LiteJet 48 (two LiteJet boards)",
    action="store_const",
    dest="system",
    const=MockSystem.LITEJET_48,
)
parser.add_argument(
    "--oldlitejet",
    help="Emulate an old LiteJet (sends load on/off event only)",
    action="store_const",
    dest="system",
    const=MockSystem.LITEJET_OLD,
)
args = parser.parse_args()

# ``system`` is None when neither flag is given; fall back to the plain
# LiteJet. ``mcp`` must stay a module global because handle_client reads it.
mcp = MockMCP(args.system or MockSystem.LITEJET)

asyncio.run(run_server())