Skip to content

Commit

Permalink
[improvement](group-commit) insert table should fail when relay wal f…
Browse files Browse the repository at this point in the history
…ail too much time
  • Loading branch information
hust-hhb committed Dec 22, 2023
1 parent 0af6bd6 commit 6b7b4e7
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 5 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ DEFINE_String(group_commit_wal_path, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_relay_wal_threads, "10");
DEFINE_Bool(group_commit_disable_when_relay_fail_too_much, "true");

// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,7 @@ DECLARE_String(group_commit_wal_path);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
DECLARE_mInt32(group_commit_relay_wal_threads);
DECLARE_Bool(group_commit_disable_when_relay_fail_too_much);

// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,16 @@ Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& col
return Status::OK();
}

bool WalManager::is_table_available(int64_t table_id) {
std::shared_lock rdlock(_lock);
auto it = _table_map.find(table_id);
if (it != _table_map.end()) {
return it->second->is_running();
} else {
// if not found table_id in map, which means this table is a normal state table,
// not doing relay wal now.
return true;
}
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class WalManager {
void add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
bool is_table_available(int64_t table_id);

private:
ExecEnv* _exec_env = nullptr;
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ Status WalTable::replay_wals() {
return Status::OK();
}
if (retry_num >= config::group_commit_replay_wal_retry_num) {
if (config::group_commit_disable_when_relay_fail_too_much && !_stop.load()) {
this->_stop.store(true);
}
LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
<< ", wal=" << wal
<< ", retry_num=" << config::group_commit_replay_wal_retry_num;
std::string rename_path = _get_tmp_path(wal);
LOG(INFO) << "rename wal from " << wal << " to " << rename_path;
std::rename(wal.c_str(), rename_path.c_str());
need_erase_wals.push_back(wal);
continue;
}
Expand Down Expand Up @@ -447,4 +447,8 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu
return Status::OK();
}

bool WalTable::is_running() {
return !_stop.load();
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/wal_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class WalTable {
Status replay_wals();
size_t size();
void stop();
bool is_running();

public:
// <retry_num, start_time_ms, is_doing_replay>
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,15 @@ Status GroupCommitBlockSink::_add_blocks() {
load_id.__set_hi(_load_id.hi);
load_id.__set_lo(_load_id.lo);
if (_load_block_queue == nullptr) {
if (_state->exec_env()->wal_mgr()->is_running()) {
if (_state->exec_env()->wal_mgr()->is_running() &&
_state->exec_env()->wal_mgr()->is_table_available(_table_id)) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
_state->be_exec_version()));
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id);
} else {
return Status::InternalError("be is stopping");
return Status::InternalError("wal_mgr or wal_table is stop");
}
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
Expand Down

0 comments on commit 6b7b4e7

Please sign in to comment.