forked from rstager/ARoboCarController
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulator.py
62 lines (57 loc) · 1.87 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import pickle
import os
import tempfile
# This class manages a connection to the RoboCar simulator
class Simulator:
def connect(self,confighook=None):
tmpdir=tempfile.gettempdir()
state_filename=os.path.join(tmpdir,"sim_state")
cmd_filename=os.path.join(tmpdir,"sim_cmd")
if not os.path.exists(state_filename):
os.mkfifo(state_filename)
if not os.path.exists(cmd_filename):
os.mkfifo(cmd_filename)
print("Connecting to server")
self.fstate=open(state_filename,"rb")
self.fcmd=open(cmd_filename,"wb")
print("Connection opened")
config = pickle.load(self.fstate)
if confighook != None:
config=confighook(config)
pickle.dump(config,self.fcmd)
self.fcmd.flush()
return config
def get_state(self):
try:
return pickle.load(self.fstate)
except EOFError:
#print("Connection closed")
return None
def send_cmd(self,command):
try:
pickle.dump(command, self.fcmd)
self.fcmd.flush()
except EOFError:
print("Connection closed")
self.disconnect()
return None
def disconnect(self):
self.fstate.close()
self.fcmd.close()
def reset(self):
for i in range(10):
print("pausing")
tmp=self.get_state()
self.send_cmd({"steering": 0, "throttle": 0})
print("send reset")
tmp = self.get_state()
self.send_cmd({"command": "reset"})
tmp = self.get_state()
self.send_cmd({"steering": 0, "throttle": 0})
tmp = self.get_state()
self.send_cmd({"command": "reset"})
for i in range(10):
print("pausing")
tmp=self.get_state()
self.send_cmd({"steering": 0, "throttle": 0})
print("Done")