Skip to content

Commit

Permalink
Address flakiness in rosbag2_play_end_to_end tests (#1297)
Browse files Browse the repository at this point in the history
* Make test_rosbag2_play_end_to_end more deterministic

- Change QoS depth in test databases to correspond number of messages
- Change QoS durability to transient local in test DB and mcap file
- Explicitly specify QoS depth = 10 for subscribers
- Explicitly specify QoS reliability to reliable for subscribers
- Explicitly specify QoS durability to transient local for subscribers
- Update metadata in test DB and mcap files to the latest version(7)
- Remove xfail test_rosbag2_play_end_to_end

Signed-off-by: Michael Orlov <[email protected]>

* Add wait_for_matched for record_end_to_end_exits_gracefully_on_sigterm

- Remove xfail for test_rosbag2_info_end_to_end

Signed-off-by: Michael Orlov <[email protected]>

* Fix for play_filters_by_topic test

- Uncomment play_filters_by_topic test
- Use proper qos settings for subscribers in `play_filters_by_topic`
and fix expectations about number of published messages.
- Log warning if SubscriptionManager::continue_spinning(..) finished by
timeout.
- Enable `play_end_to_end_test` for windows.

Signed-off-by: Michael Orlov <[email protected]>

* Make test_rosbag2_play_end_to_end deterministic

- Start player in pause mode and wait on subscribers for matched
publishers from player then send resume service call to unpause.
- Add spin_and_wait_for_matched(topic_names) for SubscriptionManager

Signed-off-by: Michael Orlov <[email protected]>

* Remove redundant includes from test_rosbag2_play_end_to_end.cpp

Signed-off-by: Michael Orlov <[email protected]>

* Sleep for a few milliseconds in SubscriptionManager to avoid busy loop

Signed-off-by: Michael Orlov <[email protected]>

* Update rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp

Co-authored-by: Chris Lalancette <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

Co-authored-by: Chris Lalancette <[email protected]>

* Add missing include<thread> in process_execution_helpers_unix.hpp

Signed-off-by: Michael Orlov <[email protected]>

* Address race condition in process termination routines

- Added wait_until_completion(process_id, timeout) helper function

Signed-off-by: Michael Orlov <[email protected]>

* Move wait_until_completion before stop_execution to fix compilation error

Signed-off-by: Michael Orlov <[email protected]>

* Fix for Windows build error. Rename process_id to handle.

Signed-off-by: Michael Orlov <[email protected]>

* Increase timeout for service call and wait_until_completion up to 10 sec

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
MichaelOrlov and clalancette authored May 12, 2023
1 parent f3bc1af commit af4ca0c
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <signal.h>
#include <stdlib.h>
#include <thread>

#include <chrono>
#include <cstdlib>
Expand Down Expand Up @@ -93,33 +94,63 @@ ProcessHandle start_execution(const std::string & command)
return process_id;
}

void stop_execution(
/// @brief Wait for process to finish with timeout
/// @param process_id Process ID
/// @param timeout Timeout in fraction of seconds
/// @return true if process has finished during timeout and false if timeout was reached and
/// process is still running
bool wait_until_completion(
const ProcessHandle & process_id,
int signum = SIGINT,
std::chrono::seconds timeout = std::chrono::seconds(10))
std::chrono::duration<double> timeout = std::chrono::seconds(10))
{
EXPECT_NE(kill(process_id, signum), -1) << "Failed to send signal " << signum <<
" to process: " << process_id;
int status = EXIT_FAILURE;
pid_t wait_ret_code = 0;
int status = EXIT_FAILURE;
std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
// Wait for process to finish with timeout
while (wait_ret_code == 0 && std::chrono::steady_clock::now() - start < timeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(30));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// WNOHANG - wait for processes without causing the caller to be blocked
wait_ret_code = waitpid(process_id, &status, WNOHANG);
}
if (wait_ret_code == 0) {
std::cerr << "Testing process " << process_id << " hangout. Killing it with SIGKILL \n";
kill(process_id, SIGKILL);
}
// Make sure that the process does execute without issues before it is killed by
// the user in the test or, in case it runs until completion, that it has correctly executed.
EXPECT_NE(wait_ret_code, -1);
EXPECT_EQ(wait_ret_code, process_id);
EXPECT_EQ(WIFEXITED(status), true) << "status = " << status;
EXPECT_EQ(WIFSIGNALED(status), false) << "Process terminated by signal: " << WTERMSIG(status);
EXPECT_EQ(WEXITSTATUS(status), EXIT_SUCCESS) << "status = " << status;
EXPECT_EQ(wait_ret_code, process_id) << "status = " << status;
return wait_ret_code != 0;
}

/// @brief Force to stop process with signal if it's currently running
/// @param process_id Process ID
/// @param signum Signal to use for stopping process. The default is SIGINT.
/// @param timeout Timeout in fraction of seconds
void stop_execution(
const ProcessHandle & process_id,
int signum = SIGINT,
std::chrono::duration<double> timeout = std::chrono::seconds(10))
{
if (kill(process_id, 0) == 0) {
// If process is still running, then send signal to stop it and check return code
EXPECT_NE(kill(process_id, signum), -1) << "Failed to send signal " << signum <<
" to process: " << process_id;
int status = EXIT_FAILURE;
pid_t wait_ret_code = 0;
std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
// Wait for process to finish with timeout
while (wait_ret_code == 0 && std::chrono::steady_clock::now() - start < timeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(30));
// WNOHANG - wait for processes without causing the caller to be blocked
wait_ret_code = waitpid(process_id, &status, WNOHANG);
}
if (wait_ret_code == 0) {
std::cerr << "Testing process " << process_id << " hangout. Killing it with SIGKILL \n";
kill(process_id, SIGKILL);
}
// Make sure that the process does execute without issues before it is killed by
// the user in the test or, in case it runs until completion, that it has correctly executed.
EXPECT_NE(wait_ret_code, -1);
EXPECT_EQ(wait_ret_code, process_id);
EXPECT_EQ(WIFEXITED(status), true) << "status = " << status;
EXPECT_EQ(WIFSIGNALED(status), false) << "Process terminated by signal: " << WTERMSIG(status);
EXPECT_EQ(WEXITSTATUS(status), EXIT_SUCCESS) << "status = " << status;
}
}

#endif // ROSBAG2_TEST_COMMON__PROCESS_EXECUTION_HELPERS_UNIX_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int execute_and_wait_until_completion(const std::string & command, const std::st
auto process = create_process(command_char, path.c_str());
DWORD exit_code = 259; // 259 is the code one gets if the process is still active.
while (exit_code == 259) {
GetExitCodeProcess(process.hProcess, &exit_code);
EXPECT_TRUE(GetExitCodeProcess(process.hProcess, &exit_code));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
close_process_handles(process);
Expand Down Expand Up @@ -105,16 +105,51 @@ ProcessHandle start_execution(const std::string & command)
return process;
}

void stop_execution(const ProcessHandle & handle, int signum = SIGINT)
/// @brief Wait for process to finish with timeout
/// @param handle Process handle returned from start_execution(command)
/// @param timeout Timeout in fraction of seconds
/// @return true if process has finished during timeout and false if timeout was reached and
/// process is still running
bool wait_until_completion(
const ProcessHandle & handle,
std::chrono::duration<double> timeout = std::chrono::seconds(10))
{
DWORD exit_code = 0;
std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
// 259 indicates that the process is still active
while (exit_code == 259 && std::chrono::steady_clock::now() - start < timeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
}
return exit_code != 259;
}

/// @brief Force to stop process with signal if it's currently running
/// @param handle Process handle returned from start_execution(command)
/// @param signum Not uses for Windows version
/// @param timeout Timeout in fraction of seconds
void stop_execution(
const ProcessHandle & handle,
int signum = SIGINT,
std::chrono::duration<double> timeout = std::chrono::seconds(10))
{
// Match the Unix version by allowing for int signum argument - however Windows does not have
// Linux signals in the same way, so there isn't a 1:1 mapping to dispatch e.g. SIGTERM
DWORD exit_code;
GetExitCodeProcess(handle.process_info.hProcess, &exit_code);
EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
// 259 indicates that the process is still active: we want to make sure that the process is
// still running properly before killing it.
ASSERT_THAT(exit_code, Eq(259));
GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwThreadId);
if (exit_code == 259) {
EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwThreadId));
bool process_finished = wait_until_completion(handle, timeout);
if (!process_finished) {
std::cerr << "Testing process " << handle.process_info.hProcess <<
" hangout. Killing it with TerminateProcess(..) \n";
EXPECT_TRUE(TerminateProcess(handle.process_info.hProcess, 2));
EXPECT_TRUE(process_finished);
}
}
close_process_handles(handle.process_info);
CloseHandle(handle.job_handle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ class SubscriptionManager
auto options = rclcpp::SubscriptionOptions();
options.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;

subscriptions_.push_back(
subscriptions_[topic_name] =
subscriber_node_->create_subscription<MessageT>(
topic_name,
qos,
[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> msg) {
subscribed_messages_[topic_name].push_back(msg);
},
options));
topic_name,
qos,
[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> msg) {
subscribed_messages_[topic_name].push_back(msg);
},
options);
}

template<typename MessageT>
Expand Down Expand Up @@ -113,7 +113,6 @@ class SubscriptionManager
bool matched = false;
while (!matched && ((clock::now() - start) < timeout)) {
exec.spin_node_some(subscriber_node_);

matched = true;
for (const auto publisher_ptr : publishers) {
if (publisher_ptr->get_subscription_count() +
Expand All @@ -123,17 +122,63 @@ class SubscriptionManager
break;
}
}
// Sleep for a few milliseconds to avoid busy loop
std::this_thread::sleep_for(sleep_per_loop_);
}
return matched;
}

/// @brief Wait until publishers will be connected to the subscriptions or timeout occur.
/// @param topic_names List of topic names
/// @param timeout Maximum time duration during which discovery should happen.
/// @param n_publishers_to_match Number of publishers each subscription should have for match.
/// @return true if subscriptions have specified number of publishers, otherwise false.
bool spin_and_wait_for_matched(
const std::vector<std::string> & topic_names,
std::chrono::duration<double> timeout = std::chrono::seconds(10),
size_t n_publishers_to_match = 1)
{
// Sanity check that we have valid input
if (topic_names.empty()) {
throw std::invalid_argument("List of topic names is empty");
}
for (const auto & topic_name : topic_names) {
if (subscriptions_.count(topic_name) == 0) {
throw std::invalid_argument(
"Publisher's topic name = `" + topic_name + "` not found in expected topics list");
}
}

using clock = std::chrono::steady_clock;
auto start = clock::now();

rclcpp::executors::SingleThreadedExecutor exec;
bool matched = false;
while (!matched && ((clock::now() - start) < timeout)) {
exec.spin_node_some(subscriber_node_);
matched = true;
for (const auto & topic_name : topic_names) {
if (subscriptions_.find(topic_name) == subscriptions_.end() ||
subscriptions_[topic_name]->get_publisher_count() < n_publishers_to_match)
{
matched = false;
break;
}
}
// Sleep for a few milliseconds to avoid busy loop
std::this_thread::sleep_for(sleep_per_loop_);
}
return matched;
}

std::future<void> spin_subscriptions()
std::future<void> spin_subscriptions(
std::chrono::duration<double> timeout = std::chrono::seconds(10))
{
return async(
std::launch::async, [this]() {
std::launch::async, [this, timeout]() {
rclcpp::executors::SingleThreadedExecutor exec;
auto start = std::chrono::high_resolution_clock::now();
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) {
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start, timeout)) {
exec.spin_node_some(subscriber_node_);
}
});
Expand All @@ -151,26 +196,33 @@ class SubscriptionManager
private:
bool continue_spinning(
const std::unordered_map<std::string, size_t> & expected_topics_with_sizes,
std::chrono::time_point<std::chrono::high_resolution_clock> start_time)
std::chrono::time_point<std::chrono::high_resolution_clock> start_time,
std::chrono::duration<double> timeout = std::chrono::seconds(10))
{
auto current = std::chrono::high_resolution_clock::now();
if (current - start_time > std::chrono::seconds(10)) {
if (current - start_time > timeout) {
RCLCPP_WARN(
subscriber_node_->get_logger(),
"SubscriptionManager::continue_spinning(..) finished by timeout");
return false;
}

for (const auto & topic_expected : expected_topics_with_sizes) {
if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) {
// Sleep for a few milliseconds to avoid busy loop
std::this_thread::sleep_for(sleep_per_loop_);
return true;
}
}
return false;
}

std::vector<rclcpp::SubscriptionBase::SharedPtr> subscriptions_;
std::unordered_map<std::string, rclcpp::SubscriptionBase::SharedPtr> subscriptions_;
std::unordered_map<std::string,
std::vector<std::shared_ptr<rclcpp::SerializedMessage>>> subscribed_messages_;
std::unordered_map<std::string, size_t> expected_topics_with_size_;
rclcpp::Node::SharedPtr subscriber_node_;
const std::chrono::milliseconds sleep_per_loop_ = std::chrono::milliseconds(2);
};

} // namespace rosbag2_test_common
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ if(BUILD_TESTING)
find_package(rcpputils REQUIRED)
find_package(rosbag2_compression REQUIRED)
find_package(rosbag2_cpp REQUIRED)
find_package(rosbag2_interfaces REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(rosbag2_storage_default_plugins REQUIRED)
find_package(rosbag2_test_common REQUIRED)
Expand Down Expand Up @@ -63,11 +64,11 @@ if(BUILD_TESTING)
if(TARGET test_rosbag2_play_end_to_end)
target_link_libraries(test_rosbag2_play_end_to_end
rclcpp::rclcpp
${rosbag2_interfaces_TARGETS}
rosbag2_storage::rosbag2_storage
rosbag2_test_common::rosbag2_test_common
${test_msgs_TARGETS}
)
ament_add_test_label(test_rosbag2_play_end_to_end xfail)
endif()

ament_add_gmock(test_rosbag2_info_end_to_end
Expand All @@ -78,7 +79,6 @@ if(BUILD_TESTING)
rosbag2_storage::rosbag2_storage
rosbag2_test_common::rosbag2_test_common
)
ament_add_test_label(test_rosbag2_info_end_to_end xfail)
endif()

ament_add_gmock(test_converter
Expand Down
1 change: 1 addition & 0 deletions rosbag2_tests/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<test_depend>rosbag2_compression</test_depend>
<test_depend>rosbag2_compression_zstd</test_depend>
<test_depend>rosbag2_cpp</test_depend>
<test_depend>rosbag2_interfaces</test_depend>
<test_depend>rosbag2_storage_default_plugins</test_depend>
<test_depend>rosbag2_storage</test_depend>
<test_depend>rosbag2_test_common</test_depend>
Expand Down
Binary file modified rosbag2_tests/resources/mcap/cdr_test/cdr_test_0.mcap
Binary file not shown.
8 changes: 5 additions & 3 deletions rosbag2_tests/resources/mcap/cdr_test/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
rosbag2_bagfile_information:
version: 6
version: 7
storage_identifier: mcap
duration:
nanoseconds: 151137181
Expand All @@ -11,13 +11,15 @@ rosbag2_bagfile_information:
name: /array_topic
type: test_msgs/msg/Arrays
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
offered_qos_profiles: "- history: 3\n depth: 4\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
type_description_hash: ""
message_count: 4
- topic_metadata:
name: /test_topic
type: test_msgs/msg/BasicTypes
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
offered_qos_profiles: "- history: 3\n depth: 3\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
type_description_hash: ""
message_count: 3
compression_format: ""
compression_mode: ""
Expand Down
Binary file modified rosbag2_tests/resources/sqlite3/cdr_test/cdr_test_0.db3
Binary file not shown.
30 changes: 20 additions & 10 deletions rosbag2_tests/resources/sqlite3/cdr_test/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
rosbag2_bagfile_information:
version: 4
version: 7
storage_identifier: sqlite3
relative_file_paths:
- cdr_test_0.db3
duration:
nanoseconds: 151137181
starting_time:
nanoseconds_since_epoch: 1586406456763032325
message_count: 7
topics_with_message_count:
- topic_metadata:
name: /test_topic
type: test_msgs/msg/BasicTypes
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
message_count: 3
- topic_metadata:
name: /array_topic
type: test_msgs/msg/Arrays
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
offered_qos_profiles: "- history: 3\n depth: 4\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
type_description_hash: ""
message_count: 4
- topic_metadata:
name: /test_topic
type: test_msgs/msg/BasicTypes
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 3\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
type_description_hash: ""
message_count: 3
compression_format: ""
compression_mode: ""
relative_file_paths:
- cdr_test_0.db3
files:
- path: cdr_test_0.db3
starting_time:
nanoseconds_since_epoch: 1586406456763032325
duration:
nanoseconds: 151137181
message_count: 7
custom_data: ~
Loading

0 comments on commit af4ca0c

Please sign in to comment.