Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[debug](don't merge) test memtable on sink node dcheck #29120

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,6 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
// timeout for load stream close wait in ms
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min

// idle timeout for load stream in ms
DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
// brpc streaming max_buf_size in bytes
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
// brpc streaming messages_in_batch
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,6 @@ DECLARE_Int64(open_load_stream_timeout_ms);
// timeout for load stream close wait in ms
DECLARE_Int64(close_load_stream_timeout_ms);

// idle timeout for load stream in ms
DECLARE_Int64(load_stream_idle_timeout_ms);
// brpc streaming max_buf_size in bytes
DECLARE_Int64(load_stream_max_buf_size);
// brpc streaming messages_in_batch
Expand Down
15 changes: 12 additions & 3 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <runtime/exec_env.h>

#include <memory>
#include <sstream>

#include "common/signal_handler.h"
#include "exec/tablet_info.h"
Expand Down Expand Up @@ -362,7 +363,7 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
_open_streams.erase(src_id);
}
_close_load_cnt++;
LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
LOG(INFO) << *this << " received CLOSE_LOAD from sender " << src_id << ", remaining "
<< _total_streams - _close_load_cnt << " senders";

_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
Expand Down Expand Up @@ -504,6 +505,9 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
messages[i]->cutn(&hdr_buf, hdr_len);
_parse_header(&hdr_buf, hdr);

// debug
_remote_stream_id = hdr.stream_id();

// step 2: cut data
size_t data_len = 0;
messages[i]->cutn((void*)&data_len, sizeof(size_t));
Expand Down Expand Up @@ -561,6 +565,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, st, success_tablet_ids, failed_tablets, true);
LOG(INFO) << "closing stream " << *this;
brpc::StreamClose(id);
} break;
case PStreamHeader::GET_SCHEMA: {
Expand All @@ -578,15 +583,19 @@ void LoadStream::on_idle_timeout(StreamId id) {
}

void LoadStream::on_closed(StreamId id) {
// convert to string in advance to prevent use-after-free
std::stringstream self;
self << "stream " << *this;
auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams;
LOG(INFO) << self.str() << " on_closed, remaining streams = " << remaining_streams;
if (remaining_streams == 0) {
_load_stream_mgr->clear_load(_load_id);
}
}

inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) {
ostr << "load_id=" << UniqueId(load_stream._load_id) << ", txn_id=" << load_stream._txn_id;
ostr << "load_id=" << UniqueId(load_stream._load_id) << ", txn_id=" << load_stream._txn_id
<< ", remote stream_id=" << load_stream._remote_stream_id;
return ostr;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class LoadStream : public brpc::StreamInputHandler {
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
brpc::StreamId _remote_stream_id = 0;
};

using LoadStreamSharedPtr = std::shared_ptr<LoadStream>;
Expand Down
4 changes: 3 additions & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
}

stream_options.handler = load_stream.get();
stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms;
stream_options.idle_timeout_ms = request->idle_timeout_ms();
DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
{ stream_options.idle_timeout_ms = 1; });

StreamId streamid;
if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile) {
int64_t idle_timeout_ms, bool enable_profile) {
std::unique_lock<bthread::Mutex> lock(_open_mutex);
if (_is_init.load()) {
return Status::OK();
Expand All @@ -160,7 +160,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = config::load_stream_idle_timeout_ms;
opt.idle_timeout_ms = idle_timeout_ms;
opt.messages_in_batch = config::load_stream_messages_in_batch;
opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
brpc::Controller cntl;
Expand All @@ -174,6 +174,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
request.set_total_streams(total_streams);
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
*request.add_tablets() = tablet;
Expand Down Expand Up @@ -333,6 +334,7 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {

Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
butil::IOBuf buf;
header.set_stream_id(_stream_id);
size_t header_len = header.ByteSizeLong();
buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
buf.append(header.SerializeAsString());
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class LoadStreamStub {
BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile);
int64_t idle_timeout_ms, bool enable_profile);

// for mock this class in UT
#ifdef BE_TEST
Expand Down
11 changes: 8 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,19 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
auto idle_timeout_ms = _state->execution_timeout() * 1000;
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
_total_streams, _state->enable_profile()));
_total_streams, idle_timeout_ms, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
idle_timeout_ms, _state->enable_profile()));
}
return Status::OK();
}
Expand Down Expand Up @@ -536,7 +537,11 @@ Status VTabletWriterV2::close(Status exec_status) {
SCOPED_TIMER(_close_load_timer);
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : streams->streams()) {
RETURN_IF_ERROR(stream->close_wait());
LOG(INFO) << "begin waiting stream close load_id=" << print_id(_load_id)
<< ", stream_id=" << stream->stream_id();
DCHECK(stream->close_wait().ok());
LOG(INFO) << "end waiting stream close load_id=" << print_id(_load_id)
<< ", stream_id=" << stream->stream_id();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/en/docs/advanced/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ Note that the comment must start with /*+ and can only follow the SELECT.
* `enable_memtable_on_sink_node`

<version since="2.1.0">
Whether to enable MemTable on DataSink node when loading data, default is false.
Whether to enable MemTable on DataSink node when loading data, default is true.
</version>

Build MemTable on DataSink node, and send segments to other backends through brpc streaming.
Expand Down
2 changes: 1 addition & 1 deletion docs/zh-CN/docs/advanced/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/
* `enable_memtable_on_sink_node`

<version since="2.1.0">
是否在数据导入中启用 MemTable 前移,默认为 false
是否在数据导入中启用 MemTable 前移,默认为 true
</version>

在 DataSink 节点上构建 MemTable,并通过 brpc streaming 发送 segment 到其他 BE。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
public boolean truncateCharOrVarcharColumns = false;

@VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = true)
public boolean enableMemtableOnSinkNode = false;
public boolean enableMemtableOnSinkNode = true;

@VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE)
public int loadStreamPerNode = 20;
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ message POpenLoadStreamRequest {
repeated PTabletID tablets = 5;
optional bool enable_profile = 6 [default = false];
optional int64 total_streams = 7;
optional int64 idle_timeout_ms = 8;
}

message PTabletSchemaWithIndex {
Expand Down Expand Up @@ -800,6 +801,7 @@ message PStreamHeader {
repeated PTabletID tablets = 10;
optional TabletSchemaPB flush_schema = 11;
optional uint64 offset = 12;
optional uint64 stream_id = 13;
}

message PGetWalQueueSizeRequest{
Expand Down
4 changes: 4 additions & 0 deletions regression-test/pipeline/p0/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ user_files_secure_path=/
enable_debug_points=true
# debug scanner context dead loop
enable_debug_log_timeout_secs=300

sys_log_verbose_modules = src/brpc
sys_log_verbose_level = 100
sys_log_verbose_flags_v = 100
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ suite("load_stream_fault_injection", "nonConcurrent") {
}
}

def load_with_injection2 = { injection1, injection2, error_msg->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection1)
GetDebugPoint().enableDebugPointForAllBEs(injection2)
sql "insert into test select * from baseall where k1 <= 3"
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection1)
GetDebugPoint().disableDebugPointForAllBEs(injection2)
}
}

// LoadStreamWriter create file failed
load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
// LoadStreamWriter append_data meet null file writer error
Expand Down Expand Up @@ -161,14 +175,10 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LoadStream._dispatch.unknown_srcid", "")

// LoadStream meets StreamRPC idle timeout
get_be_param("load_stream_idle_timeout_ms")
set_be_param("load_stream_idle_timeout_ms", 500)
try {
load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "")
load_with_injection2("LoadStreamStub._send_with_retry.delay_before_send", "PInternalServiceImpl.open_load_stream.set_idle_timeout", "")
} catch(Exception e) {
logger.info(e.getMessage())
} finally {
reset_be_param("load_stream_idle_timeout_ms")
}
}

Loading