Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-45434: [C++][Acero] Add pipe_sink, pipe_source and pipe_tee nodes #45435

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ set(ARROW_ACERO_SRCS
time_series_util.cc
tpch_node.cc
union_node.cc
pipe_node.cc
util.cc)

append_runtime_avx2_src(ARROW_ACERO_SRCS bloom_filter_avx2.cc)
Expand Down Expand Up @@ -167,6 +168,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc)

add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc)
add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc)
add_arrow_acero_test(pipe_node_test SOURCES pipe_node_test.cc)

add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/acero/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ void RegisterSinkNode(ExecFactoryRegistry*);
void RegisterHashJoinNode(ExecFactoryRegistry*);
void RegisterAsofJoinNode(ExecFactoryRegistry*);
void RegisterSortedMergeNode(ExecFactoryRegistry*);
void RegisterPipeNodes(ExecFactoryRegistry* registry);

} // namespace internal

Expand All @@ -1134,6 +1135,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
internal::RegisterHashJoinNode(this);
internal::RegisterAsofJoinNode(this);
internal::RegisterSortedMergeNode(this);
internal::RegisterPipeNodes(this);
}

Result<Factory> GetFactory(const std::string& factory_name) override {
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/type_fwd.h"
#include "arrow/acero/visibility.h"
#include "arrow/compute/api_aggregate.h"
Expand Down Expand Up @@ -865,6 +866,30 @@ class ARROW_ACERO_EXPORT PivotLongerNodeOptions : public ExecNodeOptions {
std::vector<std::string> measurement_field_names;
};

/// \brief a node which implements experimental node that enables multiple sink exec nodes
///
/// Note, this API is experimental and will change in the future
///
/// This node forwards each exec batch to its output and also provides number of
/// additional source nodes for additional acero pipelines.
class ARROW_ACERO_EXPORT PipeSourceNodeOptions : public ExecNodeOptions {
public:
PipeSourceNodeOptions(std::string pipe_name, std::shared_ptr<Schema> output_schema)
: pipe_name(std::move(pipe_name)), output_schema(std::move(output_schema)) {}

/// \brief List of declarations that will receive duplicated ExecBatches
std::string pipe_name;
std::shared_ptr<Schema> output_schema;
};

class ARROW_ACERO_EXPORT PipeSinkNodeOptions : public ExecNodeOptions {
public:
PipeSinkNodeOptions(std::string pipe_name) : pipe_name(std::move(pipe_name)) {}

/// \brief List of declarations that will receive duplicated ExecBatches
std::string pipe_name;
};

/// @}

} // namespace acero
Expand Down
Loading