-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathUNO.py
77 lines (66 loc) · 2.61 KB
/
UNO.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# -*- coding:utf-8 -*-
import threading
import Queue
import serial
import os
import time
import platform
from unospace import returned_config
from unospace.tasks import discover_people
from unospace.tasks import blaze
from unospace.tasks import shake
def getUNOData(readable_instruction_Queue, unoser, max_errors=5, retry_delay=2):
    """Pump lines read from the UNO serial port into a queue.

    Intended as a background-thread target: loops indefinitely, calling
    ``unoser.readline()`` and putting every non-empty result, unchanged, on
    ``readable_instruction_Queue``. Empty reads are skipped.

    Args:
        readable_instruction_Queue: Object exposing ``put(item)``; receives
            each non-empty line that was read.
        unoser: Object exposing ``readline()`` (the opened serial port).
        max_errors: Number of consecutive failed iterations after which the
            loop stops and the function returns.
        retry_delay: Seconds to wait after a failed iteration before the
            next attempt.

    Returns:
        None. The function only returns once ``max_errors`` consecutive
        iterations have raised.
    """
    consecutive_errors = 0
    while True:
        try:
            readable_instruction = unoser.readline()
            if readable_instruction:
                readable_instruction_Queue.put(readable_instruction)
        except Exception:
            # Best-effort reader: any failure is treated as transient and
            # retried, but the failure must be counted -- otherwise the
            # give-up branch can never trigger and a dead port is polled
            # forever.
            consecutive_errors += 1
            if consecutive_errors >= max_errors:
                break
            time.sleep(retry_delay)
        else:
            # A clean iteration proves the port is alive again, so earlier
            # transient failures no longer count toward the limit.
            consecutive_errors = 0
class UNO(object):
    """Reads instructions from a UNO board over serial and dispatches them.

    On construction the serial port is opened and a daemon thread running
    ``getUNOData`` starts filling ``readable_instruction_Queue`` with raw
    lines. ``monitor`` drains that queue and calls the task handler that the
    loaded configuration associates with each instruction.

    Attributes (set in ``__init__``):
        unoser: The opened ``serial.Serial`` connection.
        readable_instruction_Queue: Queue of raw lines read from the port.
        returned_config: Result of ``returned_config.main()``; used here as a
            sequence of entries where index 0 is the instruction name,
            index 1 is passed to the handler as its second argument, and
            index 2 is the handler expression, e.g.
            ``['shake', 'play,191,$', 'shake']``.
    """

    def __init__(self, port=None, baudrate=9600):
        """Open the serial port and start the background reader thread.

        Args:
            port: Serial device name. When None, ``'com6'`` is used if the
                platform string contains "windows", otherwise
                ``'/dev/ttyACM0'``.
            baudrate: Baud rate passed to ``serial.Serial``.
        """
        if port is None:
            if 'windows' in platform.platform().lower():
                port = 'com6'
            else:
                port = '/dev/ttyACM0'
        self.unoser = serial.Serial(port, baudrate, timeout=1)
        # Discard anything buffered before this session started.
        self.unoser.flushInput()
        self.unoser.flushOutput()
        self.readable_instruction_Queue = Queue.Queue()
        reader = threading.Thread(
            target=getUNOData,
            args=(self.readable_instruction_Queue, self.unoser),
        )
        # Daemon thread, so a blocked serial read never keeps the process
        # alive at exit.
        reader.daemon = True
        reader.start()
        self.returned_config = returned_config.main()

    def _find_in_config(self, instruction):
        """Return the first config entry whose first item equals *instruction*.

        Returns an empty list when no entry matches.
        """
        for entry in self.returned_config:
            if entry[0] == instruction:
                return entry
        return []

    def monitor(self, ser):
        """Drain the instruction queue and run the handler for each line.

        Each queued line must have the form ``<instruction>-<parameters>``
        with exactly one ``-``; any other line is skipped. Lines whose
        instruction has no config entry are skipped as well.

        Args:
            ser: Passed through unchanged as the first argument of every
                handler call.
        """
        while not self.readable_instruction_Queue.empty():
            readable_instruction = self.readable_instruction_Queue.get()
            try:
                instruction_name, parameters = readable_instruction.split('-')
            except ValueError:
                # The readable instruction returned by the UNO must contain
                # one and only one '-'; anything else is ignored.
                continue
            # Example entry: ['shake', 'play,191,$', 'shake']
            config_detail = self._find_in_config(instruction_name)
            if not config_detail:
                continue
            # SECURITY NOTE: eval() is still used to resolve the handler
            # expression taken from the configuration, so the configuration
            # must be trusted. The device-supplied parameters are no longer
            # spliced into evaluated source: they are passed as ordinary
            # arguments, so quotes or backslashes in the payload can neither
            # break the call nor inject code.
            handler = eval(config_detail[2] + ".main")
            handler(ser, config_detail[1], parameters.strip())

    def execute(self, ser, unotask):
        """Placeholder; currently does nothing."""
        pass