diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index c9640214220f97..6c0a90babee4f2 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -294,7 +294,6 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i watch.start(); while (!_tablet_schema_for_index->contains(index_id) && watch.elapsed_time() / 1000 / 1000 < timeout_ms) { - RETURN_IF_ERROR(_check_cancel()); static_cast(wait_for_new_schema(100)); } @@ -309,12 +308,8 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { while (true) { }; }); - if (!_is_init.load()) { - return Status::InternalError("stream {} is not opened, load_id={}", _stream_id, - print_id(_load_id)); - } - if (_is_closed.load()) { - return _check_cancel(); + if (!_is_init.load() || _is_closed.load()) { + return Status::OK(); } DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; std::unique_lock lock(_close_mutex); @@ -326,7 +321,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { print_id(_load_id), _dst_id, _stream_id); } } - RETURN_IF_ERROR(_check_cancel()); if (!_is_eos.load()) { return Status::InternalError( "stream closed without eos, load_id={}, dst_id={}, stream_id={}", @@ -335,20 +329,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { return Status::OK(); } -void LoadStreamStub::cancel(Status reason) { - LOG(WARNING) << *this << " is cancelled because of " << reason; - { - std::lock_guard lock(_cancel_mutex); - _cancel_reason = reason; - _is_cancelled.store(true); - } - { - std::lock_guard lock(_close_mutex); - _is_closed.store(true); - _close_cv.notify_all(); - } -} - Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span data) { butil::IOBuf buf; header.set_stream_id(_stream_id); @@ -383,7 +363,6 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { for (;;) { - RETURN_IF_ERROR(_check_cancel()); int ret; { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 3bc6331dc02c61..b5bbfc28a82b8a 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -152,9 +152,6 @@ class LoadStreamStub { // remote will close stream when it receives CLOSE_LOAD Status close_wait(int64_t timeout_ms = 0); - // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled - void cancel(Status reason); - Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t timeout_ms = 60000); @@ -199,19 +196,9 @@ class LoadStreamStub { Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); Status _send_with_retry(butil::IOBuf& buf); - Status _check_cancel() { - if (!_is_cancelled.load()) { - return Status::OK(); - } - std::lock_guard lock(_cancel_mutex); - return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), - _cancel_reason.to_string_no_stack()); - } - protected: std::atomic _is_init; std::atomic _is_closed; - std::atomic _is_cancelled; std::atomic _is_eos; std::atomic _use_cnt; @@ -219,11 +206,9 @@ class LoadStreamStub { brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id - Status _cancel_reason; bthread::Mutex _open_mutex; bthread::Mutex _close_mutex; - bthread::Mutex _cancel_mutex; bthread::ConditionVariable _close_cv; std::mutex _tablets_to_commit_mutex; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index d76402b57d5020..1baa903f2eeba8 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -26,7 +26,7 @@ class TExpr; LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool) : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {} -void LoadStreams::release() { +void LoadStreams::release(Status status) { int num_use = --_use_cnt; DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; }); if (num_use == 0) { diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index 662fc5bc1a143d..b34383b25f9e2d 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -76,7 +76,7 @@ class LoadStreams { public: LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool); - void release(); + void release(Status status); Streams& streams() { return _streams; } @@ -116,4 +116,4 @@ class LoadStreamStubPool { std::unordered_map, std::shared_ptr> _pool; }; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 4d9693b851dcba..0e1075551c9a22 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -470,10 +470,7 @@ Status VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet.reset(); } for (const auto& [_, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - stream->cancel(status); - } - streams->release(); + streams->release(status); } return Status::OK(); } @@ -530,7 +527,7 @@ Status VTabletWriterV2::close(Status exec_status) { // defer stream release to prevent memory leak Defer defer([&] { for (const auto& [_, streams] : _streams_for_node) { - streams->release(); + streams->release(status); } _streams_for_node.clear(); }); diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index 24da3bb6999f36..bea5443b4ff7fd 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -32,6 +32,7 @@ TEST_F(LoadStreamStubPoolTest, test) { LoadStreamStubPool pool; int64_t src_id = 100; PUniqueId load_id; + Status st = Status::OK(); load_id.set_hi(1); load_id.set_hi(2); auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); @@ -41,9 +42,9 @@ TEST_F(LoadStreamStubPoolTest, test) { EXPECT_EQ(1, pool.templates_size()); EXPECT_EQ(streams1, streams3); EXPECT_NE(streams1, streams2); - streams1->release(); - streams2->release(); - streams3->release(); + streams1->release(st); + streams2->release(st); + streams3->release(st); EXPECT_EQ(0, pool.size()); EXPECT_EQ(0, pool.templates_size()); }