# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A variety of teacher tasks.
"""
import math
import numpy as np
import operator
import os
import gin
import itertools
import random
import json
from collections import deque, OrderedDict
from abc import abstractmethod
from absl import logging
import social_bot
from social_bot.teacher import TeacherAction
class Task(object):
"""Base class for Task.
A Task is for teaching a single task.
"""
compatible_agents = [
'pioneer2dx_noplugin',
'pr2_noplugin',
'icub',
'icub_with_hands',
'youbot_noplugin',
]
def __init__(self, env, max_steps=200, reward_weight=1.0):
"""
Setting things up during the initialization.
Args:
env (social_bot.GazeboEnvBase): an instance of Gym Environment
max_steps (int): the maximum number of steps of an episode
reward_weight (float): the weight of reward for calculating final_reward in teacher.teach()
Returns:
None
"""
self._env = env
self._world = env._world
self._agent = env._agent
self._max_steps = max_steps
self.reward_weight = reward_weight
self.task_vocab = ['hello', 'well', 'done', 'failed', 'to']
@abstractmethod
def run(self):
""" run() use yield to generate TeacherAction.
Structure of run():
```python
def run(self):
...
# agent_sentence is provided by Teacher using send() in TaskGroup.teach()
agent_sentence = yield # the first yielded value is ignored
...
# TeacherAction will be passed to Teacher as the return value of send() in TaskGroup.teach()
agent_sentence = yield TeacherAction(...)
...
agent_sentence = yield TeacherAction(...)
...
yield TeacherAction(done=True)
```
Returns:
A generator of TeacherAction
"""
pass
def task_specific_observation(self, agent):
"""
The extra information needed by the task if sparse states are used.
This can be overridden by the sub task. Note that this is only used
when "Agent._use_image_observation" is False. For the image case, the
image from the agent's camera is used. For the case of image plus internal
states, Agent.get_internal_states() is used, which only returns the
agent's own joint positions and velocities.
Args:
agent (GazeboAgent): the agent
Returns:
np.array, the observations of the task for non-image case
"""
return np.array([])
def set_agent(self, agent):
""" Set the agent of task.
The agent can be overridden by this function. This might be useful when multi
agents share the same task or embodied teacher.
Args:
agent (GazeboAgent): the agent
"""
self._agent = agent
def _get_states_of_model_list(self,
model_list,
including_velocity=True,
including_rotation=False):
""" Get the poses and velocities from a model list.
Args:
model_list (list): a list of model names
including_velocity (bool): if True, the velocity of objects will be included.
including_rotation (bool): if True, the rotation of objects (in roll pitch yaw) will be included.
Returns:
np.array, the poses and velocities of the models
"""
model_states = []
for model_id in range(len(model_list)):
model = self._world.get_model(model_list[model_id])
model_states.append(model.get_pose()[0])
if including_rotation:
model_states.append(model.get_pose()[1])
if including_velocity:
model_states.append(model.get_velocities()[0])
model_states = np.array(model_states).flatten()
return model_states
def _random_move_object(self,
target,
random_range,
center_pos=np.array([0, 0]),
min_distance=0,
height=0):
""" Move an object to a random position.
Args:
target (pygazebo.Model): the target to move
random_range (float): the maximum distance of the new position from center_pos
center_pos (numpy.array): the center coordinates (x, y) of the random range
min_distance (float): the new position will be at least this distance from center_pos
height (float): height offset of the new position
Returns:
np.array, the new position
"""
r = random.uniform(min_distance, random_range)
theta = random.random() * 2 * np.pi
loc = (center_pos[0] + r * np.cos(theta),
center_pos[1] + r * np.sin(theta), height)
target.set_pose((loc, (0, 0, 0)))
return np.array(loc)
@gin.configurable
class GoalTask(Task):
"""
A simple teacher task to find a goal.
For this task, the agent will receive reward 1 when it is close enough to the goal.
If it is moving away from the goal too much or still not close to the goal after max_steps,
it will get reward -1.
"""
def __init__(self,
env,
max_steps,
goal_name="ball",
distraction_list=[
'coke_can', 'table', 'car_wheel', 'plastic_cup', 'beer'
],
goal_conditioned=False,
use_aux_achieved=False,
xy_only_aux=False,
multi_dim_reward=False,
end_on_hitting_distraction=False,
end_episode_after_success=False,
reset_time_limit_on_success=True,
success_distance_thresh=0.5,
fail_distance_thresh=2.0,
distraction_penalty_distance_thresh=0,
distraction_penalty=0.5,
random_agent_orientation=False,
sparse_reward=True,
random_range=5.0,
polar_coord=True,
random_goal=False,
use_curriculum_training=False,
curriculum_distractions=True,
curriculum_target_angle=False,
switch_goal_within_episode=False,
start_range=0.0,
increase_range_by_percent=50.,
reward_thresh_to_increase_range=0.4,
percent_full_range_in_curriculum=0.1,
max_reward_q_length=100,
reward_weight=1.0,
move_goal_during_episode=True,
success_with_angle_requirement=True,
additional_observation_list=[],
use_egocentric_states=False,
egocentric_perception_range=0):
"""
Args:
env (gym.Env): an instance of Environment
max_steps (int): episode will end if the goal is not reached in this many steps
goal_name (string): name of the goal in the world
distraction_list (list of string): a list of model names. The models should be available in the gazebo database
goal_conditioned (bool): if True, each step has -1 reward, unless at goal state, which gives 0.
use_aux_achieved (bool): if True, pull out speed, pose dimensions into a separate
field: aux_achieved. Only valid when goal_conditioned is True.
xy_only_aux (bool): exclude irrelevant dimensions (z-axis movements) from
aux_achieved field.
multi_dim_reward (bool): if True, separate goal reward and distraction penalty into two dimensions.
end_episode_after_success (bool): if True, the episode will end once the goal is reached. A True value of this
flag will override the effects of flags ``switch_goal_within_episode`` and ``move_goal_during_episode``.
end_on_hitting_distraction (bool): whether to end the episode when a distraction object is hit
reset_time_limit_on_success (bool): if the episode does not end after success, reset the step counter
to 0 whenever the goal is reached before the time limit.
success_distance_thresh (float): the goal is reached if it's within this distance to the agent
fail_distance_thresh (float): if the agent moves away from the goal more than this distance,
it's considered a failure and is given reward -1
distraction_penalty_distance_thresh (float): if positive, penalize agent getting too close
to distraction objects (objects that include the goal itself, as approaching goal without
facing it is considered hitting a distraction)
distraction_penalty (float): positive float of how much to penalize getting too close to
distraction objects
random_agent_orientation (bool): whether to randomize the orientation (yaw) of the agent at the beginning of an
episode.
sparse_reward (bool): if True, the reward is -1/0/1; otherwise the 0 case is replaced
with the normalized distance by which the agent gets closer to the goal.
random_range (float): the goal's random position range
polar_coord (bool): if True, sample the goal position in polar coordinates (random distance
and angle relative to the agent); otherwise sample x and y uniformly within random_range.
random_goal (bool): if True, teacher will randomly select goal from the object list each episode
use_curriculum_training (bool): when true, use curriculum in goal task training
curriculum_distractions (bool): move distractions according to curriculum as well
curriculum_target_angle (bool): enlarge angle to target when initializing target according
to curriculum. Only when all angles are satisfied does curriculum try to increase distance.
Uses range of 0-360 degrees, starting from 60 with increments of 20.
switch_goal_within_episode (bool): if random_goal and this are both true, goal will be re-picked
within episode every time target is reached, besides picking after whole episode ends.
start_range (float): for curriculum learning, the starting random_range to set the goal
increase_range_by_percent (float): for curriculum learning, how much to increase random range
every time the agent reaches the specified amount of reward.
reward_thresh_to_increase_range (float): for curriculum learning, how much reward to reach
before the teacher increases random range.
percent_full_range_in_curriculum (float): if above 0, randomly throw in x% of training examples
where random_range is the full range instead of the easier ones in the curriculum.
max_reward_q_length (int): how many recent rewards to consider when estimating agent accuracy.
reward_weight (float): the weight of the reward, is used in multi-task case
move_goal_during_episode (bool): if True, the goal will be moved during episode, when it has been achieved
success_with_angle_requirement (bool): if True, success additionally requires the goal to be
within 45 degrees of the agent's heading when computing the reward
additional_observation_list (list of string): a list of additional objects to be added to the observation
use_egocentric_states (bool): For the non-image observation case, use the states transformed to
egocentric coordinates, e.g., the agent's egocentric distance and direction to the goal
egocentric_perception_range (float): the max range in degree to limit the agent's observation.
E.g. 60 means object is only visible when it's within +/-60 degrees in front of the agent's
direction (yaw).
"""
self._max_play_ground_size = 5 # play ground will be (-5, 5) for both x and y axes.
# TODO: Remove the default grey walls in the play ground world file,
# and insert them according to the max_play_ground_size.
# The wall should be lower, and adjustable in length. Add a custom model for that.
super().__init__(
env=env, max_steps=max_steps, reward_weight=reward_weight)
self._goal_name = goal_name
self._goal_conditioned = goal_conditioned
self._use_aux_achieved = use_aux_achieved
self._xy_only_aux = xy_only_aux
self._multi_dim_reward = multi_dim_reward
self.end_on_hitting_distraction = end_on_hitting_distraction
self._end_episode_after_success = end_episode_after_success
self._reset_time_limit_on_success = reset_time_limit_on_success
self._success_distance_thresh = success_distance_thresh
self._fail_distance_thresh = fail_distance_thresh
self._distraction_penalty_distance_thresh = distraction_penalty_distance_thresh
if distraction_penalty_distance_thresh > 0:
assert distraction_penalty_distance_thresh < success_distance_thresh
self._distraction_penalty = distraction_penalty
self._sparse_reward = sparse_reward
self._random_agent_orientation = random_agent_orientation
self._use_curriculum_training = use_curriculum_training
self._curriculum_distractions = curriculum_distractions
self._curriculum_target_angle = curriculum_target_angle
self._switch_goal_within_episode = switch_goal_within_episode
if curriculum_target_angle:
self._random_angle = 60
self._start_range = start_range
self._is_full_range_in_curriculum = False
self._random_goal = random_goal
if random_goal and goal_name not in distraction_list:
distraction_list.append(goal_name)
self._distraction_list = distraction_list
self._object_list = distraction_list
if goal_name and goal_name not in distraction_list:
self._object_list.append(goal_name)
self._goals = self._object_list
self._move_goal_during_episode = move_goal_during_episode
self._success_with_angle_requirement = success_with_angle_requirement
if not additional_observation_list:
additional_observation_list = self._object_list
self._additional_observation_list = additional_observation_list
self._pos_list = list(
itertools.product(
range(-self._max_play_ground_size, self._max_play_ground_size),
range(-self._max_play_ground_size,
self._max_play_ground_size)))
self._pos_list.remove((0, 0))
self._polar_coord = polar_coord
self._use_egocentric_states = use_egocentric_states
self._egocentric_perception_range = egocentric_perception_range
if self.should_use_curriculum_training():
self._orig_random_range = random_range
self._random_range = start_range
self._max_reward_q_length = max_reward_q_length
self._q = deque(maxlen=max_reward_q_length)
self._reward_thresh_to_increase_range = reward_thresh_to_increase_range
self._increase_range_by_percent = increase_range_by_percent
self._percent_full_range_in_curriculum = percent_full_range_in_curriculum
angle_str = ""
if curriculum_target_angle:
angle_str = ", start_angle {}".format(self._random_angle)
logging.info(
"Env %d: start_range %f%s, reward_thresh_to_increase_range %f",
self._env._port, self._start_range, angle_str,
self._reward_thresh_to_increase_range)
else:
self._random_range = random_range
self._goal_dist = 0.
obs_format = "image"
obs_relative = "ego"
if use_egocentric_states:
obs_format = "full_state"
else:
obs_relative = "absolute"
logging.info("Observations: {}, {}.".format(obs_format, obs_relative))
if not use_egocentric_states:
logging.info(
"Dims: 0-5: agent's velocity and angular " +
"velocity, 6-11: agent's position and pose, 12-13: goal x, y" +
", all distractions' x, y coordinates.")
self.task_vocab += self._object_list
self._env.insert_model_list(self._object_list)
def should_use_curriculum_training(self):
return (self._use_curriculum_training
and self._start_range >= self._success_distance_thresh * 1.2)
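# Curriculum bookkeeping: every value pushed below (the clipped success reward, or
# 0 for failures and distraction hits) goes into a fixed-length queue. With the
# defaults max_reward_q_length=100 and reward_thresh_to_increase_range=0.4, the
# range (and, if enabled, the target angle) only grows once the queue is full and
# its sum reaches at least 100 * 0.4 = 40.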
def _push_reward_queue(self, value):
if (not self.should_use_curriculum_training()
) or self._is_full_range_in_curriculum:
return
self._q.append(value)
if (value > 0 and len(self._q) == self._max_reward_q_length
and sum(self._q) >= self._max_reward_q_length *
self._reward_thresh_to_increase_range):
if self._curriculum_target_angle:
self._random_angle += 20
logging.info("Env %d: Raising random_angle to %d",
self._env._port, self._random_angle)
if (not self._curriculum_target_angle or self._random_angle > 360):
self._random_angle = 60
new_range = min((1. + self._increase_range_by_percent) *
self._random_range, self._orig_random_range)
if self._random_range < self._orig_random_range:
logging.info("Env %d: Raising random_range to %f",
self._env._port, new_range)
self._random_range = new_range
self._q.clear()
def get_random_range(self):
return self._random_range
def pick_goal(self):
if self._random_goal:
random_id = random.randrange(len(self._goals))
self.set_goal_name(self._goals[random_id])
def _get_agent_loc(self):
loc, agent_dir = self._agent.get_pose()
if self._agent.type.find('icub') != -1:
# For agent icub, we need to use the average pos here
loc = ICubAuxiliaryTask.get_icub_extra_obs(self._agent)[:3]
loc = np.array(loc)
return loc, agent_dir
def _prepare_teacher_action(self,
reward,
sentence,
done,
success=False,
rewards=None):
goal_dist = 0.
if rewards is not None:
rewards = rewards.astype(np.float32)
if done:
goal_dist = self._goal_dist
# clear self._goal_dist so it is only output once
self._goal_dist = 0.
return TeacherAction(
reward=reward,
sentence=sentence,
done=done,
success=success,
goal_range=goal_dist,
rewards=rewards)
def _within_angle(self, dot):
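# `dot` is the cosine of the angle between the agent's heading and the direction
# to the goal (computed in _get_goal_dist); 0.707 ~= cos(45 deg), so success
# requires the goal to lie within +/-45 degrees of the heading unless the angle
# requirement is disabled.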
return (not self._success_with_angle_requirement) or dot > 0.707
def _get_goal_dist(self, goal):
loc, agent_dir = self._get_agent_loc()
goal_loc, _ = goal.get_pose()
goal_loc = np.array(goal_loc)
dist = np.linalg.norm(loc - goal_loc)
# dir from get_pose is (roll, pitch, yaw)
dir = np.array([math.cos(agent_dir[2]), math.sin(agent_dir[2])])
goal_dir = (goal_loc[0:2] - loc[0:2]) / dist
dot = sum(dir * goal_dir)
return dist, dot, loc
def run(self):
""" Start a teaching episode for this task. """
agent_sentence = yield
self._agent.reset()
if self._random_agent_orientation:
loc, agent_dir = self._agent.get_pose()
self._agent.set_pose((loc, (agent_dir[0], agent_dir[1],
2 * math.pi * random.random())))
a_loc, a_dir = self._get_agent_loc()
self._random_move_objects()
self.pick_goal()
goal = self._world.get_model(self._goal_name)
self._move_goal(goal, a_loc, a_dir)
steps_since_last_reward = 0
prev_min_dist_to_distraction = 100
rewards = None # reward array in multi_dim_reward case
while steps_since_last_reward < self._max_steps:
steps_since_last_reward += 1
dist, dot, loc = self._get_goal_dist(goal)
distraction_penalty, prev_min_dist_to_distraction = (
self._get_distraction_penalty(loc, dot,
prev_min_dist_to_distraction))
# TODO(Le): compare achieved goal with desired goal if task is
# goal conditioned?
if dist < self._success_distance_thresh and self._within_angle(
dot):
# within 45 degrees of the agent direction
reward = 1.0 - distraction_penalty
self._push_reward_queue(max(reward, 0))
if self._goal_conditioned:
reward -= 1.
if self._multi_dim_reward:
rewards = np.array([0, -distraction_penalty])
else:
if self._multi_dim_reward:
rewards = np.array([1, -distraction_penalty])
logging.debug("yielding reward: " + str(reward))
done = self._end_episode_after_success
agent_sentence = yield self._prepare_teacher_action(
reward=reward,
sentence="well done",
done=done,
success=True,
rewards=rewards)
if self._reset_time_limit_on_success:
steps_since_last_reward = 0
if self._switch_goal_within_episode:
self.pick_goal()
goal = self._world.get_agent(self._goal_name)
if self._move_goal_during_episode:
self._agent.reset()
loc, agent_dir = self._get_agent_loc()
self._move_goal(goal, loc, agent_dir)
elif dist > self._initial_dist + self._fail_distance_thresh:
reward = -1.0 - distraction_penalty
self._push_reward_queue(0)
logging.debug(
"yielding reward: {}, farther than {} from goal".format(
str(reward), str(self._fail_distance_thresh)))
if self._multi_dim_reward:
rewards = np.array([-1, -distraction_penalty])
yield self._prepare_teacher_action(
reward=reward,
sentence="failed",
done=True,
rewards=rewards)
else:
if self._sparse_reward:
reward = 0
if self._goal_conditioned:
reward = -1
else:
reward = (self._prev_dist - dist) / self._initial_dist
if self._multi_dim_reward:
rewards = np.array([reward, -distraction_penalty])
reward = reward - distraction_penalty
done = False
if distraction_penalty > 0:
logging.debug("yielding reward: " + str(reward))
self._push_reward_queue(0)
done = self.end_on_hitting_distraction
self._prev_dist = dist
agent_sentence = yield self._prepare_teacher_action(
reward=reward,
sentence=self._goal_name,
done=done,
rewards=rewards)
reward = -1.0
dist, dot, loc = self._get_goal_dist(goal)
distraction_penalty, prev_min_dist_to_distraction = (
self._get_distraction_penalty(loc, dot,
prev_min_dist_to_distraction))
# TODO(Le): compare achieved goal with desired goal if task is
# goal conditioned?
success = False
if dist < self._success_distance_thresh and self._within_angle(dot):
success = True
reward = 1.0 - distraction_penalty
self._push_reward_queue(max(reward, 0))
if self._goal_conditioned:
reward -= 1.
if self._multi_dim_reward:
rewards = np.array([0, -distraction_penalty])
else:
if self._multi_dim_reward:
rewards = np.array([1, -distraction_penalty])
else:
self._push_reward_queue(0)
logging.debug("took more than {} steps".format(
str(self._max_steps)))
logging.debug("yielding reward: {}".format(str(reward)))
if self.should_use_curriculum_training():
logging.debug("reward queue len: {}, sum: {}".format(
str(len(self._q)), str(sum(self._q))))
yield self._prepare_teacher_action(
reward=reward,
sentence="failed",
done=True,
success=success,
rewards=rewards)
def _get_distraction_penalty(self, agent_loc, dot,
prev_min_dist_to_distraction):
"""
Calculate penalty for hitting/getting close to distraction objects
"""
distraction_penalty = 0
if (self._distraction_penalty_distance_thresh > 0
and self._distraction_list):
curr_min_dist = 100
for obj_name in self._distraction_list:
obj = self._world.get_model(obj_name)
if not obj:
continue
obj_loc, _ = obj.get_pose()
obj_loc = np.array(obj_loc)
distraction_dist = np.linalg.norm(agent_loc - obj_loc)
if (distraction_dist >=
self._distraction_penalty_distance_thresh):
continue
if obj_name == self._goal_name and self._within_angle(dot):
continue # correctly getting to goal, no penalty
if distraction_dist < curr_min_dist:
curr_min_dist = distraction_dist
if (prev_min_dist_to_distraction >
self._distraction_penalty_distance_thresh):
logging.debug("hitting object: " + obj_name)
distraction_penalty += self._distraction_penalty
prev_min_dist_to_distraction = curr_min_dist
return distraction_penalty, prev_min_dist_to_distraction
def _move_goal(self, goal, agent_loc, agent_dir):
"""
Move goal as well as all distraction objects to a random location.
"""
avoid_locations = [agent_loc]
loc, dist = self._move_obj(
obj=goal,
agent_loc=agent_loc,
agent_dir=agent_dir,
is_goal=True,
avoid_locations=avoid_locations)
self._goal_dist += dist
avoid_locations.append(loc)
distractions = OrderedDict()
for item in self._distraction_list:
if item is not self._goal_name:
distractions[item] = 1
if len(distractions) and self._curriculum_distractions:
for item, _ in distractions.items():
distraction = self._world.get_agent(item)
loc, _ = self._move_obj(
obj=distraction,
agent_loc=agent_loc,
agent_dir=agent_dir,
is_goal=False,
avoid_locations=avoid_locations)
avoid_locations.append(loc)
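# _move_obj places an object by rejection sampling: up to 10000 attempts are made
# to draw a location within the current curriculum range (and, if enabled, angle),
# rejecting any location outside the +/-_max_play_ground_size walls or closer than
# success_distance_thresh to a previously placed object or to the agent.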
def _move_obj(self,
obj,
agent_loc,
agent_dir,
is_goal=True,
avoid_locations=[]):
if (self.should_use_curriculum_training()
and self._percent_full_range_in_curriculum > 0
and random.random() < self._percent_full_range_in_curriculum):
range = self._orig_random_range
self._is_full_range_in_curriculum = is_goal
else:
range = self._random_range
self._is_full_range_in_curriculum = False
attempts = 0
dist = range
while True:
attempts += 1
dist = random.random() * range
if self._curriculum_target_angle:
angle_range = self._random_angle
else:
angle_range = 360
angle = math.radians(
math.degrees(agent_dir[2]) + random.random() * angle_range -
angle_range / 2)
loc = (dist * math.cos(angle), dist * math.sin(angle),
0) + agent_loc
if not self._polar_coord:
loc = np.asarray((random.random() * range - range / 2,
random.random() * range - range / 2, 0))
self._initial_dist = np.linalg.norm(loc - agent_loc)
satisfied = True
if (abs(loc[0]) > self._max_play_ground_size or abs(loc[1]) >
self._max_play_ground_size): # not within walls
satisfied = False
for avoid_loc in avoid_locations:
dist = np.linalg.norm(loc - avoid_loc)
if dist < self._success_distance_thresh:
satisfied = False
break
if satisfied or attempts > 10000:
if not satisfied:
logging.warning(
"Took forever to find satisfying " +
"object location. " +
"agent_loc: {}, range: {}, max_size: {}.".format(
str(agent_loc), str(range),
str(self._max_play_ground_size)))
break
self._prev_dist = self._initial_dist
obj.reset()
obj.set_pose((loc, (0, 0, 0)))
return loc, dist
def _random_move_objects(self, random_range=10.0):
obj_num = len(self._object_list)
obj_pos_list = random.sample(self._pos_list, obj_num)
for obj_id in range(obj_num):
model_name = self._object_list[obj_id]
loc = (obj_pos_list[obj_id][0], obj_pos_list[obj_id][1], 0)
pose = (np.array(loc), (0, 0, 0))
self._world.get_model(model_name).set_pose(pose)
def get_goal_name(self):
"""
Args:
None
Returns:
The goal's name for this episode
"""
return self._goal_name
def set_goal_name(self, goal_name):
"""
Args:
goal_name (string): the name of the goal
Returns:
None
"""
logging.debug('Setting Goal to %s', goal_name)
self._goal_name = goal_name
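# For the goal-conditioned setting, the flat observation is repackaged into an
# OrderedDict roughly of the form (a sketch of the keys produced below):
#   {'observation':   task-specific state (or only the objects' coordinates
#                     when use_aux_achieved is True),
#    'achieved_goal': agent (x, y),
#    'desired_goal':  goal (x, y),
#    'aux_achieved':  agent velocities plus remaining pose dims (only when
#                     use_aux_achieved is True)}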
def generate_goal_conditioned_obs(self, agent):
flat_obs = self.task_specific_observation(agent)
obs = OrderedDict()
agent_pose = np.array(agent.get_pose()).flatten()
agent_vel = np.array(agent.get_velocities()).flatten()
goal = self._world.get_model(self._goal_name)
goal_pose = np.array(goal.get_pose()[0]).flatten()
obs['observation'] = flat_obs
obs['achieved_goal'] = agent_pose[0:2]
obs['desired_goal'] = goal_pose[0:2]
if self._use_aux_achieved:
# distraction objects' x, y coordinates
obs['observation'] = flat_obs[14:]
obs['aux_achieved'] = np.concatenate((agent_vel, agent_pose[2:]),
axis=0)
if self._xy_only_aux:
# agent speed: 2: z-speed; 3, 4: angular velocities; 5: yaw-vel,
# agent pose: 2: z; 3, 4, 5: roll pitch yaw.
obs['observation'] = np.concatenate(
(agent_vel[2:5], agent_pose[2:5], flat_obs[14:]), axis=0)
obs['aux_achieved'] = np.concatenate(
(agent_vel[0:2], np.expand_dims(agent_vel[5], 0),
np.expand_dims(agent_pose[5], 0)),
axis=0)
return obs
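# Layout of the flat observation built below (also logged in __init__):
# dims 0-5 are the agent's (egocentric or absolute) velocity and angular velocity,
# dims 6-11 the agent's position and pose, and dim 12 onward the goal's and then
# each distraction's 2 values (x, y), or 3 values (unit direction and inverse
# distance) when egocentric_perception_range > 0.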
def task_specific_observation(self, agent):
"""
Args:
agent (GazeboAgent): the agent
Returns:
np.array, the observations of the task for non-image case
"""
goal = self._world.get_model(self._goal_name)
goal_first = not agent._with_language
if goal_first: # put goal first
pose = np.array(goal.get_pose()[0]).flatten()
else: # has language input, don't put goal first
pose = None
for name in self._additional_observation_list:
if goal_first and name == self._goal_name:
continue
obj = self._world.get_model(name)
obj_pos = np.array(obj.get_pose()[0]).flatten()
if pose is None:
pose = obj_pos
else:
pose = np.concatenate((pose, obj_pos), axis=0)
agent_pose = np.array(agent.get_pose()).flatten()
yaw = agent_pose[5]
# adds egocentric velocity input
vx, vy, vz, a1, a2, a3 = np.array(agent.get_velocities()).flatten()
if self._use_egocentric_states:
rvx, rvy = agent.get_egocentric_cord_2d(vx, vy, yaw)
else:
rvx, rvy = vx, vy
obs = [rvx, rvy, vz, a1, a2, a3] + list(agent_pose)
# adds objects' (goal's as well as distractions') egocentric
# coordinates to observation
while len(pose) > 1:
x = pose[0]
y = pose[1]
if self._use_egocentric_states:
x = pose[0] - agent_pose[0]
y = pose[1] - agent_pose[1]
rotated_x, rotated_y = agent.get_egocentric_cord_2d(x, y, yaw)
else:
rotated_x, rotated_y = x, y
if (self._use_egocentric_states
and self._egocentric_perception_range > 0):
dist = math.sqrt(rotated_x * rotated_x + rotated_y * rotated_y)
rotated_x /= dist
rotated_y /= dist
magnitude = 1. / dist
if rotated_x < np.cos(
self._egocentric_perception_range / 180. * np.pi):
rotated_x = 0.
rotated_y = 0.
magnitude = 0.
obs.extend([rotated_x, rotated_y, magnitude])
else:
obs.extend([rotated_x, rotated_y])
pose = pose[3:]
obs = np.array(obs)
return obs
@gin.configurable
class ICubAuxiliaryTask(Task):
"""
An auxiliary task specific to iCub, to keep the agent from falling down
and to encourage the agent to walk
"""
def __init__(self,
env,
max_steps,
target=None,
agent_init_pos=(0, 0),
agent_pos_random_range=0,
reward_weight=1.0):
"""
Args:
env (gym.Env): an instance of Environment
max_steps (int): episode will end after this many steps
reward_weight (float): the weight of the reward, should be tuned
according to the reward range of other tasks
target (string): this is the target icub should face towards, since
you may want the agent to interact with something
agent_init_pos (tuple): the expected initial position of the agent
agent_pos_random_range (float): random range of the initial position
"""
super().__init__(
env=env, max_steps=max_steps, reward_weight=reward_weight)
self.task_vocab = ['icub']
self._target_name = target
self._pre_agent_pos = np.array([0, 0, 0], dtype=np.float32)
self._agent_init_pos = agent_init_pos
self._random_range = agent_pos_random_range
if self._target_name:
self._target = self._world.get_model(self._target_name)
with open(
os.path.join(social_bot.get_model_dir(), "agent_cfg.json"),
'r') as cfg_file:
agent_cfgs = json.load(cfg_file)
self._joints = agent_cfgs[self._agent.type]['control_joints']
def run(self):
""" Start a teaching episode for this task. """
self._pre_agent_pos = self.get_icub_extra_obs(self._agent)[:3]
agent_sentence = yield
done = False
# set icub random initial pose
x = self._agent_init_pos[0] + random.random() * self._random_range
y = self._agent_init_pos[1] + random.random() * self._random_range
orient = (random.random() - 0.5) * np.pi
if self._target_name and random.randint(0, 1) == 0:
# a trick from roboschool humanoid flag run, important to learn to steer
pos = np.array([x, y, 0.6])
orient = self._get_angle_to_target(
self._agent, pos, self._agent.type + '::root_link', np.pi)
self._agent.set_pose((np.array([x, y, 0.6]), np.array([0, 0, orient])))
while not done:
# reward for not falling (alive reward)
agent_height = np.array(
self._agent.get_link_pose(self._agent.type + '::head'))[0][2]
done = agent_height < 0.7 # fall down
standing_reward = agent_height
# movement cost, to avoid unnecessary movements
joint_pos = []
for joint_name in self._joints:
joint_state = self._agent.get_joint_state(joint_name)
joint_pos.append(joint_state.get_positions())
joint_pos = np.array(joint_pos).flatten()
movement_cost = np.sum(np.abs(joint_pos)) / joint_pos.shape[0]
# orientation cost, the agent should face towards the target
if self._target_name:
agent_pos = self.get_icub_extra_obs(self._agent)[:3]
head_angle = self._get_angle_to_target(
self._agent, agent_pos, self._agent.type + '::head')
root_angle = self._get_angle_to_target(
self._agent, agent_pos, self._agent.type + '::root_link')
l_foot_angle = self._get_angle_to_target(
self._agent, agent_pos,
self._agent.type + '::l_leg::l_foot', np.pi)
r_foot_angle = self._get_angle_to_target(
self._agent, agent_pos,
self._agent.type + '::r_leg::r_foot', np.pi)
orient_cost = (np.abs(head_angle) + np.abs(root_angle) +
np.abs(l_foot_angle) + np.abs(r_foot_angle)) / 4
else:
orient_cost = 0
# sum all
reward = standing_reward - 0.5 * movement_cost - 0.2 * orient_cost
agent_sentence = yield TeacherAction(reward=reward, done=done)
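# get_icub_extra_obs below returns, in order: the average position of four key
# links (3 values), the full poses of root_link, chest, l_foot and r_foot
# (6 values each, assuming get_link_pose returns a (position, rotation) pair),
# and two binary foot-contact flags -- 29 values in total.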
@staticmethod
def get_icub_extra_obs(agent):
"""
Get ground contacts, the poses of key points of icub, and their center.
A static method; other tasks can use this to get additional icub info.
Args:
agent (GazeboAgent): the agent
Returns:
np.array of the extra observations of icub, including the average position
"""
root_pose = np.array(
agent.get_link_pose(agent.name + '::root_link')).flatten()
chest_pose = np.array(
agent.get_link_pose(agent.name + '::chest')).flatten()
l_foot_pose = np.array(
agent.get_link_pose(agent.name + '::l_leg::l_foot')).flatten()
r_foot_pose = np.array(
agent.get_link_pose(agent.name + '::r_leg::r_foot')).flatten()
foot_contacts = np.array([
agent.get_contacts("l_foot_contact_sensor",
'ground_plane::link::collision'),
agent.get_contacts("r_foot_contact_sensor",
'ground_plane::link::collision')
]).astype(np.float32)
average_pos = np.sum([
root_pose[0:3], chest_pose[0:3], l_foot_pose[0:3], r_foot_pose[0:3]
],
axis=0) / 4.0
obs = np.concatenate((average_pos, root_pose, chest_pose, l_foot_pose,
r_foot_pose, foot_contacts))
return obs
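# Angles below are wrapped into [-pi, pi) via (angle + pi) % (2 * pi) - pi;
# e.g. an angle of 3*pi/2 wraps to -pi/2.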
def _get_angle_to_target(self, agent, agent_pos, link_name, offset=0):
""" Get the angle from an icub link to the target.
Args:
agent (GazeboAgent): the agent
agent_pos (numpy.array): the position of the agent
link_name (string): link name of the agent
offset (float): the yaw offset of the link, since some links have an initial internal rotation
Returns:
float, angle to target
"""
yaw = agent.get_link_pose(link_name)[1][2]
yaw = (yaw + offset) % (
2 * np.pi
) - np.pi  # model icub has a global built-in 180 degree rotation
target_pos, _ = self._target.get_pose()
walk_target_theta = np.arctan2(target_pos[1] - agent_pos[1],
target_pos[0] - agent_pos[0])
angle_to_target = walk_target_theta - yaw
# wrap the range to [-pi, pi)
angle_to_target = (angle_to_target + np.pi) % (2 * np.pi) - np.pi
return angle_to_target
def task_specific_observation(self, agent):
"""
Args:
agent (GazeboAgent): the agent
Returns:
np.array, the observations of the task for non-image case
"""
icub_extra_obs = self.get_icub_extra_obs(agent)
if self._target_name:
agent_pos = icub_extra_obs[:3]
# TODO: be compatible with calling multiple times in one env step
agent_speed = (
agent_pos - self._pre_agent_pos) / self._env.get_step_time()
self._pre_agent_pos = agent_pos
yaw = agent.get_link_pose(agent.type + '::root_link')[1][2]
angle_to_target = self._get_angle_to_target(
agent, agent_pos, agent.type + '::root_link')
rot_minus_yaw = np.array([[np.cos(-yaw), -np.sin(-yaw), 0],
[np.sin(-yaw),
np.cos(-yaw), 0], [0, 0, 1]])
vx, vy, vz = np.dot(rot_minus_yaw,
agent_speed) # rotate to agent view
orientation_ob = np.array(
[np.sin(angle_to_target),
np.cos(angle_to_target), vx, vy, vz],
dtype=np.float32)
return np.concatenate([icub_extra_obs] + [orientation_ob])
else:
return icub_extra_obs
@gin.configurable
class KickingBallTask(Task):
"""
A simple task to kick a ball so that it rolls into the goal. An
optional reward shaping can be used to guide the agent to run to the ball first:
The agent receives 100 when it successfully kicks the ball into the goal.
Before touching the ball (within 45 degrees of the agent's direction), the agent
receives the speed at which it is getting closer to the ball, truncated at the
parameter target_speed.
After touching the ball within that direction, the agent receives the negative
normalized distance from the ball to the goal center. An offset of
"target_speed + 1" is included since touching the ball must be better
than not touching.
If no reward shaping is used, the agent only gets -1/0/1 rewards.
"""
def __init__(self,
env,
max_steps,
goal_distance=5.0,
random_range=4.0,
target_speed=2.0,
reward_weight=1.0,
sparse_reward=False):
"""
Args:
env (gym.Env): an instance of Environment
max_steps (int): episode will end if the task is not achieved in so
many steps
goal_distance (float): the distance from the goal to the ball on
average. A smaller distance makes the kicking task easier.
random_range (float): the ball's random position range
target_speed (float): the target speed when running to the ball. The agent receives no
additional reward once its speed exceeds target_speed.
reward_weight (float): the weight of the reward
sparse_reward (bool): if True, the agent will only get -1/0/1 rewards.
"""
super().__init__(
env=env, max_steps=max_steps, reward_weight=reward_weight)
self._random_range = random_range
self._target_speed = target_speed