diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index fefc02d6ad..9b51142277 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -31,6 +31,8 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp_action/server.hpp" +#include + using rclcpp_action::ServerBase; using rclcpp_action::GoalUUID; @@ -81,6 +83,159 @@ class ServerBaseImpl { } + std::recursive_mutex expire_thread_reentrant_mutex_; + std::thread expire_thread; + + std::function expire_ready_callback; + // must be a class member, so keep it in scope + std::function expire_reset_callback; + rclcpp::Clock::SharedPtr expire_clock; + + void stop_expire_thread() + { + std::lock_guard lock(expire_thread_reentrant_mutex_); + expire_ready_callback = std::function(); + if(expire_clock) + { + expire_clock->cancel_sleep_or_wait(); + } + expire_thread.join(); + expire_reset_callback = std::function(); + expire_clock.reset(); + } + + void start_expire_thread(std::function expire_ready_cb) + { + { + std::lock_guard lock(expire_thread_reentrant_mutex_); + if(expire_clock) + { + return; + } + + expire_ready_callback = expire_ready_cb; + } + + RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "starting expire thread"); + + rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context(); + std::shared_ptr rcl_context = context->get_rcl_context(); + + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + rcl_ret_t ret = + rcl_wait_set_init(&wait_set, num_subscriptions_, num_guard_conditions_, num_timers_, num_clients_, num_services_, 0, rcl_context.get(), rcl_get_default_allocator()); + if(ret != RCL_RET_OK) + { + throw std::runtime_error("Internal error starting timer thread"); + } + + ret = rcl_action_wait_set_add_action_server( + &wait_set, action_server_.get(), NULL); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "ServerBase::add_to_wait_set() failed"); + } + + // extract the timer from the wait set + const rcl_timer_t *expire_timer = wait_set.timers[0]; + + //don't need the wait set any more + ret = rcl_wait_set_fini(&wait_set); + + rcl_clock_t expire_clock_rcl; + rcl_clock_t *expire_clock_rcl_ptr = &expire_clock_rcl; + + ret = rcl_timer_clock(expire_timer, &expire_clock_rcl_ptr); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to gather expire clock failed"); + } + + expire_clock = std::make_shared(expire_clock_rcl_ptr->type); + + expire_reset_callback = [this] (size_t /*reset_calls*/) + { + expire_clock->cancel_sleep_or_wait(); + RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer was reset"); + }; + + rcl_event_callback_t rcl_reset_callback = rclcpp::detail::cpp_callback_trampoline< + decltype(expire_reset_callback), const void *, size_t>; + + ret = rcl_timer_set_on_reset_callback(expire_timer, rcl_reset_callback, static_cast(&expire_reset_callback)); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to set timer on reset callback"); + } + + expire_thread = std::thread([this, expire_timer]() { + + std::function cb_copy = expire_ready_callback; + + while(true) + { + { + std::lock_guard lock(expire_thread_reentrant_mutex_); + if(!expire_ready_callback) + { + break; + } + } + + int64_t time_until_call = 0; + bool canceled = false; + + auto ret = rcl_timer_get_time_until_next_call(expire_timer, &time_until_call); + if (ret == RCL_RET_TIMER_CANCELED) { + canceled = true; + } + + // note, event though we drop the lock here, there is no race, + // as expire_clock->cancel_sleep_or_wait buffers the cancel + if(canceled) + { + // no inactive goal, sleep for a long time +// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer is canceled, sleeping long time"); + expire_clock->sleep_for(std::chrono::hours(100)); + } + else + { + if(time_until_call > 0) + { +// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer is active, sleeping short time %+" PRId64, time_until_call); + bool sleep_worked = expire_clock->sleep_for(std::chrono::nanoseconds(time_until_call)); +// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), ("sleep done " + std::to_string(sleep_worked)).c_str()); + } + else + { +// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "Timer is ready, skipping sleep"); + } + } + + bool is_ready = false; + if(rcl_timer_is_ready(expire_timer, &is_ready) == RCL_RET_OK && is_ready) + { + bool triggered = false; + { +// std::lock_guard lock(expire_thread_reentrant_mutex_); + + // we need to cancel the timer here, to avoid a endless loop + // in case a new goal expires, the timer will be reset. + ret = rcl_timer_cancel(const_cast(expire_timer)); + if( ret != RCL_RET_OK) + { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to cancel timer"); + } + + triggered = true; + cb_copy(1, static_cast(ServerBase::EntityType::Expired)); + } + if(triggered) + { +// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer triggered"); + } + } + } + }); + } + // Lock for action_server_ std::recursive_mutex action_server_reentrant_mutex_; @@ -785,6 +940,8 @@ ServerBase::set_on_ready_callback(std::function callback) "is not callable."); } + pimpl_->start_expire_thread(callback); + set_callback_to_entity(EntityType::GoalService, callback); set_callback_to_entity(EntityType::ResultService, callback); set_callback_to_entity(EntityType::CancelService, callback); @@ -905,6 +1062,7 @@ ServerBase::clear_on_ready_callback() set_on_ready_callback(EntityType::GoalService, nullptr, nullptr); set_on_ready_callback(EntityType::ResultService, nullptr, nullptr); set_on_ready_callback(EntityType::CancelService, nullptr, nullptr); + pimpl_->stop_expire_thread(); on_ready_callback_set_ = false; }