Skip to content

Commit

Permalink
feat(task/timer): Update Task to use std::variant<> and support new…
Browse files Browse the repository at this point in the history
… callback with notified flag (#341)

* feat(task/timer): Update `Task` to use std::variant<> and support new callback with notified flag
* Update `Task` to have `notified_` flag which is used to signal the callback that it has been notified with the condition variable. This should be used as predicate with cv.wait, cv.wait_for, and cv.wait_until and then should be cleared
* Update `Task` to have additional callback function signature which takes a third `bool&` parameter which is the notification flag for use in the callback function as the predicate for the wait.
* Update `Task` to use `std::variant<>` for callback storage and update interfaces accordingly. Should be mostly backwards-compatible... 🤞
* Update `Timer` to use new `Task` callback to have notification flag and use it as predicate when waiting in the timers internal callback function

* add example for notification; fix bug in start which improperly left notification=true; update task info example to have changing stack usage for better demo

* update readmes

* update interrupt, monitor, and event_manager

* wip fixing affected components

* fix socket and rtsp components

* fix ftp

* fix examples

* update bindings, pc tests, and python tests
  • Loading branch information
finger563 authored Nov 20, 2024
1 parent 63fb8b3 commit 856770b
Show file tree
Hide file tree
Showing 36 changed files with 761 additions and 581 deletions.
12 changes: 6 additions & 6 deletions components/adc/include/continuous_adc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ class ContinuousAdc : public espp::BaseComponent {
init(config.channels);
// and start the task
using namespace std::placeholders;
task_ =
espp::Task::make_unique({.name = "ContinuousAdc Task",
.callback = std::bind(&ContinuousAdc::update_task, this, _1, _2),
.priority = config.task_priority,
.log_level = espp::Logger::Verbosity::WARN});
task_ = espp::Task::make_unique(
{.name = "ContinuousAdc Task",
.callback = std::bind(&ContinuousAdc::update_task, this, _1, _2, _3),
.priority = config.task_priority,
.log_level = espp::Logger::Verbosity::WARN});
task_->start();
}

Expand Down Expand Up @@ -167,7 +167,7 @@ class ContinuousAdc : public espp::BaseComponent {
return static_cast<int>(unit) * 32 + static_cast<int>(channel);
}

bool update_task(std::mutex &task_m, std::condition_variable &task_cv) {
bool update_task(std::mutex &task_m, std::condition_variable &task_cv, bool &task_notified) {
task_handle_ = xTaskGetCurrentTaskHandle();
static auto previous_timestamp = std::chrono::high_resolution_clock::now();
// wait until conversion is ready (will be notified by the registered
Expand Down
7 changes: 4 additions & 3 deletions components/as5600/include/as5600.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class As5600 : public BasePeripheral<> {
}
}

bool update_task(std::mutex &m, std::condition_variable &cv) {
bool update_task(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
auto start = std::chrono::high_resolution_clock::now();
std::error_code ec;
update(ec);
Expand All @@ -222,7 +222,8 @@ class As5600 : public BasePeripheral<> {
}
{
std::unique_lock<std::mutex> lk(m);
cv.wait_until(lk, start + update_period_);
cv.wait_until(lk, start + update_period_, [&task_notified] { return task_notified; });
task_notified = false;
}
// don't want the task to stop
return false;
Expand All @@ -238,7 +239,7 @@ class As5600 : public BasePeripheral<> {
// start the task
using namespace std::placeholders;
task_ = Task::make_unique(
{.name = "As5600", .callback = std::bind(&As5600::update_task, this, _1, _2)});
{.name = "As5600", .callback = std::bind(&As5600::update_task, this, _1, _2, _3)});
task_->start();
}

Expand Down
18 changes: 10 additions & 8 deletions components/bldc_haptics/include/bldc_haptics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ template <MotorConcept M> class BldcHaptics : public BaseComponent {
// set the motion control type to torque
motor_.get().set_motion_control_type(detail::MotionControlType::TORQUE);
// create the motor task
motor_task_ =
Task::make_unique({.name = "haptic_motor",
.callback = std::bind(&BldcHaptics::motor_task, this,
std::placeholders::_1, std::placeholders::_2),
.stack_size_bytes = 1024 * 6,
.log_level = Logger::Verbosity::WARN});
motor_task_ = Task::make_unique(
{.name = "haptic_motor",
.callback = std::bind(&BldcHaptics::motor_task, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3),
.stack_size_bytes = 1024 * 6,
.log_level = Logger::Verbosity::WARN});
}

/// @brief Destructor for the haptic motor
Expand Down Expand Up @@ -275,8 +275,9 @@ template <MotorConcept M> class BldcHaptics : public BaseComponent {
/// @brief Task which runs the haptic motor
/// @param m Mutex to use for the task
/// @param cv Condition variable to use for the task
/// @param task_notified True if the task has been notified, false otherwise
/// @return True if the task should be stopped, false otherwise
bool motor_task(std::mutex &m, std::condition_variable &cv) {
bool motor_task(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
auto start_time = std::chrono::steady_clock::now();
// if we are not moving, and we're close to the center (but not exactly at
// the center), slowly move back to the center
Expand Down Expand Up @@ -412,7 +413,8 @@ template <MotorConcept M> class BldcHaptics : public BaseComponent {
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(m);
cv.wait_until(lk, start_time + 1ms);
cv.wait_until(lk, start_time + 1ms, [&task_notified] { return task_notified; });
task_notified = false;
}

// don't want to stop the task
Expand Down
7 changes: 4 additions & 3 deletions components/display/include/display.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ template <typename Pixel> class Display : public BaseComponent {
// Now start the task for the ui management
using namespace std::placeholders;
task_ = Task::make_unique({
.callback = std::bind(&Display::update, this, _1, _2),
.callback = std::bind(&Display::update, this, _1, _2, _3),
.task_config = task_config,
});
task_->start();
Expand All @@ -328,7 +328,7 @@ template <typename Pixel> class Display : public BaseComponent {
* than the task running lv_task_handler(). For more info, see
* https://docs.lvgl.io/latest/en/html/porting/tick.html
*/
bool update(std::mutex &m, std::condition_variable &cv) {
bool update(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
auto now = std::chrono::high_resolution_clock::now();
static auto prev = now;
if (!paused_) {
Expand All @@ -342,7 +342,8 @@ template <typename Pixel> class Display : public BaseComponent {
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, update_period_);
cv.wait_for(lk, update_period_, [&task_notified] { return task_notified; });
task_notified = false;
}
// don't want to stop the task
return false;
Expand Down
9 changes: 5 additions & 4 deletions components/display_drivers/example/main/gui.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Gui {
// now start the gui updater task
using namespace std::placeholders;
task_ = espp::Task::make_unique({.name = "Gui Task",
.callback = std::bind(&Gui::update, this, _1, _2),
.callback = std::bind(&Gui::update, this, _1, _2, _3),
.stack_size_bytes = 6 * 1024});
task_->start();
}
Expand Down Expand Up @@ -66,15 +66,16 @@ class Gui {
lv_obj_add_style(meter_, &style_indic, LV_PART_INDICATOR);
}

bool update(std::mutex &m, std::condition_variable &cv) {
bool update(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
{
std::scoped_lock<std::recursive_mutex> lk(mutex_);
lv_task_handler();
}
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, 16ms);
cv.wait_for(lk, 16ms, [&task_notified] { return task_notified; });
task_notified = false;
}
// don't want to stop the task
return false;
Expand All @@ -88,4 +89,4 @@ class Gui {
std::unique_ptr<espp::Task> task_;
espp::Logger logger_;
std::recursive_mutex mutex_;
};
};
2 changes: 1 addition & 1 deletion components/esp-box/include/esp-box.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class EspBox : public BaseComponent {
bool update_gt911();
bool update_tt21100();
void update_volume_output();
bool audio_task_callback(std::mutex &m, std::condition_variable &cv);
bool audio_task_callback(std::mutex &m, std::condition_variable &cv, bool &task_notified);
void lcd_wait_lines();

// box 3:
Expand Down
7 changes: 4 additions & 3 deletions components/esp-box/src/esp-box.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,10 +618,10 @@ bool EspBox::initialize_sound(uint32_t default_audio_rate) {
gpio_set_direction(sound_power_pin, GPIO_MODE_OUTPUT);
enable_sound(true);

using namespace std::placeholders;
audio_task_ = std::make_unique<espp::Task>(espp::Task::Config{
.name = "audio task",
.callback = std::bind(&EspBox::audio_task_callback, this, std::placeholders::_1,
std::placeholders::_2),
.callback = std::bind(&EspBox::audio_task_callback, this, _1, _2, _3),
.stack_size_bytes = 1024 * 4,
.priority = 19,
.core_id = 1,
Expand All @@ -634,7 +634,8 @@ bool EspBox::initialize_sound(uint32_t default_audio_rate) {

void EspBox::enable_sound(bool enable) { gpio_set_level(sound_power_pin, enable); }

bool IRAM_ATTR EspBox::audio_task_callback(std::mutex &m, std::condition_variable &cv) {
bool IRAM_ATTR EspBox::audio_task_callback(std::mutex &m, std::condition_variable &cv,
bool &task_notified) {
// Queue the next I2S out frame to write
uint16_t available = xStreamBufferBytesAvailable(audio_tx_stream);
int buffer_size = audio_tx_buffer.size();
Expand Down
2 changes: 1 addition & 1 deletion components/esp32-timer-cam/include/esp32-timer-cam.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class EspTimerCam : public BaseComponent {
protected:
EspTimerCam();
float led_breathe();
bool led_task_callback(std::mutex &m, std::condition_variable &cv);
bool led_task_callback(std::mutex &m, std::condition_variable &cv, bool &task_notified);

// internal i2c (touchscreen, audio codec)
static constexpr auto internal_i2c_port = I2C_NUM_0;
Expand Down
10 changes: 6 additions & 4 deletions components/esp32-timer-cam/src/esp32-timer-cam.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ bool EspTimerCam::initialize_led(float breathing_period) {
.channels = led_channels_,
.duty_resolution = LEDC_TIMER_10_BIT,
});
using namespace std::placeholders;
led_task_ = espp::Task::make_unique(
{.name = "breathe",
.callback = std::bind(&EspTimerCam::led_task_callback, this, std::placeholders::_1,
std::placeholders::_2)});
.callback = std::bind(&EspTimerCam::led_task_callback, this, _1, _2, _3)});
set_led_breathing_period(breathing_period);
return true;
}
Expand Down Expand Up @@ -84,11 +84,13 @@ float EspTimerCam::led_breathe() {
return gaussian_(t);
}

bool EspTimerCam::led_task_callback(std::mutex &m, std::condition_variable &cv) {
bool EspTimerCam::led_task_callback(std::mutex &m, std::condition_variable &cv,
bool &task_notified) {
using namespace std::chrono_literals;
led_->set_duty(led_channels_[0].channel, 100.0f * led_breathe());
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, 10ms);
cv.wait_for(lk, 10ms, [&task_notified] { return task_notified; });
task_notified = false;
return false;
};

Expand Down
3 changes: 2 additions & 1 deletion components/event_manager/include/event_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class EventManager : public espp::BaseComponent {
std::deque<std::vector<uint8_t>> deq;
};

bool subscriber_task_fn(const std::string &topic, std::mutex &m, std::condition_variable &cv);
bool subscriber_task_fn(const std::string &topic, std::mutex &m, std::condition_variable &cv,
bool &task_notified);

std::recursive_mutex events_mutex_;
detail::EventMap events_;
Expand Down
6 changes: 3 additions & 3 deletions components/event_manager/src/event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ bool EventManager::add_subscriber(const std::string &topic, const std::string &c
logger_.debug("Creating task for topic '{}'", topic);
logger_.debug(" with config: {}", config);
subscriber_tasks_[topic] = Task::make_unique(
{.callback = std::bind(&EventManager::subscriber_task_fn, this, topic, _1, _2),
{.callback = std::bind(&EventManager::subscriber_task_fn, this, topic, _1, _2, _3),
.task_config = config});
// and start it
subscriber_tasks_[topic]->start();
Expand Down Expand Up @@ -177,7 +177,7 @@ bool EventManager::remove_subscriber(const std::string &topic, const std::string
{
std::lock_guard<std::recursive_mutex> lk(data_mutex_);
{
std::unique_lock<std::mutex> lk(subscriber_data_[topic].m);
std::unique_lock<std::mutex> data_lk(subscriber_data_[topic].m);
subscriber_data_[topic].notified = true;
}
subscriber_data_[topic].cv.notify_all();
Expand All @@ -199,7 +199,7 @@ bool EventManager::remove_subscriber(const std::string &topic, const std::string
}

bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m,
std::condition_variable &cv) {
std::condition_variable &cv, bool &task_notified) {
// get the data queue
SubscriberData *sub_data;
{
Expand Down
8 changes: 5 additions & 3 deletions components/ftp/include/ftp_client_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class FtpClientSession : public BaseComponent {
using namespace std::placeholders;
task_ = std::make_unique<Task>(Task::Config{
.name = "FtpClientSession",
.callback = std::bind(&FtpClientSession::task_function, this, _1, _2),
.callback = std::bind(&FtpClientSession::task_function, this, _1, _2, _3),
.stack_size_bytes = 1024 * 6,
.log_level = Logger::Verbosity::WARN,
});
Expand Down Expand Up @@ -91,13 +91,15 @@ class FtpClientSession : public BaseComponent {
/// true, which indicates that the task should stop.
/// \param m The mutex to use for waiting on the condition variable.
/// \param cv The condition variable to use for waiting.
/// \param task_notified A flag to indicate if the task was notified.
/// \return True if the task should stop, false otherwise.
bool task_function(std::mutex &m, std::condition_variable &cv) {
bool task_function(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
{
// delay here
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, 1ms);
cv.wait_for(lk, 1ms, [&task_notified] { return task_notified; });
task_notified = false;
}

if (!socket_) {
Expand Down
7 changes: 4 additions & 3 deletions components/ftp/include/ftp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class FtpServer : public BaseComponent {
using namespace std::placeholders;
accept_task_ = std::make_unique<Task>(Task::Config{
.name = "FtpServer::accept_task",
.callback = std::bind(&FtpServer::accept_task_function, this, _1, _2),
.callback = std::bind(&FtpServer::accept_task_function, this, _1, _2, _3),
.stack_size_bytes = 1024 * 4,
.log_level = Logger::Verbosity::WARN,
});
Expand All @@ -92,15 +92,16 @@ class FtpServer : public BaseComponent {
clients_.clear();
}

bool accept_task_function(std::mutex &m, std::condition_variable &cv) {
bool accept_task_function(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
auto client_ptr = server_.accept();
if (!client_ptr) {
logger_.error("Could not accept connection");
// if we failed to accept that means there are no connections available
// so we should delay a little bit
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, 1ms);
cv.wait_for(lk, 1ms, [&task_notified] { return task_notified; });
task_notified = false;
// don't want to stop the task
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions components/interrupt/include/interrupt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class Interrupt : public BaseComponent {
// now make and start the task
task_ = espp::Task::make_unique({
.callback = std::bind(&Interrupt::task_callback, this, std::placeholders::_1,
std::placeholders::_2),
std::placeholders::_2, std::placeholders::_3),
.task_config = config.task_config,
});
task_->start();
Expand Down Expand Up @@ -260,7 +260,7 @@ class Interrupt : public BaseComponent {
return level == static_cast<int>(active_level);
}

bool task_callback(std::mutex &m, std::condition_variable &cv) {
bool task_callback(std::mutex &m, std::condition_variable &cv, bool &task_notified) {
EventData event_data;
if (xQueueReceive(queue_, &event_data, portMAX_DELAY)) {
if (event_data.gpio_num == -1) {
Expand Down
12 changes: 6 additions & 6 deletions components/monitor/example/main/monitor_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern "C" void app_main(void) {
espp::TaskMonitor tm({.period = 500ms});
// create threads
auto start = std::chrono::high_resolution_clock::now();
auto task_fn = [&start](int task_id, auto &, auto &) {
auto task_fn = [&start](int task_id, auto &, auto &, auto &) {
auto now = std::chrono::high_resolution_clock::now();
auto seconds_since_start = std::chrono::duration<float>(now - start).count();
// do some work
Expand All @@ -35,7 +35,7 @@ extern "C" void app_main(void) {
for (size_t i = 0; i < num_tasks; i++) {
std::string task_name = fmt::format("Task {}", i);
auto task = espp::Task::make_unique({.name = task_name,
.callback = std::bind(task_fn, i, _1, _2),
.callback = std::bind(task_fn, i, _1, _2, _3),
.stack_size_bytes = 5 * 1024});
tasks[i] = std::move(task);
tasks[i]->start();
Expand All @@ -49,7 +49,7 @@ extern "C" void app_main(void) {
//! [get_latest_info_vector example]
// create threads
auto start = std::chrono::high_resolution_clock::now();
auto task_fn = [&start](int task_id, auto &, auto &) {
auto task_fn = [&start](int task_id, auto &, auto &, auto &) {
auto now = std::chrono::high_resolution_clock::now();
auto seconds_since_start = std::chrono::duration<float>(now - start).count();
// do some work
Expand All @@ -65,7 +65,7 @@ extern "C" void app_main(void) {
for (size_t i = 0; i < num_tasks; i++) {
std::string task_name = fmt::format("Task {}", i);
auto task = espp::Task::make_unique({.name = task_name,
.callback = std::bind(task_fn, i, _1, _2),
.callback = std::bind(task_fn, i, _1, _2, _3),
.stack_size_bytes = 5 * 1024});
tasks[i] = std::move(task);
tasks[i]->start();
Expand All @@ -83,7 +83,7 @@ extern "C" void app_main(void) {
//! [get_latest_info example]
// create threads
auto start = std::chrono::high_resolution_clock::now();
auto task_fn = [&start](int task_id, auto &, auto &) {
auto task_fn = [&start](int task_id, auto &, auto &, auto &) {
auto now = std::chrono::high_resolution_clock::now();
auto seconds_since_start = std::chrono::duration<float>(now - start).count();
// do some work
Expand All @@ -99,7 +99,7 @@ extern "C" void app_main(void) {
for (size_t i = 0; i < num_tasks; i++) {
std::string task_name = fmt::format("Task {}", i);
auto task = espp::Task::make_unique({.name = task_name,
.callback = std::bind(task_fn, i, _1, _2),
.callback = std::bind(task_fn, i, _1, _2, _3),
.stack_size_bytes = 5 * 1024});
tasks[i] = std::move(task);
tasks[i]->start();
Expand Down
Loading

0 comments on commit 856770b

Please sign in to comment.