diff --git a/cpp/src/arrow/acero/pipe_node.cc b/cpp/src/arrow/acero/pipe_node.cc index c7fe6dc677851..c59c8cdbc3419 100644 --- a/cpp/src/arrow/acero/pipe_node.cc +++ b/cpp/src/arrow/acero/pipe_node.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/acero/pipe_node.h" #include #include "arrow/acero/concurrent_queue_internal.h" #include "arrow/acero/exec_plan.h" @@ -38,49 +39,12 @@ using internal::checked_cast; using compute::FilterOptions; namespace acero { -namespace { - -class BackpressureController { - public: - BackpressureController(ExecNode* node) : node_(node) {} - - void Pause(ExecNode* output, int counter) { - std::lock_guard lg(mutex_); - if (!paused_[output]) { - paused_[output] = true; - if (0 == paused_count_++) { - node_->PauseProducing(output, ++backpressure_counter_); - } - } - } - void Resume(ExecNode* output, int counter) { - std::lock_guard lg(mutex_); - if (paused_[output]) { - paused_[output] = false; - if (0 == --paused_count_) { - node_->ResumeProducing(output, ++backpressure_counter_); - } - } - } - // this can be factored out if ForceShutdown of BackpressureConcurrentQueue is extracted - // to some kind of specialization - ExecNode* getInput() { return node_; } - - private: - std::unordered_map paused_; - ExecNode* node_; - std::mutex mutex_; - std::atomic paused_count_; - std::atomic backpressure_counter_; -}; +namespace { -struct PipeSourceNode : public ExecNode, public TracedNode { +struct PipeSourceNode : public PipeSource, public ExecNode { PipeSourceNode(ExecPlan* plan, std::shared_ptr schema, std::string pipe_name) - : ExecNode(plan, {}, {}, std::move(schema)), - TracedNode(this), - // ordering_(Ordering::Implicit()), - pipe_name_(pipe_name) {} + : ExecNode(plan, {}, {}, std::move(schema)), pipe_name_(std::move(pipe_name)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -96,120 +60,143 @@ struct PipeSourceNode : public ExecNode,
public TracedNode { [[noreturn]] Status InputReceived(ExecNode*, ExecBatch) override { NoInputs(); } [[noreturn]] Status InputFinished(ExecNode*, int) override { NoInputs(); } - // const Ordering& ordering() const override { return ordering_; } - - void Initialize(std::shared_ptr ctrl) { bp_ctrl_ = ctrl; } - - Status ForwardInputReceived(ExecBatch batch) { + Status HandleInputReceived(ExecBatch batch) override { return output_->InputReceived(this, std::move(batch)); } - Status ForwardInputFinished(int total_batches) { + Status HandleInputFinished(int total_batches) override { return output_->InputFinished(this, total_batches); } Status StartProducing() override { return Status::OK(); } void PauseProducing(ExecNode* output, int32_t counter) override { - bp_ctrl_->Pause(this, counter); + PipeSource::Pause(counter); } void ResumeProducing(ExecNode* output, int32_t counter) override { - bp_ctrl_->Resume(this, counter); + PipeSource::Resume(counter); } Status StopProducingImpl() override { return Status::OK(); } static const char kKindName[]; const char* kind_name() const override { return kKindName; } - std::string pipe_name_; - private: - std::mutex mutex_; - std::atomic backpressure_counter_{0}; - std::shared_ptr bp_ctrl_; + const std::string pipe_name_; }; + const char PipeSourceNode::kKindName[] = "PipeSourceNode"; -class PipeTeeNode : public ExecNode { +class PipeSinkBackpressureControl : public BackpressureControl { + public: + PipeSinkBackpressureControl(ExecNode* node, ExecNode* output) + : node_(node), output_(output) {} + + void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } + + private: + ExecNode* node_; + ExecNode* output_; + std::atomic<int32_t> backpressure_counter_{0}; +}; + +class PipeSinkNode : public ExecNode { public: - PipeTeeNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr schema, - std::shared_ptr ctrl, std::string pipe_name) - : 
ExecNode(plan, std::move(inputs), /*input_labels=*/{pipe_name}, schema), - ctrl_(ctrl), - pipe_name_(std::move(pipe_name)) {} + PipeSinkNode(ExecPlan* plan, std::vector inputs, std::string pipe_name) + : ExecNode(plan, inputs, /*input_labels=*/{pipe_name}, {}) { + pipe_ = std::make_shared( + plan, std::move(pipe_name), + std::make_unique(inputs[0], this)); + } static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "PipeTeeNode")); + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "PipeSinkNode")); const auto& pipe_tee_options = checked_cast(options); - auto schema = inputs[0]->output_schema(); - auto ctrl = std::make_shared(inputs[0]); - return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(ctrl), pipe_tee_options.pipe_name); + return plan->EmplaceNode(plan, std::move(inputs), + pipe_tee_options.pipe_name); } static const char kKindName[]; const char* kind_name() const override { return kKindName; } Status InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - - for (auto& output_node : output_nodes_) { - plan_->query_context()->ScheduleTask( - [output_node, batch]() mutable { - return output_node->ForwardInputReceived(batch); - }, - "PipeTeeNode::ForwardInputReceived"); - } - return output_->InputReceived(this, batch); + return pipe_->InputReceived(std::move(batch)); } Status InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); + return pipe_->InputFinished(total_batches); + } - for (auto& output_node : output_nodes_) { - plan_->query_context()->ScheduleTask( - [output_node, total_batches]() { - return output_node->ForwardInputFinished(total_batches); - }, - "PipeTeeNode::ForwardInputFinished"); - } - return output_->InputFinished(this, total_batches); + Status Init() { return pipe_->Init(inputs_[0]->output_schema()); } + + Status StartProducing() override { return Status::OK(); } 
+ + // sink nodes have no outputs from which to feel backpressure + [[noreturn]] static void NoOutputs() { + Unreachable("no outputs; this should never be called"); + } + [[noreturn]] void ResumeProducing(ExecNode* output, int32_t counter) override { + NoOutputs(); + } + [[noreturn]] void PauseProducing(ExecNode* output, int32_t counter) override { + NoOutputs(); } - Status Init() { - for (auto node : plan()->nodes()) { - if (node->kind_name() == PipeSourceNode::kKindName) { - if (!inputs_[0]->output_schema()->Equals(node->output_schema())) { - return Status::Invalid("Pipe sechma does not match for " + pipe_name_); - } - PipeSourceNode* pipe_source = checked_cast(node); - if (pipe_source->pipe_name_ == pipe_name_) { - pipe_source->Initialize(ctrl_); - output_nodes_.push_back(pipe_source); - // std::cout << std::string(node->kind_name()) + ":! " + node->label() - // << std::endl; - } - // else { - // std::cout << std::string(node->kind_name()) + ":+ " + node->label() - // << std::endl; - // } - } - // else { - // std::cout << std::string(node->kind_name()) + ":- " + node->label() << - // std::endl; - // } - } - return Status::OK(); + protected: + Status StopProducingImpl() override { return Status::OK(); } + + std::string ToStringExtra(int indent = 0) const override { + // std::string ret="pipe_tee("; + // bool first=true; + // for(auto &stream:streams_){ + // if(!first) + // ret+=","; + // ret+=stream->declaration_.label; + // first=false; + // } + // ret+=")"; + return "pipe_tee"; } - Status StartProducing() override { return Status::OK(); } + protected: + std::shared_ptr pipe_; +}; + +const char PipeSinkNode::kKindName[] = "PipeSinkNode"; + +class PipeTeeNode : public PipeSource, public PipeSinkNode { + public: + PipeTeeNode(ExecPlan* plan, std::vector inputs, std::string pipe_name) + : PipeSinkNode(plan, inputs, pipe_name) { + output_schema_ = inputs[0]->output_schema(); + PipeSinkNode::pipe_->addSource(this); + } + + static Result Make(ExecPlan* plan, 
std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "PipeTeeNode")); + const auto& pipe_tee_options = checked_cast(options); + return plan->EmplaceNode(plan, std::move(inputs), + pipe_tee_options.pipe_name); + } + static const char kKindName[]; + const char* kind_name() const override { return kKindName; } void PauseProducing(ExecNode* output, int32_t counter) override { - ctrl_->Pause(this, counter); + PipeSource::Pause(counter); } void ResumeProducing(ExecNode* output, int32_t counter) override { - ctrl_->Resume(this, counter); + PipeSource::Resume(counter); + } + + Status HandleInputReceived(ExecBatch batch) override { + return output_->InputReceived(this, std::move(batch)); + } + Status HandleInputFinished(int total_batches) override { + return output_->InputFinished(this, total_batches); } protected: @@ -227,21 +214,111 @@ class PipeTeeNode : public ExecNode { // ret+=")"; return "pipe_tee"; } - - private: - private: - std::shared_ptr ctrl_; - std::string pipe_name_; - std::vector output_nodes_; }; const char PipeTeeNode::kKindName[] = "PipeTeeNode"; } // namespace +PipeSource::PipeSource() : pipe_(nullptr) {} +void PipeSource::Initialize(Pipe* pipe) { pipe_ = pipe; } + +void PipeSource::Pause(int32_t counter) { pipe_->Pause(this, counter); } +void PipeSource::Resume(int32_t counter) { pipe_->Resume(this, counter); } + +Pipe::Pipe(ExecPlan* plan, std::string pipe_name, + std::unique_ptr ctrl) + : plan_(plan), pipe_name_(pipe_name), paused_count_(0), ctrl_(std::move(ctrl)) {} + +// Called from pipe_source nodes +void Pipe::Pause(PipeSource* output, int counter) { + std::lock_guard lg(mutex_); + if (!paused_[output]) { + paused_[output] = true; + if (0 == paused_count_++) { + ctrl_->Pause(); + } + } +} + +// Called from pipe_source nodes +void Pipe::Resume(PipeSource* output, int counter) { + std::lock_guard lg(mutex_); + if (paused_[output]) { + paused_[output] = false; + if (0 == --paused_count_) { + 
ctrl_->Resume(); + } + } +} + +// 
Called from pipe_sink +Status Pipe::InputReceived(ExecBatch batch) { + for (auto& source_node : source_nodes_) { + plan_->query_context()->ScheduleTask( + [source_node, batch]() mutable { + return source_node->HandleInputReceived(std::move(batch)); + }, + "Pipe::InputReceived"); + } + if (last_source_node_) return last_source_node_->HandleInputReceived(std::move(batch)); + // No consumers registered; + return Status::OK(); +} +// Called from pipe_sink +Status Pipe::InputFinished(int total_batches) { + for (auto& source_node : source_nodes_) { + plan_->query_context()->ScheduleTask( + [source_node, total_batches]() { + return source_node->HandleInputFinished(total_batches); + }, + "Pipe::HandleInputFinished"); + } + if (last_source_node_) return last_source_node_->HandleInputFinished(total_batches); + // No consumers registered; + return Status::OK(); +} + +void Pipe::addSource(PipeSource* source) { + source->Initialize(this); + // First added source is handled in receiving task. All additional sources are delivered + // in their own submit tasks + if (!last_source_node_) + last_source_node_ = source; + else { + source_nodes_.push_back(source); + } +} + +Status Pipe::Init(const std::shared_ptr schema) { + for (auto node : plan_->nodes()) { + if (node->kind_name() == PipeSourceNode::kKindName) { + PipeSourceNode* pipe_source = checked_cast(node); + if (pipe_source->pipe_name_ == pipe_name_) { + if (!schema->Equals(node->output_schema())) { + return Status::Invalid("Pipe schema does not match for " + pipe_name_); + } + addSource(pipe_source); + // std::cout << std::string(node->kind_name()) + ":! 
" + node->label() + // << std::endl; + } + // Sources that belong to a different pipe are skipped without a + // schema comparison: only the consumers of this pipe have to + // match the schema passed to Init(), so an unrelated pipe_source + // with another schema no longer makes this pipe fail. + } + // else { + // std::cout << std::string(node->kind_name()) + ":- " + node->label() << + // std::endl; + // } + } + return Status::OK(); +} + namespace internal { void RegisterPipeNodes(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("pipe_source", PipeSourceNode::Make)); + DCHECK_OK(registry->AddFactory("pipe_sink", PipeSinkNode::Make)); DCHECK_OK(registry->AddFactory("pipe_tee", PipeTeeNode::Make)); } diff --git a/cpp/src/arrow/acero/pipe_node.h b/cpp/src/arrow/acero/pipe_node.h new file mode 100644 index 0000000000000..ccfb2aa4c1c28 --- /dev/null +++ b/cpp/src/arrow/acero/pipe_node.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once +#include +#include +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/visibility.h" + +namespace arrow { + +using internal::checked_cast; + +using compute::FilterOptions; + +namespace acero { +class Pipe; +class PipeSource { + public: + PipeSource(); + virtual ~PipeSource() = default; + void Pause(int32_t counter); + void Resume(int32_t counter); + + private: + friend class Pipe; + void Initialize(Pipe* pipe); + + virtual Status HandleInputReceived(ExecBatch batch) = 0; + virtual Status HandleInputFinished(int total_batches) = 0; + + Pipe* pipe_{nullptr}; +}; + +class ARROW_ACERO_EXPORT Pipe { + public: + Pipe(ExecPlan* plan, std::string pipe_name, std::unique_ptr ctrl); + + // Called from pipe_source nodes + void Pause(PipeSource* output, int counter); + + // Called from pipe_source nodes + void Resume(PipeSource* output, int counter); + + // Called from pipe_sink + Status InputReceived(ExecBatch batch); + // Called from pipe_sink + Status InputFinished(int total_batches); + + void addSource(PipeSource* source); + + // Called from pipe_sink Init + Status Init(const std::shared_ptr schema); + + private: + // pipe: the first registered source is kept apart from the additional ones + ExecPlan* plan_; + std::string pipe_name_; + std::vector source_nodes_; + PipeSource* last_source_node_{nullptr}; + // backpressure: Pause()/Resume() take mutex_ before touching this state + std::unordered_map paused_; + std::mutex mutex_; + std::atomic<int32_t> paused_count_{0}; + std::unique_ptr ctrl_; +}; + +} // namespace acero +} // namespace arrow diff --git a/cpp/src/arrow/acero/pipe_node_test.cc b/cpp/src/arrow/acero/pipe_node_test.cc index c7886e9dd6a04..604df84bcfecf 100644 --- a/cpp/src/arrow/acero/pipe_node_test.cc +++ b/cpp/src/arrow/acero/pipe_node_test.cc @@ -224,7 +224,7 @@ TEST(ExecPlanExecution, PipeBackpressure) { { {"pipe_source", PipeSourceNodeOptions{"named_pipe_1", schema_}}, //{"filter", 
FilterNodeOptions{equal(field_ref("data"), - //literal("6"))}}, + // literal("6"))}}, {"filter", FilterNodeOptions{equal(field_ref("data"), literal(6))}}, {"sink", 
SinkNodeOptions{&dup1Sink.sink_gen, /*schema=*/nullptr,