Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Add tag-based implementation for try_put_and_wait for multifunction and async nodes #1604

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
51 changes: 46 additions & 5 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -168,23 +168,64 @@ class function_body_leaf< continue_msg, Output, B > : public function_body< cont
B body;
};

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
class metainfo_tag_type;
#endif

// TODO: add description
struct invoke_body_with_tag_helper {
using first_priority = int;
using second_priority = double;

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Body, typename... Args>
static auto invoke(first_priority, Body&& body, metainfo_tag_type&& tag, Args&&... args)
noexcept(noexcept(tbb::detail::invoke(std::forward<Body>(body), std::forward<Args>(args)..., std::move(tag))))
-> decltype(tbb::detail::invoke(std::forward<Body>(body), std::forward<Args>(args)..., std::move(tag)), void())
{
tbb::detail::invoke(std::forward<Body>(body), std::forward<Args>(args)..., std::move(tag));
}
#endif
template <typename Body, typename... Args>
static void invoke(second_priority, Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&&),
Args&&... args)
noexcept(noexcept(tbb::detail::invoke(std::forward<Body>(body), std::forward<Args>(args)...)))
{
tbb::detail::invoke(std::forward<Body>(body), std::forward<Args>(args)...);
}
};

// TODO: add comment
template <typename Body, typename... Args>
void invoke_body_with_tag(Body&& body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag), Args&&... args)
noexcept(noexcept(invoke_body_with_tag_helper::invoke(1, std::forward<Body>(body) __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)),
std::forward<Args>(args)...)))
{
invoke_body_with_tag_helper::invoke(/*overload priority helper*/1,
std::forward<Body>(body) __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)),
std::forward<Args>(args)...);
}


//! function_body that takes an Input and a set of output ports
template<typename Input, typename OutputSet>
class multifunction_body : no_assign {
public:
virtual ~multifunction_body () {}
virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0;
virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/
__TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& /*tag*/)) = 0;
virtual multifunction_body* clone() = 0;
virtual void* get_body_ptr() = 0;
};

//! leaf for multifunction. OutputSet can be a std::tuple or a vector.
template<typename Input, typename OutputSet, typename B >
template<typename Input, typename OutputSet, typename B>
class multifunction_body_leaf : public multifunction_body<Input, OutputSet> {
public:
multifunction_body_leaf(const B &_body) : body(_body) { }
void operator()(const Input &input, OutputSet &oset) override {
tbb::detail::invoke(body, input, oset); // body may explicitly put() to one or more of oset.
// body may explicitly put() to one or more of oset.
void operator()(const Input &input, OutputSet &oset __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag)) override {
invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), input, oset);
}
void* get_body_ptr() override { return &body; }
multifunction_body_leaf* clone() override {
Expand Down
7 changes: 4 additions & 3 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -419,12 +419,13 @@ class broadcast_cache : public successor_cache<T, M> {
#endif

// call try_put_task and return list of received tasks
bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
bool gather_successful_try_puts( const T &t, graph_task_list& tasks
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
bool is_at_least_one_put_successful = false;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task * new_task = (*i)->try_put_task(t);
graph_task * new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if(new_task) {
++i;
if(new_task != SUCCESSFULLY_ENQUEUED) {
Expand Down
104 changes: 95 additions & 9 deletions include/oneapi/tbb/detail/_flow_graph_node_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -554,6 +554,70 @@ struct init_output_ports {
}
}; // struct init_output_ports

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT

class metainfo_tag_type {
public:
metainfo_tag_type() = default;

metainfo_tag_type(const metainfo_tag_type&) = delete;

metainfo_tag_type(metainfo_tag_type&& other)
: my_metainfo(std::move(other.my_metainfo)) {}

metainfo_tag_type(const message_metainfo& metainfo) : my_metainfo(metainfo) {
for (auto waiter : my_metainfo.waiters()) {
waiter->reserve();
}
}

metainfo_tag_type& operator=(const metainfo_tag_type&) = delete;
metainfo_tag_type& operator=(metainfo_tag_type&& other) {
// TODO: should this method be thread-safe?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could someone want to use assignment in place of reset to start a next batch of accumulations -- starting with a different first tag. Then there could be a race between the assignment and merge..?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is in theory really interesting, which thread-safety guarantees should be provided.
It is obvious, that:

  • merge should be thread-safe with other merge
  • reset should be thread-safe with reset

merge can be thread-safe with reset (as well as with operator=) but it is hard to imagine the real use case. If we have both merge and reset simultaneously, which result can the user expect - merged or reset object.
Currently, the first function that acquires the mutex will define the behavior.
But let's consider the 2 consecutive reductions that use the single accumulators. If in the edge between 2 reductions, there would be simultaneous merge from reduction 2 and reset from the reduction 1, some elements from the reduction 2 can be missed if the merge will go before the reset.

For me, it is unclear how we can manage this. I think the answer for this question is the answer also to thread-safety of operator=

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other interesting question is should it be thread-safe to have the left.merge(middle) simultaneous with middle.merge(right). I feel that the answer is no, but still want to highlight this.

if (this != &other) {
reset();
my_metainfo = std::move(other.my_metainfo);
}
return *this;
}

~metainfo_tag_type() {
reset();
}

void merge(const metainfo_tag_type& other_tag) {
tbb::spin_mutex::scoped_lock lock(my_mutex);

// TODO: add comment
for (auto waiter : other_tag.my_metainfo.waiters()) {
waiter->reserve();
}
my_metainfo.merge(other_tag.my_metainfo);
}

void reset() {
tbb::spin_mutex::scoped_lock lock(my_mutex);

for (auto waiter : my_metainfo.waiters()) {
waiter->release();
}
my_metainfo = message_metainfo{};
}
private:
friend struct metainfo_tag_accessor;

message_metainfo my_metainfo;
tbb::spin_mutex my_mutex;
};

struct metainfo_tag_accessor {
static const message_metainfo& get_metainfo(const metainfo_tag_type& tag) {
return tag.my_metainfo;
}
};

#endif

//! Implements methods for a function node that takes a type Input as input
// and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename Policy, typename A>
Expand All @@ -562,6 +626,9 @@ class multifunction_input : public function_input_base<Input, Policy, A, multifu
static const int N = std::tuple_size<OutputPortSet>::value;
typedef Input input_type;
typedef OutputPortSet output_ports_type;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
typedef metainfo_tag_type tag_type;
#endif
typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
typedef function_input_base<Input, Policy, A, my_class> base_type;
Expand All @@ -570,7 +637,9 @@ class multifunction_input : public function_input_base<Input, Policy, A, multifu
// constructor
template<typename Body>
multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
: base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
: base_type(g, max_concurrency, a_priority,
noexcept(invoke_body_with_tag(body __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type{}),
input_type(), my_output_ports)))
, my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
, my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
, my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
Expand Down Expand Up @@ -599,10 +668,13 @@ class multifunction_input : public function_input_base<Input, Policy, A, multifu
// the task we were successful.
//TODO: consider moving common parts with implementation in function_input into separate function
graph_task* apply_body_impl_bypass( const input_type &i
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) )
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
metainfo_tag_type tag(metainfo);
#endif
fgt_begin_body( my_body );
(*my_body)(i, my_output_ports);
(*my_body)(i, my_output_ports __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)));
fgt_end_body( my_body );
graph_task* ttask = nullptr;
if(base_type::my_max_concurrency != 0) {
Expand Down Expand Up @@ -851,7 +923,25 @@ class multifunction_output : public function_output<Output> {
multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}

bool try_put(const output_type &i) {
graph_task *res = try_put_task(i);
return try_put_impl(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_put(const output_type& i, const metainfo_tag_type& tag) {
return try_put_impl(i, metainfo_tag_accessor::get_metainfo(tag));
}

bool try_put(const output_type& i, metainfo_tag_type&& tag) {
metainfo_tag_type local_tag = std::move(tag);
return try_put_impl(i, metainfo_tag_accessor::get_metainfo(local_tag));
}
#endif

using base_type::graph_reference;

protected:
bool try_put_impl(const output_type& i __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
graph_task *res = try_put_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if( !res ) return false;
if( res != SUCCESSFULLY_ENQUEUED ) {
// wrapping in task_arena::execute() is not needed since the method is called from
Expand All @@ -861,10 +951,6 @@ class multifunction_output : public function_output<Output> {
return true;
}

using base_type::graph_reference;

protected:

graph_task* try_put_task(const output_type &i) {
return my_successors.try_put_task(i);
}
Expand Down
27 changes: 21 additions & 6 deletions include/oneapi/tbb/flow_graph.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -3130,8 +3130,11 @@ class async_body: public async_body_base<Gateway> {
async_body(const Body &body, gateway_type *gateway)
: base_type(gateway), my_body(body) { }

void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
tbb::detail::invoke(my_body, v, *this->my_gateway);
void operator()( const Input &v, Ports & __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo_tag_type&& tag) )
noexcept(noexcept(invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)),
v, std::declval<gateway_type&>())))
{
invoke_body_with_tag(my_body __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(tag)), v, *this->my_gateway);
}

Body get_body() { return my_body; }
Expand Down Expand Up @@ -3176,9 +3179,20 @@ class async_node

//! Implements gateway_type::try_put for an external activity to submit a message to FG
bool try_put(const Output &i) override {
return my_node->try_put_impl(i);
return my_node->try_put_impl(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_put(const Output &i, const metainfo_tag_type& tag) override {
return my_node->try_put_impl(i, metainfo_tag_accessor::get_metainfo(tag));
}

bool try_put(const Output &i, metainfo_tag_type&& tag) override {
metainfo_tag_type local_tag = std::move(tag);
return my_node->try_put_impl(i, metainfo_tag_accessor::get_metainfo(local_tag));
}
#endif

private:
async_node* my_node;
} my_gateway;
Expand All @@ -3187,13 +3201,14 @@ class async_node
async_node* self() { return this; }

//! Implements gateway_type::try_put for an external activity to submit a message to FG
bool try_put_impl(const Output &i) {
bool try_put_impl(const Output &i __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
multifunction_output<Output> &port_0 = output_port<0>(*this);
broadcast_cache<output_type>& port_successors = port_0.successors();
fgt_async_try_put_begin(this, &port_0);
// TODO revamp: change to std::list<graph_task*>
graph_task_list tasks;
bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
bool is_at_least_one_put_successful =
port_successors.gather_successful_try_puts(i, tasks __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
__TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
"Return status is inconsistent with the method operation." );

Expand Down
9 changes: 8 additions & 1 deletion include/oneapi/tbb/flow_graph_abstractions.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,9 @@ class graph_proxy {
virtual ~graph_proxy() {}
};

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
class metainfo_tag_type;
#endif
template <typename Input>
class receiver_gateway : public graph_proxy {
public:
Expand All @@ -41,6 +44,10 @@ class receiver_gateway : public graph_proxy {

//! Submit signal from an asynchronous activity to FG.
virtual bool try_put(const input_type&) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
virtual bool try_put(const input_type&, const metainfo_tag_type&) = 0;
virtual bool try_put(const input_type&, metainfo_tag_type&&) = 0;
#endif
};

} // d2
Expand Down
13 changes: 10 additions & 3 deletions test/tbb/test_async_node.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
#include "common/spin_barrier.h"
#include "common/test_follows_and_precedes_api.h"
#include "common/concepts_common.h"
#include "test_try_put_and_wait.h"

#include <string>
#include <thread>
Expand All @@ -40,8 +41,6 @@

//! \file test_async_node.cpp
//! \brief Test for [flow_graph.async_node] specification


class minimal_type {
template<typename T>
friend struct place_wrapper;
Expand Down Expand Up @@ -878,3 +877,11 @@ TEST_CASE("constraints for async_node body") {
}

#endif // __TBB_CPP20_CONCEPTS_PRESENT

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! \brief \ref error_guessing
TEST_CASE("test async_node try_put_and_wait") {
using node_type = oneapi::tbb::flow::async_node<int, int, tbb::flow::queueing>;
test_try_put_and_wait::test_multioutput<node_type>();
}
#endif
4 changes: 2 additions & 2 deletions test/tbb/test_buffer_node.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"

#include "test_buffering_try_put_and_wait.h"
#include "test_try_put_and_wait.h"

//! \file test_buffer_node.cpp
//! \brief Test for [flow_graph.buffer_node] specification
Expand Down
Loading
Loading