Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-queue stats to bessctl #1007

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
12 changes: 6 additions & 6 deletions core/drivers/pmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,15 +458,15 @@ void PMDPort::CollectStats(bool reset) {
} else {
dir = PACKET_DIR_INC;
for (qid = 0; qid < num_queues[dir]; qid++) {
queue_stats[dir][qid].packets = stats.q_ipackets[qid];
queue_stats[dir][qid].bytes = stats.q_ibytes[qid];
queue_stats[dir][qid].dropped = stats.q_errors[qid];
queue_stats_[dir][qid].packets = stats.q_ipackets[qid];
queue_stats_[dir][qid].bytes = stats.q_ibytes[qid];
queue_stats_[dir][qid].dropped = stats.q_errors[qid];
}

dir = PACKET_DIR_OUT;
for (qid = 0; qid < num_queues[dir]; qid++) {
queue_stats[dir][qid].packets = stats.q_opackets[qid];
queue_stats[dir][qid].bytes = stats.q_obytes[qid];
queue_stats_[dir][qid].packets = stats.q_opackets[qid];
queue_stats_[dir][qid].bytes = stats.q_obytes[qid];
}
}
}
Expand All @@ -479,7 +479,7 @@ int PMDPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
int PMDPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
int sent = rte_eth_tx_burst(dpdk_port_id_, qid,
reinterpret_cast<rte_mbuf **>(pkts), cnt);
auto &stats = queue_stats[PACKET_DIR_OUT][qid];
auto &stats = queue_stats_[PACKET_DIR_OUT][qid];
int dropped = cnt - sent;
stats.dropped += dropped;
stats.requested_hist[cnt]++;
Expand Down
2 changes: 1 addition & 1 deletion core/drivers/pmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PMDPort final : public Port {
void DeInit() override;

/*!
* Copies rte port statistics into queue_stats datastructure (see port.h).
* Copies rte port statistics into queue_stats_ datastructure (see port.h).
*
* PARAMETERS:
* * bool reset : if true, reset DPDK local statistics and return (do not
Expand Down
26 changes: 11 additions & 15 deletions core/modules/port_inc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,24 @@ std::string PortInc::GetDesc() const {

struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch,
void *arg) {
if (children_overload_ > 0) {
return {.block = true, .packets = 0, .bits = 0};
}

Port *p = port_;

if (!p->conf().admin_up) {
if (!port_->conf().admin_up || children_overload_ > 0) {
return {.block = true, .packets = 0, .bits = 0};
}

const queue_t qid = (queue_t)(uintptr_t)arg;
auto &qstats = port_->queue_stats_[PACKET_DIR_INC][qid];

uint64_t received_bytes = 0;

const int burst = ACCESS_ONCE(burst_);
const int pkt_overhead = 24;

batch->set_cnt(p->RecvPackets(qid, batch->pkts(), burst));
uint32_t cnt = batch->cnt();
p->queue_stats[PACKET_DIR_INC][qid].requested_hist[burst]++;
p->queue_stats[PACKET_DIR_INC][qid].actual_hist[cnt]++;
p->queue_stats[PACKET_DIR_INC][qid].diff_hist[burst - cnt]++;
uint32_t cnt = port_->RecvPackets(qid, batch->pkts(), burst);
batch->set_cnt(cnt);
qstats.requested_hist[burst]++;
qstats.actual_hist[cnt]++;
qstats.diff_hist[burst - cnt]++;

if (cnt == 0) {
return {.block = true, .packets = 0, .bits = 0};
}
Expand All @@ -148,9 +144,9 @@ struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch,
}
}

if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) {
p->queue_stats[PACKET_DIR_INC][qid].packets += cnt;
p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes;
if (!(port_->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) {
qstats.packets += cnt;
qstats.bytes += received_bytes;
}

RunNextModule(ctx, batch);
Expand Down
25 changes: 11 additions & 14 deletions core/modules/port_out.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,41 +91,38 @@ std::string PortOut::GetDesc() const {
}

static inline int SendBatch(bess::PacketBatch *batch, Port *p, queue_t qid) {
uint64_t sent_bytes = 0;
int sent_pkts = 0;

if (p->conf().admin_up) {
sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt());
if (!p->conf().admin_up) {
return 0;
}

if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) {
const packet_dir_t dir = PACKET_DIR_OUT;
int sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt());

if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) {
uint64_t sent_bytes = 0;
for (int i = 0; i < sent_pkts; i++) {
sent_bytes += batch->pkts()[i]->total_len();
}

p->queue_stats[dir][qid].packets += sent_pkts;
p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts);
p->queue_stats[dir][qid].bytes += sent_bytes;
auto &qstats = p->queue_stats_[PACKET_DIR_OUT][qid];
qstats.packets += sent_pkts;
qstats.dropped += (batch->cnt() - sent_pkts);
qstats.bytes += sent_bytes;
}

return sent_pkts;
}

void PortOut::ProcessBatch(Context *ctx, bess::PacketBatch *batch) {
Port *p = port_;

CHECK(worker_queues_[ctx->wid] >= 0);
queue_t qid = worker_queues_[ctx->wid];
int sent_pkts = 0;

if (queue_users_[qid] == 1) {
sent_pkts = SendBatch(batch, p, qid);
sent_pkts = SendBatch(batch, port_, qid);
} else {
mcslock_node_t me;
mcs_lock(&queue_locks_[qid], &me);
sent_pkts = SendBatch(batch, p, qid);
sent_pkts = SendBatch(batch, port_, qid);
mcs_unlock(&queue_locks_[qid], &me);
}

Expand Down
25 changes: 12 additions & 13 deletions core/modules/queue_inc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,28 @@ std::string QueueInc::GetDesc() const {

struct task_result QueueInc::RunTask(Context *ctx, bess::PacketBatch *batch,
void *arg) {
Port *p = port_;

if (!p->conf().admin_up) {
if (!port_->conf().admin_up) {
return {.block = true, .packets = 0, .bits = 0};
}

const queue_t qid = (queue_t)(uintptr_t)arg;

uint64_t received_bytes = 0;
auto &qstats = port_->queue_stats_[PACKET_DIR_INC][qid];

const int burst = ACCESS_ONCE(burst_);
const int pkt_overhead = 24;

batch->set_cnt(p->RecvPackets(qid, batch->pkts(), burst));
uint32_t cnt = batch->cnt();
p->queue_stats[PACKET_DIR_INC][qid].requested_hist[burst]++;
p->queue_stats[PACKET_DIR_INC][qid].actual_hist[cnt]++;
p->queue_stats[PACKET_DIR_INC][qid].diff_hist[burst - cnt]++;
uint32_t cnt = port_->RecvPackets(qid, batch->pkts(), burst);
batch->set_cnt(cnt);
qstats.requested_hist[burst]++;
qstats.actual_hist[cnt]++;
qstats.diff_hist[burst - cnt]++;

if (cnt == 0) {
return {.block = true, .packets = 0, .bits = 0};
}

// NOTE: we cannot skip this step since it might be used by scheduler.
uint64_t received_bytes = 0;
if (prefetch_) {
for (uint32_t i = 0; i < cnt; i++) {
received_bytes += batch->pkts()[i]->total_len();
Expand All @@ -120,9 +119,9 @@ struct task_result QueueInc::RunTask(Context *ctx, bess::PacketBatch *batch,
}
}

if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) {
p->queue_stats[PACKET_DIR_INC][qid].packets += cnt;
p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes;
if (!(port_->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) {
qstats.packets += cnt;
qstats.bytes += received_bytes;
}

RunNextModule(ctx, batch);
Expand Down
21 changes: 8 additions & 13 deletions core/modules/queue_out.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,22 @@ std::string QueueOut::GetDesc() const {
}

void QueueOut::ProcessBatch(Context *, bess::PacketBatch *batch) {
Port *p = port_;

const queue_t qid = qid_;

uint64_t sent_bytes = 0;
int sent_pkts = 0;

if (p->conf().admin_up) {
sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt());
if (port_->conf().admin_up) {
sent_pkts = port_->SendPackets(qid_, batch->pkts(), batch->cnt());
}

if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) {
const packet_dir_t dir = PACKET_DIR_OUT;

if (!(port_->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) {
uint64_t sent_bytes = 0;
for (int i = 0; i < sent_pkts; i++) {
sent_bytes += batch->pkts()[i]->total_len();
}

p->queue_stats[dir][qid].packets += sent_pkts;
p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts);
p->queue_stats[dir][qid].bytes += sent_bytes;
auto &qstats = port_->queue_stats_[PACKET_DIR_OUT][qid_];
qstats.packets += sent_pkts;
qstats.dropped += (batch->cnt() - sent_pkts);
qstats.bytes += sent_bytes;
}

if (sent_pkts < batch->cnt()) {
Expand Down
4 changes: 2 additions & 2 deletions core/port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ Port::PortStats Port::GetPortStats() {
PortStats ret = port_stats_;

for (queue_t qid = 0; qid < num_queues[PACKET_DIR_INC]; qid++) {
const QueueStats &inc = queue_stats[PACKET_DIR_INC][qid];
const QueueStats &inc = queue_stats_[PACKET_DIR_INC][qid];

ret.inc.packets += inc.packets;
ret.inc.dropped += inc.dropped;
Expand All @@ -184,7 +184,7 @@ Port::PortStats Port::GetPortStats() {
}

for (queue_t qid = 0; qid < num_queues[PACKET_DIR_OUT]; qid++) {
const QueueStats &out = queue_stats[PACKET_DIR_OUT][qid];
const QueueStats &out = queue_stats_[PACKET_DIR_OUT][qid];
ret.out.packets += out.packets;
ret.out.dropped += out.dropped;
ret.out.bytes += out.bytes;
Expand Down
15 changes: 8 additions & 7 deletions core/port.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
#include "utils/common.h"
#include "utils/ether.h"

typedef uint8_t queue_t;
using queue_t = uint8_t;

#define MAX_QUEUES_PER_DIR 128 /* [0, 31] (for each RX/TX) */
#define MAX_QUEUES_PER_DIR 128 /* [0, 127] (for each RX/TX) */

#define DRIVER_FLAG_SELF_INC_STATS 0x0001
#define DRIVER_FLAG_SELF_OUT_STATS 0x0002
Expand Down Expand Up @@ -217,15 +217,15 @@ class Port {

// overide this section to create a new driver -----------------------------
Port()
: port_stats_(),
: queue_stats_(),
port_stats_(),
conf_(),
name_(),
driver_arg_(),
port_builder_(),
num_queues(),
queue_size(),
users(),
queue_stats() {
users() {
conf_.mac_addr.Randomize();
conf_.mtu = kDefaultMtu;
conf_.admin_up = true;
Expand Down Expand Up @@ -293,6 +293,9 @@ class Port {

const PortBuilder *port_builder() const { return port_builder_; }

// per-queue stat counters
QueueStats queue_stats_[PACKET_DIRS][MAX_QUEUES_PER_DIR];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally! 🙏


protected:
friend class PortBuilder;

Expand Down Expand Up @@ -330,8 +333,6 @@ class Port {
/* which modules are using this port?
* TODO: more robust gate keeping */
const struct module *users[PACKET_DIRS][MAX_QUEUES_PER_DIR];

struct QueueStats queue_stats[PACKET_DIRS][MAX_QUEUES_PER_DIR];
};

#define ADD_DRIVER(_DRIVER, _NAME_TEMPLATE, _HELP) \
Expand Down