Skip to content

Commit

Permalink
Revert "[improve](move-memtable) cancel load rapidly when stream clos…
Browse files Browse the repository at this point in the history
…e wait (apache#29322)" (apache#29371)

This reverts commit bbf58c5.
  • Loading branch information
sollhui authored and seawinde committed Jan 3, 2024
1 parent 250b261 commit 7cd522a
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 185 deletions.
11 changes: 4 additions & 7 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,19 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
std::lock_guard<bthread::Mutex> lock(_mutex);
DBUG_EXECUTE_IF("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait", { return; });
_is_closed.store(true);
_close_cv.notify_all();
}

LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state)
: _state(state),
_use_cnt(num_use),
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
: _use_cnt(num_use),
_load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};

LoadStreamStub::LoadStreamStub(LoadStreamStub& stub, RuntimeState* state)
: _state(state),
_use_cnt(stub._use_cnt.load()),
LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
: _use_cnt(stub._use_cnt.load()),
_load_id(stub._load_id),
_src_id(stub._src_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
Expand Down
54 changes: 6 additions & 48 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
Expand Down Expand Up @@ -102,29 +101,11 @@ class LoadStreamStub {
Status close_wait(int64_t timeout_ms) {
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_mutex);
int ret = 0;
MonotonicStopWatch watch;
watch.start();
while (true) {
if (_is_closed) {
return Status::OK();
}
if (_stub->is_cancelled()) {
return _stub->cancelled_state();
}
if (_stub->runtime_state()->is_cancelled()) {
return Status::Cancelled(_stub->runtime_state()->cancel_reason());
}
// wait 1s once time.
ret = _close_cv.wait_for(lock, 1);
if (ret == 0) {
return Status::OK();
}
if (watch.elapsed_time() / 1000 / 1000 >= timeout_ms) {
return Status::InternalError("stream close wait timeout, result: {}", ret);
}
if (_is_closed) {
return Status::OK();
}
return Status::OK();
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
};

std::vector<int64_t> success_tablets() {
Expand Down Expand Up @@ -157,28 +138,10 @@ class LoadStreamStub {

public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state);
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);

// copy constructor, shared_ptr members are shared
LoadStreamStub(LoadStreamStub& stub, RuntimeState* state);

void cancel(Status status) {
_cancel = true;
_cancel_status = status;
}

RuntimeState* runtime_state() const { return _state; }

bool is_cancelled() const { return _cancel; }

Status cancelled_state() const { return _cancel_status; }

std::string cancel_reason() const {
if (_state == nullptr) {
return "";
}
return _state->cancel_reason();
}
LoadStreamStub(LoadStreamStub& stub);

// for mock this class in UT
#ifdef BE_TEST
Expand Down Expand Up @@ -268,11 +231,6 @@ class LoadStreamStub {
Status _send_with_retry(butil::IOBuf& buf);

protected:
RuntimeState* _state = nullptr;

bool _cancel = false;
Status _cancel_status;

std::atomic<bool> _is_init;
bthread::Mutex _mutex;

Expand Down
13 changes: 3 additions & 10 deletions be/src/vec/sink/load_stream_stub_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,13 @@ void LoadStreams::release() {
}
}

void LoadStreams::cancel(Status status) {
for (auto& stream : _streams) {
stream->cancel(status);
}
}

LoadStreamStubPool::LoadStreamStubPool() = default;

LoadStreamStubPool::~LoadStreamStubPool() = default;

std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams,
int num_sink, RuntimeState* state) {
int num_sink) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreams> streams = _pool[key];
Expand All @@ -75,12 +69,11 @@ std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id
}
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_sink > 0) << "sink num should be greater than 0";
auto [it, _] =
_template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink, state});
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
streams->streams().emplace_back(new LoadStreamStub {*it->second, state});
streams->streams().emplace_back(new LoadStreamStub {*it->second});
}
_pool[key] = streams;
return streams;
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/sink/load_stream_stub_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ class LoadStreams {

void release();

void cancel(Status status);

Streams& streams() { return _streams; }

private:
Expand All @@ -97,7 +95,7 @@ class LoadStreamStubPool {
~LoadStreamStubPool();

std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams, int num_sink, RuntimeState* state);
int num_streams, int num_sink);

void erase(UniqueId load_id, int64_t dst_id);

Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
for (int64_t node_id : new_backends) {
auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, _backend_id, node_id, _stream_per_node, _num_local_sink, _state);
_load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
_streams_for_node[node_id] = load_streams;
}
Expand Down Expand Up @@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
Status VTabletWriterV2::_open_streams(int64_t src_id) {
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node, _num_local_sink, _state);
_load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
_streams_for_node[dst_id] = streams;
}
Expand Down Expand Up @@ -457,7 +457,6 @@ Status VTabletWriterV2::_cancel(Status status) {
}
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
streams->cancel(status);
}
return Status::OK();
}
Expand Down
6 changes: 2 additions & 4 deletions be/test/io/fs/stream_sink_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
MockStreamStub(PUniqueId load_id, int64_t src_id, RuntimeState* state)
: LoadStreamStub(load_id, src_id, 1, state) {};
MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {};

virtual ~MockStreamStub() = default;

Expand Down Expand Up @@ -86,9 +85,8 @@ class StreamSinkFileWriterTest : public testing::Test {
virtual void SetUp() {
_load_id.set_hi(LOAD_ID_HI);
_load_id.set_lo(LOAD_ID_LO);
RuntimeState state;
for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
_streams.emplace_back(new MockStreamStub(_load_id, src_id, &state));
_streams.emplace_back(new MockStreamStub(_load_id, src_id));
}
}

Expand Down
7 changes: 3 additions & 4 deletions be/test/vec/exec/load_stream_stub_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ class LoadStreamStubPoolTest : public testing::Test {

TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
RuntimeState state;
int64_t src_id = 100;
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1, &state);
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
Expand Down

This file was deleted.

0 comments on commit 7cd522a

Please sign in to comment.