diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1241ae39e67dd2..338cffb829ea97 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1120,6 +1120,7 @@ DEFINE_String(group_commit_wal_path, "./wal"); DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); DEFINE_Int32(group_commit_relay_wal_threads, "10"); +DEFINE_Bool(group_commit_disable_when_relay_fail_too_much, "true"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 856e248274524c..e112d7b269423d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1193,6 +1193,7 @@ DECLARE_String(group_commit_wal_path); DECLARE_Int32(group_commit_replay_wal_retry_num); DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); DECLARE_mInt32(group_commit_relay_wal_threads); +DECLARE_Bool(group_commit_disable_when_relay_fail_too_much); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 238f88ebd00707..a2c7c0e65c9cdf 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -409,4 +409,16 @@ Status WalManager::get_wal_column_index(int64_t wal_id, std::vector& col return Status::OK(); } +bool WalManager::is_table_available(int64_t table_id) { + std::shared_lock rdlock(_lock); + auto it = _table_map.find(table_id); + if (it != _table_map.end()) { + return it->second->is_running(); + } else { + // if not found table_id in map, which means this table is a normal state table, + // not doing relay wal now. + return true; + } +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index 83847beabf0b85..442ac72c1fb4cd 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -69,6 +69,7 @@ class WalManager { void add_wal_column_index(int64_t wal_id, std::vector& column_index); void erase_wal_column_index(int64_t wal_id); Status get_wal_column_index(int64_t wal_id, std::vector& column_index); + bool is_table_available(int64_t table_id); private: ExecEnv* _exec_env = nullptr; diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp index bde0e8dd69d65f..b5c2767e2d63c4 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal_table.cpp @@ -74,12 +74,12 @@ Status WalTable::replay_wals() { return Status::OK(); } if (retry_num >= config::group_commit_replay_wal_retry_num) { + if (config::group_commit_disable_when_relay_fail_too_much && !_stop.load()) { + this->_stop.store(true); + } LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id << ", wal=" << wal << ", retry_num=" << config::group_commit_replay_wal_retry_num; - std::string rename_path = _get_tmp_path(wal); - LOG(INFO) << "rename wal from " << wal << " to " << rename_path; - std::rename(wal.c_str(), rename_path.c_str()); need_erase_wals.push_back(wal); continue; } @@ -447,4 +447,8 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu return Status::OK(); } +bool WalTable::is_running() { + return !_stop.load(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h index 354f4f16b05cc4..6ebc8c399c65b0 100644 --- a/be/src/olap/wal_table.h +++ b/be/src/olap/wal_table.h @@ -36,6 +36,7 @@ class WalTable { Status replay_wals(); size_t size(); void stop(); + bool is_running(); public: // diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index d4ede45868fe5d..ea3c1903fd1d44 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -227,14 +227,15 @@ Status GroupCommitBlockSink::_add_blocks() { load_id.__set_hi(_load_id.hi); load_id.__set_lo(_load_id.lo); if (_load_block_queue == nullptr) { - if (_state->exec_env()->wal_mgr()->is_running()) { + if (_state->exec_env()->wal_mgr()->is_running() && + _state->exec_env()->wal_mgr()->is_table_available(_table_id)) { RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( _db_id, _table_id, _base_schema_version, load_id, _load_block_queue, _state->be_exec_version())); _state->set_import_label(_load_block_queue->label); _state->set_wal_id(_load_block_queue->txn_id); } else { - return Status::InternalError("be is stopping"); + return Status::InternalError("wal_mgr or wal_table is stop"); } } for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {