diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index ce6a103ab2..c7d6be7e44 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -415,14 +415,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups() // We could explicitly check for the notify waitable ID when we receive a waitable event // but I think that it's better if the waitable was in the collection and it could be // retrieved in the "standard" way. - // To do it, we need to add the notify waitable as an entry in both the new and - // current collections such that it's neither added or removed. + // To do it, we need to add the notify waitable as an entry in the new collection + // such that it's neither added or removed (it should have already been added + // to the current collection in the constructor) this->add_notify_waitable_to_collection(new_collection.waitables); - // Acquire lock before modifying the current collection - std::lock_guard lock(collection_mutex_); - this->add_notify_waitable_to_collection(current_entities_collection_->waitables); - this->refresh_current_collection(new_collection); } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index a82b702db5..dfbdbb8f4c 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -807,6 +807,67 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode) } } +// Check that executors are correctly notified while they are spinning +// we notify twice to ensure that the notify waitable is still working +// after the first notification +TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning) +{ + using ExecutorType = TypeParam; + + // Create executor, add the node and start spinning + ExecutorType executor; + executor.add_node(this->node); + std::thread spinner([&]() {executor.spin();}); + + // Wait for executor to be spinning + while (!executor.is_spinning()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Create the first subscription while the executor is already spinning + std::atomic sub1_msg_count {0}; + auto sub1 = this->node->template create_subscription( + this->publisher->get_topic_name(), + rclcpp::QoS(10), + [&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) { + sub1_msg_count++; + }); + + // Publish a message and verify it's received + this->publisher->publish(test_msgs::msg::Empty()); + auto start = std::chrono::steady_clock::now(); + while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(sub1_msg_count, 1u); + + // Create a second subscription while the executor is already spinning + std::atomic sub2_msg_count {0}; + auto sub2 = this->node->template create_subscription( + this->publisher->get_topic_name(), + rclcpp::QoS(10), + [&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) { + sub2_msg_count++; + }); + + // Publish a message and verify it's received by both subscriptions + this->publisher->publish(test_msgs::msg::Empty()); + start = std::chrono::steady_clock::now(); + while ( + sub1_msg_count == 1 && + sub2_msg_count == 0 && + (std::chrono::steady_clock::now() - start) < 10s) + { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(sub1_msg_count, 2u); + EXPECT_EQ(sub2_msg_count, 1u); + + // Cancel needs to be called before join, so that executor.spin() returns. + executor.cancel(); + spinner.join(); +} + // Check spin_until_future_complete with node base pointer (instantiates its own executor) TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) {