Skip to content

Commit

Permalink
Recaftor the code and add pipe_sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafał Hibner committed Feb 7, 2025
1 parent d818bff commit 8d183c8
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 116 deletions.
307 changes: 192 additions & 115 deletions cpp/src/arrow/acero/pipe_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/acero/pipe_node.h"
#include <iostream>
#include "arrow/acero/concurrent_queue_internal.h"
#include "arrow/acero/exec_plan.h"
Expand All @@ -38,49 +39,12 @@ using internal::checked_cast;
using compute::FilterOptions;

namespace acero {
namespace {

class BackpressureController {
public:
BackpressureController(ExecNode* node) : node_(node) {}

void Pause(ExecNode* output, int counter) {
std::lock_guard<std::mutex> lg(mutex_);
if (!paused_[output]) {
paused_[output] = true;
if (0 == paused_count_++) {
node_->PauseProducing(output, ++backpressure_counter_);
}
}
}
void Resume(ExecNode* output, int counter) {
std::lock_guard<std::mutex> lg(mutex_);
if (paused_[output]) {
paused_[output] = false;
if (0 == --paused_count_) {
node_->ResumeProducing(output, ++backpressure_counter_);
}
}
}

// this can be factored out if ForceShutdown of BackpressureConcurrentQueue is extracted
// to some kind of specialization
ExecNode* getInput() { return node_; }

private:
std::unordered_map<ExecNode*, bool> paused_;
ExecNode* node_;
std::mutex mutex_;
std::atomic<int32_t> paused_count_;
std::atomic<int32_t> backpressure_counter_;
};
namespace {

struct PipeSourceNode : public ExecNode, public TracedNode {
struct PipeSourceNode : public PipeSource, public ExecNode {
PipeSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema, std::string pipe_name)
: ExecNode(plan, {}, {}, std::move(schema)),
TracedNode(this),
// ordering_(Ordering::Implicit()),
pipe_name_(pipe_name) {}
: ExecNode(plan, {}, {}, std::move(schema)), pipe_name_(pipe_name) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand All @@ -96,120 +60,143 @@ struct PipeSourceNode : public ExecNode, public TracedNode {
[[noreturn]] Status InputReceived(ExecNode*, ExecBatch) override { NoInputs(); }
[[noreturn]] Status InputFinished(ExecNode*, int) override { NoInputs(); }

// const Ordering& ordering() const override { return ordering_; }

void Initialize(std::shared_ptr<BackpressureController> ctrl) { bp_ctrl_ = ctrl; }

Status ForwardInputReceived(ExecBatch batch) {
Status HandleInputReceived(ExecBatch batch) override {
return output_->InputReceived(this, std::move(batch));
}
Status ForwardInputFinished(int total_batches) {
Status HandleInputFinished(int total_batches) override {
return output_->InputFinished(this, total_batches);
}

Status StartProducing() override { return Status::OK(); }

void PauseProducing(ExecNode* output, int32_t counter) override {
bp_ctrl_->Pause(this, counter);
PipeSource::Pause(counter);
}
void ResumeProducing(ExecNode* output, int32_t counter) override {
bp_ctrl_->Resume(this, counter);
PipeSource::Resume(counter);
}

Status StopProducingImpl() override { return Status::OK(); }

static const char kKindName[];
const char* kind_name() const override { return kKindName; }
std::string pipe_name_;

private:
std::mutex mutex_;
std::atomic<int32_t> backpressure_counter_{0};
std::shared_ptr<BackpressureController> bp_ctrl_;
const std::string pipe_name_;
};

const char PipeSourceNode::kKindName[] = "PipeSourceNode";

class PipeTeeNode : public ExecNode {
class PipeSinkBackpressureControl : public BackpressureControl {
public:
PipeSinkBackpressureControl(ExecNode* node, ExecNode* output)
: node_(node), output_(output) {}

void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }

private:
ExecNode* node_;
ExecNode* output_;
std::atomic<int32_t> backpressure_counter_;
};

class PipeSinkNode : public ExecNode {
public:
PipeTeeNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> schema,
std::shared_ptr<BackpressureController> ctrl, std::string pipe_name)
: ExecNode(plan, std::move(inputs), /*input_labels=*/{pipe_name}, schema),
ctrl_(ctrl),
pipe_name_(std::move(pipe_name)) {}
PipeSinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name)
: ExecNode(plan, inputs, /*input_labels=*/{pipe_name}, {}) {
pipe_ = std::make_shared<Pipe>(
plan, std::move(pipe_name),
std::make_unique<PipeSinkBackpressureControl>(inputs[0], this));
}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "PipeTeeNode"));
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "PipeSinkNode"));
const auto& pipe_tee_options = checked_cast<const PipeSinkNodeOptions&>(options);
auto schema = inputs[0]->output_schema();
auto ctrl = std::make_shared<BackpressureController>(inputs[0]);
return plan->EmplaceNode<PipeTeeNode>(plan, std::move(inputs), std::move(schema),
std::move(ctrl), pipe_tee_options.pipe_name);
return plan->EmplaceNode<PipeSinkNode>(plan, std::move(inputs),
pipe_tee_options.pipe_name);
}
static const char kKindName[];
const char* kind_name() const override { return kKindName; }

Status InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);

for (auto& output_node : output_nodes_) {
plan_->query_context()->ScheduleTask(
[output_node, batch]() mutable {
return output_node->ForwardInputReceived(batch);
},
"PipeTeeNode::ForwardInputReceived");
}
return output_->InputReceived(this, batch);
return pipe_->InputReceived(batch);
}

Status InputFinished(ExecNode* input, int total_batches) override {
DCHECK_EQ(input, inputs_[0]);
return pipe_->InputFinished(total_batches);
}

for (auto& output_node : output_nodes_) {
plan_->query_context()->ScheduleTask(
[output_node, total_batches]() {
return output_node->ForwardInputFinished(total_batches);
},
"PipeTeeNode::ForwardInputFinished");
}
return output_->InputFinished(this, total_batches);
Status Init() { return pipe_->Init(inputs_[0]->output_schema()); }

Status StartProducing() override { return Status::OK(); }

// sink nodes have no outputs from which to feel backpressure
[[noreturn]] static void NoOutputs() {
Unreachable("no outputs; this should never be called");
}
[[noreturn]] void ResumeProducing(ExecNode* output, int32_t counter) override {
NoOutputs();
}
[[noreturn]] void PauseProducing(ExecNode* output, int32_t counter) override {
NoOutputs();
}

Status Init() {
for (auto node : plan()->nodes()) {
if (node->kind_name() == PipeSourceNode::kKindName) {
if (!inputs_[0]->output_schema()->Equals(node->output_schema())) {
return Status::Invalid("Pipe sechma does not match for " + pipe_name_);
}
PipeSourceNode* pipe_source = checked_cast<PipeSourceNode*>(node);
if (pipe_source->pipe_name_ == pipe_name_) {
pipe_source->Initialize(ctrl_);
output_nodes_.push_back(pipe_source);
// std::cout << std::string(node->kind_name()) + ":! " + node->label()
// << std::endl;
}
// else {
// std::cout << std::string(node->kind_name()) + ":+ " + node->label()
// << std::endl;
// }
}
// else {
// std::cout << std::string(node->kind_name()) + ":- " + node->label() <<
// std::endl;
// }
}
return Status::OK();
protected:
Status StopProducingImpl() override { return Status::OK(); }

std::string ToStringExtra(int indent = 0) const override {
// std::string ret="pipe_tee(";
// bool first=true;
// for(auto &stream:streams_){
// if(!first)
// ret+=",";
// ret+=stream->declaration_.label;
// first=false;
// }
// ret+=")";
return "pipe_tee";
}

Status StartProducing() override { return Status::OK(); }
protected:
std::shared_ptr<Pipe> pipe_;
};

const char PipeSinkNode::kKindName[] = "PipeSinkNode";

class PipeTeeNode : public PipeSource, public PipeSinkNode {
public:
PipeTeeNode(ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name)
: PipeSinkNode(plan, inputs, pipe_name) {
output_schema_ = inputs[0]->output_schema();
PipeSinkNode::pipe_->addSource(this);
}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "PipeTeeNode"));
const auto& pipe_tee_options = checked_cast<const PipeSinkNodeOptions&>(options);
return plan->EmplaceNode<PipeTeeNode>(plan, std::move(inputs),
pipe_tee_options.pipe_name);
}
static const char kKindName[];
const char* kind_name() const override { return kKindName; }

void PauseProducing(ExecNode* output, int32_t counter) override {
ctrl_->Pause(this, counter);
PipeSource::Pause(counter);
}

void ResumeProducing(ExecNode* output, int32_t counter) override {
ctrl_->Resume(this, counter);
PipeSource::Resume(counter);
}

Status HandleInputReceived(ExecBatch batch) override {
return output_->InputReceived(this, std::move(batch));
}
Status HandleInputFinished(int total_batches) override {
return output_->InputFinished(this, total_batches);
}

protected:
Expand All @@ -227,21 +214,111 @@ class PipeTeeNode : public ExecNode {
// ret+=")";
return "pipe_tee";
}

private:
private:
std::shared_ptr<BackpressureController> ctrl_;
std::string pipe_name_;
std::vector<PipeSourceNode*> output_nodes_;
};

const char PipeTeeNode::kKindName[] = "PipeTeeNode";

} // namespace

PipeSource::PipeSource() {}
void PipeSource::Initialize(Pipe* pipe) { pipe_ = pipe; }

void PipeSource::Pause(int32_t counter) { pipe_->Pause(this, counter); }
void PipeSource::Resume(int32_t counter) { pipe_->Resume(this, counter); }

Pipe::Pipe(ExecPlan* plan, std::string pipe_name,
std::unique_ptr<BackpressureControl> ctrl)
: plan_(plan), pipe_name_(pipe_name), ctrl_(std::move(ctrl)) {}

// Called from pipe_source nodes
void Pipe::Pause(PipeSource* output, int counter) {
std::lock_guard<std::mutex> lg(mutex_);
if (!paused_[output]) {
paused_[output] = true;
if (0 == paused_count_++) {
ctrl_->Pause();
}
}
}

// Called from pipe_source nodes
void Pipe::Resume(PipeSource* output, int counter) {
std::lock_guard<std::mutex> lg(mutex_);
if (paused_[output]) {
paused_[output] = false;
if (0 == --paused_count_) {
ctrl_->Resume();
}
}
}

// Called from pipe_sink
Status Pipe::InputReceived(ExecBatch batch) {
for (auto& source_node : source_nodes_) {
plan_->query_context()->ScheduleTask(
[source_node, batch]() mutable {
return source_node->HandleInputReceived(batch);
},
"Pipe::InputReceived");
}
if (last_source_node_) return last_source_node_->HandleInputReceived(batch);
// No consumers registered;
return Status::OK();
}
// Called from pipe_sink
Status Pipe::InputFinished(int total_batches) {
for (auto& source_node : source_nodes_) {
plan_->query_context()->ScheduleTask(
[source_node, total_batches]() {
return source_node->HandleInputFinished(total_batches);
},
"Pipe::HandleInputFinished");
}
if (last_source_node_) return last_source_node_->HandleInputFinished(total_batches);
// No consumers registered;
return Status::OK();
}

void Pipe::addSource(PipeSource* source) {
source->Initialize(this);
// First added source is handled in receiving task. All additional sources are delivered
// in their own sutmit tasks
if (!last_source_node_)
last_source_node_ = source;
else {
source_nodes_.push_back(source);
}
}

Status Pipe::Init(const std::shared_ptr<Schema> schema) {
for (auto node : plan_->nodes()) {
if (node->kind_name() == PipeSourceNode::kKindName) {
if (!schema->Equals(node->output_schema())) {
return Status::Invalid("Pipe sechma does not match for " + pipe_name_);
}
PipeSourceNode* pipe_source = checked_cast<PipeSourceNode*>(node);
if (pipe_source->pipe_name_ == pipe_name_) {
addSource(pipe_source);
// std::cout << std::string(node->kind_name()) + ":! " + node->label()
// << std::endl;
}
// else {
// std::cout << std::string(node->kind_name()) + ":+ " + node->label()
// << std::endl;
// }
}
// else {
// std::cout << std::string(node->kind_name()) + ":- " + node->label() <<
// std::endl;
// }
}
return Status::OK();
}

namespace internal {
void RegisterPipeNodes(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("pipe_source", PipeSourceNode::Make));
DCHECK_OK(registry->AddFactory("pipe_sink", PipeSinkNode::Make));
DCHECK_OK(registry->AddFactory("pipe_tee", PipeTeeNode::Make));
}

Expand Down
Loading

0 comments on commit 8d183c8

Please sign in to comment.