# terminalExec.py (forked from cpwood/Pico-Go)
import sys
import socket
import selectors
import types
import os
import threading
import time
import signal
# True when running on Windows. Console input is read through msvcrt there
# (see getCharacterWindows); every other platform uses termios and fcntl
# (see getCharacterPosix), so only the modules for this platform are imported.
isWindows = sys.platform == "win32"
if isWindows:
    import msvcrt
else:
    import termios
    import fcntl
# TCP port to listen on: the first command-line argument when exactly one
# argument is given, otherwise 1337.
port = int(sys.argv[1]) if len(sys.argv) == 2 else 1337
# Address the server binds to (the loopback interface).
ip = "127.0.0.1"
# When True, log() prints its messages; when False, log() is silent.
debug = False
# Selector shared by the server loop and the connection callbacks.
sel = selectors.DefaultSelector()
# Seconds the server loop sleeps after handling each batch of selector events.
socketDelay = 0.01
# Seconds slept after each character read from the keyboard.
characterDelay = 0.05
# Sockets of the currently connected clients. clients_lock is the lock taken
# around additions to, and iteration over, that set.
clients = set()
clients_lock = threading.Lock()
def acceptWrapper(sock):
    """Accept a pending connection and start tracking it.

    The new connection is switched to non-blocking mode, registered with the
    shared selector and then added to the ``clients`` set so that keyboard
    input is forwarded to it.

    Args:
        sock: The listening socket that the selector reported as readable.
    """
    conn, addr = sock.accept()  # Should be ready to read
    log("Accepted connection from " + str(addr))
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    # Register for readability only. Nothing in this file services write
    # events, and a connected socket is almost always writable, so asking for
    # EVENT_WRITE makes select() return immediately on every pass and turns
    # the server loop into a polling loop.
    sel.register(conn, selectors.EVENT_READ, data=data)
    # Publish the connection only after registration succeeded, so the set
    # never holds a socket the selector does not know about.
    with clients_lock:
        clients.add(conn)
def serviceConnection(key, mask):
    """Handle a selector event for an established client connection.

    Data received from the client is passed to ``boardInput``. An empty read
    (orderly shutdown) or a socket error closes the connection and removes it
    from the selector and from the ``clients`` set.

    Args:
        key: The ``SelectorKey`` for the connection; ``key.data.addr`` holds
            the peer address stored at registration time.
        mask: Bit mask of the events that are ready on the socket.
    """
    if not mask & selectors.EVENT_READ:
        return

    sock = key.fileobj
    data = key.data
    try:
        recv_data = sock.recv(1024)  # Should be ready to read
    except (BlockingIOError, InterruptedError):
        # Spurious wake-up: nothing to read after all, try again later.
        return
    except OSError:
        # Connection reset or similar: treat it like a closed connection
        # instead of letting the exception end the server thread.
        recv_data = b""

    if recv_data:
        boardInput(recv_data)
        return

    log("Closing connection to " + str(data.addr))
    # Take the lock: another thread may be iterating over the set, and
    # removing an element mid-iteration raises RuntimeError there. Removing
    # the socket first also stops new sends before it is closed.
    with clients_lock:
        clients.discard(sock)
    sel.unregister(sock)
    sock.close()
def getCharacterPosix():
    """Yield keyboard input from stdin one character at a time (POSIX).

    While the generator is active the terminal has canonical mode and echo
    turned off, and stdin is in non-blocking mode. The original terminal
    attributes and file status flags are restored when the generator is
    closed.

    Yields:
        A one-character string, or ``""`` when no input is currently
        available. ``characterDelay`` seconds are slept after every yield.
    """
    fd = sys.stdin.fileno()
    oldterm = termios.tcgetattr(fd)
    oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
    try:
        # Everything that modifies the terminal sits inside the try so that a
        # failure half-way through setup still restores the saved state.
        newattr = termios.tcgetattr(fd)
        newattr[3] &= ~(termios.ICANON | termios.ECHO)
        termios.tcsetattr(fd, termios.TCSANOW, newattr)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

        while True:
            try:
                c = sys.stdin.read(1)
            except OSError:
                # No data on the non-blocking descriptor.
                c = ""
            # A non-blocking read may also report "nothing yet" as None;
            # normalise that to the empty string consumers already ignore.
            yield c or ""
            # Sleep on every path, including the error path, so an
            # unreadable stdin cannot turn this loop into a busy spin.
            time.sleep(characterDelay)
    finally:
        termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
def getCharacterWindows():
    """Yield keyboard input one key press at a time (Windows).

    Each iteration blocks in ``msvcrt.getch()`` until a key is pressed.

    Yields:
        The decoded character, or ``""`` when the byte returned by the
        console is not valid UTF-8 on its own. ``characterDelay`` seconds are
        slept after every yield.
    """
    while True:
        raw = msvcrt.getch()
        # Special keys arrive as a prefix byte such as b'\xe0', which is not
        # valid UTF-8. A strict decode would raise UnicodeDecodeError and end
        # the input thread, so undecodable bytes are dropped instead.
        yield raw.decode("utf-8", errors="ignore")
        time.sleep(characterDelay)
def boardInput(data: bytes):
    """Write bytes received from the board to stdout as text.

    The bytes are decoded as UTF-8 with an incremental decoder that is kept
    between calls, so a multi-byte character split across two network reads
    is printed correctly once its remaining bytes arrive. Bytes that are not
    valid UTF-8 are shown as U+FFFD instead of raising ``UnicodeDecodeError``.

    Args:
        data: Raw bytes as received from a client socket.
    """
    import codecs  # local import: keeps this function self-contained

    # The decoder is stored on the function so that a partial character left
    # over from the previous chunk is remembered. It is shared by every
    # connection.
    decoder = getattr(boardInput, "_decoder", None)
    if decoder is None:
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        boardInput._decoder = decoder

    text = decoder.decode(data)
    if text:
        sys.stdout.write(text)
        sys.stdout.flush()
def userInput(ch: str):
    """Forward a typed character to every connected client.

    Empty input is ignored. A lone line feed or carriage return is expanded
    to ``"\\r\\n"`` before sending.

    Args:
        ch: The character (or short string) to send.
    """
    if not ch:
        return

    log("Received: " + ch)
    # Posix sends \n only; Windows sends \r only.
    # The board expects \r\n.
    if ch in ("\n", "\r"):
        ch = "\r\n"
    payload = ch.encode()

    # Copy the set under the lock and send outside it, so a slow client does
    # not keep other threads waiting for the lock.
    with clients_lock:
        targets = list(clients)

    for client in targets:
        try:
            client.sendall(payload)
        except OSError as err:
            # A dead or closing client must not stop delivery to the others,
            # nor raise out of the caller (input thread or signal handler).
            log("Failed to send to client: " + str(err))
def log(msg: str):
    """Print ``msg`` when the module-level ``debug`` flag is set.

    Args:
        msg: The diagnostic message to print.
    """
    if not debug:
        return
    print(msg)
def runServer():
    """Run the TCP server loop until it is interrupted.

    Binds a listening socket to ``ip``/``port``, registers it with the shared
    selector and then dispatches selector events: events on the listening
    socket go to ``acceptWrapper``, all others to ``serviceConnection``.
    The listening socket and the selector are closed on the way out.
    """
    log("Starting server...")
    # The with-block guarantees the listening socket is closed, including
    # when bind() or listen() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as lsock:
        if not isWindows:
            # Allow an immediate restart while connections from a previous
            # run are still in TIME_WAIT. Not applied on Windows, where this
            # option has different semantics.
            lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        lsock.bind((ip, port))
        lsock.listen()
        # data=None marks the listening socket in the dispatch below.
        sel.register(lsock, selectors.EVENT_READ, data=None)
        try:
            while True:
                events = sel.select(timeout=None)
                for key, mask in events:
                    if key.data is None:
                        acceptWrapper(key.fileobj)
                    else:
                        serviceConnection(key, mask)
                time.sleep(socketDelay)
        except KeyboardInterrupt:
            log("Caught keyboard interrupt: exiting!")
        finally:
            sel.close()
def listenForInput():
    """Read keyboard characters forever and pass each one to ``userInput``.

    The character source is ``getCharacterWindows`` on Windows and
    ``getCharacterPosix`` elsewhere. Should the generator ever finish, a new
    one is created. Stdout is flushed before each character is forwarded and
    ``characterDelay`` seconds are slept afterwards.
    """
    readCharacters = getCharacterWindows if isWindows else getCharacterPosix
    while True:
        for typed in readCharacters():
            sys.stdout.flush()
            userInput(typed)
            time.sleep(characterDelay)
def handler(signum, frame):
    """Signal handler that sends the ETX control character (code 3) on.

    Args:
        signum: The signal number (unused).
        frame: The interrupted stack frame (unused).
    """
    # "\x03" is the character with code 3, i.e. what Ctrl+C produces.
    userInput("\x03")
# Run the socket server loop on its own thread.
server = threading.Thread(target=runServer)
server.start()
# Read the keyboard on a second thread and forward each character.
inputMonitor = threading.Thread(target=listenForInput)
inputMonitor.start()
# Don't let Ctrl+C end this Python process: send it to the board instead!
signal.signal(signal.SIGINT, handler)
# Keep the main thread waiting for as long as the server thread runs.
server.join()