Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-queue stats to bessctl #1007

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 162 additions & 77 deletions bessctl/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import traceback
import tempfile
import signal
import collections
from collections import *
import contextlib
import sugar

Expand Down Expand Up @@ -808,7 +808,7 @@ class Foo(Module):
# using the same name twice as-module class or port-driver.
# But the C++ code does not have the restriction on using
# Foo() as *both* module *and* port-driver.
counts = collections.Counter(rsvd.keys())
counts = Counter(rsvd.keys())
counts.update(class_names)
counts.update(driver_names)
dups = [k for k in counts if counts[k] > 1]
Expand Down Expand Up @@ -1453,15 +1453,23 @@ def _show_port(cli, port):
('', speed, link, duplex, autoneg))
stats = cli.bess.get_port_stats(port.name)

cli.fout.write(' Inc/RX ')
cli.fout.write('packets: {:<20,}'.format(stats.inc.packets))
cli.fout.write('bytes: {:<20,}\n'.format(stats.inc.bytes))
cli.fout.write('{:<14} dropped: {:<20,}\n'.format('', stats.inc.dropped))
fmt_hdr = '{:>16} {:>18}{:>24}{:>18}\n'
fmt_body = '{:>16} {:>18,}{:>24,}{:>18,}\n'
cli.fout.write(fmt_hdr.format('', 'Packets', 'Bytes', 'Dropped'))

cli.fout.write(' Out/TX ')
cli.fout.write('packets: {:<20,}'.format(stats.out.packets))
cli.fout.write('bytes: {:<20,}\n'.format(stats.out.bytes))
cli.fout.write('{:<14} dropped: {:<20,}\n'.format('', stats.out.dropped))
cli.fout.write(fmt_body.format('Inc/RX total', stats.inc.packets,
stats.inc.bytes, stats.inc.dropped))
for qid in sorted(stats.inc_queues.keys()):
inc = stats.inc_queues[qid]
cli.fout.write(fmt_body.format('?' if qid == -1 else qid,
inc.packets, inc.bytes, inc.dropped))

cli.fout.write(fmt_body.format('Out/TX total', stats.out.packets,
stats.out.bytes, stats.out.dropped))
for qid in sorted(stats.out_queues.keys()):
out = stats.out_queues[qid]
cli.fout.write(fmt_body.format('?' if qid == -1 else qid,
out.packets, out.bytes, out.dropped))


@cmd('show port', 'Show the status of all ports')
Expand Down Expand Up @@ -1740,27 +1748,46 @@ def monitor_pipeline_bit(cli, opts):
_monitor_pipeline(cli, 'bytes', 'Mbps', graph_args=opts)


PortRate = collections.namedtuple('PortRate',
['inc_packets', 'inc_dropped', 'inc_bytes',
'out_packets', 'out_dropped', 'out_bytes'])

def _monitor_ports(cli, per_queue, port_names=[]):

def _monitor_ports(cli, *ports):
class Stats:
def __init__(self, timestamp,
inc_packets=0, inc_dropped=0, inc_bytes=0,
out_packets=0, out_dropped=0, out_bytes=0):
self.timestamp = timestamp
self.inc_packets = inc_packets
self.inc_dropped = inc_dropped
self.inc_bytes = inc_bytes
self.out_packets = out_packets
self.out_dropped = out_dropped
self.out_bytes = out_bytes

def get_delta(old, new):
sec_diff = new.timestamp - old.timestamp
delta = PortRate(
inc_packets=(new.inc.packets - old.inc.packets) / sec_diff,
inc_dropped=(new.inc.dropped - old.inc.dropped) / sec_diff,
inc_bytes=(new.inc.bytes - old.inc.bytes) / sec_diff,
out_packets=(new.out.packets - old.out.packets) / sec_diff,
out_dropped=(new.out.dropped - old.out.dropped) / sec_diff,
out_bytes=(new.out.bytes - old.out.bytes) / sec_diff)
delta = Stats(
timestamp=new.timestamp,
inc_packets=(new.inc_packets - old.inc_packets) / sec_diff,
inc_dropped=(new.inc_dropped - old.inc_dropped) / sec_diff,
inc_bytes=(new.inc_bytes - old.inc_bytes) / sec_diff,
out_packets=(new.out_packets - old.out_packets) / sec_diff,
out_dropped=(new.out_dropped - old.out_dropped) / sec_diff,
out_bytes=(new.out_bytes - old.out_bytes) / sec_diff)
return delta

def get_total(arr):
total = copy.deepcopy(arr[0])
for stat in arr[1:]:
total.inc_packets += stat.inc_packets
total.inc_dropped += stat.inc_dropped
total.inc_bytes += stat.inc_bytes
total.out_packets += stat.out_packets
total.out_dropped += stat.out_dropped
total.out_bytes += stat.out_bytes
return total

def print_header(timestamp):
cli.fout.write('\n')
cli.fout.write('{:<20}{:>14}{:>10}{:>10} {:>14}{:>10}{:>10}\n'.format(
cli.fout.write('{:<22}{:>14}{:>10}{:>10} {:>14}{:>10}{:>10}\n'.format(
time.strftime('%X') + str(timestamp % 1)[1:8],
'INC Mbps', 'Mpps', 'dropped', 'OUT Mbps', 'Mpps', 'Dropped'))

Expand All @@ -1769,7 +1796,7 @@ def print_header(timestamp):
def print_footer():
cli.fout.write('{}\n'.format('-' * 96))

def print_delta(timestamp, port, delta, csv_f=None):
def print_delta(delta, port=None, qid=None):
# If inc/out_bytes == 0 and inc_packets != 0, it means the
# driver does not account packet bytes.
# Use 0 rather than inaccurate numbers from Ethernet overheads.
Expand All @@ -1783,87 +1810,145 @@ def print_delta(timestamp, port, delta, csv_f=None):
else:
out_mbps = 0.

data = (inc_mbps, delta.inc_packets / 1e6, long(delta.inc_dropped), out_mbps, delta.out_packets / 1e6,
long(delta.out_dropped))
cli.fout.write('{:<20}{:>14.1f}{:>10.3f}{:>10d} {:>14.1f}{:>10.3f}{:>10d}\n'.format(port, *data))
inc = '{:>14.1f}{:>10.3f}{:>10.0f}'.format(inc_mbps, delta.inc_packets / 1e6, delta.inc_dropped)
out = '{:>14.1f}{:>10.3f}{:>10.0f}'.format(out_mbps, delta.out_packets / 1e6, delta.out_dropped)

if port is None:
name = 'Total'
else:
if per_queue:
first = (qid == -1 and port.name in has_unknown) or \
(qid == 0 and port.name not in has_unknown)
name = '{}{}'.format((port.name + ':') if first else ' ' * (len(port.name) + 1),
'?' if qid == -1 else qid)
# NOTE: # of RX queues may not be different from # of TX queues
if qid >= port.num_inc_q:
inc = ' ' * 34
if qid >= port.num_out_q:
out = ' ' * 34
else:
name = '{}/{}'.format(port.name, port.driver)

if (qid != -1) or (port.name in has_unknown):
cli.fout.write('{:<22}{} {}\n'.format(name, inc, out))
#cli.fout.write('{:<22}{:>14.1f}{:>10.3f}{:>10.0f} {:>14.1f}{:>10.3f}{:>10.0f}\n'.format(name, *data))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you remove this commented code?

if csv_f is not None:
csv_f.write('{},{},{}\n'.format(time.strftime('%X'), port, ','.join(map(lambda x: '{:.3f}'.format(x), data))))
data = (inc_mbps, delta.inc_packets / 1e6, delta.inc_dropped,
out_mbps, delta.out_packets / 1e6, delta.out_dropped)
if per_queue:
name = '{}:{}'.format(port.name, '?' if qid == -1 else qid)
csv_f.write('{},{},{}\n'.format(time.strftime('%X'), name, ','.join(map(lambda x: '{:.3f}'.format(x), data))))

def get_total(arr):
total = copy.deepcopy(arr[0])
for stat in arr[1:]:
total.inc.packets += stat.inc.packets
total.inc.dropped += stat.inc.dropped
total.inc.bytes += stat.inc.bytes
total.out.packets += stat.out.packets
total.out.dropped += stat.out.dropped
total.out.bytes += stat.out.bytes
return total
def print_loop():
last = {}
now = {}

def print_loop(csv_f=None):
while True:
time.sleep(1)

last = copy.deepcopy(now)
for port in ports:
now[port] = cli.bess.get_port_stats(port)

print_header(now[port].timestamp)
port_stats = cli.bess.get_port_stats(port.name)
if per_queue:
for i in range(-1, max(port.num_inc_q, port.num_out_q)):
inc = port_stats.inc_queues[i]
out = port_stats.out_queues[i]

if i == -1:
if inc.packets != 0 or out.packets != 0:
has_unknown.add(port.name)

s = Stats(timestamp=port_stats.timestamp)
if i < port.num_inc_q:
s.inc_packets = inc.packets
s.inc_dropped = inc.dropped
s.inc_bytes = inc.bytes
if i < port.num_out_q:
s.out_packets = out.packets
s.out_dropped = out.dropped
s.out_bytes = out.bytes
now[port.name, i] = s
else:
now[port.name] = Stats(
timestamp = port_stats.timestamp,
inc_packets = port_stats.inc.packets,
inc_dropped = port_stats.inc.dropped,
inc_bytes = port_stats.inc.bytes,
out_packets = port_stats.out.packets,
out_dropped = port_stats.out.dropped,
out_bytes = port_stats.out.bytes)

if last != {}:
print_header(port_stats.timestamp)

for port in ports:
if per_queue:
for i in range(-1, max(port.num_inc_q, port.num_out_q)):
print_delta(get_delta(last[port.name, i], now[port.name, i]), port, i)
else:
print_delta(get_delta(last[port.name], now[port.name]), port)

for port in ports:
print_delta(now[port].timestamp, '{}{}'.format(port, drivers[port]),
get_delta(last[port], now[port]), csv_f)
print_footer()

print_footer()
if per_queue or len(ports) > 1:
last_total = get_total(list(last.values()))
now_total = get_total(list(now.values()))
print_delta(get_delta(last_total, now_total))

if len(ports) > 1:
print_delta(now[port].timestamp, 'Total', get_delta(
get_total(list(last.values())),
get_total(list(now.values()))), csv_f)
time.sleep(1)

for port in ports:
last[port] = now[port]

all_ports = sorted(cli.bess.list_ports().ports, key=lambda x: x.name)
drivers = {}
for port in all_ports:
drivers[port.name] = port.driver
all_ports = cli.bess.list_ports().ports

if not ports:
ports = [port.name for port in all_ports]
if not port_names:
ports = all_ports
if not ports:
raise cli.CommandError('No port to monitor')

cli.fout.write('Monitoring ports: {}\n'.format(', '.join(ports)))

last = {}
now = {}

for port in ports:
last[port] = cli.bess.get_port_stats(port)
else:
ports = []
port_map = {}
for port in all_ports:
port_map[port.name] = port
for port_name in port_names:
if port_name not in port_map:
raise cli.CommandError('Port %s does not exist' % port_name)
if port_name not in ports:
ports.append(port_map[port_name])

has_unknown = set()
ports = sorted(ports, key=lambda x: x.name)
cli.fout.write('Monitoring ports: {}\n'.format(', '.join([p.name for p in ports])))

try:
csv_path = os.getenv('CSV', None)
with open(csv_path, 'w') if csv_path is not None else noop() as csv_f:
if csv_f is not None:
if csv_path:
with open(csv_path, 'w') as csv_f:
csv_f.write('{}\n'.format(','.join(
('Timestamp', 'Port', 'Mbps In', 'Mpps In', 'Dropped In', 'Mbps Out', 'Mpps Out', 'Dropped Out'))))
print_loop(csv_f)
print_loop()
else:
csv_f = None
print_loop()
except KeyboardInterrupt:
pass


@cmd('monitor port', 'Monitor the current traffic of all ports')
def monitor_port_all(cli):
_monitor_ports(cli)
_monitor_ports(cli, False)


@cmd('monitor port PORT...', 'Monitor the current traffic of specified ports')
def monitor_port_all(cli, ports):
_monitor_ports(cli, *ports)
_monitor_ports(cli, False, ports)

@cmd('monitor queue', 'Monitor the per-queue stats of all ports')
def monitor_queue(cli):
_monitor_ports(cli, True)

@cmd('monitor queue PORT...', 'Monitor the per-queue stats of specified ports')
def monitor_port_all(cli, ports):
_monitor_ports(cli, True, ports)

TcCounterRate = collections.namedtuple('TcCounterRate',
['count', 'cycles', 'bits', 'packets'])
TcCounterRate = namedtuple('TcCounterRate', ['count', 'cycles', 'bits', 'packets'])


def _monitor_tcs(cli, *tcs):
Expand All @@ -1888,7 +1973,7 @@ def print_header(timestamp, name_len):
def print_footer(name_len):
cli.fout.write('{}\n'.format('-' * (72 + name_len)))

def print_delta(timestamp, tc, delta, name_len, csv_f=None):
def print_delta(tc, delta, name_len, csv_f=None):
if delta.count >= 1:
ppb = delta.packets / delta.count
else:
Expand All @@ -1915,7 +2000,7 @@ def print_loop(csv=None):
print_header(now[tc].timestamp, max_len)

for tc in tcs:
print_delta(now[tc].timestamp, 'W{} {}'.format(wids[tc], tc),
print_delta('W{} {}'.format(wids[tc], tc),
get_delta(last[tc], now[tc]), max_len, csv)

print_footer(max_len)
Expand Down
31 changes: 25 additions & 6 deletions core/bessctl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ class BESSControlImpl final : public BESSControl::Service {

LOG(INFO) << "*** Resuming ***";
resume_all_workers();
VLOG(1) << "*** Resumed ***";
return Status::OK;
}

Expand Down Expand Up @@ -1130,13 +1131,31 @@ class BESSControlImpl final : public BESSControl::Service {

::Port::PortStats stats = it->second->GetPortStats();

response->mutable_inc()->set_packets(stats.inc.packets);
response->mutable_inc()->set_dropped(stats.inc.dropped);
response->mutable_inc()->set_bytes(stats.inc.bytes);
auto *inc = response->mutable_inc();
inc->set_packets(stats.inc.packets);
inc->set_dropped(stats.inc.dropped);
inc->set_bytes(stats.inc.bytes);

response->mutable_out()->set_packets(stats.out.packets);
response->mutable_out()->set_dropped(stats.out.dropped);
response->mutable_out()->set_bytes(stats.out.bytes);
for (const auto &[qid, v] : stats.inc_queues) {
bess::pb::GetPortStatsResponse::Stat inc_queue;
inc_queue.set_packets(v.packets);
inc_queue.set_dropped(v.dropped);
inc_queue.set_bytes(v.bytes);
response->mutable_inc_queues()->insert({qid, inc_queue});
}

auto *out = response->mutable_out();
out->set_packets(stats.out.packets);
out->set_dropped(stats.out.dropped);
out->set_bytes(stats.out.bytes);

for (const auto &[qid, v] : stats.out_queues) {
bess::pb::GetPortStatsResponse::Stat out_queue;
out_queue.set_packets(v.packets);
out_queue.set_dropped(v.dropped);
out_queue.set_bytes(v.bytes);
response->mutable_out_queues()->insert({qid, out_queue});
}

response->set_timestamp(get_epoch_time());

Expand Down
Loading