forked from AtsushiSakai/PythonRobotics
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Wavefront Coverage Path Planning (AtsushiSakai#351)
* First Commit of Wavefront Coverage Planner * Update wavefront_coverage_path_planner.py * fix CI / CodeFactor problem * Fix mypy error * update * Fix PyCodeStyle * Fix Code Scanning Warning and code styling problem * add simple unittest * followed second code review suggestions
- Loading branch information
Showing
5 changed files
with
283 additions
and
0 deletions.
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
219 changes: 219 additions & 0 deletions
219
PathPlanning/WavefrontCPP/wavefront_coverage_path_planner.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
""" | ||
Distance/Path Transform Wavefront Coverage Path Planner | ||
author: Todd Tang | ||
paper: Planning paths of complete coverage of an unstructured environment | ||
by a mobile robot - Zelinsky et.al. | ||
link: http://pinkwink.kr/attachment/[email protected] | ||
""" | ||
|
||
import os | ||
import sys | ||
|
||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
|
||
from scipy import ndimage | ||
|
||
|
||
do_animation = True | ||
|
||
|
||
def transform( | ||
gridmap, src, distance_type='chessboard', | ||
transform_type='path', alpha=0.01 | ||
): | ||
"""transform | ||
calculating transform of transform_type from src | ||
in given distance_type | ||
:param gridmap: 2d binary map | ||
:param src: distance transform source | ||
:param distance_type: type of distance used | ||
:param transform_type: type of transform used | ||
:param alpha: weight of Obstacle Transform usedwhen using path_transform | ||
""" | ||
|
||
nrows, ncols = gridmap.shape | ||
|
||
if nrows == 0 or ncols == 0: | ||
sys.exit('Empty gridmap.') | ||
|
||
inc_order = [[0, 1], [1, 1], [1, 0], [1, -1], | ||
[0, -1], [-1, -1], [-1, 0], [-1, 1]] | ||
if distance_type == 'chessboard': | ||
cost = [1, 1, 1, 1, 1, 1, 1, 1] | ||
elif distance_type == 'eculidean': | ||
cost = [1, np.sqrt(2), 1, np.sqrt(2), 1, np.sqrt(2), 1, np.sqrt(2)] | ||
else: | ||
sys.exit('Unsupported distance type.') | ||
|
||
transform_matrix = float('inf') * np.ones_like(gridmap, dtype=np.float) | ||
transform_matrix[src[0], src[1]] = 0 | ||
if transform_type == 'distance': | ||
eT = np.zeros_like(gridmap) | ||
elif transform_type == 'path': | ||
eT = ndimage.distance_transform_cdt(1-gridmap, distance_type) | ||
else: | ||
sys.exit('Unsupported transform type.') | ||
|
||
# set obstacle transform_matrix value to infinity | ||
for i in range(nrows): | ||
for j in range(ncols): | ||
if gridmap[i][j] == 1.0: | ||
transform_matrix[i][j] = float('inf') | ||
is_visited = np.zeros_like(transform_matrix, dtype=bool) | ||
is_visited[src[0], src[1]] = True | ||
traversal_queue = [src] | ||
calculated = [(src[0]-1)*ncols + src[1]] | ||
|
||
def is_valid_neighbor(i, j): | ||
return ni >= 0 and ni < nrows and nj >= 0 and nj < ncols \ | ||
and not gridmap[ni][nj] | ||
|
||
while traversal_queue != []: | ||
i, j = traversal_queue.pop(0) | ||
for k, inc in enumerate(inc_order): | ||
ni = i + inc[0] | ||
nj = j + inc[1] | ||
if is_valid_neighbor(ni, nj): | ||
is_visited[i][j] = True | ||
|
||
# update transform_matrix | ||
transform_matrix[i][j] = min( | ||
transform_matrix[i][j], | ||
transform_matrix[ni][nj] + cost[k] + alpha*eT[ni][nj]) | ||
|
||
if not is_visited[ni][nj] \ | ||
and ((ni-1)*ncols + nj) not in calculated: | ||
traversal_queue.append((ni, nj)) | ||
calculated.append((ni-1)*ncols + nj) | ||
|
||
return transform_matrix | ||
|
||
|
||
def wavefront(transform_matrix, start, goal): | ||
"""wavefront | ||
performing wavefront coverage path planning | ||
:param transform_matrix: the transform matrix | ||
:param start: start point of planning | ||
:param goal: goal point of planning | ||
""" | ||
|
||
path = [] | ||
nrows, ncols = transform_matrix.shape | ||
|
||
def get_search_order_increment(start, goal): | ||
if start[0] >= goal[0] and start[1] >= goal[1]: | ||
order = [[1, 0], [0, 1], [-1, 0], [0, -1], | ||
[1, 1], [1, -1], [-1, 1], [-1, -1]] | ||
elif start[0] <= goal[0] and start[1] >= goal[1]: | ||
order = [[-1, 0], [0, 1], [1, 0], [0, -1], | ||
[-1, 1], [-1, -1], [1, 1], [1, -1]] | ||
elif start[0] >= goal[0] and start[1] <= goal[1]: | ||
order = [[1, 0], [0, -1], [-1, 0], [0, 1], | ||
[1, -1], [-1, -1], [1, 1], [-1, 1]] | ||
elif start[0] <= goal[0] and start[1] <= goal[1]: | ||
order = [[-1, 0], [0, -1], [0, 1], [1, 0], | ||
[-1, -1], [-1, 1], [1, -1], [1, 1]] | ||
else: | ||
sys.exit('get_search_order_increment: cannot determine \ | ||
start=>goal increment order') | ||
return order | ||
|
||
def is_valid_neighbor(i, j): | ||
is_i_valid_bounded = i >= 0 and i < nrows | ||
is_j_valid_bounded = j >= 0 and j < ncols | ||
if is_i_valid_bounded and is_j_valid_bounded: | ||
return not is_visited[i][j] and \ | ||
transform_matrix[i][j] != float('inf') | ||
return False | ||
|
||
inc_order = get_search_order_increment(start, goal) | ||
|
||
current_node = start | ||
is_visited = np.zeros_like(transform_matrix, dtype=bool) | ||
|
||
while current_node != goal: | ||
i, j = current_node | ||
path.append((i, j)) | ||
is_visited[i][j] = True | ||
|
||
max_T = float('-inf') | ||
i_max = (-1, -1) | ||
i_last = 0 | ||
for i_last in range(len(path)): | ||
current_node = path[-1 - i_last] # get lastest node in path | ||
for ci, cj in inc_order: | ||
ni, nj = current_node[0] + ci, current_node[1] + cj | ||
if is_valid_neighbor(ni, nj) and \ | ||
transform_matrix[ni][nj] > max_T: | ||
i_max = (ni, nj) | ||
max_T = transform_matrix[ni][nj] | ||
|
||
if i_max != (-1, -1): | ||
break | ||
|
||
if i_max == (-1, -1): | ||
break | ||
else: | ||
current_node = i_max | ||
if i_last != 0: | ||
print('backtracing to', current_node) | ||
path.append(goal) | ||
|
||
return path | ||
|
||
|
||
def visualize_path(grid_map, start, goal, path, resolution): | ||
oy, ox = start | ||
gy, gx = goal | ||
px, py = np.transpose(np.flipud(np.fliplr((path)))) | ||
|
||
if not do_animation: | ||
plt.imshow(grid_map, cmap='Greys') | ||
plt.plot(ox, oy, "-xy") | ||
plt.plot(px, py, "-r") | ||
plt.plot(gx, gy, "-pg") | ||
plt.show() | ||
else: | ||
for ipx, ipy in zip(px, py): | ||
plt.cla() | ||
# for stopping simulation with the esc key. | ||
plt.gcf().canvas.mpl_connect( | ||
'key_release_event', | ||
lambda event: [exit(0) if event.key == 'escape' else None]) | ||
plt.imshow(grid_map, cmap='Greys') | ||
plt.plot(ox, oy, "-xb") | ||
plt.plot(px, py, "-r") | ||
plt.plot(gx, gy, "-pg") | ||
plt.plot(ipx, ipy, "or") | ||
plt.axis("equal") | ||
plt.grid(True) | ||
plt.pause(0.1) | ||
|
||
|
||
def main(): | ||
dir_path = os.path.dirname(os.path.realpath(__file__)) | ||
img = plt.imread(os.path.join(dir_path, 'map', 'test.png')) | ||
img = 1 - img # revert pixel values | ||
|
||
start = (43, 0) | ||
goal = (0, 0) | ||
|
||
# distance transform wavefront | ||
DT = transform(img, goal, transform_type='distance') | ||
DT_path = wavefront(DT, start, goal) | ||
visualize_path(img, start, goal, DT_path, 1) | ||
|
||
# path transform wavefront | ||
PT = transform(img, goal, transform_type='path', alpha=0.01) | ||
PT_path = wavefront(PT, start, goal) | ||
visualize_path(img, start, goal, PT_path, 1) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import os | ||
import sys | ||
import matplotlib.pyplot as plt | ||
from unittest import TestCase | ||
|
||
sys.path.append(os.path.dirname( | ||
os.path.abspath(__file__)) + "/../PathPlanning/WavefrontCPP") | ||
try: | ||
import wavefront_coverage_path_planner | ||
except ImportError: | ||
raise | ||
|
||
wavefront_coverage_path_planner.do_animation = False | ||
|
||
|
||
class TestPlanning(TestCase): | ||
def wavefront_cpp(self, img, start, goal): | ||
num_free = 0 | ||
for i in range(img.shape[0]): | ||
for j in range(img.shape[1]): | ||
num_free += 1 - img[i][j] | ||
|
||
DT = wavefront_coverage_path_planner.transform( | ||
img, goal, transform_type='distance') | ||
DT_path = wavefront_coverage_path_planner.wavefront(DT, start, goal) | ||
self.assertEqual(len(DT_path), num_free) # assert complete coverage | ||
|
||
PT = wavefront_coverage_path_planner.transform( | ||
img, goal, transform_type='path', alpha=0.01) | ||
PT_path = wavefront_coverage_path_planner.wavefront(PT, start, goal) | ||
self.assertEqual(len(PT_path), num_free) # assert complete coverage | ||
|
||
def test_wavefront_CPP_1(self): | ||
img_dir = os.path.dirname( | ||
os.path.abspath(__file__)) + "/../PathPlanning/WavefrontCPP" | ||
img = plt.imread(os.path.join(img_dir, 'map', 'test.png')) | ||
img = 1 - img | ||
|
||
start = (43, 0) | ||
goal = (0, 0) | ||
|
||
self.wavefront_cpp(img, start, goal) | ||
|
||
def test_wavefront_CPP_2(self): | ||
img_dir = os.path.dirname( | ||
os.path.abspath(__file__)) + "/../PathPlanning/WavefrontCPP" | ||
img = plt.imread(os.path.join(img_dir, 'map', 'test_2.png')) | ||
img = 1 - img | ||
|
||
start = (10, 0) | ||
goal = (10, 40) | ||
|
||
self.wavefront_cpp(img, start, goal) | ||
|
||
def test_wavefront_CPP_3(self): | ||
img_dir = os.path.dirname( | ||
os.path.abspath(__file__)) + "/../PathPlanning/WavefrontCPP" | ||
img = plt.imread(os.path.join(img_dir, 'map', 'test_3.png')) | ||
img = 1 - img | ||
|
||
start = (0, 0) | ||
goal = (30, 30) | ||
|
||
self.wavefront_cpp(img, start, goal) |