-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrun_asm.py
76 lines (64 loc) · 2.13 KB
/
run_asm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# This file is part of thymiodirect.
# Copyright 2020 ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE,
# Miniature Mobile Robots group, Switzerland
# Author: Yves Piguet
#
# SPDX-License-Identifier: BSD-3-Clause
# Send assembly program to Thymio
from thymiodirect import Connection
from thymiodirect.assembler import Assembler
import serial
import sys
import os
def parse_args(argv):
    """Parse the command line of the script.

    Args:
        argv: Full argument list laid out like ``sys.argv``; ``argv[0]`` is
            only used as the program name in the usage message.

    Returns:
        A tuple ``(asm, use_tcp, debug)``. ``asm`` is the text of the
        program file named on the command line, or None when no file was
        given. ``use_tcp`` and ``debug`` tell whether ``--tcp`` and
        ``--debug`` were present.

    Raises:
        SystemExit: With status 1, after printing the usage message, on an
            unknown ``--`` option or on a second program file.
        OSError: If the program file cannot be opened or read.
    """
    asm = None
    use_tcp = False
    debug = False
    for arg in argv[1:]:
        if arg == "--tcp":
            use_tcp = True
        elif arg == "--debug":
            debug = True
        elif arg.startswith("--") or asm is not None:
            # Compare with None instead of testing truthiness: an empty
            # first file must still count as "a program was already given".
            print(f"Usage: python3 {argv[0]} [--tcp] [--debug] [program.asm]")
            # sys.exit rather than the site-provided exit(), which is meant
            # for interactive use and is not always available.
            sys.exit(1)
        else:
            with open(arg, 'r') as file:
                asm = file.read()
    return asm, use_tcp, debug


if __name__ == "__main__":
    asm, use_tcp, debug = parse_args(sys.argv)
    if asm is None:
        # no program file on the command line: read the assembly from stdin
        asm = sys.stdin.read()

    # becomes True once a program has been sent to a node
    code_sent = False
async def on_connection_changed(node_id, connected):
    """Assemble the program and start it on a node that has connected.

    Installed as the ``on_connection_changed`` callback of the connection
    object. Reads the module-level ``th`` (the open connection) and ``asm``
    (the program source), and sets the module-level ``code_sent`` flag once
    the program has been started.

    Args:
        node_id: Identifier of the node; used to look it up in
            ``th.remote_nodes`` and to address it in ``set_bytecode``/``run``.
        connected: Truthy when the node is connected; a falsy value makes
            the callback do nothing.
    """
    global code_sent

    if not connected:
        return

    # assemble the program for this node
    assembler = Assembler(th.remote_nodes[node_id], asm)
    bytecode = assembler.assemble()

    # send the bytecode and run it
    th.set_bytecode(node_id, bytecode)
    th.run(node_id)
    code_sent = True
async def on_execution_state_changed(node_id, pc, flags):
    """Terminate the process once a sent program reports its state.

    Installed as the ``on_execution_state_changed`` callback of the
    connection object. Does nothing until the module-level ``code_sent``
    flag is set.

    Args:
        node_id: Identifier of the reporting node (unused).
        pc: Reported program counter (unused).
        flags: Reported execution flags (unused).
    """
    if not code_sent:
        return

    # os._exit() bypasses the normal interpreter shutdown, which includes
    # flushing stdio buffers; flush by hand so that pending output (e.g.
    # debug traces redirected to a file) is not lost.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (AttributeError, OSError, ValueError):
            # best effort only: a missing or closed stream must not
            # prevent the exit below
            pass
    os._exit(0)  # forced exit despite coroutines
def run_until_executed(th):
    """Install the script's callbacks on *th* and run it.

    The callbacks are the module-level ``on_connection_changed`` and
    ``on_execution_state_changed`` functions. The function returns only when
    ``th.run_forever()`` returns; the connection is closed before returning.

    Args:
        th: Connection object; must provide ``run_forever()``,
            ``shutdown()`` and ``close()`` and accept the two callback
            attributes.
    """
    th.on_connection_changed, th.on_execution_state_changed = (
        on_connection_changed,
        on_execution_state_changed,
    )
    try:
        th.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C: request a shutdown, then run again
        # NOTE(review): presumably the second run lets the pending shutdown
        # be processed - confirm against the library
        th.shutdown()
        th.run_forever()
    th.close()
if __name__ == "__main__":
    # Settings common to both transports.
    connection_options = dict(discover_rate=2, refreshing_rate=0.5, debug=debug)

    # The connection is bound to the module-level name ``th`` on purpose:
    # on_connection_changed reads it from there.
    if not use_tcp:
        try:
            # try to open serial connection
            with Connection.serial(**connection_options) as th:
                run_until_executed(th)
        except serial.serialutil.SerialException:
            # serial connection failed: fall back to TCP below
            use_tcp = True

    if use_tcp:
        # try TCP on default local port
        with Connection.tcp(**connection_options) as th:
            run_until_executed(th)