Skip to content

Commit

Permalink
add example with bthread for anyflow
Browse files Browse the repository at this point in the history
  • Loading branch information
oathdruid committed May 30, 2024
1 parent ad9afd8 commit 517e2ee
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 0 deletions.
20 changes: 20 additions & 0 deletions example/use-with-bthread/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ cc_library(
],
)

cc_library(
name = 'bthread_graph_executor',
srcs = ['bthread_graph_executor.cpp'],
hdrs = ['bthread_graph_executor.h'],
deps = [
':butex_interface',
'@babylon//:anyflow',
'@brpc//:bthread',
],
)

cc_binary(
name = 'example',
srcs = ['example.cpp'],
Expand All @@ -27,3 +38,12 @@ cc_binary(
'@babylon//:logging',
],
)

cc_binary(
name = 'anyflow_multi_nodes',
srcs = ['anyflow_multi_nodes.cpp'],
deps = [
':bthread_graph_executor',
'@babylon//:anyflow',
],
)
125 changes: 125 additions & 0 deletions example/use-with-bthread/anyflow_multi_nodes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include "babylon/anyflow/builder.h"

#include "bthread_graph_executor.h"

#include <iostream>

using ::babylon::anyflow::GraphBuilder;
using ::babylon::anyflow::GraphProcessor;

struct AddProcessor : public GraphProcessor {
virtual int process() noexcept override {
*c.emit() = *a + *b;
return 0;
}

ANYFLOW_INTERFACE(ANYFLOW_DEPEND_DATA(int32_t, a, 0)
ANYFLOW_DEPEND_DATA(int32_t, b, 1)
ANYFLOW_EMIT_DATA(int32_t, c))
};

struct SubtractProcessor : public GraphProcessor {
virtual int process() noexcept override {
*c.emit() = *a - *b;
return 0;
}

ANYFLOW_INTERFACE(ANYFLOW_DEPEND_DATA(int32_t, a, 0)
ANYFLOW_DEPEND_DATA(int32_t, b, 1)
ANYFLOW_EMIT_DATA(int32_t, c))
};

struct MultiplyProcessor : public GraphProcessor {
virtual int process() noexcept override {
*c.emit() = (*a) * (*b);
return 0;
}

ANYFLOW_INTERFACE(ANYFLOW_DEPEND_DATA(int32_t, a, 0)
ANYFLOW_DEPEND_DATA(int32_t, b, 1)
ANYFLOW_EMIT_DATA(int32_t, c))
};

int main() {
// let A = 10, B = 5
// try to prove that (A + B) * (A - B) = A * A - B * B
int input_a = 10;
int input_b = 5;
int res_left = 0;
int res_right = 0;
{
GraphBuilder builder;

auto& v1 = builder.add_vertex([] {
return ::std::unique_ptr<AddProcessor>(new AddProcessor);
});
v1.named_depend("a").to("A");
v1.named_depend("b").to("B");
v1.named_emit("c").to("AddRes");

auto& v2 = builder.add_vertex([] {
return ::std::unique_ptr<SubtractProcessor>(new SubtractProcessor);
});
v2.named_depend("a").to("A");
v2.named_depend("b").to("B");
v2.named_emit("c").to("SubtractRes");

auto& v3 = builder.add_vertex([] {
return ::std::unique_ptr<MultiplyProcessor>(new MultiplyProcessor);
});
v3.named_depend("a").to("AddRes");
v3.named_depend("b").to("SubtractRes");
v3.named_emit("c").to("FinalRes");

builder.set_executor(::babylon::anyflow::BthreadGraphExecutor::instance());
builder.finish();
auto graph = builder.build();
auto a = graph->find_data("A");
auto b = graph->find_data("B");
auto final_res = graph->find_data("FinalRes");

*(a->emit<int>()) = input_a;
*(b->emit<int>()) = input_b;
graph->run(final_res);
res_left = *final_res->value<int>();
}
{
GraphBuilder builder;

auto& v1 = builder.add_vertex([] {
return ::std::unique_ptr<MultiplyProcessor>(new MultiplyProcessor);
});
v1.named_depend("a").to("A");
v1.named_depend("b").to("A");
v1.named_emit("c").to("MultiplyResForA");

auto& v2 = builder.add_vertex([] {
return ::std::unique_ptr<MultiplyProcessor>(new MultiplyProcessor);
});
v2.named_depend("a").to("B");
v2.named_depend("b").to("B");
v2.named_emit("c").to("MultiplyResForB");

auto& v3 = builder.add_vertex([] {
return ::std::unique_ptr<SubtractProcessor>(new SubtractProcessor);
});
v3.named_depend("a").to("MultiplyResForA");
v3.named_depend("b").to("MultiplyResForB");
v3.named_emit("c").to("FinalRes");

builder.set_executor(::babylon::anyflow::BthreadGraphExecutor::instance());
builder.finish();
auto graph = builder.build();
auto a = graph->find_data("A");
auto b = graph->find_data("B");
auto final_res = graph->find_data("FinalRes");

*(a->emit<int>()) = input_a;
*(b->emit<int>()) = input_b;
graph->run(final_res);
res_right = *final_res->value<int>();
}
::std::cout << "(A + B) * (A - B) = " << res_left << '\n';
::std::cout << "A * A - B * B = " << res_right << '\n';
return 0;
}
67 changes: 67 additions & 0 deletions example/use-with-bthread/bthread_graph_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include "bthread_graph_executor.h"
#include "butex_interface.h"

#include "babylon/logging/interface.h"

#include <tuple>

BABYLON_NAMESPACE_BEGIN
namespace anyflow {

static void* execute_invoke_vertex(void* args) {
auto param = reinterpret_cast<::std::tuple<GraphVertex*, GraphVertexClosure>*>(args);
auto vertex = ::std::get<0>(*param);
auto& closure = ::std::get<1>(*param);
vertex->run(::std::move(closure));
delete param;
return NULL;
}

static void* execute_invoke_closure(void* args) {
auto param =
reinterpret_cast<::std::tuple<ClosureContext*, Closure::Callback*>*>(args);
auto closure = ::std::get<0>(*param);
auto callback = ::std::get<1>(*param);
closure->run(callback);
delete param;
return NULL;
}

BthreadGraphExecutor& BthreadGraphExecutor::instance() {
static BthreadGraphExecutor executor;
return executor;
}

Closure BthreadGraphExecutor::create_closure() noexcept {
return Closure::create<ButexInterface>(*this);
}

int BthreadGraphExecutor::run(GraphVertex* vertex,
GraphVertexClosure&& closure) noexcept {
bthread_t th;
auto param = new ::std::tuple<GraphVertex*, GraphVertexClosure>(
vertex, ::std::move(closure));
if (0 != bthread_start_background(&th, NULL, execute_invoke_vertex, param)) {
LOG(WARNING) << "start bthread to run vertex failed";
closure = ::std::move(::std::get<1>(*param));
delete param;
return -1;
}
return 0;
}

int BthreadGraphExecutor::run(ClosureContext* closure,
Closure::Callback* callback) noexcept {
bthread_t th;
auto param =
new ::std::tuple<ClosureContext*, Closure::Callback*>(closure, callback);
if (0 != bthread_start_background(&th, NULL, execute_invoke_closure, param)) {
LOG(WARNING) << "start bthread to run closure failed";
delete param;
return -1;
}
return 0;
}

} // namespace anyflow
BABYLON_NAMESPACE_END
21 changes: 21 additions & 0 deletions example/use-with-bthread/bthread_graph_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "babylon/anyflow/closure.h"
#include "babylon/anyflow/executor.h"
#include "babylon/anyflow/vertex.h"

#include "bthread/bthread.h"

BABYLON_NAMESPACE_BEGIN
namespace anyflow {

class BthreadGraphExecutor : public GraphExecutor {
public:
static BthreadGraphExecutor& instance();
virtual Closure create_closure() noexcept override;
virtual int run(GraphVertex* vertex,
GraphVertexClosure&& closure) noexcept override;
virtual int run(ClosureContext* closure,
Closure::Callback* callback) noexcept override;
};

} // namespace anyflow
BABYLON_NAMESPACE_END
1 change: 1 addition & 0 deletions example/use-with-bthread/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
set -ex

bazel run example
bazel run anyflow_multi_nodes

0 comments on commit 517e2ee

Please sign in to comment.