Skip to content

Commit

Permalink
Merge pull request #107 from toruseo/develop
Browse files Browse the repository at this point in the history
Add area and vehicle group settings and analysis
  • Loading branch information
toruseo authored Aug 19, 2024
2 parents 54bf92d + fb1d6a6 commit 9d79a3b
Show file tree
Hide file tree
Showing 3 changed files with 481 additions and 12 deletions.
191 changes: 188 additions & 3 deletions tests/test_other_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def test_scenario_write_and_read():
W.adddemand_point2point(0.5, 0.5, 2.5, 2.5, 0, 1800, volume=100)
W.adddemand_area2area(0.5, 0.5, 2, 2.5, 2.5, 2, 0, 1800, volume=100)

W.save_scenario("out/test_grid.pkl")
W.save_scenario("out/test_grid.uxsim_scenario")

W.exec_simulation()
W.analyzer.print_simple_stats()
Expand All @@ -181,7 +181,7 @@ def test_scenario_write_and_read():
random_seed=42
)

W2.load_scenario("out/test_grid.pkl")
W2.load_scenario("out/test_grid.uxsim_scenario")

W2.exec_simulation()

Expand All @@ -192,6 +192,79 @@ def test_scenario_write_and_read():

assert df1["total_travel_time"][0] == df2["total_travel_time"][0]

def test_scenario_write_and_read_areas():
##################################
# Iter 1
##################################

W = World(
name="",
deltan=10,
tmax=4000,
print_mode=1, save_mode=1, show_mode=0,
random_seed=42,
)

n_nodes = 5
imax = n_nodes
jmax = n_nodes
nodes = {}
for i in range(imax):
for j in range(jmax):
nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6)

links = {}
for i in range(imax):
for j in range(jmax):
if i != imax-1:
links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000)
if i != 0:
links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000)
if j != jmax-1:
links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000)
if j != 0:
links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000)


areas = {
"areaN": [nodes[0,0], nodes[0, n_nodes-1]],
"areaS": [nodes[n_nodes-1,0], nodes[n_nodes-1, n_nodes-1]],
"areaNW": [nodes[0,0]],
"areaSE": [nodes[n_nodes-1, n_nodes-1]]
}

W.adddemand_areas2areas(areas["areaN"], areas["areaS"], 0, 3000, volume=7000)

W.save_scenario("out/test_area.uxsim_scenario")

W.exec_simulation()
W.analyzer.print_simple_stats()

df1 = W.analyzer.basic_to_pandas()
print(df1)

##################################
# Iter 2
##################################

W2 = World(
name="",
deltan=10,
tmax=4000,
print_mode=1, save_mode=1, show_mode=0,
random_seed=42,
)

W2.load_scenario("out/test_area.uxsim_scenario")

W2.exec_simulation()
W2.analyzer.print_simple_stats()

df2 = W2.analyzer.basic_to_pandas()
print(df2)

assert df1["total_travel_time"][0] == df2["total_travel_time"][0]

def test_k_shortest_path():
W = World(
name="", # Scenario name
Expand Down Expand Up @@ -381,4 +454,116 @@ def test_printtry():
lis = [1,2,3]
printtry(lambda: (lis[0]))
printtry(lambda: (lis[10]))
assert True
assert True

def test_area():
W = World(
name="",
deltan=10,
tmax=4000,
print_mode=1, save_mode=1, show_mode=0,
random_seed=None,
)

n_nodes = 5
imax = n_nodes
jmax = n_nodes
nodes = {}
for i in range(imax):
for j in range(jmax):
nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6)

links = {}
for i in range(imax):
for j in range(jmax):
if i != imax-1:
links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000)
if i != 0:
links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000)
if j != jmax-1:
links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000)
if j != 0:
links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000)


areas = {
"areaN": [nodes[0,0], nodes[0, n_nodes-1]],
"areaS": [nodes[n_nodes-1,0], nodes[n_nodes-1, n_nodes-1]],
"areaNW": [nodes[0,0]],
"areaSE": [nodes[n_nodes-1, n_nodes-1]]
}

W.adddemand_areas2areas(areas["areaN"], areas["areaS"], 0, 3000, volume=7000)

W.exec_simulation()
W.analyzer.print_simple_stats()

df = W.analyzer.areas2areas_to_pandas(areas.values(), list(areas.keys()))
print(df)

print(W.analyzer.areas2areas_to_pandas(areas.values()))

assert W.analyzer.trip_all == 7000
assert df["total_trips"][(df["origin_area"] == "areaN") & (df["destination_area"] == "areaS")].values[0] == 7000
assert df["average_free_travel_time"][(df["origin_area"] == "areaN") & (df["destination_area"] == "areaS")].values[0] == 300.0
assert df["average_shortest_distance"][(df["origin_area"] == "areaN") & (df["destination_area"] == "areaS")].values[0] == 6000.0
assert df["total_trips"][(df["origin_area"] == "areaNW") & (df["destination_area"] == "areaSE")].values[0] == 1750
assert df["average_free_travel_time"][(df["origin_area"] == "areaNW") & (df["destination_area"] == "areaSE")].values[0] == 400.0
assert df["average_shortest_distance"][(df["origin_area"] == "areaNW") & (df["destination_area"] == "areaSE")].values[0] == 8000.0

@pytest.mark.flaky(reruns=10)
def test_vehicle_group():
W = World(
name="",
deltan=10,
tmax=4000,
print_mode=1, save_mode=1, show_mode=0,
random_seed=None,
)

n_nodes = 4
imax = n_nodes
jmax = n_nodes
nodes = {}
for i in range(imax):
for j in range(jmax):
nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6)

links = {}
for i in range(imax):
for j in range(jmax):
if i != imax-1:
links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000)
if i != 0:
links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000)
if j != jmax-1:
links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000)
if j != 0:
links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000)


areas = {
"areaN": [nodes[0,0], nodes[0, n_nodes-1]],
"areaS": [nodes[n_nodes-1,0], nodes[n_nodes-1, n_nodes-1]],
"areaNW": [nodes[0,0]],
"areaSE": [nodes[n_nodes-1, n_nodes-1]]
}

W.adddemand_areas2areas(areas["areaN"], areas["areaS"], 0, 3000, volume=7000)

W.exec_simulation()
W.analyzer.print_simple_stats()

dt = 500
group_dict = {}
for t in range(0, W.TMAX, dt):
group_dict[f"depart_t={t}"] = [veh for veh in W.VEHICLES.values() if t <= veh.departure_time_in_second < t+dt]

df = W.analyzer.vehicle_groups_to_pandas(list(group_dict.values()), list(group_dict.keys()))
print(df)

assert df["average_travel_time"][df["group"]=="depart_t=0"].values[0] < df["average_travel_time"][df["group"]=="depart_t=1500"].values[0]
assert df["average_delay_ratio"][df["group"]=="depart_t=0"].values[0] < df["average_delay_ratio"][df["group"]=="depart_t=1500"].values[0]
assert df["average_traveled_distance"][df["group"]=="depart_t=0"].values[0] < df["average_traveled_distance"][df["group"]=="depart_t=1500"].values[0]
assert df["average_detour_ratio"][df["group"]=="depart_t=0"].values[0] < df["average_detour_ratio"][df["group"]=="depart_t=1500"].values[0]
assert df["average_speed"][df["group"]=="depart_t=0"].values[0] > df["average_speed"][df["group"]=="depart_t=1500"].values[0]
Loading

0 comments on commit 9d79a3b

Please sign in to comment.