Skip to content

Commit

Permalink
hbt/bperf: Let per thread reader handle lead exit event. (#317)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #317

When the leader process exits, the per thread readers will not be able to
read data. This is by design. The readers need to detect such an event and
re-enable the reader after the leader process comes back.

An enabled_ flag is added to the reader. We need a heuristic algorithm to
detect when the leader exits. Currently, the algorithm is simple: if we
read the same value for the first hardware counter twice, the leader is most
likely gone. We may need to revisit this later.

A unit test is added to cover this. We also need to adjust the close()
function of BPerfEventsGroup to make it easier for the reader to detect
leader exit events.

Reviewed By: Alston-Tang

Differential Revision: D64486001

fbshipit-source-id: 1226e2109526e329cd14561b4faa1fb541181b75
  • Loading branch information
liu-song-6 authored and facebook-github-bot committed Oct 18, 2024
1 parent 7761caa commit bc3362d
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 8 deletions.
10 changes: 6 additions & 4 deletions hbt/src/perf_event/BPerfEventsGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,16 @@ void BPerfEventsGroup::close() {
cgroup_output_fd_ = -1;
::close(global_output_fd_);
global_output_fd_ = -1;
::bpf_link__destroy(register_thread_link_);
register_thread_link_ = nullptr;
::bpf_link__destroy(unregister_thread_link_);
unregister_thread_link_ = nullptr;
for (auto& fd : pe_fds_) {
::close(fd);
fd = -1;
}
// Close the perf event fds before destroying the links for per thread
// monitoring. This will help the reader detect the lead has exited.
::bpf_link__destroy(register_thread_link_);
register_thread_link_ = nullptr;
::bpf_link__destroy(unregister_thread_link_);
unregister_thread_link_ = nullptr;
::bpf_link__destroy(pmu_enable_exit_link_);
pmu_enable_exit_link_ = nullptr;
opened_ = false;
Expand Down
30 changes: 30 additions & 0 deletions hbt/src/perf_event/BPerfPerThreadReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ int BPerfPerThreadReader::enable() {
struct bperf_thread_metadata* metadata;
struct BPerfThreadData data;

if (enabled_) {
return 0;
}

idx_fd =
::bpf_obj_get(BPerfEventsGroup::perThreadIndexMapPath(pin_name_).c_str());
data_fd_ =
Expand Down Expand Up @@ -127,6 +131,7 @@ int BPerfPerThreadReader::enable() {
// drift at the moment and use it to fix future readings.
read(&data);
initial_clock_drift_ = getRefMonoTime() - data.monoTime;
enabled_ = true;
return 0;

error:
Expand All @@ -136,6 +141,9 @@ int BPerfPerThreadReader::enable() {
}

void BPerfPerThreadReader::disable() {
if (!enabled_) {
return;
}
::munmap(mmap_ptr_, mmap_size_);
mmap_ptr_ = nullptr;
data_ = nullptr;
Expand All @@ -146,6 +154,7 @@ void BPerfPerThreadReader::disable() {
dummy_pe_mmap_ = nullptr;
::close(dummy_pe_fd_);
dummy_pe_fd_ = -1;
enabled_ = false;
}

BPerfPerThreadReader::~BPerfPerThreadReader() {
Expand All @@ -163,6 +172,10 @@ int BPerfPerThreadReader::read(struct BPerfThreadData* data) {
__u32 lock;
int i, idx;

if (!enabled_) {
return -1;
}

do {
lock = data_->lock;
barrier();
Expand Down Expand Up @@ -198,7 +211,24 @@ int BPerfPerThreadReader::read(struct BPerfThreadData* data) {
data->values[i].running += time_after_sched_in;
}
}
if (leadExited(data->values[0].counter)) {
disable();
return -1;
}
return 0;
}

// Heuristic lead-exit detector, called by read() with the counter of event 0.
//
// A disabled reader is always reported as "lead exited". Otherwise the lead is
// considered gone when this reading of counter zero is identical to the one
// remembered from the previous call. The remembered value is refreshed on
// every call made while the reader is enabled.
bool BPerfPerThreadReader::leadExited(__u64 counter_zero) {
  if (!enabled_) {
    return true;
  }

  // Compare against the previous reading before overwriting it.
  const bool unchanged = (prev_counter_zero_ == counter_zero);
  prev_counter_zero_ = counter_zero;
  return unchanged;
}

} // namespace facebook::hbt::perf_event
7 changes: 7 additions & 0 deletions hbt/src/perf_event/BPerfPerThreadReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class BPerfPerThreadReader {
int read(struct BPerfThreadData* data);
int enable();
void disable();
bool isEnabled() {
return enabled_;
}

protected:
void* mmap_ptr_ = nullptr;
Expand All @@ -45,6 +48,10 @@ class BPerfPerThreadReader {
int getDataSize_();
int dummy_pe_fd_ = -1;
void* dummy_pe_mmap_ = nullptr;
// True between a successful enable() and the next disable().
bool enabled_ = false;
// Previous reading of event 0, used to detect when the lead exits.
// Initialized to zero so the first comparison in leadExited() never reads an
// indeterminate value.
__u64 prev_counter_zero_ = 0;
// Heuristic: returns true when the reader is disabled or when counter_zero
// equals the previously recorded reading of event 0.
bool leadExited(__u64 counter_zero);
};

} // namespace facebook::hbt::perf_event
94 changes: 90 additions & 4 deletions hbt/src/perf_event/tests/BPerfEventsGroupPerThreadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ void printBPerfThreadData(
GTEST_LOG_(INFO) << name << " running = " << data.values[0].running;
}

void userThread(void) {
auto reader = createReader();
auto pmu_fd = createPerfEvent();
void checkValue(std::shared_ptr<BPerfPerThreadReader> reader) {
struct bpf_perf_event_value beforeValue, afterValue;
struct BPerfThreadData beforeData, afterData;
__u64 monoTimeBefore, monoTimeDiff;
__u64 cpuTimeBefore, cpuTimeDiff;
auto pmu_fd = createPerfEvent();

long workSizes[TESTS] = {1000000, 10000000, 100000000, 1000000000};

Expand Down Expand Up @@ -136,9 +135,57 @@ void userThread(void) {
EXPECT_GE(cpuTimeRatio, 0.95);
EXPECT_LE(cpuTimeRatio, 1.05);
}
close(pmu_fd);
}

// Thread body for the per-thread reader test: creates a reader, validates its
// readings with checkValue(), then disables the reader.
//
// The perf event fd used for cross-checking is created and closed inside
// checkValue(), so there is no pmu_fd to close here.
void userThread(void) {
  auto reader = createReader();

  checkValue(reader);

  reader->disable();
}

// testStage is the hand-off variable between the TestLeadExit main thread and
// readerThread. Each side polls it and advances it in turn:
//   0 - set by the main thread before readerThread is started
//   1 - set by readerThread after its first checkValue() pass
//   2 - set by the main thread after it disables the lead
//   3 - set by readerThread once it stops polling read()
//   4 - set by the main thread after it opens a second lead
// NOTE(review): this is a plain int read and written from two threads with no
// synchronization visible here; consider std::atomic<int> — TODO confirm.
static int testStage = 0;

// Reader side of TestLeadExit. Coordinates with the main thread through
// testStage:
//   - validates readings once, then publishes stage 1;
//   - waits for stage 2 (lead disabled by the main thread);
//   - polls read() until it fails or the retry budget is used, then publishes
//     stage 3;
//   - waits for stage 4 (a new lead is running), checks that the reader
//     disabled itself, re-enables it and validates readings again.
void readerThread(void) {
  const auto pollInterval = std::chrono::milliseconds(10);
  auto reader = createReader();
  BPerfThreadData d;

  checkValue(reader);

  // First validation pass is done; tell the main thread.
  testStage = 1;
  GTEST_LOG_(INFO) << "testStage = " << testStage;

  // Block until the main thread reports that the lead has been stopped.
  while (testStage != 2) {
    std::this_thread::sleep_for(pollInterval);
  }

  // Keep reading until read() reports failure, sleeping between attempts and
  // giving up after 10 sleeps. Then publish stage 3 either way.
  for (int attempt = 0; reader->read(&d) == 0 && attempt < 10; ++attempt) {
    std::this_thread::sleep_for(pollInterval);
  }
  testStage = 3;

  // Block until the main thread has started another lead.
  while (testStage != 4) {
    std::this_thread::sleep_for(pollInterval);
  }

  // The failed read() above is expected to have disabled the reader.
  ASSERT_FALSE(reader->isEnabled());

  // Bring the reader back against the new lead and make sure it can read.
  int ret = reader->enable();
  ASSERT_EQ(ret, 0);
  ASSERT_EQ(reader->read(&d), 0);

  // Second validation pass.
  checkValue(reader);
}

} // namespace
Expand Down Expand Up @@ -168,3 +215,42 @@ TEST(BPerfEventsGroupPerThreadTest, TestCycles) {
threads[i].join();
}
}

// Exercises the reader's handling of a lead that goes away and comes back.
// The main thread drives the even-numbered stages of testStage and
// readerThread drives the odd ones:
//   open lead -> start reader -> (1) -> disable lead -> (2) -> (3) ->
//   open second lead -> (4) -> join reader.
TEST(BPerfEventsGroupPerThreadTest, TestLeadExit) {
  // Polls testStage every 10ms until readerThread has moved it to `stage`.
  const auto waitForStage = [](int stage) {
    while (testStage != stage) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  };

  auto pmu_manager = makePmuDeviceManager();
  auto pmu = pmu_manager->findPmuDeviceByName("generic_hardware");
  auto ev_def = pmu_manager->findEventDef("cycles");
  if (!ev_def) {
    GTEST_SKIP() << "Cannot find event cycles";
  }
  auto ev_conf =
      pmu->makeConf(ev_def->id, EventExtraAttr(), EventValueTransforms());

  auto system = BPerfEventsGroup(EventConfs({ev_conf}), 0, true, "cycles");
  EXPECT_EQ(system.open(), true);

  // Stage 0: reset the hand-off variable and launch the reader.
  testStage = 0;
  auto t = std::thread(readerThread);

  // Stage 1 is published by the reader after its first validation pass.
  waitForStage(1);

  // Stage 2: take the lead down and let the reader know.
  system.disable();
  testStage = 2;

  // Stage 3 is published by the reader once it has stopped polling read().
  waitForStage(3);

  // Stage 4: bring up a second lead instance, then release the reader.
  auto system2 = BPerfEventsGroup(EventConfs({ev_conf}), 0, true, "cycles");
  EXPECT_EQ(system2.open(), true);

  testStage = 4;
  t.join();
}

0 comments on commit bc3362d

Please sign in to comment.