# kitchen_eval.py
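"""Interactive evaluation loop for a kitchen manipulation agent.

Runs an agent in the kitchen environment, lets a human operator mark each
subtask as a success or failure from the keyboard, and stores per-subtask
trajectories, actions, timestamps and camera videos under the Hydra output
directory.
"""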
from datetime import datetime
import os
import random
import logging
from pathlib import Path
from time import sleep

import hydra
import numpy as np
import imageio
import optree
from omegaconf import DictConfig
from hydra.core.hydra_config import HydraConfig

from src.kitchen import kitchen_dataset_obs_transforms
from src.utils.non_blocking_keypress import NonBlockingKeyPress

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EvaluationApp:
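    """User-supervised evaluation of an agent over a list of tasks.

    Each task is either a plain instruction string or a dict/DictConfig with a
    ``name`` and a list of ``subtasks``; the operator decides the outcome of
    every subtask interactively.
    """
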
    def __init__(self, agent, env, tasks, task_map=None):
        # Create output folder
        self.output_dir = Path(HydraConfig.get().runtime.output_dir)
        self.tasks = tasks
        # Connect to the environment and grab an initial observation
        self.env = env
        assert self.env.connect()
        self.latest_obs, reset_info = self.env.reset()
        # Optional mapping from subtask names to language instructions
        if task_map is not None:
            self.task_map = task_map
        else:
            self.task_map = lambda x: x
        self.agent = agent

    def _reset_robot(self):
        self.latest_obs, _ = self.env.reset()
        logger.info("Reset the robot")

    def _reset(self, reset_robot=True):
        # Reset robot
        if reset_robot:
            self._reset_robot()
        self.latest_obs_timestamp = datetime.now()
        logger.info("Reset the agent")
        self.agent.reset(self.instruction)
        # Reset storage
        self.obs_timestamps = []
        self.action_timestamps = []
        self.joint_positions = []
        self.joint_velocities = []
        self.ee_positions = []
        self.ee_velocities = []
        self.gripper_widths = []
        self.actions = []
        self.images = {
            "primary_camera": [],
            "secondary_camera": [],
        }

    def _inference_step(self):
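        """Run one observe → act → step cycle and record actions and timestamps."""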
        assert self.instruction is not None
        # 1) Standardize and transform the raw observation
        std_obs = self.env.to_standardized_obs(self.latest_obs)
        tr_obs = kitchen_dataset_obs_transforms(std_obs)
        self.tr_obs = tr_obs
        # 2) Query the agent
        raw_actions = self.agent(tr_obs)
        # 3) Record data
        self.latest_action = raw_actions
        self.latest_action_timestamp = datetime.now()
        # 4) Postprocess (de-normalize, handle orientation, sticky gripper, etc.)
        processed_actions = self.env.from_standardized_action(raw_actions)
        # 5) Step the environment
        self.latest_obs, reward, done, truncated, info = self.env.step(processed_actions)
        # Record time
        self.latest_obs_timestamp = datetime.now()

    def _init_storage(self, task_name: str, subtask_number: int, task_i: int, subtask_name: str):
        # Create task and subtask folder
        subtask_dir = self.output_dir / task_name / str(task_i + 200) / (subtask_name + "_" + str(subtask_number))
        subtask_dir.mkdir(parents=True, exist_ok=True)
        self.subtask_dir = subtask_dir
        # Clean collected data
        self.observations = {}

    def _save_step(self):
        # Save timestamps
        self.obs_timestamps.append(self.latest_obs_timestamp)
        self.action_timestamps.append(self.latest_action_timestamp)
        # Save trajectories with values directly from the robot
        self.joint_positions.append(self.latest_obs["joint_pos"])
        # self.joint_velocities.append(self.latest_obs["joint_vel"])
        # self.ee_positions.append(self.latest_obs["ee_pos"])
        # self.ee_velocities.append(self.latest_obs["ee_vel"])
        # self.gripper_widths.append(self.latest_obs["gripper_width"])
        # Save the model's output
        self.actions.append(self.latest_action)
        # Save video frames
        self.images["primary_camera"].append(self.tr_obs["primary_camera"])
        self.images["secondary_camera"].append(self.tr_obs["secondary_camera"])

    def _save_subtask(self, success):
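        """Dump the recorded arrays and videos for the finished subtask and mark its outcome."""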
        assert self.subtask_dir.exists()
        # Store timestamps
        obs_timestamps = np.vstack([ts.isoformat() for ts in self.obs_timestamps])
        np.save(self.subtask_dir / "obs_timestamps.npy", obs_timestamps)
        action_timestamps = np.vstack([ts.isoformat() for ts in self.action_timestamps])
        np.save(self.subtask_dir / "action_timestamps.npy", action_timestamps)
        # Store trajectories with values directly from the robot
        joint_positions = np.vstack(self.joint_positions)
        np.save(self.subtask_dir / "joint_positions.npy", joint_positions)
        # joint_velocities = np.vstack(self.joint_velocities)
        # np.save(self.subtask_dir / "joint_velocities.npy", joint_velocities)
        # ee_positions = np.vstack(self.ee_positions)
        # np.save(self.subtask_dir / "ee_positions.npy", ee_positions)
        # ee_velocities = np.vstack(self.ee_velocities)
        # np.save(self.subtask_dir / "ee_velocities.npy", ee_velocities)
        # gripper_widths = np.vstack(self.gripper_widths)
        # np.save(self.subtask_dir / "gripper_widths.npy", gripper_widths)
        # Store the model's output
        actions = np.vstack(self.actions)
        np.save(self.subtask_dir / "actions.npy", actions)
        # Store the camera streams as videos
        for camera in self.images:
            vid = np.stack(self.images[camera], axis=0)
            imageio.mimsave(self.subtask_dir / f"{camera}.mp4", vid, fps=10)
        # Mark the outcome with an empty marker file
        (self.subtask_dir / ("success" if success else "failure")).touch()

    def run_evaluation(self):
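        """Iterate over all tasks and subtasks, prompting the operator at each step."""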
        # Run tasks
        for task_i, task in enumerate(self.tasks):
            # Plain strings are treated as single-subtask tasks
            if not isinstance(task, (DictConfig, dict)):
                task = dict(
                    name=task,
                    subtasks=[task],
                )
            self._reset_robot()
            # Ask whether to [p]roceed, [s]kip, [r]eset again or [q]uit the task
            print("")
            print(f"📋 The current task is: {task['name']}")
            print("Do you want to [p]roceed, [s]kip, [r]eset again or [q]uit?")
            key = input()
            while key not in ("p", "s", "q"):
                if key == "r":
                    print("Resetting the robot again.")
                    self._reset_robot()
                else:
                    sleep(0.3)
                    print("Please enter a valid command.")
                key = input()
            if key == "s":
                continue
            elif key == "q":
                break
            logger.info(f"Running task: {task['name']}")
            for i, subtask in enumerate(task["subtasks"]):
                # Ask whether to [p]roceed, [s]kip or [q]uit the subtask
                print("")
                print(f"📜 The current subtask is: '{subtask}'")
                print("Do you want to [p]roceed, [s]kip or [q]uit?")
                key = input()
                while key not in ("p", "s", "q"):
                    sleep(0.3)
                    print("Please enter a valid command.")
                    key = input()
                if key == "s":
                    continue
                elif key == "q":
                    break
                # Set instruction
                self.instruction = self.task_map(subtask)
                logger.info(f"Running subtask instruction: '{subtask}'")
                # Init subtask output folder
                self._init_storage(task["name"], i, task_i, subtask)
                # Reset everything (the robot was already reset above)
                self._reset(reset_robot=False)
                # Loop until the operator marks [s]uccess or [f]ailure
                print("")
                print("📌 If the robot completed the subtask successfully press [s], otherwise [f]")
                print("")
                step = 0
                success = None
                with NonBlockingKeyPress() as kp:
                    while success is None:
                        self._inference_step()
                        step += 1
                        # Logging
                        if step % 10 == 0:
                            logger.info(f"Step {step}")
                        # Check if the operator has decided the outcome
                        key = kp.get_data()
                        if key == "s":
                            success = True
                        elif key == "f":
                            success = False
                        # Save current state
                        self._save_step()
                # Print outcome
                if success:
                    logger.info(f"Successfully finished the subtask '{subtask}'")
                else:
                    logger.info(f"Failed to finish the subtask '{subtask}'")
                logger.info(f"The execution took: {self.obs_timestamps[-1] - self.obs_timestamps[0]}")
                # Store results
                self._save_subtask(success)
        # Task summary
        logger.info("Task evaluation completed")
        # Close connection to environment
        self.env.close()
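

# Hydra entry point. `hydra.utils.instantiate(cfg)` below expects the top-level
# config (assumed to live at config/kitchen_eval.yaml, not shown here) to define
# a `_target_` pointing at `EvaluationApp` together with its constructor
# arguments: `agent`, `env`, `tasks` and, optionally, `task_map`. This is an
# inferred sketch of the config contract, not taken from the config file itself.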
@hydra.main(config_path="config", config_name="kitchen_eval")
def main(cfg: DictConfig):
    """Main evaluation entry point."""
    app = hydra.utils.instantiate(cfg)
    app.run_evaluation()
    # TODO Calculate statistics?


if __name__ == "__main__":
    main()