Skip to content

Commit

Permalink
fix: Update EventManager and task::run_on_core to ignore spurious…
Browse files Browse the repository at this point in the history
… wakeups if they happen (#340)

* fix: Update `EventManager` and `task::run_on_core` to ignore spurious wakeups if they happen
* Added `notified` flag to `SubscriberData` which is set / cleared appropriately and used as predicate to `cv.wait(...)`
* Added `notified` bool used in `run_on_core` to ensure the `cv.wait(...)` uses it as the predicate, ensuring the blocking run on core does not return early

* update to protect shared memory with lock

* fix comment
  • Loading branch information
finger563 authored Nov 19, 2024
1 parent d1e1942 commit 63fb8b3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
1 change: 1 addition & 0 deletions components/event_manager/include/event_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class EventManager : public espp::BaseComponent {

struct SubscriberData {
std::mutex m;
bool notified = false; // Allows cv to ignore spurious wakeups
std::condition_variable cv;
std::deque<std::vector<uint8_t>> deq;
};
Expand Down
10 changes: 9 additions & 1 deletion components/event_manager/src/event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ bool EventManager::publish(const std::string &topic, const std::vector<uint8_t>
std::unique_lock<std::mutex> lk(sub_data->m);
// push the data into the queue
sub_data->deq.push_back(data);
// update the notified flag (used to ignore spurious wakeups)
sub_data->notified = true;
}
// notify the task that there is new data in the queue
sub_data->cv.notify_all();
Expand Down Expand Up @@ -174,6 +176,10 @@ bool EventManager::remove_subscriber(const std::string &topic, const std::string
// notify the data (so the subscriber task function can stop waiting on the data cv)
{
std::lock_guard<std::recursive_mutex> lk(data_mutex_);
{
std::unique_lock<std::mutex> lk(subscriber_data_[topic].m);
subscriber_data_[topic].notified = true;
}
subscriber_data_[topic].cv.notify_all();
}
{
Expand Down Expand Up @@ -210,7 +216,7 @@ bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m,
{
// wait on sub_data's mutex/cv
std::unique_lock<std::mutex> lk(sub_data->m);
sub_data->cv.wait(lk);
sub_data->cv.wait(lk, [&sub_data] { return sub_data->notified; });
if (sub_data->deq.empty()) {
// stop the task, we were notified, but there was no data available.
return true;
Expand All @@ -224,6 +230,8 @@ bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m,
{
std::unique_lock<std::mutex> lk(sub_data->m);
if (sub_data->deq.empty()) {
// reset the notified flag
sub_data->notified = false;
// we've gotten all the data, so break out of the loop
break;
}
Expand Down
11 changes: 7 additions & 4 deletions components/task/include/run_on_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
// If the core id is larger than the number of cores, run on the last core
core_id = configNUM_CORES - 1;
}
bool notified = false;
std::mutex mutex;
std::unique_lock lock(mutex); // cppcheck-suppress localMutex
std::condition_variable cv; ///< Signal for when the task is done / function is run
Expand All @@ -43,13 +44,14 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
decltype(f()) ret_val;
auto f_task = espp::Task::make_unique(espp::Task::Config{
.name = name,
.callback = [&mutex, &cv, &f, &ret_val](auto &cb_m, auto &cb_cv) -> bool {
.callback = [&mutex, &cv, &f, &ret_val, &notified](auto &cb_m, auto &cb_cv) -> bool {
// synchronize with the main thread - block here until the main thread
// waits on the condition variable (cv), then run the function
std::unique_lock lock(mutex);
// run the function
ret_val = f();
// signal that the task is done
notified = true;
cv.notify_all();
return true; // stop the task
},
Expand All @@ -58,19 +60,20 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
.core_id = core_id,
});
f_task->start();
cv.wait(lock);
cv.wait(lock, [&notified] { return notified; });
return ret_val;
} else {
// the function returns void
auto f_task = espp::Task::make_unique(espp::Task::Config{
.name = name,
.callback = [&mutex, &cv, &f](auto &cb_m, auto &cb_cv) -> bool {
.callback = [&mutex, &cv, &f, &notified](auto &cb_m, auto &cb_cv) -> bool {
// synchronize with the main thread - block here until the main thread
// waits on the condition variable (cv), then run the function
std::unique_lock lock(mutex);
// run the function
f();
// signal that the task is done
notified = true;
cv.notify_all();
return true; // stop the task
},
Expand All @@ -79,7 +82,7 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
.core_id = core_id,
});
f_task->start();
cv.wait(lock);
cv.wait(lock, [&notified] { return notified; });
}
}
}
Expand Down

0 comments on commit 63fb8b3

Please sign in to comment.