Skip to content

Commit

Permalink
Merge branch 'master' into error_code_desc_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Dec 30, 2023
2 parents 7f1635f + 03901b9 commit 3c9f26f
Show file tree
Hide file tree
Showing 24 changed files with 356 additions and 333 deletions.
2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, int
};

auto st = Thread::create("ReportWorker", _name, report_loop, &_thread);
CHECK(st.ok()) << name << ": " << st;
CHECK(st.ok()) << _name << ": " << st;
}

ReportWorker::~ReportWorker() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ DEFINE_Int32(sys_log_roll_num, "10");
DEFINE_Strings(sys_log_verbose_modules, "");
// verbose log level
DEFINE_Int32(sys_log_verbose_level, "10");
// value assigned to glog's global FLAGS_v in init_glog (defaults to -1)
DEFINE_Int32(sys_log_verbose_flags_v, "-1");
// log buffer level
DEFINE_String(log_buffer_level, "");

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ DECLARE_Int32(sys_log_roll_num);
DECLARE_Strings(sys_log_verbose_modules);
// verbose log level
DECLARE_Int32(sys_log_verbose_level);
// value assigned to glog's global FLAGS_v in init_glog
DECLARE_Int32(sys_log_verbose_flags_v);
// log buffer level
DECLARE_String(log_buffer_level);

Expand Down
2 changes: 1 addition & 1 deletion be/src/common/logconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ bool init_glog(const char* basename) {
}

// set verbose modules.
FLAGS_v = -1;
FLAGS_v = config::sys_log_verbose_flags_v;
std::vector<std::string>& verbose_modules = config::sys_log_verbose_modules;
int32_t vlog_level = config::sys_log_verbose_level;
for (size_t i = 0; i < verbose_modules.size(); i++) {
Expand Down
1 change: 0 additions & 1 deletion be/src/http/action/config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ void ConfigAction::handle_update_config(HttpRequest* req) {
std::string status(s.ok() ? "OK" : "BAD");
rapidjson::Value result;
result.SetObject();
rapidjson::Value(key.c_str(), key.size(), results.GetAllocator());
result.AddMember("config_name",
rapidjson::Value(key.c_str(), key.size(), results.GetAllocator()),
results.GetAllocator());
Expand Down
16 changes: 10 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
ctx->status = process_put(req, ctx);
}
}

Expand All @@ -272,7 +272,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute "
<< "here";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
ctx->status = process_put(req, ctx);
}
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
}
Expand All @@ -290,11 +290,15 @@ void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
}

Status HttpStreamAction::_process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
Status HttpStreamAction::process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
TStreamLoadPutRequest request;
set_request_auth(&request, ctx->auth);
request.__set_load_sql(http_req->header(HTTP_SQL));
if (http_req != nullptr) {
request.__set_load_sql(http_req->header(HTTP_SQL));
} else {
request.__set_load_sql(ctx->sql_str);
}
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
if (ctx->group_commit) {
Expand Down Expand Up @@ -330,7 +334,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/http_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class HttpStreamAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;
Status process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

Expand Down
5 changes: 1 addition & 4 deletions be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id)
}
}
_executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
id.c_str(), config::s3_transfer_executor_pool_size);
_id.c_str(), config::s3_transfer_executor_pool_size);
}

S3FileSystem::~S3FileSystem() = default;
Expand Down Expand Up @@ -211,9 +211,6 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) {
return Status::IOError("fail to delete object: {}",
error_msg(e.GetKey(), e.GetMessage()));
}
VLOG_TRACE << "delete " << objects.size()
<< " s3 objects, endpoint: " << _s3_conf.endpoint
<< ", bucket: " << _s3_conf.bucket << ", prefix: " << _s3_conf.prefix;
}
is_trucated = result.GetIsTruncated();
request.SetContinuationToken(result.GetNextContinuationToken());
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/wal_dirs_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ size_t WalDirsInfo::get_max_available_size() {

Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir();
if (wal_dir_info->get_wal_dir() == wal_dir) {
return wal_dir_info->update_wal_dir_limit(limit);
}
Expand Down
41 changes: 41 additions & 0 deletions be/src/olap/wal_info.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/wal_info.h"

#include <string>
#include <utility>
namespace doris {

// Builds a record from the given WAL id, WAL path, retry count and start time
// (milliseconds, going by the parameter name).
//
// `wal_path` is received by value, so it is moved into the member rather than
// copied a second time; the parameter is not used again after the move.
WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms)
        : _wal_id(wal_id),
          _wal_path(std::move(wal_path)),
          _retry_num(retry_num),
          _start_time_ms(start_time_ms) {}

// No manual cleanup is required; the members release their own storage.
WalInfo::~WalInfo() = default;

// Returns the WAL id stored at construction.
int64_t WalInfo::get_wal_id() {
    return _wal_id;
}

// Returns a copy of the stored WAL path.
std::string WalInfo::get_wal_path() {
    return _wal_path;
}

// Returns the current retry counter (initial value plus any add_retry_num() calls).
int64_t WalInfo::get_retry_num() {
    return _retry_num;
}

// Returns the start time stored at construction.
int64_t WalInfo::get_start_time_ms() {
    return _start_time_ms;
}

// Increments the retry counter by one.
void WalInfo::add_retry_num() {
    _retry_num++;
}

} // namespace doris
38 changes: 38 additions & 0 deletions be/src/olap/wal_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "runtime/exec_env.h"

namespace doris {
// Small record describing one WAL: its id, its path, a retry counter and a
// start time. All fields are set by the constructor; only the retry counter
// can change afterwards (via add_retry_num()).
class WalInfo {
public:
    // Stores the four arguments as-is in the corresponding fields.
    WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms);
    ~WalInfo();

    // Read accessors, listed in the same order as the fields below.
    // get_wal_path() returns the path by value.
    int64_t get_wal_id();
    std::string get_wal_path();
    int64_t get_retry_num();
    int64_t get_start_time_ms();

    // Adds one to the retry counter.
    void add_retry_num();

private:
    // Keep this order in sync with the constructor's initializer list.
    int64_t _wal_id;
    std::string _wal_path;
    int64_t _retry_num;
    int64_t _start_time_ms; // presumably milliseconds, per the name
};

} // namespace doris
41 changes: 19 additions & 22 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ Status WalManager::init() {
RETURN_IF_ERROR(_init_wal_dirs_conf());
RETURN_IF_ERROR(_init_wal_dirs());
RETURN_IF_ERROR(_init_wal_dirs_info());
for (auto wal_dir : _wal_dirs) {
RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Thread::create(
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); },
&_replay_thread);
Expand Down Expand Up @@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() {
Status WalManager::_init_wal_dirs() {
bool exists = false;
for (auto wal_dir : _wal_dirs) {
std::string tmp_dir = wal_dir + "/tmp";
std::string tmp_dir = wal_dir + "/" + tmp;
LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists));
if (!exists) {
Expand All @@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() {
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir));
}
RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Status::OK();
}
Expand Down Expand Up @@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() {
&_update_wal_dirs_info_thread);
}

void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
<< ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status;
auto it = _wal_status_queues.find(table_id);
if (it == _wal_status_queues.end()) {
std::unordered_map<int64_t, WAL_STATUS> tmp;
tmp.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp);
std::unordered_map<int64_t, WalStatus> tmp_map;
tmp_map.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp_map);
} else {
it->second.emplace(wal_id, wal_status);
}
Expand Down Expand Up @@ -305,12 +307,12 @@ Status WalManager::scan_wals(const std::string& wal_path) {
LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string();
return st;
}
for (const auto& db_id : dbs) {
if (db_id.is_file) {
for (const auto& database_id : dbs) {
if (database_id.is_file || database_id.file_name == tmp) {
continue;
}
std::vector<io::FileInfo> tables;
auto db_path = wal_path + "/" + db_id.file_name;
auto db_path = wal_path + "/" + database_id.file_name;
st = io::global_local_filesystem()->list(db_path, false, &tables, &exists);
if (!st.ok()) {
LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string();
Expand Down Expand Up @@ -342,20 +344,16 @@ Status WalManager::scan_wals(const std::string& wal_path) {
int64_t wal_id =
std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10);
_wal_path_map.emplace(wal_id, wal_file);
int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY);
add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY);
RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file));
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
}
}
st = add_recover_wal(std::stoll(db_id.file_name), std::stoll(table_id.file_name), res);
count += res.size();
if (!st.ok()) {
LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name
<< ", table=" << table_id.file_name << ", st=" << st.to_string();
return st;
}
}
}
LOG(INFO) << "Finish list all wals, size:" << count;
Expand Down Expand Up @@ -396,7 +394,8 @@ Status WalManager::replay() {
return Status::OK();
}

Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals) {
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
std::string wal) {
std::lock_guard<std::shared_mutex> wrlock(_lock);
std::shared_ptr<WalTable> table_ptr;
auto it = _table_map.find(table_id);
Expand All @@ -406,12 +405,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<
} else {
table_ptr = it->second;
}
table_ptr->add_wals(wals);
table_ptr->add_wal(wal_id, wal);
#ifndef BE_TEST
for (auto wal : wals) {
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
}
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
#endif
return Status::OK();
}
Expand Down
Loading

0 comments on commit 3c9f26f

Please sign in to comment.