Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed expiring of goals if events executor is used #2674

Open
wants to merge 1 commit into
base: jazzy
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "rclcpp/exceptions.hpp"
#include "rclcpp_action/server.hpp"

#include <inttypes.h>

using rclcpp_action::ServerBase;
using rclcpp_action::GoalUUID;

Expand Down Expand Up @@ -81,6 +83,159 @@ class ServerBaseImpl
{
}

std::recursive_mutex expire_thread_reentrant_mutex_;
std::thread expire_thread;

std::function<void(size_t, int)> expire_ready_callback;
// must be a class member, so keep it in scope
std::function<void(size_t)> expire_reset_callback;
rclcpp::Clock::SharedPtr expire_clock;

void stop_expire_thread()
{
std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);
expire_ready_callback = std::function<void(size_t, int)>();
if(expire_clock)
{
expire_clock->cancel_sleep_or_wait();
}
expire_thread.join();
expire_reset_callback = std::function<void(size_t)>();
expire_clock.reset();
}

void start_expire_thread(std::function<void(size_t, int)> expire_ready_cb)
{
{
std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);
if(expire_clock)
{
return;
}

expire_ready_callback = expire_ready_cb;
}

RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "starting expire thread");

rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context();
std::shared_ptr<rcl_context_t> rcl_context = context->get_rcl_context();

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
rcl_ret_t ret =
rcl_wait_set_init(&wait_set, num_subscriptions_, num_guard_conditions_, num_timers_, num_clients_, num_services_, 0, rcl_context.get(), rcl_get_default_allocator());
if(ret != RCL_RET_OK)
{
throw std::runtime_error("Internal error starting timer thread");
}

ret = rcl_action_wait_set_add_action_server(
&wait_set, action_server_.get(), NULL);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "ServerBase::add_to_wait_set() failed");
}

// extract the timer from the wait set
const rcl_timer_t *expire_timer = wait_set.timers[0];

//don't need the wait set any more
ret = rcl_wait_set_fini(&wait_set);

rcl_clock_t expire_clock_rcl;
rcl_clock_t *expire_clock_rcl_ptr = &expire_clock_rcl;

ret = rcl_timer_clock(expire_timer, &expire_clock_rcl_ptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to gather expire clock failed");
}

expire_clock = std::make_shared<rclcpp::Clock>(expire_clock_rcl_ptr->type);

expire_reset_callback = [this] (size_t /*reset_calls*/)
{
expire_clock->cancel_sleep_or_wait();
RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer was reset");
};

rcl_event_callback_t rcl_reset_callback = rclcpp::detail::cpp_callback_trampoline<
decltype(expire_reset_callback), const void *, size_t>;

ret = rcl_timer_set_on_reset_callback(expire_timer, rcl_reset_callback, static_cast<const void *>(&expire_reset_callback));
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to set timer on reset callback");
}

expire_thread = std::thread([this, expire_timer]() {

std::function<void(size_t, int)> cb_copy = expire_ready_callback;

while(true)
{
{
std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);
if(!expire_ready_callback)
{
break;
}
}

int64_t time_until_call = 0;
bool canceled = false;

auto ret = rcl_timer_get_time_until_next_call(expire_timer, &time_until_call);
if (ret == RCL_RET_TIMER_CANCELED) {
canceled = true;
}

// note, event though we drop the lock here, there is no race,
// as expire_clock->cancel_sleep_or_wait buffers the cancel
if(canceled)
{
// no inactive goal, sleep for a long time
// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer is canceled, sleeping long time");
expire_clock->sleep_for(std::chrono::hours(100));
}
else
{
if(time_until_call > 0)
{
// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer is active, sleeping short time %+" PRId64, time_until_call);
bool sleep_worked = expire_clock->sleep_for(std::chrono::nanoseconds(time_until_call));
// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), ("sleep done " + std::to_string(sleep_worked)).c_str());
}
else
{
// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "Timer is ready, skipping sleep");
}
}

bool is_ready = false;
if(rcl_timer_is_ready(expire_timer, &is_ready) == RCL_RET_OK && is_ready)
{
bool triggered = false;
{
// std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);

// we need to cancel the timer here, to avoid a endless loop
// in case a new goal expires, the timer will be reset.
ret = rcl_timer_cancel(const_cast<rcl_timer_t *>(expire_timer));
if( ret != RCL_RET_OK)
{
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to cancel timer");
}

triggered = true;
cb_copy(1, static_cast<int>(ServerBase::EntityType::Expired));
}
if(triggered)
{
// RCLCPP_INFO(rclcpp::get_logger("rclcpp_action"), "expire timer triggered");
}
}
}
});
}

// Lock for action_server_
std::recursive_mutex action_server_reentrant_mutex_;

Expand Down Expand Up @@ -785,6 +940,8 @@ ServerBase::set_on_ready_callback(std::function<void(size_t, int)> callback)
"is not callable.");
}

pimpl_->start_expire_thread(callback);

set_callback_to_entity(EntityType::GoalService, callback);
set_callback_to_entity(EntityType::ResultService, callback);
set_callback_to_entity(EntityType::CancelService, callback);
Expand Down Expand Up @@ -905,6 +1062,7 @@ ServerBase::clear_on_ready_callback()
set_on_ready_callback(EntityType::GoalService, nullptr, nullptr);
set_on_ready_callback(EntityType::ResultService, nullptr, nullptr);
set_on_ready_callback(EntityType::CancelService, nullptr, nullptr);
pimpl_->stop_expire_thread();
on_ready_callback_set_ = false;
}

Expand Down