Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Dec 24, 2023
1 parent bd90f38 commit 3be320d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 23 deletions.
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ bool ScanOperator::can_read() {
return false;
}
} else {
// If scanner meet any error, done == true
if (_node->_eos || _node->_scanner_ctx->done()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
Expand Down Expand Up @@ -560,11 +561,11 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
fmt::format_to(debug_string_buffer,
", Scanner Context: (_is_finished = {}, _should_stop = {}, "
"_num_running_scanners={}, "
" _num_unfinished_scanners = {}, status = {}, error = {})",
" _num_unfinished_scanners = {}, status = {})",
_scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
_scanner_ctx->get_num_running_scanners(),
_scanner_ctx->get_num_unfinished_scanners(),
_scanner_ctx->status().to_string(), _scanner_ctx->status_error());
_scanner_ctx->status().to_string());
}

return fmt::to_string(debug_string_buffer);
Expand Down
29 changes: 13 additions & 16 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,19 +369,13 @@ void ScannerContext::inc_num_running_scanners(int32_t inc) {
_num_running_scanners += inc;
}

void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
std::lock_guard l(_transfer_lock);
_num_running_scanners -= scanner_dec;
}

void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
std::unique_lock l(_transfer_lock, std::defer_lock);
if (need_lock) {
l.lock();
}
if (this->status().ok()) {
_process_status = status;
_status_error = true;
_blocks_queue_added_cv.notify_one();
_should_stop = true;
_set_scanner_done();
Expand Down Expand Up @@ -475,19 +469,22 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel
// calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
// to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
// before we call the following if() block.
if (scanner->_scanner->need_to_close()) {
--_num_unfinished_scanners;
if (_num_unfinished_scanners == 0) {
_dispose_coloate_blocks_not_in_queue();
_is_finished = true;
_set_scanner_done();
_blocks_queue_added_cv.notify_one();
return;
{
Defer defer {[&]() { --_num_running_scanners; }};
if (scanner->_scanner->need_to_close()) {
--_num_unfinished_scanners;
if (_num_unfinished_scanners == 0) {
_dispose_coloate_blocks_not_in_queue();
_is_finished = true;
_set_scanner_done();
_blocks_queue_added_cv.notify_one();
return;
}
} else {
_scanners.push_front(scanner);
}
}

_scanners.push_front(scanner);

if (should_be_scheduled()) {
auto submit_status = _scanner_scheduler->submit(shared_from_this());
if (!submit_status.ok()) {
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,9 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
bool done() const { return _is_finished || _should_stop; }
bool is_finished() { return _is_finished.load(); }
bool should_stop() { return _should_stop.load(); }
bool status_error() { return _status_error.load(); }

void inc_num_running_scanners(int32_t scanner_inc);

void dec_num_running_scanners(int32_t scanner_dec);

int get_num_running_scanners() const { return _num_running_scanners; }

int get_num_unfinished_scanners() const { return _num_unfinished_scanners; }
Expand All @@ -125,6 +122,8 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
RuntimeState* state() { return _state; }

void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }

int64_t num_ctx_scheduled() { _scanner_ctx_sched_counter->value(); }
void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }

std::string parent_name();
Expand Down Expand Up @@ -215,7 +214,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
// Always be set by ScannerScheduler.
// True means all scanners are finished to scan.
Status _process_status;
std::atomic_bool _status_error = false;
std::atomic_bool _should_stop = false;
std::atomic_bool _is_finished = false;

Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) {
if (ctx->done()) {
return Status::EndOfFile("ScannerContext is done");
}
if (ctx->num_ctx_scheduled() > 100) {
LOG(WARNING) << "yyyy " << Status::InternalError("Too many scheduled");
}
ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
return Status::InternalError("failed to submit scanner context to scheduler");
Expand Down Expand Up @@ -268,7 +271,6 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
std::weak_ptr<ScannerDelegate> scanner_ref) {
Defer defer {[&]() { ctx->dec_num_running_scanners(1); }};
auto task_lock = ctx->get_task_execution_context().lock();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
Expand Down

0 comments on commit 3be320d

Please sign in to comment.