-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel-based.py
335 lines (277 loc) · 10.9 KB
/
model-based.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
import os
import numpy as np
import operator
import seaborn as sns
import matplotlib.pyplot as plt
from itertools import product
from tqdm import tqdm
class actions:
north = 0
east = 1
south = 2
west = 3
@classmethod
def all(cls):
return {cls.north, cls.east, cls.south, cls.west}
def convert(list):
""" Convert [i, j] to (i, j)
Args:
list
Returns:
tuple
"""
return (*list,)
def find_best_move_simple(current_pos, goal, M):
""" Do simple shortest path heuristic using the start cell and the transition matrix. The function looks at the directions which should be moved
from the current position to get to the goal position, and chooses the direction with the highest transition probability.
Afterwards, calculate the expected travel time for the optimal path
Args:
start ([position]): starting position
goal ([position]): goal position
M : transition matrix
Returns:
travel time
"""
trvl_time = 0
steps = 0
while current_pos != goal:
# potential moves
moves_x = goal[0] - current_pos[0]
moves_y = goal[1] - current_pos[1]
# This will be the vector which contains the values of the possible moves we can do. Order: N, E, S, W
possible_moves = np.zeros(len(actions.all()))
# Determining if we move North, South or no vertical movement
if moves_x > 0:
possible_moves[actions.north] = M[
current_pos[0], current_pos[1], [actions.north]
]
possible_moves[actions.south] = 0
elif moves_x < 0:
possible_moves[actions.north] = 0
possible_moves[actions.south] = M[
current_pos[0], current_pos[1], [actions.south]
]
else:
possible_moves[actions.north] = 0
possible_moves[actions.south] = 0
# Determining if we move East, West or no horizontal movement
if moves_y > 0:
possible_moves[actions.east] = M[
current_pos[0], current_pos[1], [actions.east]
]
possible_moves[actions.west] = 0
elif moves_y < 0:
possible_moves[actions.east] = 0
possible_moves[actions.west] = M[
current_pos[0], current_pos[1], [actions.west]
]
else:
possible_moves[actions.east] = 0
possible_moves[actions.west] = 0
# From this vector, we can choose the optimal move by selecting the direction with the highest probability of no congestion - the max value.
if np.argmax(possible_moves) == actions.north:
current_pos[0] += 1
elif np.argmax(possible_moves) == actions.east:
current_pos[1] += 1
elif np.argmax(possible_moves) == actions.south:
current_pos[0] -= 1
else:
current_pos[1] -= 1
steps += 1
# updating the travel time
trvl_time += prob_to_exp_trvl_time(max(possible_moves))
return trvl_time, steps
def dijkstra(start, goal, M):
""" Do Dijkstra's shortest path algorithm given the start cell and the transition matrix.
Afterwards, calculate the expected travel time for the optimal path
Args:
start ([position]): starting position
goal ([position]): goal position
M : transition matrix
Returns:
travel time
"""
visited = {}
visited[start] = 0
unvisited = {}
while len(visited) < 2500:
# update the distances of the neigbours from the visited points
for node in visited:
potential_neigbours = neighbours(node, M)
for potential_neigbour in potential_neigbours:
# check if there is a shorter way to a neigbour already in the unvisited set
if convert(potential_neigbour) in unvisited:
# print("unvisited in loop", unvisited)
# print("new route? ", potential_neigbour, M[node[0], node[1], get_direction(node, potential_neigbour)], prob_to_exp_trvl_time(M[node[0], node[1], get_direction(node, potential_neigbour)]) + visited[node], unvisited[convert(potential_neigbour)])
unvisited[convert(potential_neigbour)] = min(
prob_to_exp_trvl_time(
M[node[0], node[1], get_direction(node, potential_neigbour)]
)
+ visited[node],
unvisited[convert(potential_neigbour)],
)
# print("added: ", potential_neigbour, unvisited[convert(potential_neigbour)])
break
# if neigbour not yet in the visited set, add to unvisited
if convert(potential_neigbour) not in visited:
unvisited[convert(potential_neigbour)] = visited[
node
] + prob_to_exp_trvl_time(
M[node[0], node[1], get_direction(node, potential_neigbour)]
)
# print("added: ", potential_neigbour, visited[node] + prob_to_exp_trvl_time(M[node[0], node[1], get_direction(node, potential_neigbour)]))
# print("unvisited", unvisited)
# from the unvisited set, choose the node with the shortest distance
visited[min(unvisited.items(), key=operator.itemgetter(1))[0]] = min(
unvisited.items(), key=operator.itemgetter(1)
)[1]
del unvisited[min(unvisited.items(), key=operator.itemgetter(1))[0]]
return visited[goal]
def value_iteration(start, goal, M, plot=False):
""" Do value iteration given the start cell and the transition matrix. Afterwards, calculate the expected travel time for the optimal path
Args:
start ([position]): starting position
goal ([position]): goal position
M : transition matrix
Returns:
travel time
"""
size_V = len(M)
V = np.zeros((size_V, size_V))
V_tmp = np.zeros((size_V, size_V))
# how many steps for value iteration - this might be enough when starting in (0, 0) and going to (1, 10). When the whole matrix is used, we might
# have to increase the steps
it = 0
while True:
for i in range(len(V)):
for j in range(len(V)):
V_sum = []
neigbours_list = neighbours([i, j], V)
for neigbour in neigbours_list:
V_sum.append(
r[i, j]
+ gamma
* M[i, j, get_direction([i, j], neigbour)]
* V_tmp[neigbour[0], neigbour[1]]
)
V_tmp[i][j] = max(V_sum)
it += 1
if np.sum(np.abs(V - V_tmp)) < 1e-13:
print("Value iterations iters =", it)
break
V = V_tmp.copy()
if plot:
plt.figure()
sns.heatmap(V, square=True)
plt.title("Heatmap of V matrix")
plt.xlabel("y")
plt.ylabel("x")
os.makedirs("img", exist_ok=True)
plt.savefig("img/V_heatmap")
# Value iteration done, now choose the optimal path through the matrix
travel_time = 0
current_pos = start
steps = 0
for i in range(100):
moves = []
# find the neigbour with the highest V value and go to this place
neigbours_list = neighbours([current_pos[0], current_pos[1]], V)
for neigbour in neigbours_list:
moves.append(V[neigbour[0], neigbour[1]])
# update the travel time
travel_time += prob_to_exp_trvl_time(
M[
current_pos[0],
current_pos[1],
get_direction(
[current_pos[0], current_pos[1]], neigbours_list[np.argmax(moves)]
),
]
)
current_pos = neigbours_list[np.argmax(moves)]
steps += 1
# goal has been reached
if convert(current_pos) == goal:
break
return travel_time, steps
def prob_to_exp_trvl_time(prob):
""" Convert probability to expected travel time. Always +1 for the step that needs to be done
Args:
prob ([float]): probility of transition
Returns:
travel time [int]: expected travel time
"""
return ((1 - prob) * 10) + 1
def get_direction(current, neigbour):
""" This function gives the direction between the current cell and the neigbour. Direction is indicated as integer 0-3. 0 = North, 1 = East, etc
Args:
current ([type]): current cell
neigbour ([type]): neigbour cell
Returns:
direction [int]:
"""
if current[1] - neigbour[1] == 1:
return actions.north
# southern neigbour
elif current[0] - neigbour[0] == -1:
return actions.east
# eastern neigbour
elif current[1] - neigbour[1] == -1:
return actions.south
# western neigbour
return actions.west
def neighbours(cell, V):
""" This function provides all the possible neigbours from the current cell, respecting the boundries of the matrix
Args:
cell ([type]): current position
V ([type]): Matrix for dimensions
Returns:
neigbours: list with all possible neigbours
"""
neigbours = []
if cell[0] != 0:
neigbours.append([cell[0] - 1, cell[1]])
if cell[0] != len(V) - 1:
neigbours.append([cell[0] + 1, cell[1]])
if cell[1] != 0:
neigbours.append([cell[0], cell[1] - 1])
if cell[1] != len(V) - 1:
neigbours.append([cell[0], cell[1] + 1])
return neigbours
if __name__ == "__main__":
goal = [1, 10]
current_pos = [0, 0]
# Transition matrix with 3 dimensions. x,y is grid, z is transitions. dim0 = north, dim1 = east, dim2 = south, dim3 = west
M = np.zeros((50, 50, 4))
r = np.zeros((50, 50))
gamma = 0.99
r[1][10] = 1
if os.path.exists("M.npy"):
M = np.load("M.npy")
else:
# Generate transition matrix with congestion probabilities
for i in range(len(M)):
for j in range(len(M)):
for k in range(M.shape[2]):
M[i][j][k] = np.random.choice(np.arange(0.1, 1.1, 0.1))
np.save("M.npy", M)
# Simple heuristic - take the shortest path to (1,10)
current_pos = [0, 0]
exp_trvl_time_heuristic, steps = find_best_move_simple(current_pos, goal, M)
print(
"Expected travel time for simple heuristic is: \t%d in %d steps"
% (exp_trvl_time_heuristic, steps)
)
# Set of equations, aka Dijkstra
current_pos = (0, 0)
exp_trvl_time_dijkstra = dijkstra(current_pos, convert(goal), M)
print("Expected travel time in Dijkstra is: \t\t%d" % (exp_trvl_time_dijkstra))
# Dynamic Programming, aka value iteration
current_pos = (0, 0)
exp_trvl_time_value_iteration, steps = value_iteration(
current_pos, convert(goal), M, True
)
print(
"Expected travel time in Value Iteration is: \t%d in %d steps"
% (exp_trvl_time_value_iteration, steps)
)