Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

host: Fix RFNoC graph action queue lockup on action exceptions #730

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions host/lib/rfnoc/graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,37 @@ auto get_dirty_props(graph_t::node_ref_t node_ref)
});
}

/*! Atomic flag locking RAII wrapper
*/
class atomic_flag_lock
{
private:
atomic_flag_lock(const atomic_flag_lock&) = delete;
atomic_flag_lock(atomic_flag_lock&&) = delete;
atomic_flag_lock& operator=(const atomic_flag_lock&) = delete;
atomic_flag_lock& operator=(atomic_flag_lock&&) = delete;

public:
atomic_flag_lock(std::atomic_flag& flag)
: _flag(flag), _owns_lock(!flag.test_and_set())
{
}

~atomic_flag_lock()
{
if (_owns_lock)
_flag.clear();
}

bool owns_lock() const
{
return _owns_lock;
}
private:
std::atomic_flag& _flag;
bool _owns_lock;
};

} // namespace

/*! Graph-filtering predicate to find dirty nodes only
Expand Down Expand Up @@ -481,11 +512,11 @@ void graph_t::enqueue_action(
// Check if we're already in the middle of handling actions. In that case,
// we're already in the loop below, and then all we want to do is to enqueue
// this action tuple. The first call to enqueue_action() within this thread
// context will have handling_ongoing == false.
const bool handling_ongoing = _action_handling_ongoing.test_and_set();
// context will have handling_ongoing.owns_lock() == true.
const atomic_flag_lock handling_ongoing{_action_handling_ongoing};
// In any case, stash the new action at the end of the action queue
_action_queue.emplace_back(std::make_tuple(src_node, src_edge, action));
if (handling_ongoing) {
if (!handling_ongoing.owns_lock()) {
UHD_LOG_TRACE(LOG_ID,
"Action handling ongoing, deferring delivery of " << action->key << "#"
<< action->id);
Expand Down Expand Up @@ -541,10 +572,8 @@ void graph_t::enqueue_action(
}
UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling.");

// Release the action handling flag
_action_handling_ongoing.clear();
// Now, the _graph_mutex is released, and someone else can start sending
// actions.
// Now, the _graph_mutex and handling_ongoing are released, and someone else can start
// sending actions.
}

/******************************************************************************
Expand Down
26 changes: 26 additions & 0 deletions host/tests/actions_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,29 @@ BOOST_AUTO_TEST_CASE(test_action_forwarding_map_exception_invalid_destination)
auto cmd = action_info::make("action");
BOOST_REQUIRE_THROW(generator.post_output_edge_action(cmd, 0), uhd::rfnoc_error);
}

BOOST_AUTO_TEST_CASE(test_action_exception_handling)
{
node_accessor_t node_accessor{};
uhd::rfnoc::detail::graph_t graph{};

class mock_throwing_node_t : public mock_radio_node_t
{
public:
mock_throwing_node_t() : mock_radio_node_t(0)
{
register_action_handler(
"throwing_action", [](const res_source_info&, action_info::sptr) {
throw uhd::runtime_error("Arbitrary UHD exception");
});
}
};

mock_throwing_node_t mock_radio{};
graph.connect(&mock_radio, &mock_radio, {0, 0, graph_edge_t::DYNAMIC, false});
graph.commit();
// Check that it throws the first time
BOOST_REQUIRE_THROW(node_accessor.post_action(&mock_radio, {res_source_info::USER, 0}, action_info::make("throwing_action")), uhd::runtime_error);
// It should also throw the second time: we should actually be running this action even though the previous one threw an exception
BOOST_REQUIRE_THROW(node_accessor.post_action(&mock_radio, {res_source_info::USER, 0}, action_info::make("throwing_action")), uhd::runtime_error);
}
Loading