-
Notifications
You must be signed in to change notification settings - Fork 23
/
control_test.py
183 lines (151 loc) · 5.91 KB
/
control_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import time
import ctypes
import os
import glob
import numpy as np
from mininet.topo import Topo
from mininet.net import Mininet
from mininet.log import setLogLevel
from mininet.node import RemoteController
from multiprocessing import RawArray
from dc_gym.control.iroko_bw_control import BandwidthController
import dc_gym.utils as dc_utils
# configure logging
import logging
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
log = logging.getLogger(__name__)
class Ring(ctypes.Structure):
pass
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
SRC_PORT = 20130
DST_PORT = 20130
PACKET_RX_RING = 5
PACKET_TX_RING = 13
class TestTopo(Topo):
def __init__(self, num_hosts=2):
"The custom BlueBridge topo we use for testing."
# Initialize topology
Topo.__init__(self)
switch = self.addSwitch('s1', failMode="standalone")
for h in range(num_hosts):
host = self.addHost('h%d' % (h),
ip="10.0.0.%d/24" % (h + 1),
mac='00:04:00:00:00:%02x' % h)
self.addLink(host, switch)
def connect_controller(net, host):
controller = RemoteController("c")
net.addController(controller)
# Configure host
net.addLink(controller, host)
# Configure controller
ctrl_iface = "c-eth0"
return ctrl_iface
def launch_ctrl_client(host, capacity):
capacity = capacity
ctrl_cmd = f"{FILE_DIR}/dc_gym/control/go_ctrl "
ctrl_cmd += "-n %s " % host.intfList()[0]
ctrl_cmd += "-c %s " % host.intfList()[1]
ctrl_cmd += "-r %d " % capacity
return dc_utils.start_process(ctrl_cmd, host)
def launch_goben_server(host):
traffic_cmd = f"{FILE_DIR}/dc_gym/goben "
return dc_utils.start_process(traffic_cmd, host)
def launch_goben_client(src_host, dst_host, in_rate):
in_rate = in_rate / 1e6 # Goben operates on mbps
# start the actual client
traffic_cmd = f"{FILE_DIR}/dc_gym/goben --silent "
traffic_cmd += "-totalDuration %s " % "inf" # infinite
traffic_cmd += "-hosts %s " % dst_host.IP()
traffic_cmd += "-maxSpeed %d " % in_rate # mbit
traffic_cmd += "-passiveServer "
traffic_cmd += "-udp "
return dc_utils.start_process(traffic_cmd, src_host)
def init_rate_control(ctrl_iface, rate):
# Initialize the action array shared with the control manager
tx_rate = RawArray(ctypes.c_uint32, 1)
tx_rate = dc_utils.shmem_to_nparray(tx_rate, np.float32)
tx_rate.fill(rate)
bw_proc = BandwidthController({"test": ctrl_iface}, tx_rate, tx_rate, rate)
bw_proc.start()
return tx_rate, bw_proc
def adjust_rate(ctrl_iface, rate=1e6):
log.info(f"Setting rate to {rate}")
bw_lib = ctypes.CDLL(f"{FILE_DIR}/dc_gym/control/libbw_control.so")
bw_lib.init_ring.argtypes = [
ctypes.c_char_p, ctypes.c_ushort, ctypes.c_uint
]
bw_lib.init_ring.restype = ctypes.POINTER(Ring)
bw_lib.send_bw.argtypes = [
ctypes.c_uint32,
ctypes.POINTER(Ring), ctypes.c_ushort
]
bw_lib.send_bw.restype = ctypes.c_int
bw_lib.wait_for_reply.argtypes = [ctypes.POINTER(Ring)]
rx_ring = bw_lib.init_ring(ctrl_iface.encode("ascii"), SRC_PORT,
PACKET_RX_RING)
tx_ring = bw_lib.init_ring(ctrl_iface.encode("ascii"), SRC_PORT,
PACKET_TX_RING)
bw_lib.send_bw(int(rate), tx_ring, DST_PORT)
bw_lib.wait_for_reply(rx_ring)
def record_rate(in_rate, ctrl_rate, sleep, out_dir):
# Convert to human readable format
ctrl_rate = ctrl_rate * (in_rate / 1e3)
in_rate = in_rate / 1e6
log.info(f"Input: {in_rate} Mbps Expected: {ctrl_rate} kbps")
log.info(f"Waiting for {sleep} seconds...")
out_dir = "control_test"
out_file = f"{out_dir}/{in_rate}mbps_in_{ctrl_rate}kbps_expected"
dc_utils.check_dir(out_dir)
ifstat_cmd = "ifstat -b "
ifstat_cmd += "-i s1-eth1 " # interface to listen on
ifstat_cmd += "-q 1 " # measurement interval
ifstat_cmd += "%d " % sleep # measure how long
return dc_utils.exec_process(ifstat_cmd, out_file=out_file)
def generate_ctrl_rates(base_rate):
ctrl_rates = []
ctrl_rates.extend([0.0001, 0.0005, 0.001, 0.005])
ctrl_rates.extend([0.01, 0.05, 0.1, 0.5, 1.0])
return np.array(ctrl_rates)
def summarize(input_dir):
with open(f"{input_dir}/summary.txt", 'w+') as summary_f:
for out_file in glob.glob(f"{input_dir}/*.out"):
file_name = os.path.splitext(out_file)[0]
with open(out_file, 'r') as ifstat_file:
summary_f.write(f"\n########## {file_name} ##########\n")
items = ifstat_file.readlines()
for item in items:
summary_f.write(f"{item}")
def main():
in_rates = [10e6, 100e6, 1000e6]
ctrl_rates = []
out_dir = "control_test"
sleep_per_test = 10 # seconds
setLogLevel("info")
topo = TestTopo(2)
net = Mininet(topo=topo, controller=None)
net.start()
net.ping()
hosts = net.hosts
src_host = hosts[0]
dst_host = hosts[1]
ctrl_iface = connect_controller(net, src_host)
server_proc = launch_goben_server(dst_host)
time.sleep(2)
for in_rate in in_rates:
ctrl_proc = launch_ctrl_client(src_host, in_rate)
tx_rate, bw_proc = init_rate_control(ctrl_iface, in_rate)
time.sleep(0.5)
ctrl_rates = generate_ctrl_rates(in_rate)
client_proc = launch_goben_client(src_host, dst_host, in_rate)
for ctrl_rate in ctrl_rates:
log.info("#############################")
tx_rate[0] = ctrl_rate
dc_utils.start_process("tc qdisc show dev h0-eth0", src_host)
record_rate(in_rate, ctrl_rate, sleep_per_test, out_dir)
log.info("#############################")
dc_utils.kill_processes([ctrl_proc, client_proc, bw_proc])
dc_utils.kill_processes([server_proc])
summarize(out_dir)
net.stop()
if __name__ == '__main__':
main()