Skip to content

Commit

Permalink
Revert "[improve](move-memtable) add cancel method to load stream stub (
Browse files Browse the repository at this point in the history
#29994)"

This reverts commit b60a8be.
  • Loading branch information
kaijchen committed Jan 25, 2024
1 parent bd70ee1 commit fd1d1f3
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 49 deletions.
25 changes: 2 additions & 23 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
watch.start();
while (!_tablet_schema_for_index->contains(index_id) &&
watch.elapsed_time() / 1000 / 1000 < timeout_ms) {
RETURN_IF_ERROR(_check_cancel());
static_cast<void>(wait_for_new_schema(100));
}

Expand All @@ -309,12 +308,8 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
while (true) {
};
});
if (!_is_init.load()) {
return Status::InternalError("stream {} is not opened, load_id={}", _stream_id,
print_id(_load_id));
}
if (_is_closed.load()) {
return _check_cancel();
if (!_is_init.load() || _is_closed.load()) {
return Status::OK();
}
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_close_mutex);
Expand All @@ -326,7 +321,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
print_id(_load_id), _dst_id, _stream_id);
}
}
RETURN_IF_ERROR(_check_cancel());
if (!_is_eos.load()) {
return Status::InternalError(
"stream closed without eos, load_id={}, dst_id={}, stream_id={}",
Expand All @@ -335,20 +329,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
return Status::OK();
}

void LoadStreamStub::cancel(Status reason) {
LOG(WARNING) << *this << " is cancelled because of " << reason;
{
std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
_cancel_reason = reason;
_is_cancelled.store(true);
}
{
std::lock_guard<bthread::Mutex> lock(_close_mutex);
_is_closed.store(true);
_close_cv.notify_all();
}
}

Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
butil::IOBuf buf;
header.set_stream_id(_stream_id);
Expand Down Expand Up @@ -383,7 +363,6 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {

Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
for (;;) {
RETURN_IF_ERROR(_check_cancel());
int ret;
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
Expand Down
15 changes: 0 additions & 15 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ class LoadStreamStub {
// remote will close stream when it receives CLOSE_LOAD
Status close_wait(int64_t timeout_ms = 0);

// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
void cancel(Status reason);

Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t timeout_ms = 60000);

Expand Down Expand Up @@ -199,31 +196,19 @@ class LoadStreamStub {
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);

Status _check_cancel() {
if (!_is_cancelled.load()) {
return Status::OK();
}
std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id),
_cancel_reason.to_string_no_stack());
}

protected:
std::atomic<bool> _is_init;
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
std::atomic<bool> _is_eos;
std::atomic<int> _use_cnt;

PUniqueId _load_id;
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
int64_t _dst_id = -1; // destination backend_id
Status _cancel_reason;

bthread::Mutex _open_mutex;
bthread::Mutex _close_mutex;
bthread::Mutex _cancel_mutex;
bthread::ConditionVariable _close_cv;

std::mutex _tablets_to_commit_mutex;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TExpr;
LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool)
: _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}

void LoadStreams::release() {
void LoadStreams::release(Status status) {
int num_use = --_use_cnt;
DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
if (num_use == 0) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/load_stream_stub_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class LoadStreams {
public:
LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool);

void release();
void release(Status status);

Streams& streams() { return _streams; }

Expand Down Expand Up @@ -116,4 +116,4 @@ class LoadStreamStubPool {
std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool;
};

} // namespace doris
} // namespace doris
7 changes: 2 additions & 5 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,7 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet.reset();
}
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : streams->streams()) {
stream->cancel(status);
}
streams->release();
streams->release(status);
}
return Status::OK();
}
Expand Down Expand Up @@ -530,7 +527,7 @@ Status VTabletWriterV2::close(Status exec_status) {
// defer stream release to prevent memory leak
Defer defer([&] {
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
streams->release(status);
}
_streams_for_node.clear();
});
Expand Down
7 changes: 4 additions & 3 deletions be/test/vec/exec/load_stream_stub_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
int64_t src_id = 100;
PUniqueId load_id;
Status st = Status::OK();
load_id.set_hi(1);
load_id.set_hi(2);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
Expand All @@ -41,9 +42,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
EXPECT_NE(streams1, streams2);
streams1->release();
streams2->release();
streams3->release();
streams1->release(st);
streams2->release(st);
streams3->release(st);
EXPECT_EQ(0, pool.size());
EXPECT_EQ(0, pool.templates_size());
}
Expand Down

0 comments on commit fd1d1f3

Please sign in to comment.