Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added subscription queued for ROS2 #136

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions examples/ros2/subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,24 @@ install(
TARGETS rt_ros2_subscriber_complex_data
DESTINATION lib/${PROJECT_NAME}
)

add_executable(rt_ros2_subscriber_queued_data
queued_data.cc
)

target_link_libraries(rt_ros2_subscriber_queued_data
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber_queued_data)

ament_target_dependencies(rt_ros2_subscriber_queued_data
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber_queued_data
DESTINATION lib/${PROJECT_NAME}
)
73 changes: 73 additions & 0 deletions examples/ros2/subscriber/queued_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <std_msgs/msg/int64.hpp>

using RealtimeType = std_msgs::msg::Int64;
using RosType = std_msgs::msg::Int64;

class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t run_duration_;

// We force turn off the trivial data check, because the ROS message data type
// has a defined constructor with code in it. That said, the code really is
// pretty trivial so it is safe to use in real-time. We thus disable the trivial
// type check manually.
std::shared_ptr<cactus_rt::ros2::SubscriptionQueued<RealtimeType, RosType, false>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30))
: cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()),
run_duration_(run_duration.count()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionQueued<RealtimeType, RosType, false>("/cactus_rt/simple", rclcpp::QoS(100));
}

protected:
LoopControl Loop(int64_t elapsed_ns) noexcept override {
while (true) {
auto msg = subscription_->Read();
if (!msg) {
break;
}

LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg->id, msg->value.data);
}

return elapsed_ns > run_duration_ ? LoopControl::Stop : LoopControl::Continue;
}
};

int main(int argc, const char* argv[]) {
cactus_rt::ros2::App app(argc, argv, "QueuedDataROS2Subscriber");

constexpr std::chrono::seconds time(30);
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);

app.Start();

std::thread t([&app, &time]() {
std::this_thread::sleep_for(time);
app.RequestStop();
});
t.detach();

app.Join();

std::cout << "Done\n";
return 0;
}
13 changes: 13 additions & 0 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ class Ros2Adapter {
return subscriber;
}

template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
std::shared_ptr<SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>> CreateSubscriptionQueued(
const std::string& topic_name,
const rclcpp::QoS& qos,
size_t rt_queue_size = 1000
) {
auto subscriber = SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(logger_, *this->ros_node_, topic_name, qos, rt_queue_size);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscriber);
return subscriber;
}

private:
void TimerCallback();
void DrainQueues();
Expand Down
70 changes: 70 additions & 0 deletions include/cactus_rt/ros2/subscription.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#ifndef CACTUS_RT_ROS2_SUBSCRIPTION_H_
#define CACTUS_RT_ROS2_SUBSCRIPTION_H_

#include <readerwriterqueue.h>

#include <memory>
#include <optional>
#include <rclcpp/rclcpp.hpp>
#include <type_traits>

Expand Down Expand Up @@ -85,6 +88,73 @@ class SubscriptionLatest : public ISubscription {
return latest_value_.Read();
}
};

template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
class SubscriptionQueued : public ISubscription {
friend class Ros2Adapter;

static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v<RealtimeT>, "RealtimeT must be a trivial object to be real-time safe");

using NoConversion = std::is_same<RealtimeT, RosT>;
using AdaptedRosType = typename std::conditional_t<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>;

using Queue = moodycamel::ReaderWriterQueue<StampedValue<RealtimeT>>;

quill::Logger* logger_;
typename rclcpp::Subscription<AdaptedRosType>::SharedPtr ros_subscription_;
int64_t current_msg_id_ = 0;
Queue queue_;

void SubscriptionCallback(const RealtimeT& rt_msg) {
current_msg_id_++; // Useful to detect message has changed by the real-time thread.

// A copy directly into the queue.
// TODO: warn that we are losing data?
queue_.try_emplace(current_msg_id_, rt_msg);
}

static std::shared_ptr<SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>> Create(
quill::Logger* logger,
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos,
const size_t rt_queue_size = 1000
) {
std::shared_ptr<SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>> subscription(
new SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>(
logger,
moodycamel::ReaderWriterQueue<StampedValue<RealtimeT>>(rt_queue_size)
)
);

subscription->ros_subscription_ = node.create_subscription<AdaptedRosType>(
topic_name,
qos,
[subscription](const RealtimeT& rt_msg) {
// TODO: we are capturing the subscription shared_ptr by value. Will this cause a memory leak?
subscription->SubscriptionCallback(rt_msg);
}
);

return subscription;
}

SubscriptionQueued(
quill::Logger* logger,
moodycamel::ReaderWriterQueue<StampedValue<RealtimeT>>&& queue
) : logger_(logger), queue_(std::move(queue)) {}

public:
std::optional<StampedValue<RealtimeT>> Read() noexcept {
StampedValue<RealtimeT> v;
if (queue_.try_dequeue(v)) {
return v;
}

return std::nullopt;
}
};

} // namespace cactus_rt::ros2

#endif
Loading