Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored and dataroaring committed Dec 31, 2023
1 parent ecb3486 commit a8a53b5
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
22 changes: 16 additions & 6 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ class LoadStreamStub {
return Status::OK();
}
if (_stub->is_cancelled()) {
return Status::Cancelled(_stub->cancel_reason());
return _stub->cancelled_state();
}
if (_stub->runtime_state()->is_cancelled()) {
return Status::Cancelled(_stub->runtime_state()->cancel_reason());
}
// wait 1s once time.
ret = _close_cv.wait_for(lock, 1);
Expand Down Expand Up @@ -159,13 +162,17 @@ class LoadStreamStub {
// copy constructor, shared_ptr members are shared
LoadStreamStub(LoadStreamStub& stub, RuntimeState* state);

bool is_cancelled() const {
if (_state == nullptr) {
return false;
}
return _state->is_cancelled();
void cancel(Status status) {
_cancel = true;
_cancel_status = status;
}

RuntimeState* runtime_state() const { return _state; }

bool is_cancelled() const { return _cancel; }

Status cancelled_state() const { return _cancel_status; }

std::string cancel_reason() const {
if (_state == nullptr) {
return "";
Expand Down Expand Up @@ -263,6 +270,9 @@ class LoadStreamStub {
protected:
RuntimeState* _state = nullptr;

bool _cancel = false;
Status _cancel_status;

std::atomic<bool> _is_init;
bthread::Mutex _mutex;

Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/sink/load_stream_stub_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ void LoadStreams::release() {
}
}

void LoadStreams::cancel(Status status) {
for (auto& stream : _streams) {
stream->cancel(status);
}
}

LoadStreamStubPool::LoadStreamStubPool() = default;

LoadStreamStubPool::~LoadStreamStubPool() = default;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/load_stream_stub_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class LoadStreams {

void release();

void cancel(Status status);

Streams& streams() { return _streams; }

private:
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ Status VTabletWriterV2::_cancel(Status status) {
}
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
streams->cancel(status);
}
return Status::OK();
}
Expand Down

0 comments on commit a8a53b5

Please sign in to comment.