From faaefb2b9ec0dcb4bad380deda91297db55dacd0 Mon Sep 17 00:00:00 2001 From: "Boyarinov, Konstantin" Date: Fri, 20 Dec 2024 04:15:21 -0800 Subject: [PATCH 01/11] Add impl --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 32 +++++++- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 72 ++++++++++++++++-- test/tbb/test_multifunction_node.cpp | 75 +++++++++++++++++-- 3 files changed, 162 insertions(+), 17 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 21da06ce03..f2f1ca2617 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -168,23 +168,47 @@ class function_body_leaf< continue_msg, Output, B > : public function_body< cont B body; }; +class multifunction_node_tag; + //! function_body that takes an Input and a set of output ports template class multifunction_body : no_assign { public: virtual ~multifunction_body () {} - virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0; + virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag& /*tag*/)) = 0; virtual multifunction_body* clone() = 0; virtual void* get_body_ptr() = 0; }; //! leaf for multifunction. OutputSet can be a std::tuple or a vector. -template +template class multifunction_body_leaf : public multifunction_body { + using first_priority = int; + using second_priority = double; + + // body may explicitly put() to one or more of oset. + void invoke_body_impl(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag&), second_priority) + { + tbb::detail::invoke(body, input, oset); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template + auto invoke_body_impl(const InputT& input, OutputSetT& oset, const multifunction_node_tag& tag, first_priority) + -> decltype(tbb::detail::invoke(std::declval(), input, oset, tag), void()) + { + tbb::detail::invoke(body, input, oset, tag); + } +#endif + + void invoke_body(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag& tag)) { + invoke_body_impl(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(tag), 1); + } + public: multifunction_body_leaf(const B &_body) : body(_body) { } - void operator()(const Input &input, OutputSet &oset) override { - tbb::detail::invoke(body, input, oset); // body may explicitly put() to one or more of oset. + void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag& tag)) override { + invoke_body(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(tag)); } void* get_body_ptr() override { return &body; } multifunction_body_leaf* clone() override { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 336cb069c6..11a5e6703a 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -554,6 +554,44 @@ struct init_output_ports { } }; // struct init_output_ports +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class multifunction_node_tag { +public: + multifunction_node_tag() = default; + multifunction_node_tag(const message_metainfo& metainfo) : my_metainfo(metainfo) { + for (auto waiter : my_metainfo.waiters()) { + waiter->reserve(); + } + } + + multifunction_node_tag(const multifunction_node_tag& other) : my_metainfo(other.my_metainfo) { + for (auto waiter : my_metainfo.waiters()) { + waiter->reserve(); + } + } + + multifunction_node_tag(multifunction_node_tag&& other) = default; + + ~multifunction_node_tag() { + for (auto waiter : my_metainfo.waiters()) { + waiter->release(); + } + } + + void merge(const multifunction_node_tag& other_tag) { + for (auto waiter : other_tag.my_metainfo.waiters()) { + waiter->reserve(); + } + my_metainfo.merge(other_tag.my_metainfo); + } +private: + template + friend class multifunction_output; + + message_metainfo my_metainfo; +}; +#endif + //! Implements methods for a function node that takes a type Input as input // and has a tuple of output ports specified. template< typename Input, typename OutputPortSet, typename Policy, typename A> @@ -562,6 +600,9 @@ class multifunction_input : public function_input_base::value; typedef Input input_type; typedef OutputPortSet output_ports_type; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + typedef multifunction_node_tag tag_type; +#endif typedef multifunction_body multifunction_body_type; typedef multifunction_input my_class; typedef function_input_base base_type; @@ -570,7 +611,7 @@ class multifunction_input : public function_input_base multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) - : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports))) + : base_type(g, max_concurrency, a_priority, noexcept(1)) , my_body( new multifunction_body_leaf(body) ) , my_init_body( new multifunction_body_leaf(body) ) , my_output_ports(init_output_ports::call(g, my_output_ports)){ @@ -599,10 +640,11 @@ class multifunction_input : public function_input_base { multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {} bool try_put(const output_type &i) { - graph_task *res = try_put_task(i); + return try_put_impl(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_put(const output_type& i, const multifunction_node_tag& tag) { + return try_put_impl(i, tag.my_metainfo); + } + + bool try_put(const output_type& i, multifunction_node_tag&& tag) { + multifunction_node_tag local_tag = std::move(tag); + return try_put_impl(i, local_tag.my_metainfo); + } +#endif + + using base_type::graph_reference; + +protected: + bool try_put_impl(const output_type& i __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { + graph_task *res = try_put_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); if( !res ) return false; if( res != SUCCESSFULLY_ENQUEUED ) { // wrapping in task_arena::execute() is not needed since the method is called from @@ -861,10 +921,6 @@ class multifunction_output : public function_output { return true; } - using base_type::graph_reference; - -protected: - graph_task* try_put_task(const output_type &i) { return my_successors.try_put_task(i); } diff --git a/test/tbb/test_multifunction_node.cpp b/test/tbb/test_multifunction_node.cpp index bfebbbe94f..f5c6c3f7f2 100644 --- a/test/tbb/test_multifunction_node.cpp +++ b/test/tbb/test_multifunction_node.cpp @@ -541,11 +541,11 @@ TEST_CASE("Test ports retrurn references"){ test_ports_return_references(); } -//! NativeParallelFor testing with various concurrency settings -//! \brief \ref error_guessing -TEST_CASE("Lightweight testing"){ - lightweight_testing::test(10); -} +// //! NativeParallelFor testing with various concurrency settings +// //! \brief \ref error_guessing +// TEST_CASE("Lightweight testing"){ +// lightweight_testing::test(10); +// } #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET //! Test follows and precedes API @@ -623,3 +623,68 @@ TEST_CASE("constraints for multifunction_node body") { static_assert(!can_call_multifunction_node_ctor>); } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +TEST_CASE("multifunction try_put_and_wait") { + std::cout << "start test" << std::endl; + tbb::task_arena arena(1); + + arena.execute([]{ + int num_items = 5; + tbb::flow::graph g; + + using func_node_type = tbb::flow::function_node; + using multi_node_type = tbb::flow::multifunction_node>; + using ports_type = typename multi_node_type::output_ports_type; + using tag_type = typename multi_node_type::tag_type; + + func_node_type* start_node = nullptr; + + func_node_type start(g, tbb::flow::serial, + [&](int i) { + std::cout << "processing " << i << std::endl; + if (start_node != nullptr) { + for (int j = 2; j <= num_items; ++j) { + start_node->try_put(j); + } + start_node = nullptr; + } + return i; + }); + + start_node = &start; + + int num_accumulated = 0; + int accumulated_result = 0; + tag_type accumulated_hint; + + tbb::flow::multifunction_node> multifunction(g, tbb::flow::serial, + // [&](int i, ports_type& ports, const tag_type& tag) { + [&](int i, ports_type& ports) { + std::cout << "mf processing " << i << std::endl; + + // ++num_accumulated; + // std::cout << "num_accumulated = " << num_accumulated << std::endl; + // accumulated_result += i; + // accumulated_hint.merge(tag); + + // if (num_accumulated == num_items) { + // std::get<0>(ports).try_put(accumulated_result, std::move(accumulated_hint)); + // } + + // std::get<0>(ports).try_put(i, tag); + + std::get<0>(ports).try_put(i); + }); + + tbb::flow::function_node writer(g, tbb::flow::serial, + [](int res) { std::cout << "res = " << res << std::endl; return 0; }); + + tbb::flow::make_edge(start, multifunction); + tbb::flow::make_edge(tbb::flow::output_port<0>(multifunction), writer); + + start.try_put_and_wait(1); + + std::cout << "Wait for all" << std::endl; + g.wait_for_all(); + }); +} From ef41553b75c560bcc6301c9176dccd2fc83b39c5 Mon Sep 17 00:00:00 2001 From: "Boyarinov, Konstantin" Date: Mon, 6 Jan 2025 07:52:52 -0800 Subject: [PATCH 02/11] Draft unique-ptr-based implementation --- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 11a5e6703a..24e3453c9f 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -555,41 +555,51 @@ struct init_output_ports { }; // struct init_output_ports #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + class multifunction_node_tag { public: multifunction_node_tag() = default; - multifunction_node_tag(const message_metainfo& metainfo) : my_metainfo(metainfo) { - for (auto waiter : my_metainfo.waiters()) { - waiter->reserve(); - } - } - multifunction_node_tag(const multifunction_node_tag& other) : my_metainfo(other.my_metainfo) { + multifunction_node_tag(const multifunction_node_tag&) = delete; + + multifunction_node_tag(multifunction_node_tag&&); + + multifunction_node_tag(const message_metainfo& metainfo) : my_metainfo(metainfo) { for (auto waiter : my_metainfo.waiters()) { waiter->reserve(); } } - multifunction_node_tag(multifunction_node_tag&& other) = default; + multifunction_node_tag& operator=(const multifunction_node_tag&) = delete; + multifunction_node_tag& operator=(multifunction_node_tag&&) = delete; ~multifunction_node_tag() { - for (auto waiter : my_metainfo.waiters()) { - waiter->release(); - } + reset(); } void merge(const multifunction_node_tag& other_tag) { + tbb::spin_mutex::scoped_lock lock(my_mutex); + + // TODO: add comment for (auto waiter : other_tag.my_metainfo.waiters()) { waiter->reserve(); } my_metainfo.merge(other_tag.my_metainfo); } + + void reset() { + for (auto waiter : my_metainfo.waiters()) { + waiter->release(); + } + } private: template friend class multifunction_output; message_metainfo my_metainfo; + tbb::spin_mutex my_mutex; }; + #endif //! Implements methods for a function node that takes a type Input as input From 8e14ce1b7fd1ca7ac7b77b7f00fab398cd2d7a6e Mon Sep 17 00:00:00 2001 From: "Boyarinov, Konstantin" Date: Wed, 8 Jan 2025 04:13:57 -0800 Subject: [PATCH 03/11] Save progress --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 18 +- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 14 +- test/tbb/test_function_node.cpp | 2 +- test/tbb/test_multifunction_node.cpp | 217 ++++++++++++++++-- 4 files changed, 215 insertions(+), 36 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index f2f1ca2617..90393fea03 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -175,7 +175,7 @@ template class multifunction_body : no_assign { public: virtual ~multifunction_body () {} - virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag& /*tag*/)) = 0; + virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&& /*tag*/)) = 0; virtual multifunction_body* clone() = 0; virtual void* get_body_ptr() = 0; }; @@ -187,28 +187,28 @@ class multifunction_body_leaf : public multifunction_body { using second_priority = double; // body may explicitly put() to one or more of oset. - void invoke_body_impl(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag&), second_priority) + void invoke_body_impl(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&&), second_priority) { tbb::detail::invoke(body, input, oset); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT template - auto invoke_body_impl(const InputT& input, OutputSetT& oset, const multifunction_node_tag& tag, first_priority) - -> decltype(tbb::detail::invoke(std::declval(), input, oset, tag), void()) + auto invoke_body_impl(const InputT& input, OutputSetT& oset, multifunction_node_tag&& tag, first_priority) + -> decltype(tbb::detail::invoke(std::declval(), input, oset, std::move(tag)), void()) { - tbb::detail::invoke(body, input, oset, tag); + tbb::detail::invoke(body, input, oset, std::move(tag)); } #endif - void invoke_body(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag& tag)) { - invoke_body_impl(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(tag), 1); + void invoke_body(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&& tag)) { + invoke_body_impl(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), 1); } public: multifunction_body_leaf(const B &_body) : body(_body) { } - void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(const multifunction_node_tag& tag)) override { - invoke_body(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(tag)); + void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&& tag)) override { + invoke_body(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag))); } void* get_body_ptr() override { return &body; } multifunction_body_leaf* clone() override { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 24e3453c9f..41d7a3c2fc 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -562,7 +562,8 @@ class multifunction_node_tag { multifunction_node_tag(const multifunction_node_tag&) = delete; - multifunction_node_tag(multifunction_node_tag&&); + multifunction_node_tag(multifunction_node_tag&& other) + : my_metainfo(std::move(other.my_metainfo)) {} multifunction_node_tag(const message_metainfo& metainfo) : my_metainfo(metainfo) { for (auto waiter : my_metainfo.waiters()) { @@ -571,7 +572,13 @@ class multifunction_node_tag { } multifunction_node_tag& operator=(const multifunction_node_tag&) = delete; - multifunction_node_tag& operator=(multifunction_node_tag&&) = delete; + multifunction_node_tag& operator=(multifunction_node_tag&& other) { + if (this != &other) { + reset(); + my_metainfo = std::move(other.my_metainfo); + } + return *this; + } ~multifunction_node_tag() { reset(); @@ -591,6 +598,7 @@ class multifunction_node_tag { for (auto waiter : my_metainfo.waiters()) { waiter->release(); } + my_metainfo = message_metainfo{}; } private: template @@ -654,7 +662,7 @@ class multifunction_input : public function_input_base processed_items; std::vector new_work_items; - int wait_message = 0; + int wait_message = 10; for (int i = 1; i < wait_message; ++i) { new_work_items.emplace_back(i); diff --git a/test/tbb/test_multifunction_node.cpp b/test/tbb/test_multifunction_node.cpp index f5c6c3f7f2..eed24d0ed3 100644 --- a/test/tbb/test_multifunction_node.cpp +++ b/test/tbb/test_multifunction_node.cpp @@ -624,8 +624,165 @@ TEST_CASE("constraints for multifunction_node body") { } #endif // __TBB_CPP20_CONCEPTS_PRESENT -TEST_CASE("multifunction try_put_and_wait") { - std::cout << "start test" << std::endl; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + +void test_tag_type() { + using multinode_type = tbb::flow::multifunction_node>; + using ports_type = typename multinode_type::output_ports_type; + using tag_type = typename multinode_type::tag_type; + + int processed = 0; + + tbb::flow::graph g; + multinode_type node(g, tbb::flow::unlimited, + [&](int input, ports_type&, tag_type&& tag) { + processed = input; + tag_type tag1; + tag_type tag2(std::move(tag)); + + tag1 = std::move(tag2); + tag = std::move(tag1); + }); + + node.try_put_and_wait(1); + CHECK_MESSAGE(processed == 1, "Body wait not called in try_put_and_wait call"); + g.wait_for_all(); +} + +void test_simple_broadcast() { + tbb::task_arena arena(1); + + using funcnode_type = tbb::flow::function_node; + using multinode_type = tbb::flow::multifunction_node>; + using ports_type = typename multinode_type::output_ports_type; + using tag_type = typename multinode_type::tag_type; + + arena.execute([&] { + tbb::flow::graph g; + + std::vector processed_items1; + std::vector processed_items2; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + new_work_items.emplace_back(i); + } + + multinode_type* start_node = nullptr; + + multinode_type node(g, tbb::flow::unlimited, + [&](int input, ports_type& ports, tag_type&& tag) { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + + std::get<0>(ports).try_put(input, tag); + std::get<1>(ports).try_put(input, std::move(tag)); + }); + + start_node = &node; + + funcnode_type func1(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items1.emplace_back(input); + return 0; + }); + + funcnode_type func2(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items2.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(tbb::flow::output_port<0>(node), func1); + tbb::flow::make_edge(tbb::flow::output_port<1>(node), func2); + + bool result = node.try_put_and_wait(wait_message); + CHECK_MESSAGE(result, "unexpected try_put_and_wait result"); + + CHECK(processed_items1.size() == 1); + CHECK(processed_items2.size() == 1); + CHECK_MESSAGE(processed_items1[0] == wait_message, "Only the wait message should be processed by try_put_and_wait"); + CHECK_MESSAGE(processed_items2[0] == wait_message, "Only the wait message should be processed by try_put_and_wait"); + + g.wait_for_all(); + + CHECK(processed_items1.size() == new_work_items.size() + 1); + CHECK(processed_items2.size() == new_work_items.size() + 1); + + std::size_t check_index = 1; + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items1[check_index] == new_work_items[i - 1], "Unexpected items processing order"); + CHECK_MESSAGE(processed_items2[check_index] == new_work_items[i - 1], "Unexpected items processing order"); + ++check_index; + } + CHECK(check_index == processed_items1.size()); + }); +} + +void test_no_broadcast() { + using multinode_type = tbb::flow::multifunction_node>; + using ports_type = typename multinode_type::output_ports_type; + using tag_type = typename multinode_type::tag_type; + + std::size_t num_items = 10; + std::atomic num_processed_items = 0; + int wait_message = 42; + tag_type global_tag; + + std::vector processed_items; + multinode_type* this_node = nullptr; + + tbb::flow::graph g; + multinode_type node(g, tbb::flow::unlimited, + [&](int input, ports_type& ports, tag_type&& local_tag) { + // std::cout << "Process " << input << std::endl; + printf("Processed %li items\n", num_processed_items.load()); + if (num_processed_items < num_items) { + if (input == wait_message) { + global_tag = std::move(local_tag); + for (int i = 0; i < int(num_items - 1); ++i) { + this_node->try_put(i); + } + } + std::get<0>(ports).try_put(input); + } else { + global_tag.reset(); + } + ++num_processed_items; + }); + + this_node = &node; + + tbb::flow::function_node write_node(g, tbb::flow::unlimited, + [&](int value) noexcept { processed_items.emplace_back(value); return 0; }); + + tbb::flow::make_edge(tbb::flow::output_port<0>(node), write_node); + + node.try_put_and_wait(wait_message); + + CHECK(num_items == num_processed_items); + CHECK(processed_items.size() == num_items - 1); + std::sort(processed_items.begin(), processed_items.end()); + + for (auto item : processed_items) { + std::cout << item << " "; + } + std::cout << std::endl; + + for (std::size_t i = 0; i < num_items - 1; ++i) { + CHECK_MESSAGE(processed_items[i] == i, "Incorrect items processing order"); + } + CHECK_MESSAGE(processed_items.back() == wait_message, "Incorrect items processing"); + + g.wait_for_all(); +} + +void test_reduction() { tbb::task_arena arena(1); arena.execute([]{ @@ -639,14 +796,15 @@ TEST_CASE("multifunction try_put_and_wait") { func_node_type* start_node = nullptr; - func_node_type start(g, tbb::flow::serial, + func_node_type start(g, tbb::flow::unlimited, [&](int i) { std::cout << "processing " << i << std::endl; - if (start_node != nullptr) { - for (int j = 2; j <= num_items; ++j) { + static bool extra_work_added = false; + if (!extra_work_added) { + extra_work_added = true; + for (int j = i + 1; j < i + num_items; ++j) { start_node->try_put(j); } - start_node = nullptr; } return i; }); @@ -657,28 +815,32 @@ TEST_CASE("multifunction try_put_and_wait") { int accumulated_result = 0; tag_type accumulated_hint; - tbb::flow::multifunction_node> multifunction(g, tbb::flow::serial, - // [&](int i, ports_type& ports, const tag_type& tag) { - [&](int i, ports_type& ports) { - std::cout << "mf processing " << i << std::endl; + tbb::flow::multifunction_node> multifunction(g, tbb::flow::unlimited, + [&](int i, ports_type& ports, const tag_type& tag) { + ++num_accumulated; + accumulated_result += i; + accumulated_hint.merge(tag); - // ++num_accumulated; - // std::cout << "num_accumulated = " << num_accumulated << std::endl; - // accumulated_result += i; - // accumulated_hint.merge(tag); - - // if (num_accumulated == num_items) { - // std::get<0>(ports).try_put(accumulated_result, std::move(accumulated_hint)); - // } + if (num_accumulated == num_items) { + std::get<0>(ports).try_put(accumulated_result, std::move(accumulated_hint)); + num_accumulated = 0; + } + }); - // std::get<0>(ports).try_put(i, tag); + tbb::flow::function_node writer(g, tbb::flow::unlimited, + [&](int res) { + std::cout << "Write = " << res << std::endl; + static bool extra_loop_added = false; - std::get<0>(ports).try_put(i); + if (!extra_loop_added) { + extra_loop_added = true; + for (int i = 100; i < 100 + num_items; ++i) { + multifunction.try_put(i); + } + } + return 0; }); - tbb::flow::function_node writer(g, tbb::flow::serial, - [](int res) { std::cout << "res = " << res << std::endl; return 0; }); - tbb::flow::make_edge(start, multifunction); tbb::flow::make_edge(tbb::flow::output_port<0>(multifunction), writer); @@ -688,3 +850,12 @@ TEST_CASE("multifunction try_put_and_wait") { g.wait_for_all(); }); } + +TEST_CASE("multifunction_node try_put_and_wait") { + test_tag_type(); + test_simple_broadcast(); + test_no_broadcast(); + // test_reduction(); +} + +#endif From b8149b0de1efd73c5f15a0f346d465cef29aa801 Mon Sep 17 00:00:00 2001 From: "Boyarinov, Konstantin" Date: Tue, 14 Jan 2025 06:49:38 -0800 Subject: [PATCH 04/11] Save progress --- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 7 +- test/tbb/test_multifunction_node.cpp | 106 +++++++++++------- 2 files changed, 69 insertions(+), 44 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 41d7a3c2fc..62091c767e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -573,6 +573,7 @@ class multifunction_node_tag { multifunction_node_tag& operator=(const multifunction_node_tag&) = delete; multifunction_node_tag& operator=(multifunction_node_tag&& other) { + // TODO: should this method be thread-safe? if (this != &other) { reset(); my_metainfo = std::move(other.my_metainfo); @@ -595,6 +596,8 @@ class multifunction_node_tag { } void reset() { + tbb::spin_mutex::scoped_lock lock(my_mutex); + for (auto waiter : my_metainfo.waiters()) { waiter->release(); } @@ -660,9 +663,11 @@ class multifunction_input : public function_input_base>; - using ports_type = typename multinode_type::output_ports_type; - using tag_type = typename multinode_type::tag_type; + if (std::thread::hardware_concurrency() == 1) { + return; + } - std::size_t num_items = 10; - std::atomic num_processed_items = 0; - int wait_message = 42; - tag_type global_tag; + tbb::task_arena arena(std::thread::hardware_concurrency(), 0); - std::vector processed_items; - multinode_type* this_node = nullptr; + arena.execute([]() { + using multinode_type = tbb::flow::multifunction_node>; + using ports_type = typename multinode_type::output_ports_type; + using tag_type = typename multinode_type::tag_type; + + std::size_t num_items = 10; + std::size_t num_additional_items = 10; + + std::atomic num_processed_items = 0; + std::atomic num_processed_accumulators = 0; + std::atomic try_put_and_wait_exit_flag{}; + + int accumulator_message = 1; + int add_message = 2; + + tag_type global_tag; + + multinode_type* this_node = nullptr; + + std::vector postprocessed_items; + auto global_index = tbb::this_task_arena::current_thread_index(); + + tbb::flow::graph g; + multinode_type node(g, tbb::flow::unlimited, + [&](int input, ports_type& ports, tag_type&& local_tag) { + if (num_processed_items++ == 0) { + CHECK(input == accumulator_message); + ++num_processed_accumulators; - tbb::flow::graph g; - multinode_type node(g, tbb::flow::unlimited, - [&](int input, ports_type& ports, tag_type&& local_tag) { - // std::cout << "Process " << input << std::endl; - printf("Processed %li items\n", num_processed_items.load()); - if (num_processed_items < num_items) { - if (input == wait_message) { global_tag = std::move(local_tag); - for (int i = 0; i < int(num_items - 1); ++i) { - this_node->try_put(i); + for (std::size_t i = 1; i < num_items; ++i) { + this_node->try_put(accumulator_message); + } + for (std::size_t i = 0; i < num_additional_items; ++i) { + this_node->try_put(add_message); + } + } else { + if (input == accumulator_message) { + if (num_processed_accumulators++ == num_items - 1) { + // The last accumulator was received - "cancel" the operation + global_tag.reset(); + } + } else { + if (global_index != tbb::this_task_arena::current_thread_index()) { + // Block the worker thread until the try_put_and_wait was exitted + while(!try_put_and_wait_exit_flag.load()) { + std::this_thread::yield(); + } + } + std::get<0>(ports).try_put(input); } } - std::get<0>(ports).try_put(input); - } else { - global_tag.reset(); - } - ++num_processed_items; - }); - - this_node = &node; - - tbb::flow::function_node write_node(g, tbb::flow::unlimited, - [&](int value) noexcept { processed_items.emplace_back(value); return 0; }); + }); - tbb::flow::make_edge(tbb::flow::output_port<0>(node), write_node); + this_node = &node; - node.try_put_and_wait(wait_message); + tbb::flow::function_node write_node(g, tbb::flow::serial, + [&](int value) noexcept { postprocessed_items.emplace_back(value); return 0; }); - CHECK(num_items == num_processed_items); - CHECK(processed_items.size() == num_items - 1); - std::sort(processed_items.begin(), processed_items.end()); + tbb::flow::make_edge(tbb::flow::output_port<0>(node), write_node); - for (auto item : processed_items) { - std::cout << item << " "; - } - std::cout << std::endl; + node.try_put_and_wait(accumulator_message); - for (std::size_t i = 0; i < num_items - 1; ++i) { - CHECK_MESSAGE(processed_items[i] == i, "Incorrect items processing order"); - } - CHECK_MESSAGE(processed_items.back() == wait_message, "Incorrect items processing"); + std::cout << num_processed_accumulators << std::endl; + std::cout << num_processed_items << std::endl; + std::cout << postprocessed_items.size() << std::endl; - g.wait_for_all(); + g.wait_for_all(); + }); } void test_reduction() { From d9ebc3bba5fd6915c8b6b7e7981b266ad2d6483e Mon Sep 17 00:00:00 2001 From: "Boyarinov, Konstantin" Date: Wed, 15 Jan 2025 04:31:47 -0800 Subject: [PATCH 05/11] Add async_node implementation and tests --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 68 ++- .../tbb/detail/_flow_graph_cache_impl.h | 5 +- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 41 +- include/oneapi/tbb/flow_graph.h | 25 +- include/oneapi/tbb/flow_graph_abstractions.h | 7 + test/tbb/test_async_node.cpp | 15 +- test/tbb/test_buffering_try_put_and_wait.h | 189 ------- test/tbb/test_multifunction_node.cpp | 254 +--------- test/tbb/test_try_put_and_wait.h | 468 ++++++++++++++++++ 9 files changed, 579 insertions(+), 493 deletions(-) delete mode 100644 test/tbb/test_buffering_try_put_and_wait.h create mode 100644 test/tbb/test_try_put_and_wait.h diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 90393fea03..fc6b9f49cd 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -168,14 +168,51 @@ class function_body_leaf< continue_msg, Output, B > : public function_body< cont B body; }; -class multifunction_node_tag; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class metainfo_tag_type; +#endif + +// TODO: add description +struct invoke_body_with_tag_helper { + using first_priority = int; + using second_priority = double; + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template + static auto invoke(first_priority, Body&& body, metainfo_tag_type&& tag, Args&&... args) + noexcept(noexcept(tbb::detail::invoke(body, std::forward(args)..., std::move(tag)))) + -> decltype(tbb::detail::invoke(body, std::forward(args)..., std::move(tag))) + { + tbb::detail::invoke(body, std::forward(args)..., std::move(tag)); + } +#endif + template + static void invoke(second_priority, Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&&), + Args&&... args) + noexcept(noexcept(tbb::detail::invoke(body, std::forward(args)...))) + { + tbb::detail::invoke(body, std::forward(args)...); + } +}; + +// TODO: add comment +template +void invoke_body_with_tag(Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag), Args&&... args) + noexcept(noexcept(invoke_body_with_tag_helper::invoke(1, std::forward(body) __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), + std::forward(args)...))) +{ + invoke_body_with_tag_helper::invoke(/*overload priority helper*/1, + std::forward(body) __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), + std::forward(args)...); +} + //! function_body that takes an Input and a set of output ports template class multifunction_body : no_assign { public: virtual ~multifunction_body () {} - virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&& /*tag*/)) = 0; + virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& /*tag*/)) = 0; virtual multifunction_body* clone() = 0; virtual void* get_body_ptr() = 0; }; @@ -183,32 +220,11 @@ class multifunction_body : no_assign { //! leaf for multifunction. OutputSet can be a std::tuple or a vector. template class multifunction_body_leaf : public multifunction_body { - using first_priority = int; - using second_priority = double; - - // body may explicitly put() to one or more of oset. - void invoke_body_impl(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&&), second_priority) - { - tbb::detail::invoke(body, input, oset); - } - -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - template - auto invoke_body_impl(const InputT& input, OutputSetT& oset, multifunction_node_tag&& tag, first_priority) - -> decltype(tbb::detail::invoke(std::declval(), input, oset, std::move(tag)), void()) - { - tbb::detail::invoke(body, input, oset, std::move(tag)); - } -#endif - - void invoke_body(const Input& input, OutputSet& oset __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&& tag)) { - invoke_body_impl(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), 1); - } - public: multifunction_body_leaf(const B &_body) : body(_body) { } - void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(multifunction_node_tag&& tag)) override { - invoke_body(input, oset __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag))); + // body may explicitly put() to one or more of oset. + void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag)) override { + invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), input, oset); } void* get_body_ptr() override { return &body; } multifunction_body_leaf* clone() override { diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 647f3dc1b6..344ad36d92 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -419,12 +419,13 @@ class broadcast_cache : public successor_cache { #endif // call try_put_task and return list of received tasks - bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) { + bool gather_successful_try_puts( const T &t, graph_task_list& tasks + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) { bool is_at_least_one_put_successful = false; typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task * new_task = (*i)->try_put_task(t); + graph_task * new_task = (*i)->try_put_task(t, metainfo); if(new_task) { ++i; if(new_task != SUCCESSFULLY_ENQUEUED) { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 62091c767e..6ba61e7230 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -556,23 +556,23 @@ struct init_output_ports { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -class multifunction_node_tag { +class metainfo_tag_type { public: - multifunction_node_tag() = default; + metainfo_tag_type() = default; - multifunction_node_tag(const multifunction_node_tag&) = delete; + metainfo_tag_type(const metainfo_tag_type&) = delete; - multifunction_node_tag(multifunction_node_tag&& other) + metainfo_tag_type(metainfo_tag_type&& other) : my_metainfo(std::move(other.my_metainfo)) {} - multifunction_node_tag(const message_metainfo& metainfo) : my_metainfo(metainfo) { + metainfo_tag_type(const message_metainfo& metainfo) : my_metainfo(metainfo) { for (auto waiter : my_metainfo.waiters()) { waiter->reserve(); } } - multifunction_node_tag& operator=(const multifunction_node_tag&) = delete; - multifunction_node_tag& operator=(multifunction_node_tag&& other) { + metainfo_tag_type& operator=(const metainfo_tag_type&) = delete; + metainfo_tag_type& operator=(metainfo_tag_type&& other) { // TODO: should this method be thread-safe? if (this != &other) { reset(); @@ -581,11 +581,11 @@ class multifunction_node_tag { return *this; } - ~multifunction_node_tag() { + ~metainfo_tag_type() { reset(); } - void merge(const multifunction_node_tag& other_tag) { + void merge(const metainfo_tag_type& other_tag) { tbb::spin_mutex::scoped_lock lock(my_mutex); // TODO: add comment @@ -604,13 +604,18 @@ class multifunction_node_tag { my_metainfo = message_metainfo{}; } private: - template - friend class multifunction_output; + friend class metainfo_tag_accessor; message_metainfo my_metainfo; tbb::spin_mutex my_mutex; }; +struct metainfo_tag_accessor { + static const message_metainfo& get_metainfo(const metainfo_tag_type& tag) { + return tag.my_metainfo; + } +}; + #endif //! Implements methods for a function node that takes a type Input as input @@ -622,7 +627,7 @@ class multifunction_input : public function_input_base multifunction_body_type; typedef multifunction_input my_class; @@ -664,7 +669,7 @@ class multifunction_input : public function_input_base { } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - bool try_put(const output_type& i, const multifunction_node_tag& tag) { - return try_put_impl(i, tag.my_metainfo); + bool try_put(const output_type& i, const metainfo_tag_type& tag) { + return try_put_impl(i, metainfo_tag_accessor::get_metainfo(tag)); } - bool try_put(const output_type& i, multifunction_node_tag&& tag) { - multifunction_node_tag local_tag = std::move(tag); - return try_put_impl(i, local_tag.my_metainfo); + bool try_put(const output_type& i, metainfo_tag_type&& tag) { + metainfo_tag_type local_tag = std::move(tag); + return try_put_impl(i, metainfo_tag_accessor::get_metainfo(local_tag)); } #endif diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 5b438faabf..37ca50ccda 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -3130,8 +3130,11 @@ class async_body: public async_body_base { async_body(const Body &body, gateway_type *gateway) : base_type(gateway), my_body(body) { } - void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval()))) { - tbb::detail::invoke(my_body, v, *this->my_gateway); + void operator()( const Input &v, Ports & __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag) ) + noexcept(noexcept(invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), + v, *this->my_gateway))) + { + invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), v, *this->my_gateway); } Body get_body() { return my_body; } @@ -3176,9 +3179,20 @@ class async_node //! Implements gateway_type::try_put for an external activity to submit a message to FG bool try_put(const Output &i) override { - return my_node->try_put_impl(i); + return my_node->try_put_impl(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_put(const Output &i, const metainfo_tag_type& tag) override { + return my_node->try_put_impl(i, metainfo_tag_accessor::get_metainfo(tag)); } + bool try_put(const Output &i, metainfo_tag_type&& tag) override { + metainfo_tag_type local_tag = std::move(tag); + return my_node->try_put_impl(i, metainfo_tag_accessor::get_metainfo(local_tag)); + } +#endif + private: async_node* my_node; } my_gateway; @@ -3187,13 +3201,14 @@ class async_node async_node* self() { return this; } //! Implements gateway_type::try_put for an external activity to submit a message to FG - bool try_put_impl(const Output &i) { + bool try_put_impl(const Output &i __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { multifunction_output &port_0 = output_port<0>(*this); broadcast_cache& port_successors = port_0.successors(); fgt_async_try_put_begin(this, &port_0); // TODO revamp: change to std::list graph_task_list tasks; - bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks); + bool is_at_least_one_put_successful = + port_successors.gather_successful_try_puts(i, tasks __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(), "Return status is inconsistent with the method operation." ); diff --git a/include/oneapi/tbb/flow_graph_abstractions.h b/include/oneapi/tbb/flow_graph_abstractions.h index 329e75c43e..2d03c054f3 100644 --- a/include/oneapi/tbb/flow_graph_abstractions.h +++ b/include/oneapi/tbb/flow_graph_abstractions.h @@ -33,6 +33,9 @@ class graph_proxy { virtual ~graph_proxy() {} }; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class metainfo_tag_type; +#endif template class receiver_gateway : public graph_proxy { public: @@ -41,6 +44,10 @@ class receiver_gateway : public graph_proxy { //! Submit signal from an asynchronous activity to FG. virtual bool try_put(const input_type&) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual bool try_put(const input_type&, const metainfo_tag_type&) = 0; + virtual bool try_put(const input_type&, metainfo_tag_type&&) = 0; +#endif }; } // d2 diff --git a/test/tbb/test_async_node.cpp b/test/tbb/test_async_node.cpp index edab0c3857..1448ac51f8 100644 --- a/test/tbb/test_async_node.cpp +++ b/test/tbb/test_async_node.cpp @@ -32,6 +32,7 @@ #include "common/spin_barrier.h" #include "common/test_follows_and_precedes_api.h" #include "common/concepts_common.h" +#include "test_try_put_and_wait.h" #include #include @@ -802,9 +803,9 @@ TEST_CASE("Basic tests"){ //! NativeParallelFor test with various concurrency settings //! \brief \ref requirement \ref error_guessing -TEST_CASE("Lightweight tests"){ - lightweight_testing::test(NUMBER_OF_MSGS); -} +// TEST_CASE("Lightweight tests"){ +// lightweight_testing::test(NUMBER_OF_MSGS); +// } //! Test reset and cancellation //! \brief \ref error_guessing @@ -878,3 +879,11 @@ TEST_CASE("constraints for async_node body") { } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test async_node try_put_and_wait") { + using node_type = oneapi::tbb::flow::async_node; + test_try_put_and_wait::test_multioutput(); +} +#endif diff --git a/test/tbb/test_buffering_try_put_and_wait.h b/test/tbb/test_buffering_try_put_and_wait.h deleted file mode 100644 index 300521233f..0000000000 --- a/test/tbb/test_buffering_try_put_and_wait.h +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (c) 2024 Intel Corporation - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#ifndef __TBB_test_tbb_buffering_try_put_and_wait_H -#define __TBB_test_tbb_buffering_try_put_and_wait_H - -#include -#include - -#include - -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - -namespace test_try_put_and_wait { - -template -std::size_t test_buffer_push(const std::vector& start_work_items, - int wait_message, - const std::vector& new_work_items, - std::vector& processed_items, - Args... args) -{ - std::size_t after_try_put_and_wait_start_index = 0; - tbb::task_arena arena(1); - - arena.execute([&] { - tbb::flow::graph g; - - using function_node_type = tbb::flow::function_node; - - BufferingNode buffer1(g, args...); - - function_node_type function(g, tbb::flow::serial, - [&](int input) noexcept { - if (input == wait_message) { - for (auto item : new_work_items) { - buffer1.try_put(item); - } - } - return input; - }); - - BufferingNode buffer2(g, args...); - - function_node_type writer(g, tbb::flow::unlimited, - [&](int input) noexcept { - processed_items.emplace_back(input); - return 0; - }); - - tbb::flow::make_edge(buffer1, function); - tbb::flow::make_edge(function, buffer2); - tbb::flow::make_edge(buffer2, writer); - - for (auto item : start_work_items) { - buffer1.try_put(item); - } - - buffer1.try_put_and_wait(wait_message); - - after_try_put_and_wait_start_index = processed_items.size(); - - g.wait_for_all(); - }); - - return after_try_put_and_wait_start_index; -} - -template -std::size_t test_buffer_pull(const std::vector& start_work_items, - int wait_message, - int occupier, - const std::vector& new_work_items, - std::vector& processed_items, - Args... args) -{ - tbb::task_arena arena(1); - std::size_t after_try_put_and_wait_start_index = 0; - - arena.execute([&] { - tbb::flow::graph g; - - using function_node_type = tbb::flow::function_node; - - BufferingNode buffer(g, args...); - - function_node_type function(g, tbb::flow::serial, - [&](int input) noexcept { - if (input == wait_message) { - for (auto item : new_work_items) { - buffer.try_put(item); - } - } - - processed_items.emplace_back(input); - return 0; - }); - - // Occupy the concurrency of function_node - // This call spawns the task to process the occupier - function.try_put(occupier); - - // Make edge between buffer and function after occupying the concurrency - // To ensure that forward task of the buffer would be spawned after the occupier task - // And the function_node would reject the items from the buffer - // and process them later by calling try_get on the buffer - tbb::flow::make_edge(buffer, function); - - for (auto item : start_work_items) { - buffer.try_put(item); - } - - buffer.try_put_and_wait(wait_message); - - after_try_put_and_wait_start_index = processed_items.size(); - - g.wait_for_all(); - }); - - return after_try_put_and_wait_start_index; -} - -template -std::size_t test_buffer_reserve(std::size_t limiter_threshold, - const std::vector& start_work_items, - int wait_message, - const std::vector& new_work_items, - std::vector& processed_items, - Args... args) -{ - tbb::task_arena arena(1); - std::size_t after_try_put_and_wait_start_index = 0; - - arena.execute([&] { - tbb::flow::graph g; - - BufferingNode buffer(g, args...); - - tbb::flow::limiter_node limiter(g, limiter_threshold); - tbb::flow::function_node function(g, tbb::flow::serial, - [&](int input) { - if (input == wait_message) { - for (auto item : new_work_items) { - buffer.try_put(item); - } - } - // Explicitly put to the decrementer instead of making edge - // to guarantee that the next task would be spawned and not returned - // to the current thread as the next task - // Otherwise, all elements would be processed during the try_put_and_wait - limiter.decrementer().try_put(1); - processed_items.emplace_back(input); - return 0; - }); - - tbb::flow::make_edge(buffer, limiter); - tbb::flow::make_edge(limiter, function); - - for (auto item : start_work_items) { - buffer.try_put(item); - } - - buffer.try_put_and_wait(wait_message); - - after_try_put_and_wait_start_index = processed_items.size(); - - g.wait_for_all(); - }); - - return after_try_put_and_wait_start_index; -} - -} // test_try_put_and_wait - -#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -#endif // __TBB_test_tbb_buffering_try_put_and_wait_H diff --git a/test/tbb/test_multifunction_node.cpp b/test/tbb/test_multifunction_node.cpp index 20b2354c5d..5fc8b5549f 100644 --- a/test/tbb/test_multifunction_node.cpp +++ b/test/tbb/test_multifunction_node.cpp @@ -28,6 +28,7 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" #include "common/concepts_common.h" +#include "test_try_put_and_wait.h" //! \file test_multifunction_node.cpp @@ -625,257 +626,10 @@ TEST_CASE("constraints for multifunction_node body") { #endif // __TBB_CPP20_CONCEPTS_PRESENT #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - -void test_tag_type() { - using multinode_type = tbb::flow::multifunction_node>; - using ports_type = typename multinode_type::output_ports_type; - using tag_type = typename multinode_type::tag_type; - - int processed = 0; - - tbb::flow::graph g; - multinode_type node(g, tbb::flow::unlimited, - [&](int input, ports_type&, tag_type&& tag) { - processed = input; - tag_type tag1; - tag_type tag2(std::move(tag)); - - tag1 = std::move(tag2); - tag = std::move(tag1); - }); - - node.try_put_and_wait(1); - CHECK_MESSAGE(processed == 1, "Body wait not called in try_put_and_wait call"); - g.wait_for_all(); -} - -void test_simple_broadcast() { - tbb::task_arena arena(1); - - using funcnode_type = tbb::flow::function_node; - using multinode_type = tbb::flow::multifunction_node>; - using ports_type = typename multinode_type::output_ports_type; - using tag_type = typename multinode_type::tag_type; - - arena.execute([&] { - tbb::flow::graph g; - - std::vector processed_items1; - std::vector processed_items2; - std::vector new_work_items; - - int wait_message = 10; - - for (int i = 0; i < wait_message; ++i) { - new_work_items.emplace_back(i); - } - - multinode_type* start_node = nullptr; - - multinode_type node(g, tbb::flow::unlimited, - [&](int input, ports_type& ports, tag_type&& tag) { - if (input == wait_message) { - for (int item : new_work_items) { - start_node->try_put(item); - } - } - - std::get<0>(ports).try_put(input, tag); - std::get<1>(ports).try_put(input, std::move(tag)); - }); - - start_node = &node; - - funcnode_type func1(g, tbb::flow::unlimited, - [&](int input) noexcept { - processed_items1.emplace_back(input); - return 0; - }); - - funcnode_type func2(g, tbb::flow::unlimited, - [&](int input) noexcept { - processed_items2.emplace_back(input); - return 0; - }); - - tbb::flow::make_edge(tbb::flow::output_port<0>(node), func1); - tbb::flow::make_edge(tbb::flow::output_port<1>(node), func2); - - bool result = node.try_put_and_wait(wait_message); - CHECK_MESSAGE(result, "unexpected try_put_and_wait result"); - - CHECK(processed_items1.size() == 1); - CHECK(processed_items2.size() == 1); - CHECK_MESSAGE(processed_items1[0] == wait_message, "Only the wait message should be processed by try_put_and_wait"); - CHECK_MESSAGE(processed_items2[0] == wait_message, "Only the wait message should be processed by try_put_and_wait"); - - g.wait_for_all(); - - CHECK(processed_items1.size() == new_work_items.size() + 1); - CHECK(processed_items2.size() == new_work_items.size() + 1); - - std::size_t check_index = 1; - for (std::size_t i = new_work_items.size(); i != 0; --i) { - CHECK_MESSAGE(processed_items1[check_index] == new_work_items[i - 1], "Unexpected items processing order"); - CHECK_MESSAGE(processed_items2[check_index] == new_work_items[i - 1], "Unexpected items processing order"); - ++check_index; - } - CHECK(check_index == processed_items1.size()); - }); -} - -void test_no_broadcast() { - if (std::thread::hardware_concurrency() == 1) { - return; - } - - tbb::task_arena arena(std::thread::hardware_concurrency(), 0); - - arena.execute([]() { - using multinode_type = tbb::flow::multifunction_node>; - using ports_type = typename multinode_type::output_ports_type; - using tag_type = typename multinode_type::tag_type; - - std::size_t num_items = 10; - std::size_t num_additional_items = 10; - - std::atomic num_processed_items = 0; - std::atomic num_processed_accumulators = 0; - std::atomic try_put_and_wait_exit_flag{}; - - int accumulator_message = 1; - int add_message = 2; - - tag_type global_tag; - - multinode_type* this_node = nullptr; - - std::vector postprocessed_items; - auto global_index = tbb::this_task_arena::current_thread_index(); - - tbb::flow::graph g; - multinode_type node(g, tbb::flow::unlimited, - [&](int input, ports_type& ports, tag_type&& local_tag) { - if (num_processed_items++ == 0) { - CHECK(input == accumulator_message); - ++num_processed_accumulators; - - global_tag = std::move(local_tag); - for (std::size_t i = 1; i < num_items; ++i) { - this_node->try_put(accumulator_message); - } - for (std::size_t i = 0; i < num_additional_items; ++i) { - this_node->try_put(add_message); - } - } else { - if (input == accumulator_message) { - if (num_processed_accumulators++ == num_items - 1) { - // The last accumulator was received - "cancel" the operation - global_tag.reset(); - } - } else { - if (global_index != tbb::this_task_arena::current_thread_index()) { - // Block the worker thread until the try_put_and_wait was exitted - while(!try_put_and_wait_exit_flag.load()) { - std::this_thread::yield(); - } - } - std::get<0>(ports).try_put(input); - } - } - }); - - this_node = &node; - - tbb::flow::function_node write_node(g, tbb::flow::serial, - [&](int value) noexcept { postprocessed_items.emplace_back(value); return 0; }); - - tbb::flow::make_edge(tbb::flow::output_port<0>(node), write_node); - - node.try_put_and_wait(accumulator_message); - - std::cout << num_processed_accumulators << std::endl; - std::cout << num_processed_items << std::endl; - std::cout << postprocessed_items.size() << std::endl; - - g.wait_for_all(); - }); -} - -void test_reduction() { - tbb::task_arena arena(1); - - arena.execute([]{ - int num_items = 5; - tbb::flow::graph g; - - using func_node_type = tbb::flow::function_node; - using multi_node_type = tbb::flow::multifunction_node>; - using ports_type = typename multi_node_type::output_ports_type; - using tag_type = typename multi_node_type::tag_type; - - func_node_type* start_node = nullptr; - - func_node_type start(g, tbb::flow::unlimited, - [&](int i) { - std::cout << "processing " << i << std::endl; - static bool extra_work_added = false; - if (!extra_work_added) { - extra_work_added = true; - for (int j = i + 1; j < i + num_items; ++j) { - start_node->try_put(j); - } - } - return i; - }); - - start_node = &start; - - int num_accumulated = 0; - int accumulated_result = 0; - tag_type accumulated_hint; - - tbb::flow::multifunction_node> multifunction(g, tbb::flow::unlimited, - [&](int i, ports_type& ports, const tag_type& tag) { - ++num_accumulated; - accumulated_result += i; - accumulated_hint.merge(tag); - - if (num_accumulated == num_items) { - std::get<0>(ports).try_put(accumulated_result, std::move(accumulated_hint)); - num_accumulated = 0; - } - }); - - tbb::flow::function_node writer(g, tbb::flow::unlimited, - [&](int res) { - std::cout << "Write = " << res << std::endl; - static bool extra_loop_added = false; - - if (!extra_loop_added) { - extra_loop_added = true; - for (int i = 100; i < 100 + num_items; ++i) { - multifunction.try_put(i); - } - } - return 0; - }); - - tbb::flow::make_edge(start, multifunction); - tbb::flow::make_edge(tbb::flow::output_port<0>(multifunction), writer); - - start.try_put_and_wait(1); - - std::cout << "Wait for all" << std::endl; - g.wait_for_all(); - }); -} - +//! \brief \ref error_guessing TEST_CASE("multifunction_node try_put_and_wait") { - test_tag_type(); - test_simple_broadcast(); - test_no_broadcast(); - // test_reduction(); + using node_type = oneapi::tbb::flow::multifunction_node>; + test_try_put_and_wait::test_multioutput(); } #endif diff --git a/test/tbb/test_try_put_and_wait.h b/test/tbb/test_try_put_and_wait.h new file mode 100644 index 0000000000..00e2cec9a2 --- /dev/null +++ b/test/tbb/test_try_put_and_wait.h @@ -0,0 +1,468 @@ +/* + Copyright (c) 2024 Intel Corporation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TBB_test_tbb_buffering_try_put_and_wait_H +#define __TBB_test_tbb_buffering_try_put_and_wait_H + +#include +#include + +#include + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + +namespace test_try_put_and_wait { + +template +std::size_t test_buffer_push(const std::vector& start_work_items, + int wait_message, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + std::size_t after_try_put_and_wait_start_index = 0; + tbb::task_arena arena(1); + + arena.execute([&] { + tbb::flow::graph g; + + using function_node_type = tbb::flow::function_node; + + BufferingNode buffer1(g, args...); + + function_node_type function(g, tbb::flow::serial, + [&](int input) noexcept { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer1.try_put(item); + } + } + return input; + }); + + BufferingNode buffer2(g, args...); + + function_node_type writer(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(buffer1, function); + tbb::flow::make_edge(function, buffer2); + tbb::flow::make_edge(buffer2, writer); + + for (auto item : start_work_items) { + buffer1.try_put(item); + } + + buffer1.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +std::size_t test_buffer_pull(const std::vector& start_work_items, + int wait_message, + int occupier, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + tbb::task_arena arena(1); + std::size_t after_try_put_and_wait_start_index = 0; + + arena.execute([&] { + tbb::flow::graph g; + + using function_node_type = tbb::flow::function_node; + + BufferingNode buffer(g, args...); + + function_node_type function(g, tbb::flow::serial, + [&](int input) noexcept { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer.try_put(item); + } + } + + processed_items.emplace_back(input); + return 0; + }); + + // Occupy the concurrency of function_node + // This call spawns the task to process the occupier + function.try_put(occupier); + + // Make edge between buffer and function after occupying the concurrency + // To ensure that forward task of the buffer would be spawned after the occupier task + // And the function_node would reject the items from the buffer + // and process them later by calling try_get on the buffer + tbb::flow::make_edge(buffer, function); + + for (auto item : start_work_items) { + buffer.try_put(item); + } + + buffer.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +std::size_t test_buffer_reserve(std::size_t limiter_threshold, + const std::vector& start_work_items, + int wait_message, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + tbb::task_arena arena(1); + std::size_t after_try_put_and_wait_start_index = 0; + + arena.execute([&] { + tbb::flow::graph g; + + BufferingNode buffer(g, args...); + + tbb::flow::limiter_node limiter(g, limiter_threshold); + tbb::flow::function_node function(g, tbb::flow::serial, + [&](int input) { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer.try_put(item); + } + } + // Explicitly put to the decrementer instead of making edge + // to guarantee that the next task would be spawned and not returned + // to the current thread as the next task + // Otherwise, all elements would be processed during the try_put_and_wait + limiter.decrementer().try_put(1); + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(buffer, limiter); + tbb::flow::make_edge(limiter, function); + + for (auto item : start_work_items) { + buffer.try_put(item); + } + + buffer.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +struct ports_or_gateway; + +template +struct ports_or_gateway> { + using type = typename tbb::flow::multifunction_node::output_ports_type; +}; + +template +struct ports_or_gateway> { + using type = typename tbb::flow::async_node::gateway_type; +}; + +template +using ports_or_gateway_t = typename ports_or_gateway::type; + +template +void put_to_ports_or_gateway(Gateway& gateway, const T& item, Tag&&... tag) { + gateway.try_put(item, std::forward(tag)...); +} + +template +void put_to_ports_or_gateway(std::tuple& ports, const T& item, Tag&&... tag) { + std::get<0>(ports).try_put(item, std::forward(tag)...); +} + +template +void test_multioutput_tag_type() { + static_assert(std::is_same::value, "Unexpected input type"); + using second_arg_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + int processed = 0; + + tbb::flow::graph g; + NodeType node(g, tbb::flow::unlimited, + [&](int input, second_arg_type&, tag_type&& tag) { + processed = input; + tag_type tag1; + tag_type tag2(std::move(tag)); + + tag1 = std::move(tag2); + tag = std::move(tag1); + }); + + node.try_put_and_wait(1); + CHECK_MESSAGE(processed == 1, "Body wait not called in try_put_and_wait call"); + g.wait_for_all(); +} + +// TODO: add description +template +void test_multioutput_simple_broadcast() { + static_assert(std::is_same::value, "Unexpected input type"); + tbb::task_arena arena(1); + + using funcnode_type = tbb::flow::function_node; + using second_argument_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + arena.execute([&] { + tbb::flow::graph g; + + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + new_work_items.emplace_back(i); + } + + NodeType* start_node = nullptr; + + NodeType node(g, tbb::flow::unlimited, + [&](int input, second_argument_type& port, tag_type&& tag) { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + + // Each even body execution copy-consumes the tag + // each odd execution - move-consumes + static bool copy_consume = true; + + if (copy_consume) { + put_to_ports_or_gateway(port, input, tag); + } else { + put_to_ports_or_gateway(port, input, std::move(tag)); + } + + copy_consume = !copy_consume; + }); + + start_node = &node; + + funcnode_type next_func(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(node, next_func); + + bool result = node.try_put_and_wait(wait_message); + CHECK_MESSAGE(result, "unexpected try_put_and_wait result"); + + CHECK(processed_items.size() == 1); + CHECK_MESSAGE(processed_items[0] == wait_message, "Only the wait message should be processed by try_put_and_wait"); + + g.wait_for_all(); + + CHECK(processed_items.size() == new_work_items.size() + 1); + + std::size_t check_index = 1; + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[i - 1], "Unexpected items processing order"); + } + CHECK(check_index == processed_items.size()); + }); +} + +// TODO: add description +template +void test_multioutput_no_broadcast() { + using second_argument_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + std::size_t num_items = 10; + std::size_t num_additional_items = 10; + + std::atomic num_processed_items = 0; + std::atomic num_processed_accumulators = 0; + + int accumulator_message = 1; + int add_message = 2; + + tag_type global_tag; + + NodeType* this_node = nullptr; + + std::vector postprocessed_items; + + tbb::flow::graph g; + NodeType node(g, tbb::flow::unlimited, + [&](int input, second_argument_type& port, tag_type&& local_tag) { + if (num_processed_items++ == 0) { + CHECK(input == accumulator_message); + ++num_processed_accumulators; + + global_tag = std::move(local_tag); + for (std::size_t i = 1; i < num_items; ++i) { + this_node->try_put(accumulator_message); + } + for (std::size_t i = 0; i < num_additional_items; ++i) { + this_node->try_put(add_message); + } + } else { + if (input == accumulator_message) { + global_tag.merge(std::move(local_tag)); + if (num_processed_accumulators++ == num_items - 1) { + // The last accumulator was received - "cancel" the operation + global_tag.reset(); + } + } else { + put_to_ports_or_gateway(port, input); + } + } + }); + + this_node = &node; + + tbb::flow::function_node write_node(g, tbb::flow::serial, + [&](int value) noexcept { postprocessed_items.emplace_back(value); return 0; }); + + tbb::flow::make_edge(tbb::flow::output_port<0>(node), write_node); + + node.try_put_and_wait(accumulator_message); + + CHECK_MESSAGE(num_processed_accumulators == num_items, "Unexpected number of accumulators processed"); + + g.wait_for_all(); + + CHECK_MESSAGE(num_processed_items == num_items + num_additional_items, "Unexpected number of items processed"); + CHECK_MESSAGE(postprocessed_items.size() == num_additional_items, "Unexpected number of items written"); + for (auto item : postprocessed_items) { + CHECK_MESSAGE(item == add_message, "Unexpected item written"); + } +} + +// TODO: add test description +template +void test_multioutput_reduction() { + tbb::task_arena arena(1); + + arena.execute([]{ + int num_items = 5; + tbb::flow::graph g; + + using func_node_type = tbb::flow::function_node; + using second_argument_type = ports_or_gateway_t; + using tag_type = typename NodeType::tag_type; + + func_node_type* start_node = nullptr; + + func_node_type start(g, tbb::flow::unlimited, + [&](int i) { + static bool extra_work_added = false; + if (!extra_work_added) { + extra_work_added = true; + for (int j = i + 1; j < i + num_items; ++j) { + start_node->try_put(j); + } + } + return i; + }); + + start_node = &start; + + int num_accumulated = 0; + int accumulated_result = 0; + tag_type accumulated_hint; + + std::vector processed_items; + + NodeType node(g, tbb::flow::unlimited, + [&](int i, second_argument_type& ports, const tag_type& tag) { + ++num_accumulated; + accumulated_result += i; + accumulated_hint.merge(tag); + + if (num_accumulated == num_items) { + put_to_ports_or_gateway(ports, accumulated_result, std::move(accumulated_hint)); + num_accumulated = 0; + } + }); + + tbb::flow::function_node writer(g, tbb::flow::unlimited, + [&](int res) { + // Start extra reduction that should not be handled by try_put_and_wait + static bool extra_loop_added = false; + + if (!extra_loop_added) { + extra_loop_added = true; + for (int i = 100; i < 100 + num_items; ++i) { + node.try_put(i); + } + } + + processed_items.emplace_back(res); + return 0; + }); + + tbb::flow::make_edge(start, node); + tbb::flow::make_edge(node, writer); + + start.try_put_and_wait(1); + + auto first_reduction_result = accumulated_result; + CHECK_MESSAGE(processed_items.size() == 1, "More than one reduction was processed"); + CHECK_MESSAGE(processed_items[0] == first_reduction_result, "Unexpected reduction result"); + + g.wait_for_all(); + + CHECK_MESSAGE(processed_items.size() == 2, "More than one reduction was processed"); + CHECK_MESSAGE(accumulated_result != first_reduction_result, "Unexpected reduction result"); + CHECK_MESSAGE(processed_items[1] == accumulated_result, "Unexpected reduction result"); + }); +} + +template +void test_multioutput() { + test_multioutput_tag_type(); + test_multioutput_simple_broadcast(); + test_multioutput_no_broadcast(); + test_multioutput_reduction(); +} + +} // test_try_put_and_wait + +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#endif // __TBB_test_tbb_buffering_try_put_and_wait_H From 64b9d92bd0ae49169e9d355c1ee9fa4c21c6e7b0 Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Wed, 15 Jan 2025 07:18:54 -0800 Subject: [PATCH 06/11] Update copyright years --- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_cache_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- include/oneapi/tbb/flow_graph.h | 2 +- include/oneapi/tbb/flow_graph_abstractions.h | 2 +- test/tbb/test_async_node.cpp | 2 +- test/tbb/test_multifunction_node.cpp | 2 +- test/tbb/test_try_put_and_wait.h | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index fc6b9f49cd..ad6596e8ec 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 344ad36d92..801572b2fe 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 6ba61e7230..5fede54977 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 37ca50ccda..887e1a8964 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/flow_graph_abstractions.h b/include/oneapi/tbb/flow_graph_abstractions.h index 2d03c054f3..6ae4777639 100644 --- a/include/oneapi/tbb/flow_graph_abstractions.h +++ b/include/oneapi/tbb/flow_graph_abstractions.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_async_node.cpp b/test/tbb/test_async_node.cpp index 1448ac51f8..a3c4afab1b 100644 --- a/test/tbb/test_async_node.cpp +++ b/test/tbb/test_async_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_multifunction_node.cpp b/test/tbb/test_multifunction_node.cpp index 5fc8b5549f..0b61081129 100644 --- a/test/tbb/test_multifunction_node.cpp +++ b/test/tbb/test_multifunction_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_try_put_and_wait.h b/test/tbb/test_try_put_and_wait.h index 00e2cec9a2..2c51502b53 100644 --- a/test/tbb/test_try_put_and_wait.h +++ b/test/tbb/test_try_put_and_wait.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2024 Intel Corporation + Copyright (c) 2024-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From ebb419460443e3f6020149ba0acfb08cf9392197 Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Wed, 15 Jan 2025 07:42:13 -0800 Subject: [PATCH 07/11] Fix minor compilation issues --- include/oneapi/tbb/detail/_flow_graph_cache_impl.h | 2 +- test/tbb/test_buffer_node.cpp | 2 +- test/tbb/test_overwrite_node.cpp | 2 +- test/tbb/test_priority_queue_node.cpp | 2 +- test/tbb/test_queue_node.cpp | 2 +- test/tbb/test_sequencer_node.cpp | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 801572b2fe..64d8468107 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -425,7 +425,7 @@ class broadcast_cache : public successor_cache { typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task * new_task = (*i)->try_put_task(t, metainfo); + graph_task * new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); if(new_task) { ++i; if(new_task != SUCCESSFULLY_ENQUEUED) { diff --git a/test/tbb/test_buffer_node.cpp b/test/tbb/test_buffer_node.cpp index 527005aecb..cb793c2d42 100644 --- a/test/tbb/test_buffer_node.cpp +++ b/test/tbb/test_buffer_node.cpp @@ -24,7 +24,7 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_buffer_node.cpp //! \brief Test for [flow_graph.buffer_node] specification diff --git a/test/tbb/test_overwrite_node.cpp b/test/tbb/test_overwrite_node.cpp index 3f5ed8fec0..0c41196a18 100644 --- a/test/tbb/test_overwrite_node.cpp +++ b/test/tbb/test_overwrite_node.cpp @@ -24,7 +24,7 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_overwrite_node.cpp //! \brief Test for [flow_graph.overwrite_node] specification diff --git a/test/tbb/test_priority_queue_node.cpp b/test/tbb/test_priority_queue_node.cpp index 18a60eb935..40a85e2faf 100644 --- a/test/tbb/test_priority_queue_node.cpp +++ b/test/tbb/test_priority_queue_node.cpp @@ -30,7 +30,7 @@ #include -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_priority_queue_node.cpp //! \brief Test for [flow_graph.priority_queue_node] specification diff --git a/test/tbb/test_queue_node.cpp b/test/tbb/test_queue_node.cpp index 546b47edae..d60ecf5b13 100644 --- a/test/tbb/test_queue_node.cpp +++ b/test/tbb/test_queue_node.cpp @@ -30,7 +30,7 @@ #include -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_queue_node.cpp //! \brief Test for [flow_graph.queue_node] specification diff --git a/test/tbb/test_sequencer_node.cpp b/test/tbb/test_sequencer_node.cpp index 1e6494d69b..94320be9cf 100644 --- a/test/tbb/test_sequencer_node.cpp +++ b/test/tbb/test_sequencer_node.cpp @@ -28,7 +28,7 @@ #include #include -#include "test_buffering_try_put_and_wait.h" +#include "test_try_put_and_wait.h" //! \file test_sequencer_node.cpp //! \brief Test for [flow_graph.sequencer_node] specification From c1a1cc03cee8cf5f48033e87034e905311de2a6a Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Wed, 15 Jan 2025 09:29:06 -0800 Subject: [PATCH 08/11] Fix lightweight test --- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 10 +++++----- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 7 +++++-- include/oneapi/tbb/flow_graph.h | 2 +- test/tbb/test_async_node.cpp | 11 +++++++---- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index ad6596e8ec..72a2bf586f 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -180,18 +180,18 @@ struct invoke_body_with_tag_helper { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT template static auto invoke(first_priority, Body&& body, metainfo_tag_type&& tag, Args&&... args) - noexcept(noexcept(tbb::detail::invoke(body, std::forward(args)..., std::move(tag)))) - -> decltype(tbb::detail::invoke(body, std::forward(args)..., std::move(tag))) + noexcept(noexcept(tbb::detail::invoke(std::forward(body), std::forward(args)..., std::move(tag)))) + -> decltype(tbb::detail::invoke(std::forward(body), std::forward(args)..., std::move(tag)), void()) { - tbb::detail::invoke(body, std::forward(args)..., std::move(tag)); + tbb::detail::invoke(std::forward(body), std::forward(args)..., std::move(tag)); } #endif template static void invoke(second_priority, Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&&), Args&&... args) - noexcept(noexcept(tbb::detail::invoke(body, std::forward(args)...))) + noexcept(noexcept(tbb::detail::invoke(std::forward(body), std::forward(args)...))) { - tbb::detail::invoke(body, std::forward(args)...); + tbb::detail::invoke(std::forward(body), std::forward(args)...); } }; diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 5fede54977..b7c90a01bd 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -604,7 +604,7 @@ class metainfo_tag_type { my_metainfo = message_metainfo{}; } private: - friend class metainfo_tag_accessor; + friend struct metainfo_tag_accessor; message_metainfo my_metainfo; tbb::spin_mutex my_mutex; @@ -637,7 +637,10 @@ class multifunction_input : public function_input_base multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) - : base_type(g, max_concurrency, a_priority, noexcept(1)) + : base_type(g, max_concurrency, a_priority, + // noexcept(1)) + noexcept(invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type{}), + input_type(), my_output_ports))) , my_body( new multifunction_body_leaf(body) ) , my_init_body( new multifunction_body_leaf(body) ) , my_output_ports(init_output_ports::call(g, my_output_ports)){ diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 887e1a8964..a991411685 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -3132,7 +3132,7 @@ class async_body: public async_body_base { void operator()( const Input &v, Ports & __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag) ) noexcept(noexcept(invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), - v, *this->my_gateway))) + v, std::declval()))) { invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), v, *this->my_gateway); } diff --git a/test/tbb/test_async_node.cpp b/test/tbb/test_async_node.cpp index a3c4afab1b..5fe64acb93 100644 --- a/test/tbb/test_async_node.cpp +++ b/test/tbb/test_async_node.cpp @@ -42,7 +42,6 @@ //! \file test_async_node.cpp //! \brief Test for [flow_graph.async_node] specification - class minimal_type { template friend struct place_wrapper; @@ -803,9 +802,13 @@ TEST_CASE("Basic tests"){ //! NativeParallelFor test with various concurrency settings //! \brief \ref requirement \ref error_guessing -// TEST_CASE("Lightweight tests"){ -// lightweight_testing::test(NUMBER_OF_MSGS); -// } +TEST_CASE("Lightweight tests"){ + tbb::task_arena arena(1); + + arena.execute([]() { + lightweight_testing::test(NUMBER_OF_MSGS); + }); +} //! Test reset and cancellation //! \brief \ref error_guessing From 9357efb05619918ea77e3ca30d5a0604f02c5c4b Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Wed, 15 Jan 2025 09:37:47 -0800 Subject: [PATCH 09/11] Fix copyrights & remove unnecessary comments --- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 3 ++- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 1 - test/tbb/test_async_node.cpp | 7 +------ test/tbb/test_buffer_node.cpp | 2 +- test/tbb/test_multifunction_node.cpp | 10 +++++----- test/tbb/test_overwrite_node.cpp | 2 +- test/tbb/test_priority_queue_node.cpp | 2 +- test/tbb/test_queue_node.cpp | 2 +- test/tbb/test_sequencer_node.cpp | 2 +- 9 files changed, 13 insertions(+), 18 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 72a2bf586f..df5160c84e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -212,7 +212,8 @@ template class multifunction_body : no_assign { public: virtual ~multifunction_body () {} - virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& /*tag*/)) = 0; + virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/ + __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& /*tag*/)) = 0; virtual multifunction_body* clone() = 0; virtual void* get_body_ptr() = 0; }; diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index b7c90a01bd..bc22f81e8a 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -638,7 +638,6 @@ class multifunction_input : public function_input_base multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) : base_type(g, max_concurrency, a_priority, - // noexcept(1)) noexcept(invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type{}), input_type(), my_output_ports))) , my_body( new multifunction_body_leaf(body) ) diff --git a/test/tbb/test_async_node.cpp b/test/tbb/test_async_node.cpp index 5fe64acb93..bba27ededf 100644 --- a/test/tbb/test_async_node.cpp +++ b/test/tbb/test_async_node.cpp @@ -41,7 +41,6 @@ //! \file test_async_node.cpp //! \brief Test for [flow_graph.async_node] specification - class minimal_type { template friend struct place_wrapper; @@ -803,11 +802,7 @@ TEST_CASE("Basic tests"){ //! NativeParallelFor test with various concurrency settings //! \brief \ref requirement \ref error_guessing TEST_CASE("Lightweight tests"){ - tbb::task_arena arena(1); - - arena.execute([]() { - lightweight_testing::test(NUMBER_OF_MSGS); - }); + lightweight_testing::test(NUMBER_OF_MSGS); } //! Test reset and cancellation diff --git a/test/tbb/test_buffer_node.cpp b/test/tbb/test_buffer_node.cpp index cb793c2d42..80fd77790a 100644 --- a/test/tbb/test_buffer_node.cpp +++ b/test/tbb/test_buffer_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_multifunction_node.cpp b/test/tbb/test_multifunction_node.cpp index 0b61081129..f5696afb95 100644 --- a/test/tbb/test_multifunction_node.cpp +++ b/test/tbb/test_multifunction_node.cpp @@ -542,11 +542,11 @@ TEST_CASE("Test ports retrurn references"){ test_ports_return_references(); } -// //! NativeParallelFor testing with various concurrency settings -// //! \brief \ref error_guessing -// TEST_CASE("Lightweight testing"){ -// lightweight_testing::test(10); -// } +//! NativeParallelFor testing with various concurrency settings +//! \brief \ref error_guessing +TEST_CASE("Lightweight testing"){ + lightweight_testing::test(10); +} #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET //! Test follows and precedes API diff --git a/test/tbb/test_overwrite_node.cpp b/test/tbb/test_overwrite_node.cpp index 0c41196a18..270624abf7 100644 --- a/test/tbb/test_overwrite_node.cpp +++ b/test/tbb/test_overwrite_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_priority_queue_node.cpp b/test/tbb/test_priority_queue_node.cpp index 40a85e2faf..5971f2d62c 100644 --- a/test/tbb/test_priority_queue_node.cpp +++ b/test/tbb/test_priority_queue_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_queue_node.cpp b/test/tbb/test_queue_node.cpp index d60ecf5b13..25dd3cbc29 100644 --- a/test/tbb/test_queue_node.cpp +++ b/test/tbb/test_queue_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_sequencer_node.cpp b/test/tbb/test_sequencer_node.cpp index 94320be9cf..bc75d7d891 100644 --- a/test/tbb/test_sequencer_node.cpp +++ b/test/tbb/test_sequencer_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 8ab461bd27136c77c98e0b48adb0c662b1d742a8 Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Thu, 16 Jan 2025 05:26:15 -0800 Subject: [PATCH 10/11] Fix copyright in test_function_node --- test/tbb/test_function_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 563aec1ba1..f11324f2e7 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2024 Intel Corporation + Copyright (c) 2005-2025 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From a0d26a1313135ea499a7ee451066f47976adf34e Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Thu, 16 Jan 2025 05:58:14 -0800 Subject: [PATCH 11/11] Fix C++20 --- include/oneapi/tbb/flow_graph.h | 50 +++++++++++++++++++++----------- test/tbb/test_try_put_and_wait.h | 4 +-- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index a991411685..5ed95fa46a 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -84,6 +84,13 @@ class continue_msg {}; } // namespace d2 +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo + +#else +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + #if __TBB_CPP20_CONCEPTS_PRESENT namespace d0 { @@ -114,18 +121,26 @@ concept input_node_body = std::copy_constructible && { body(fc) } -> adaptive_same_as; }; -template +template concept multifunction_node_body = std::copy_constructible && - std::invocable; + std::invocable +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + || std::invocable&&> +#endif + ; template concept sequencer = std::copy_constructible && std::invocable && std::convertible_to, std::size_t>; -template +template concept async_node_body = std::copy_constructible && - std::invocable; + std::invocable +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + || std::invocable&&> +#endif + ; } // namespace d0 #endif // __TBB_CPP20_CONCEPTS_PRESENT @@ -212,11 +227,6 @@ class message_metainfo { private: waiters_type my_waiters; }; // class message_metainfo - -#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo - -#else -#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Pure virtual template class that defines a sender of messages of type T @@ -989,8 +999,11 @@ class multifunction_node : private: using input_impl_type::my_predecessors; public: +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + typedef typename input_impl_type::tag_type tag_type; +#endif template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node( graph &g, size_t concurrency, Body body, Policy = Policy(), node_priority_t a_priority = no_priority @@ -1003,13 +1016,13 @@ class multifunction_node : } template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority) : multifunction_node(g, concurrency, body, Policy(), a_priority) {} #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node(const node_set& nodes, size_t concurrency, Body body, Policy p = Policy(), node_priority_t a_priority = no_priority) : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) { @@ -1017,7 +1030,7 @@ class multifunction_node : } template - __TBB_requires(multifunction_node_body) + __TBB_requires(multifunction_node_body) __TBB_NOINLINE_SYM multifunction_node(const node_set& nodes, size_t concurrency, Body body, node_priority_t a_priority) : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {} #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET @@ -3160,6 +3173,9 @@ class async_node typedef receiver_gateway gateway_type; typedef async_body_base async_body_base_type; typedef typename base_type::output_ports_type output_ports_type; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + typedef typename mfn_input_type::tag_type tag_type; +#endif private: class receiver_gateway_impl: public receiver_gateway { @@ -3221,7 +3237,7 @@ class async_node public: template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node( graph &g, size_t concurrency, Body body, Policy = Policy(), node_priority_t a_priority = no_priority @@ -3237,13 +3253,13 @@ class async_node } template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority) : async_node(g, concurrency, body, Policy(), a_priority) {} #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node( const node_set& nodes, size_t concurrency, Body body, Policy = Policy(), node_priority_t a_priority = no_priority ) @@ -3252,7 +3268,7 @@ class async_node } template - __TBB_requires(async_node_body) + __TBB_requires(async_node_body) __TBB_NOINLINE_SYM async_node(const node_set& nodes, size_t concurrency, Body body, node_priority_t a_priority) : async_node(nodes, concurrency, body, Policy(), a_priority) {} #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET diff --git a/test/tbb/test_try_put_and_wait.h b/test/tbb/test_try_put_and_wait.h index 2c51502b53..3e9c3d94f4 100644 --- a/test/tbb/test_try_put_and_wait.h +++ b/test/tbb/test_try_put_and_wait.h @@ -315,8 +315,8 @@ void test_multioutput_no_broadcast() { std::size_t num_items = 10; std::size_t num_additional_items = 10; - std::atomic num_processed_items = 0; - std::atomic num_processed_accumulators = 0; + std::atomic num_processed_items{0}; + std::atomic num_processed_accumulators{0}; int accumulator_message = 1; int add_message = 2;