-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdt_rrt_star.py
233 lines (191 loc) · 9.61 KB
/
dt_rrt_star.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import numpy as np
import random
from visualiser import Visualiser
from rrt import RRT
from debug import debug_planner, proc_time
class Node:
def __init__(self, position, parent=None):
self.position = position # Node Position (x coordinate, y coordinate)
self.parent = parent # Reference to the parent node
class DT_RRT_Star:
def __init__(self, map_env: Visualiser, step_size=5, neighbor_radius=20):
self.map_env = map_env
self.step_size = step_size
self.neighbor_radius = neighbor_radius
self.nodes = [Node(map_env.start)]
self.nodes[0].cost = 0
self.sigma_r = 20 # Standard deviation for the radial distance
self.mu_r = 2 # Mean radial distance
self.sigma_theta = np.pi / 6 # Standard deviation for the angle in radians
self.mu_theta = np.pi / 2 # Mean angle (e.g., pointing upwards)
def sample_around(self, nodes):
new_nodes = []
random_node = random.choice(nodes)
for _ in range(1000):
r = np.random.normal(self.mu_r, self.sigma_r)
theta = np.random.normal(self.mu_theta, self.sigma_theta)
# Convert polar coordinates (r, theta) to Cartesian coordinates (dx, dy)
dx = r * np.cos(theta)
dy = r * np.sin(theta)
# Create a new position by adding the offset to the current position
new_position = (random_node[0] + dx, random_node[1] + dy)
# Create a new node with the new position and add it to the list
new_nodes.append(new_position)
return (random_node, new_nodes)
def random_gaussian_point(self, nodes):
random_node = random.choice(nodes)
r = np.random.normal(self.mu_r, self.sigma_r)
theta = np.random.normal(self.mu_theta, self.sigma_theta)
# Convert polar coordinates (r, theta) to Cartesian coordinates (dx, dy)
dx = r * np.cos(theta)
dy = r * np.sin(theta)
# Create a new position by adding the offset to the current position
gaussian_point = (random_node[0] + dx, random_node[1] + dy)
return gaussian_point
def distance(self, a, b):
return np.sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)
def nearest_node(self, position):
return min(self.nodes, key=lambda node: self.distance(node.position, position))
def step_from_to(self, n1, n2):
if self.distance(n1, n2) < self.step_size:
return n2
else:
theta = np.arctan2(n2[1] - n1[1], n2[0] - n1[0])
return n1[0] + self.step_size * np.cos(theta), n1[1] + self.step_size * np.sin(theta)
def find_neighbors(self, new_node):
return [node for node in self.nodes if self.distance(node.position, new_node.position) < self.neighbor_radius]
def is_collision_free(self, node):
for obstacle, size in self.map_env.obstacles:
if (
obstacle[0] <= node.position[0] <= obstacle[0] + size[0]
and obstacle[1] <= node.position[1] <= obstacle[1] + size[1]
):
return False
return True
def is_path_collision_free(self, start_pos, end_pos):
steps = int(np.ceil(self.distance(start_pos, end_pos) / self.step_size))
dx = (end_pos[0] - start_pos[0]) / steps
dy = (end_pos[1] - start_pos[1]) / steps
for step in range(1, steps + 1):
intermediate_point = (start_pos[0] + dx * step, start_pos[1] + dy * step)
for obstacle, size in self.map_env.obstacles:
# Check if intermediate_point is inside the current obstacle
if (
obstacle[0] <= intermediate_point[0] <= obstacle[0] + size[0]
and obstacle[1] <= intermediate_point[1] <= obstacle[1] + size[1]
):
return False # Path is not collision-free
return True # Path is collision-free
def choose_best_parent(self, new_node, neighbors):
for neighbor in neighbors:
potential_cost = neighbor.cost + self.distance(neighbor.position, new_node.position)
if self.is_path_collision_free(neighbor.position, new_node.position) and potential_cost < new_node.cost:
new_node.parent = neighbor
new_node.cost = potential_cost
def re_search_parent(self, new_node):
# Initialize the potential parent as the node’s parent
potential_parent = new_node.parent
best_cost = new_node.cost
found_better_parent = False
# Traverse back up the tree towards the root
current_node = new_node.parent
while current_node is not None:
# Calculate the potential cost if current_node were the parent
if self.is_path_collision_free(current_node.position, new_node.position):
potential_cost = current_node.cost + self.distance(current_node.position, new_node.position)
# Check if this new potential parent offers a better (lower) cost
if potential_cost < best_cost:
best_cost = potential_cost
potential_parent = current_node
found_better_parent = True
# Move up the tree
current_node = current_node.parent
# If a better parent was found, update the parent and cost of new_node
if found_better_parent:
new_node.parent = potential_parent
new_node.cost = best_cost
@debug_planner
def find_path(self):
# first, notify any pauser daemons that we are starting
self.find_path.pause_condition.acquire(blocking=True)
rrt = RRT(self.map_env)
last_node, _ = rrt.find_path()
_, shortcut_path = self._shortcut_path(last_node)
print("INFO: found shortcut path!")
self.find_path.pause_condition.notify()
self.find_path.pause_condition.release()
goal_reached = False
while not goal_reached:
self.find_path.pause_condition.acquire(blocking=True)
self.find_path.pause_condition.wait_for(lambda: not self.find_path.paused)
random_position = self.random_gaussian_point(shortcut_path)
nearest = self.nearest_node(random_position)
new_position = self.step_from_to(nearest.position, random_position)
new_node = Node(new_position, nearest)
new_node.cost = nearest.cost + self.distance(new_node.position, nearest.position)
if self.is_collision_free(new_node):
neighbors = self.find_neighbors(new_node)
self.choose_best_parent(new_node, neighbors)
self.re_search_parent(new_node)
self.nodes.append(new_node)
if self.distance(new_node.position, self.map_env.goal) <= self.step_size:
print("INFO: Goal reached!")
goal_reached = True
self.find_path.pause_condition.release()
last_node = self.nodes[-1]
return last_node, self._trace_path(last_node)
def _trace_path(self, final_node):
path = []
current_node = final_node
while current_node is not None:
path.append(current_node.position)
current_node = current_node.parent
path.reverse()
return path
@proc_time
def _shortcut_path(self, last_final_node: Node):
"""
Runs the `re_search_parent` operation on an existing path
from the start point to the goal. Iterates from the goal, backwards,
tracing `.parent` pointers to find the best parent for each node.
N.B.:Additionally, it adds intermediate nodes to the path to aid in
Gaussian sampling later on.
Parameters:
-----------
final_node : Node
Goal node with parent set, all the way back to the start node
Returns:
--------
(Node, list[tuple])
A tuple containing the last node in the path (excluding the goal)
and the optimised path from the start to the goal
"""
# for the first iteration, we start at the goal
final_node = current_node = Node(self.map_env.goal, last_final_node)
current_node.cost = last_final_node.cost + self.distance(last_final_node.position, self.map_env.goal)
self.re_search_parent(current_node)
current_node = current_node.parent
# iterate back up the tree to find the best parent for each node
while current_node is not None:
self.re_search_parent(current_node)
current_node = current_node.parent
# make the path have nodes each of increment `step_size`
# by adding intermediate nodes. Needed for the Gaussian sampling
# to work correctly
current_node = final_node
while current_node.parent is not None:
if self.distance(current_node.position, current_node.parent.position) > self.step_size:
new_position = self.step_from_to(current_node.position, current_node.parent.position)
new_node = Node(new_position, current_node.parent)
new_node.cost = current_node.parent.cost + self.distance(
new_node.position, current_node.parent.position
)
current_node.parent = new_node
current_node = current_node.parent
# retrace the path from the start to the goal
# and return the optimised path
#
# TODO: change this to match the terminology the rest of the project
# `final_node` needs to be a node *within step_size* of the goal
# currently, it is the end goal node itself
return final_node, self._trace_path(final_node)